module Database.EventStore.Internal.Subscription
( Regular(..)
, Persistent(..)
, Catchup(..)
, CatchupParams(..)
, Running(..)
, Checkpoint(..)
, Subscription
, SubscriptionId
, SubscriptionClosed(..)
, SubEnv(..)
, PushCmd(..)
, AckCmd(..)
, catchupSub
, regularSub
, persistentSub
, hasCaughtUp
, getSubId
, getSubStream
, unsubscribe
, isSubscribedToAll
, getSubLastCommitPos
, getSubLastEventNumber
, nextEventMaybeSTM
, nextEvent
, nextEventMaybe
, waitConfirmation
, getSubResolveLinkTos
, waitTillCatchup
, hasCaughtUpSTM
, notifyEventsProcessed
, acknowledge
, acknowledgeEvents
, failed
, eventsFailed
, notifyEventsFailed
, unsubscribeConfirmed
, waitUnsubscribeConfirmed
, unsubscribeConfirmedSTM
) where
import Data.Int
import ClassyPrelude
import Data.Sequence (ViewL(..), viewl, dropWhileL)
import Data.UUID
import Database.EventStore.Internal.EndPoint
import Database.EventStore.Internal.Manager.Subscription.Driver hiding (unsubscribe)
import Database.EventStore.Internal.Manager.Subscription.Model
import Database.EventStore.Internal.Operation
import Database.EventStore.Internal.Operation.Catchup
import Database.EventStore.Internal.Types
-- | Tag carried by subscriptions created with 'regularSub'.
data Regular =
    Regular
    { _subTos :: Bool
      -- ^ Flag returned by 'getSubResolveLinkTos'; 'regularSub' stores the
      --   resolve-link-tos argument it was given here.
    }
-- | Tag carried by subscriptions created with 'catchupSub'. It also indexes
--   the 'Input' constructors that only a catch-up state machine accepts.
data Catchup = Catchup
-- | Tag carried by subscriptions created with 'persistentSub'.
data Persistent =
    Persistent
    { _perGroup :: Text
      -- ^ Group name handed to 'persistentSub'.
    }
-- | Requests a 'SubStateMachine' can answer. The first index is the
--   subscription tag, the second is the type of the answer.
data Input t a where
    -- | An event was pushed by the subscription connection; the answer is
    --   the next state machine.
    Arrived
        :: ResolvedEvent
        -> Input t (SubStateMachine t)

    -- | Ask for the next buffered event, if any, together with the state
    --   machine to use afterwards.
    ReadNext
        :: Input t (Maybe (ResolvedEvent, SubStateMachine t))

    -- | Result of one catch-up read: the events read, whether the end of
    --   the stream was reached, and the checkpoint of that read.
    BatchRead
        :: [ResolvedEvent]
        -> Bool
        -> Checkpoint
        -> Input Catchup (SubStateMachine Catchup)

    -- | Ask a catch-up state machine whether it has caught up.
    CaughtUp
        :: Input Catchup Bool

    -- | Ask a catch-up state machine for the last event number and
    --   position it recorded.
    LastEventNum
        :: Input Catchup (Int32, Maybe Position)
-- | A subscription state machine: a function able to answer any 'Input'
--   valid for the subscription tag @t@.
newtype SubStateMachine t =
    SubStateMachine (forall a. Input t a -> a)
-- | Submits an 'Operation' along with the callback that receives either its
--   error or its result.
type PushOp a =
    (Either OperationError a -> IO ())
    -> Operation a
    -> IO ()
-- | Submits a subscription request ('PushCmd') along with the callback that
--   receives the resulting 'SubConnectEvent's.
type PushConnect =
    (SubConnectEvent -> IO ())
    -> PushCmd
    -> IO ()
-- | Subscription request handed to 'subPushConnect'.
data PushCmd
    = PushRegular Text Bool
      -- ^ Built by 'regularSub' and 'catchupSub' from the stream name and
      --   the resolve-link-tos flag.
    | PushPersistent Text Text Int32
      -- ^ Built by 'persistentSub' from its group, stream and buffer size
      --   arguments, in that order.
-- | Acknowledgement command handed to 'subAckCmd'.
data AckCmd
    = AckCmd
      -- ^ Sent by 'notifyEventsProcessed'.
    | NakCmd NakAction (Maybe Text)
      -- ^ Sent by 'notifyEventsFailed' with the action and optional reason
      --   it was given.
