module Data.CQRS.Internal.Repository ( Repository(..) , Settings(..) , defaultSettings , enumerateEventStore , enumerateAndStreamEvents , newRepository , withEventStoreBackend ) where -- External imports import Control.DeepSeq (NFData) import Control.Monad (forever) import Control.Monad.Trans.Class (lift) import Data.Conduit (ResourceT, Source, addCleanup, bracketP, yield) import Data.Conduit.Pool (Pool, withResource, takeResource, mrReuse, mrValue) -- Internal imports import Data.CQRS.Internal.EventBus import qualified Data.CQRS.Internal.EventStore as ES import Data.CQRS.Internal.EventStore (EventStore(..)) import Data.CQRS.EventStore.Backend import Data.CQRS.PersistedEvent import Data.CQRS.Serializable -- | Repository settings data Settings = Settings { settingsSnapshotFrequency :: Maybe Int } -- | Default repository settings. defaultSettings :: Settings defaultSettings = Settings { settingsSnapshotFrequency = Nothing } -- | Repository consisting of an event store and an event bus. data Repository e b = Repository { repositoryEventStoreBackendPool :: Pool b -- ^ Event store backend pool , repositoryEventBus :: EventBus e -- ^ Event bus to use , repositorySettings :: Settings -- ^ Repository settings } -- | Create a repository from a pool of event store backends. newRepository :: (EventStoreBackend b, Show e, NFData e) => Settings -> Pool b -> IO (Repository e b) newRepository settings eventStoreBackendPool = do eventBus <- newEventBus return $ Repository { repositoryEventStoreBackendPool = eventStoreBackendPool , repositoryEventBus = eventBus , repositorySettings = settings } -- | Use event store backed from repository to perform an action. withEventStoreBackend :: (EventStoreBackend b) => Repository e b -> (b -> IO a) -> IO a withEventStoreBackend (Repository p _ _) action = withResource p (\eventStoreBackend -> action eventStoreBackend) -- | Enumerate all events which satisfy certain criteria from event -- store associated with repository. enumerateEventStore :: (Serializable e, EventStoreBackend b) => Repository e b -> Source (ResourceT IO) [PersistedEvent e] enumerateEventStore (Repository p _ _) = do eventStoreBackend <- lift $ takeResource p addCleanup (mrReuse eventStoreBackend) $ ES.enumerateEventStore (EventStore $ mrValue eventStoreBackend) -- State for the streaming thread. data StreamingState e = Buffering [PersistedEvent e] | EnumerationComplete [PersistedEvent e] | Streaming deriving (Show) -- | Enumerate all events which satisfy criteria and stream -- all new events from repository. All events which arrive -- while enumerating are buffered until enumeration completes. enumerateAndStreamEvents :: (Show e, Serializable e, EventStoreBackend b) => Repository e b -> Source (ResourceT IO) [PersistedEvent e] enumerateAndStreamEvents repository@(Repository _ eventBus _) = do bracketP (subscribeToEventBus eventBus) (unsubscribeFromEventBus eventBus) (\subscription -> do enumerateEventStore repository subscriptionSource subscription) where subscriptionSource subscription = forever $ do e <- lift $ lift $ readEventFromSubscription subscription yield e