-- | Snapshot caching for stream projections.
--
-- This module defines the 'ProjectionCache' record of store/load operations,
-- two specialisations of it, adapters that change the cache's monad or its
-- stored representation, and helpers that read a projection through a cache
-- or write the latest projection back into one.
module Eventful.ProjectionCache.Types
( ProjectionCache (..)
, VersionedProjectionCache
, GlobalStreamProjectionCache
, runProjectionCacheUsing
, serializedProjectionCache
, getLatestVersionedProjectionWithCache
, getLatestGlobalProjectionWithCache
, updateProjectionCache
, updateGlobalProjectionCache
) where
import Eventful.Projection
import Eventful.Serializer
import Eventful.Store.Class
import Eventful.UUID
-- | A pair of operations for persisting and retrieving projection snapshots.
--
-- A snapshot is a @serialized@ value filed under a @key@ together with the
-- @position@ it corresponds to. Both operations run in the monad @m@.
data ProjectionCache key position serialized m = ProjectionCache
  { storeProjectionSnapshot :: key -> position -> serialized -> m ()
    -- ^ Action used to save a snapshot for a key at a given position.
  , loadProjectionSnapshot :: key -> m (Maybe (position, serialized))
    -- ^ Action used to look up the snapshot for a key. A result of 'Nothing'
    -- is treated by the helpers in this module as "no snapshot available".
  }
-- | A 'ProjectionCache' whose keys are 'UUID's and whose positions are
-- 'EventVersion's.
type VersionedProjectionCache serialized m = ProjectionCache UUID EventVersion serialized m
-- | A 'ProjectionCache' whose positions are 'SequenceNumber's; the key type
-- is left for the caller to choose.
type GlobalStreamProjectionCache key serialized m = ProjectionCache key SequenceNumber serialized m
-- | Re-home a 'ProjectionCache' from the monad @mstore@ into the monad @m@.
--
-- Every action produced by either field of the given cache is passed through
-- the supplied transformation before being returned, so code running in @m@
-- can use a cache that was written against @mstore@.
runProjectionCacheUsing
  :: (Monad m, Monad mstore)
  => (forall a. mstore a -> m a)
  -> ProjectionCache key position serialized mstore
  -> ProjectionCache key position serialized m
runProjectionCacheUsing runCache (ProjectionCache storeInner loadInner) =
  ProjectionCache
    { storeProjectionSnapshot = \key position snapshot ->
        runCache (storeInner key position snapshot)
    , loadProjectionSnapshot = \key -> runCache (loadInner key)
    }
-- | Wrap a cache of @serialized@ snapshots so that it stores and loads
-- @state@ values instead, converting with the given 'Serializer'.
--
-- Storing serializes the state before delegating to the underlying cache.
-- Loading deserializes whatever the underlying cache returns; if there is no
-- snapshot, or the stored value fails to deserialize, the result is
-- 'Nothing'.
serializedProjectionCache
  :: (Monad m)
  => Serializer state serialized
  -> ProjectionCache key position serialized m
  -> ProjectionCache key position state m
serializedProjectionCache Serializer{..} (ProjectionCache storeRaw loadRaw) =
  ProjectionCache
    { storeProjectionSnapshot = \key position state ->
        storeRaw key position (serialize state)
    , loadProjectionSnapshot = \key ->
        -- 'traverse' over the pair deserializes only the second component,
        -- keeping the stored position alongside the decoded state.
        fmap (>>= traverse deserialize) (loadRaw key)
    }
-- | Bring a versioned stream projection up to date, consulting the cache
-- first.
--
-- The cache is queried with the projection's own key. If it holds a snapshot
-- at a later position than the projection, that snapshot replaces the
-- projection's position and state. The result is then passed to
-- 'getLatestStreamProjection' together with the event store reader.
getLatestVersionedProjectionWithCache
  :: (Monad m)
  => VersionedEventStoreReader m event
  -> VersionedProjectionCache state m
  -> VersionedStreamProjection state event
  -> m (VersionedStreamProjection state event)
getLatestVersionedProjectionWithCache store cache projection = do
  fromCache <-
    getLatestProjectionWithCache' cache projection (streamProjectionKey projection)
  getLatestStreamProjection store fromCache
-- | Bring a global stream projection up to date, consulting the cache first.
--
-- The cache is queried with the explicitly supplied @key@. If it holds a
-- snapshot at a later position than the projection, that snapshot replaces
-- the projection's position and state. The result is then passed to
-- 'getLatestStreamProjection' together with the event store reader.
getLatestGlobalProjectionWithCache
  :: (Monad m)
  => GlobalEventStoreReader m event
  -> GlobalStreamProjectionCache key state m
  -> GlobalStreamProjection state event
  -> key
  -> m (GlobalStreamProjection state event)
getLatestGlobalProjectionWithCache store cache projection key = do
  fromCache <- getLatestProjectionWithCache' cache projection key
  getLatestStreamProjection store fromCache
-- | Load the snapshot stored under @key@ and merge it into the projection.
--
-- The snapshot's position and state overwrite the projection's only when the
-- snapshot position is strictly greater than the projection's current
-- position. If there is no snapshot, or it is not newer, the projection is
-- returned unchanged.
getLatestProjectionWithCache'
  :: (Monad m, Ord position)
  => ProjectionCache key position state m
  -> StreamProjection projKey position state event
  -> key
  -> m (StreamProjection projKey position state event)
getLatestProjectionWithCache' cache projection key =
  fmap preferNewer (loadProjectionSnapshot cache key)
  where
    -- Adopt the cached snapshot only when it is ahead of the projection.
    preferNewer (Just (position, state))
      | position > streamProjectionPosition projection =
          projection
            { streamProjectionPosition = position
            , streamProjectionState = state
            }
    preferNewer _ = projection
-- | Refresh the cached snapshot for a versioned stream projection.
--
-- Obtains the latest projection via 'getLatestVersionedProjectionWithCache'
-- and stores its position and state in the cache under the projection's key.
-- The store call is made unconditionally, whether or not anything changed.
updateProjectionCache
  :: (Monad m)
  => VersionedEventStoreReader m event
  -> VersionedProjectionCache state m
  -> VersionedStreamProjection state event
  -> m ()
updateProjectionCache reader cache projection =
  getLatestVersionedProjectionWithCache reader cache projection >>= saveSnapshot
  where
    saveSnapshot StreamProjection{..} =
      storeProjectionSnapshot
        cache
        streamProjectionKey
        streamProjectionPosition
        streamProjectionState
-- | Refresh the cached snapshot for a global stream projection.
--
-- Obtains the latest projection via 'getLatestGlobalProjectionWithCache' and
-- stores its position and state in the cache under the supplied @key@. The
-- store call is made unconditionally, whether or not anything changed.
updateGlobalProjectionCache
  :: (Monad m)
  => GlobalEventStoreReader m event
  -> GlobalStreamProjectionCache key state m
  -> GlobalStreamProjection state event
  -> key
  -> m ()
updateGlobalProjectionCache reader cache projection key =
  getLatestGlobalProjectionWithCache reader cache projection key >>= saveSnapshot
  where
    saveSnapshot StreamProjection{..} =
      storeProjectionSnapshot cache key streamProjectionPosition streamProjectionState