-- | Callbacks and settings a subscription needs from its surrounding
--   connection.
data SubEnv =
    SubEnv
    { subSettings :: Settings
      -- ^ Settings passed to the catch-up operation.
    , subPushOp :: forall a. PushOp a
      -- ^ Submits an operation (used for catch-up reads).
    , subPushConnect :: PushConnect
      -- ^ Submits a subscription request.
    , subPushUnsub :: Running -> IO ()
      -- ^ Called by 'unsubscribe' with the confirmed 'Running' value.
    , subAckCmd :: AckCmd -> Running -> [UUID] -> IO ()
      -- ^ Called by 'notifyEventsProcessed' and 'notifyEventsFailed' with
      --   the event ids concerned.
    , subForceReconnect :: NodeEndPoints -> IO ()
      -- ^ Called with the master node endpoints when a not-master reply
      --   carries master information.
    }
-- | Observable state of a subscription.
data SubState t
    = SubOnline (SubStateMachine t)
      -- ^ The subscription is usable; events go through the state machine.
    | SubDropped SubDropReason
      -- ^ The subscription was dropped for the given reason.
    | forall e. Exception e => SubException e
      -- ^ An exception was recorded (for instance a failed catch-up
      --   operation); readers rethrow it.
    | SubUserUnsubscribed
      -- ^ The unsubscription has been confirmed.
-- | Applies a transformation to the state machine of an online
--   subscription. Every other state is returned untouched.
modifySubSM :: (SubStateMachine a -> SubStateMachine a)
            -> SubState a
            -> SubState a
modifySubSM update state =
    case state of
        SubOnline machine -> SubOnline (update machine)
        other             -> other
-- | Parameters of a catch-up subscription.
data CatchupParams =
    CatchupParams
    { catchupResLnkTos :: !Bool
      -- ^ Resolve-link-tos flag, forwarded to both the catch-up operation
      --   and the subscription request.
    , catchupState :: !CatchupState
      -- ^ Where the catch-up read starts; also provides the stream name.
    , catchupBatchSize :: !(Maybe Int32)
      -- ^ Optional batch size forwarded to the catch-up operation.
    }
-- | Hooks tying a subscription's state storage to connection events.
data SubLifeCycle a =
    SubLifeCycle
    { onConfirm :: Running -> IO ()
      -- ^ Run when the subscription is confirmed.
    , readState :: STM (SubState a)
      -- ^ Reads the current subscription state.
    , writeState :: SubState a -> STM ()
      -- ^ Replaces the current subscription state.
    , onError :: SubDropReason -> IO ()
      -- ^ Run when the subscription is dropped.
    , onUserUnsubscribed :: IO ()
      -- ^ Run when the unsubscription is confirmed.
    , retrySub :: IO ()
      -- ^ Submits the subscription again; used for not-ready and too-busy
      --   replies.
    }
-- | Handle on a subscription, indexed by its kind ('Regular', 'Catchup' or
--   'Persistent').
data Subscription t =
    Subscription
    { subStream :: Text
      -- ^ Stream name, as returned by 'getSubStream'.
    , subLifeCycle :: SubLifeCycle t
      -- ^ State accessors and event hooks.
    , subEnv :: SubEnv
      -- ^ Environment used to send unsubscribe and ack commands.
    , subRun :: TMVar Running
      -- ^ Empty until the subscription is confirmed.
    , subType :: t
      -- ^ Kind-specific payload.
    }
-- | Exception thrown to readers of a subscription that is no longer
--   online.
data SubscriptionClosed
    = SubscriptionClosed SubDropReason
      -- ^ The subscription was dropped for the given reason.
    | SubscriptionUnsubscribedByUser
      -- ^ The subscription was unsubscribed.
    deriving (Show, Typeable)

