{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE RecordWildCards   #-}
{-# OPTIONS -Wno-orphans #-}
--------------------------------------------------------------------------------
-- |
-- Module : Database.EventStore.Internal.Subscription.Persistent
-- Copyright : (C) 2017 Yorick Laupa
-- License : (see the file LICENSE)
--
-- Maintainer : Yorick Laupa <yo.eight@gmail.com>
-- Stability : provisional
-- Portability : non-portable
--
--------------------------------------------------------------------------------
module Database.EventStore.Internal.Subscription.Persistent where

--------------------------------------------------------------------------------
import Data.UUID

--------------------------------------------------------------------------------
import Database.EventStore.Internal.Communication
import Database.EventStore.Internal.Control
import Database.EventStore.Internal.Exec
import Database.EventStore.Internal.Operation.Persist
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Stream
import Database.EventStore.Internal.Subscription.Api
import Database.EventStore.Internal.Subscription.Message
import Database.EventStore.Internal.Subscription.Packages
import Database.EventStore.Internal.Subscription.Types
import Database.EventStore.Internal.Types

--------------------------------------------------------------------------------
-- | Handle on a persistent subscription. The server remembers the state of the
--   subscription. This allows for many different modes of operation compared
--   to a regular or catchup subscription, where the client holds the
--   subscription state.
--   (Needs EventStore >= v3.1.0).
data PersistentSubscription =
  PersistentSubscription
  { _perExec :: Exec
    -- ^ Handle given to 'publishWith' when sending packages on behalf of this
    --   subscription.
  , _perSubId :: UUID
    -- ^ Identifier handed to the unsubscribe, ack and nak package builders.
  , _perStream :: StreamName
    -- ^ Stream this subscription was created for.
  , _perCred :: Maybe Credentials
    -- ^ Optional credentials handed to the ack and nak package builders.
  , _perSubKey :: TVar (Maybe Text)
    -- ^ Subscription key. 'persistentGetSubKey' waits while this holds
    --   'Nothing'; the code that fills it in is not part of this module.
  , _perChan :: Chan SubAction
    -- ^ Channel 'nextSubEvent' reads the next 'SubAction' from.
  }

--------------------------------------------------------------------------------
instance Subscription PersistentSubscription where
  -- Takes the next 'SubAction' out of the subscription's channel.
  nextSubEvent s = readChan (_perChan s)

  -- Publishes an unsubscribe package, built from the subscription id, through
  -- the subscription's 'Exec'.
  unsubscribe s = publishWith (_perExec s) (SendPackage pkg)
    where
      pkg = createUnsubscribePackage (_perSubId s)

--------------------------------------------------------------------------------
instance SubscriptionStream PersistentSubscription EventNumber where
    -- The stream is the one stored in the subscription record.
    subscriptionStream = _perStream

--------------------------------------------------------------------------------
-- | Builds a 'PersistentSubscription' handle. Delegates to 'persist', which
--   yields the subscription id, the variable holding the subscription key and
--   the channel of 'SubAction's; those are stored in the handle along with the
--   arguments needed later to send packages.
newPersistentSubscription
  :: Exec              -- ^ Stored in the handle and forwarded to 'persist'.
  -> Text              -- ^ Forwarded as-is to 'persist' (bound as @grp@).
  -> StreamName        -- ^ Target stream; its 'Text' name is given to 'persist'.
  -> Int32             -- ^ Forwarded as-is to 'persist' (bound as @bufSize@).
  -> Maybe Credentials -- ^ Stored in the handle and forwarded to 'persist'.
  -> IO PersistentSubscription
newPersistentSubscription exec grp (StreamName stream) bufSize cred
  = do (subId, varSubKey, chan) <- persist exec grp stream bufSize cred
       pure PersistentSubscription
              { _perExec   = exec
              , _perSubId  = subId
              , _perCred   = cred
              , _perSubKey = varSubKey
              , _perChan   = chan
                -- 'persist' wants the raw name, so the stream was unwrapped
                -- in the argument pattern and is re-wrapped here.
              , _perStream = StreamName stream
              }

--------------------------------------------------------------------------------
-- | Returns the subscription key stored in the handle. If the key is not
--   there yet (the variable holds 'Nothing'), the transaction is retried via
--   'retrySTM', so the call does not return until a key is available.
persistentGetSubKey
  :: PersistentSubscription
  -> IO Text
persistentGetSubKey sub
  = atomically $
      do subKeyMay <- readTVar (_perSubKey sub)
         case subKeyMay of
           Just key -> pure key
           -- No key yet: retry the transaction (presumably re-run once the
           -- variable changes, as with STM's @retry@ -- confirm 'retrySTM').
           Nothing  -> retrySTM

--------------------------------------------------------------------------------
-- | Acknowledges that the events with the given ids have been successfully
--   processed. Obtains the subscription key with 'persistentGetSubKey' first
--   (which waits for the key to be available), then publishes an ack package.
notifyEventsProcessed
  :: PersistentSubscription
  -> [UUID] -- ^ Ids of the events to acknowledge.
  -> IO ()
notifyEventsProcessed sub evts
  = do subKey <- persistentGetSubKey sub
       let uuid = _perSubId sub
           pkg  = createAckPackage (_perCred sub) uuid subKey evts
       publishWith (_perExec sub) (SendPackage pkg)

--------------------------------------------------------------------------------
-- | Acknowledges that a 'ResolvedEvent' has been successfully processed.
--   Equivalent to 'notifyEventsProcessed' on the single id returned by
--   'resolvedEventOriginalId'.
acknowledge :: PersistentSubscription -> ResolvedEvent -> IO ()
acknowledge sub e = notifyEventsProcessed sub [resolvedEventOriginalId e]

--------------------------------------------------------------------------------
-- | Acknowledges that those 'ResolvedEvent's have been successfully processed.
--   Each event is mapped to its id with 'resolvedEventOriginalId' and the
--   resulting list is handed to 'notifyEventsProcessed'.
acknowledgeEvents :: PersistentSubscription -> [ResolvedEvent] -> IO ()
acknowledgeEvents sub = notifyEventsProcessed sub . fmap resolvedEventOriginalId

--------------------------------------------------------------------------------
-- | Marks a message as having failed processing. The server will take action
--   based upon the action parameter. Equivalent to 'notifyEventsFailed' on the
--   single id returned by 'resolvedEventOriginalId'.
failed :: PersistentSubscription
       -> ResolvedEvent -- ^ Event that failed.
       -> NakAction     -- ^ Action passed on to 'notifyEventsFailed'.
       -> Maybe Text    -- ^ Optional reason, passed on to 'notifyEventsFailed'.
       -> IO ()
failed sub e a r = notifyEventsFailed sub a r [resolvedEventOriginalId e]

--------------------------------------------------------------------------------
-- | Marks messages as having failed processing. The server will take action
--   based upon the action parameter. Each event is mapped to its id with
--   'resolvedEventOriginalId' before being handed to 'notifyEventsFailed'.
eventsFailed :: PersistentSubscription
             -> [ResolvedEvent] -- ^ Events that failed.
             -> NakAction       -- ^ Action passed on to 'notifyEventsFailed'.
             -> Maybe Text      -- ^ Optional reason, passed on unchanged.
             -> IO ()
eventsFailed sub evts a r =
  notifyEventsFailed sub a r $ fmap resolvedEventOriginalId evts

--------------------------------------------------------------------------------
-- | Reports that the events with the given ids have failed to be processed
--   successfully. Obtains the subscription key with 'persistentGetSubKey'
--   first (which waits for the key to be available), then publishes a nak
--   package carrying the action and the optional reason.
notifyEventsFailed
  :: PersistentSubscription
  -> NakAction  -- ^ Action handed to 'createNakPackage'.
  -> Maybe Text -- ^ Reason
  -> [UUID]     -- ^ Ids of the events that failed.
  -> IO ()
notifyEventsFailed sub act res evts
  = do subKey <- persistentGetSubKey sub
       let uuid = _perSubId sub
           pkg  = createNakPackage (_perCred sub) uuid subKey act res evts
       publishWith (_perExec sub) (SendPackage pkg)