-- | Memory-based event store backend. Used primarily -- for testing. module Data.CQRS.EventStore.Backend.Memory ( MemoryEventStoreBackend , createBackendPool ) where -- External imports import Control.Concurrent.STM (atomically) import Control.Concurrent.STM.TVar (TVar, newTVar, readTVar, writeTVar) import Control.Concurrent.MSem as MSem import Control.Monad (unless, liftM) import Control.Monad.IO.Class (liftIO) import qualified Data.Conduit.List as CL import Data.Conduit.Pool (Pool, createPool) import qualified Data.Foldable as F import Data.Map.Strict (Map) import qualified Data.Map.Strict as M import Data.Sequence (Seq, (><)) import qualified Data.Sequence as S -- Internal imports import Data.CQRS.GUID import Data.CQRS.EventStore.Backend -- | Memory-based event store backend. data MemoryEventStoreBackend = MESB (TVar (Map GUID (Seq RawEvent))) (MSem Int) (TVar (Map GUID RawSnapshot)) -- | Pool of memory event store backends. createBackendPool :: Int -> IO (Pool MemoryEventStoreBackend) createBackendPool n = do -- New "backing store" for every pool. events <- atomically $ newTVar M.empty snapshots <- atomically $ newTVar M.empty lock <- MSem.new 1 -- Create the pool createPool (return $ MESB events lock snapshots) (\_ -> return ()) 1 1 n -- Instance instance EventStoreBackend MemoryEventStoreBackend where esbStoreEvents (MESB eventsTVar _ _) g v0 newEvents = atomically $ do events <- readTVar eventsTVar let eventsForAggregate = M.findWithDefault S.empty g events let expectedV0 = S.length eventsForAggregate unless (expectedV0 == v0) $ fail "Mismatched version numbers" writeTVar eventsTVar $ M.insert g (eventsForAggregate >< S.fromList newEvents) events esbRetrieveEvents (MESB eventsTVar _ _) g v0 = do e <- liftIO $ atomically $ do events <- readTVar eventsTVar return $ F.toList $ M.findWithDefault S.empty g events CL.sourceList $ drop v0 e esbEnumerateAllEvents (MESB eventsTVar _ _) = do events <- liftIO $ atomically $ readTVar eventsTVar CL.sourceList $ concatMap F.toList $ M.elems $ events esbWriteSnapshot (MESB _ _ v) g s = atomically $ do snapshots <- readTVar v writeTVar v $ M.insert g s $ snapshots esbGetLatestSnapshot (MESB _ _ v) g = atomically $ do liftM (M.lookup g) $ readTVar v esbWithTransaction (MESB _ lock _) io = MSem.with lock io