instance Exception SubscriptionClosed
-- | Creates a catch-up subscription: submits the catch-up read operation,
--   then the subscription request, and returns the handle right away.
--
--   The state variable starts empty. It is filled by
--   'catchupOpEventHandler' once the first read result (or error) arrives,
--   so state readers block until then.
catchupSub :: SubEnv -> CatchupParams -> IO (Subscription Catchup)
catchupSub env params = do
    mvarRun   <- newEmptyTMVarIO
    mvarState <- newEmptyTMVarIO
    let streamId = catchupStreamName (catchupState params)
        pushCmd  = PushRegular streamId (catchupResLnkTos params)

        -- Restart reading from the last recorded event and resubscribe.
        resubscribe =
            tryRetryCatcupSubscription pushCmd env mvarState lcycle params

        handleDrop reason =
            case reason of
                SubAborted ->
                    resubscribe
                SubNotHandled why infoM ->
                    subNotHandledMsg env lcycle why infoM
                _ -> atomically $ do
                    current <- takeTMVar mvarState
                    -- Only an online subscription becomes dropped; any
                    -- other state is put back unchanged.
                    case current of
                        SubOnline{} -> putTMVar mvarState (SubDropped reason)
                        _           -> putTMVar mvarState current

        markUnsubscribed = atomically $
            takeTMVar mvarState >> putTMVar mvarState SubUserUnsubscribed

        lcycle =
            SubLifeCycle
            { onConfirm          = confirmSub mvarRun
            , readState          = readTMVar mvarState
            , writeState         = \s -> () <$ swapTMVar mvarState s
            , onError            = handleDrop
            , onUserUnsubscribed = markUnsubscribed
            , retrySub           = resubscribe
            }

    subPushOp env (catchupOpEventHandler mvarState)
                  (createCatchupOperation env params)
    subPushConnect env (subEventHandler lcycle) pushCmd
    return $ Subscription streamId lcycle env mvarRun Catchup
-- | Restarts a catch-up subscription that is still online: a new catch-up
--   operation is submitted starting from the last event number/position
--   recorded by the state machine, followed by a new subscription request.
--   Does nothing when the subscription is in any other state.
tryRetryCatcupSubscription :: PushCmd
                           -> SubEnv
                           -> TMVar (SubState Catchup)
                           -> SubLifeCycle Catchup
                           -> CatchupParams
                           -> IO ()
tryRetryCatcupSubscription pushCmd env mvarState lcycle params = do
    current <- atomically (readTMVar mvarState)
    case current of
        SubOnline sm -> do
            let (num, posM) = lastEventNumSM sm
                previous    = catchupState params
                resumeFrom  =
                    case previous of
                        RegularCatchup stream _ ->
                            RegularCatchup stream num
                        AllCatchup{} ->
                            case posM of
                                Just (Position npc npp) -> AllCatchup npc npp
                                -- No position recorded yet: keep the
                                -- original starting point.
                                _                       -> previous
                resumed = params { catchupState = resumeFrom }
            subPushOp env (catchupOpEventHandler mvarState)
                          (createCatchupOperation env resumed)
            subPushConnect env (subEventHandler lcycle) pushCmd
        _ ->
            return ()
-- | Creates a regular subscription on the given stream and submits the
--   subscription request. The state starts online with an empty buffer.
--
--   NOTE(review): unlike 'catchupSub', the drop handler here does not
--   special-case 'SubAborted' or 'SubNotHandled'; every reason marks an
--   online subscription as dropped.
regularSub :: SubEnv -> Text -> Bool -> IO (Subscription Regular)
regularSub env streamId resLnkTos = do
    mvarRun  <- newEmptyTMVarIO
    varState <- newTVarIO (SubOnline regularSubscription)
    let connect =
            subPushConnect env (subEventHandler lcycle)
                               (PushRegular streamId resLnkTos)

        dropWith reason = atomically $ do
            current <- readTVar varState
            case current of
                SubOnline{} -> writeTVar varState (SubDropped reason)
                _           -> return ()

        -- Puts the subscription back online (keeping the current state
        -- machine if it is already online) and subscribes again.
        restart = do
            atomically $ do
                current <- readTVar varState
                case current of
                    SubOnline{} -> return ()
                    _ -> writeTVar varState (SubOnline regularSubscription)
            connect

        lcycle =
            SubLifeCycle
            { onConfirm          = confirmSub mvarRun
            , readState          = readTVar varState
            , writeState         = writeTVar varState
            , onError            = dropWith
            , onUserUnsubscribed =
                  atomically (writeTVar varState SubUserUnsubscribed)
            , retrySub           = restart
            }

    connect
    return $ Subscription streamId lcycle env mvarRun (Regular resLnkTos)
