module HAppS.State.Checkpoint
( createTxControl
, closeTxControl
, restoreState
, createCheckpoint
) where
import HAppS.State.Saver
import HAppS.Data.Serialize
import HAppS.Data.SerializeTH
import HAppS.State.Transaction
import HAppS.State.Types
import HAppS.State.ComponentSystem
import Data.Typeable
import Data.Maybe
import Control.Concurrent
import Control.Monad
import Control.Exception as E
import qualified Data.ByteString.Lazy.Char8 as L
import qualified Data.Map as M
import System.IO
import qualified Data.Map as M
import System.Log.Logger
logMC = logM "HAppS.State.Checkpoint"
data State = State
{ stateVersion :: Int
, stateCutoff :: Int
} deriving (Typeable,Show)
instance Version State
#ifndef __HADDOCK__
$(deriveSerialize ''State)
#else
instance Serialize State
#endif
createTxControl :: (Methods state, Component state) =>
Saver -> Proxy state -> IO (MVar TxControl)
createTxControl saver proxy
= do
eventSaverVar <- newMVar =<< createWriter NullSaver "events" 0
newMVar $ TxControl
{ ctlSaver = saver
, ctlEventSaver = eventSaverVar
, ctlAllComponents = allStateTypes proxy
, ctlChildren = [] }
closeTxControl :: MVar TxControl -> IO ()
closeTxControl ctlVar
= do ctl <- takeMVar ctlVar
writerClose =<< takeMVar (ctlEventSaver ctl)
restoreState :: MVar TxControl -> IO ()
restoreState ctlVar
= withMVar ctlVar $ \ctl ->
do
mbState <- readState ctl
case mbState of
Nothing ->
do writeState ctl (State 0 0)
swapMVar (ctlEventSaver ctl) =<< createWriter (ctlSaver ctl) "events" 0
Just state ->
do let cutoff = stateCutoff state
loadState ctl cutoff
offset <- loadEvents ctl cutoff
swapMVar (ctlEventSaver ctl) =<< createWriter (ctlSaver ctl) "events" (cutoff+offset)
return ()
loadState :: TxControl -> Int -> IO ()
loadState ctl cutoff
= do logMC NOTICE $ "Loading components from storage."
checkpoints <- withReader ctl "checkpoints" cutoff $ loadCheckpoints
forM_ (ctlAllComponents ctl) $ \stateType
-> case M.lookup stateType checkpoints of
Just state -> setNewState stateType state
Nothing -> return ()
logMC NOTICE "All components successfully loaded"
loadEvents :: TxControl -> Int -> IO Int
loadEvents ctl cutoff
= do logMC NOTICE "Loading events from storage"
(events, offset) <- withReader ctl "events" cutoff $ readerGet
forM_ events $ \(EventLogEntry context object) -> runColdEvent context object
logMC NOTICE "All events successfully replayed."
return offset
withReader ctl key cutoff
= bracket (createReader (ctlSaver ctl) key cutoff)
(readerClose)
withWriter ctl key cutoff
= bracket (createWriter (ctlSaver ctl) key cutoff)
(writerClose)
readState ctl
= withReader ctl "current" 0 $ \s ->
liftM listToMaybe $ readerGetUncut s
writeState :: TxControl -> State -> IO ()
writeState ctl state
= bracket (createWriter (ctlSaver ctl) "current" 0)
(writerClose)
(\saver -> writerAtomicReplace saver state)
loadCheckpoints :: ReaderStream (M.Map String L.ByteString) -> IO (M.Map String L.ByteString)
loadCheckpoints saver
= do checkpointss <- readerGetUncut saver
case checkpointss of
[checkpoints] -> return checkpoints
[] -> return M.empty
_ -> error "Failed to read checkpoints."
saveCheckpoints :: WriterStream (M.Map String L.ByteString) -> M.Map String L.ByteString -> IO ()
saveCheckpoints saver checkpoints
= do mv <- newEmptyMVar
writerAdd saver checkpoints (putMVar mv ())
takeMVar mv
createCheckpoint :: MVar TxControl -> IO ()
createCheckpoint ctlVar
= withMVar ctlVar $ \ctl ->
do logMC NOTICE "Initiating checkpoint."
newCut <- writerCut =<< readMVar (ctlEventSaver ctl)
withWriter ctl "checkpoints" newCut $ \saver ->
do allStates <- mapM getState (ctlAllComponents ctl)
saveCheckpoints saver (M.fromList $ zip (ctlAllComponents ctl) allStates)
writeState ctl $ State {stateVersion = 0
,stateCutoff = newCut}
logMC NOTICE "Checkpoint successfully serialized."