-- | Command monad transformer for working with aggregates through a
-- 'Repository': creating, reading and updating aggregates, publishing
-- events from inside a unit of work, and running the resulting commands.
module Data.CQRS.Command
(
AggregateAction
, Repository
, CommandT
, createAggregate
, execCommandT
, freshUUID
, publishEvent
, readAggregate
, runCommandT
, UnitOfWorkT
, updateAggregate
) where
import Control.DeepSeq (NFData)
import Control.Monad (forM, join, liftM, void, when)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Trans.Class (MonadTrans(..), lift)
import Control.Monad.Trans.State.Strict (StateT, runStateT, get, modify')
import Control.Monad.Trans.Reader (ReaderT, runReaderT, ask)
import Data.CQRS.Internal.Aggregate (Aggregate)
import qualified Data.CQRS.Internal.Aggregate as A
import Data.CQRS.Internal.Repository
import Data.CQRS.Types.AggregateAction (AggregateAction)
import Data.CQRS.Types.EventStore (EventStore(..))
import Data.CQRS.Types.PersistedEvent (PersistedEvent(..))
import Data.CQRS.Types.Snapshot (Snapshot(..))
import Data.CQRS.Types.SnapshotStore
import Data.Foldable (forM_)
import Data.Maybe (fromJust)
import Data.UUID.Types (UUID)
import qualified System.IO.Streams.Combinators as SC
-- | Command monad transformer. A newtype around 'CommandM', i.e. a reader
-- over the 'Command' environment layered on top of the base monad @m@.
-- The type parameters @a@ and @e@ are the ones carried by the
-- 'Repository' held in that environment.
newtype CommandT a e m b = CommandT { unCommandT :: CommandM a e m b }
deriving (Functor, Applicative, Monad)
-- | Base-monad actions are lifted through the underlying reader layer and
-- then re-wrapped in 'CommandT'.
instance MonadTrans (CommandT a e) where
  lift = CommandT . lift
-- | 'IO' actions are lifted through the underlying reader layer and then
-- re-wrapped in 'CommandT'.
instance MonadIO m => MonadIO (CommandT a e m) where
  liftIO = CommandT . liftIO
-- | Transformer stack wrapped by 'CommandT': a reader whose environment is
-- the 'Command' record.
type CommandM a e = ReaderT (Command a e)
-- | Read-only environment available to every command. It currently holds
-- only the 'Repository' the command was started with (see 'runCommandT').
data Command a e =
Command { commandRepository :: Repository a e
}
-- | Unit-of-work monad transformer. A newtype around 'UnitOfWorkM', i.e. a
-- strict state layer holding a 'UnitOfWork' on top of the base monad @m@.
newtype UnitOfWorkT a e m b = UnitOfWorkT { unUnitOfWorkT :: UnitOfWorkM a e m b }
deriving (Functor, Applicative, Monad)
-- | Base-monad actions are lifted through the underlying state layer and
-- then re-wrapped in 'UnitOfWorkT'.
instance MonadTrans (UnitOfWorkT a e) where
  lift = UnitOfWorkT . lift
-- | 'IO' actions are lifted through the underlying state layer and then
-- re-wrapped in 'UnitOfWorkT'.
instance MonadIO m => MonadIO (UnitOfWorkT a e m) where
  liftIO = UnitOfWorkT . liftIO
-- | Transformer stack wrapped by 'UnitOfWorkT': a strict state layer whose
-- state is the 'UnitOfWork' record.
type UnitOfWorkM a e = StateT (UnitOfWork a e)
-- | State threaded through a unit of work. It currently holds only the
-- aggregate being worked on; 'publishEvent' replaces it with the result
-- of publishing each event on it.
data UnitOfWork a e =
UnitOfWork { uowAggregate :: Aggregate a e
}
-- | Run a command against the given repository and return its result in
-- the base monad.
runCommandT :: (MonadIO m) => Repository a e -> CommandT a e m b -> m b
runCommandT repository command =
  runReaderT (unCommandT command) (Command repository)
-- | Run a command against the given repository, discarding its result.
execCommandT :: (MonadIO m) => Repository a e -> CommandT a e m b -> m ()
execCommandT repository command = void (runCommandT repository command)
-- | Draw a new 'UUID' from the UUID supply of the repository the command
-- is running against.
freshUUID :: (MonadIO m) => CommandT a e m UUID
freshUUID = CommandT $ do
  command <- ask
  liftIO (repositoryUUIDSupply (commandRepository command))
-- | Persist the pending changes of an aggregate.
--
-- Every versioned event still held by the aggregate is paired with a UUID
-- drawn from the repository's UUID supply. If there is at least one such
-- event, the batch is handed to the event store and then to the
-- repository's publish callback. Afterwards, when the repository settings
-- specify a snapshot frequency and the aggregate's snapshot is more than
-- that many versions ahead of the version returned by
-- 'A.aggregateSnapshotVersion', the snapshot is written to the snapshot
-- store.
writeChanges :: (MonadIO m) => UUID -> Aggregate a e -> CommandT a e m ()
writeChanges aggregateId aggregate = CommandT $ do
  repository <- liftM commandRepository ask
  let snapshotStore = repositorySnapshotStore repository
      eventStore = repositoryEventStore repository
      publishEvents = repositoryPublishEvents repository
  -- Attach a fresh UUID to every pending event.
  persistedEvents <- forM (A.versionedEvents aggregate) $ \(v, e) -> do
    i <- liftIO $ repositoryUUIDSupply repository
    return $ PersistedEvent e v i
  -- Do not touch the event store or the publisher when nothing is pending.
  when (not (null persistedEvents)) $ do
    liftIO $ esStoreEvents eventStore aggregateId persistedEvents
    liftIO $ publishEvents (aggregateId, persistedEvents)
  -- Both layers are optional: no configured frequency, or no snapshot that
  -- is due, means nothing is written.
  forM_ (settingsSnapshotFrequency $ repositorySettings repository) $ \snapshotFrequency ->
    forM_ (snapshotDue snapshotFrequency) $ \snapshot ->
      liftIO $ ssWriteSnapshot snapshotStore aggregateId snapshot
  where
    -- The aggregate's snapshot, provided it is more than @maxDelta@
    -- versions ahead of the aggregate's recorded snapshot version.
    snapshotDue maxDelta = do
      (v, a) <- A.aggregateSnapshot aggregate
      if v - A.aggregateSnapshotVersion aggregate > maxDelta
        then Just (Snapshot v a)
        else Nothing
-- | Fetch the aggregate action configured on the command's repository.
getAggregateAction :: (Monad m) => CommandT a e m (AggregateAction a e)
getAggregateAction = CommandT $ do
  command <- ask
  return (repositoryAggregateAction (commandRepository command))
-- | Run a unit of work against a brand-new aggregate.
--
-- The unit of work starts from the empty aggregate built from the
-- repository's aggregate action. It receives an action that yields the
-- aggregate's current value (a 'Maybe') each time it is run. Once the unit
-- of work finishes, the resulting aggregate is passed to 'writeChanges'
-- under the given id and the unit of work's result is returned.
createAggregate :: (MonadIO m, Monad m) => UUID -> (UnitOfWorkT a e (CommandT a e m) (Maybe a) -> UnitOfWorkT a e (CommandT a e m) b) -> CommandT a e m b
createAggregate aggregateId unitOfWork = do
  aggregateAction <- getAggregateAction
  let initialState = UnitOfWork (A.emptyAggregate aggregateAction)
  (result, finalState) <- runStateT body initialState
  writeChanges aggregateId (uowAggregate finalState)
  return result
  where
    -- Reads the aggregate value out of the current unit-of-work state.
    currentValue = UnitOfWorkT (liftM (A.aggregateValue . uowAggregate) get)
    body = unUnitOfWorkT (unitOfWork currentValue)
-- | Run a unit of work against an existing aggregate.
--
-- The aggregate is loaded with 'getByIdFromEventStore'. If it has no value
-- the unit of work is not run and 'Nothing' is returned. Otherwise the unit
-- of work is run, the resulting aggregate is passed to 'writeChanges', and
-- the unit of work's result is returned wrapped in 'Just'.
--
-- The unit of work receives an action yielding the aggregate's current
-- value. That action calls 'error' with a message naming the aggregate if
-- the value is absent when it is run; the initial value is checked to be
-- present before the unit of work starts.
updateAggregate :: (MonadIO m) => UUID -> (UnitOfWorkT a e (CommandT a e m) a -> UnitOfWorkT a e (CommandT a e m) b) -> CommandT a e m (Maybe b)
updateAggregate aggregateId unitOfWork = CommandT $ do
  aggregate <- getByIdFromEventStore aggregateId
  unCommandT $ case A.aggregateValue aggregate of
    Nothing -> return Nothing
    Just _ -> do
      (r, s) <- runStateT run $ UnitOfWork aggregate
      writeChanges aggregateId $ uowAggregate s
      return $ Just r
  where
    run = unUnitOfWorkT $ unitOfWork $ UnitOfWorkT getter
    -- Reads from the *current* state so that events published earlier in
    -- the unit of work are reflected in the value handed back.
    getter = liftM (requireValue . A.aggregateValue . uowAggregate) get
    -- The getter's type leaves no room for absence, so a missing value is
    -- reported with a descriptive error instead of a bare 'fromJust'.
    requireValue = maybe (error missingValueMessage) id
    missingValueMessage =
      "Data.CQRS.Command.updateAggregate: aggregate " ++ show aggregateId
        ++ " has no value inside the unit of work"
-- | Read the current value of an aggregate, or 'Nothing' if it has none.
--
-- Implemented as 'updateAggregate' with a unit of work that simply returns
-- the aggregate's value, so the loaded aggregate still goes through
-- 'updateAggregate''s write-back step.
readAggregate :: (MonadIO m) => UUID -> CommandT a e m (Maybe a)
readAggregate aggregateId = updateAggregate aggregateId id
-- | Load an aggregate by id.
--
-- Starts from the empty aggregate for the repository's aggregate action,
-- applies whatever the snapshot store returns for the id, and then folds
-- the events retrieved from the event store (starting from the version
-- reported by 'A.aggregateVersion0') into the result with 'A.applyEvent'.
getByIdFromEventStore :: (MonadIO m) => UUID -> CommandM a e m (Aggregate a e)
getByIdFromEventStore aggregateId = do
  repository <- liftM commandRepository ask
  let eventStore = repositoryEventStore repository
      snapshotStore = repositorySnapshotStore repository
      blank = A.emptyAggregate (repositoryAggregateAction repository)
  storedSnapshot <- liftIO (ssReadSnapshot snapshotStore aggregateId)
  let fromSnapshot = A.applySnapshot blank storedSnapshot
  liftIO $
    esRetrieveEvents
      eventStore
      aggregateId
      (A.aggregateVersion0 fromSnapshot)
      (SC.fold A.applyEvent fromSnapshot)
-- | Publish an event inside a unit of work: the aggregate held in the
-- unit-of-work state is replaced by the result of 'A.publishEvent' applied
-- to it and the event. The state update uses the strict 'modify''.
publishEvent :: (MonadIO m, NFData a, NFData e) => e -> UnitOfWorkT a e (CommandT a e m) ()
publishEvent event = UnitOfWorkT (modify' recordEvent)
  where
    recordEvent uow =
      uow { uowAggregate = A.publishEvent (uowAggregate uow) event }