{-# OPTIONS -fglasgow-exts -cpp -fth #-}
module HAppS.State.Transaction where

import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception(handle,throw,Exception(..),AsyncException(..),throwIO,evaluate)
import Control.Monad.State
import Control.Monad.Reader
import qualified Data.Map as M
import qualified Data.ByteString.Lazy as L
import Data.IORef
import Data.Maybe
import System.IO.Unsafe
import System.Random
import System.Time(getClockTime,ClockTime(TOD))
import System.Log.Logger

import HAppS.State.ComponentSystem
import HAppS.State.Monad
import HAppS.State.Saver
import HAppS.Data.Serialize
import HAppS.Data.SerializeTH
import HAppS.State.Types
import HAppS.Util.Common (Seconds)

import Data.Typeable
import GHC.Base
import qualified Data.Binary as Binary

-- Log a message under the logger name of this module.
logMT :: Priority -> String -> IO ()
logMT = logM "HAppS.State.Transaction"

-- Time stamp of the current event context, converted with fromIntegral.
-- The contexts built in this module store milliseconds since the epoch
-- in txTime (they are filled from getEpochMilli).
--getTime :: AnyEv EpochTime
getTime :: (Integral epochTime) => AnyEv epochTime
getTime = sel (fromIntegral . txTime . evContext)

-- Time stamp of the current event context as a ClockTime.
--
-- txTime holds milliseconds, while TOD expects whole seconds plus
-- picoseconds. The value is therefore split with divMod: the quotient is
-- the seconds field and the remaining milliseconds are scaled by 10^9 to
-- picoseconds. Passing the millisecond count directly as seconds would
-- yield a time a thousand times too far from the epoch.
getEventClockTime :: AnyEv ClockTime
getEventClockTime = do
    milliSeconds <- sel (txTime . evContext)
    let (secs, millis) = (fromIntegral milliSeconds :: Integer) `divMod` 1000
    return $ TOD secs (millis * 10 ^ (9 :: Int))

-- Transaction id of the current event context, converted with fromIntegral.
-- getEventId :: AnyEv TxId
getEventId :: (Integral txId) => AnyEv txId
getEventId = sel (fromIntegral . txId . evContext)

instance Version TxContext -- Default to version 0
$(deriveSerialize ''TxContext)

-- StdGen is serialized through its Show and Read instances.
instance Version StdGen
instance Serialize StdGen where
    getCopy = contain $ liftM read safeGet
    putCopy = contain . safePut . show

{-
  Durability:
   * Pending queue is TChan (TxContext, ev)
   * Get events from the input sources in circular fashion
   * Dump events on disk before adding to pending queue
   * Checkpoints as follows:
     * check point event arrives from one of the input sources
     * write a new checkpoint file with:
       + list of pending transactions (all non-pending are out of system)
       + next txid
       + save state
       + rotate log files
     * resume transaction processing
-}

-- Textual name of an event type, used as the key of the event map.
type TypeString = String

-- A registered handler for one event type. Each constructor carries the
-- function that runs the event and a parser that rebuilds the event from
-- a stored Object. Update handlers additionally take an optional
-- TxContext: emitFunc passes Nothing, runColdEventFunc passes Just.
#ifndef __HADDOCK__
data EventHandler where
    UpdateHandler :: UpdateEvent ev res => (Maybe TxContext -> ev -> IO res) -> (Object -> ev) -> EventHandler
    QueryHandler  :: QueryEvent ev res => (ev -> IO res) -> (Object -> ev) -> EventHandler
#else
data EventHandler = EventHandler
#endif

-- Handlers keyed by the type string of the event they accept.
type EventMap = M.Map TypeString EventHandler

-- Wrapper stored in the global emitRef.
data EmitInternal = EmitInternal EventMap

-- Global handler registry. It holds an error thunk until initEventMap
-- writes the real event map, so emitting before initialisation fails with
-- "HAppS not initiated".
{-# NOINLINE emitRef #-}
emitRef :: IORef EmitInternal
emitRef = unsafePerformIO $ newIORef (error "HAppS not initiated")

-- Low level function for emitting events. Very unsafe, do not expose.
--
-- Looks the handler up by type string and applies it through
-- unsafeCoerce#, so nothing here checks that the event and result types
-- match the ones the handler was registered with. An unknown type string
-- is a call to error.
emitFunc :: (Serialize ev, Typeable res) => EventMap -> TypeString -> ev -> IO res
emitFunc eventMap eventType ev
    = case M.lookup eventType eventMap of
        Nothing -> error $ "Emitted event to unknown component. Ev: " ++ eventType
        Just (UpdateHandler fn _) -> unsafeCoerce# fn Nothing ev
        Just (QueryHandler fn _)  -> unsafeCoerce# fn ev

-- Wrapper around the global emitter map. Very unsafe, do not expose.
-- This function is only safe through 'query' and 'update'.
emitEvent' :: (Serialize ev, Typeable res) => TypeString -> ev -> IO res
emitEvent' eventType ev = do internal <- readIORef emitRef
                             -- Unwrap the registry and dispatch by type string.
                             case internal of
                               EmitInternal eventMap -> emitFunc eventMap eventType ev

-- Emit an event, using the shown type of the event as its type string.
emitEvent :: (Serialize ev, Typeable res) => ev -> IO res
emitEvent ev = emitEvent' (show (typeOf ev)) ev

-- Build the event map for the given component and store it in emitRef.
initEventMap :: (Methods st, Component st) => MVar TxControl -> Proxy st -> IO ()
initEventMap ctlVar componentProxy
    = do eventMap <- createEventMap ctlVar componentProxy
         writeIORef emitRef $ EmitInternal eventMap

{-
  Events for different components can be executed in parallel.
-}
-- Casting the event and the result type is safe. The types are kept sane due to the
-- UpdateEvent and QueryEvent classes.
--
-- For every method map returned by collectHandlers this:
--   * creates a fresh TxRun,
--   * forks a thread that moves events from the event queue to the process
--     queue, giving each one a new TxContext with the next transaction id,
--   * starts runTxLoop on the process queue with initialValue as the state,
--   * returns the handlers of that map together with the extraEvents.
-- The per-component maps are merged with M.unions.
createEventMap :: (Methods st, Component st) => MVar TxControl -> Proxy st -> IO EventMap
createEventMap ctlVar componentProxy
    = do maps <- forM (M.elems componentTree) $ \(MethodMap m) ->
                 do tx <- createNewTxRun
                    -- Stamping thread: one STM transaction per event.
                    forkIO $ forever $ atomically $
                      do HR ev fn <- readTChan (txEventQueue tx)
                         tev <- addTxId tx =<< newTxContext
                         writeTChan (txProcessQueue tx) (IHR tev ev fn)
                    ctl <- readMVar ctlVar
                    runTxLoop (ctlEventSaver ctl) (txProcessQueue tx) initialValue
                    return $ M.union (extraEvents tx) (M.map (eventHandler tx) m)
         return $ M.unions maps
    where (componentTree, _ioActions) = collectHandlers componentProxy
          -- Update methods: the emitter queues the update and blocks on an
          -- MVar until the handler has delivered the result.
          eventHandler tx (Update fn)
              = let updateEmitter mbContext ev
                        = let realEv = ev in
                          do mv <- newEmptyMVar
                             -- NOTE: the fn bound by handler shadows the outer fn;
                             -- inside handler it is the RunHandler to enqueue.
                             let handler fn = case mbContext of
                                                -- No context supplied: go through the event
                                                -- queue so the stamping thread assigns one.
                                                Nothing -> writeTChan (txEventQueue tx) (HR realEv fn)
                                                -- Context supplied by the caller: enqueue directly
                                                -- with that context. If its txId is not newer than
                                                -- the last recorded one, the real handler is
                                                -- replaced by an update whose result is an error thunk.
                                                Just cxt -> do lastCxt <- readTVar (txLastTxContext tx)
                                                               writeTChan (txProcessQueue tx) $ IHR cxt realEv $
                                                                 if txId lastCxt < txId cxt
                                                                    then fn
                                                                    else handleUpdate (putMVar mv) $ return $ error "Forced result from dated event."
                             atomically $ handler $ handleUpdate (putMVar mv) (fn realEv)
                             takeMVar mv
                in UpdateHandler updateEmitter parseObject
          -- Query methods: enqueued through quickQuery', so they reuse the
          -- last TxContext instead of getting a new transaction id.
          eventHandler tx (Query fn)
              = let queryEmitter ev
                        = let realEv = ev in
                          do mv <- newEmptyMVar
                             quickQuery' tx $ HR realEv $ handleQuery (putMVar mv) (fn realEv)
                             takeMVar mv
                in QueryHandler queryEmitter parseObject

-- Event class instances for the raw get-state and set-state events that
-- getState and setNewState emit.
instance QueryEvent () L.ByteString
instance UpdateEvent L.ByteString ()

-- Two extra handlers per component, keyed by getStateType and
-- setNewStateType of the shown state type: one serializes the last
-- TxContext together with the state, the other installs a deserialized state.
extraEvents :: Serialize st => TxRun st -> EventMap
extraEvents tx
    = M.fromList [ (getStateType stateType, getStateHandler tx)
                 , (setNewStateType stateType, setNewStateHandler tx) ]
    where -- t is never evaluated; it only names the state type for typeOf.
          t :: TxRun st -> st
          t _ = undefined
          stateType = show (typeOf (t tx))
          getStateHandler tx
              = let fn :: () -> IO L.ByteString
                    fn () = do mv <- newEmptyMVar
                               -- Leaves the state untouched (Nothing) and hands the
                               -- serialized (context, state) pair back through the MVar.
                               quickQuery' tx $ HR () $ \context st -> return (Nothing, putMVar mv (serialize (context, st)))
                               takeMVar mv
                in QueryHandler fn (error "No parser for GetState event")
          setNewStateHandler tx
              = let fn :: L.ByteString -> IO ()
                    fn bs = do -- evaluate forces the result of deserialize to weak head
                               -- normal form here, before anything is queued.
                               ((context, newState), _) <- evaluate $ deserialize bs
                               mv <- newEmptyMVar
                               quickQuery' tx $ HR () $ \_context _oldState -> return (Just newState, putMVar mv ())
                               takeMVar mv
                               -- Only after the state was replaced: adopt the stored context.
                               atomically $ writeTVar (txLastTxContext tx) context
                in UpdateHandler (const fn) (error "No parser for SetNewState event")

-- Keys of the component tree returned by collectHandlers.
allStateTypes :: (Methods a, Component a) => Proxy a -> [TypeString]
allStateTypes proxy = let (componentTree, _ioActions) = collectHandlers proxy
                      in M.keys componentTree

-- IO actions returned by collectHandlers for the component.
componentIO :: (Methods a, Component a) => Proxy a -> [IO ()]
componentIO proxy = let (_componentTree, ioActions) = collectHandlers proxy
                    in ioActions

-- Create empty event and process queues and an initial TxContext built
-- from zeros and mkStdGen 42.
createNewTxRun :: IO (TxRun st)
createNewTxRun = atomically $
                 do eventQueue <- newTChan
                    processQueue <- newTChan
                    lastContext <- newTVar (TxContext 0 0 0 (mkStdGen 42))
                    return $ TxRun eventQueue processQueue lastContext

-- Event map keys of the two extra per-component events.
setNewStateType str = "SetNewState: " ++ str
getStateType str = "GetState: " ++ str

-- Replace the state of the component named by the type string with the
-- given serialized state.
setNewState :: TypeString -> L.ByteString -> IO ()
setNewState stateType state
    = emitEvent' (setNewStateType stateType) state

-- Fetch the serialized state of the component named by the type string.
getState :: TypeString -> IO L.ByteString
getState stateType
    = emitEvent' (getStateType stateType) ()

-- Typed counterparts of the state events; st is a phantom parameter.
data SetNewState st = SetNewState L.ByteString deriving (Typeable)
data GetState st = GetState deriving (Typeable)

instance Version (SetNewState st)
instance Typeable st => Serialize (SetNewState st) where
    putCopy (SetNewState lbs) = contain $ Binary.put lbs
    getCopy = contain $ liftM SetNewState Binary.get
instance Version (GetState st)
instance Typeable st => Serialize (GetState st) where
    putCopy GetState = contain $ return ()
    getCopy = contain $ return GetState

instance Typeable st => UpdateEvent (SetNewState st) ()
instance Typeable st => QueryEvent (GetState st) L.ByteString

-- | Schedule an update and wait for it to complete. When this function returns, you're
--   guaranteed the update will be persistent.
update :: (MonadIO m, UpdateEvent ev res) => ev -> m res
update = liftIO . emitEvent

-- | Emit a state query and wait for the result.
query :: (MonadIO m, QueryEvent ev res) => ev -> m res
query = liftIO . emitEvent

-- Execute a query immediately without giving it a unique timestamp & transaction ID.
-- The event is written straight to the process queue with a copy of the
-- last TxContext whose txTime is set to the current time.
quickQuery' :: (Serialize st) => TxRun st -> HR st -> IO ()
quickQuery' txrun (HR ev fun)
    = do now <- getEpochMilli
         atomically $ do tx <- readTVar (txLastTxContext txrun)
                         writeTChan (txProcessQueue txrun) $ IHR tx{txTime=now} ev fun

type Runner ev res = IO (IO ev, res -> IO ())
type EH i o = i -> IO o

-- Any serializable event.
data Event = forall ev. Serialize ev => Event ev

-- An event with its handler and an assigned TxContext.
data IHR st = forall ev. (Serialize ev) => IHR TxContext ev (RunHandler st ev)
-- An event with its handler, not yet given a TxContext.
data HR st = forall ev. (Serialize ev) => HR ev (RunHandler st ev)
-- Runs an event against a state. Yields an optional new state and an IO
-- action; runTxLoop treats Nothing as "state not updated" and runs the
-- action to deliver the response.
type RunHandler st ev = TxContext -> st -> IO (Maybe st, IO ())

data Res a = Ok a | Error Exception

type EventQueue st = TChan (HR st) -- Queue of local events not yet given a TxContext.
type ProcessQueue st = TChan (IHR st) -- Queue of events to be processed. TxContexts have been assigned at this point.
-- Per-component transaction machinery: the queue of events waiting for a
-- TxContext, the queue of events ready to run, and the last context handed out.
data TxRun st = TxRun
    { txEventQueue    :: !(EventQueue st)
    , txProcessQueue  :: !(ProcessQueue st)
    , txLastTxContext :: !(TVar TxContext)
    }

-- Event loaders keyed by name; the primed variant still needs a process queue.
type EvLoaders' st = M.Map String (ProcessQueue st -> L.ByteString -> IO (TxId,L.ByteString))
type EvLoaders = M.Map String (L.ByteString -> IO (TxId,L.ByteString))

-- Supply the process queue to every loader in the map.
setEvLoadersQueue :: ProcessQueue st -> EvLoaders' st -> EvLoaders
setEvLoadersQueue queue = M.map ($ queue)

-- Run a stored event through the globally registered handlers.
runColdEvent :: TxContext -> Object -> IO ()
runColdEvent cxt obj =
    readIORef emitRef >>= \(EmitInternal handlers) ->
        runColdEventFunc cxt obj handlers

-- Run a stored event through the given handlers. The handler is chosen by
-- the object type, the event is rebuilt with the parser of that handler,
-- and the result is discarded. Updates receive the stored context.
runColdEventFunc :: TxContext -> Object -> EventMap -> IO ()
runColdEventFunc cxt obj eventMap =
    case M.lookup eventType eventMap of
      Just (UpdateHandler run parse) -> run (Just cxt) (parse obj) >> return ()
      Just (QueryHandler run parse)  -> run (parse obj) >> return ()
      Nothing -> error $ "Couldn't find handler for cold event of type: " ++ eventType
  where
    eventType = objectType obj

-- Type string of an event: its shown type.
eventTString :: Serialize ev => ev -> TypeString
eventTString = show . typeOf

-- Shared core of handleQuery and handleUpdate.
--
-- Runs the action in one STM transaction with a fresh random generator
-- variable seeded from the context, lets stateCheck decide the new state,
-- and pairs that state with the action that delivers the result. Any
-- exception is logged; the state is then left unchanged and the exception
-- is rethrown when the delivered result is forced.
handleEvent :: (st -> Env -> Ev m res -> STM intermediate) -> (st -> intermediate -> IO (Maybe st, res)) -> (res -> IO ()) -> Ev m res -> RunHandler st ev
handleEvent runner stateCheck ofun action tx st = handle onFailure attempt
  where
    attempt = do
        outcome <- atomically inTransaction
        (newState, res) <- stateCheck st outcome
        return (newState, ofun res)
    inTransaction = do
        rs <- newTVar (txStdGen tx)
        runner st (Env { evContext = tx, evRandoms = rs }) action
    onFailure e = do
        logMT ERROR ("handleEvent FAIL: "++ show e)
        return (Nothing, ofun (throw e))

-- Run a query: read-only access to the state, which is never replaced.
handleQuery :: (res -> IO ()) -> Query st res -> RunHandler st ev
handleQuery = handleEvent runIt keepState
  where
    runIt st env (Ev cmd) = runReaderT (cmd env) st
    keepState _st res = return (Nothing, res)

-- Run an update: the state is threaded through and checkDiff decides
-- whether the resulting state counts as new.
handleUpdate :: (res -> IO ()) -> Update st res -> RunHandler st ev
handleUpdate = handleEvent runIt diffState
  where
    runIt st env (Ev cmd) = runStateT (cmd env) st
    diffState st (res, st') = do
        diff <- checkDiff st st'
        return (diff, res)

{-
  Some updates might not modify the state. Doing a pointer-check might be
  worth it. (As a side note, reallyUnsafePtrEquality# is orders of
  magnitude faster than comparing StableNames.)
-}
-- Currently every update is reported as having produced a new state.
checkDiff :: a -> a -> IO (Maybe a)
checkDiff _ = return . Just

-- Put an event and its handler on the event queue.
processEvent :: (Serialize ev) => TxRun st -> ev -> (RunHandler st ev) -> IO ()
processEvent txrun ev runHandler =
    atomically (writeTChan (txEventQueue txrun) (HR ev runHandler))

-- Current clock time in milliseconds (the picosecond part is divided by 10^9).
getEpochMilli :: IO EpochMilli
getEpochMilli = do
    TOD sec pico <- getClockTime
    let millis = sec * 1000 + pico `div` 10^9
    return (fromIntegral millis)

-- Fresh context carrying the current time, a random number and a split-off
-- generator. The transaction id is a placeholder until addTxId sets it.
newTxContext :: STM TxContext
newTxContext = unsafeIOToSTM $ do
    milli <- getEpochMilli
    let unassigned = -1 -- Not set yet.
    sgen <- modifyMVar globalRandomGen (return . split)
    let (rand, sgen') = random sgen
    return $ TxContext unassigned rand milli sgen'

-- Give the context the transaction id following the last one and record
-- it as the new last context.
addTxId :: TxRun st -> TxContext -> STM TxContext
addTxId tx context = do
    previous <- readTVar lastVar
    let stamped = context { txId = txId previous + 1 }
    writeTVar lastVar stamped
    return stamped
  where
    lastVar = txLastTxContext tx

{-# NOINLINE globalRandomGen #-}
-- XXX: why are we using a global StdGen? Isn't there already one in System.Random?
globalRandomGen :: MVar StdGen
globalRandomGen = unsafePerformIO (getStdGen >>= newMVar)

data TxConfig = TxConfig
    { txcCheckpointSeconds :: Seconds,  -- ^ Perform checkpoint at least every N seconds.
      txcOperationMode     :: OperationMode,
      txcClusterSize       :: Int,      -- ^ Number of active nodes in the cluster (not counting this node).
      txcClusterPort       :: Int,
      txcCommitFrequency   :: Int       -- ^ Commits per second. Only applies to cluster mode.
    }

data TxControl = TxControl
    { ctlSaver         :: Saver                           -- ^ Saver given by the user.
    , ctlEventSaver    :: MVar (WriterStream EventLogEntry)
    , ctlAllComponents :: [String]                        -- ^ Types of each component used.
    , ctlChildren      :: [(ThreadId, MVar ())]
    }

-- One record of the event log: the context an event ran in and the event
-- itself as an Object.
data EventLogEntry = EventLogEntry TxContext Object deriving (Typeable, Show)

-- Stored as the pair of context and object.
instance Version EventLogEntry
instance Serialize EventLogEntry where
    putCopy (EventLogEntry cxt object) = contain (safePut (cxt,object))
    getCopy = contain $ do
        (cxt, object) <- safeGet
        return (EventLogEntry cxt object)

data OperationMode = SingleMode | ClusterMode String

-- Default configuration: single mode, daily checkpoints.
nullTxConfig :: TxConfig
nullTxConfig = TxConfig
    { txcCheckpointSeconds = 60*60*24
    , txcOperationMode     = SingleMode
    , txcClusterSize       = 0
    , txcClusterPort       = 8500
    , txcCommitFrequency   = 50
    }

-- Fork the worker thread of one component and return immediately.
--
-- The worker takes events from the process queue one at a time and runs
-- each handler against the current state.
runTxLoop :: MVar (WriterStream EventLogEntry) -> ProcessQueue st -> st -> IO ()
runTxLoop eventSaverVar queue st0 =
    forkIO (handle excHandler (loop st0)) >> return ()
  where
    loop current = do
        IHR context ev fun <- atomically (readTChan queue)
        logMT NOTICE ("> Event "++show (txId context)++" of "++eventTString ev)
        (mst, ra) <- fun context current
        case mst of
          -- There is a new State.
          --
          -- Note that saverAdd can return without yet writing the result
          -- as long as:
          --  1) saverAdd calls honor the sequence in which they were made.
          --  2) saverAdd calls execute the finalizers only after the value
          --     has been serialized. The finalizers typically return the
          --     result to the user so they should not be kept
          --     waiting too long.
          --  3) This means that checkpoints need to flush the saver
          --     which will guarantee that all pending result/side-effects
          --     have been processed.
          --  4) Savers must *not* block while running the finalizers
          Just st' -> do
              eventSaver <- readMVar eventSaverVar
              writerAdd eventSaver (EventLogEntry context (mkObject ev)) (logMT NOTICE "> disk " >> ra)
              loop st'
          -- State was not updated.
          --
          -- Thus the response can be executed immediately.
          Nothing -> do
              forkIO (logMT NOTICE "> pure" >> ra)
              loop current
    -- A killed or indefinitely blocked worker ends quietly; anything else
    -- is rethrown.
    excHandler (AsyncException ThreadKilled) = return ()
    excHandler BlockedIndefinitely = return ()
    excHandler e = throwIO e