-- | Run transactions against event stores.
module Data.CQRS.Transaction
    ( TransactionT
    , getAggregateRoot
    , publishEvent
    , retrieveEvents
    , runTransactionT
    ) where

-- 'Applicative' is imported explicitly so that the deriving clause on
-- 'TransactionT' also compiles on GHC < 7.10, where the Prelude does
-- not export the class.
import           Control.Applicative (Applicative)
import           Control.Monad (forM_, when)
import           Control.Monad.IO.Class (MonadIO, liftIO)
import           Control.Monad.Trans.Class (MonadTrans(..), lift)
import           Control.Monad.Trans.State (StateT, get, modify, runStateT)
import           Data.CQRS.Aggregate (Aggregate(..))
import           Data.CQRS.Internal.AggregateRef (AggregateRef, mkAggregateRef)
import qualified Data.CQRS.Internal.AggregateRef as AR
import           Data.CQRS.Event (Event(..))
import           Data.CQRS.EventStore (EventStore, withTransaction)
import qualified Data.CQRS.EventStore as ES
import           Data.CQRS.GUID (GUID)
import           Data.Default (Default(..))
import           Data.List (find)
import           Data.Typeable (Typeable, cast)

-- | Transaction monad transformer.
--
-- 'Applicative' is derived alongside 'Functor' and 'Monad': since
-- GHC 7.10 it is a superclass of 'Monad', so a 'Monad' instance
-- without it is rejected by the compiler.
newtype TransactionT e m a = TransactionT (Transaction e m a)
    deriving (Functor, Applicative, Monad)

instance MonadTrans (TransactionT e) where
    lift m = TransactionT $ lift m

-- Existential wrapper for AggregateRef. Hides the aggregate type so
-- that refs for different aggregate types can be kept in one list.
data BoxedAggregateRef e =
    forall a . (Typeable a, Typeable e, Event e a, Default a, Aggregate a)
        => BoxedAggregateRef (AggregateRef a e)

-- | Transaction monad itself.
type Transaction e = StateT (TransactionState e)

-- State threaded through a running transaction.
data TransactionState e = TransactionState
    { eventStore :: EventStore
      -- ^ Event store the transaction reads from.
    , aggregateRefsToCommit :: [BoxedAggregateRef e]
      -- ^ Aggregate refs handed out so far; written out by
      -- 'runTransactionT' once the computation has finished.
    }

-- | Run transaction against an event store.
--
-- The whole computation runs inside 'withTransaction'. Once the user
-- computation has finished, the events read from every aggregate ref
-- that was handed out are stored, and a snapshot is written for each
-- aggregate whose current version is more than @snapshotThreshold@
-- versions past its snapshot version.
runTransactionT :: (Typeable a, Typeable e, Event e a, Default a, Aggregate a)
                => EventStore -> TransactionT e IO c -> IO c
runTransactionT eventStore_ (TransactionT transaction) = do
    withTransaction eventStore_ $ do
        -- Run the computation.
        (r, s) <- runStateT transaction s0
        -- Write out all the accumulated aggregates.
        forM_ (aggregateRefsToCommit s) $ \(BoxedAggregateRef a) -> do
            es <- AR.readEvents a
            ES.storeEvents eventStore_ (AR.arGUID a) (AR.arStartVersion a)
                (map encodeEvent es)
            -- If we've advanced more than 'snapshotThreshold' events
            -- past the last snapshot, we create a new snapshot.
            v <- AR.getCurrentVersion a
            when (v - AR.arSnapshotVersion a > snapshotThreshold) $ do
                av <- AR.readValue a
                ES.writeSnapshot eventStore_ (AR.arGUID a)
                    (v, encodeAggregate av)
        -- Return the value.
        return r
  where
    -- Initial state: no aggregate refs handed out yet.
    s0 = TransactionState eventStore_ [ ]
    -- Number of versions an aggregate may advance past its snapshot
    -- before a fresh snapshot is written.
    snapshotThreshold = 10

-- | Get an aggregate ref by GUID.
--
-- Refs already handed out in this transaction are reused; otherwise
-- the aggregate is loaded via 'getByIdFromEventStore'.
getById :: forall a e . (Typeable a, Typeable e, Event e a, Default a, Aggregate a)
        => GUID a -> TransactionT e IO (AggregateRef a e)
