--------------------------------------------------------------------------------
-- |
-- Module : Database.EventStore.Internal.Operation.Persist
-- Copyright : (C) 2017 Yorick Laupa
-- License : (see the file LICENSE)
--
-- Maintainer : Yorick Laupa <yo.eight@gmail.com>
-- Stability : provisional
-- Portability : non-portable
--
--------------------------------------------------------------------------------
module Database.EventStore.Internal.Operation.Persist (persist) where

--------------------------------------------------------------------------------
import Data.ProtocolBuffers
import Data.UUID

--------------------------------------------------------------------------------
import Database.EventStore.Internal.Command
import Database.EventStore.Internal.Communication (Transmit(..))
import Database.EventStore.Internal.Control (publishWith)
import Database.EventStore.Internal.Exec (Exec)
import Database.EventStore.Internal.Operation
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Settings
import Database.EventStore.Internal.Subscription.Message
import Database.EventStore.Internal.Subscription.Types
import Database.EventStore.Internal.Types

--------------------------------------------------------------------------------
persist
  :: Exec
  -> Text
  -> Text
  -> Int32
  -> Maybe Credentials
  -> IO (UUID, TVar (Maybe Text), Chan SubAction)
persist :: Exec
-> Text
-> Text
-> Int32
-> Maybe Credentials
-> IO (UUID, TVar (Maybe Text), Chan SubAction)
persist Exec
exec Text
grp Text
stream Int32
bufSize Maybe Credentials
cred
  = do Mailbox
m <- IO Mailbox
forall (m :: * -> *). MonadBase IO m => m Mailbox
mailboxNew
       Chan SubAction
subM <- IO (Chan SubAction)
forall (m :: * -> *) a. MonadBase IO m => m (Chan a)
newChan
       TVar (Maybe Text)
var <- Maybe Text -> IO (TVar (Maybe Text))
forall a. a -> IO (TVar a)
newTVarIO Maybe Text
forall a. Maybe a
Nothing
       let req :: ConnectToPersistentSubscription
req = Text -> Text -> Int32 -> ConnectToPersistentSubscription
_connectToPersistentSubscription Text
grp Text
stream Int32
bufSize
       Package
pkg <- Command
-> Maybe Credentials
-> ConnectToPersistentSubscription
-> IO Package
forall msg (m :: * -> *).
(Encode msg, MonadIO m) =>
Command -> Maybe Credentials -> msg -> m Package
createPkg Command
connectToPersistentSubscriptionCmd Maybe Credentials
cred ConnectToPersistentSubscription
req
       let theSubId :: UUID
theSubId = Package -> UUID
packageCorrelation Package
pkg
       Exec -> Transmit -> IO ()
forall p a (m :: * -> *).
(Pub p, Typeable a, MonadIO m) =>
p -> a -> m ()
publishWith Exec
exec (Mailbox -> Lifetime -> Package -> Transmit
Transmit Mailbox
m (Command -> Lifetime
KeepAlive Command
subscriptionDroppedCmd) Package
pkg)
       Async ()
_ <- IO () -> IO (Async (StM IO ()))
forall (m :: * -> *) a.
MonadBaseControl IO m =>
m a -> m (Async (StM m a))
async (IO () -> IO (Async (StM IO ())))
-> IO () -> IO (Async (StM IO ()))
forall a b. (a -> b) -> a -> b
$ IO (Loop ()) -> IO ()
forall (m :: * -> *) a. Monad m => m (Loop a) -> m a
keepLooping (IO (Loop ()) -> IO ()) -> IO (Loop ()) -> IO ()
forall a b. (a -> b) -> a -> b
$
         do Either OperationError Package
outcome <- Mailbox -> IO (Either OperationError Package)
forall (m :: * -> *).
MonadBase IO m =>
Mailbox -> m (Either OperationError Package)
mailboxRead Mailbox
m
            case Either OperationError Package
outcome of
              Left OperationError
_
                -> () -> Loop ()