-- | Creates a persistent subscription and submits the subscription
--   request. Arguments after the environment are forwarded to
--   'PushPersistent' in the same order (@grp@, @stream@, @bufSize@).
--
--   Retrying unconditionally resets the state to a fresh online state
--   machine before subscribing again.
persistentSub :: SubEnv -> Text -> Text -> Int32 -> IO (Subscription Persistent)
persistentSub env grp stream bufSize = do
    mvarRun  <- newEmptyTMVarIO
    varState <- newTVarIO (SubOnline persistentSubscription)
    let pushCmd = PushPersistent grp stream bufSize
        connect = subPushConnect env (subEventHandler lcycle) pushCmd

        dropWith reason = atomically $ do
            current <- readTVar varState
            case current of
                SubOnline{} -> writeTVar varState (SubDropped reason)
                _           -> return ()

        restart = do
            atomically $
                writeTVar varState (SubOnline persistentSubscription)
            connect

        lcycle =
            SubLifeCycle
            { onConfirm          = confirmSub mvarRun
            , readState          = readTVar varState
            , writeState         = writeTVar varState
            , onError            = dropWith
            , onUserUnsubscribed =
                  atomically (writeTVar varState SubUserUnsubscribed)
            , retrySub           = restart
            }

    connect
    return $ Subscription stream lcycle env mvarRun (Persistent grp)
-- | Stores the confirmed 'Running' value, filling the variable when it is
--   empty and overwriting the previous value otherwise.
confirmSub :: TMVar Running -> Running -> IO ()
confirmSub mvarRun r = atomically $
    isEmptyTMVar mvarRun >>= store
  where
    store True  = putTMVar mvarRun r
    store False = () <$ swapTMVar mvarRun r
-- | Reacts to a not-handled reply:
--
--   * not master, with master info: force a reconnection to that node;
--   * not master, without info: mark the subscription as dropped with a
--     server error;
--   * not ready / too busy: retry the subscription.
subNotHandledMsg :: SubEnv
                 -> SubLifeCycle s
                 -> NotHandledReason
                 -> Maybe MasterInfo
                 -> IO ()
subNotHandledMsg env lcycle reason infoM =
    case reason of
        N_NotMaster ->
            case infoM of
                Just info ->
                    subForceReconnect env (masterInfoNodeEndPoints info)
                _ ->
                    atomically $ writeState lcycle
                               $ SubDropped
                               $ SubServerError (Just msg)
        N_NotReady -> retrySub lcycle
        N_TooBusy  -> retrySub lcycle
  where
    msg = "Been asked to connect to new master node \
          \ but no master info been sent."
-- | Builds the catch-up read operation from the environment settings and
--   the catch-up parameters.
createCatchupOperation :: SubEnv -> CatchupParams -> Operation CatchupOpResult
createCatchupOperation env params =
    catchup settings start resolveLinks batchSize
  where
    settings     = subSettings env
    start        = catchupState params
    resolveLinks = catchupResLnkTos params
    batchSize    = catchupBatchSize params
-- | Callback for the catch-up operation.
--
--   On failure the state becomes 'SubException', whether or not a state
--   was already present. On success the state is initialised to an online
--   catch-up state machine if it was still empty, then the batch is fed to
--   the state machine (a no-op for non-online states).
catchupOpEventHandler :: TMVar (SubState Catchup)
                      -> Either OperationError CatchupOpResult
                      -> IO ()
catchupOpEventHandler mvarState outcome = atomically $
    case outcome of
        Left e -> do
            vacant <- isEmptyTMVar mvarState
            if vacant
                then putTMVar mvarState (SubException e)
                else () <$ swapTMVar mvarState (SubException e)
        Right res -> do
            whenM (isEmptyTMVar mvarState) $
                putTMVar mvarState (SubOnline catchupSubscription)
            current <- takeTMVar mvarState
            let feedBatch = batchReadSM (catchupReadEvents res)
                                        (catchupEndOfStream res)
                                        (catchupCheckpoint res)
            putTMVar mvarState (modifySubSM feedBatch current)
-- | Dispatches a subscription connection event to the life cycle hooks.
--
--   An appeared event is pushed into the state machine when the
--   subscription is online and ignored when the state is 'SubException'.
--
--   NOTE(review): an event appearing while the state is 'SubDropped' or
--   'SubUserUnsubscribed' calls 'error'; confirm that the driver can never
--   deliver an event in those states.
subEventHandler :: SubLifeCycle a -> SubConnectEvent -> IO ()
subEventHandler lcycle evt =
    case evt of
        SubConfirmed run -> onConfirm lcycle run
        EventAppeared e  -> atomically $ do
            current <- readState lcycle
            case current of
                SubOnline sm ->
                    writeState lcycle (SubOnline (eventArrivedSM e sm))
                SubException _ ->
                    return ()
                _ ->
                    error "Impossible: subEventHandler"
        Unsubscribed -> onUserUnsubscribed lcycle
        Dropped r    -> onError lcycle r
