{-# LANGUAGE GADTs #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# OPTIONS -Wno-orphans #-}
module Database.EventStore.Internal.Subscription.Catchup where
import Safe (fromJustNote)
import Database.EventStore.Internal.Communication
import Database.EventStore.Internal.Control
import Database.EventStore.Internal.Exec
import Database.EventStore.Internal.Operation.Catchup
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Stream
import Database.EventStore.Internal.Subscription.Api
import Database.EventStore.Internal.Subscription.Types
import Database.EventStore.Internal.Subscription.Packages
import Database.EventStore.Internal.Types
receivedAlready :: StreamId t -> t -> ResolvedEvent -> Bool
receivedAlready :: StreamId t -> t -> ResolvedEvent -> Bool
receivedAlready StreamName{} t
old ResolvedEvent
e =
Int64 -> EventNumber
EventNumber (ResolvedEvent -> Int64
resolvedEventOriginalEventNumber ResolvedEvent
e) EventNumber -> EventNumber -> Bool
forall a. Ord a => a -> a -> Bool
< t
EventNumber
old
receivedAlready StreamId t
All t
old ResolvedEvent
e =
let pos :: Position
pos =
String -> Maybe Position -> Position
forall a. Partial => String -> Maybe a -> a
fromJustNote
String
"Position is always defined when reading events from $all stream"
(Maybe Position -> Position) -> Maybe Position -> Position
forall a b. (a -> b) -> a -> b
$ ResolvedEvent -> Maybe Position
resolvedEventPosition ResolvedEvent
e in
Position
pos Position -> Position -> Bool
forall a. Ord a => a -> a -> Bool
< t
Position
old
nextTarget :: StreamId t -> ResolvedEvent -> t
nextTarget :: StreamId t -> ResolvedEvent -> t
nextTarget StreamName{} ResolvedEvent
e =
Int64 -> EventNumber
EventNumber (ResolvedEvent -> Int64
resolvedEventOriginalEventNumber ResolvedEvent
e)
nextTarget StreamId t
All ResolvedEvent
e =
String -> Maybe Position -> Position
forall a. Partial => String -> Maybe a -> a
fromJustNote
String
"Position is always defined when reading events from $all stream"
(Maybe Position -> Position) -> Maybe Position -> Position
forall a b. (a -> b) -> a -> b
$ ResolvedEvent -> Maybe Position
resolvedEventPosition ResolvedEvent
e
data CatchupSubscription t =
CatchupSubscription
{ CatchupSubscription t -> Exec
_catchupExec :: Exec
, CatchupSubscription t -> StreamId t
_catchupStream :: StreamId t
, CatchupSubscription t -> TVar (Maybe UUID)
_catchupSub :: TVar (Maybe UUID)
, CatchupSubscription t -> Chan SubAction
_catchupChan :: Chan SubAction
}
instance Subscription (CatchupSubscription s) where
nextSubEvent :: CatchupSubscription s -> IO SubAction
nextSubEvent CatchupSubscription s
s = Chan SubAction -> IO SubAction
forall (m :: * -> *) a. MonadBase IO m => Chan a -> m a
readChan (CatchupSubscription s -> Chan SubAction
forall t. CatchupSubscription t -> Chan SubAction
_catchupChan CatchupSubscription s
s)
unsubscribe :: CatchupSubscription s -> IO ()
unsubscribe CatchupSubscription s
s
= do UUID
subId <- STM UUID -> IO UUID
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM UUID -> IO UUID) -> STM UUID -> IO UUID
forall a b. (a -> b) -> a -> b
$
do Maybe UUID
idMay <- TVar (Maybe UUID) -> STM (Maybe UUID)
forall a. TVar a -> STM a
readTVar (CatchupSubscription s -> TVar (Maybe UUID)
forall t. CatchupSubscription t -> TVar (Maybe UUID)
_catchupSub CatchupSubscription s
s)
case Maybe UUID
idMay of
Maybe UUID
Nothing -> STM UUID
forall a. STM a
retrySTM
Just UUID
sid -> UUID -> STM UUID
forall (f :: * -> *) a. Applicative f => a -> f a
pure UUID
sid
let pkg :: Package
pkg = UUID -> Package
createUnsubscribePackage UUID
subId
Exec -> SendPackage -> IO ()
forall p a (m :: * -> *).
(Pub p, Typeable a, MonadIO m) =>
p -> a -> m ()
publishWith (CatchupSubscription s -> Exec
forall t. CatchupSubscription t -> Exec
_catchupExec CatchupSubscription s
s) (Package -> SendPackage
SendPackage Package
pkg)
instance SubscriptionStream (CatchupSubscription t) t where
subscriptionStream :: CatchupSubscription t -> StreamId t
subscriptionStream = CatchupSubscription t -> StreamId t
forall t. CatchupSubscription t -> StreamId t
_catchupStream
newCatchupSubscription
:: Exec
-> Bool
-> Maybe Int32
-> Maybe Credentials
-> StreamId t
-> t
-> IO (CatchupSubscription t)
newCatchupSubscription :: Exec
-> Bool
-> Maybe Int32
-> Maybe Credentials
-> StreamId t
-> t
-> IO (CatchupSubscription t)
newCatchupSubscription Exec
exec Bool
tos Maybe Int32
batch Maybe Credentials
cred StreamId t
streamId t
seed
= do (TVar (Maybe UUID)
var, Chan SubAction
chan) <- Settings
-> Exec
-> StreamId t
-> t
-> Bool
-> Maybe Int32
-> Maybe Credentials
-> IO (TVar (Maybe UUID), Chan SubAction)
forall t.
Settings
-> Exec
-> StreamId t
-> t
-> Bool
-> Maybe Int32
-> Maybe Credentials
-> IO (TVar (Maybe UUID), Chan SubAction)
catchup (Exec -> Settings
execSettings Exec
exec) Exec
exec StreamId t
streamId t
seed Bool
tos Maybe Int32
batch Maybe Credentials
cred
let sub :: CatchupSubscription t
sub =
CatchupSubscription :: forall t.
Exec
-> StreamId t
-> TVar (Maybe UUID)
-> Chan SubAction
-> CatchupSubscription t
CatchupSubscription
{ _catchupExec :: Exec
_catchupExec = Exec
exec
, _catchupStream :: StreamId t
_catchupStream = StreamId t
streamId
, _catchupSub :: TVar (Maybe UUID)
_catchupSub = TVar (Maybe UUID)
var
, _catchupChan :: Chan SubAction
_catchupChan = Chan SubAction
chan
}
CatchupSubscription t -> IO (CatchupSubscription t)
forall (f :: * -> *) a. Applicative f => a -> f a
pure CatchupSubscription t
sub