module HAppS.State.Transaction where
import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception(handle,throw,Exception(..),AsyncException(..),throwIO,evaluate)
import Control.Monad.State
import Control.Monad.Reader
import qualified Data.Map as M
import qualified Data.ByteString.Lazy as L
import Data.IORef
import Data.Maybe
import System.IO.Unsafe
import System.Random
import System.Time(getClockTime,ClockTime(TOD))
import System.Log.Logger
import HAppS.State.ComponentSystem
import HAppS.State.Monad
import HAppS.State.Saver
import HAppS.Data.Serialize
import HAppS.Data.SerializeTH
import HAppS.State.Types
import HAppS.Util.Common (Seconds)
import Data.Typeable
import GHC.Base
import qualified Data.Binary as Binary
-- | Log a message under this module's logger name,
-- @"HAppS.State.Transaction"@.
logMT :: Priority -> String -> IO ()
logMT priority message = logM "HAppS.State.Transaction" priority message
-- | The 'txTime' of the current event's transaction context, converted to
-- whatever integral type the caller asks for.
getTime :: (Integral epochTime) => AnyEv epochTime
getTime = sel (\env -> fromIntegral (txTime (evContext env)))
-- | The time of the current event as a 'ClockTime'.
--
-- 'txTime' holds milliseconds since the epoch (it is filled in from
-- 'getEpochMilli'), whereas 'TOD' expects whole seconds plus a picosecond
-- remainder.  The millisecond value is therefore split before the
-- 'ClockTime' is built instead of being passed through as seconds.
getEventClockTime :: AnyEv ClockTime
getEventClockTime = do
    milliSeconds <- sel (txTime . evContext)
    let (seconds, millis) = (fromIntegral milliSeconds :: Integer) `divMod` 1000
    -- 1 millisecond = 10^9 picoseconds.
    return $ TOD seconds (millis * 1000000000)
-- | The 'txId' of the current event's transaction context, converted to
-- whatever integral type the caller asks for.
getEventId :: (Integral txId) => AnyEv txId
getEventId = sel (\env -> fromIntegral (txId (evContext env)))
-- 'TxContext' uses the 'Version' class defaults (the instance body is empty).
instance Version TxContext
-- The Template Haskell splice that derives 'Serialize' is only compiled when
-- __HADDOCK__ is not defined; otherwise a bare instance declaration stands in
-- for it.
#ifndef __HADDOCK__
$(deriveSerialize ''TxContext)
#else
instance Serialize TxContext
#endif
-- 'StdGen' uses the 'Version' class defaults.
instance Version StdGen

-- A 'StdGen' is written as its 'show' text and rebuilt from that text with
-- 'read'.
instance Serialize StdGen where
    getCopy = contain $ do
        text <- safeGet
        return (read text)
    putCopy gen = contain (safePut (show gen))
-- | Textual name of an event type; this is the key type of 'EventMap'.
type TypeString = String
#ifndef __HADDOCK__
-- | A handler registered for one event type.  Each constructor carries the
-- function that runs the event and a function that rebuilds the event from
-- an 'Object'.
data EventHandler where
    -- Update events: the runner additionally receives an optional
    -- 'TxContext' ('emitFunc' passes 'Nothing', 'runColdEventFunc' passes
    -- 'Just').
    UpdateHandler
        :: UpdateEvent event result
        => (Maybe TxContext -> event -> IO result)
        -> (Object -> event)
        -> EventHandler
    -- Query events: the runner receives only the event.
    QueryHandler
        :: QueryEvent event result
        => (event -> IO result)
        -> (Object -> event)
        -> EventHandler