-- | Feeds a pushed event to the state machine and returns its successor.
eventArrivedSM :: ResolvedEvent -> SubStateMachine t -> SubStateMachine t
eventArrivedSM e sm =
    case sm of
        SubStateMachine step -> step (Arrived e)
-- | Asks the state machine for its next event, if any, along with the
--   state machine to use afterwards.
readNextSM :: SubStateMachine t -> Maybe (ResolvedEvent, SubStateMachine t)
readNextSM sm =
    case sm of
        SubStateMachine step -> step ReadNext
-- | Feeds the result of a catch-up read (events, end-of-stream flag and
--   checkpoint) to the state machine and returns its successor.
batchReadSM :: [ResolvedEvent]
            -> Bool
            -> Checkpoint
            -> SubStateMachine Catchup
            -> SubStateMachine Catchup
batchReadSM es eos nxt sm =
    case sm of
        SubStateMachine step -> step (BatchRead es eos nxt)
-- | Asks a catch-up state machine whether it has caught up.
hasCaughtUpSM :: SubStateMachine Catchup -> Bool
hasCaughtUpSM sm =
    case sm of
        SubStateMachine step -> step CaughtUp
-- | Asks a catch-up state machine for the last event number and position
--   it recorded.
lastEventNumSM :: SubStateMachine Catchup -> (Int32, Maybe Position)
lastEventNumSM sm =
    case sm of
        SubStateMachine step -> step LastEventNum
-- | Initial state machine of a regular subscription: the plain buffering
--   machine from 'baseSubStateMachine'.
regularSubscription :: SubStateMachine Regular
regularSubscription = baseSubStateMachine
-- | Initial state machine of a persistent subscription: the plain
--   buffering machine from 'baseSubStateMachine'.
persistentSubscription :: SubStateMachine Persistent
persistentSubscription = baseSubStateMachine
-- | Tells whether an event lies strictly before a checkpoint. For a
--   position checkpoint, an event without a position is never considered
--   to be before it.
beforeChk :: Checkpoint -> ResolvedEvent -> Bool
beforeChk chk re =
    case chk of
        CheckpointNumber num ->
            recordedEventNumber (resolvedEventOriginal re) < num
        CheckpointPosition pos ->
            case resolvedEventPosition re of
                Just evtPos -> evtPos < pos
                Nothing     -> False
-- | Internal state of the catch-up state machine.
data CatchupSMState =
    CatchupSMState
    { csmReadSeq :: !(Seq ResolvedEvent)
      -- ^ Events obtained from catch-up reads, not yet handed out.
    , csmLiveSeq :: !(Seq ResolvedEvent)
      -- ^ Events pushed by the subscription, not yet handed out.
    , csmLastNum :: !(Maybe Int32)
      -- ^ Last recorded event number (from a handed-out event or from a
      --   number checkpoint).
    , csmLastPos :: !(Maybe Position)
      -- ^ Last recorded position (from a handed-out event or from a
      --   position checkpoint).
    }
-- | Catch-up state with empty buffers and nothing recorded yet.
initialCatchupSMState :: CatchupSMState
initialCatchupSMState =
    CatchupSMState
    { csmReadSeq = empty
    , csmLiveSeq = empty
    , csmLastNum = Nothing
    , csmLastPos = Nothing
    }
-- | Records the result of a catch-up read: the events are appended to the
--   read buffer, live events lying before the checkpoint are dropped from
--   the front of the live buffer, and the checkpoint becomes the last
--   recorded number or position.
insertReadEvents :: [ResolvedEvent]
                 -> Checkpoint
                 -> CatchupSMState
                 -> CatchupSMState
insertReadEvents es chp s =
    case chp of
        CheckpointNumber n   -> merged { csmLastNum = Just n }
        CheckpointPosition p -> merged { csmLastPos = Just p }
  where
    appended = foldl' snoc (csmReadSeq s) es
    trimmed  = dropWhileL (beforeChk chp) (csmLiveSeq s)
    merged   = s { csmReadSeq = appended
                 , csmLiveSeq = trimmed
                 }
