{-# LANGUAGE ExistentialQuantification #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE Rank2Types #-} -------------------------------------------------------------------------------- -- | -- Module : EventSource.Store -- Copyright : (C) 2016 Yorick Laupa -- License : (see the file LICENSE) -- -- Maintainer : Yorick Laupa -- Stability : provisional -- Portability : non-portable -- -------------------------------------------------------------------------------- module EventSource.Store ( Batch(..) , Subscription(..) , SubscriptionId , ExpectedVersionException(..) , Store(..) , SomeStore(..) , StreamIterator , iteratorNext , iteratorNextEvent , iteratorReadAll , iteratorReadAllEvents , streamIterator , freshSubscriptionId , startFrom , nextEventAs , foldSub , foldSubAsync , appendEvent , forEvents , forEventsWithNumber , foldEventsWithNumberM , foldEventsM , foldEvents , forSavedEvents , foldSavedEventsM , foldSavedEvents , foldSubSaved , foldSubSavedAsync , ForEventFailure(..) , unhandled ) where -------------------------------------------------------------------------------- import Control.Monad (MonadPlus, mzero) import Control.Exception (Exception, SomeException, toException, throwIO) import Data.Bifunctor (first) import Data.Foldable (for_) import Data.Int (Int32) import Data.Traversable (for) -------------------------------------------------------------------------------- import Control.Concurrent.Async.Lifted (Async, async, wait) import Control.Monad.Base (MonadBase, liftBase) import Control.Monad.Except (ExceptT, runExceptT, mapExceptT, throwError) import Control.Monad.State (get, put, evalStateT) import Control.Monad.Trans (lift) import Control.Monad.Trans.Control (MonadBaseControl, StM) import Data.IORef.Lifted (IORef, newIORef, atomicModifyIORef') import Data.Text (Text) import Data.UUID (UUID) import Data.UUID.V4 (nextRandom) -------------------------------------------------------------------------------- import EventSource.Types -------------------------------------------------------------------------------- -- | Represents batch information needed to read a stream. data Batch = Batch { batchFrom :: EventNumber , batchSize :: Int32 } -------------------------------------------------------------------------------- -- | Starts a 'Batch' from a given point. The batch size is set to default, -- which is 500. startFrom :: EventNumber -> Batch startFrom from = Batch from 500 -------------------------------------------------------------------------------- -- | Represents a subscription id. newtype SubscriptionId = SubscriptionId UUID deriving (Eq, Ord, Show) -------------------------------------------------------------------------------- -- | Returns a fresh subscription id. freshSubscriptionId :: MonadBase IO m => m SubscriptionId freshSubscriptionId = liftBase $ fmap SubscriptionId nextRandom -------------------------------------------------------------------------------- -- | A subscription allows to be notified on every change occuring on a stream. data Subscription = Subscription { subscriptionId :: SubscriptionId , nextEvent :: forall m. MonadBase IO m => m (Either SomeException SavedEvent) } -------------------------------------------------------------------------------- -- | Waits for the next event and deserializes it on the go. nextEventAs :: (DecodeEvent a, MonadBase IO m) => Subscription -> m (Either SomeException a) nextEventAs sub = do res <- nextEvent sub let action = do event <- res first (toException . DecodeEventException) $ decodeEvent $ savedEvent event return action -------------------------------------------------------------------------------- -- | Folds over every event coming from the subscription until the end of the -- universe, unless an 'Exception' raises from the subscription. -- 'SomeException' is used because we let the underlying subscription model -- exposed its own 'Exception'. If the callback that handles incoming events -- throws an exception, it will not be catch by the error callback. foldSub :: (DecodeEvent a, MonadBase IO m) => Subscription -> (a -> m ()) -> (SomeException -> m ()) -> m () foldSub sub onEvent onError = loop where loop = do res <- nextEventAs sub case res of Left e -> onError e Right a -> onEvent a >> loop -------------------------------------------------------------------------------- -- | Asynchronous version of 'foldSub'. foldSubAsync :: (MonadBaseControl IO m, DecodeEvent a) => Subscription -> (a -> m ()) -> (SomeException -> m ()) -> m (Async (StM m ())) foldSubAsync sub onEvent onError = async $ foldSub sub onEvent onError -------------------------------------------------------------------------------- -- | Similar to 'foldSub' but provides access to the 'SavedEvent' instead of -- decoded event. foldSubSaved :: MonadBase IO m => Subscription -> (SavedEvent -> m ()) -> (SomeException -> m ()) -> m () foldSubSaved sub onEvent onError = loop where loop = do res <- nextEvent sub case res of Left e -> onError e Right a -> onEvent a >> loop -------------------------------------------------------------------------------- -- | Asynchronous version of 'foldSubSaved'. foldSubSavedAsync :: MonadBaseControl IO m => Subscription -> (SavedEvent -> m ()) -> (SomeException -> m ()) -> m (Async (StM m ())) foldSubSavedAsync sub onEvent onError = async $ foldSubSaved sub onEvent onError -------------------------------------------------------------------------------- data ExpectedVersionException = ExpectedVersionException { versionExceptionExpected :: ExpectedVersion , versionExceptionActual :: ExpectedVersion } deriving Show -------------------------------------------------------------------------------- instance Exception ExpectedVersionException -------------------------------------------------------------------------------- -- | Main event store abstraction. It exposes essential features expected from -- an event store. class Store store where -- | Appends a batch of events at the end of a stream. appendEvents :: (EncodeEvent a, MonadBase IO m) => store -> StreamName -> ExpectedVersion -> [a] -> m (Async EventNumber) -- | Appends a batch of events at the end of a stream. readBatch :: MonadBase IO m => store -> StreamName -> Batch -> m (Async (ReadStatus Slice)) -- | Subscribes to given stream. subscribe :: MonadBase IO m => store -> StreamName -> m Subscription -- | Encapsulates to an abstract store. toStore :: store -> SomeStore toStore = SomeStore -------------------------------------------------------------------------------- -- | Utility type to pass any store that implements 'Store' typeclass. data SomeStore = forall store. Store store => SomeStore store -------------------------------------------------------------------------------- instance Store SomeStore where appendEvents (SomeStore store) = appendEvents store readBatch (SomeStore store) = readBatch store subscribe (SomeStore store) = subscribe store -------------------------------------------------------------------------------- -- | Appends a single event at the end of a stream. appendEvent :: (EncodeEvent a, MonadBase IO m, Store store) => store -> StreamName -> ExpectedVersion -> a -> m (Async EventNumber) appendEvent store stream ver a = appendEvents store stream ver [a] -------------------------------------------------------------------------------- -- | Represents failures that can occurs when using 'forEvents'. data ForEventFailure = ForEventReadFailure ReadFailure | ForEventDecodeFailure Text deriving Show -------------------------------------------------------------------------------- instance Exception ForEventFailure -------------------------------------------------------------------------------- -- | Iterates over all events of stream given a starting point and a batch size. forEvents :: (MonadBase IO m, DecodeEvent a, Store store) => store -> StreamName -> (a -> m ()) -> ExceptT ForEventFailure m () forEvents store name k = forEventsWithNumber store name (const k) -------------------------------------------------------------------------------- -- | Iterates over all events of stream given a starting point and a batch size. -- It also passes the 'EventNumber' at each recursion step. forEventsWithNumber :: (MonadBase IO m, DecodeEvent a, Store store) => store -> StreamName -> (EventNumber -> a -> m ()) -> ExceptT ForEventFailure m () forEventsWithNumber store name k = do res <- streamIterator store name case res of ReadSuccess i -> loop i ReadFailure e -> throwError $ ForEventReadFailure e where loop i = do opt <- iteratorNext i for_ opt $ \saved -> case decodeEvent $ savedEvent saved of Left e -> throwError $ ForEventDecodeFailure e Right a -> lift (k (eventNumber saved) a) >> loop i -------------------------------------------------------------------------------- -- | Like 'forEvents' but expose signature similar to 'foldM'. foldEventsM :: (MonadBase IO m, DecodeEvent a, Store store) => store -> StreamName -> (s -> a -> m s) -> s -> ExceptT ForEventFailure m s foldEventsM store stream k seed = foldEventsWithNumberM store stream (const k) seed -------------------------------------------------------------------------------- -- | Like 'forEvents' but expose signature similar to 'foldM' and also passes -- the 'EventNumber' at each recursion step. foldEventsWithNumberM :: (MonadBase IO m, DecodeEvent a, Store store) => store -> StreamName -> (EventNumber -> s -> a -> m s) -> s -> ExceptT ForEventFailure m s foldEventsWithNumberM store stream k seed = mapExceptT trans action where trans m = evalStateT m seed action = do forEventsWithNumber store stream $ \num a -> do s <- get s' <- lift $ k num s a put s' get -------------------------------------------------------------------------------- -- | Like 'foldEventsM' but expose signature similar to 'foldl'. foldEvents :: (MonadBase IO m, DecodeEvent a, Store store) => store -> StreamName -> (s -> a -> s) -> s -> ExceptT ForEventFailure m s foldEvents store stream k seed = foldEventsM store stream (\s a -> return $ k s a) seed -------------------------------------------------------------------------------- -- | Like `forEvents` but provides access to 'SavedEvent' instead of -- decoded event. forSavedEvents :: (MonadBase IO m, Store store) => store -> StreamName -> (SavedEvent -> m ()) -> ExceptT ForEventFailure m () forSavedEvents store name k = do res <- streamIterator store name case res of ReadSuccess i -> loop i ReadFailure e -> throwError $ ForEventReadFailure e where loop i = do opt <- iteratorNext i for_ opt $ \saved -> lift (k saved) >> loop i -------------------------------------------------------------------------------- -- | Like 'forSavedEvents' but expose signature similar to 'foldM'. foldSavedEventsM :: (MonadBase IO m, Store store) => store -> StreamName -> (s -> SavedEvent -> m s) -> s -> ExceptT ForEventFailure m s foldSavedEventsM store stream k seed = mapExceptT trans action where trans m = evalStateT m seed action = do forSavedEvents store stream $ \a -> do s <- get s' <- lift $ k s a put s' get -------------------------------------------------------------------------------- -- | Like 'foldSavedEventsM' but expose signature similar to 'foldl'. foldSavedEvents :: (MonadBase IO m, Store store) => store -> StreamName -> (s -> SavedEvent -> s) -> s -> ExceptT ForEventFailure m s foldSavedEvents store stream k seed = foldSavedEventsM store stream (\s a -> return $ k s a) seed -------------------------------------------------------------------------------- -- | Throws an exception in case 'ExceptT' was a 'Left'. unhandled :: (MonadBase IO m, Exception e) => ExceptT e m a -> m a unhandled m = runExceptT m >>= \case Left e -> liftBase $ throwIO e Right a -> pure a -------------------------------------------------------------------------------- -- | Allows to easily iterate over a stream's events. newtype StreamIterator = StreamIterator { iteratorNext :: forall m. MonadBase IO m => m (Maybe SavedEvent) } -------------------------------------------------------------------------------- instance Show StreamIterator where show _ = "StreamIterator" -------------------------------------------------------------------------------- -- | Reads the next available event from the 'StreamIterator' and try to -- deserialize it at the same time. iteratorNextEvent :: (DecodeEvent a, MonadBase IO m, MonadPlus m) => StreamIterator -> m (Maybe a) iteratorNextEvent i = do res <- iteratorNext i case res of Nothing -> return Nothing Just s -> case decodeEvent $ savedEvent s of Left _ -> mzero Right a -> return $ Just a -------------------------------------------------------------------------------- -- | Reads all events from the 'StreamIterator' until reaching end of stream. iteratorReadAll :: MonadBase IO m => StreamIterator -> m [SavedEvent] iteratorReadAll i = do res <- iteratorNext i case res of Nothing -> return [] Just s -> fmap (s:) $ iteratorReadAll i -------------------------------------------------------------------------------- -- | Like 'iteratorReadAll' but try to deserialize the events at the same time. iteratorReadAllEvents :: (DecodeEvent a, MonadBase IO m, MonadPlus m) => StreamIterator -> m [a] iteratorReadAllEvents i = do res <- iteratorNextEvent i case res of Nothing -> return [] Just a -> fmap (a:) $ iteratorReadAllEvents i -------------------------------------------------------------------------------- -- | Returns a 'StreamIterator' for the given stream name. The returned -- 'StreamIterator' IS NOT THREADSAFE. streamIterator :: (Store store, MonadBase IO m) => store -> StreamName -> m (ReadStatus StreamIterator) streamIterator store name = do w <- readBatch store name (startFrom 0) res <- liftBase $ wait w for res $ \slice -> do ref <- newIORef $ IteratorOverAvailable slice return $ StreamIterator $ iterateOver store ref name -------------------------------------------------------------------------------- data IteratorOverState = IteratorOverAvailable Slice | IteratorOverClosed -------------------------------------------------------------------------------- data IteratorOverAction = IteratorOverEvent SavedEvent | IteratorOverNextBatch EventNumber | IteratorOverEndOfStream -------------------------------------------------------------------------------- iterateOver :: (Store store, MonadBase IO m) => store -> IORef IteratorOverState -> StreamName -> m (Maybe SavedEvent) iterateOver store ref name = go where go = do action <- atomicModifyIORef' ref $ \st -> case st of IteratorOverAvailable slice -> case sliceEvents slice of e:es -> let nextSlice = slice { sliceEvents = es } nxtSt = IteratorOverAvailable nextSlice in (nxtSt, IteratorOverEvent e) [] | sliceEndOfStream slice -> (IteratorOverClosed, IteratorOverEndOfStream) | otherwise -> let resp = IteratorOverNextBatch $ sliceNextEventNumber slice in (st, resp) IteratorOverClosed -> (st, IteratorOverEndOfStream) case action of IteratorOverEvent e -> return $ Just e IteratorOverEndOfStream -> return Nothing IteratorOverNextBatch num -> do w <- readBatch store name (startFrom num) res <- liftBase $ wait w case res of ReadFailure _ -> do atomicModifyIORef' ref $ \_ -> (IteratorOverClosed, ()) return Nothing ReadSuccess slice -> do let nxtSt = IteratorOverAvailable slice atomicModifyIORef' ref $ \_ -> (nxtSt, ()) go