module Data.CQRS.Transaction
( TransactionT
, getAggregateRoot
, publishEvent
, retrieveEvents
, runTransactionT
) where
import Control.Monad (forM_, when)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Trans.Class (MonadTrans(..), lift)
import Control.Monad.Trans.State (StateT, get, modify, runStateT)
import Data.CQRS.Aggregate (Aggregate(..))
import Data.CQRS.Internal.AggregateRef (AggregateRef, mkAggregateRef)
import qualified Data.CQRS.Internal.AggregateRef as AR
import Data.CQRS.Event (Event(..))
import Data.CQRS.Eventable (Eventable(..))
import Data.CQRS.EventStore (EventStore, withTransaction)
import qualified Data.CQRS.EventStore as ES
import Data.CQRS.GUID (GUID)
import Data.Default (Default(..))
import Data.List (find)
import Data.Typeable (Typeable, cast)
newtype TransactionT e m a = TransactionT (Transaction e m a)
deriving (Functor, Monad)
instance MonadTrans (TransactionT e) where
lift m = TransactionT $ lift m
data BoxedAggregateRef e =
forall a . (Typeable a, Typeable e, Event e, Default a, Aggregate a, Eventable a e) => BoxedAggregateRef (AggregateRef a e)
type Transaction e = StateT (TransactionState e)
data TransactionState e =
TransactionState { eventStore :: EventStore
, aggregateRefsToCommit :: [BoxedAggregateRef e]
}
runTransactionT :: (Typeable e, Event e) => EventStore -> TransactionT e IO c -> IO c
runTransactionT eventStore_ (TransactionT transaction) = do
withTransaction eventStore_ $ do
(r,s) <- runStateT transaction s0
forM_ (aggregateRefsToCommit s) $ \(BoxedAggregateRef a) -> do
es <- AR.readEvents a
ES.storeEvents eventStore_ (AR.arGUID a) (AR.arStartVersion a)
(map encodeEvent es)
v <- AR.getCurrentVersion a
when (v AR.arSnapshotVersion a > 10) $ do
av <- AR.readValue a
ES.writeSnapshot eventStore_ (AR.arGUID a) (v, encodeAggregate av)
return r
where s0 = TransactionState eventStore_ [ ]
getById :: forall a e . (Typeable a, Typeable e, Event e, Default a, Aggregate a, Eventable a e) => GUID a -> TransactionT e IO (AggregateRef a e)
getById guid = TransactionT $ do
aggregateRefs <- fmap aggregateRefsToCommit get
case find (\(BoxedAggregateRef a) ->
case cast $ AR.arGUID a of
Just (guid' :: GUID a) -> guid == guid'
Nothing -> False) aggregateRefs of
Just (BoxedAggregateRef a) ->
case cast a of
Just (a' :: AggregateRef a e) -> return a'
Nothing ->
fail $ concat ["Duplicate GUID ", show guid, "!" ]
Nothing -> do
getByIdFromEventStore guid
getLatestSnapshot :: forall a e . (Typeable a, Typeable e, Event e, Default a, Aggregate a) => GUID a -> Transaction e IO (Int,a)
getLatestSnapshot guid = do
es <- fmap eventStore get
r <- liftIO $ ES.getLatestSnapshot es guid
case r of
Just (v',a') -> do
case decodeAggregate a' :: Maybe a of
Just a'' -> return (v', a'')
Nothing -> return (0, def)
Nothing -> do
return (0, def)
getByIdFromEventStore :: forall a e . (Typeable a, Typeable e, Event e, Default a, Aggregate a, Eventable a e) => GUID a -> Transaction e IO (AggregateRef a e)
getByIdFromEventStore guid = do
es <- fmap eventStore get
(v0,a0) <- getLatestSnapshot guid
(latestVersion, events) <- lift $ ES.retrieveEvents es guid v0
let a = foldr applyEvent a0 $ map (\e -> decodeEvent e :: e) events
(a' :: AggregateRef a e) <- lift $ mkAggregateRef a guid latestVersion v0
modify $ \s -> s { aggregateRefsToCommit = (BoxedAggregateRef a' : aggregateRefsToCommit s) }
return a'
publishEvent :: (MonadIO m, Event e, Eventable a e) => AggregateRef a e -> e -> TransactionT e m ()
publishEvent aggregateRef event = lift $ AR.publishEvent aggregateRef event
retrieveEvents :: forall e . (Event e) => Int -> Int -> (e -> IO ()) -> TransactionT e IO ()
retrieveEvents minVersion maxVersion h = TransactionT $ do
es <- fmap eventStore get
lift $ ES.readAllEvents es minVersion maxVersion $ \ed -> do
let e :: e = decodeEvent ed
liftIO $ h e
getAggregateRoot :: (Default a, Event e, Typeable a, Typeable e, Aggregate a, Eventable a e) => GUID a -> TransactionT e IO (AggregateRef a e, a)
getAggregateRoot guid = do
aggregateRef <- getById guid
aggregate <- lift $ AR.readValue aggregateRef
return (aggregateRef, aggregate)