-- | Appends a pushed event at the back of the live buffer.
insertLiveEvent :: ResolvedEvent -> CatchupSMState -> CatchupSMState
insertLiveEvent e s = s { csmLiveSeq = snoc (csmLiveSeq s) e }
-- | Pops the oldest event of the read buffer, recording its number and
--   position as the last ones seen. 'Nothing' when the buffer is empty.
readNextFromBatchSeq :: CatchupSMState -> Maybe (ResolvedEvent, CatchupSMState)
readNextFromBatchSeq s =
    case viewl (csmReadSeq s) of
        EmptyL    -> Nothing
        e :< rest -> Just (e, advance e rest)
  where
    advance e rest =
        s { csmReadSeq = rest
          , csmLastNum = Just (recordedEventNumber (resolvedEventOriginal e))
          , csmLastPos = resolvedEventPosition e
          }
-- | Pops the oldest event of the live buffer, recording its number and
--   position as the last ones seen. 'Nothing' when the buffer is empty.
readNextFromLiveSeq :: CatchupSMState -> Maybe (ResolvedEvent, CatchupSMState)
readNextFromLiveSeq s =
    case viewl (csmLiveSeq s) of
        EmptyL    -> Nothing
        e :< rest -> Just (e, advance e rest)
  where
    advance e rest =
        s { csmLiveSeq = rest
          , csmLastNum = Just (recordedEventNumber (resolvedEventOriginal e))
          , csmLastPos = resolvedEventPosition e
          }
-- | Last recorded event number (0 when none was recorded) paired with the
--   last recorded position, if any.
lastEventNumber :: CatchupSMState -> (Int32, Maybe Position)
lastEventNumber s = (num, pos)
  where
    num = fromMaybe 0 (csmLastNum s)
    pos = csmLastPos s
-- | 'True' when the read buffer holds no event.
isBatchReqEmpty :: CatchupSMState -> Bool
isBatchReqEmpty s
    | EmptyL <- viewl (csmReadSeq s) = True
    | otherwise                      = False
-- | Initial state machine of a catch-up subscription. It goes through
--   three phases:
--
--   * @catchingUp@: batches are still being read. Pushed events are
--     buffered in the live buffer and reads are served from the read
--     buffer.
--
--   * @caughtUp@: a batch reported the end of the stream. Reads keep
--     draining the read buffer; once it is empty the machine moves to
--     @live@.
--
--   * @live@: reads are served from the live buffer and further batches
--     are ignored. This is the only phase answering 'True' to 'CaughtUp'.
catchupSubscription :: SubStateMachine Catchup
catchupSubscription = SubStateMachine $ catchingUp initialCatchupSMState
  where
    catchingUp :: forall a. CatchupSMState -> Input Catchup a -> a
    catchingUp s (Arrived e) =
        SubStateMachine $ catchingUp $ insertLiveEvent e s
    catchingUp s ReadNext =
        let wrap (e, nxtS) = (e, SubStateMachine $ catchingUp nxtS) in
        fmap wrap $ readNextFromBatchSeq s
    catchingUp s (BatchRead es eos chk) =
        let nxtS = insertReadEvents es chk s in
        SubStateMachine $
            if eos
            then caughtUp nxtS
            else catchingUp nxtS
    catchingUp _ CaughtUp     = False
    catchingUp s LastEventNum = lastEventNumber s

    caughtUp :: forall a. CatchupSMState -> Input Catchup a -> a
    caughtUp s (Arrived e) =
        SubStateMachine $ caughtUp $ insertLiveEvent e s
    caughtUp s ReadNext =
        case readNextFromBatchSeq s of
            Nothing -> live s ReadNext
            Just (e, nxtS)
                -- Continue from the state *after* the read (nxtS), so the
                -- last event number/position reflect the event just
                -- handed out and the drained read buffer is not retained.
                | isBatchReqEmpty nxtS ->
                    Just (e, SubStateMachine $ live nxtS)
                | otherwise ->
                    Just (e, SubStateMachine $ caughtUp nxtS)
    caughtUp s input@BatchRead{} = catchingUp s input
    caughtUp _ CaughtUp          = False
    caughtUp s LastEventNum      = lastEventNumber s

    live :: forall a. CatchupSMState -> Input Catchup a -> a
    live s (Arrived e) =
        SubStateMachine $ live $ insertLiveEvent e s
    live s ReadNext =
        let wrap (e, nxtS) = (e, SubStateMachine $ live nxtS) in
        fmap wrap $ readNextFromLiveSeq s
    live s BatchRead{}  = SubStateMachine $ live s
    live _ CaughtUp     = True
    live s LastEventNum = lastEventNumber s
