{-# LANGUAGE ForeignFunctionInterface #-}
-- A log is a stack of entries that supports efficient pushing of
-- new entries and fetching of old. It can be considered an
-- extendible array of entries.
--
module Data.Acid.Log
    ( FileLog
    , LogKey(..)
    , EntryId
    , openFileLog
    , closeFileLog
    , pushEntry
    , pushAction
    , readEntriesFrom
    , newestEntry
    , askCurrentEntryId
    ) where

import Data.Acid.Archive as Archive
import System.Directory
import System.FilePath
import System.IO
import System.Posix                              ( handleToFd, Fd(..), fdWriteBuf
                                                 , closeFd )
import Foreign.C
import Foreign.Ptr
import Control.Monad
import Control.Concurrent
import Control.Concurrent.STM
import qualified Data.ByteString.Lazy as Lazy
import qualified Data.ByteString as Strict
import qualified Data.ByteString.Unsafe as Strict
import Data.List
import Data.Maybe
import qualified Data.Serialize.Get as Get
import qualified Data.Serialize.Put as Put
import Data.SafeCopy                             ( safePut, safeGet, SafeCopy )

import Text.Printf                               ( printf )

import Paths_acid_state                          ( version )
import Data.Version                              ( showVersion )

-- Position of an entry in the log. The ids handed out by this module start
-- at 0 for an empty log and grow by one for every pushed entry.
type EntryId = Int

-- Handle to an open, file-backed log.
--
--   logIdentifier  - the key (directory and file prefix) this log was opened with.
--   logCurrent     - file descriptor of the log file currently being written;
--                    replaced when the log is cut and invalidated on close.
--   logNextEntryId - id that the next pushed entry will receive.
--   logQueue       - entries and actions waiting for the writer thread,
--                    most recently pushed first.
--   logThreads     - background threads to kill when the log is closed.
data FileLog object
    = FileLog { logIdentifier  :: LogKey object
              , logCurrent     :: MVar Fd -- file descriptor rather than a Handle
              , logNextEntryId :: TVar EntryId
              , logQueue       :: TVar ([Lazy.ByteString], [IO ()])
              , logThreads     :: [ThreadId]
              }

-- Identifies a log on disk. The 'object' type parameter is phantom: it does
-- not appear in any field and only records the type of the entries.
--
--   logDirectory - directory that holds the log files.
--   logPrefix    - prefix of the log file names; the files are named
--                  "<prefix>-<entry id>.log".
data LogKey object
    = LogKey
      { logDirectory :: FilePath
      , logPrefix    :: String
      }

-- Build the name of the log file whose first entry has id 'n': the tag,
-- a dash, the id zero-padded to ten digits, and the ".log" extension.
formatLogFile :: String -> EntryId -> String
formatLogFile tag n = printf logFileFormat tag n
    where logFileFormat = "%s-%010d.log"

-- List the log files that belong to the given key, paired with the entry id
-- encoded in their file name. The log directory is created if it is missing.
-- Directory entries that do not look like "<prefix>-<number>.log" are ignored.
-- The result is in whatever order the directory listing produces.
findLogFiles :: LogKey object -> IO [(EntryId, FilePath)]
findLogFiles identifier
    = do createDirectoryIfMissing True dir
         names <- getDirectoryContents dir
         return (concatMap parseName names)
    where dir          = logDirectory identifier
          dashedPrefix = logPrefix identifier ++ "-"
          -- Zero or more (id, full path) pairs for one directory entry.
          parseName name
              = case stripPrefix dashedPrefix name of
                  Nothing     -> []
                  Just suffix -> [ (entryId, dir </> name)
                                 | (entryId, ".log") <- reads suffix ]


-- Record the library version next to the log files, in
-- "<directory>/<prefix>.version". An existing version file is left untouched.
saveVersionFile :: LogKey object -> IO ()
saveVersionFile key
    = do alreadyWritten <- doesFileExist path
         if alreadyWritten
            then return ()
            else writeFile path (showVersion version)
    where path = logDirectory key </> logPrefix key <.> "version"

-- Open the log identified by the given key and start its writer thread.
--
-- With no log files on disk the log starts at entry id 0. Otherwise the
-- entries of the newest log file are counted and the log resumes right after
-- them. In both cases a log file named after the starting entry id is opened
-- in WriteMode and becomes the current file.
openFileLog :: LogKey object -> IO (FileLog object)
openFileLog identifier
    = do logFiles <- findLogFiles identifier
         saveVersionFile identifier
         currentState <- newEmptyMVar
         queue <- newTVarIO ([], [])
         nextEntryRef <- newTVarIO 0
         -- The writer cannot touch the disk until 'currentState' is filled below.
         writerThread <- forkIO $ fileWriter currentState queue
         startId <- case logFiles of
                      [] -> return 0
                      _  -> do let (lastFileEntryId, lastFilePath) = maximum logFiles
                               entries <- readEntities lastFilePath
                               let resumeId = lastFileEntryId + length entries
                               atomically $ writeTVar nextEntryRef resumeId
                               return resumeId
         currentHandle <- openBinaryFile (logDirectory identifier </> formatLogFile (logPrefix identifier) startId) WriteMode
         handleToFd currentHandle >>= putMVar currentState
         return FileLog { logIdentifier  = identifier
                        , logCurrent     = currentState
                        , logNextEntryId = nextEntryRef
                        , logQueue       = queue
                        , logThreads     = [writerThread] }

-- Binding to the C function "fsync". It takes the raw file descriptor number
-- and hands back the C function's integer result unchanged.
foreign import ccall "fsync" c_fsync :: CInt -> IO CInt

-- Body of the writer thread. Forever: wait until the queue holds at least one
-- entry or action, take everything out of it, write the entries to the current
-- log file, and then run the actions in the order they were pushed.
fileWriter :: MVar Fd -> TVar ([Lazy.ByteString], [IO ()]) -> IO ()
fileWriter currentState queue
    = forever $
      do (batch, callbacks) <- atomically drainQueue
         withMVar currentState $ \fd ->
           writeToDisk fd (repack (Archive.packEntries batch))
         sequence_ callbacks
         yield
    where -- Block while the queue is empty; otherwise empty it and return its
          -- contents oldest first (the queue itself is newest first).
          drainQueue
              = do pending <- readTVar queue
                   case pending of
                     ([], []) -> retry
                     (entries, actions)
                         -> do writeTVar queue ([], [])
                               -- Reversing the actions is not strictly needed, but it
                               -- should not cost much.
                               return (reverse entries, reverse actions)

-- Split a lazy bytestring into strict blocks of 4 KiB (the last block may be
-- shorter) so that it can be written to disk in a few large writes. Empty
-- input produces no blocks.
repack :: Lazy.ByteString -> [Strict.ByteString]
repack = unfoldr nextBlock
    where nextBlock remaining
              | Lazy.null remaining = Nothing
              | otherwise
              = let (block, rest) = Lazy.splitAt blockSize remaining
                in Just (Strict.concat (Lazy.toChunks block), rest)
          blockSize = 4*1024

-- Write the given blocks, in order, to the file descriptor and then fsync it,
-- so that the data has been handed to stable storage when this returns.
-- An empty list is a no-op: nothing is written and no fsync is issued.
--
-- Raises an IOError (built from errno) when the final fsync fails. Without
-- that check a failed fsync would go unnoticed and the caller would treat
-- entries as durable that may never have reached the disk.
writeToDisk :: Fd -> [Strict.ByteString] -> IO ()
writeToDisk _ [] = return ()
writeToDisk fd@(Fd c_fd) xs
    = do mapM_ worker xs
         -- fsync reports failure by returning -1; an interrupted call is retried.
         throwErrnoIfMinus1Retry_ "fsync" (c_fsync c_fd)
    where worker bs
              = do let len = Strict.length bs
                   count <- Strict.unsafeUseAsCString bs $ \ptr -> fdWriteBuf fd (castPtr ptr) (fromIntegral len)
                   -- A write may accept fewer bytes than requested; carry on
                   -- with the part that has not been written yet.
                   when (fromIntegral count < len) $
                     worker (Strict.drop (fromIntegral count) bs)


-- Close the log: close the current file descriptor, kill the log's background
-- threads (from a freshly forked thread), and replace the stored descriptor
-- with a value that raises "FileLog has been closed" when it is used.
closeFileLog :: FileLog object -> IO ()
closeFileLog fLog
    = modifyMVar_ (logCurrent fLog) shutDown
    where shutDown fd
              = do closeFd fd
                   _ <- forkIO (mapM_ killThread (logThreads fLog))
                   return (error "FileLog has been closed")

-- Read the entries stored in the log file at 'path', oldest first.
-- Decoding stops at the first entry that cannot be read; the entries before
-- it are returned and no error is raised.
--
-- The file is read strictly so that its handle is closed before this returns.
-- With lazy I/O the handle would stay open whenever decoding stops before the
-- end of the file, and openFileLog may reopen the log file for the same entry
-- id in WriteMode right afterwards (when no entry could be decoded), which
-- GHC's file locking can reject while the file is still open for reading.
readEntities :: FilePath -> IO [Lazy.ByteString]
readEntities path
    = do contents <- Strict.readFile path
         return $ worker (Archive.readEntries (Lazy.fromChunks [contents]))
    where worker Done = []
          worker (Next entry next)
              = entry : worker next
          -- A failure ends the list instead of raising an error.
          worker Fail{} = []

-- Read all durable entries whose id is at least the given EntryId, oldest
-- first. Note that entries written during or after this call won't
-- be included in the returned list.
--
-- The log is cut first, so the entries of interest live in files that are no
-- longer being written to.
readEntriesFrom :: SafeCopy object => FileLog object -> EntryId -> IO [object]
readEntriesFrom fLog youngestEntry
    = do -- Cut the log so we can read written entries without interfering
         -- with the writing of new entries.
         entryCap <- cutFileLog fLog
         -- We're interested in these entries: youngestEntry <= x < entryCap.
         logFiles <- findLogFiles (logIdentifier fLog)
         let sorted = sort logFiles
             findRelevant [] = []
             findRelevant [ logFile ]
                 -- Nothing follows the last file, so it may hold any id from its
                 -- start onwards: it is relevant whenever it starts below the cap,
                 -- even if it starts before 'youngestEntry'.
                 | rangeStart logFile < entryCap
                 = [ logFile ]
                 | otherwise
                 = []
             findRelevant ( left : right : xs )
                 | youngestEntry >= rangeStart right -- All entries in 'left' must be too old if this is true.
                 = findRelevant (right : xs)
                 | rangeStart left >= entryCap -- All files from now on contain entries that are too young.
                 = []
                 | otherwise
                 = left : findRelevant (right : xs)

             relevant = findRelevant sorted
             -- Id of the first entry in the concatenated archive.
             firstEntryId = case relevant of
                              []                     -> 0
                              ( logFile : _logFiles) -> rangeStart logFile

         archive <- liftM Lazy.concat $ mapM (Lazy.readFile . snd) relevant
         let entries = entriesToList $ readEntries archive
         return $ map decode'
                $ take (entryCap - youngestEntry)             -- Keep only entries below the entryCap.
                $ drop (youngestEntry - firstEntryId) entries -- Drop entries that are too old.

    where rangeStart (firstEntryId, _path) = firstEntryId


-- Start a new log file and return the id of its first entry.
--
-- The switch is queued as an action on the log, so it runs on the writer
-- thread after everything pushed before it has been written. The returned id
-- is the next entry id minus the number of entries still waiting in the queue
-- at that moment. This call blocks until the switch has happened.
cutFileLog :: FileLog object -> IO EntryId
cutFileLog fLog
    = do result <- newEmptyMVar
         pushAction fLog (switchFile result)
         takeMVar result
    where key = logIdentifier fLog
          pathFor entryId = logDirectory key </> formatLogFile (logPrefix key) entryId
          switchFile result
              = do cutId <- atomically $
                            do (pending, _) <- readTVar (logQueue fLog)
                               nextId <- readTVar (logNextEntryId fLog)
                               return (nextId - length pending)
                   -- Close the old file and make the new one current.
                   modifyMVar_ (logCurrent fLog) $ \oldFd ->
                     do closeFd oldFd
                        newHandle <- openBinaryFile (pathFor cutId) WriteMode
                        handleToFd newHandle
                   putMVar result cutId

-- Finds the newest entry in the log. Doesn't work on open logs.
-- Do not use after the log has been opened.
-- Implementation: Search the newest log files first. Once a file
--                 containing at least one valid entry is found,
--                 return the last valid entry in that file.
--
-- Files are read strictly and one at a time, so no file handle outlives this
-- call and files older than the one holding the result are never opened.
-- (Opening every file lazily up front keeps a handle open for each file that
-- is not read to its end.)
newestEntry :: SafeCopy object => LogKey object -> IO (Maybe object)
newestEntry identifier
    = do logFiles <- findLogFiles identifier
         let sorted = reverse $ sort logFiles
             (_eventIds, files) = unzip sorted
         worker files
    where worker [] = return Nothing
          worker (file:rest)
              = do contents <- Strict.readFile file
                   case Archive.readEntries (Lazy.fromChunks [contents]) of
                     Done            -> worker rest
                     Next entry next -> return $ Just (decode' (lastEntry entry next))
                     -- A file without a single readable entry is skipped.
                     Fail{}          -> worker rest
          -- Walk to the last entry that could be read; a failure after at
          -- least one good entry simply ends the walk.
          lastEntry entry Done   = entry
          lastEntry entry Fail{} = entry
          lastEntry _ (Next entry next) = lastEntry entry next

-- Queue a new log entry without blocking. In one transaction the entry is
-- assigned the next entry id and put on the queue together with the given IO
-- action. The action runs once the entry is durable; it holds up the
-- serialization of further events, so it should be quick.
pushEntry :: SafeCopy object => FileLog object -> object -> IO () -> IO ()
pushEntry fLog object finally
    = atomically $
      do nextId <- readTVar nextIdVar
         writeTVar nextIdVar (nextId+1)
         (queuedEntries, queuedActions) <- readTVar queueVar
         writeTVar queueVar ( payload : queuedEntries, finally : queuedActions )
    where nextIdVar = logNextEntryId fLog
          queueVar  = logQueue fLog
          payload   = Lazy.fromChunks [ Strict.copy (Put.runPut (safePut object)) ]

-- Queue an IO action without adding an entry. The action is executed once all
-- entries pushed before it are durable.
pushAction :: FileLog object -> IO () -> IO ()
pushAction fLog finally
    = atomically $
      do (queuedEntries, queuedActions) <- readTVar queueVar
         writeTVar queueVar (queuedEntries, finally : queuedActions)
    where queueVar = logQueue fLog

-- Return the id that the next pushed entry will receive.
askCurrentEntryId :: FileLog object -> IO EntryId
askCurrentEntryId = atomically . readTVar . logNextEntryId


-- Decode a serialized entry with its SafeCopy instance. A decoding failure is
-- raised as an 'error' carrying the decoder's message.
-- FIXME: Check for unused input.
decode' :: SafeCopy object => Lazy.ByteString -> object
decode' inp = either error id (Get.runGetLazy safeGet inp)