module Data.CQRS.Internal.AggregateRef ( AggregateRef , arGUID , arSnapshotVersion , arStartVersion , getCurrentVersion , mkAggregateRef , publishEvent , readEvents , readValue ) where import Control.DeepSeq (NFData, ($!!)) import Control.Monad (liftM) import Control.Monad.IO.Class (MonadIO, liftIO) import Data.CQRS.Eventable (Eventable(..)) import Data.CQRS.GUID (GUID) import Data.CQRS.PersistedEvent (PersistedEvent(..)) import Data.Foldable (toList) import Data.IORef (IORef, modifyIORef', newIORef, readIORef, writeIORef) import Data.Sequence (Seq, (|>)) import qualified Data.Sequence as S import Data.Typeable (Typeable) -- | Aggregate root reference. data AggregateRef a e = AggregateRef { arValue :: IORef (Maybe a) , arEvents :: IORef (Seq (PersistedEvent e)) , arGUID :: !GUID , arStartVersion :: !Int , arSnapshotVersion :: !Int } deriving (Typeable) -- | Make aggregate mkAggregateRef :: (MonadIO m) => Maybe a -> GUID -> Int -> Int -> m (AggregateRef a e) mkAggregateRef a guid originatingVersion snapshotVersion = do a' <- liftIO $ newIORef a e' <- liftIO $ newIORef S.empty return $ AggregateRef a' e' guid originatingVersion snapshotVersion -- | Publish event to aggregate. publishEvent :: (MonadIO m, Eventable a e, NFData e, NFData a) => AggregateRef a e -> e -> m () publishEvent aggregateRef event = liftIO $ do -- Apply event to aggregate state. a <- readIORef $ arValue aggregateRef let a' = applyEvent a $!! event writeIORef (arValue aggregateRef) $!! a' -- Add event to aggregate. modifyIORef' (arEvents aggregateRef) $ \events -> do (events |>) $!! (PersistedEvent (arGUID aggregateRef) event (arStartVersion aggregateRef + 1 + S.length events)) -- | Read aggregate events. readEvents :: (MonadIO m) => AggregateRef a e -> m [PersistedEvent e] readEvents = liftM toList . liftIO . readIORef . arEvents -- | Read aggregate state. readValue :: (MonadIO m) => AggregateRef a e -> m (Maybe a) readValue = liftIO . readIORef . arValue -- | Get the current version of the aggregate in aggregate ref. getCurrentVersion :: (MonadIO m) => AggregateRef a e -> m Int getCurrentVersion a = do nevs <- liftM S.length $ liftIO $ readIORef $ arEvents a return $ nevs + (arStartVersion a)