{-# LANGUAGE DataKinds           #-}
{-# LANGUAGE GADTs               #-}
{-# LANGUAGE OverloadedStrings   #-}
{-# LANGUAGE ScopedTypeVariables #-}
--------------------------------------------------------------------------------
-- |
-- Module : Database.EventStore.Internal.Operation.Catchup
-- Copyright : (C) 2015 Yorick Laupa
-- License : (see the file LICENSE)
--
-- Maintainer : Yorick Laupa <yo.eight@gmail.com>
-- Stability : provisional
-- Portability : non-portable
--
--------------------------------------------------------------------------------
module Database.EventStore.Internal.Operation.Catchup
    ( catchup ) where

--------------------------------------------------------------------------------
import Data.Int
import Data.Maybe
import Data.ProtocolBuffers

--------------------------------------------------------------------------------
import Database.EventStore.Internal.Command
import Database.EventStore.Internal.Communication (Transmit(..))
import Database.EventStore.Internal.Control (publishWith)
import Database.EventStore.Internal.Exec (Exec)
import Database.EventStore.Internal.Operation
import qualified Database.EventStore.Internal.Operation.ReadAllEvents.Message as ReadAll
import qualified Database.EventStore.Internal.Operation.ReadStreamEvents.Message as ReadStream
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Settings
import Database.EventStore.Internal.Stream
import Database.EventStore.Internal.Subscription.Message
import Database.EventStore.Internal.Subscription.Types
import Database.EventStore.Internal.Types

--------------------------------------------------------------------------------
defaultBatchSize :: Int32
defaultBatchSize :: Int32
defaultBatchSize = Int32
500

--------------------------------------------------------------------------------
data State s
  = Init s
  | Catchup UUID UUID s
  | Live UUID s

--------------------------------------------------------------------------------
createReadPkg
  :: Settings
  -> StreamId t
  -> t
  -> Int32 -- Batch size
  -> Bool -- Resolve links
  -> Maybe Credentials
  -> IO Package
createReadPkg :: Settings
-> StreamId t
-> t
-> Int32
-> Bool
-> Maybe Credentials
-> IO Package
createReadPkg Settings
setts (StreamName Text
stream) t
evtNum Int32
batch Bool
tos Maybe Credentials
cred
  = let
      req :: Request
req =
        Text -> Int64 -> Int32 -> Bool -> Bool -> Request
ReadStream.newRequest
          Text
stream
          (EventNumber -> Int64
eventNumberToInt64 t
EventNumber
evtNum)
          Int32
batch
          Bool
tos
          (Settings -> Bool
s_requireMaster Settings
setts) in
    Command -> Maybe Credentials -> Request -> IO Package
forall msg (m :: * -> *).
(Encode msg, MonadIO m) =>
Command -> Maybe Credentials -> msg -> m Package
createPkg Command
readStreamEventsForwardCmd Maybe Credentials
cred Request
req
createReadPkg Settings
setts StreamId t
All t
pos Int32
batch Bool
tos Maybe Credentials
cred
  = let
      req :: Request
req =
        Int64 -> Int64 -> Int32 -> Bool -> Bool -> Request
ReadAll.newRequest
          (Position -> Int64
positionCommit t
Position
pos)
          (Position -> Int64
positionPrepare t
Position
pos)
          Int32
batch
          Bool
tos
          (Settings -> Bool
s_requireMaster Settings
setts) in
    Command -> Maybe Credentials -> Request -> IO Package
forall msg (m :: * -> *).
(Encode msg, MonadIO m) =>
Command -> Maybe Credentials -> msg -> m Package
createPkg Command
readAllEventsForwardCmd Maybe Credentials
cred Request
req

--------------------------------------------------------------------------------
catchup
  :: Settings
  -> Exec
  -> StreamId t
  -> t
  -> Bool        -- Resolve link tos.
  -> Maybe Int32 -- Batch size.
  -> Maybe Credentials
  -> IO (TVar (Maybe UUID), Chan SubAction)
catchup :: Settings
-> Exec
-> StreamId t
-> t
-> Bool
-> Maybe Int32
-> Maybe Credentials
-> IO (TVar (Maybe UUID), Chan SubAction)
catchup Settings
setts Exec
exec StreamId t
streamId t
from Bool
tos Maybe Int32
batchSiz Maybe Credentials
cred
  = do Mailbox
m <- IO Mailbox
forall (m :: * -> *). MonadBase IO m => m Mailbox
mailboxNew
       Chan SubAction
subM <- IO (Chan SubAction)
forall (m :: * -> *) a. MonadBase IO m => m (Chan a)
newChan
       TVar (Maybe UUID)
var <- Maybe UUID -> IO (TVar (Maybe UUID))
forall a. a -> IO (TVar a)
newTVarIO Maybe UUID
forall a. Maybe a
Nothing
       Async ()
_ <- IO () -> IO (Async (StM IO ()))
forall (m :: * -> *) a.
MonadBaseControl IO m =>
m a -> m (Async (StM m a))
async (IO () -> IO (Async (StM IO ())))
-> IO () -> IO (Async (StM IO ()))
forall a b. (a -> b) -> a -> b
$ State t -> (State t -> IO (LoopS (State t) ())) -> IO ()
forall (m :: * -> *) s a.
Monad m =>
s -> (s -> m (LoopS s a)) -> m a
keepLoopingS (t -> State t
forall s. s -> State s
Init t
from) ((State t -> IO (LoopS (State t) ())) -> IO ())
-> (State t -> IO (LoopS (State t) ())) -> IO ()
forall a b. (a -> b) -> a -> b
$ \case
         Init t
pos
           -> do let subReq :: SubscribeToStream
subReq = Text -> Bool -> SubscribeToStream
subscribeToStream Text
stream Bool
tos
                 Package
subPkg <- Command -> Maybe Credentials -> SubscribeToStream -> IO Package
forall msg (m :: * -> *).
(Encode msg, MonadIO m) =>
Command -> Maybe Credentials -> msg -> m Package
createPkg Command
subscribeToStreamCmd Maybe Credentials
cred SubscribeToStream
subReq
                 Package
readPkg <- Settings
-> StreamId t
-> t
-> Int32
-> Bool
-> Maybe Credentials
-> IO Package
forall t.
Settings
-> StreamId t
-> t
-> Int32
-> Bool
-> Maybe Credentials
-> IO Package
createReadPkg Settings
setts StreamId t
streamId t
pos Int32
batch Bool
tos Maybe Credentials
cred

                 Exec -> Transmit -> IO ()
forall p a (m :: * -> *).
(Pub p, Typeable a, MonadIO m) =>
p -> a -> m ()
publishWith Exec
exec (Mailbox -> Lifetime -> Package -> Transmit
Transmit Mailbox
m (Command -> Lifetime
KeepAlive Command
subscriptionDroppedCmd) Package
subPkg)
                 Exec -> Transmit -> IO ()
forall p a (m :: * -> *).
(Pub p, Typeable a, MonadIO m) =>
p -> a -> m ()
publishWith Exec
exec (Mailbox -> Lifetime -> Package -> Transmit
Transmit Mailbox
m Lifetime
OneTime Package
readPkg)
                 let theSubId :: UUID
theSubId = Package -> UUID
packageCorrelation Package
subPkg

                 STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar (Maybe UUID) -> Maybe UUID -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (Maybe UUID)
var (UUID -> Maybe UUID
forall a. a -> Maybe a
Just UUID
theSubId)

                 LoopS (State t) () -> IO (LoopS (State t) ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure (LoopS (State t) () -> IO (LoopS (State t) ()))
-> LoopS (State t) () -> IO (LoopS (State t) ())
forall a b. (a -> b) -> a -> b
$ State t -> LoopS (State t) ()
forall s a. s -> LoopS s a
LoopS (UUID -> UUID -> t -> State t
forall s. UUID -> UUID -> s -> State s
Catchup UUID
theSubId (Package -> UUID
packageCorrelation Package
readPkg) t
pos)

         unchanged :: State t
unchanged@(Catchup UUID
theSubId UUID
readId t
pos)
           -> do Either OperationError Package
outcome <- Mailbox -> IO (Either OperationError Package)
forall (m :: * -> *).
MonadBase IO m =>
Mailbox -> m (Either OperationError Package)
mailboxRead Mailbox
m
                 case Either OperationError Package
outcome of
                   Left OperationError
e
                     -> case OperationError
e of
                          OperationError
ConnectionHasDropped
                            -> LoopS (State t) () -> IO (LoopS (State t) ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure (LoopS (State t) () -> IO (LoopS (State t) ()))
-> LoopS (State t) () -> IO (LoopS (State t) ())
forall a b. (a -> b) -> a -> b
$ State t -> LoopS (State t) ()
forall s a. s -> LoopS s a
LoopS (t -> State t
forall s. s -> State s
Init t
pos)
                          OperationError
_ -> () -> LoopS (State t) ()
forall s a. a -> LoopS s a
BreakS () LoopS (State t) () -> IO () -> IO (LoopS (State t) ())
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ Chan SubAction -> SubAction -> IO ()
forall (m :: * -> *) a. MonadBase IO m => Chan a -> a -> m ()
writeChan Chan SubAction
subM (SubDropReason -> SubAction
Dropped SubDropReason
SubAborted)

                   Right Package
respPkg
                     | UUID
theSubId UUID -> UUID -> Bool
forall a. Eq a => a -> a -> Bool
== Package -> UUID
packageCorrelation Package
respPkg
                     Bool -> Bool -> Bool
&& Package -> Command
packageCmd Package
respPkg Command -> Command -> Bool
forall a. Eq a => a -> a -> Bool
== Command
subscriptionDroppedCmd
                     -> let Right SubscriptionDropped
resp = Package -> Either OperationError SubscriptionDropped
forall msg. Decode msg => Package -> Either OperationError msg
decodePkg Package
respPkg
                            reason :: DropReason
reason = DropReason -> Maybe DropReason -> DropReason
forall a. a -> Maybe a -> a
fromMaybe DropReason
D_Unsubscribed (Field 1 (OptionalField (Last (Enumeration DropReason)))
-> FieldType
     (Field 1 (OptionalField (Last (Enumeration DropReason))))
forall a. HasField a => a -> FieldType a
getField (Field 1 (OptionalField (Last (Enumeration DropReason)))
 -> FieldType
      (Field 1 (OptionalField (Last (Enumeration DropReason)))))
-> Field 1 (OptionalField (Last (Enumeration DropReason)))
-> FieldType
     (Field 1 (OptionalField (Last (Enumeration DropReason))))
forall a b. (a -> b) -> a -> b
$ SubscriptionDropped -> Optional 1 (Enumeration DropReason)
dropReason SubscriptionDropped
resp)
                            subReason :: SubDropReason
subReason = DropReason -> SubDropReason
toSubDropReason DropReason
reason in
                        () -> LoopS (State t) ()
forall s a. a -> LoopS s a
BreakS () LoopS (State t) () -> IO () -> IO (LoopS (State t) ())
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ Chan SubAction -> SubAction -> IO ()
forall (m :: * -> *) a. MonadBase IO m => Chan a -> a -> m ()
writeChan Chan SubAction
subM (SubDropReason -> SubAction
Dropped SubDropReason
subReason)

                     | UUID
theSubId UUID -> UUID -> Bool
forall a. Eq a => a -> a -> Bool
== Package -> UUID
packageCorrelation Package
respPkg
                     Bool -> Bool -> Bool
&& Package -> Command
packageCmd Package
respPkg Command -> Command -> Bool
forall a. Eq a => a -> a -> Bool
== Command
subscriptionConfirmationCmd
                     -> let Right SubscriptionConfirmation
resp = Package -> Either OperationError SubscriptionConfirmation
forall msg. Decode msg => Package -> Either OperationError msg
decodePkg Package
respPkg
                            lcp :: FieldType (Field 1 (RequiredField (Always (Value Int64))))
lcp = Field 1 (RequiredField (Always (Value Int64)))
-> FieldType (Field 1 (RequiredField (Always (Value Int64))))
forall a. HasField a => a -> FieldType a
getField (Field 1 (RequiredField (Always (Value Int64)))
 -> FieldType (Field 1 (RequiredField (Always (Value Int64)))))
-> Field 1 (RequiredField (Always (Value Int64)))
-> FieldType (Field 1 (RequiredField (Always (Value Int64))))
forall a b. (a -> b) -> a -> b
$ SubscriptionConfirmation -> Required 1 (Value Int64)
subscribeLastCommitPos SubscriptionConfirmation
resp
                            len :: FieldType (Field 2 (OptionalField (Last (Value Int64))))
len = Field 2 (OptionalField (Last (Value Int64)))
-> FieldType (Field 2 (OptionalField (Last (Value Int64))))
forall a. HasField a => a -> FieldType a
getField (Field 2 (OptionalField (Last (Value Int64)))
 -> FieldType (Field 2 (OptionalField (Last (Value Int64)))))
-> Field 2 (OptionalField (Last (Value Int64)))
-> FieldType (Field 2 (OptionalField (Last (Value Int64))))
forall a b. (a -> b) -> a -> b
$ SubscriptionConfirmation -> Optional 2 (Value Int64)
subscribeLastEventNumber SubscriptionConfirmation
resp
                            details :: SubDetails
details =
                              SubDetails :: UUID -> Int64 -> Maybe Int64 -> Maybe Text -> SubDetails
SubDetails
                              { subId :: UUID
subId = UUID
theSubId
                              , subCommitPos :: Int64
subCommitPos = Int64
FieldType (Field 1 (RequiredField (Always (Value Int64))))
lcp
                              , subLastEventNum :: Maybe Int64
subLastEventNum = Maybe Int64
FieldType (Field 2 (OptionalField (Last (Value Int64))))
len
                              , subSubId :: Maybe Text
subSubId = Maybe Text
forall a. Maybe a
Nothing
                              } in
                        State t -> LoopS (State t) ()
forall s a. s -> LoopS s a
LoopS State t
unchanged LoopS (State t) () -> IO () -> IO (LoopS (State t) ())
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ Chan SubAction -> SubAction -> IO ()
forall (m :: * -> *) a. MonadBase IO m => Chan a -> a -> m ()
writeChan Chan SubAction
subM (SubDetails -> SubAction
Confirmed SubDetails
details)

                     | UUID
theSubId UUID -> UUID -> Bool
forall a. Eq a => a -> a -> Bool
== Package -> UUID
packageCorrelation Package
respPkg
                     -> LoopS (State t) () -> IO (LoopS (State t) ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure (LoopS (State t) () -> IO (LoopS (State t) ()))
-> LoopS (State t) () -> IO (LoopS (State t) ())
forall a b. (a -> b) -> a -> b
$ State t -> LoopS (State t) ()
forall s a. s -> LoopS s a
LoopS State t
unchanged

                     | UUID
readId UUID -> UUID -> Bool
forall a. Eq a => a -> a -> Bool
== Package -> UUID
packageCorrelation Package
respPkg
                     -> case StreamId t
streamId of
                          StreamName Text
_
                            -> do let
                                    Right Response
resp = Package -> Either OperationError Response
forall msg. Decode msg => Package -> Either OperationError msg
decodePkg Package
respPkg
                                    r :: FieldType (Field 2 (RequiredField (Always (Enumeration Result))))
r = Field 2 (RequiredField (Always (Enumeration Result)))
-> FieldType
     (Field 2 (RequiredField (Always (Enumeration Result))))
forall a. HasField a => a -> FieldType a
getField (Field 2 (RequiredField (Always (Enumeration Result)))
 -> FieldType
      (Field 2 (RequiredField (Always (Enumeration Result)))))
-> Field 2 (RequiredField (Always (Enumeration Result)))
-> FieldType
     (Field 2 (RequiredField (Always (Enumeration Result))))
forall a b. (a -> b) -> a -> b
$ Response -> Required 2 (Enumeration Result)
ReadStream._result Response
resp
                                    es :: FieldType (Repeated 1 (Message ResolvedIndexedEvent))
es = Repeated 1 (Message ResolvedIndexedEvent)
-> FieldType (Repeated 1 (Message ResolvedIndexedEvent))
forall a. HasField a => a -> FieldType a
getField (Repeated 1 (Message ResolvedIndexedEvent)
 -> FieldType (Repeated 1 (Message ResolvedIndexedEvent)))
-> Repeated 1 (Message ResolvedIndexedEvent)
-> FieldType (Repeated 1 (Message ResolvedIndexedEvent))
forall a b. (a -> b) -> a -> b
$ Response -> Repeated 1 (Message ResolvedIndexedEvent)
ReadStream._events Response
resp
                                    evts :: [ResolvedEvent]
evts = (ResolvedIndexedEvent -> ResolvedEvent)
-> [ResolvedIndexedEvent] -> [ResolvedEvent]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ResolvedIndexedEvent -> ResolvedEvent
newResolvedEvent [ResolvedIndexedEvent]
FieldType (Repeated 1 (Message ResolvedIndexedEvent))
es
                                    eos :: FieldType (Field 5 (RequiredField (Always (Value Bool))))
eos = Field 5 (RequiredField (Always (Value Bool)))
-> FieldType (Field 5 (RequiredField (Always (Value Bool))))
forall a. HasField a => a -> FieldType a
getField (Field 5 (RequiredField (Always (Value Bool)))
 -> FieldType (Field 5 (RequiredField (Always (Value Bool)))))
-> Field 5 (RequiredField (Always (Value Bool)))
-> FieldType (Field 5 (RequiredField (Always (Value Bool))))
forall a b. (a -> b) -> a -> b
$ Response -> Required 5 (Value Bool)
ReadStream._endOfStream Response
resp
                                    nxt :: FieldType (Field 3 (RequiredField (Always (Value Int64))))
nxt = Field 3 (RequiredField (Always (Value Int64)))
-> FieldType (Field 3 (RequiredField (Always (Value Int64))))
forall a. HasField a => a -> FieldType a
getField (Field 3 (RequiredField (Always (Value Int64)))
 -> FieldType (Field 3 (RequiredField (Always (Value Int64)))))
-> Field 3 (RequiredField (Always (Value Int64)))
-> FieldType (Field 3 (RequiredField (Always (Value Int64))))
forall a b. (a -> b) -> a -> b
$ Response -> Required 3 (Value Int64)
ReadStream._nextNumber Response
resp
                                  case FieldType (Field 2 (RequiredField (Always (Enumeration Result))))
r of
                                    FieldType (Field 2 (RequiredField (Always (Enumeration Result))))
ReadStream.NO_STREAM
                                      -> LoopS (State t) () -> IO (LoopS (State t) ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure (LoopS (State t) () -> IO (LoopS (State t) ()))
-> LoopS (State t) () -> IO (LoopS (State t) ())
forall a b. (a -> b) -> a -> b
$ State t -> LoopS (State t) ()
forall s a. s -> LoopS s a
LoopS (UUID -> t -> State t
forall s. UUID -> s -> State s
Live UUID
theSubId t
pos)

                                    FieldType (Field 2 (RequiredField (Always (Enumeration Result))))
ReadStream.SUCCESS
                                      -> do (ResolvedEvent -> IO ()) -> [ResolvedEvent] -> IO ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ (Chan SubAction -> SubAction -> IO ()
forall (m :: * -> *) a. MonadBase IO m => Chan a -> a -> m ()
writeChan Chan SubAction
subM (SubAction -> IO ())
-> (ResolvedEvent -> SubAction) -> ResolvedEvent -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ResolvedEvent -> SubAction
Submit) [ResolvedEvent]
evts
                                            if Bool
FieldType (Field 5 (RequiredField (Always (Value Bool))))
eos
                                              then
                                                LoopS (State EventNumber) () -> IO (LoopS (State EventNumber) ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure (LoopS (State EventNumber) () -> IO (LoopS (State EventNumber) ()))
-> LoopS (State EventNumber) ()
-> IO (LoopS (State EventNumber) ())
forall a b. (a -> b) -> a -> b
$ State EventNumber -> LoopS (State EventNumber) ()
forall s a. s -> LoopS s a
LoopS (UUID -> EventNumber -> State EventNumber
forall s. UUID -> s -> State s
Live UUID
theSubId (Int64 -> EventNumber
rawEventNumber Int64
FieldType (Field 3 (RequiredField (Always (Value Int64))))
nxt))
                                              else
                                                do Package
newReadPkg <- Settings
-> StreamId t
-> t
-> Int32
-> Bool
-> Maybe Credentials
-> IO Package
forall t.
Settings
-> StreamId t
-> t
-> Int32
-> Bool
-> Maybe Credentials
-> IO Package
createReadPkg Settings
setts StreamId t
streamId (Int64 -> EventNumber
rawEventNumber Int64
FieldType (Field 3 (RequiredField (Always (Value Int64))))
nxt) Int32
batch Bool
tos Maybe Credentials
cred
                                                   let newReadId :: UUID
newReadId = Package -> UUID
packageCorrelation Package
newReadPkg

                                                   Exec -> Transmit -> IO ()
forall p a (m :: * -> *).
(Pub p, Typeable a, MonadIO m) =>
p -> a -> m ()
publishWith Exec
exec (Mailbox -> Lifetime -> Package -> Transmit
Transmit Mailbox
m Lifetime
OneTime Package
newReadPkg)
                                                   LoopS (State EventNumber) () -> IO (LoopS (State EventNumber) ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure (LoopS (State EventNumber) () -> IO (LoopS (State EventNumber) ()))
-> LoopS (State EventNumber) ()
-> IO (LoopS (State EventNumber) ())
forall a b. (a -> b) -> a -> b
$ State EventNumber -> LoopS (State EventNumber) ()
forall s a. s -> LoopS s a
LoopS (UUID -> UUID -> EventNumber -> State EventNumber
forall s. UUID -> UUID -> s -> State s
Catchup UUID
theSubId UUID
newReadId (Int64 -> EventNumber
rawEventNumber Int64
FieldType (Field 3 (RequiredField (Always (Value Int64))))
nxt))

                                         -- TODO - Do we have to close the subscription?
                                         -- Pretty sure the subcription has failed already at
                                         -- this point.
                                    FieldType (Field 2 (RequiredField (Always (Enumeration Result))))
_ -> () -> LoopS (State t) ()
forall s a. a -> LoopS s a
BreakS () LoopS (State t) () -> IO () -> IO (LoopS (State t) ())
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ Chan SubAction -> SubAction -> IO ()
forall (m :: * -> *) a. MonadBase IO m => Chan a -> a -> m ()
writeChan Chan SubAction
subM (SubDropReason -> SubAction
Dropped SubDropReason
SubAborted)

                          StreamId t
All
                            -> do let
                                    Right Response
resp = Package -> Either OperationError Response
forall msg. Decode msg => Package -> Either OperationError msg
decodePkg Package
respPkg
                                    r :: FieldType (Field 6 (OptionalField (Last (Enumeration Result))))
r = Field 6 (OptionalField (Last (Enumeration Result)))
-> FieldType (Field 6 (OptionalField (Last (Enumeration Result))))
forall a. HasField a => a -> FieldType a
getField (Field 6 (OptionalField (Last (Enumeration Result)))
 -> FieldType (Field 6 (OptionalField (Last (Enumeration Result)))))
-> Field 6 (OptionalField (Last (Enumeration Result)))
-> FieldType (Field 6 (OptionalField (Last (Enumeration Result))))
forall a b. (a -> b) -> a -> b
$ Response -> Optional 6 (Enumeration Result)
ReadAll._Result Response
resp
                                    nc_pos :: FieldType (Field 4 (RequiredField (Always (Value Int64))))
nc_pos = Field 4 (RequiredField (Always (Value Int64)))
-> FieldType (Field 4 (RequiredField (Always (Value Int64))))
forall a. HasField a => a -> FieldType a
getField (Field 4 (RequiredField (Always (Value Int64)))
 -> FieldType (Field 4 (RequiredField (Always (Value Int64)))))
-> Field 4 (RequiredField (Always (Value Int64)))
-> FieldType (Field 4 (RequiredField (Always (Value Int64))))
forall a b. (a -> b) -> a -> b
$ Response -> Required 4 (Value Int64)
ReadAll._NextCommitPosition Response
resp
                                    np_pos :: FieldType (Field 5 (RequiredField (Always (Value Int64))))
np_pos = Field 5 (RequiredField (Always (Value Int64)))
-> FieldType (Field 5 (RequiredField (Always (Value Int64))))
forall a. HasField a => a -> FieldType a
getField (Field 5 (RequiredField (Always (Value Int64)))
 -> FieldType (Field 5 (RequiredField (Always (Value Int64)))))
-> Field 5 (RequiredField (Always (Value Int64)))
-> FieldType (Field 5 (RequiredField (Always (Value Int64))))
forall a b. (a -> b) -> a -> b
$ Response -> Required 5 (Value Int64)
ReadAll._NextPreparePosition Response
resp
                                    es :: FieldType (Repeated 3 (Message ResolvedEventBuf))
es = Repeated 3 (Message ResolvedEventBuf)
-> FieldType (Repeated 3 (Message ResolvedEventBuf))
forall a. HasField a => a -> FieldType a
getField (Repeated 3 (Message ResolvedEventBuf)
 -> FieldType (Repeated 3 (Message ResolvedEventBuf)))
-> Repeated 3 (Message ResolvedEventBuf)
-> FieldType (Repeated 3 (Message ResolvedEventBuf))
forall a b. (a -> b) -> a -> b
$ Response -> Repeated 3 (Message ResolvedEventBuf)
ReadAll._Events Response
resp
                                    evts :: [ResolvedEvent]
evts = (ResolvedEventBuf -> ResolvedEvent)
-> [ResolvedEventBuf] -> [ResolvedEvent]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ResolvedEventBuf -> ResolvedEvent
newResolvedEventFromBuf [ResolvedEventBuf]
FieldType (Repeated 3 (Message ResolvedEventBuf))
es
                                    eos :: Bool
eos = [ResolvedEvent] -> Bool
forall mono. MonoFoldable mono => mono -> Bool
null [ResolvedEvent]
evts
                                    n_pos :: Position
n_pos = Int64 -> Int64 -> Position
Position Int64
FieldType (Field 4 (RequiredField (Always (Value Int64))))
nc_pos Int64
FieldType (Field 5 (RequiredField (Always (Value Int64))))
np_pos

                                  case Result -> Maybe Result -> Result
forall a. a -> Maybe a -> a
fromMaybe Result
ReadAll.SUCCESS Maybe Result
FieldType (Field 6 (OptionalField (Last (Enumeration Result))))
r of
                                    Result
ReadAll.SUCCESS
                                      -> do (ResolvedEvent -> IO ()) -> [ResolvedEvent] -> IO ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ (Chan SubAction -> SubAction -> IO ()
forall (m :: * -> *) a. MonadBase IO m => Chan a -> a -> m ()
writeChan Chan SubAction
subM (SubAction -> IO ())
-> (ResolvedEvent -> SubAction) -> ResolvedEvent -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ResolvedEvent -> SubAction
Submit) [ResolvedEvent]
evts
                                            if Bool
eos
                                              then
                                                LoopS (State Position) () -> IO (LoopS (State Position) ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure (LoopS (State Position) () -> IO (LoopS (State Position) ()))
-> LoopS (State Position) () -> IO (LoopS (State Position) ())
forall a b. (a -> b) -> a -> b
$ State Position -> LoopS (State Position) ()
forall s a. s -> LoopS s a
LoopS (UUID -> Position -> State Position
forall s. UUID -> s -> State s
Live UUID
theSubId Position
n_pos)
                                              else
                                                do Package
newReadPkg <- Settings
-> StreamId t
-> t
-> Int32
-> Bool
-> Maybe Credentials
-> IO Package
forall t.
Settings
-> StreamId t
-> t
-> Int32
-> Bool
-> Maybe Credentials
-> IO Package
createReadPkg Settings
setts StreamId t
streamId t
Position
n_pos Int32
batch Bool
tos Maybe Credentials
cred
                                                   let newReadId :: UUID
newReadId = Package -> UUID
packageCorrelation Package
newReadPkg

                                                   Exec -> Transmit -> IO ()
forall p a (m :: * -> *).
(Pub p, Typeable a, MonadIO m) =>
p -> a -> m ()
publishWith Exec
exec (Mailbox -> Lifetime -> Package -> Transmit
Transmit Mailbox
m Lifetime
OneTime Package
newReadPkg)
                                                   LoopS (State Position) () -> IO (LoopS (State Position) ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure (LoopS (State Position) () -> IO (LoopS (State Position) ()))
-> LoopS (State Position) () -> IO (LoopS (State Position) ())
forall a b. (a -> b) -> a -> b
$ State Position -> LoopS (State Position) ()
forall s a. s -> LoopS s a
LoopS (UUID -> UUID -> Position -> State Position
forall s. UUID -> UUID -> s -> State s
Catchup UUID
theSubId UUID
newReadId Position
n_pos)

                                         -- TODO - Do we have to close the subscription?
                                         -- Pretty sure the subcription has failed already at
                                         -- this point.
                                    Result
_ -> () -> LoopS (State t) ()
forall s a. a -> LoopS s a
BreakS () LoopS (State t) () -> IO () -> IO (LoopS (State t) ())
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ Chan SubAction -> SubAction -> IO ()
forall (m :: * -> *) a. MonadBase IO m => Chan a -> a -> m ()
writeChan Chan SubAction
subM (SubDropReason -> SubAction
Dropped SubDropReason
SubAborted)
                     | Bool
otherwise
                     -> LoopS (State t) () -> IO (LoopS (State t) ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure (LoopS (State t) () -> IO (LoopS (State t) ()))
-> LoopS (State t) () -> IO (LoopS (State t) ())
forall a b. (a -> b) -> a -> b
$ State t -> LoopS (State t) ()
forall s a. s -> LoopS s a
LoopS State t
unchanged

         unchanged :: State t
unchanged@(Live UUID
theSubId t
pos)
           -> do Either OperationError Package
outcome <- Mailbox -> IO (Either OperationError Package)
forall (m :: * -> *).
MonadBase IO m =>
Mailbox -> m (Either OperationError Package)
mailboxRead Mailbox
m
                 case Either OperationError Package
outcome of
                   Left OperationError
e
                     -> case OperationError
e of
                          OperationError
ConnectionHasDropped
                            -> LoopS (State t) () -> IO (LoopS (State t) ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure (LoopS (State t) () -> IO (LoopS (State t) ()))
-> LoopS (State t) () -> IO (LoopS (State t) ())
forall a b. (a -> b) -> a -> b
$ State t -> LoopS (State t) ()
forall s a. s -> LoopS s a
LoopS (t -> State t
forall s. s -> State s
Init t
pos)
                          OperationError
_ -> () -> LoopS (State t) ()
forall s a. a -> LoopS s a
BreakS () LoopS (State t) () -> IO () -> IO (LoopS (State t) ())
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ Chan SubAction -> SubAction -> IO ()
forall (m :: * -> *) a. MonadBase IO m => Chan a -> a -> m ()
writeChan Chan SubAction
subM (SubDropReason -> SubAction
Dropped SubDropReason
SubAborted)

                   Right Package
respPkg
                     | UUID
theSubId UUID -> UUID -> Bool
forall a. Eq a => a -> a -> Bool
== Package -> UUID
packageCorrelation Package
respPkg
                     Bool -> Bool -> Bool
&& Package -> Command
packageCmd Package
respPkg Command -> Command -> Bool
forall a. Eq a => a -> a -> Bool
== Command
subscriptionDroppedCmd
                     -> let Right SubscriptionDropped
resp = Package -> Either OperationError SubscriptionDropped
forall msg. Decode msg => Package -> Either OperationError msg
decodePkg Package
respPkg
                            reason :: DropReason
reason = DropReason -> Maybe DropReason -> DropReason
forall a. a -> Maybe a -> a
fromMaybe DropReason
D_Unsubscribed (Field 1 (OptionalField (Last (Enumeration DropReason)))
-> FieldType
     (Field 1 (OptionalField (Last (Enumeration DropReason))))
forall a. HasField a => a -> FieldType a
getField (Field 1 (OptionalField (Last (Enumeration DropReason)))
 -> FieldType
      (Field 1 (OptionalField (Last (Enumeration DropReason)))))
-> Field 1 (OptionalField (Last (Enumeration DropReason)))
-> FieldType
     (Field 1 (OptionalField (Last (Enumeration DropReason))))
forall a b. (a -> b) -> a -> b
$ SubscriptionDropped -> Optional 1 (Enumeration DropReason)
dropReason SubscriptionDropped
resp)
                            subReason :: SubDropReason
subReason = DropReason -> SubDropReason
toSubDropReason DropReason
reason in
                        () -> LoopS (State t) ()
forall s a. a -> LoopS s a
BreakS () LoopS (State t) () -> IO () -> IO (LoopS (State t) ())
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ Chan SubAction -> SubAction -> IO ()
forall (m :: * -> *) a. MonadBase IO m => Chan a -> a -> m ()
writeChan Chan SubAction
subM (SubDropReason -> SubAction
Dropped SubDropReason
subReason)
                     | UUID
theSubId UUID -> UUID -> Bool
forall a. Eq a => a -> a -> Bool
== Package -> UUID
packageCorrelation Package
respPkg
                     Bool -> Bool -> Bool
&& Package -> Command
packageCmd Package
respPkg Command -> Command -> Bool
forall a. Eq a => a -> a -> Bool
== Command
subscriptionConfirmationCmd
                     -> let Right SubscriptionConfirmation
resp = Package -> Either OperationError SubscriptionConfirmation
forall msg. Decode msg => Package -> Either OperationError msg
decodePkg Package
respPkg
                            lcp :: FieldType (Field 1 (RequiredField (Always (Value Int64))))
lcp = Field 1 (RequiredField (Always (Value Int64)))
-> FieldType (Field 1 (RequiredField (Always (Value Int64))))
forall a. HasField a => a -> FieldType a
getField (Field 1 (RequiredField (Always (Value Int64)))
 -> FieldType (Field 1 (RequiredField (Always (Value Int64)))))
-> Field 1 (RequiredField (Always (Value Int64)))
-> FieldType (Field 1 (RequiredField (Always (Value Int64))))
forall a b. (a -> b) -> a -> b
$ SubscriptionConfirmation -> Required 1 (Value Int64)
subscribeLastCommitPos SubscriptionConfirmation
resp
                            len :: FieldType (Field 2 (OptionalField (Last (Value Int64))))
len = Field 2 (OptionalField (Last (Value Int64)))
-> FieldType (Field 2 (OptionalField (Last (Value Int64))))
forall a. HasField a => a -> FieldType a
getField (Field 2 (OptionalField (Last (Value Int64)))
 -> FieldType (Field 2 (OptionalField (Last (Value Int64)))))
-> Field 2 (OptionalField (Last (Value Int64)))
-> FieldType (Field 2 (OptionalField (Last (Value Int64))))
forall a b. (a -> b) -> a -> b
$ SubscriptionConfirmation -> Optional 2 (Value Int64)
subscribeLastEventNumber SubscriptionConfirmation
resp
                            details :: SubDetails
details =
                              SubDetails :: UUID -> Int64 -> Maybe Int64 -> Maybe Text -> SubDetails
SubDetails
                              { subId :: UUID
subId = UUID
theSubId
                              , subCommitPos :: Int64
subCommitPos = Int64
FieldType (Field 1 (RequiredField (Always (Value Int64))))
lcp
                              , subLastEventNum :: Maybe Int64
subLastEventNum = Maybe Int64
FieldType (Field 2 (OptionalField (Last (Value Int64))))
len
                              , subSubId :: Maybe Text
subSubId = Maybe Text
forall a. Maybe a
Nothing
                              } in
                        State t -> LoopS (State t) ()
forall s a. s -> LoopS s a
LoopS State t
unchanged LoopS (State t) () -> IO () -> IO (LoopS (State t) ())
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ Chan SubAction -> SubAction -> IO ()
forall (m :: * -> *) a. MonadBase IO m => Chan a -> a -> m ()
writeChan Chan SubAction
subM (SubDetails -> SubAction
Confirmed SubDetails
details)
                     | UUID
theSubId UUID -> UUID -> Bool
forall a. Eq a => a -> a -> Bool
== Package -> UUID
packageCorrelation Package
respPkg
                     Bool -> Bool -> Bool
&& Package -> Command
packageCmd Package
respPkg Command -> Command -> Bool
forall a. Eq a => a -> a -> Bool
== Command
streamEventAppearedCmd
                     -> let
                          Right StreamEventAppeared
resp = Package -> Either OperationError StreamEventAppeared
forall msg. Decode msg => Package -> Either OperationError msg
decodePkg Package
respPkg
                          evt :: ResolvedEvent
evt = ResolvedEventBuf -> ResolvedEvent
newResolvedEventFromBuf (ResolvedEventBuf -> ResolvedEvent)
-> ResolvedEventBuf -> ResolvedEvent
forall a b. (a -> b) -> a -> b
$ Field 1 (RequiredField (Always (Message ResolvedEventBuf)))
-> FieldType
     (Field 1 (RequiredField (Always (Message ResolvedEventBuf))))
forall a. HasField a => a -> FieldType a
getField (Field 1 (RequiredField (Always (Message ResolvedEventBuf)))
 -> FieldType
      (Field 1 (RequiredField (Always (Message ResolvedEventBuf)))))
-> Field 1 (RequiredField (Always (Message ResolvedEventBuf)))
-> FieldType
     (Field 1 (RequiredField (Always (Message ResolvedEventBuf))))
forall a b. (a -> b) -> a -> b
$ StreamEventAppeared -> Required 1 (Message ResolvedEventBuf)
streamResolvedEvent StreamEventAppeared
resp
                          nextState :: State t
nextState =
                            case StreamId t
streamId of
                              StreamName Text
_
                                -> let nxt :: Int64
nxt = ResolvedEvent -> Int64
resolvedEventOriginalEventNumber ResolvedEvent
evt
                                   in UUID -> EventNumber -> State EventNumber
forall s. UUID -> s -> State s
Live UUID
theSubId (Int64 -> EventNumber
rawEventNumber Int64
nxt)
                              StreamId t
All
                                -> let Just Position
nxtPos = ResolvedEvent -> Maybe Position
resolvedEventPosition ResolvedEvent
evt
                                   in UUID -> Position -> State Position
forall s. UUID -> s -> State s
Live UUID
theSubId Position
nxtPos in
                        State t -> LoopS (State t) ()
forall s a. s -> LoopS s a
LoopS State t
nextState LoopS (State t) () -> IO () -> IO (LoopS (State t) ())
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ Chan SubAction -> SubAction -> IO ()
forall (m :: * -> *) a. MonadBase IO m => Chan a -> a -> m ()
writeChan Chan SubAction
subM (ResolvedEvent -> SubAction
Submit ResolvedEvent
evt)
                     | Bool
otherwise
                     -> LoopS (State t) () -> IO (LoopS (State t) ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure (LoopS (State t) () -> IO (LoopS (State t) ()))
-> LoopS (State t) () -> IO (LoopS (State t) ())
forall a b. (a -> b) -> a -> b
$ State t -> LoopS (State t) ()
forall s a. s -> LoopS s a
LoopS State t
unchanged

       (TVar (Maybe UUID), Chan SubAction)
-> IO (TVar (Maybe UUID), Chan SubAction)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (TVar (Maybe UUID)
var, Chan SubAction
subM)
  where
    batch :: Int32
batch = Int32 -> Maybe Int32 -> Int32
forall a. a -> Maybe a -> a
fromMaybe Int32
defaultBatchSize Maybe Int32
batchSiz
    stream :: Text
stream = StreamId t -> Text
forall t. StreamId t -> Text
streamIdRaw StreamId t
streamId