module Eventful.ProjectionCache.Types
( ProjectionCache (..)
, StreamProjectionCache
, GloballyOrderedProjectionCache
, runProjectionCacheUsing
, serializedProjectionCache
, getLatestProjectionWithCache
, getLatestGlobalProjectionWithCache
, updateProjectionCache
, updateGlobalProjectionCache
) where
import Eventful.Projection
import Eventful.Serializer
import Eventful.Store.Class
import Eventful.UUID
-- | A record of two operations for persisting and retrieving projection
-- snapshots in some monad @m@.
--
-- * @key@ identifies which snapshot is being stored or loaded.
-- * @orderKey@ is stored alongside the snapshot; the helpers in this module
--   compare it against a projection's current position to decide whether the
--   snapshot is newer.
-- * @serialized@ is the representation of the snapshot state held by the
--   cache.
data ProjectionCache key orderKey serialized m = ProjectionCache
  { storeProjectionSnapshot :: key -> orderKey -> serialized -> m ()
    -- ^ Store a snapshot for the given key together with its order key.
    -- NOTE(review): presumably replaces any snapshot previously stored under
    -- the same key; that depends on the concrete cache, so confirm there.
  , loadProjectionSnapshot :: key -> m (Maybe (orderKey, serialized))
    -- ^ Look up the snapshot for the given key, yielding the order key and
    -- the stored state when one is available.
  }
-- | A 'ProjectionCache' whose key is a 'UUID' and whose order key is an
-- 'EventVersion'. This is the cache type taken by
-- 'getLatestProjectionWithCache' and 'updateProjectionCache'.
type StreamProjectionCache serialized m = ProjectionCache UUID EventVersion serialized m
-- | A 'ProjectionCache' with a caller-chosen @key@ type whose order key is a
-- 'SequenceNumber'. This is the cache type taken by
-- 'getLatestGlobalProjectionWithCache' and 'updateGlobalProjectionCache'.
type GloballyOrderedProjectionCache key serialized m = ProjectionCache key SequenceNumber serialized m
-- | Re-target a 'ProjectionCache' from the monad @mstore@ to the monad @m@.
--
-- Every operation of the resulting cache runs the corresponding operation of
-- the given cache and passes the resulting @mstore@ action through the
-- supplied transformation.
runProjectionCacheUsing
  :: (Monad m, Monad mstore)
  => (forall a. mstore a -> m a)
  -> ProjectionCache key orderKey serialized mstore
  -> ProjectionCache key orderKey serialized m
runProjectionCacheUsing runCache (ProjectionCache storeInner loadInner) =
  ProjectionCache
    { storeProjectionSnapshot = \key orderKey snapshot ->
        runCache (storeInner key orderKey snapshot)
    , loadProjectionSnapshot = \key ->
        runCache (loadInner key)
    }
-- | Wrap a 'ProjectionCache' that holds @serialized@ values so that callers
-- can work with @state@ values instead.
--
-- Storing applies the serializer's 'serialize' before delegating to the
-- underlying cache. Loading delegates to the underlying cache and then runs
-- 'deserialize' on the stored value; the result is 'Nothing' both when the
-- underlying cache has no snapshot and when deserialization fails.
serializedProjectionCache
  :: (Monad m)
  => Serializer state serialized
  -> ProjectionCache key orderKey serialized m
  -> ProjectionCache key orderKey state m
serializedProjectionCache
  Serializer{serialize = encode, deserialize = decode}
  (ProjectionCache storeRaw loadRaw) =
    ProjectionCache
      { storeProjectionSnapshot = \key orderKey state ->
          storeRaw key orderKey (encode state)
      , loadProjectionSnapshot = \key ->
          -- 'traverse' over the pair leaves the order key untouched and
          -- decodes only the stored payload.
          (>>= traverse decode) <$> loadRaw key
      }
