module Database.EventStore.Internal.Operation.Persist (persist) where
import Data.ProtocolBuffers
import Data.UUID
import Database.EventStore.Internal.Command
import Database.EventStore.Internal.Communication (Transmit(..))
import Database.EventStore.Internal.Control (publishWith)
import Database.EventStore.Internal.Exec (Exec)
import Database.EventStore.Internal.Operation
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Settings
import Database.EventStore.Internal.Subscription.Message
import Database.EventStore.Internal.Subscription.Types
import Database.EventStore.Internal.Types
-- | Opens a persistent-subscription connection and starts a background reader
-- that translates incoming packages into 'SubAction's.
--
-- The three values @grp@, @stream@ and @bufSize@ are forwarded, in that order,
-- to '_connectToPersistentSubscription' (presumably group name, stream name
-- and buffer size -- confirm against that constructor's definition).
--
-- Returns a triple of:
--
--   * the correlation 'UUID' of the connection request package,
--   * a 'TVar' that starts as 'Nothing' and is set to @Just id@ once a
--     confirmation package has been received and decoded,
--   * the 'Chan' on which 'Confirmed', 'Submit' and 'Dropped' actions are
--     delivered.
--
-- The reader loop stops after it has emitted a 'Dropped' action. That happens
-- when the mailbox read fails, when a subscription-dropped package arrives, or
-- when a recognised package cannot be decoded (reported as 'SubAborted').
persist
  :: Exec
  -> Text
  -> Text
  -> Int32
  -> Maybe Credentials
  -> IO (UUID, TVar (Maybe Text), Chan SubAction)
persist exec grp stream bufSize cred = do
  m    <- mailboxNew
  subM <- newChan
  var  <- newTVarIO Nothing

  let req = _connectToPersistentSubscription grp stream bufSize
  pkg <- createPkg connectToPersistentSubscriptionCmd cred req

  let theSubId = packageCorrelation pkg

      -- Shared failure path: tell the consumer the subscription is gone and
      -- terminate the reader loop.
      abort :: IO (Loop ())
      abort = Break () <$ writeChan subM (Dropped SubAborted)

  publishWith exec (Transmit m (KeepAlive subscriptionDroppedCmd) pkg)

  _ <- async $ keepLooping $ do
    outcome <- mailboxRead m
    case outcome of
      Left _ -> abort
      Right respPkg
        | cmd == subscriptionDroppedCmd ->
            -- Decode eagerly here instead of through an irrefutable
            -- @let Right resp = ...@, so a malformed payload is handled in
            -- this thread rather than exploding lazily in the consumer.
            case decodePkg respPkg of
              Left _ -> abort
              Right resp ->
                let reason =
                      fromMaybe D_Unsubscribed (getField $ dropReason resp)
                in Break () <$ writeChan subM (Dropped (toSubDropReason reason))
        | cmd == persistentSubscriptionConfirmationCmd ->
            case decodePkg respPkg of
              Left _ -> abort
              Right resp -> do
                let confirmedId = getField $ pscId resp
                    details =
                      SubDetails
                        { subId           = theSubId
                        , subCommitPos    = getField $ pscLastCommitPos resp
                        , subLastEventNum = getField $ pscLastEvtNumber resp
                        , subSubId        = Just confirmedId
                        }
                -- Publish the id before notifying the consumer, matching the
                -- original ordering.
                atomically $ writeTVar var (Just confirmedId)
                Loop <$ writeChan subM (Confirmed details)
        | cmd == persistentSubscriptionStreamEventAppearedCmd ->
            case decodePkg respPkg of
              Left _ -> abort
              Right resp ->
                let evt = newResolvedEvent $ getField $ psseaEvt resp
                in Loop <$ writeChan subM (Submit evt)
        -- Any other command is ignored and the loop keeps reading.
        | otherwise -> pure Loop
        where
          cmd = packageCmd respPkg

  pure (theSubId, var, subM)