module Data.CQRS.Internal.Repository
( Repository(..)
, Settings(..)
, defaultSettings
, enumerateEventStore
, enumerateAndStreamEvents
, newRepository
, withEventStoreBackend
) where
import Control.DeepSeq (NFData)
import Control.Monad (forever)
import Control.Monad.Trans.Class (lift)
import Data.Conduit (ResourceT, Source, addCleanup, bracketP, yield)
import Data.Conduit.Pool (Pool, withResource, takeResource, mrReuse, mrValue)
import Data.CQRS.Internal.EventBus
import qualified Data.CQRS.Internal.EventStore as ES
import Data.CQRS.Internal.EventStore (EventStore(..))
import Data.CQRS.EventStore.Backend
import Data.CQRS.PersistedEvent
import Data.CQRS.Serializable
data Settings = Settings
{ settingsSnapshotFrequency :: Maybe Int
}
defaultSettings :: Settings
defaultSettings = Settings
{ settingsSnapshotFrequency = Nothing
}
data Repository e b = Repository
{ repositoryEventStoreBackendPool :: Pool b
, repositoryEventBus :: EventBus e
, repositorySettings :: Settings
}
newRepository :: (EventStoreBackend b, Show e, NFData e) => Settings -> Pool b -> IO (Repository e b)
newRepository settings eventStoreBackendPool = do
eventBus <- newEventBus
return $ Repository { repositoryEventStoreBackendPool = eventStoreBackendPool
, repositoryEventBus = eventBus
, repositorySettings = settings
}
withEventStoreBackend :: (EventStoreBackend b) => Repository e b -> (b -> IO a) -> IO a
withEventStoreBackend (Repository p _ _) action =
withResource p (\eventStoreBackend -> action eventStoreBackend)
enumerateEventStore :: (Serializable e, EventStoreBackend b) => Repository e b -> Source (ResourceT IO) [PersistedEvent e]
enumerateEventStore (Repository p _ _) = do
eventStoreBackend <- lift $ takeResource p
addCleanup (mrReuse eventStoreBackend) $ ES.enumerateEventStore (EventStore $ mrValue eventStoreBackend)
data StreamingState e = Buffering [PersistedEvent e]
| EnumerationComplete [PersistedEvent e]
| Streaming
deriving (Show)
enumerateAndStreamEvents :: (Show e, Serializable e, EventStoreBackend b) => Repository e b -> Source (ResourceT IO) [PersistedEvent e]
enumerateAndStreamEvents repository@(Repository _ eventBus _) = do
bracketP (subscribeToEventBus eventBus)
(unsubscribeFromEventBus eventBus)
(\subscription -> do
enumerateEventStore repository
subscriptionSource subscription)
where
subscriptionSource subscription = forever $ do
e <- lift $ lift $ readEventFromSubscription subscription
yield e