#else
data EventHandler = EventHandler
#endif
-- | Dispatch table from event type name to the handler for that event.
type EventMap = M.Map TypeString EventHandler
-- | Wrapper around the dispatch table; this is what 'emitRef' stores.
data EmitInternal = EmitInternal EventMap
-- | Global cell holding the event dispatch table.
--
-- It starts out containing an 'error' thunk ("HAppS not initiated") and is
-- overwritten by 'initEventMap'.  Because the cell is created with
-- 'unsafePerformIO', the binding is marked NOINLINE: if the compiler inlined
-- it, each use site could allocate its own 'IORef' and writes would not be
-- seen by readers.
emitRef :: IORef EmitInternal
emitRef = unsafePerformIO $ newIORef (error "HAppS not initiated")
{-# NOINLINE emitRef #-}
-- | Look up the handler registered under the given type name and run it on
-- the event.
--
-- The handler's existential event/result types are not checked against the
-- caller's: the stored function is coerced with 'unsafeCoerce#', so the type
-- name must identify a handler of the matching type.  Update handlers are
-- invoked with no 'TxContext'.  An unknown type name is an 'error'.
emitFunc :: (Serialize ev, Typeable res) =>
            EventMap -> TypeString -> ev -> IO res
emitFunc handlers typeName event =
    case M.lookup typeName handlers of
      Just (UpdateHandler run _) -> unsafeCoerce# run Nothing event
      Just (QueryHandler run _)  -> unsafeCoerce# run event
      Nothing                    -> error unknownComponent
  where
    unknownComponent = "Emitted event to unknown component. Ev: " ++ typeName
-- | Run an event through the global dispatch table stored in 'emitRef',
-- selecting the handler by the explicitly supplied type name.
emitEvent' :: (Serialize ev, Typeable res) => TypeString -> ev -> IO res
emitEvent' eventType ev = do
    EmitInternal eventMap <- readIORef emitRef
    emitFunc eventMap eventType ev
-- | Run an event through the global dispatch table, using the shown
-- 'typeOf' of the event as the handler key.
emitEvent :: (Serialize ev, Typeable res) => ev -> IO res
emitEvent ev = emitEvent' eventType ev
  where
    eventType = show (typeOf ev)
-- | Build the event map for the given component tree and install it in the
-- global 'emitRef'.
initEventMap :: (Methods st, Component st) => MVar TxControl -> Proxy st -> IO ()
initEventMap ctlVar componentProxy =
    createEventMap ctlVar componentProxy >>= writeIORef emitRef . EmitInternal
-- | Start the transaction machinery for every component reported by
-- 'collectHandlers' and return the combined dispatch table.
--
-- For each component this:
--
--   * creates a fresh 'TxRun';
--
--   * forks a thread that moves events from the event queue to the process
--     queue, pairing each with a new 'TxContext' whose id is assigned by
--     'addTxId';
--
--   * starts 'runTxLoop' on the process queue with the component's
--     'initialValue';
--
--   * wraps each of the component's methods in an 'EventHandler' and adds
--     the handlers from 'extraEvents'.
createEventMap :: (Methods st, Component st) => MVar TxControl -> Proxy st -> IO EventMap
createEventMap ctlVar componentProxy = do
    perComponent <- forM (M.elems componentTree) $ \(MethodMap methods) -> do
        tx <- createNewTxRun
        forkIO $ forever $ atomically $ do
            HR ev run <- readTChan (txEventQueue tx)
            stamped <- newTxContext >>= addTxId tx
            writeTChan (txProcessQueue tx) (IHR stamped ev run)
        ctl <- readMVar ctlVar
        runTxLoop (ctlEventSaver ctl) (txProcessQueue tx) initialValue
        return $ M.union (extraEvents tx) (M.map (eventHandler tx) methods)
    return $ M.unions perComponent
  where
    (componentTree, _ioActions) = collectHandlers componentProxy

    -- Updates: the result is handed back through an MVar that the emitter
    -- blocks on.
    eventHandler tx (Update fn) = UpdateHandler updateEmitter parseObject
      where
        updateEmitter mbContext ev = do
            mv <- newEmptyMVar
            let enqueue run =
                    case mbContext of
                      -- No context supplied: go through the event queue so
                      -- the forked thread assigns a context.
                      Nothing -> writeTChan (txEventQueue tx) (HR ev run)
                      -- Context supplied: queue for processing directly.
                      -- The real handler only runs when the supplied id is
                      -- newer than the last recorded one; otherwise a
                      -- handler yielding an error value is queued instead.
                      Just cxt -> do
                          lastCxt <- readTVar (txLastTxContext tx)
                          writeTChan (txProcessQueue tx) $ IHR cxt ev $
                              if txId lastCxt < txId cxt
                                then run
                                else handleUpdate (putMVar mv) $ return $ error "Forced result from dated event."
            atomically $ enqueue $ handleUpdate (putMVar mv) (fn ev)
            takeMVar mv

    -- Queries: queued with 'quickQuery'', result returned through an MVar.
    eventHandler tx (Query fn) = QueryHandler queryEmitter parseObject
      where
        queryEmitter ev = do
            mv <- newEmptyMVar
            quickQuery' tx $ HR ev $ handleQuery (putMVar mv) (fn ev)
            takeMVar mv
-- Event instances matching the raw handlers built by 'getStateHandler' (a
-- query from '()' to a lazy 'L.ByteString') and 'setNewStateHandler' (an
-- update from a lazy 'L.ByteString' to '()').
instance QueryEvent () L.ByteString
instance UpdateEvent L.ByteString ()
-- | The two state-transfer handlers every component gets in addition to its
-- own methods: one keyed by 'getStateType' and one keyed by
-- 'setNewStateType', both derived from the shown type of the component
-- state.
extraEvents :: Serialize st => TxRun st -> EventMap
extraEvents tx = M.fromList
    [ (getStateType stateName, getStateHandler tx)
    , (setNewStateType stateName, setNewStateHandler tx)
    ]
  where
    stateName = show (typeOf (stateOf tx))

    -- Type-level witness for the state type of the 'TxRun'; it is only ever
    -- passed to 'typeOf'.
    stateOf :: TxRun st -> st
    stateOf _ = undefined
-- Query handler that returns the serialized pair of the transaction context
-- and the current state.  The handler leaves the state unchanged (it returns
-- 'Nothing' as the new state).  No 'Object' parser is provided for it.
getStateHandler tx = QueryHandler fetchState (error "No parser for GetState event")
  where
    fetchState :: () -> IO L.ByteString
    fetchState () = do
        resultVar <- newEmptyMVar
        quickQuery' tx $ HR () $ \context st ->
            return (Nothing, putMVar resultVar (serialize (context, st)))
        takeMVar resultVar
-- Update handler that replaces the component state with one decoded from a
-- serialized (context, state) pair.  It waits until the replacement has been
-- processed and then records the decoded context as the last transaction
-- context.  The optional 'TxContext' argument is ignored, and no 'Object'
-- parser is provided.
setNewStateHandler tx = UpdateHandler (const installState) (error "No parser for SetNewState event")
  where
    installState :: L.ByteString -> IO ()
    installState bs = do
        ((context, newState), _) <- evaluate $ deserialize bs
        doneVar <- newEmptyMVar
        quickQuery' tx $ HR () $ \_context _oldState ->
            return (Just newState, putMVar doneVar ())
        takeMVar doneVar
        atomically $ writeTVar (txLastTxContext tx) context
-- | The keys of the component tree returned by 'collectHandlers'.
allStateTypes :: (Methods a, Component a) => Proxy a -> [TypeString]
allStateTypes proxy = M.keys (fst (collectHandlers proxy))
-- | The IO actions returned alongside the component tree by
-- 'collectHandlers'.
componentIO :: (Methods a, Component a) => Proxy a -> [IO ()]
componentIO proxy = snd (collectHandlers proxy)
-- | Allocate a 'TxRun' with two empty queues and an initial last-context of
-- @TxContext 0 0 0 (mkStdGen 42)@, all within one STM transaction.
createNewTxRun :: IO (TxRun st)
createNewTxRun = atomically $ do
    events  <- newTChan
    pending <- newTChan
    lastCxt <- newTVar initialContext
    return TxRun { txEventQueue    = events
                 , txProcessQueue  = pending
                 , txLastTxContext = lastCxt
                 }
  where
    initialContext = TxContext 0 0 0 (mkStdGen 42)
-- | Event-map key of the set-new-state handler for the given state type
-- name: the name prefixed with @"SetNewState: "@.
setNewStateType :: String -> String
setNewStateType = ("SetNewState: " ++)
-- | Event-map key of the get-state handler for the given state type name:
-- the name prefixed with @"GetState: "@.
getStateType :: String -> String
getStateType = ("GetState: " ++)
-- | Emit the set-new-state event for the named state type, passing the
-- serialized state as the event.
setNewState :: TypeString -> L.ByteString -> IO ()
setNewState stateType state = emitEvent' key state
  where
    key = setNewStateType stateType
-- | Emit the get-state event for the named state type and return the
-- serialized result.
getState :: TypeString -> IO L.ByteString
getState stateType = emitEvent' key ()
  where
    key = getStateType stateType
-- | Update event carrying a lazy 'L.ByteString' payload.  The @st@ parameter
-- is phantom: no field mentions it.
data SetNewState st = SetNewState L.ByteString
    deriving (Typeable)

-- | Query event with no payload.  The @st@ parameter is phantom.
data GetState st = GetState
    deriving (Typeable)

instance Version (SetNewState st)

-- The payload is written and read with "Data.Binary".
instance Typeable st => Serialize (SetNewState st) where
    putCopy (SetNewState payload) = contain (Binary.put payload)
    getCopy = contain $ do
        payload <- Binary.get
        return (SetNewState payload)

instance Version (GetState st)

-- Nothing is written; reading always yields 'GetState'.
instance Typeable st => Serialize (GetState st) where
    putCopy GetState = contain (return ())
    getCopy = contain (return GetState)

instance Typeable st => UpdateEvent (SetNewState st) ()
instance Typeable st => QueryEvent (GetState st) L.ByteString
-- | Emit an update event from any 'MonadIO' and return its result.
update :: (MonadIO m, UpdateEvent ev res) => ev -> m res
update event = liftIO (emitEvent event)
-- | Emit a query event from any 'MonadIO' and return its result.
query :: (MonadIO m, QueryEvent ev res) => ev -> m res
query event = liftIO (emitEvent event)
-- | Put an event straight onto the process queue, bypassing the event
-- queue.  The event is paired with the last recorded 'TxContext', with only
-- its 'txTime' replaced by the current time; the last-context cell itself is
-- not modified.
quickQuery' :: (Serialize st) => TxRun st -> HR st -> IO ()
quickQuery' txrun (HR ev fun) = do
    now <- getEpochMilli
    atomically $ do
        lastCxt <- readTVar (txLastTxContext txrun)
        let stamped = lastCxt { txTime = now }
        writeTChan (txProcessQueue txrun) (IHR stamped ev fun)
-- | An IO action yielding a pair: an action that produces an @ev@ and a
-- callback that consumes a @res@.
type Runner ev res = IO (IO ev, res -> IO ())
-- | An IO function from @i@ to @o@.
type EH i o = i -> IO o
-- | Existential wrapper around any 'Serialize' value.
data Event = forall ev. Serialize ev => Event ev
-- | An entry of the process queue: a serializable event of some hidden
-- type, the 'TxContext' it runs under, and the handler to run it with.
data IHR st = forall event. (Serialize event) =>
    IHR TxContext event (RunHandler st event)
-- | An entry of the event queue: a serializable event of some hidden type
-- and the handler to run it with.  Unlike 'IHR' it carries no 'TxContext'.
data HR st = forall event. (Serialize event) =>
    HR event (RunHandler st event)
-- | What the transaction loop runs for one event: given the transaction
-- context and the current state it yields an optional new state and an IO
-- action.  In 'runTxLoop', 'Nothing' keeps the state and 'Just' replaces it.
-- The @ev@ parameter does not appear on the right-hand side.
type RunHandler st ev = TxContext -> st -> IO (Maybe st, IO ())
-- | Either a value or an 'Exception'.
data Res a = Ok a | Error Exception
-- | Channel of events that have not yet been given a 'TxContext'.
type EventQueue st = TChan (HR st)
-- | Channel of events paired with their 'TxContext'; read by 'runTxLoop'.
type ProcessQueue st = TChan (IHR st)
-- | The per-component transaction state: the two queues and the most
-- recently recorded 'TxContext'.  All fields are strict.
data TxRun st = TxRun
    { txEventQueue    :: !(EventQueue st)
      -- ^ Events waiting to be given a 'TxContext'.
    , txProcessQueue  :: !(ProcessQueue st)
      -- ^ Events paired with a 'TxContext', waiting to be run.
    , txLastTxContext :: !(TVar TxContext)
      -- ^ Updated by 'addTxId'; read by 'quickQuery''.
    }
-- | Named loaders that still need a 'ProcessQueue' before they can turn a
-- lazy 'L.ByteString' into a 'TxId' and a further 'L.ByteString'.
-- NOTE(review): the meaning of the returned pair is not visible in this
-- file — confirm against the code that builds these maps.
type EvLoaders' st = M.Map String (ProcessQueue st -> L.ByteString -> IO (TxId,L.ByteString))
-- | The same loaders with the queue already supplied (see
-- 'setEvLoadersQueue').
type EvLoaders = M.Map String (L.ByteString -> IO (TxId,L.ByteString))
-- | Supply the process queue to every loader in the map.
setEvLoadersQueue :: ProcessQueue st -> EvLoaders' st -> EvLoaders
setEvLoadersQueue queue loaders = M.map ($ queue) loaders
-- | Run a stored event ('Object') under the given 'TxContext', using the
-- global dispatch table from 'emitRef'.
runColdEvent :: TxContext -> Object -> IO ()
runColdEvent cxt obj =
    readIORef emitRef >>= \(EmitInternal eventMap) ->
        runColdEventFunc cxt obj eventMap
-- | Run a stored event against an explicit dispatch table.
--
-- The handler is chosen by 'objectType'; the event is rebuilt with the
-- handler's own parser and the result is discarded.  Update handlers receive
-- the supplied context; query handlers do not use it.  A missing handler is
-- an 'error'.
runColdEventFunc :: TxContext -> Object -> EventMap -> IO ()
runColdEventFunc cxt obj eventMap =
    case M.lookup typeName eventMap of
      Just (UpdateHandler run parse) -> run (Just cxt) (parse obj) >> return ()
      Just (QueryHandler run parse)  -> run (parse obj) >> return ()
      Nothing -> error $ "Couldn't find handler for cold event of type: " ++ typeName
  where
    typeName = objectType obj
-- | The type name of an event: its 'typeOf', shown.
eventTString :: Serialize ev => ev -> TypeString
eventTString = show . typeOf
-- | Build a 'RunHandler' from an event action.
--
-- The action is run in a single STM transaction by @runner@, with an 'Env'
-- holding the transaction context and a fresh 'TVar' seeded from the
-- context's generator.  @stateCheck@ then turns the intermediate result into
-- the optional new state and the result, and @ofun@ is applied to the result
-- to form the IO action that is returned.
--
-- If an exception is raised, it is logged, the state is left unchanged
-- ('Nothing'), and @ofun@ is applied to a value that rethrows the exception
-- when forced.
handleEvent :: (st -> Env -> Ev m res -> STM intermediate) -> (st -> intermediate -> IO (Maybe st, res))
            -> (res -> IO ()) -> Ev m res -> RunHandler st ev
handleEvent runner stateCheck ofun action tx st =
    handle onFailure $ do
        intermediate <- atomically $ do
            randoms <- newTVar (txStdGen tx)
            runner st (Env { evContext = tx, evRandoms = randoms }) action
        (newState, res) <- stateCheck st intermediate
        return (newState, ofun res)
  where
    onFailure e = do
        logMT ERROR ("handleEvent FAIL: " ++ show e)
        return (Nothing, ofun (throw e))
-- | 'handleEvent' specialised to queries: the action runs as a reader over
-- the state, and the state is never replaced.
handleQuery :: (res -> IO ()) -> Query st res -> RunHandler st ev
handleQuery = handleEvent runAsReader keepState
  where
    runAsReader st env (Ev cmd) = runReaderT (cmd env) st
    keepState _st res = return (Nothing, res)
-- | 'handleEvent' specialised to updates: the action runs as a state
-- computation, and 'checkDiff' decides which state (if any) is reported as
-- the new one.
handleUpdate :: (res -> IO ()) -> Update st res -> RunHandler st ev
handleUpdate = handleEvent runAsState reportState
  where
    runAsState st env (Ev cmd) = runStateT (cmd env) st
    reportState st (res, st') = do
        diff <- checkDiff st st'
        return (diff, res)
-- | Decide which state to report after an update.  The old state is
-- ignored: the new state is always returned wrapped in 'Just'.
checkDiff :: a -> a -> IO (Maybe a)
checkDiff _old = return . Just
-- | Put an event and its handler on the event queue of the given 'TxRun'.
processEvent :: (Serialize ev) =>
                TxRun st -> ev
             -> (RunHandler st ev)
             -> IO ()
processEvent txrun ev runHandler =
    atomically (writeTChan (txEventQueue txrun) (HR ev runHandler))
-- | The current clock time in milliseconds: the seconds of 'getClockTime'
-- times 1000, plus its picoseconds divided by 10^9.
getEpochMilli :: IO EpochMilli
getEpochMilli = do
    TOD sec pico <- getClockTime
    let millisFromSeconds = sec * 1000
        millisFromPicos   = pico `div` 1000000000
    return (fromIntegral (millisFromSeconds + millisFromPicos))
-- | Build a 'TxContext' stamped with the current time and fresh randomness.
--
-- The id is a fixed 1 here.  The global generator is 'split': one half is
-- stored back in 'globalRandomGen', and the other yields the context's
-- random value and the generator stored in the context.  The work is IO run
-- inside STM through 'unsafeIOToSTM'.
newTxContext :: STM TxContext
newTxContext = unsafeIOToSTM $ do
    milli <- getEpochMilli
    sgen <- modifyMVar globalRandomGen $ \gen -> return (split gen)
    let (rand, sgen') = random sgen
    return (TxContext 1 rand milli sgen')
-- | Give the context the id following the last recorded one, record the
-- result as the new last context, and return it.
addTxId :: TxRun st -> TxContext -> STM TxContext
addTxId tx context = do
    previous <- readTVar lastVar
    let stamped = context { txId = txId previous + 1 }
    writeTVar lastVar stamped
    return stamped
  where
    lastVar = txLastTxContext tx
-- | Global random generator, initialised from 'getStdGen' on first use.
--
-- The 'MVar' is created with 'unsafePerformIO', so the binding is marked
-- NOINLINE: if the compiler inlined it, each use site could allocate its own
-- 'MVar' and the generator state would no longer be shared.
globalRandomGen :: MVar StdGen
globalRandomGen = unsafePerformIO (newMVar =<< getStdGen)
{-# NOINLINE globalRandomGen #-}
-- | Transaction system configuration.  See 'nullTxConfig' for the default
-- values.
-- NOTE(review): none of these fields is read in this module, so the
-- per-field meanings are inferred from their names only — confirm against
-- the code that consumes 'TxConfig'.
data TxConfig = TxConfig
    { txcCheckpointSeconds :: Seconds
    , txcOperationMode     :: OperationMode
    , txcClusterSize       :: Int
    , txcClusterPort       :: Int
    , txcCommitFrequency   :: Int
    }
-- | Runtime handles of a running transaction system.
data TxControl = TxControl
    { ctlSaver         :: Saver
    , ctlEventSaver    :: MVar (WriterStream EventLogEntry)
      -- ^ Passed to 'runTxLoop', which writes an 'EventLogEntry' to it for
      -- every event that produces a new state.
    , ctlAllComponents :: [String]
    , ctlChildren      :: [(ThreadId, MVar ())]
      -- ^ NOTE(review): presumably child threads with a completion signal —
      -- confirm against the code that fills this in.
    }
-- | One record of the event log: the transaction context an event ran under
-- and the event itself as an 'Object'.
data EventLogEntry = EventLogEntry TxContext Object
    deriving (Typeable, Show)

instance Version EventLogEntry

-- An entry is stored as the pair of its context and object.
instance Serialize EventLogEntry where
    putCopy (EventLogEntry context obj) = contain (safePut (context, obj))
    getCopy = contain $ do
        (context, obj) <- safeGet
        return (EventLogEntry context obj)
-- | How the transaction system operates: standalone, or as part of a
-- cluster.
-- NOTE(review): the meaning of the 'ClusterMode' string is not visible in
-- this module — confirm against its consumer.
data OperationMode
    = SingleMode
    | ClusterMode String
-- | Default configuration: single mode, a checkpoint value of one day's
-- worth of seconds, cluster size 0, cluster port 8500, commit frequency 50.
nullTxConfig :: TxConfig
nullTxConfig = TxConfig
    { txcCheckpointSeconds = 60 * 60 * 24
    , txcOperationMode     = SingleMode
    , txcClusterSize       = 0
    , txcClusterPort       = 8500
    , txcCommitFrequency   = 50
    }
-- | Fork the thread that processes a component's queue, starting from the
-- given state, and return immediately.
--
-- For every queued event the loop logs it, runs its handler on the current
-- state, and then:
--
--   * if the handler reports no new state, forks the handler's IO action
--     and continues with the old state;
--
--   * if it reports a new state, adds an 'EventLogEntry' to the event saver
--     together with the handler's IO action and continues with the new
--     state.
--
-- 'ThreadKilled' and 'BlockedIndefinitely' end the thread quietly; any other
-- exception is rethrown.
runTxLoop :: MVar (WriterStream EventLogEntry) -> ProcessQueue st -> st -> IO ()
runTxLoop eventSaverVar queue st0 =
    forkIO (handle excHandler (loop st0)) >> return ()
  where
    loop st = do
        IHR context ev fun <- atomically (readTChan queue)
        logMT NOTICE $ "> Event " ++ show (txId context) ++ " of " ++ eventTString ev
        (mst, ra) <- fun context st
        case mst of
          Just st' -> do
              eventSaver <- readMVar eventSaverVar
              writerAdd eventSaver (EventLogEntry context (mkObject ev)) (logMT NOTICE "> disk " >> ra)
              loop st'
          Nothing -> do
              forkIO $ logMT NOTICE "> pure" >> ra
              loop st

    excHandler (AsyncException ThreadKilled) = return ()
    excHandler BlockedIndefinitely = return ()
    excHandler e = throwIO e