-- | Event store backend that keeps all events and snapshots in memory,
-- in 'TVar's holding maps keyed by aggregate 'GUID'. Only the backend
-- type (without its constructor) and the pool factory are exported.
module Data.CQRS.EventStore.Backend.Memory
( MemoryEventStoreBackend
, createBackendPool
) where
import Control.Exception (ErrorCall (..))
import Control.Monad (unless, liftM)
import Control.Monad.IO.Class (liftIO)
import qualified Data.Foldable as F

import Control.Concurrent.MSem as MSem
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TVar (TVar, newTVar, readTVar, writeTVar)
import Control.Monad.STM (throwSTM)
import qualified Data.Conduit.List as CL
import Data.Conduit.Pool (Pool, createPool)
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
import Data.Sequence (Seq, (><))
import qualified Data.Sequence as S

import Data.CQRS.GUID
import Data.CQRS.EventStore.Backend
-- | Handle to the in-memory store. The constructor takes three positional
-- fields, in this order:
--
--   1. the event streams, one 'Seq' of 'RawEvent' per aggregate 'GUID';
--   2. the semaphore used by @esbWithTransaction@ to bracket an action;
--   3. the stored snapshots, at most one 'RawSnapshot' per aggregate 'GUID'.
data MemoryEventStoreBackend
  = MESB
      (TVar (Map GUID (Seq RawEvent)))
      (MSem Int)
      (TVar (Map GUID RawSnapshot))
-- | Build a pool of in-memory backends.
--
-- The event map, snapshot map and semaphore are allocated once, and the
-- pool's create action always hands back the same 'MESB' value, so every
-- resource taken from the pool shares the same underlying state. The
-- destroy action does nothing.
--
-- @n@ is forwarded as the last argument of 'createPool'.
-- NOTE(review): the literal @1 1@ arguments are presumably the stripe count
-- and idle timeout of resource-pool's @createPool@, with @n@ the per-stripe
-- resource limit -- confirm against the pool library version in use.
createBackendPool :: Int -> IO (Pool MemoryEventStoreBackend)
createBackendPool n = do
  eventsVar <- atomically (newTVar M.empty)
  snapshotsVar <- atomically (newTVar M.empty)
  semaphore <- MSem.new 1
  let sharedBackend = MESB eventsVar semaphore snapshotsVar
      acquire = return sharedBackend
      release = const (return ())
  createPool acquire release 1 1 n
-- | 'EventStoreBackend' implementation backed by the 'TVar's and semaphore
-- held in 'MESB'.
instance EventStoreBackend MemoryEventStoreBackend where
  -- Append newEvents to the stream of aggregate g in one STM transaction.
  -- v0 must equal the number of events already stored for g (0 when the
  -- aggregate has no events yet); otherwise the transaction is aborted by
  -- throwing ErrorCall "Mismatched version numbers" and nothing is written.
  esbStoreEvents (MESB eventsTVar _ _) g v0 newEvents =
    atomically $ do
      events <- readTVar eventsTVar
      let eventsForAggregate = M.findWithDefault S.empty g events
          currentVersion = S.length eventsForAggregate
      -- throwSTM rather than fail: STM has no MonadFail instance on
      -- GHC >= 8.8. ErrorCall is what the old default fail raised.
      unless (currentVersion == v0) $
        throwSTM $ ErrorCall "Mismatched version numbers"
      -- ($!) evaluates the updated map inside the transaction so the TVar
      -- does not accumulate a chain of unevaluated inserts.
      writeTVar eventsTVar $!
        M.insert g (eventsForAggregate >< S.fromList newEvents) events

  -- Stream the events of aggregate g, skipping the first v0 of them.
  -- The map is read in a single transaction; an unknown g yields no events.
  esbRetrieveEvents (MESB eventsTVar _ _) g v0 = do
    events <- liftIO $ atomically $ readTVar eventsTVar
    -- Drop on the Seq before converting, so the skipped prefix is never
    -- turned into list cells. A non-positive v0 keeps the whole stream,
    -- exactly like drop on lists.
    CL.sourceList $ F.toList $ S.drop v0 $ M.findWithDefault S.empty g events

  -- Stream every stored event: aggregates in ascending key order of the
  -- map, and within each aggregate in stored order.
  esbEnumerateAllEvents (MESB eventsTVar _ _) = do
    events <- liftIO $ atomically $ readTVar eventsTVar
    CL.sourceList $ concatMap F.toList $ M.elems events

  -- Store snapshot s for aggregate g, replacing any previous snapshot.
  esbWriteSnapshot (MESB _ _ v) g s =
    atomically $ do
      snapshots <- readTVar v
      -- ($!) for the same reason as in esbStoreEvents.
      writeTVar v $! M.insert g s snapshots

  -- Look up the snapshot stored for aggregate g, if there is one.
  esbGetLatestSnapshot (MESB _ _ v) g =
    atomically $ liftM (M.lookup g) $ readTVar v

  -- Run io via MSem.with on the backend's semaphore.
  esbWithTransaction (MESB _ lock _) io =
    MSem.with lock io