forall a. a -> Loop a
Break () Loop () -> IO () -> IO (Loop ())
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ Chan SubAction -> SubAction -> IO ()
forall (m :: * -> *) a. MonadBase IO m => Chan a -> a -> m ()
writeChan Chan SubAction
subM (SubDropReason -> SubAction
Dropped SubDropReason
SubAborted)
              Right Package
respPkg
                | Package -> Command
packageCmd Package
respPkg Command -> Command -> Bool
forall a. Eq a => a -> a -> Bool
== Command
subscriptionDroppedCmd
                -> let Right SubscriptionDropped
resp = Package -> Either OperationError SubscriptionDropped
forall msg. Decode msg => Package -> Either OperationError msg
decodePkg Package
respPkg
                       reason :: DropReason
reason = DropReason -> Maybe DropReason -> DropReason
forall a. a -> Maybe a -> a
fromMaybe DropReason
D_Unsubscribed (Field 1 (OptionalField (Last (Enumeration DropReason)))
-> FieldType
     (Field 1 (OptionalField (Last (Enumeration DropReason))))
forall a. HasField a => a -> FieldType a
getField (Field 1 (OptionalField (Last (Enumeration DropReason)))
 -> FieldType
      (Field 1 (OptionalField (Last (Enumeration DropReason)))))
-> Field 1 (OptionalField (Last (Enumeration DropReason)))
-> FieldType
     (Field 1 (OptionalField (Last (Enumeration DropReason))))
forall a b. (a -> b) -> a -> b
$ SubscriptionDropped -> Optional 1 (Enumeration DropReason)
dropReason SubscriptionDropped
resp)
                       subReason :: SubDropReason
subReason = DropReason -> SubDropReason
toSubDropReason DropReason
reason in
                   () -> Loop ()
forall a. a -> Loop a
Break () Loop () -> IO () -> IO (Loop ())
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ Chan SubAction -> SubAction -> IO ()
forall (m :: * -> *) a. MonadBase IO m => Chan a -> a -> m ()
writeChan Chan SubAction
subM (SubDropReason -> SubAction
Dropped SubDropReason
subReason)
                | Package -> Command
packageCmd Package
respPkg Command -> Command -> Bool
forall a. Eq a => a -> a -> Bool
== Command
persistentSubscriptionConfirmationCmd
                -> do let Right PersistentSubscriptionConfirmation
resp = Package -> Either OperationError PersistentSubscriptionConfirmation
forall msg. Decode msg => Package -> Either OperationError msg
decodePkg Package
respPkg
                          lcp :: FieldType (Field 1 (RequiredField (Always (Value Int64))))
lcp = Field 1 (RequiredField (Always (Value Int64)))
-> FieldType (Field 1 (RequiredField (Always (Value Int64))))
forall a. HasField a => a -> FieldType a
getField (Field 1 (RequiredField (Always (Value Int64)))
 -> FieldType (Field 1 (RequiredField (Always (Value Int64)))))
-> Field 1 (RequiredField (Always (Value Int64)))
-> FieldType (Field 1 (RequiredField (Always (Value Int64))))
forall a b. (a -> b) -> a -> b
$ PersistentSubscriptionConfirmation -> Required 1 (Value Int64)
pscLastCommitPos PersistentSubscriptionConfirmation
resp
                          subSubId :: FieldType (Field 2 (RequiredField (Always (Value Text))))
subSubId = Field 2 (RequiredField (Always (Value Text)))
-> FieldType (Field 2 (RequiredField (Always (Value Text))))
forall a. HasField a => a -> FieldType a
getField (Field 2 (RequiredField (Always (Value Text)))
 -> FieldType (Field 2 (RequiredField (Always (Value Text)))))
-> Field 2 (RequiredField (Always (Value Text)))
-> FieldType (Field 2 (RequiredField (Always (Value Text))))
forall a b. (a -> b) -> a -> b
$ PersistentSubscriptionConfirmation -> Required 2 (Value Text)
pscId PersistentSubscriptionConfirmation
resp
                          len :: FieldType (Field 3 (OptionalField (Last (Value Int64))))
len = Field 3 (OptionalField (Last (Value Int64)))
-> FieldType (Field 3 (OptionalField (Last (Value Int64))))
forall a. HasField a => a -> FieldType a
getField (Field 3 (OptionalField (Last (Value Int64)))
 -> FieldType (Field 3 (OptionalField (Last (Value Int64)))))
