module Data.Acid.Log
( FileLog
, LogKey(..)
, EntryId
, openFileLog
, closeFileLog
, pushEntry
, readEntriesFrom
, newestEntry
, askCurrentEntryId
) where
import Data.Acid.Archive as Archive
import System.Directory
import System.FilePath
import System.IO
import Control.Monad
import Control.Concurrent
import Control.Concurrent.STM
import qualified Data.ByteString.Lazy as Lazy
import Data.List
import Data.Maybe
import Data.Binary
import Text.Printf ( printf )
import Paths_acid_state ( version )
import Data.Version ( showVersion )
type EntryId = Int
data FileLog object
= FileLog { logIdentifier :: LogKey object
, logCurrent :: MVar (Handle)
, logNextEntryId :: TVar EntryId
, logQueue :: TVar ([Lazy.ByteString], [IO ()])
, logThreads :: [ThreadId]
}
data LogKey object
= LogKey
{ logDirectory :: FilePath
, logPrefix :: String
}
formatLogFile :: String -> EntryId -> String
formatLogFile tag n
= printf "%s-%010d.log" tag n
findLogFiles :: LogKey object -> IO [(EntryId, FilePath)]
findLogFiles identifier
= do createDirectoryIfMissing True (logDirectory identifier)
files <- getDirectoryContents (logDirectory identifier)
return [ (tid, logDirectory identifier </> file)
| file <- files
, logFile <- maybeToList (stripPrefix (logPrefix identifier ++ "-") file)
, (tid, ".log") <- reads logFile ]
saveVersionFile :: LogKey object -> IO ()
saveVersionFile key
= do exist <- doesFileExist versionFile
unless exist $ writeFile versionFile (showVersion version)
where versionFile = logDirectory key </> logPrefix key <.> "version"
openFileLog :: LogKey object -> IO (FileLog object)
openFileLog identifier
= do logFiles <- findLogFiles identifier
saveVersionFile identifier
currentState <- newEmptyMVar
queue <- newTVarIO ([], [])
nextEntryRef <- newTVarIO 0
tid2 <- forkIO $ fileWriter currentState queue
let log = FileLog { logIdentifier = identifier
, logCurrent = currentState
, logNextEntryId = nextEntryRef
, logQueue = queue
, logThreads = [tid2] }
if null logFiles
then do let currentEntryId = 0
currentHandle <- openBinaryFile (logDirectory identifier </> formatLogFile (logPrefix identifier) currentEntryId) WriteMode
putMVar currentState currentHandle
else do let (lastFileEntryId, lastFilePath) = maximum logFiles
entries <- readEntities lastFilePath
let currentEntryId = lastFileEntryId + length entries
atomically $ writeTVar nextEntryRef currentEntryId
currentHandle <- openFile (logDirectory identifier </> formatLogFile (logPrefix identifier) currentEntryId) WriteMode
putMVar currentState currentHandle
return log
fileWriter currentState queue
= forever $
do (entries, actions) <- atomically $ do (entries, actions) <- readTVar queue
when (null entries && null actions) retry
writeTVar queue ([], [])
return (reverse entries, reverse actions)
withMVar currentState $ \handle ->
do let arch = Archive.packEntries entries
Lazy.hPutStr handle arch
hFlush handle
return ()
sequence_ actions
yield
closeFileLog :: FileLog object -> IO ()
closeFileLog log
= modifyMVar_ (logCurrent log) $ \handle ->
do hClose handle
forkIO $ forM_ (logThreads log) killThread
return $ error "FileLog has been closed"
readEntities :: FilePath -> IO [Lazy.ByteString]
readEntities path
= do archive <- Lazy.readFile path
return $ worker (Archive.readEntries archive)
where worker Done = []
worker (Next entry next)
= entry : worker next
worker Fail{} = []
readEntriesFrom :: Binary object => FileLog object -> EntryId -> IO [object]
readEntriesFrom log youngestEntry
= do
entryCap <- cutFileLog log
logFiles <- findLogFiles (logIdentifier log)
let sorted = sort logFiles
findRelevant [] = []
findRelevant [ logFile ]
| youngestEntry <= rangeStart logFile && rangeStart logFile < entryCap
= [ logFile ]
| otherwise
= []
findRelevant ( left : right : xs )
| youngestEntry >= rangeStart right
= findRelevant (right : xs)
| rangeStart left >= entryCap
= []
| otherwise
= left : findRelevant (right : xs)
relevant = findRelevant sorted
firstEntryId = case relevant of
[] -> 0
( logFile : _logFiles) -> rangeStart logFile
archive <- liftM Lazy.concat $ mapM Lazy.readFile (map snd relevant)
let entries = entriesToList $ readEntries archive
return $ map decode
$ take (entryCap youngestEntry)
$ drop (youngestEntry firstEntryId) entries
where rangeStart (firstEntryId, _path) = firstEntryId
cutFileLog :: FileLog object -> IO EntryId
cutFileLog log
= do mvar <- newEmptyMVar
let action = do currentEntryId <- atomically $
do (entries, _) <- readTVar (logQueue log)
next <- readTVar (logNextEntryId log)
return (next length entries)
modifyMVar_ (logCurrent log) $ \old ->
do hClose old
openFile (logDirectory key </> formatLogFile (logPrefix key) currentEntryId) WriteMode
putMVar mvar currentEntryId
atomically $
do (entries, actions) <- readTVar (logQueue log)
writeTVar (logQueue log) (entries, action : actions)
takeMVar mvar
where key = logIdentifier log
newestEntry :: Binary object => LogKey object -> IO (Maybe object)
newestEntry identifier
= do logFiles <- findLogFiles identifier
let sorted = reverse $ sort logFiles
(eventIds, files) = unzip sorted
archives <- mapM Lazy.readFile files
return $ worker archives
where worker [] = Nothing
worker (archive:archives)
= case Archive.readEntries archive of
Done -> worker archives
Next entry next -> Just (decode (lastEntry entry next))
Fail{} -> worker archives
lastEntry entry Done = entry
lastEntry entry Fail{} = entry
lastEntry _ (Next entry next) = lastEntry entry next
pushEntry :: Binary object => FileLog object -> object -> IO () -> IO ()
pushEntry log object finally
= atomically $
do tid <- readTVar (logNextEntryId log)
writeTVar (logNextEntryId log) (tid+1)
(entries, actions) <- readTVar (logQueue log)
writeTVar (logQueue log) ( encoded : entries, finally : actions )
where encoded = encode object
askCurrentEntryId :: FileLog object -> IO EntryId
askCurrentEntryId log
= atomically $ readTVar (logNextEntryId log)