module Data.CQRS.Internal.UnitOfWork
( UnitOfWorkT
, createAggregate
, createOrLoadAggregate
, findAggregate
, loadAggregate
, publishEvent
, runUnitOfWorkT
) where
import Control.Applicative (Applicative)
import Control.DeepSeq (NFData)
import Control.Monad (forM, when)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Trans.Class (MonadTrans(..), lift)
import Control.Monad.Trans.State (StateT, get, modify, runStateT)
import Data.Conduit (($$), runResourceT)
import qualified Data.Conduit.List as CL
import Data.Foldable (forM_)
import Data.List (find, foldl')
import Data.Typeable (Typeable, cast)
import Data.CQRS.Aggregate
import Data.CQRS.Eventable (Eventable(..))
import Data.CQRS.Internal.EventBus
import Data.CQRS.EventStore.Backend (EventStoreBackend(..), RawSnapshot(..))
import Data.CQRS.GUID (GUID)
import Data.CQRS.Internal.AggregateRef (AggregateRef, mkAggregateRef)
import qualified Data.CQRS.Internal.AggregateRef as AR
import Data.CQRS.Internal.EventStore (EventStore(..))
import qualified Data.CQRS.Internal.EventStore as ES
import Data.CQRS.Internal.Repository
import Data.CQRS.PersistedEvent (PersistedEvent(..))
import Data.CQRS.Serializable
-- | Monad transformer in which a unit of work is described.
--
-- It is a thin wrapper around 'UnitOfWorkM', i.e. a 'StateT' carrying the
-- 'UnitOfWork' bookkeeping record, and is executed with 'runUnitOfWorkT'.
--
-- 'Applicative' is derived alongside 'Functor' and 'Monad': on every GHC
-- where 'Applicative' is a superclass of 'Monad' the 'Monad' instance is
-- rejected without it.
newtype UnitOfWorkT e m a = UnitOfWorkT (UnitOfWorkM e m a)
  deriving (Functor, Applicative, Monad)
-- | Lifting goes through the underlying state transformer and then
-- re-wraps the result in the 'UnitOfWorkT' constructor.
instance MonadTrans (UnitOfWorkT e) where
  lift = UnitOfWorkT . lift
-- | An 'AggregateRef' whose aggregate type @a@ is existentially hidden,
-- so that references to aggregates of different types can be kept in one
-- list. The packed 'Typeable' dictionaries allow the concrete reference
-- type to be recovered later with 'cast'.
data BoxedAggregateRef e
  = forall a .
    ( Typeable a
    , Typeable e
    , Serializable e
    , Aggregate a
    , Eventable a e
    ) => BoxedAggregateRef (AggregateRef a e)
-- | An 'EventStore' for events of type @e@ whose backend type @b@ is
-- existentially hidden; only the 'EventStoreBackend' dictionary is kept.
data BoxedEventStore e
  = forall b .
    (EventStoreBackend b) => BoxedEventStore (EventStore e b)
-- | An event store backend of some hidden type @b@, usable only through
-- the 'EventStoreBackend' class.
data BoxedEventStoreBackend
  = forall b .
    (EventStoreBackend b) => BoxedEventStoreBackend b
-- | The state monad transformer underlying 'UnitOfWorkT': a 'StateT'
-- whose state is the 'UnitOfWork' record for event type @e@. The alias is
-- deliberately left partially applied (base monad and result type are
-- supplied at the use sites).
type UnitOfWorkM e = StateT (UnitOfWork e)
-- | Bookkeeping state threaded through a unit of work.
data UnitOfWork e = UnitOfWork
  { txnEventStore :: BoxedEventStore e
    -- ^ Event store handle; 'getByIdFromEventStore' reads it to retrieve
    -- an aggregate's events.
  , txnEventStoreBackend :: BoxedEventStoreBackend
    -- ^ Backend handle; 'getLatestSnapshot' reads it to fetch snapshots.
  , aggregateRefsToCommit :: [BoxedAggregateRef e]
    -- ^ References to every aggregate loaded so far in this unit of work.
    -- 'runUnitOfWorkT' stores the events read from each of them.
  }
-- | Run a unit of work against the given repository and return its result.
--
-- The action is executed inside 'esbWithTransaction' on a backend obtained
-- via 'withEventStoreBackend', starting from an empty list of tracked
-- aggregate references. When the action has finished, for each tracked
-- reference:
--
--   * the events read from the reference are stored in the event store,
--     keyed by the reference's GUID and start version;
--
--   * if the repository settings provide a snapshot frequency @f@ and the
--     current version is more than @f@ versions past the version of the
--     snapshot the reference was loaded from, a new snapshot of the current
--     aggregate value (if there is one) is written.
--
-- Only after the transaction block has returned are the stored events
-- handed to the repository's event bus.
runUnitOfWorkT :: forall b c e . (Typeable e, Serializable e, EventStoreBackend b) => Repository e b -> UnitOfWorkT e IO c -> IO c
runUnitOfWorkT repository (UnitOfWorkT transaction) = do
  (r, writtenEvents) <- withEventStoreBackend repository $ \eventStoreBackend ->
    esbWithTransaction eventStoreBackend $ do
      let eventStore = EventStore eventStoreBackend
      (result, finalState) <- runStateT transaction $
        UnitOfWork (BoxedEventStore eventStore) (BoxedEventStoreBackend eventStoreBackend) []
      storedPerRef <- forM (aggregateRefsToCommit finalState) $ \(BoxedAggregateRef a) -> do
        evs <- AR.readEvents a
        ES.storeEvents eventStore (AR.arGUID a) (AR.arStartVersion a) evs
        forM_ (settingsSnapshotFrequency $ repositorySettings repository) $ \f -> do
          v <- AR.getCurrentVersion a
          -- Number of versions accumulated since the last snapshot; the
          -- subtraction was missing, which applied @v@ as a function.
          when (v - AR.arSnapshotVersion a > f) $ do
            mav <- AR.readValue a
            -- Nothing to snapshot when the aggregate has no value.
            forM_ mav $ \av ->
              esbWriteSnapshot eventStoreBackend (AR.arGUID a) $ RawSnapshot v $ serialize av
        return evs
      return (result, concat storedPerRef)
  publishEventsToBus (repositoryEventBus repository) writtenEvents
  return r
-- | Obtain the aggregate reference for a GUID.
--
-- A reference already tracked by the current unit of work is reused when
-- its type matches the requested one; a tracked reference of a different
-- type makes the action 'fail' with a \"Duplicate GUID\" message. When no
-- tracked reference has this GUID, it is loaded via
-- 'getByIdFromEventStore'.
getById :: forall a e . (Typeable a, Typeable e, Serializable e, Aggregate a, Eventable a e) => GUID -> UnitOfWorkT e IO (AggregateRef a e)
getById guid = UnitOfWorkT $ do
  st <- get
  let tracked = aggregateRefsToCommit st
  case find (\(BoxedAggregateRef ref) -> AR.arGUID ref == guid) tracked of
    Nothing ->
      getByIdFromEventStore guid
    Just (BoxedAggregateRef ref) ->
      -- The target type of 'cast' is fixed by the result type of this action.
      maybe (fail $ concat ["Duplicate GUID ", show guid, "!" ]) return (cast ref)
-- | Fetch the latest snapshot for a GUID from the event store backend.
--
-- Returns the snapshot version together with the deserialized aggregate.
-- When the backend has no snapshot, or the stored snapshot cannot be
-- deserialized, the result is @(0, Nothing)@.
getLatestSnapshot :: forall a e . (Typeable a, Typeable e, Serializable e, Aggregate a) => GUID -> UnitOfWorkM e IO (Int, Maybe a)
getLatestSnapshot guid = do
  (BoxedEventStoreBackend backend) <- fmap txnEventStoreBackend get
  rawSnapshot <- liftIO $ esbGetLatestSnapshot backend guid
  case rawSnapshot of
    Just (RawSnapshot version payload)
      | Just value <- (deserialize payload :: Maybe a) -> return (version, Just value)
    -- Missing snapshot and undecodable snapshot are treated alike.
    _ -> return (0, Nothing)
-- | Load an aggregate from the event store and start tracking it.
--
-- Starting from the latest snapshot (if any), the events retrieved for the
-- GUID from the snapshot version onwards are applied in order with
-- 'applyEvent'. The resulting reference records the latest version seen and
-- the snapshot version, and is prepended to 'aggregateRefsToCommit' so that
-- 'runUnitOfWorkT' will commit it.
getByIdFromEventStore :: forall a e . (Typeable a, Typeable e, Serializable e, Aggregate a, Eventable a e) => GUID -> UnitOfWorkM e IO (AggregateRef a e)
getByIdFromEventStore guid = do
  (BoxedEventStore es) <- fmap txnEventStore get
  (v0, ma0) <- getLatestSnapshot guid
  events <- lift $ runResourceT $ (ES.retrieveEvents es guid v0 $$ CL.consume)
  -- The list always contains v0, so 'maximum' cannot see an empty list.
  let latestVersion = maximum (v0 : map peSequenceNumber events)
  -- Strict left fold: a lazy 'foldl' would build one thunk per replayed
  -- event before anything is evaluated.
  let current = foldl' (\acc pe -> applyEvent acc $ peEvent pe) ma0 events
  (ref :: AggregateRef a e) <- lift $ mkAggregateRef current guid latestVersion v0
  modify $ \s -> s { aggregateRefsToCommit = (BoxedAggregateRef ref : aggregateRefsToCommit s) }
  return ref
-- | Publish an event against an aggregate reference from within a unit of
-- work. This simply lifts 'AR.publishEvent' into 'UnitOfWorkT'.
publishEvent :: (MonadIO m, Serializable e, Typeable a, Typeable e, Aggregate a, Eventable a e, NFData a, NFData e) => AggregateRef a e -> e -> UnitOfWorkT e m ()
publishEvent aggregateRef = UnitOfWorkT . lift . AR.publishEvent aggregateRef
-- | Look up an aggregate by GUID.
--
-- Yields 'Nothing' when the reference obtained for the GUID currently has
-- no value, otherwise the reference paired with that value.
findAggregate :: (Serializable e, Typeable a, Typeable e, Aggregate a, Eventable a e) => GUID -> UnitOfWorkT e IO (Maybe (AggregateRef a e, a))
findAggregate guid = do
  ref <- getById guid
  current <- lift $ AR.readValue ref
  maybe (return Nothing) (\value -> return $ Just (ref, value)) current
-- | Like 'findAggregate', but the aggregate is required to exist: when
-- 'findAggregate' yields 'Nothing' the action 'fail's with a message
-- naming the GUID.
loadAggregate :: (Serializable e, Typeable a, Typeable e, Aggregate a, Eventable a e) => GUID -> UnitOfWorkT e IO (AggregateRef a e, a)
loadAggregate guid =
  findAggregate guid >>= maybe missing return
  where
    missing = fail $ show $ "Aggregate with GUID " ++ show guid ++ " does not exist"
-- | Obtain a reference for an aggregate that must not exist yet.
--
-- The reference for the GUID is returned when it currently holds no value;
-- if it already holds one the action 'fail's with a message naming the
-- GUID.
createAggregate :: (Serializable e, Typeable a, Typeable e, Aggregate a, Eventable a e) => GUID -> UnitOfWorkT e IO (AggregateRef a e)
createAggregate guid = do
  ref <- getById guid
  existing <- lift $ AR.readValue ref
  maybe (return ref) (const alreadyExists) existing
  where
    alreadyExists = fail $ show $ "Aggregate with GUID " ++ show guid ++ " already exists"
-- | Obtain the reference for a GUID regardless of whether the aggregate
-- already has a value; this is exactly 'getById'.
createOrLoadAggregate :: (Serializable e, Typeable a, Typeable e, Aggregate a, Eventable a e) => GUID -> UnitOfWorkT e IO (AggregateRef a e)
createOrLoadAggregate = getById