{-# LANGUAGE DeriveDataTypeable     #-}
{-# LANGUAGE FunctionalDependencies #-}
{-# LANGUAGE ScopedTypeVariables    #-}
--------------------------------------------------------------------------------
-- |
-- Module : Database.EventStore.Internal.Subscription.Api
-- Copyright : (C) 2017 Yorick Laupa
-- License : (see the file LICENSE)
--
-- Maintainer : Yorick Laupa <yo.eight@gmail.com>
-- Stability : provisional
-- Portability : non-portable
--
-- Main Subscription bookkeeping structure.
--------------------------------------------------------------------------------
module Database.EventStore.Internal.Subscription.Api where

--------------------------------------------------------------------------------
import           Streaming
import qualified Streaming.Prelude as Streaming

--------------------------------------------------------------------------------
import Database.EventStore.Internal.Types
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Stream
import Database.EventStore.Internal.Subscription.Types

--------------------------------------------------------------------------------
-- | Common operations supported by a subscription.
class Subscription s where
  -- | Asks for the next subcription event. If that function is called after
  --   a SubDropped event, expect it to hang indefinitely.
  nextSubEvent :: s -> IO SubAction

  -- | Asynchronously unsubscribe from a subscription.
  unsubscribe :: s -> IO ()

--------------------------------------------------------------------------------
-- | Returns the stream of a subscription.
class SubscriptionStream s t | t -> s where
    subscriptionStream :: s -> StreamId t

--------------------------------------------------------------------------------
-- | Streams a subscription events. The stream will end when hitting `Dropped`
--   event but will still emit it.
streamSubEvents :: Subscription s => s -> Stream (Of SubAction) IO ()
streamSubEvents :: s -> Stream (Of SubAction) IO ()
streamSubEvents s
s
  = do Stream (Of SubAction) IO Any
rest <- (SubAction -> Bool)
-> Stream (Of SubAction) IO Any
-> Stream (Of SubAction) IO (Stream (Of SubAction) IO Any)
forall (m :: * -> *) a r.
Monad m =>
(a -> Bool)
-> Stream (Of a) m r -> Stream (Of a) m (Stream (Of a) m r)
Streaming.span SubAction -> Bool
predicate (Stream (Of SubAction) IO Any
 -> Stream (Of SubAction) IO (Stream (Of SubAction) IO Any))
-> Stream (Of SubAction) IO Any
-> Stream (Of SubAction) IO (Stream (Of SubAction) IO Any)
forall a b. (a -> b) -> a -> b
$ IO SubAction -> Stream (Of SubAction) IO Any
forall (m :: * -> *) a r. Monad m => m a -> Stream (Of a) m r
Streaming.repeatM (s -> IO SubAction
forall s. Subscription s => s -> IO SubAction
nextSubEvent s
s)
       Maybe (SubAction, Stream (Of SubAction) IO Any)
outcome <- IO (Maybe (SubAction, Stream (Of SubAction) IO Any))
-> Stream
     (Of SubAction) IO (Maybe (SubAction, Stream (Of SubAction) IO Any))
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (IO (Maybe (SubAction, Stream (Of SubAction) IO Any))
 -> Stream
      (Of SubAction)
      IO
      (Maybe (SubAction, Stream (Of SubAction) IO Any)))
-> IO (Maybe (SubAction, Stream (Of SubAction) IO Any))
-> Stream
     (Of SubAction) IO (Maybe (SubAction, Stream (Of SubAction) IO Any))
forall a b. (a -> b) -> a -> b
$ Stream (Of SubAction) IO Any
-> IO (Maybe (SubAction, Stream (Of SubAction) IO Any))
forall (m :: * -> *) a r.
Monad m =>
Stream (Of a) m r -> m (Maybe (a, Stream (Of a) m r))
Streaming.uncons Stream (Of SubAction) IO Any
rest
       Maybe (SubAction, Stream (Of SubAction) IO Any)
-> ((SubAction, Stream (Of SubAction) IO Any)
    -> Stream (Of SubAction) IO ())
-> Stream (Of SubAction) IO ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ Maybe (SubAction, Stream (Of SubAction) IO Any)
outcome (((SubAction, Stream (Of SubAction) IO Any)
  -> Stream (Of SubAction) IO ())
 -> Stream (Of SubAction) IO ())
-> ((SubAction, Stream (Of SubAction) IO Any)
    -> Stream (Of SubAction) IO ())
-> Stream (Of SubAction) IO ()
forall a b. (a -> b) -> a -> b
$ \(SubAction
dropped, Stream (Of SubAction) IO Any
_) -> SubAction -> Stream (Of SubAction) IO ()
forall (m :: * -> *) a. Monad m => a -> Stream (Of a) m ()
Streaming.yield SubAction
dropped
  where
    predicate :: SubAction -> Bool
predicate (Dropped SubDropReason
_) = Bool
False
    predicate SubAction
_ = Bool
True

--------------------------------------------------------------------------------
-- | Like `streamSubEvent` but will only emit `ResolvedEvent`.
streamSubResolvedEvents :: Subscription s => s -> Stream (Of ResolvedEvent) IO ()
streamSubResolvedEvents :: s -> Stream (Of ResolvedEvent) IO ()
streamSubResolvedEvents = (SubAction -> Maybe ResolvedEvent)
-> Stream (Of SubAction) IO () -> Stream (Of ResolvedEvent) IO ()
forall (m :: * -> *) a b r.
Monad m =>
(a -> Maybe b) -> Stream (Of a) m r -> Stream (Of b) m r
Streaming.mapMaybe SubAction -> Maybe ResolvedEvent
go (Stream (Of SubAction) IO () -> Stream (Of ResolvedEvent) IO ())
-> (s -> Stream (Of SubAction) IO ())
-> s
-> Stream (Of ResolvedEvent) IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. s -> Stream (Of SubAction) IO ()
forall s. Subscription s => s -> Stream (Of SubAction) IO ()
streamSubEvents
  where
    go :: SubAction -> Maybe ResolvedEvent
go (Submit ResolvedEvent
e) = ResolvedEvent -> Maybe ResolvedEvent
forall a. a -> Maybe a
Just ResolvedEvent
e
    go SubAction
_ = Maybe ResolvedEvent
forall a. Maybe a
Nothing