{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE FunctionalDependencies #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Database.EventStore.Internal.Subscription.Api where
import Streaming
import qualified Streaming.Prelude as Streaming
import Database.EventStore.Internal.Types
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Stream
import Database.EventStore.Internal.Subscription.Types
-- | Operations shared by every subscription handle type @s@.
class Subscription s where
  -- | Produces the next 'SubAction' for the given subscription.
  -- NOTE(review): whether this blocks until an action is available depends on
  -- the instance, which is not visible here — confirm against the instances.
  nextSubEvent :: s -> IO SubAction

  -- | Presumably asks for the subscription to be terminated; the actual
  -- effect is defined by each instance — confirm against the instances.
  unsubscribe :: s -> IO ()
-- | Relates a subscription type @s@ to the kind of stream @t@ it targets.
-- The functional dependency @t -> s@ means the stream kind @t@ uniquely
-- determines the subscription type @s@.
class SubscriptionStream s t | t -> s where
  -- | Returns the 'StreamId' associated with the given subscription.
  subscriptionStream :: s -> StreamId t
-- | Streams the 'SubAction's produced by a subscription.
--
-- 'nextSubEvent' is called repeatedly and each resulting action is emitted in
-- order. The first 'Dropped' action encountered is still emitted, and the
-- stream ends right after it.
streamSubEvents :: Subscription s => s -> Stream (Of SubAction) IO ()
streamSubEvents s = do
  -- 'Streaming.span' emits every action that precedes the first 'Dropped' and
  -- returns the remainder of the stream, whose head is that 'Dropped' action.
  rest <- Streaming.span predicate $ Streaming.repeatM (nextSubEvent s)
  -- Only the head of the remainder is inspected; its tail is discarded, so
  -- 'nextSubEvent' is not called again once a 'Dropped' action has been seen.
  outcome <- lift $ Streaming.uncons rest
  for_ outcome $ \(dropped, _) -> Streaming.yield dropped
  where
    -- True for every action that must not terminate the stream.
    predicate :: SubAction -> Bool
    predicate (Dropped _) = False
    predicate _           = True
-- | Like 'streamSubEvents', but keeps only the 'ResolvedEvent's carried by
-- 'Submit' actions. Every other action is filtered out of the stream, which
-- still ends when 'streamSubEvents' ends.
streamSubResolvedEvents :: Subscription s => s -> Stream (Of ResolvedEvent) IO ()
streamSubResolvedEvents = Streaming.mapMaybe go . streamSubEvents
  where
    -- Extracts the event from a 'Submit' action; any other action is dropped.
    go :: SubAction -> Maybe ResolvedEvent
    go (Submit e) = Just e
    go _          = Nothing