{-# LANGUAGE GADTs              #-}
{-# LANGUAGE LambdaCase         #-}
{-# LANGUAGE StandaloneDeriving #-}
--------------------------------------------------------------------------------
-- |
-- Module    :  Database.EventStore.Streaming
-- Copyright :  (C) 2018 Yorick Laupa
-- License   :  (see the file LICENSE)
-- Maintainer:  Yorick Laupa <yo.eight@gmail.com>
-- Stability :  experimental
-- Portability: non-portable
--
--------------------------------------------------------------------------------
module Database.EventStore.Streaming
    ( ReadError(..)
    , Fetch(..)
    , ReadResultHandler(..)
    , readThroughForward
    , readThroughBackward
    , throwOnError
    , defaultReadResultHandler
    , onRegularStream
    , readThrough
    ) where

--------------------------------------------------------------------------------
import Control.Exception (Exception, throwIO)
import Data.Int (Int32)
import Data.Maybe (fromMaybe)
import Data.Typeable (Typeable)
import Prelude

--------------------------------------------------------------------------------
import           Control.Concurrent.Async.Lifted (wait)
import           Control.Monad.Except (ExceptT, throwError, runExceptT)
import           Data.Text (Text)
import           Streaming
import qualified Streaming.Prelude as Streaming

--------------------------------------------------------------------------------
import qualified Database.EventStore as ES

--------------------------------------------------------------------------------
data ReadError t where
    StreamDeleted :: ES.StreamName -> ReadError ES.EventNumber
    ReadError :: Maybe Text -> ReadError t
    AccessDenied :: ES.StreamId t -> ReadError t
    NoStream :: ReadError ES.EventNumber

--------------------------------------------------------------------------------
deriving instance Show (ReadError t)

--------------------------------------------------------------------------------
instance (Show t, Typeable t) => Exception (ReadError t)

--------------------------------------------------------------------------------
data Fetch t = FetchError !(ReadError t) | Fetch !(ES.Slice t)

--------------------------------------------------------------------------------
newtype ReadResultHandler =
  ReadResultHandler
  { ReadResultHandler
-> forall t. StreamId t -> BatchResult t -> Fetch t
runReadResultHandler :: forall t. ES.StreamId t -> ES.BatchResult t -> Fetch t }

--------------------------------------------------------------------------------
defaultReadResultHandler :: ReadResultHandler
defaultReadResultHandler :: ReadResultHandler
defaultReadResultHandler = (forall t. StreamId t -> BatchResult t -> Fetch t)
-> ReadResultHandler
ReadResultHandler forall t. StreamId t -> BatchResult t -> Fetch t
go
  where
    go :: ES.StreamId t -> ES.BatchResult t -> Fetch t
    go :: StreamId t -> BatchResult t -> Fetch t
go ES.StreamName{} = BatchResult t -> Fetch t
forall t. ReadResult t (Slice t) -> Fetch t
toFetch
    go StreamId t
ES.All          = BatchResult t -> Fetch t
forall t. Slice t -> Fetch t
Fetch

    toFetch :: ReadResult t (Slice t) -> Fetch t
toFetch ReadResult t (Slice t)
ES.ReadNoStream          = Slice EventNumber -> Fetch EventNumber
forall t. Slice t -> Fetch t
Fetch Slice EventNumber
forall t. Slice t
ES.emptySlice
    toFetch ReadResult t (Slice t)
ES.ReadNotModified       = Slice t -> Fetch t
forall t. Slice t -> Fetch t
Fetch Slice t
forall t. Slice t
ES.emptySlice
    toFetch (ES.ReadStreamDeleted StreamName
n) = ReadError EventNumber -> Fetch EventNumber
forall t. ReadError t -> Fetch t
FetchError (StreamName -> ReadError EventNumber
StreamDeleted StreamName
n)
    toFetch (ES.ReadError Maybe Text
e)         = ReadError t -> Fetch t
forall t. ReadError t -> Fetch t
FetchError (Maybe Text -> ReadError t
forall t. Maybe Text -> ReadError t
ReadError Maybe Text
e)
    toFetch (ES.ReadAccessDenied StreamId t
n)  = ReadError t -> Fetch t
forall t. ReadError t -> Fetch t
FetchError (StreamId t -> ReadError t
forall t. StreamId t -> ReadError t
AccessDenied StreamId t
n)
    toFetch (ES.ReadSuccess Slice t
s)       = Slice t -> Fetch t
forall t. Slice t -> Fetch t
Fetch Slice t
s

--------------------------------------------------------------------------------
onRegularStream :: (ES.ReadResult ES.EventNumber (ES.Slice ES.EventNumber) -> Fetch ES.EventNumber)
                -> ReadResultHandler
onRegularStream :: (ReadResult EventNumber (Slice EventNumber) -> Fetch EventNumber)
-> ReadResultHandler
onRegularStream ReadResult EventNumber (Slice EventNumber) -> Fetch EventNumber
callback = (forall t. StreamId t -> BatchResult t -> Fetch t)
-> ReadResultHandler
ReadResultHandler forall t. StreamId t -> BatchResult t -> Fetch t
go
  where
    go :: ES.StreamId t -> ES.BatchResult t -> Fetch t
    go :: StreamId t -> BatchResult t -> Fetch t
go ES.StreamName{} = ReadResult EventNumber (Slice EventNumber) -> Fetch EventNumber
BatchResult t -> Fetch t
callback
    go StreamId t
ES.All          = BatchResult t -> Fetch t
forall t. Slice t -> Fetch t
Fetch

--------------------------------------------------------------------------------
data State t = Need t | Fetched ![ES.ResolvedEvent] !(Maybe t)

--------------------------------------------------------------------------------
streaming :: (t -> IO (Fetch t))
          -> t
          -> Stream (Of ES.ResolvedEvent) (ExceptT (ReadError t) IO) ()
streaming :: (t -> IO (Fetch t))
-> t -> Stream (Of ResolvedEvent) (ExceptT (ReadError t) IO) ()
streaming t -> IO (Fetch t)
iteratee = (State t
 -> ExceptT (ReadError t) IO (Either () (ResolvedEvent, State t)))
-> State t
-> Stream (Of ResolvedEvent) (ExceptT (ReadError t) IO) ()
forall (m :: * -> *) s r a.
Monad m =>
(s -> m (Either r (a, s))) -> s -> Stream (Of a) m r
Streaming.unfoldr State t
-> ExceptT (ReadError t) IO (Either () (ResolvedEvent, State t))
go (State t
 -> Stream (Of ResolvedEvent) (ExceptT (ReadError t) IO) ())
-> (t -> State t)
-> t
-> Stream (Of ResolvedEvent) (ExceptT (ReadError t) IO) ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t -> State t
forall t. t -> State t
Need
  where
      go :: State t
-> ExceptT (ReadError t) IO (Either () (ResolvedEvent, State t))
go (Fetched [ResolvedEvent]
buffer Maybe t
next) =
          case [ResolvedEvent]
buffer of
            ResolvedEvent
e:[ResolvedEvent]
rest -> Either () (ResolvedEvent, State t)
-> ExceptT (ReadError t) IO (Either () (ResolvedEvent, State t))
forall (f :: * -> *) a. Applicative f => a -> f a
pure ((ResolvedEvent, State t) -> Either () (ResolvedEvent, State t)
forall a b. b -> Either a b
Right (ResolvedEvent
e, [ResolvedEvent] -> Maybe t -> State t
forall t. [ResolvedEvent] -> Maybe t -> State t
Fetched [ResolvedEvent]
rest Maybe t
next))
            []     -> ExceptT (ReadError t) IO (Either () (ResolvedEvent, State t))
-> (t
    -> ExceptT (ReadError t) IO (Either () (ResolvedEvent, State t)))
-> Maybe t
-> ExceptT (ReadError t) IO (Either () (ResolvedEvent, State t))
forall b a. b -> (a -> b) -> Maybe a -> b
maybe ExceptT (ReadError t) IO (Either () (ResolvedEvent, State t))
forall b. ExceptT (ReadError t) IO (Either () b)
stop (State t
-> ExceptT (ReadError t) IO (Either () (ResolvedEvent, State t))
go (State t
 -> ExceptT (ReadError t) IO (Either () (ResolvedEvent, State t)))
-> (t -> State t)
-> t
-> ExceptT (ReadError t) IO (Either () (ResolvedEvent, State t))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t -> State t
forall t. t -> State t
Need) Maybe t
next
      go (Need t
pos) = do
          IO (Fetch t) -> ExceptT (ReadError t) IO (Fetch t)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (t -> IO (Fetch t)
iteratee t
pos) ExceptT (ReadError t) IO (Fetch t)
-> (Fetch t
    -> ExceptT (ReadError t) IO (Either () (ResolvedEvent, State t)))
-> ExceptT (ReadError t) IO (Either () (ResolvedEvent, State t))
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
              FetchError ReadError t
e -> ReadError t
-> ExceptT (ReadError t) IO (Either () (ResolvedEvent, State t))
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError ReadError t
e
              Fetch Slice t
s      ->
                  case Slice t
s of
                      Slice t
ES.SliceEndOfStream -> ExceptT (ReadError t) IO (Either () (ResolvedEvent, State t))
forall b. ExceptT (ReadError t) IO (Either () b)
stop
                      ES.Slice [ResolvedEvent]
xs Maybe t
next    -> State t
-> ExceptT (ReadError t) IO (Either () (ResolvedEvent, State t))
go ([ResolvedEvent] -> Maybe t -> State t
forall t. [ResolvedEvent] -> Maybe t -> State t
Fetched [ResolvedEvent]
xs Maybe t
next)

      stop :: ExceptT (ReadError t) IO (Either () b)
stop = Either () b -> ExceptT (ReadError t) IO (Either () b)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (() -> Either () b
forall a b. a -> Either a b
Left ())

--------------------------------------------------------------------------------
-- | Returns an iterator able to consume a stream entirely. When reading forward,
--   the iterator ends when the last stream's event is reached.
readThroughForward :: ES.Connection
                   -> ES.StreamId t
                   -> ES.ResolveLink
                   -> t
                   -> Maybe Int32
                   -> Maybe ES.Credentials
                   -> Stream (Of ES.ResolvedEvent) (ExceptT (ReadError t) IO) ()
readThroughForward :: Connection
-> StreamId t
-> ResolveLink
-> t
-> Maybe Int32
-> Maybe Credentials
-> Stream (Of ResolvedEvent) (ExceptT (ReadError t) IO) ()
readThroughForward Connection
conn = Connection
-> ReadResultHandler
-> ReadDirection
-> StreamId t
-> ResolveLink
-> t
-> Maybe Int32
-> Maybe Credentials
-> Stream (Of ResolvedEvent) (ExceptT (ReadError t) IO) ()
forall t.
Connection
-> ReadResultHandler
-> ReadDirection
-> StreamId t
-> ResolveLink
-> t
-> Maybe Int32
-> Maybe Credentials
-> Stream (Of ResolvedEvent) (ExceptT (ReadError t) IO) ()
readThrough Connection
conn ReadResultHandler
defaultReadResultHandler ReadDirection
ES.Forward

--------------------------------------------------------------------------------
-- | Returns an iterator able to consume a stream entirely. When reading backward,
--   the iterator ends when the first stream's event is reached.
readThroughBackward :: ES.Connection
                    -> ES.StreamId t
                    -> ES.ResolveLink
                    -> t
                    -> Maybe Int32
                    -> Maybe ES.Credentials
                    -> Stream (Of ES.ResolvedEvent) (ExceptT (ReadError t) IO) ()
readThroughBackward :: Connection
-> StreamId t
-> ResolveLink
-> t
-> Maybe Int32
-> Maybe Credentials
-> Stream (Of ResolvedEvent) (ExceptT (ReadError t) IO) ()
readThroughBackward Connection
conn = Connection
-> ReadResultHandler
-> ReadDirection
-> StreamId t
-> ResolveLink
-> t
-> Maybe Int32
-> Maybe Credentials
-> Stream (Of ResolvedEvent) (ExceptT (ReadError t) IO) ()
forall t.
Connection
-> ReadResultHandler
-> ReadDirection
-> StreamId t
-> ResolveLink
-> t
-> Maybe Int32
-> Maybe Credentials
-> Stream (Of ResolvedEvent) (ExceptT (ReadError t) IO) ()
readThrough Connection
conn ReadResultHandler
defaultReadResultHandler ReadDirection
ES.Backward

--------------------------------------------------------------------------------
-- | Throws an exception in case 'ExceptT' is a 'Left'.
throwOnError :: (Show t, Typeable t)
             => Stream (Of a) (ExceptT (ReadError t) IO) ()
             -> Stream (Of a) IO ()
throwOnError :: Stream (Of a) (ExceptT (ReadError t) IO) () -> Stream (Of a) IO ()
throwOnError = (forall a. ExceptT (ReadError t) IO a -> IO a)
-> Stream (Of a) (ExceptT (ReadError t) IO) ()
-> Stream (Of a) IO ()
forall k (t :: (* -> *) -> k -> *) (m :: * -> *) (n :: * -> *)
       (b :: k).
(MFunctor t, Monad m) =>
(forall a. m a -> n a) -> t m b -> t n b
hoist forall a. ExceptT (ReadError t) IO a -> IO a
forall e b. Exception e => ExceptT e IO b -> IO b
go
  where
    go :: ExceptT e IO b -> IO b
go ExceptT e IO b
action =
        ExceptT e IO b -> IO (Either e b)
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT ExceptT e IO b
action IO (Either e b) -> (Either e b -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
            Left e
e  -> e -> IO b
forall e a. Exception e => e -> IO a
throwIO e
e
            Right b
a -> b -> IO b
forall (f :: * -> *) a. Applicative f => a -> f a
pure b
a

--------------------------------------------------------------------------------
-- | Returns an iterator able to consume a stream entirely.
readThrough :: ES.Connection
            -> ReadResultHandler
            -> ES.ReadDirection
            -> ES.StreamId t
            -> ES.ResolveLink
            -> t
            -> Maybe Int32
            -> Maybe ES.Credentials
            -> Stream (Of ES.ResolvedEvent) (ExceptT (ReadError t) IO) ()
readThrough :: Connection
-> ReadResultHandler
-> ReadDirection
-> StreamId t
-> ResolveLink
-> t
-> Maybe Int32
-> Maybe Credentials
-> Stream (Of ResolvedEvent) (ExceptT (ReadError t) IO) ()
readThrough Connection
conn ReadResultHandler
handler ReadDirection
dir StreamId t
streamId ResolveLink
lnk t
from Maybe Int32
sizMay Maybe Credentials
cred = (t -> IO (Fetch t))
-> t -> Stream (Of ResolvedEvent) (ExceptT (ReadError t) IO) ()
forall t.
(t -> IO (Fetch t))
-> t -> Stream (Of ResolvedEvent) (ExceptT (ReadError t) IO) ()
streaming t -> IO (Fetch t)
iteratee t
from
  where
    batchSize :: Int32
batchSize = Int32 -> Maybe Int32 -> Int32
forall a. a -> Maybe a -> a
fromMaybe Int32
500 Maybe Int32
sizMay

    iteratee :: t -> IO (Fetch t)
iteratee =
        case ReadDirection
dir of
            ReadDirection
ES.Forward  -> Connection
-> ReadResultHandler
-> StreamId t
-> Int32
-> ResolveLink
-> Maybe Credentials
-> t
-> IO (Fetch t)
forall t.
Connection
-> ReadResultHandler
-> StreamId t
-> Int32
-> ResolveLink
-> Maybe Credentials
-> t
-> IO (Fetch t)
readForward Connection
conn ReadResultHandler
handler StreamId t
streamId Int32
batchSize ResolveLink
lnk Maybe Credentials
cred
            ReadDirection
ES.Backward -> Connection
-> ReadResultHandler
-> StreamId t
-> Int32
-> ResolveLink
-> Maybe Credentials
-> t
-> IO (Fetch t)
forall t.
Connection
-> ReadResultHandler
-> StreamId t
-> Int32
-> ResolveLink
-> Maybe Credentials
-> t
-> IO (Fetch t)
readBackward Connection
conn ReadResultHandler
handler StreamId t
streamId Int32
batchSize ResolveLink
lnk Maybe Credentials
cred

--------------------------------------------------------------------------------
readForward :: ES.Connection
            -> ReadResultHandler
            -> ES.StreamId t
            -> Int32
            -> ES.ResolveLink
            -> Maybe ES.Credentials
            -> t
            -> IO (Fetch t)
readForward :: Connection
-> ReadResultHandler
-> StreamId t
-> Int32
-> ResolveLink
-> Maybe Credentials
-> t
-> IO (Fetch t)
readForward Connection
conn ReadResultHandler
handler StreamId t
streamId Int32
siz ResolveLink
lnk Maybe Credentials
creds t
start =
    (BatchResult t -> Fetch t) -> IO (BatchResult t) -> IO (Fetch t)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (ReadResultHandler -> StreamId t -> BatchResult t -> Fetch t
ReadResultHandler
-> forall t. StreamId t -> BatchResult t -> Fetch t
runReadResultHandler ReadResultHandler
handler StreamId t
streamId) (IO (BatchResult t) -> IO (Fetch t))
-> (Async (BatchResult t) -> IO (BatchResult t))
-> Async (BatchResult t)
-> IO (Fetch t)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Async (BatchResult t) -> IO (BatchResult t)
forall (m :: * -> *) a.
MonadBaseControl IO m =>
Async (StM m a) -> m a
wait (Async (BatchResult t) -> IO (Fetch t))
-> IO (Async (BatchResult t)) -> IO (Fetch t)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<<
        Connection
-> StreamId t
-> t
-> Int32
-> ResolveLink
-> Maybe Credentials
-> IO (Async (BatchResult t))
forall t.
Connection
-> StreamId t
-> t
-> Int32
-> ResolveLink
-> Maybe Credentials
-> IO (Async (BatchResult t))
ES.readEventsForward Connection
conn StreamId t
streamId t
start Int32
siz ResolveLink
lnk Maybe Credentials
creds

--------------------------------------------------------------------------------
readBackward :: ES.Connection
             -> ReadResultHandler
             -> ES.StreamId t
             -> Int32
             -> ES.ResolveLink
             -> Maybe ES.Credentials
             -> t
             -> IO (Fetch t)
readBackward :: Connection
-> ReadResultHandler
-> StreamId t
-> Int32
-> ResolveLink
-> Maybe Credentials
-> t
-> IO (Fetch t)
readBackward Connection
conn ReadResultHandler
handler StreamId t
streamId Int32
siz ResolveLink
lnk Maybe Credentials
creds t
start =
    (BatchResult t -> Fetch t) -> IO (BatchResult t) -> IO (Fetch t)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (ReadResultHandler -> StreamId t -> BatchResult t -> Fetch t
ReadResultHandler
-> forall t. StreamId t -> BatchResult t -> Fetch t
runReadResultHandler ReadResultHandler
handler StreamId t
streamId) (IO (BatchResult t) -> IO (Fetch t))
-> (Async (BatchResult t) -> IO (BatchResult t))
-> Async (BatchResult t)
-> IO (Fetch t)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Async (BatchResult t) -> IO (BatchResult t)
forall (m :: * -> *) a.
MonadBaseControl IO m =>
Async (StM m a) -> m a
wait (Async (BatchResult t) -> IO (Fetch t))
-> IO (Async (BatchResult t)) -> IO (Fetch t)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<<
        Connection
-> StreamId t
-> t
-> Int32
-> ResolveLink
-> Maybe Credentials
-> IO (Async (BatchResult t))
forall t.
Connection
-> StreamId t
-> t
-> Int32
-> ResolveLink
-> Maybe Credentials
-> IO (Async (BatchResult t))
ES.readEventsBackward Connection
conn StreamId t
streamId t
start Int32
siz ResolveLink
lnk Maybe Credentials
creds