{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts          #-}
{-# LANGUAGE LambdaCase                #-}
{-# LANGUAGE Rank2Types                #-}
--------------------------------------------------------------------------------
-- |
-- Module : EventSource.Store
-- Copyright : (C) 2016 Yorick Laupa
-- License : (see the file LICENSE)
--
-- Maintainer : Yorick Laupa
-- Stability : provisional
-- Portability : non-portable
--
--------------------------------------------------------------------------------
module EventSource.Store
  ( Batch'(..)
  , Batch
  , Subscription(..)
  , SubscriptionId
  , ExpectedVersionException(..)
  , Store(..)
  , SomeStore(..)
  , freshSubscriptionId
  , startFrom
  , appendEvent
  , unhandled
  ) where

--------------------------------------------------------------------------------
import Control.Exception (Exception, throwIO)

--------------------------------------------------------------------------------
import Control.Concurrent.Async.Lifted (Async)
import Control.Monad.Base (MonadBase, liftBase)
import Control.Monad.Except (ExceptT, runExceptT)
import Data.UUID (UUID)
import Data.UUID.V4 (nextRandom)
import Streaming (hoist)
import Streaming.Prelude (Stream, Of)

--------------------------------------------------------------------------------
import EventSource.Types
import EventSource.Store.Internal.Iterator (Batch'(..), startFrom)

--------------------------------------------------------------------------------
-- | 'Batch'' specialised to 'EventNumber'.
type Batch = Batch' EventNumber

--------------------------------------------------------------------------------
-- | Represents a subscription id. It wraps a 'UUID'; the constructor is not
--   part of the export list of this module.
newtype SubscriptionId
  = SubscriptionId UUID
  deriving (Eq, Ord, Show)

--------------------------------------------------------------------------------
-- | Returns a fresh subscription id.
freshSubscriptionId :: MonadBase IO m => m SubscriptionId
freshSubscriptionId = liftBase (SubscriptionId <$> nextRandom)

--------------------------------------------------------------------------------
-- | A subscription lets a consumer be notified of every change occurring on a
--   stream.
data Subscription
  = Subscription
    { subscriptionId :: SubscriptionId
      -- ^ Identifier of this subscription.
    , subscriptionStream :: forall m. MonadBase IO m
                         => Stream (Of SavedEvent) m ()
      -- ^ The 'SavedEvent' values of the subscription, as a stream that can be
      --   run in any monad having 'IO' as its base.
    }

--------------------------------------------------------------------------------
-- | Exception pairing an expected stream version with an actual one.
--
--   NOTE(review): presumably raised when the 'ExpectedVersion' given to an
--   append does not match the stream; nothing in this module throws it, so
--   confirm against the store implementations.
data ExpectedVersionException
  = ExpectedVersionException
    { versionExceptionExpected :: ExpectedVersion
    , versionExceptionActual :: ExpectedVersion
    }
  deriving Show

--------------------------------------------------------------------------------
instance Exception ExpectedVersionException

--------------------------------------------------------------------------------
-- | Main event store abstraction. It exposes the essential features expected
--   from an event store.
class Store store where
  -- | Appends a batch of events at the end of a stream. The resulting
  --   'EventNumber' is handed back through an 'Async'.
  appendEvents :: (EncodeEvent a, MonadBase IO m)
               => store
               -> StreamName
               -> ExpectedVersion
               -> [a]
               -> m (Async EventNumber)

  -- | Reads a stream in a stream-processing fashion. Read failures are
  --   reported as 'ReadFailure' in the 'ExceptT' layer of the stream.
  readStream :: MonadBase IO m
             => store
             -> StreamName
             -> Batch
             -> Stream (Of SavedEvent) (ExceptT ReadFailure m) ()

  -- | Subscribes to the given stream.
  subscribe :: MonadBase IO m => store -> StreamName -> m Subscription

  -- | Wraps the store into the abstract 'SomeStore'. Defaults to applying the
  --   'SomeStore' constructor.
  toStore :: store -> SomeStore
  toStore = SomeStore

--------------------------------------------------------------------------------
-- | Utility type to pass around any store implementing the 'Store' typeclass.
data SomeStore = forall store. Store store => SomeStore store

--------------------------------------------------------------------------------
-- | Every operation is forwarded to the wrapped store. 'toStore' keeps its
--   default definition, so it wraps the value in one more 'SomeStore' layer.
instance Store SomeStore where
  appendEvents (SomeStore inner) = appendEvents inner
  readStream (SomeStore inner) = readStream inner
  subscribe (SomeStore inner) = subscribe inner

--------------------------------------------------------------------------------
-- | Appends a single event at the end of a stream, by calling 'appendEvents'
--   with a one-element list.
appendEvent :: (EncodeEvent a, MonadBase IO m, Store store)
            => store
            -> StreamName
            -> ExpectedVersion
            -> a
            -> m (Async EventNumber)
appendEvent store name expected event =
  appendEvents store name expected [event]

--------------------------------------------------------------------------------
-- | Removes the 'ExceptT' layer from the base monad of a stream: a 'Left' is
--   thrown as an exception with 'throwIO', a 'Right' is passed through.
unhandled :: (MonadBase IO m, Exception e)
          => Stream (Of a) (ExceptT e m) ()
          -> Stream (Of a) m ()
unhandled = hoist rethrow
  where
    -- Kept as a function binding (explicit argument) so it stays polymorphic
    -- in its result type, as required by 'hoist'.
    rethrow action = runExceptT action >>= either (liftBase . throwIO) pure