module Data.CQRS.Types.ArchiveStore
( ArchiveRef
, ArchiveMetadata(..)
, ArchiveStore(..)
, StoreError(..)
, applyIso
, enumerateAllEvents
, rotateArchives
) where
import Control.Monad ((>=>), when, void)
import Data.CQRS.Types.ArchiveRef
import Data.CQRS.Types.ArchiveMetadata
import Data.CQRS.Types.PersistedEvent
import Data.CQRS.Types.StoreError
import Data.UUID.Types (UUID)
import System.IO.Streams (InputStream)
import qualified System.IO.Streams.Combinators as SC
-- | Record of operations that make up an archive store for events of
-- type @e@. Every operation runs in 'IO'; the concrete behaviour is
-- supplied by whichever backend builds the record.
data ArchiveStore e = ArchiveStore
  { -- | Count of events that have not been archived yet (as the field
    -- name indicates). 'rotateArchives' compares this against the
    -- requested archive size.
    asGetUnarchivedEventCount :: IO Int
  , -- | Archive events. 'rotateArchives' passes the archive size as the
    -- 'Int' argument. NOTE(review): the 'Maybe' 'UUID' result is
    -- presumably the ID of the archive that was created, if any --
    -- confirm against the backend implementations.
    asArchiveEvents :: Int -> IO (Maybe UUID)
  , -- | Read the metadata of the latest archive. 'enumerateAllEvents'
    -- treats 'Nothing' as "only the current archive exists".
    asReadLatestArchiveMetadata :: IO (Maybe ArchiveMetadata)
  , -- | Read the metadata of the archive with the given ID, or 'Nothing'
    -- when no metadata is available for that ID.
    asReadArchiveMetadata :: UUID -> IO (Maybe ArchiveMetadata)
  , -- | Run the given consumer on the stream of @(UUID, event)@ pairs of
    -- the referenced archive and return the consumer's result. The
    -- 'UUID' is bound as @aggregateId@ in 'applyIso'.
    asReadArchive :: forall a. ArchiveRef -> (InputStream (UUID, PersistedEvent e) -> IO a) -> IO a
  }
-- | Change the event type of an 'ArchiveStore'.
--
-- The argument is a pair of conversion functions. Only the second
-- component (@e -> e'@) is used: it is applied to the payload of every
-- event that is read from an archive. The first component is ignored
-- because none of the operations in the record accept events as input.
-- All operations other than 'asReadArchive' are carried over unchanged.
applyIso :: forall e' e . (e' -> e, e -> e') -> ArchiveStore e -> ArchiveStore e'
applyIso (_, fromStored) (ArchiveStore unarchivedCount archive latestMetadata metadataFor readStored) =
  ArchiveStore
    { asGetUnarchivedEventCount = unarchivedCount
    , asArchiveEvents = archive
    , asReadLatestArchiveMetadata = latestMetadata
    , asReadArchiveMetadata = metadataFor
    , asReadArchive = readConverted
    }
  where
    -- Read from the underlying store, converting every element of the
    -- stream before it is handed to the caller's consumer.
    readConverted :: ArchiveRef -> (InputStream (UUID, PersistedEvent e') -> IO a) -> IO a
    readConverted ref consume =
      readStored ref $ \stored -> SC.map convert stored >>= consume

    -- Keep the aggregate ID, convert the event payload.
    convert (aggregateId, ev) = (aggregateId, fmap fromStored ev)
-- | Feed every archive of the store to the consumer @p@, one archive at
-- a time.
--
-- The starting point is found by taking the latest archive metadata and
-- following 'amPreviousArchiveId' links until an archive without a
-- predecessor is reached; if there is no latest archive metadata, the
-- starting point is 'CurrentArchive'. From there each archive is read
-- with @p@ and the 'amNextArchiveId' link is followed, stopping after
-- 'CurrentArchive' has been read. @p@ is therefore invoked once per
-- archive, not once for the whole history.
--
-- Calls 'error' if an archive ID found in the chain has no metadata.
enumerateAllEvents :: ArchiveStore e -> (InputStream (UUID, PersistedEvent e) -> IO ()) -> IO ()
enumerateAllEvents store p = do
  oldest <- locateOldest
  replayFrom oldest
  where
    -- Metadata lookup that treats a missing entry as a fatal
    -- inconsistency of the archive chain.
    requireMetadata archiveId =
      asReadArchiveMetadata store archiveId >>= maybe (missing archiveId) return

    missing archiveId =
      error $ "Archive inconsistency; archive reference chain contains non-existent archive " ++ show archiveId

    -- Read the referenced archive, then continue with its successor
    -- unless the archive just read was the current one.
    replayFrom ref = do
      asReadArchive store ref p
      case ref of
        CurrentArchive -> return ()
        NamedArchive archiveId -> do
          metadata <- requireMetadata archiveId
          replayFrom (amNextArchiveId metadata)

    -- Determine the reference to start reading from.
    locateOldest =
      asReadLatestArchiveMetadata store >>= maybe (return CurrentArchive) walkBack

    -- Follow the "previous archive" links until there is none left.
    walkBack metadata =
      case amPreviousArchiveId metadata of
        Nothing -> return (NamedArchive (amArchiveId metadata))
        Just previousId -> requireMetadata previousId >>= walkBack
-- | Repeatedly archive events while the number of unarchived events is
-- at least @archiveSize@.
--
-- Does nothing when @archiveSize@ is zero or negative. Each iteration
-- re-reads the unarchived event count and, if it has reached
-- @archiveSize@, calls 'asArchiveEvents' with @archiveSize@.
--
-- NOTE(review): the result of 'asArchiveEvents' is discarded, so the
-- loop relies on the call lowering the unarchived event count; if it
-- can return without making progress this never terminates -- confirm
-- against the backend implementations.
rotateArchives :: ArchiveStore e -> Int -> IO ()
rotateArchives eventStore archiveSize = when (archiveSize > 0) drain
  where
    drain = do
      pending <- asGetUnarchivedEventCount eventStore
      if pending < archiveSize
        then return ()
        else do
          _ <- asArchiveEvents eventStore archiveSize
          drain