module Database.EventStore.Internal.Operation.Volatile (volatile) where
import Data.ProtocolBuffers
import Data.UUID
import Database.EventStore.Internal.Command
import Database.EventStore.Internal.Communication (Transmit(..))
import Database.EventStore.Internal.Control (publishWith)
import Database.EventStore.Internal.Exec (Exec)
import Database.EventStore.Internal.Operation
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Settings
import Database.EventStore.Internal.Stream
import Database.EventStore.Internal.Subscription.Message
import Database.EventStore.Internal.Subscription.Types
import Database.EventStore.Internal.Types
-- | Starts a volatile subscription on a stream.
--
-- A @SubscribeToStream@ request package is created and published through the
-- given 'Exec', together with a fresh mailbox. A background thread (spawned
-- with 'async') then reads that mailbox in a loop and turns every package it
-- receives into a 'SubAction' written to the returned channel:
--
--   * a failed mailbox read, or a response whose payload cannot be decoded,
--     yields @'Dropped' 'SubAborted'@ and stops the loop;
--
--   * a subscription-dropped package yields 'Dropped' with the translated
--     reason (defaulting to 'D_Unsubscribed' when the server gave none) and
--     stops the loop;
--
--   * a subscription-confirmation package yields 'Confirmed';
--
--   * a stream-event-appeared package yields 'Submit';
--
--   * any other package is ignored.
--
-- The result is the correlation id of the request package, which doubles as
-- the subscription id, paired with the channel of subscription actions.
volatile
  :: Exec
  -> StreamId t          -- ^ Stream to subscribe to.
  -> Bool                -- ^ Forwarded as-is to 'subscribeToStream'
                         --   (presumably the resolve-link-tos flag; confirm).
  -> Maybe Credentials   -- ^ Optional credentials attached to the request.
  -> IO (UUID, Chan SubAction)
volatile exec streamId tos cred = do
  m    <- mailboxNew
  subM <- newChan

  let req = subscribeToStream stream tos
  pkg <- createPkg subscribeToStreamCmd cred req

  let theSubId = packageCorrelation pkg

      -- Signals the consumer that the subscription is over and ends the loop.
      dropWith :: SubDropReason -> IO (Loop ())
      dropWith why = Break () <$ writeChan subM (Dropped why)

      -- Hands a non-terminal action to the consumer and keeps reading.
      continueWith :: SubAction -> IO (Loop ())
      continueWith action = Loop <$ writeChan subM action

  -- NOTE(review): the 'KeepAlive' lifetime presumably keeps the mailbox
  -- registered until a subscription-dropped command shows up; confirm
  -- against the definition of 'Lifetime'.
  publishWith exec (Transmit m (KeepAlive subscriptionDroppedCmd) pkg)

  _ <- async $ keepLooping $ do
    outcome <- mailboxRead m
    case outcome of
      Left _ -> dropWith SubAborted

      Right respPkg
        | packageCmd respPkg == subscriptionDroppedCmd ->
            -- Decode eagerly: a lazy @let Right resp = ...@ would smuggle a
            -- pattern-match failure into the channel and blow up in the
            -- consumer instead of here.
            case decodePkg respPkg of
              Left _     -> dropWith SubAborted
              Right resp ->
                let reason = fromMaybe D_Unsubscribed
                               (getField $ dropReason resp)
                in dropWith (toSubDropReason reason)

        | packageCmd respPkg == subscriptionConfirmationCmd ->
            case decodePkg respPkg of
              Left _     -> dropWith SubAborted
              Right resp ->
                let details =
                      SubDetails
                        { subId           = theSubId
                        , subCommitPos    =
                            getField $ subscribeLastCommitPos resp
                        , subLastEventNum =
                            getField $ subscribeLastEventNumber resp
                        , subSubId        = Nothing
                        }
                in continueWith (Confirmed details)

        | packageCmd respPkg == streamEventAppearedCmd ->
            case decodePkg respPkg of
              Left _     -> dropWith SubAborted
              Right resp ->
                let evt = newResolvedEventFromBuf
                            $ getField
                            $ streamResolvedEvent resp
                in continueWith (Submit evt)

        | otherwise -> pure Loop

  pure (theSubId, subM)
  where
    -- Raw textual name of the stream, as expected by the wire request.
    stream = streamIdRaw streamId