-> Field 3 (OptionalField (Last (Value Int64)))
-> FieldType (Field 3 (OptionalField (Last (Value Int64))))
forall a b. (a -> b) -> a -> b
$ PersistentSubscriptionConfirmation -> Optional 3 (Value Int64)
pscLastEvtNumber PersistentSubscriptionConfirmation
resp
                          details :: SubDetails
details =
                            SubDetails :: UUID -> Int64 -> Maybe Int64 -> Maybe Text -> SubDetails
SubDetails
                            { subId :: UUID
subId = UUID
theSubId
                            , subCommitPos :: Int64
subCommitPos = Int64
FieldType (Field 1 (RequiredField (Always (Value Int64))))
lcp
                            , subLastEventNum :: Maybe Int64
subLastEventNum = Maybe Int64
FieldType (Field 3 (OptionalField (Last (Value Int64))))
len
                            , subSubId :: Maybe Text
subSubId = Text -> Maybe Text
forall a. a -> Maybe a
Just Text
FieldType (Field 2 (RequiredField (Always (Value Text))))
subSubId
                            }
                      STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar (Maybe Text) -> Maybe Text -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (Maybe Text)
var (Text -> Maybe Text
forall a. a -> Maybe a
Just Text
FieldType (Field 2 (RequiredField (Always (Value Text))))
subSubId)
                      Loop ()
forall a. Loop a
Loop Loop () -> IO () -> IO (Loop ())
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ Chan SubAction -> SubAction -> IO ()
forall (m :: * -> *) a. MonadBase IO m => Chan a -> a -> m ()
writeChan Chan SubAction
subM (SubDetails -> SubAction
Confirmed SubDetails
details)
                | Package -> Command
packageCmd Package
respPkg Command -> Command -> Bool
forall a. Eq a => a -> a -> Bool
== Command
persistentSubscriptionStreamEventAppearedCmd
                -> let Right PersistentSubscriptionStreamEventAppeared
resp = Package
-> Either OperationError PersistentSubscriptionStreamEventAppeared
forall msg. Decode msg => Package -> Either OperationError msg
decodePkg Package
respPkg
                       evt :: ResolvedEvent
evt = ResolvedIndexedEvent -> ResolvedEvent
newResolvedEvent (ResolvedIndexedEvent -> ResolvedEvent)
-> ResolvedIndexedEvent -> ResolvedEvent
forall a b. (a -> b) -> a -> b
$ Field 1 (RequiredField (Always (Message ResolvedIndexedEvent)))
-> FieldType
     (Field 1 (RequiredField (Always (Message ResolvedIndexedEvent))))
forall a. HasField a => a -> FieldType a
getField (Field 1 (RequiredField (Always (Message ResolvedIndexedEvent)))
 -> FieldType
      (Field 1 (RequiredField (Always (Message ResolvedIndexedEvent)))))
-> Field 1 (RequiredField (Always (Message ResolvedIndexedEvent)))
-> FieldType
     (Field 1 (RequiredField (Always (Message ResolvedIndexedEvent))))
forall a b. (a -> b) -> a -> b
$ PersistentSubscriptionStreamEventAppeared
-> Required 1 (Message ResolvedIndexedEvent)
psseaEvt PersistentSubscriptionStreamEventAppeared
resp in
                   Loop ()
forall a. Loop a
Loop Loop () -> IO () -> IO (Loop ())
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ Chan SubAction -> SubAction -> IO ()
forall (m :: * -> *) a. MonadBase IO m => Chan a -> a -> m ()
writeChan Chan SubAction
subM (ResolvedEvent -> SubAction
Submit ResolvedEvent
evt)
                | Bool
otherwise
                -> Loop () -> IO (Loop ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure Loop ()
forall a. Loop a
Loop

       (UUID, TVar (Maybe Text), Chan SubAction)
-> IO (UUID, TVar (Maybe Text), Chan SubAction)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (UUID
theSubId, TVar (Maybe Text)
var, Chan SubAction
subM)