-- | State machine shared by regular and persistent subscriptions: pushed
--   events are queued and handed out in arrival order. Any input other
--   than 'Arrived' and 'ReadNext' calls 'error'.
baseSubStateMachine :: forall t. SubStateMachine t
baseSubStateMachine = SubStateMachine $ buffering empty
  where
    buffering :: forall a. Seq ResolvedEvent -> Input t a -> a
    buffering queue (Arrived e) =
        SubStateMachine $ buffering (snoc queue e)
    buffering queue ReadNext =
        case viewl queue of
            e :< rest -> Just (e, SubStateMachine $ buffering rest)
            EmptyL    -> Nothing
    buffering _ _ = error "impossible: base subscription"
-- | Identifier of a confirmed subscription, wrapping the UUID of its
--   'Running' value.
newtype SubscriptionId =
    SubId UUID
    deriving (Eq, Ord, Show)
-- | Returns the subscription id. Blocks until the subscription has been
--   confirmed.
getSubId :: Subscription a -> IO SubscriptionId
getSubId sub =
    atomically $ fmap (SubId . runningUUID) (readTMVar (subRun sub))
-- | Returns the name of the stream the subscription was created for.
getSubStream :: Subscription a -> Text
getSubStream sub = subStream sub
-- | Requests the unsubscription. Blocks until the subscription has been
--   confirmed, then hands its 'Running' value to 'subPushUnsub'.
unsubscribe :: Subscription a -> IO ()
unsubscribe sub =
    atomically (readTMVar (subRun sub)) >>= subPushUnsub (subEnv sub)
-- | 'True' when the subscription's stream name is the empty string.
isSubscribedToAll :: Subscription a -> Bool
isSubscribedToAll sub = getSubStream sub == ""
-- | Returns the last commit position held by the subscription's 'Running'
--   value. Blocks until the subscription has been confirmed.
getSubLastCommitPos :: Subscription a -> IO Int64
getSubLastCommitPos sub =
    atomically $ fmap runningLastCommitPosition (readTMVar (subRun sub))
-- | Returns the last event number held by the subscription's 'Running'
--   value, if any. Blocks until the subscription has been confirmed.
getSubLastEventNumber :: Subscription a -> IO (Maybe Int32)
getSubLastEventNumber sub =
    atomically $ fmap runningLastEventNumber (readTMVar (subRun sub))
-- | Takes the next buffered event, if any, without blocking on an empty
--   buffer.
--
--   Throws the stored exception when the state is 'SubException',
--   'SubscriptionClosed' when the subscription was dropped and
--   'SubscriptionUnsubscribedByUser' when it was unsubscribed.
nextEventMaybeSTM :: Subscription a -> STM (Maybe ResolvedEvent)
nextEventMaybeSTM sub = do
    current <- readState lcycle
    case current of
        SubOnline sm ->
            case readNextSM sm of
                Nothing ->
                    return Nothing
                Just (e, nxt) -> do
                    writeState lcycle (SubOnline nxt)
                    return (Just e)
        SubUserUnsubscribed -> throwSTM SubscriptionUnsubscribedByUser
        SubDropped r        -> throwSTM (SubscriptionClosed r)
        SubException e      -> throwSTM e
  where
    lcycle = subLifeCycle sub
-- | Waits for the next event. The transaction is retried while no event
--   is available; exceptions are those of 'nextEventMaybeSTM'.
nextEvent :: Subscription a -> IO ResolvedEvent
nextEvent sub =
    atomically $ nextEventMaybeSTM sub >>= maybe retrySTM return
-- | 'IO' version of 'nextEventMaybeSTM'.
nextEventMaybe :: Subscription a -> IO (Maybe ResolvedEvent)
nextEventMaybe sub = atomically (nextEventMaybeSTM sub)
-- | Blocks until the subscription has been confirmed.
waitConfirmation :: Subscription a -> IO ()
waitConfirmation s = atomically $ () <$ readTMVar (subRun s)
-- | Returns the resolve-link-tos flag the regular subscription was
--   created with.
getSubResolveLinkTos :: Subscription Regular -> Bool
getSubResolveLinkTos sub = _subTos (subType sub)
-- | 'IO' version of 'hasCaughtUpSTM'.
hasCaughtUp :: Subscription Catchup -> IO Bool
hasCaughtUp = atomically . hasCaughtUpSTM
-- | Blocks until 'hasCaughtUpSTM' answers 'True'. Exceptions are those of
--   'hasCaughtUpSTM'.
waitTillCatchup :: Subscription Catchup -> IO ()
waitTillCatchup sub = atomically $ do
    done <- hasCaughtUpSTM sub
    unless done retrySTM