getById guid = TransactionT $ do
    -- Check through list to see if we've given out a reference to the
    -- aggregate before.
    aggregateRefs <- fmap aggregateRefsToCommit get
    case find hasSameGuid aggregateRefs of
        Just (BoxedAggregateRef a) ->
            case cast a of
                Just (a' :: AggregateRef a e) -> return a'
                Nothing ->
                    -- This cast could only really fail if there are
                    -- duplicate GUIDs for different types of
                    -- aggregates/events.
                    fail $ concat ["Duplicate GUID ", show guid, "!" ]
        Nothing -> do
            getByIdFromEventStore guid
  where
    -- A boxed ref matches when its GUID casts to the requested GUID
    -- type and compares equal to the requested GUID.
    hasSameGuid (BoxedAggregateRef a) =
        case cast $ AR.arGUID a of
            Just (guid' :: GUID a) -> guid == guid'
            Nothing -> False

-- Get the latest snapshot from database, filling in a default
-- if a) no snapshot exists, or b) snapshot state was not decodable.
-- In both fallback cases the returned version is 0 and the value
-- is 'def'.
getLatestSnapshot :: forall a e . (Typeable a, Typeable e, Event e a, Default a, Aggregate a)
                  => GUID a -> Transaction e IO (Int,a)
getLatestSnapshot guid = do
    es <- fmap eventStore get
    r <- liftIO $ ES.getLatestSnapshot es guid
    case r of
        Just (v', a') -> do
            case decodeAggregate a' :: Maybe a of
                Just a'' -> return (v', a'')
                Nothing -> return (0, def)
        Nothing -> do
            return (0, def)

-- Retrieve aggregate from event store: start from the latest snapshot
-- (or the default), apply the events retrieved for versions after it,
-- and register the resulting ref for commit.
getByIdFromEventStore :: forall a e . (Typeable a, Typeable e, Event e a, Default a, Aggregate a)
                      => GUID a -> Transaction e IO (AggregateRef a e)
getByIdFromEventStore guid = do
    es <- fmap eventStore get
    -- Get latest snapshot (if any).
    (v0, a0) <- getLatestSnapshot guid
    -- Get events.
    (latestVersion, events) <- lift $ ES.retrieveEvents es guid v0
    -- Build the aggregate state from all the events.
    -- NOTE(review): 'foldr' applies the LAST element of 'events' first.
    -- That is only the intended order if 'ES.retrieveEvents' returns
    -- events newest-first -- confirm against the event store
    -- implementation.
    let a = foldr applyEvent a0 $ map (\e -> decodeEvent e :: e) events
    -- Make the aggregate itself.
    (a' :: AggregateRef a e) <- lift $ mkAggregateRef a guid latestVersion v0
    -- Add to set of aggregates to commit later.
    modify $ \s -> s { aggregateRefsToCommit = (BoxedAggregateRef a' : aggregateRefsToCommit s) }
    -- Return the aggregate.
    return a'

-- | Publish event for an aggregate root.
publishEvent :: (MonadIO m, Event e a) => AggregateRef a e -> e -> TransactionT e m ()
publishEvent aggregateRef event =
    lift $ AR.publishEvent aggregateRef event

-- | Retrieve a range of events, invoking a callback function for each event.
-- For example,
--
-- @
-- retrieveEvents 10 20 $ \\e -> do
--   putStrLn $ \"Event: \" ++ show e
-- @
--
-- would print all events between "logical time" 10 (inclusive)
-- and "logical time" 20 (not inclusive). "Logical time" starts
-- at 1. It is NOT an error to provide bounds for which there
-- are no events.
retrieveEvents :: forall e a . (Event e a) => Int -> Int -> (e -> IO ()) -> TransactionT e IO ()
retrieveEvents minVersion maxVersion h = TransactionT $ do
    es <- fmap eventStore get
    lift $ ES.readAllEvents es minVersion maxVersion $ \ed -> do
        -- Decode at the event type fixed by the signature's @e@.
        let e :: e = decodeEvent ed
        liftIO $ h e

-- | Get aggregate root.
--
-- Returns the aggregate ref together with the value currently read
-- from it.
getAggregateRoot :: (Default a, Event e a, Typeable a, Typeable e, Aggregate a)
                 => GUID a -> TransactionT e IO (AggregateRef a e, a)
getAggregateRoot guid = do
    aggregateRef <- getById guid
    aggregate <- lift $ AR.readValue aggregateRef
    return (aggregateRef, aggregate)