module EventSource.Store
( Batch(..)
, Subscription(..)
, SubscriptionId
, ExpectedVersionException(..)
, Store(..)
, SomeStore(..)
, StreamIterator
, iteratorNext
, iteratorNextEvent
, iteratorReadAll
, iteratorReadAllEvents
, streamIterator
, freshSubscriptionId
, startFrom
, nextEventAs
, foldSub
, foldSubAsync
, appendEvent
, forEvents
, foldEventsM
, foldEvents
, forSavedEvents
, foldSavedEventsM
, foldSavedEvents
, foldSubSaved
, foldSubSavedAsync
, ForEventFailure(..)
) where
import Prelude (Show(..))
import Control.Monad.Except
import Data.IORef
import Data.UUID
import Data.UUID.V4
import Protolude hiding (from, show, trans)
import EventSource.Types
data Batch =
Batch { batchFrom :: EventNumber
, batchSize :: Int32
}
startFrom :: EventNumber -> Batch
startFrom from = Batch from 500
newtype SubscriptionId = SubscriptionId UUID deriving (Eq, Ord, Show)
freshSubscriptionId :: MonadIO m => m SubscriptionId
freshSubscriptionId = liftIO $ fmap SubscriptionId nextRandom
data Subscription =
Subscription { subscriptionId :: SubscriptionId
, nextEvent :: forall m. MonadIO m
=> m (Either SomeException SavedEvent)
}
nextEventAs :: (DecodeEvent a, MonadIO m)
=> Subscription
-> m (Either SomeException a)
nextEventAs sub = do
res <- nextEvent sub
let action = do
event <- res
first (toException . DecodeEventException)
$ decodeEvent
$ savedEvent event
return action
foldSub :: (DecodeEvent a, MonadIO m)
=> Subscription
-> (a -> m ())
-> (SomeException -> m ())
-> m ()
foldSub sub onEvent onError = loop
where
loop = do
res <- nextEventAs sub
case res of
Left e -> onError e
Right a -> onEvent a >> loop
foldSubAsync :: DecodeEvent a
=> Subscription
-> (a -> IO ())
-> (SomeException -> IO ())
-> IO (Async ())
foldSubAsync sub onEvent onError =
async $ foldSub sub onEvent onError
foldSubSaved :: (MonadIO m)
=> Subscription
-> (SavedEvent -> m ())
-> (SomeException -> m ())
-> m ()
foldSubSaved sub onEvent onError = loop
where
loop = do
res <- nextEvent sub
case res of
Left e -> onError e
Right a -> onEvent a >> loop
foldSubSavedAsync :: Subscription
-> (SavedEvent -> IO ())
-> (SomeException -> IO ())
-> IO (Async ())
foldSubSavedAsync sub onEvent onError =
async $ foldSubSaved sub onEvent onError
data ExpectedVersionException
= ExpectedVersionException
{ versionExceptionExpected :: ExpectedVersion
, versionExceptionActual :: ExpectedVersion
} deriving Show
instance Exception ExpectedVersionException
class Store store where
appendEvents :: (EncodeEvent a, MonadIO m)
=> store
-> StreamName
-> ExpectedVersion
-> [a]
-> m (Async EventNumber)
readBatch :: MonadIO m
=> store
-> StreamName
-> Batch
-> m (Async (ReadStatus Slice))
subscribe :: MonadIO m => store -> StreamName -> m Subscription
data SomeStore = forall store. Store store => SomeStore store
instance Store SomeStore where
appendEvents (SomeStore store) = appendEvents store
readBatch (SomeStore store) = readBatch store
subscribe (SomeStore store) = subscribe store
appendEvent :: (EncodeEvent a, MonadIO m, Store store)
=> store
-> StreamName
-> ExpectedVersion
-> a
-> m (Async EventNumber)
appendEvent store stream ver a = appendEvents store stream ver [a]
data ForEventFailure
= ForEventReadFailure ReadFailure
| ForEventDecodeFailure Text
deriving Show
instance Exception ForEventFailure
forEvents :: (MonadIO m, DecodeEvent a, Store store)
=> store
-> StreamName
-> (a -> m ())
-> ExceptT ForEventFailure m ()
forEvents store name k = do
res <- streamIterator store name
case res of
ReadSuccess i -> loop i
ReadFailure e -> throwError $ ForEventReadFailure e
where
loop i = do
opt <- iteratorNext i
for_ opt $ \saved ->
case decodeEvent $ savedEvent saved of
Left e -> throwError $ ForEventDecodeFailure e
Right a -> lift (k a) >> loop i
foldEventsM :: (MonadIO m, DecodeEvent a, Store store)
=> store
-> StreamName
-> (s -> a -> m s)
-> s
-> ExceptT ForEventFailure m s
foldEventsM store stream k seed = mapExceptT trans action
where
trans m = evalStateT m seed
action = do
forEvents store stream $ \a -> do
s <- get
s' <- lift $ k s a
put s'
get
foldEvents :: (MonadIO m, DecodeEvent a, Store store)
=> store
-> StreamName
-> (s -> a -> s)
-> s
-> ExceptT ForEventFailure m s
foldEvents store stream k seed =
foldEventsM store stream (\s a -> return $ k s a) seed
forSavedEvents :: (MonadIO m, Store store)
=> store
-> StreamName
-> (SavedEvent -> m ())
-> ExceptT ForEventFailure m ()
forSavedEvents store name k = do
res <- streamIterator store name
case res of
ReadSuccess i -> loop i
ReadFailure e -> throwError $ ForEventReadFailure e
where
loop i = do
opt <- iteratorNext i
for_ opt $ \saved -> lift (k saved) >> loop i
foldSavedEventsM :: (MonadIO m, Store store)
=> store
-> StreamName
-> (s -> SavedEvent -> m s)
-> s
-> ExceptT ForEventFailure m s
foldSavedEventsM store stream k seed = mapExceptT trans action
where
trans m = evalStateT m seed
action = do
forSavedEvents store stream $ \a -> do
s <- get
s' <- lift $ k s a
put s'
get
foldSavedEvents :: (MonadIO m, Store store)
=> store
-> StreamName
-> (s -> SavedEvent -> s)
-> s
-> ExceptT ForEventFailure m s
foldSavedEvents store stream k seed =
foldSavedEventsM store stream (\s a -> return $ k s a) seed
newtype StreamIterator =
StreamIterator { iteratorNext :: forall m. MonadIO m => m (Maybe SavedEvent) }
instance Show StreamIterator where
show _ = "StreamIterator"
iteratorNextEvent :: (DecodeEvent a, MonadIO m, MonadPlus m)
=> StreamIterator
-> m (Maybe a)
iteratorNextEvent i = do
res <- iteratorNext i
case res of
Nothing -> return Nothing
Just s ->
case decodeEvent $ savedEvent s of
Left _ -> mzero
Right a -> return $ Just a
iteratorReadAll :: MonadIO m => StreamIterator -> m [SavedEvent]
iteratorReadAll i = do
res <- iteratorNext i
case res of
Nothing -> return []
Just s -> fmap (s:) $ iteratorReadAll i
iteratorReadAllEvents :: (DecodeEvent a, MonadIO m, MonadPlus m)
=> StreamIterator
-> m [a]
iteratorReadAllEvents i = do
res <- iteratorNextEvent i
case res of
Nothing -> return []
Just a -> fmap (a:) $ iteratorReadAllEvents i
streamIterator :: (Store store, MonadIO m)
=> store
-> StreamName
-> m (ReadStatus StreamIterator)
streamIterator store name = do
w <- readBatch store name (startFrom 0)
res <- liftIO $ wait w
for res $ \slice -> do
ref <- liftIO $ newIORef $ IteratorOverAvailable slice
return $ StreamIterator $ iterateOver store ref name
data IteratorOverState
= IteratorOverAvailable Slice
| IteratorOverClosed
data IteratorOverAction
= IteratorOverEvent SavedEvent
| IteratorOverNextBatch EventNumber
| IteratorOverEndOfStream
iterateOver :: (Store store, MonadIO m)
=> store
-> IORef IteratorOverState
-> StreamName
-> m (Maybe SavedEvent)
iterateOver store ref name = go
where
go = do
action <- liftIO $ atomicModifyIORef' ref $ \st ->
case st of
IteratorOverAvailable slice ->
case sliceEvents slice of
e:es ->
let nextSlice = slice { sliceEvents = es }
nxtSt = IteratorOverAvailable nextSlice in
(nxtSt, IteratorOverEvent e)
[] | sliceEndOfStream slice
-> (IteratorOverClosed, IteratorOverEndOfStream)
| otherwise
-> let resp = IteratorOverNextBatch $
sliceNextEventNumber slice in
(st, resp)
IteratorOverClosed -> (st, IteratorOverEndOfStream)
case action of
IteratorOverEvent e -> return $ Just e
IteratorOverEndOfStream -> return Nothing
IteratorOverNextBatch num -> do
w <- readBatch store name (startFrom num)
res <- liftIO $ wait w
case res of
ReadFailure _ -> do
liftIO $ atomicModifyIORef' ref $ \_ -> (IteratorOverClosed, ())
return Nothing
ReadSuccess slice -> do
let nxtSt = IteratorOverAvailable slice
liftIO $ atomicModifyIORef' ref $ \_ -> (nxtSt, ())
go