-- | Tells whether an online catch-up subscription has caught up.
--
--   Throws the stored exception when the state is 'SubException',
--   'SubscriptionClosed' when the subscription was dropped and
--   'SubscriptionUnsubscribedByUser' when it was unsubscribed.
hasCaughtUpSTM :: Subscription Catchup -> STM Bool
hasCaughtUpSTM sub = do
    current <- readState (subLifeCycle sub)
    case current of
        SubUserUnsubscribed -> throwSTM SubscriptionUnsubscribedByUser
        SubDropped r        -> throwSTM (SubscriptionClosed r)
        SubException e      -> throwSTM e
        SubOnline sm        -> return (hasCaughtUpSM sm)
-- | 'True' once the unsubscription has been confirmed, 'False' while the
--   subscription is online.
--
--   Throws the stored exception when the state is 'SubException' and
--   'SubscriptionClosed' when the subscription was dropped.
unsubscribeConfirmedSTM :: Subscription a -> STM Bool
unsubscribeConfirmedSTM sub = do
    current <- readState (subLifeCycle sub)
    case current of
        SubUserUnsubscribed -> return True
        SubOnline _         -> return False
        SubDropped r        -> throwSTM (SubscriptionClosed r)
        SubException e      -> throwSTM e
-- | 'IO' version of 'unsubscribeConfirmedSTM'.
unsubscribeConfirmed :: Subscription a -> IO Bool
unsubscribeConfirmed sub = atomically (unsubscribeConfirmedSTM sub)
-- | Blocks until the unsubscription has been confirmed. Exceptions are
--   those of 'unsubscribeConfirmedSTM'.
waitUnsubscribeConfirmed :: Subscription a -> IO ()
waitUnsubscribeConfirmed sub = atomically $ do
    confirmed <- unsubscribeConfirmedSTM sub
    unless confirmed retrySTM
-- | Acknowledges the events with the given ids. Blocks until the
--   subscription has been confirmed.
notifyEventsProcessed :: Subscription Persistent -> [UUID] -> IO ()
notifyEventsProcessed sub evts = do
    run <- atomically (readTMVar (subRun sub))
    subAckCmd (subEnv sub) AckCmd run evts
-- | Acknowledges a single event, identified by its original event id.
acknowledge :: Subscription Persistent -> ResolvedEvent -> IO ()
acknowledge sub e = notifyEventsProcessed sub eventIds
  where
    eventIds = [resolvedEventOriginalId e]
-- | Acknowledges several events, identified by their original event ids.
acknowledgeEvents :: Subscription Persistent -> [ResolvedEvent] -> IO ()
acknowledgeEvents sub evts =
    notifyEventsProcessed sub (fmap resolvedEventOriginalId evts)
-- | Reports a single event as failed, with the action to take and an
--   optional reason.
failed :: Subscription Persistent
       -> ResolvedEvent
       -> NakAction
       -> Maybe Text
       -> IO ()
failed sub e a r = notifyEventsFailed sub a r eventIds
  where
    eventIds = [resolvedEventOriginalId e]
-- | Reports several events as failed, with the action to take and an
--   optional reason.
eventsFailed :: Subscription Persistent
             -> [ResolvedEvent]
             -> NakAction
             -> Maybe Text
             -> IO ()
eventsFailed sub evts a r = notifyEventsFailed sub a r eventIds
  where
    eventIds = fmap resolvedEventOriginalId evts
-- | Reports the events with the given ids as failed, with the action to
--   take and an optional reason. Blocks until the subscription has been
--   confirmed.
notifyEventsFailed :: Subscription Persistent
                   -> NakAction
                   -> Maybe Text
                   -> [UUID]
                   -> IO ()
notifyEventsFailed sub act res evts = do
    run <- atomically (readTMVar (subRun sub))
    subAckCmd (subEnv sub) (NakCmd act res) run evts