-- | Bring a 'StreamProjection' up to date, using a cached snapshot as the
-- starting point when that snapshot is ahead of the given projection.
--
-- The cache is queried with the projection's UUID. A snapshot is adopted only
-- if its version is strictly greater than the projection's current version;
-- otherwise the projection is used unchanged. The chosen starting projection
-- is then handed to 'getLatestProjection' together with the event store.
getLatestProjectionWithCache
  :: (Monad m)
  => EventStore event m
  -> StreamProjectionCache state m
  -> StreamProjection state event
  -> m (StreamProjection state event)
getLatestProjectionWithCache store cache originalProj = do
  cached <- loadProjectionSnapshot cache (streamProjectionUuid originalProj)
  getLatestProjection store (resumeFrom cached)
  where
    -- Pick the projection to resume from: the snapshot when it is newer,
    -- the caller's projection in every other case.
    resumeFrom (Just (version, state))
      | version > streamProjectionVersion originalProj =
          originalProj
            { streamProjectionVersion = version
            , streamProjectionState = state
            }
    resumeFrom _ = originalProj
-- | Bring a 'GloballyOrderedProjection' up to date, using a cached snapshot
-- as the starting point when that snapshot is ahead of the given projection.
--
-- The cache is queried with the supplied @key@. A snapshot is adopted only if
-- its sequence number is strictly greater than the projection's current
-- sequence number; otherwise the projection is used unchanged. The chosen
-- starting projection is then handed to 'getLatestGlobalProjection' together
-- with the event store.
getLatestGlobalProjectionWithCache
  :: (Monad m)
  => GloballyOrderedEventStore event m
  -> GloballyOrderedProjectionCache key state m
  -> GloballyOrderedProjection state event
  -> key
  -> m (GloballyOrderedProjection state event)
getLatestGlobalProjectionWithCache store cache originalProj key = do
  cached <- loadProjectionSnapshot cache key
  getLatestGlobalProjection store (resumeFrom cached)
  where
    -- Pick the projection to resume from: the snapshot when it is newer,
    -- the caller's projection in every other case.
    resumeFrom (Just (seqNum, state))
      | seqNum > globallyOrderedProjectionSequenceNumber originalProj =
          originalProj
            { globallyOrderedProjectionSequenceNumber = seqNum
            , globallyOrderedProjectionState = state
            }
    resumeFrom _ = originalProj
-- | Refresh the cached snapshot for a stream projection.
--
-- Obtains the latest projection via 'getLatestProjectionWithCache' and then
-- stores its UUID, version and state in the cache. The store call is made
-- unconditionally, whether or not the projection advanced.
updateProjectionCache
  :: (Monad m)
  => EventStore event m
  -> StreamProjectionCache state m
  -> StreamProjection state event
  -> m ()
updateProjectionCache store cache projection =
  getLatestProjectionWithCache store cache projection >>= saveSnapshot
  where
    -- Write the relevant fields of the up-to-date projection to the cache.
    saveSnapshot
      StreamProjection
        { streamProjectionUuid = uuid
        , streamProjectionVersion = version
        , streamProjectionState = state
        } =
        storeProjectionSnapshot cache uuid version state
-- | Refresh the cached snapshot for a globally ordered projection.
--
-- Obtains the latest projection via 'getLatestGlobalProjectionWithCache' and
-- then stores its sequence number and state in the cache under the supplied
-- @key@. The store call is made unconditionally, whether or not the
-- projection advanced.
updateGlobalProjectionCache
  :: (Monad m)
  => GloballyOrderedEventStore event m
  -> GloballyOrderedProjectionCache key state m
  -> GloballyOrderedProjection state event
  -> key
  -> m ()
updateGlobalProjectionCache store cache projection key =
  getLatestGlobalProjectionWithCache store cache projection key >>= saveSnapshot
  where
    -- Write the relevant fields of the up-to-date projection to the cache.
    saveSnapshot
      GloballyOrderedProjection
        { globallyOrderedProjectionSequenceNumber = seqNum
        , globallyOrderedProjectionState = state
        } =
        storeProjectionSnapshot cache key seqNum state