module Database.EventStore.Internal.Manager.Subscription where
import Control.Concurrent
import Control.Exception
import Control.Monad.Fix
import Data.ByteString (ByteString)
import Data.ByteString.Lazy (toStrict)
import Data.Foldable
import Data.Functor
import Data.Int
import qualified Data.Map.Strict as M
import Data.Maybe
import Data.Monoid ((<>))
import Data.Typeable
import GHC.Generics (Generic)
import Prelude
import Data.ProtocolBuffers
import Data.Serialize
import Data.Text hiding (group)
import Data.UUID
import FRP.Sodium
import System.Random
import Database.EventStore.Internal.Operation.ReadStreamEventsOperation
import Database.EventStore.Internal.TimeSpan
import Database.EventStore.Internal.Types hiding (Event, newEvent)
import Database.EventStore.Internal.Util.Sodium
-- | Wire message requesting a subscription on a stream. It is built by
--   'subscribeToStream' and serialized by 'createConnectRegularPackage'.
data SubscribeToStream
    = SubscribeToStream
      { subscribeStreamId :: Required 1 (Value Text)
        -- ^ Name of the stream (field tag 1).
      , subscribeResolveLinkTos :: Required 2 (Value Bool)
        -- ^ Resolve-link-tos flag (field tag 2).
      }
    deriving (Generic, Show)

-- | Empty instance body: relies on the class defaults, the type being
--   'Generic'.
instance Encode SubscribeToStream
-- | Build a 'SubscribeToStream' message from a stream name and the
--   resolve-link-tos flag.
subscribeToStream :: Text -> Bool -> SubscribeToStream
subscribeToStream stream_id res_link_tos =
    SubscribeToStream (putField stream_id) (putField res_link_tos)
-- | Wire message decoded by 'confirmSub' when a package with command 0xC1
--   answers a pending regular subscription.
data SubscriptionConfirmation
    = SubscriptionConfirmation
      { subscribeLastCommitPos :: Required 1 (Value Int64)
        -- ^ Last commit position (field tag 1).
      , subscribeLastEventNumber :: Optional 2 (Value Int32)
        -- ^ Last event number, when provided (field tag 2).
      }
    deriving (Generic, Show)

-- | Empty instance body: relies on the class defaults, the type being
--   'Generic'.
instance Decode SubscriptionConfirmation
-- | Wire message decoded by 'onEventAppeared' for command 0xC2: an event
--   delivered to an ongoing regular subscription.
data StreamEventAppeared
    = StreamEventAppeared
      { streamResolvedEvent :: Required 1 (Message ResolvedEventBuf) }
        -- ^ The delivered event, in its wire representation (field tag 1).
    deriving (Generic, Show)

-- | Empty instance body: relies on the class defaults, the type being
--   'Generic'.
instance Decode StreamEventAppeared
-- | Reason attached to a dropped subscription.
--
--   The type is used as an 'Enumeration' field in 'SubscriptionDropped', so
--   the constructor order (through the derived 'Enum' instance) presumably
--   fixes the wire values — do not reorder without checking the protocol.
data DropReason
    = D_Unsubscribed
    | D_AccessDenied
    | D_NotFound
    | D_PersistentSubscriptionDeleted
    deriving (Enum, Eq, Show)
-- | Wire message decoded by 'dropError' for command 0xC4.
data SubscriptionDropped
    = SubscriptionDropped
      { dropReason :: Optional 1 (Enumeration DropReason) }
        -- ^ Optional reason; 'dropError' falls back to 'D_Unsubscribed' when
        --   it is absent.
    deriving (Generic, Show)

-- | Empty instance body: relies on the class defaults, the type being
--   'Generic'.
instance Decode SubscriptionDropped
-- | Field-less wire message sent with command 0xC3 by
--   'createUnsubscribePackage'.
data UnsubscribeFromStream = UnsubscribeFromStream deriving (Generic, Show)

-- | Empty instance body: relies on the class defaults, the type being
--   'Generic'.
instance Encode UnsubscribeFromStream
-- | Wire message asking the server to create a persistent subscription
--   group. It is filled from a 'PersistentSubscriptionSettings' by
--   '_createPersistentSubscription' and sent with command 0xC8 by
--   'createPersistActionPkg'. The numeric argument of each field is its
--   protocol-buffer tag.
data CreatePersistentSubscription =
    CreatePersistentSubscription
    { cpsGroupName :: Required 1 (Value Text)
    , cpsStreamId :: Required 2 (Value Text)
    , cpsResolveLinkTos :: Required 3 (Value Bool)
    , cpsStartFrom :: Required 4 (Value Int32)
    , cpsMsgTimeout :: Required 5 (Value Int32)
      -- ^ Filled with a millisecond count by '_createPersistentSubscription'.
    , cpsRecordStats :: Required 6 (Value Bool)
    , cpsLiveBufSize :: Required 7 (Value Int32)
    , cpsReadBatchSize :: Required 8 (Value Int32)
    , cpsBufSize :: Required 9 (Value Int32)
    , cpsMaxRetryCount :: Required 10 (Value Int32)
    , cpsPreferRoundRobin :: Required 11 (Value Bool)
      -- ^ Always set to 'False' by '_createPersistentSubscription'.
    , cpsChkPtAfterTime :: Required 12 (Value Int32)
      -- ^ Filled with a millisecond count by '_createPersistentSubscription'.
    , cpsChkPtMaxCount :: Required 13 (Value Int32)
    , cpsChkPtMinCount :: Required 14 (Value Int32)
    , cpsSubMaxCount :: Required 15 (Value Int32)
    , cpsNamedConsStrategy :: Optional 16 (Value Text)
    } deriving (Generic, Show)
-- | Fill a 'CreatePersistentSubscription' message for the given group and
--   stream from the user-supplied settings. Time spans are converted to
--   milliseconds and the prefer-round-robin flag is always 'False'; the
--   consumer strategy is passed by name instead.
_createPersistentSubscription :: Text
                              -> Text
                              -> PersistentSubscriptionSettings
                              -> CreatePersistentSubscription
_createPersistentSubscription group stream sett =
    let millis   = fromIntegral . timeSpanTotalMillis
        strategy = strategyText (psSettingsNamedConsumerStrategy sett)
    in CreatePersistentSubscription
       { cpsGroupName         = putField group
       , cpsStreamId          = putField stream
       , cpsResolveLinkTos    = putField (psSettingsResolveLinkTos sett)
       , cpsStartFrom         = putField (psSettingsStartFrom sett)
       , cpsMsgTimeout        = putField (millis (psSettingsMsgTimeout sett))
       , cpsRecordStats       = putField (psSettingsExtraStats sett)
       , cpsLiveBufSize       = putField (psSettingsLiveBufSize sett)
       , cpsReadBatchSize     = putField (psSettingsReadBatchSize sett)
       , cpsBufSize           = putField (psSettingsHistoryBufSize sett)
       , cpsMaxRetryCount     = putField (psSettingsMaxRetryCount sett)
       , cpsPreferRoundRobin  = putField False
       , cpsChkPtAfterTime    = putField (millis (psSettingsCheckPointAfter sett))
       , cpsChkPtMaxCount     = putField (psSettingsMaxCheckPointCount sett)
       , cpsChkPtMinCount     = putField (psSettingsMinCheckPointCount sett)
       , cpsSubMaxCount       = putField (psSettingsMaxSubsCount sett)
       , cpsNamedConsStrategy = putField (Just strategy)
       }
-- | Empty instance body: relies on the class defaults, the type being
--   'Generic'.
instance Encode CreatePersistentSubscription
-- | Outcome carried by 'CreatePersistentSubscriptionCompleted'.
--
--   Used as an 'Enumeration' field, so the constructor order (derived
--   'Enum') presumably fixes the wire values — do not reorder.
data CreatePersistentSubscriptionResult
    = CPS_Success
    | CPS_AlreadyExists
    | CPS_Fail
    | CPS_AccessDenied
    deriving (Enum, Eq, Show)
-- | Wire message decoded by 'onPersistActionConfirmed' for command 0xC9,
--   answering a pending 'PersistCreate' action.
data CreatePersistentSubscriptionCompleted =
    CreatePersistentSubscriptionCompleted
    { cpscResult :: Required 1 (Enumeration CreatePersistentSubscriptionResult)
      -- ^ Outcome of the creation (field tag 1).
    , cpscReason :: Optional 2 (Value Text)
      -- ^ Optional explanation; only read on 'CPS_Fail' (field tag 2).
    } deriving (Generic, Show)

-- | Empty instance body: relies on the class defaults, the type being
--   'Generic'.
instance Decode CreatePersistentSubscriptionCompleted
-- | Wire message asking the server to delete a persistent subscription
--   group; sent with command 0xCA by 'createPersistActionPkg'.
data DeletePersistentSubscription =
    DeletePersistentSubscription
    { dpsGroupName :: Required 1 (Value Text)
      -- ^ Group name (field tag 1).
    , dpsStreamId :: Required 2 (Value Text)
      -- ^ Stream name (field tag 2).
    } deriving (Generic, Show)

-- | Empty instance body: relies on the class defaults, the type being
--   'Generic'.
instance Encode DeletePersistentSubscription
-- | Build a 'DeletePersistentSubscription' message from a group name and a
--   stream name, in that order.
_deletePersistentSubscription :: Text -> Text -> DeletePersistentSubscription
_deletePersistentSubscription group_name stream_id =
    DeletePersistentSubscription (putField group_name) (putField stream_id)
-- | Outcome carried by 'DeletePersistentSubscriptionCompleted'.
--
--   Used as an 'Enumeration' field, so the constructor order (derived
--   'Enum') presumably fixes the wire values — do not reorder.
data DeletePersistentSubscriptionResult
    = DPS_Success
    | DPS_DoesNotExist
    | DPS_Fail
    | DPS_AccessDenied
    deriving (Enum, Eq, Show)
-- | Wire message decoded by 'onPersistActionConfirmed' for command 0xCB,
--   answering a pending 'PersistDelete' action.
data DeletePersistentSubscriptionCompleted =
    DeletePersistentSubscriptionCompleted
    { dpscResult :: Required 1 (Enumeration DeletePersistentSubscriptionResult)
      -- ^ Outcome of the deletion (field tag 1).
    , dpscReason :: Optional 2 (Value Text)
      -- ^ Optional explanation; only read on 'DPS_Fail' (field tag 2).
    } deriving (Generic, Show)

-- | Empty instance body: relies on the class defaults, the type being
--   'Generic'.
instance Decode DeletePersistentSubscriptionCompleted
-- | Wire message asking the server to update a persistent subscription
--   group. It mirrors 'CreatePersistentSubscription' field for field, is
--   filled by '_updatePersistentSubscription' and sent with command 0xCE by
--   'createPersistActionPkg'. The numeric argument of each field is its
--   protocol-buffer tag.
data UpdatePersistentSubscription =
    UpdatePersistentSubscription
    { upsGroupName :: Required 1 (Value Text)
    , upsStreamId :: Required 2 (Value Text)
    , upsResolveLinkTos :: Required 3 (Value Bool)
    , upsStartFrom :: Required 4 (Value Int32)
    , upsMsgTimeout :: Required 5 (Value Int32)
      -- ^ Filled with a millisecond count by '_updatePersistentSubscription'.
    , upsRecordStats :: Required 6 (Value Bool)
    , upsLiveBufSize :: Required 7 (Value Int32)
    , upsReadBatchSize :: Required 8 (Value Int32)
    , upsBufSize :: Required 9 (Value Int32)
    , upsMaxRetryCount :: Required 10 (Value Int32)
    , upsPreferRoundRobin :: Required 11 (Value Bool)
      -- ^ Always set to 'False' by '_updatePersistentSubscription'.
    , upsChkPtAfterTime :: Required 12 (Value Int32)
      -- ^ Filled with a millisecond count by '_updatePersistentSubscription'.
    , upsChkPtMaxCount :: Required 13 (Value Int32)
    , upsChkPtMinCount :: Required 14 (Value Int32)
    , upsSubMaxCount :: Required 15 (Value Int32)
    , upsNamedConsStrategy :: Optional 16 (Value Text)
    } deriving (Generic, Show)
-- | Fill an 'UpdatePersistentSubscription' message for the given group and
--   stream from the user-supplied settings. Time spans are converted to
--   milliseconds and the prefer-round-robin flag is always 'False'; the
--   consumer strategy is passed by name instead.
_updatePersistentSubscription :: Text
                              -> Text
                              -> PersistentSubscriptionSettings
                              -> UpdatePersistentSubscription
_updatePersistentSubscription group stream sett =
    let millis   = fromIntegral . timeSpanTotalMillis
        strategy = strategyText (psSettingsNamedConsumerStrategy sett)
    in UpdatePersistentSubscription
       { upsGroupName         = putField group
       , upsStreamId          = putField stream
       , upsResolveLinkTos    = putField (psSettingsResolveLinkTos sett)
       , upsStartFrom         = putField (psSettingsStartFrom sett)
       , upsMsgTimeout        = putField (millis (psSettingsMsgTimeout sett))
       , upsRecordStats       = putField (psSettingsExtraStats sett)
       , upsLiveBufSize       = putField (psSettingsLiveBufSize sett)
       , upsReadBatchSize     = putField (psSettingsReadBatchSize sett)
       , upsBufSize           = putField (psSettingsHistoryBufSize sett)
       , upsMaxRetryCount     = putField (psSettingsMaxRetryCount sett)
       , upsPreferRoundRobin  = putField False
       , upsChkPtAfterTime    = putField (millis (psSettingsCheckPointAfter sett))
       , upsChkPtMaxCount     = putField (psSettingsMaxCheckPointCount sett)
       , upsChkPtMinCount     = putField (psSettingsMinCheckPointCount sett)
       , upsSubMaxCount       = putField (psSettingsMaxSubsCount sett)
       , upsNamedConsStrategy = putField (Just strategy)
       }
-- | Empty instance body: relies on the class defaults, the type being
--   'Generic'.
instance Encode UpdatePersistentSubscription
-- | Outcome carried by 'UpdatePersistentSubscriptionCompleted'.
--
--   Used as an 'Enumeration' field, so the constructor order (derived
--   'Enum') presumably fixes the wire values — do not reorder.
data UpdatePersistentSubscriptionResult
    = UPS_Success
    | UPS_DoesNotExist
    | UPS_Fail
    | UPS_AccessDenied
    deriving (Enum, Eq, Show)
-- | Wire message decoded by 'onPersistActionConfirmed' for command 0xCF,
--   answering a pending 'PersistUpdate' action.
data UpdatePersistentSubscriptionCompleted =
    UpdatePersistentSubscriptionCompleted
    { upscResult :: Required 1 (Enumeration UpdatePersistentSubscriptionResult)
      -- ^ Outcome of the update (field tag 1).
    , upscReason :: Optional 2 (Value Text)
      -- ^ Optional explanation; only read on 'UPS_Fail' (field tag 2).
    } deriving (Generic, Show)

-- | Empty instance body: relies on the class defaults, the type being
--   'Generic'.
instance Decode UpdatePersistentSubscriptionCompleted
-- | Wire message asking the server to connect to a persistent subscription;
--   sent with command 0xC5 by 'createConnectPersistPackage'.
data ConnectToPersistentSubscription =
    ConnectToPersistentSubscription
    { ctsId :: Required 1 (Value Text)
      -- ^ Filled with the group name by 'createConnectPersistPackage'.
    , ctsStreamId :: Required 2 (Value Text)
      -- ^ Stream name (field tag 2).
    , ctsAllowedInFlightMsgs :: Required 3 (Value Int32)
      -- ^ Filled with the 'Int32' carried by 'PersistentSub' (field tag 3).
    } deriving (Generic, Show)

-- | Empty instance body: relies on the class defaults, the type being
--   'Generic'.
instance Encode ConnectToPersistentSubscription
-- | Build a 'ConnectToPersistentSubscription' message from, in order, the
--   subscription identifier, the stream name and the number of allowed
--   in-flight messages.
_connectToPersistentSubscription :: Text
                                 -> Text
                                 -> Int32
                                 -> ConnectToPersistentSubscription
_connectToPersistentSubscription sub_id stream_id all_fly_msgs =
    ConnectToPersistentSubscription
        (putField sub_id)
        (putField stream_id)
        (putField all_fly_msgs)
-- | Wire message acknowledging processed events on a persistent
--   subscription; sent with command 0xCC by 'createAckPackage'.
data PersistentSubscriptionAckEvents =
    PersistentSubscriptionAckEvents
    { psaeId :: Required 1 (Value Text)
      -- ^ Subscription identifier (field tag 1).
    , psaeProcessedEvtIds :: Required 2 (Value ByteString)
      -- ^ Event ids as a single bytes value (field tag 2).
      --   NOTE(review): 'createAckPackage' concatenates every id into this
      --   one value; confirm against the server protocol whether one entry
      --   per id (a repeated field) is expected instead.
    } deriving (Generic, Show)

-- | Empty instance body: relies on the class defaults, the type being
--   'Generic'.
instance Encode PersistentSubscriptionAckEvents
-- | Build a 'PersistentSubscriptionAckEvents' message from a subscription
--   identifier and the already-serialized event ids.
persistentSubscriptionAckEvents :: Text
                                -> ByteString
                                -> PersistentSubscriptionAckEvents
persistentSubscriptionAckEvents sub_id evt_ids =
    PersistentSubscriptionAckEvents (putField sub_id) (putField evt_ids)
-- | Action requested from the server when events are reported as failed
--   through 'notifyEventsFailed'.
--
--   Used as an 'Enumeration' field in 'PersistentSubscriptionNakEvents', so
--   the constructor order (derived 'Enum') presumably fixes the wire values
--   — do not reorder.
data NakAction
    = NA_Unknown
    | NA_Park
    | NA_Retry
    | NA_Skip
    | NA_Stop
    deriving (Enum, Eq, Show)
-- | Wire message reporting failed events on a persistent subscription; sent
--   with command 0xCD by 'createNakPackage'.
data PersistentSubscriptionNakEvents =
    PersistentSubscriptionNakEvents
    { psneId :: Required 1 (Value Text)
      -- ^ Subscription identifier (field tag 1).
    , psneProcessedEvtIds :: Required 2 (Value ByteString)
      -- ^ Event ids as a single bytes value (field tag 2).
      --   NOTE(review): 'createNakPackage' concatenates every id into this
      --   one value; confirm against the server protocol whether one entry
      --   per id (a repeated field) is expected instead.
    , psneMsg :: Optional 3 (Value Text)
      -- ^ Optional free-form message (field tag 3).
    , psneAction :: Required 4 (Enumeration NakAction)
      -- ^ Requested action (field tag 4).
    } deriving (Generic, Show)

-- | Empty instance body: relies on the class defaults, the type being
--   'Generic'.
instance Encode PersistentSubscriptionNakEvents
-- | Build a 'PersistentSubscriptionNakEvents' message from, in order, the
--   subscription identifier, the already-serialized event ids, an optional
--   message and the requested action.
persistentSubscriptionNakEvents :: Text
                                -> ByteString
                                -> Maybe Text
                                -> NakAction
                                -> PersistentSubscriptionNakEvents
persistentSubscriptionNakEvents sub_id evt_ids msg action =
    PersistentSubscriptionNakEvents
        (putField sub_id)
        (putField evt_ids)
        (putField msg)
        (putField action)
-- | Wire message decoded by 'confirmSub' when a package with command 0xC6
--   answers a pending persistent subscription.
data PersistentSubscriptionConfirmation =
    PersistentSubscriptionConfirmation
    { pscLastCommitPos :: Required 1 (Value Int64)
      -- ^ Last commit position (field tag 1).
    , pscId :: Required 2 (Value Text)
      -- ^ Subscription identifier, later passed to the ack/nak callback
      --   (field tag 2).
    , pscLastEvtNumber :: Optional 3 (Value Int32)
      -- ^ Last event number, when provided (field tag 3).
    } deriving (Generic, Show)

-- | Empty instance body: relies on the class defaults, the type being
--   'Generic'.
instance Decode PersistentSubscriptionConfirmation
-- | Wire message decoded by 'onEventAppeared' for command 0xC7: an event
--   delivered to an ongoing persistent subscription.
data PersistentSubscriptionStreamEventAppeared =
    PersistentSubscriptionStreamEventAppeared
    { psseaEvt :: Required 1 (Message ResolvedIndexedEvent) }
      -- ^ The delivered event, in its wire representation (field tag 1).
    deriving (Generic, Show)

-- | Empty instance body: relies on the class defaults, the type being
--   'Generic'.
instance Decode PersistentSubscriptionStreamEventAppeared
-- | Description of a subscription request. The type index says which
--   internal representation ('Regular' or 'Persistent') the resulting
--   'Subscription' carries.
data Sub a where
    -- | Stream name and resolve-link-tos flag.
    RegularSub :: Text -> Bool -> Sub Regular
    -- | Group name, stream name and the value sent as
    --   'ctsAllowedInFlightMsgs' (see 'createSubscriptionPackage').
    PersistentSub :: Text -> Text -> Int32 -> Sub Persistent
-- | A subscription request that has been submitted but not confirmed yet.
--   The subscription kind is hidden existentially.
data Pending =
    forall s. Push s =>
    Pending
    { _penId :: !UUID
      -- ^ Correlation id used for the request package.
    , _penSub :: !(Sub s)
      -- ^ What is being subscribed to.
    , _penCb :: Subscription s -> IO ()
      -- ^ Callback to run with the subscription once it is confirmed.
    }
-- | Result of 'confirmSub': a pending request for which a confirmation
--   package has been received.
data Confirmed =
    forall s. Push s =>
    Confirmed
    { _conId :: !UUID
      -- ^ Correlation id of the confirmed request.
    , _conTyp :: !(Sub s)
      -- ^ The original request description.
    , _conCB :: Subscription s -> IO ()
      -- ^ The requester's callback.
    , _conSub :: !(IO (Subscription s))
      -- ^ Action that builds the 'Subscription' value (it allocates the
      --   event channel, see 'confirmSub').
    }
-- | A confirmed subscription as stored in the 'Manager'.
data OnGoing =
    forall s. Push s =>
    OnGoing
    { _ongTyp :: !(Sub s)
      -- ^ The original request description.
    , _ongSub :: !(Subscription s)
      -- ^ The live subscription that incoming events are pushed to.
    }
-- | Type returned by 'subNextEvent' for each kind of subscription: either an
--   error ('DropReason' or 'CatchupError') or the next 'ResolvedEvent'.
type family NextEvent a :: * where
    NextEvent Regular = Either DropReason ResolvedEvent
    NextEvent Persistent = Either DropReason ResolvedEvent
    NextEvent Catchup = Either CatchupError ResolvedEvent
-- | Handle on a subscription. The type parameter is the internal
--   representation ('Regular', 'Persistent' or 'Catchup').
data Subscription a =
    Subscription
    { subStreamId :: !Text
      -- ^ Name of the stream the subscription is bound to.
    , subUnsubscribe :: !(IO ())
      -- ^ Action asking for the subscription to be stopped.
    , subNextEvent :: !(IO (NextEvent a))
      -- ^ Action returning the next event or error. For 'Regular' and
      --   'Persistent' it is a 'readChan' (see 'confirmSub'), so it blocks
      --   until something is available.
    , subIsSubscribedToAll :: !Bool
      -- ^ For regular subscriptions, 'True' when the stream name is empty;
      --   always 'False' for persistent ones (see 'confirmSub').
    , _subInternal :: !a
      -- ^ Kind-specific internal state.
    }
-- | Internal subscription representations that can receive an event or a
--   drop notification.
class Push a where
    -- | Deliver an event ('Right') or a drop reason ('Left').
    _pushEvt :: a -> Either DropReason ResolvedEvent -> IO ()
-- | Deliver an event or a drop reason to a subscription by delegating to
--   the 'Push' instance of its internal state.
_subPushEvt :: Push a
            => Subscription a
            -> Either DropReason ResolvedEvent
            -> IO ()
_subPushEvt sub outcome = _pushEvt (_subInternal sub) outcome
-- | Internal subscription representations that expose an id and the
--   positions reported at confirmation time.
class Identifiable a where
    -- | Subscription id.
    _getId :: a -> UUID
    -- | Last commit position.
    _getLastCommitPos :: a -> Int64
    -- | Last event number, if any.
    _getLastEventNumber :: a -> Maybe Int32
-- | Id of the subscription.
subId :: Identifiable a => Subscription a -> UUID
subId sub = _getId (_subInternal sub)
-- | Last commit position recorded in the subscription's internal state.
subLastCommitPos :: Identifiable a => Subscription a -> Int64
subLastCommitPos sub = _getLastCommitPos (_subInternal sub)
-- | Last event number recorded in the subscription's internal state, if
--   any.
subLastEventNumber :: Identifiable a => Subscription a -> Maybe Int32
subLastEventNumber sub = _getLastEventNumber (_subInternal sub)
-- | Internal state of a regular subscription, built by 'confirmSub'.
data Regular =
    Regular
    { _regId :: !UUID
      -- ^ Correlation id of the subscription.
    , _regResolveLinkTos :: !Bool
      -- ^ Resolve-link-tos flag given at subscription time.
    , _regLastCommitPos :: !Int64
      -- ^ Last commit position from the confirmation message.
    , _regLastEventNumber :: !(Maybe Int32)
      -- ^ Last event number from the confirmation message, if any.
    , _regChan :: !(Chan (Either DropReason ResolvedEvent))
      -- ^ Channel written by '_pushEvt' and read by 'subNextEvent'.
    }
-- | Reads the id and positions straight from the 'Regular' record.
instance Identifiable Regular where
    _getId reg              = _regId reg
    _getLastCommitPos reg   = _regLastCommitPos reg
    _getLastEventNumber reg = _regLastEventNumber reg
-- | Pushing writes to the subscription's channel.
instance Push Regular where
    _pushEvt = writeChan . _regChan
-- | Resolve-link-tos flag the regular subscription was created with.
subResolveLinkTos :: Subscription Regular -> Bool
subResolveLinkTos = _regResolveLinkTos . _subInternal
-- | Errors a catchup subscription can report through 'subNextEvent'. The
--   'Text' carried by each constructor is presumably the stream name —
--   confirm against the code that builds these values.
data CatchupError
    = CatchupStreamDeleted Text
    | CatchupUnexpectedStreamStatus Text ReadStreamResult
    | CatchupSubscriptionDropReason Text DropReason
    deriving (Show, Typeable)

-- | Allows 'CatchupError' to be thrown and caught as an exception.
instance Exception CatchupError
-- | Internal state of a catchup subscription: an 'MVar' holding a regular
--   subscription. 'waitTillCatchup' blocks on it and 'hasCaughtUp' tests
--   whether it is filled.
data Catchup = Catchup { _catchupSub :: MVar (Subscription Regular) }
-- | Block until the catchup subscription's 'MVar' is filled. The stored
--   value is read without being taken and then discarded.
waitTillCatchup :: Subscription Catchup -> IO ()
waitTillCatchup sub = void (readMVar (_catchupSub (_subInternal sub)))
-- | Non-blocking check: 'True' when the catchup subscription's 'MVar' is
--   currently filled.
hasCaughtUp :: Subscription Catchup -> IO Bool
hasCaughtUp sub = isJust <$> tryReadMVar (_catchupSub (_subInternal sub))
-- | Internal state of a persistent subscription, built by 'confirmSub'.
data Persistent =
    Persistent
    { _persistId :: !UUID
      -- ^ Correlation id of the subscription.
    , _persistChan :: !(Chan (Either DropReason ResolvedEvent))
      -- ^ Channel written by '_pushEvt' and read by 'subNextEvent'.
    , _persistSubId :: !Text
      -- ^ Subscription identifier returned in the confirmation message.
    , _persistGroup :: !Text
      -- ^ Group name given at subscription time.
    , _persistLastCPos :: !Int64
      -- ^ Last commit position from the confirmation message.
    , _persistLastENum :: !(Maybe Int32)
      -- ^ Last event number from the confirmation message, if any.
    , _persistAckCmd :: AckCmd -> IO ()
      -- ^ Sends an ack or nak command for this subscription.
    }
-- | Reads the id and positions straight from the 'Persistent' record.
instance Identifiable Persistent where
    _getId p              = _persistId p
    _getLastCommitPos p   = _persistLastCPos p
    _getLastEventNumber p = _persistLastENum p
-- | Pushing writes to the subscription's channel.
instance Push Persistent where
    _pushEvt = writeChan . _persistChan
-- | Acknowledge the given event ids on a persistent subscription by issuing
--   an 'AckCmd' through the subscription's ack callback.
notifyEventsProcessed :: Subscription Persistent -> [UUID] -> IO ()
notifyEventsProcessed sub eids =
    let send = _persistAckCmd (_subInternal sub)
    in send (AckCmd eids)
-- | Report the given event ids as failed on a persistent subscription by
--   issuing a 'NakCmd' (action, optional message, ids) through the
--   subscription's ack callback.
notifyEventsFailed :: Subscription Persistent
                   -> NakAction
                   -> Maybe Text
                   -> [UUID]
                   -> IO ()
notifyEventsFailed sub act msg eids =
    let send = _persistAckCmd (_subInternal sub)
    in send (NakCmd act msg eids)
-- | Administrative action on a persistent subscription group. Create and
--   update carry the settings to send; delete needs none.
data PersistAction
    = PersistCreate PersistentSubscriptionSettings
    | PersistUpdate PersistentSubscriptionSettings
    | PersistDelete
-- | Acknowledgement command issued on a persistent subscription.
data AckCmd
    = AckCmd [UUID]
      -- ^ The listed events were processed.
    | NakCmd NakAction (Maybe Text) [UUID]
      -- ^ The listed events failed: requested action and optional message.
-- | A 'PersistAction' that has been submitted and awaits the server's
--   answer.
data PendingPersistAction =
    PendingPersistAction
    { _ppaId :: !UUID
      -- ^ Correlation id used for the request package.
    , _ppaGroup :: !Text
      -- ^ Group name.
    , _ppaStream :: !Text
      -- ^ Stream name.
    , _ppaTyp :: !PersistAction
      -- ^ The requested action.
    , _ppaCB :: Either OperationException () -> IO ()
      -- ^ Callback to run with the outcome.
    }
-- | Result of 'onPersistActionConfirmed': the server's answer to a pending
--   persist action.
data PersistActionConfirmed =
    PersistActionConfirmed
    { _pacId :: !UUID
      -- ^ Correlation id of the answered action.
    , _pacResult :: !(Either OperationException ())
      -- ^ Outcome to hand to the callback.
    , _pacCB :: Either OperationException () -> IO ()
      -- ^ The requester's callback.
    }
-- | State of the subscription manager. Every map is keyed by correlation
--   id.
data Manager
    = Manager
      { _pendings :: !(M.Map UUID Pending)
        -- ^ Subscription requests awaiting confirmation.
      , _ongoings :: !(M.Map UUID OnGoing)
        -- ^ Confirmed, live subscriptions.
      , _pendingPersistActions :: !(M.Map UUID PendingPersistAction)
        -- ^ Persist actions awaiting the server's answer.
      }
-- | Manager with no pending subscription, no ongoing subscription and no
--   pending persist action.
initManager :: Manager
initManager = Manager M.empty M.empty M.empty
-- | Decode a protocol-buffer message, returning 'Nothing' when the bytes
--   cannot be parsed. The parser's error text is discarded.
maybeDecodeMessage :: Decode a => ByteString -> Maybe a
maybeDecodeMessage bytes =
    either (const Nothing) Just (runGet decodeMessage bytes)
-- | Decode a protocol-buffer message. Partial: calls 'error' with the
--   parser's message when the bytes cannot be parsed.
unsafeDecodeMessage :: Decode a => ByteString -> a
unsafeDecodeMessage =
    either (\e -> error $ "decoding error: " ++ e) id . runGet decodeMessage
-- | Result of 'onEventAppeared': an event together with the ongoing
--   subscription it must be delivered to.
data Appeared =
    forall s. Push s =>
    Appeared
    { _appSub :: !(Subscription s)
      -- ^ Target subscription.
    , _appEvt :: !ResolvedEvent
      -- ^ The decoded event.
    }
-- | Match an incoming package against the ongoing subscriptions and, when
--   it carries an event for one of them, return that event with its target.
--
--   A result is produced only when the package's correlation id belongs to
--   an ongoing subscription and the command matches the subscription kind:
--   0xC2 for a regular subscription, 0xC7 for a persistent one.
--
--   The payload is decoded with 'maybeDecodeMessage', as 'dropError' and
--   'onPersistActionConfirmed' do: a malformed payload yields 'Nothing'
--   rather than raising an 'error' while the reactive network is running.
onEventAppeared :: Package -> Manager -> Maybe Appeared
onEventAppeared Package{..} Manager{..} =
    case M.lookup packageCorrelation _ongoings of
        Just (OnGoing typ sub) ->
            case (packageCmd, typ) of
                (0xC2, RegularSub _ _) -> do
                    msg <- maybeDecodeMessage packageData
                    let evt = getField $ streamResolvedEvent msg
                    return $ Appeared sub (newResolvedEventFromBuf evt)
                (0xC7, PersistentSub _ _ _) -> do
                    msg <- maybeDecodeMessage packageData
                    let evt = getField $ psseaEvt msg
                    return $ Appeared sub (newResolvedEvent evt)
                _ -> Nothing
        _ -> Nothing
-- | Match an incoming package against the pending subscription requests
--   and, when it confirms one of them, return what is needed to finish the
--   subscription.
--
--   Arguments, in order:
--
--   * action sending an unsubscribe request for a correlation id; it
--     becomes the subscription's 'subUnsubscribe';
--   * action sending an ack/nak command for a correlation id and a
--     subscription identifier (persistent subscriptions only);
--   * the incoming package;
--   * the current manager state.
--
--   A result is produced only when the correlation id belongs to a pending
--   request and the command matches the request kind: 0xC1 for a regular
--   subscription, 0xC6 for a persistent one.
--
--   The payload is decoded with 'maybeDecodeMessage', as 'dropError' and
--   'onPersistActionConfirmed' do: a malformed confirmation yields
--   'Nothing' rather than raising an 'error' while the reactive network is
--   running.
confirmSub :: (UUID -> IO ())
           -> (UUID -> Text -> AckCmd -> IO ())
           -> Package
           -> Manager
           -> Maybe Confirmed
confirmSub unsub ackF Package{..} Manager{..} =
    case M.lookup packageCorrelation _pendings of
        Just (Pending _ typ cb) ->
            case (packageCmd, typ) of
                (0xC1, RegularSub stream tos) -> do
                    msg <- maybeDecodeMessage packageData
                    let lcp = getField $ subscribeLastCommitPos msg
                        len = getField $ subscribeLastEventNumber msg
                    return $ Confirmed packageCorrelation typ cb $ do
                        -- The channel is allocated only when the action is
                        -- run, not while the package is being inspected.
                        chan <- newChan
                        let reg = Regular
                                  { _regId = packageCorrelation
                                  , _regResolveLinkTos = tos
                                  , _regLastCommitPos = lcp
                                  , _regLastEventNumber = len
                                  , _regChan = chan
                                  }
                        return Subscription
                               { subStreamId = stream
                               , subUnsubscribe = unsub packageCorrelation
                               , subNextEvent = readChan chan
                               , subIsSubscribedToAll = stream == ""
                               , _subInternal = reg
                               }
                (0xC6, PersistentSub grp stream _) -> do
                    msg <- maybeDecodeMessage packageData
                    let lcp = getField $ pscLastCommitPos msg
                        sid = getField $ pscId msg
                        len = getField $ pscLastEvtNumber msg
                    return $ Confirmed packageCorrelation typ cb $ do
                        chan <- newChan
                        let pes = Persistent
                                  { _persistId = packageCorrelation
                                  , _persistChan = chan
                                  , _persistSubId = sid
                                  , _persistGroup = grp
                                  , _persistLastCPos = lcp
                                  , _persistLastENum = len
                                  , _persistAckCmd = \cmd ->
                                      ackF packageCorrelation sid cmd
                                  }
                        return Subscription
                               { subStreamId = stream
                               , subUnsubscribe = unsub packageCorrelation
                               , subNextEvent = readChan chan
                               , subIsSubscribedToAll = False
                               , _subInternal = pes
                               }
                _ -> Nothing
        _ -> Nothing
-- | Description of a regular subscription request.
--   NOTE(review): nothing in the visible part of this module uses this
--   type — confirm whether it is still needed.
data Subscribe
    = Subscribe
      { _subId :: !UUID
        -- ^ Id of the request.
      , _subCallback :: Subscription Regular -> IO ()
        -- ^ Callback taking the resulting subscription.
      , _subStream :: !Text
        -- ^ Stream name.
      , _subResolveLinkTos :: !Bool
        -- ^ Resolve-link-tos flag.
      }
-- | Request to record a freshly built subscription in the 'Manager': its
--   correlation id, its request description and the subscription itself
--   (see 'confirmed').
data RegisterSub = forall s. Push s => RegisterSub UUID (Sub s) (Subscription s)
-- | Commands accepted by the function that 'subscriptionNetwork' returns.
data SubCommand
    = forall s. Push s => SubscribeTo (Sub s) (Subscription s -> IO ())
      -- ^ Open a subscription; the callback receives it once confirmed.
    | SubmitPersistAction Text
                          Text
                          PersistAction
                          (Either OperationException () -> IO ())
      -- ^ Run a persist action given, in order, the group name, the stream
      --   name, the action and a callback receiving the outcome.
-- | Build the reactive network that manages subscriptions and persist
--   actions, and return the function used to submit 'SubCommand's to it.
--
--   Arguments, in order:
--
--   * the settings used to build outgoing packages;
--   * the sink that outgoing packages are pushed to;
--   * the stream of incoming packages.
subscriptionNetwork :: Settings
                    -> (Package -> Reactive ())
                    -> Event Package
                    -> Reactive (SubCommand -> IO ())
subscriptionNetwork sett push_pkg e_pkg = do
    -- Internal events: new subscription requests, subscriptions ready to be
    -- registered, and new persist actions.
    (on_sub, push_sub) <- newEvent
    (on_reg_sub, push_reg_sub) <- newEvent
    (on_persist_action, push_persist_action) <- newEvent
    -- 'pushAsync' comes from the Sodium utility module; presumably it turns
    -- the reactive sink into an asynchronous IO action — TODO confirm.
    let push_pkg_io = pushAsync push_pkg
        push_ack_cmd uuid sid cmd =
            push_pkg_io $ createAckCmdPackage sett uuid sid cmd
    -- The manager behaviour is defined recursively: the events derived
    -- below snapshot 'mgr_b', and some of them feed the accumulator that
    -- defines 'mgr_b' itself.
    mgr_b <- mfix $ \mgr_b -> do
        let send_unsub = push_pkg_io . createUnsubscribePackage sett
            -- Incoming packages confirming a pending subscription.
            on_con_sub = filterJust $ snapshot (confirmSub send_unsub
                                                           push_ack_cmd)
                                               e_pkg mgr_b
            -- Incoming packages dropping an ongoing subscription.
            on_drop = filterJust $ snapshot dropError e_pkg mgr_b
            -- Incoming packages answering a pending persist action.
            on_persist_action_cfrm =
                filterJust $ snapshot onPersistActionConfirmed e_pkg mgr_b
            -- Every state transition applied to the manager.
            mgr_e = fmap confirmed on_reg_sub <>
                    fmap subscribeRequest on_sub <>
                    fmap newPersistAction on_persist_action <>
                    fmap dropped on_drop <>
                    fmap persistActionConfirmed on_persist_action_cfrm
        -- Tell a dropped subscription why it was dropped.
        _ <- listen on_drop $ \(Dropped reason sub _) ->
            _subPushEvt sub (Left reason)
        -- Hand the outcome of a persist action to its callback.
        _ <- listen on_persist_action_cfrm $ \(PersistActionConfirmed _ res k) ->
            k res
        -- On confirmation: build the subscription, schedule its
        -- registration from a separate thread, then run the callback. The
        -- registration is not guaranteed to have happened when the callback
        -- starts.
        _ <- listen on_con_sub $ \(Confirmed uuid typ cb action) -> do
            sub <- action
            _ <- forkIO $ sync $ push_reg_sub (RegisterSub uuid typ sub)
            cb sub
        accum initManager mgr_e
    -- Incoming packages carrying an event for an ongoing subscription.
    let on_app = filterJust $ snapshot onEventAppeared e_pkg mgr_b
        -- Each command gets a random correlation id and is pushed into the
        -- network from a freshly forked thread.
        runSubCommand (SubscribeTo typ cb) = do
            uuid <- randomIO
            let sub = Pending
                      { _penId = uuid
                      , _penSub = typ
                      , _penCb = cb
                      }
            void $ forkIO $ sync $ push_sub sub
        runSubCommand (SubmitPersistAction group stream typ cb) = do
            uuid <- randomIO
            let action = PendingPersistAction
                         { _ppaId = uuid
                         , _ppaGroup = group
                         , _ppaStream = stream
                         , _ppaTyp = typ
                         , _ppaCB = cb
                         }
            void $ forkIO $ sync $ push_persist_action action
    -- Send the request package for every new subscription or persist action.
    _ <- listen on_sub (push_pkg_io . createSubscriptionPackage sett)
    _ <- listen on_persist_action (push_pkg_io . createPersistActionPkg sett)
    -- Deliver incoming events to their subscription.
    _ <- listen on_app $ \(Appeared sub evt) ->
        _subPushEvt sub (Right evt)
    return runSubCommand
-- | Build the request package for a pending subscription, choosing the
--   regular or the persistent connect package according to the request
--   kind.
createSubscriptionPackage :: Settings -> Pending -> Package
createSubscriptionPackage sett (Pending uuid (RegularSub stream tos) _) =
    createConnectRegularPackage sett uuid stream tos
createSubscriptionPackage sett (Pending uuid (PersistentSub grp str bufSize) _) =
    createConnectPersistPackage sett uuid grp str bufSize
-- | Package (command 0xC0) asking for a regular subscription on a stream.
--   The given id becomes the correlation id and the credentials come from
--   the settings.
createConnectRegularPackage :: Settings -> UUID -> Text -> Bool -> Package
createConnectRegularPackage sett uuid stream tos =
    Package
    { packageCmd         = 0xC0
    , packageCorrelation = uuid
    , packageData        = payload
    , packageCred        = s_credentials sett
    }
  where
    payload = runPut (encodeMessage (subscribeToStream stream tos))
-- | Package (command 0xC5) asking to connect to a persistent subscription,
--   given the group name, the stream name and the allowed in-flight
--   message count. The given id becomes the correlation id and the
--   credentials come from the settings.
createConnectPersistPackage :: Settings
                            -> UUID
                            -> Text
                            -> Text
                            -> Int32
                            -> Package
createConnectPersistPackage sett uuid group stream bufSize =
    Package
    { packageCmd         = 0xC5
    , packageCorrelation = uuid
    , packageData        = payload
    , packageCred        = s_credentials sett
    }
  where
    payload =
        runPut (encodeMessage (_connectToPersistentSubscription group stream bufSize))
-- | Build the package for an ack/nak command: an ack package for 'AckCmd',
--   a nak package for 'NakCmd'.
createAckCmdPackage :: Settings -> UUID -> Text -> AckCmd -> Package
createAckCmdPackage sett uuid sid (AckCmd eids) =
    createAckPackage sett uuid sid eids
createAckCmdPackage sett uuid sid (NakCmd act msg eids) =
    createNakPackage sett uuid sid act msg eids
-- | Package (command 0xCC) acknowledging events on a persistent
--   subscription, given the correlation id, the subscription identifier and
--   the event ids.
--
--   NOTE(review): the serialized ids are concatenated, in list order, into
--   a single bytes value; confirm against the server protocol whether one
--   entry per id is expected instead.
createAckPackage :: Settings -> UUID -> Text -> [UUID] -> Package
createAckPackage sett corr sid eids =
    Package
    { packageCmd         = 0xCC
    , packageCorrelation = corr
    , packageData        = runPut (encodeMessage ack)
    , packageCred        = s_credentials sett
    }
  where
    serializedIds = toStrict (mconcat (fmap toByteString eids))
    ack           = persistentSubscriptionAckEvents sid serializedIds
-- | Package (command 0xCD) reporting failed events on a persistent
--   subscription, given the correlation id, the subscription identifier,
--   the requested action, an optional message and the event ids.
--
--   NOTE(review): the serialized ids are concatenated, in list order, into
--   a single bytes value; confirm against the server protocol whether one
--   entry per id is expected instead.
createNakPackage :: Settings
                 -> UUID
                 -> Text
                 -> NakAction
                 -> Maybe Text
                 -> [UUID]
                 -> Package
createNakPackage sett corr sid act txt eids =
    Package
    { packageCmd         = 0xCD
    , packageCorrelation = corr
    , packageData        = runPut (encodeMessage nak)
    , packageCred        = s_credentials sett
    }
  where
    serializedIds = toStrict (mconcat (fmap toByteString eids))
    nak           = persistentSubscriptionNakEvents sid serializedIds txt act
-- | Package (command 0xC3) asking the server to stop the subscription
--   identified by the given correlation id.
createUnsubscribePackage :: Settings -> UUID -> Package
createUnsubscribePackage sett uuid =
    Package
    { packageCmd         = 0xC3
    , packageCorrelation = uuid
    , packageData        = payload
    , packageCred        = s_credentials sett
    }
  where
    payload = runPut (encodeMessage UnsubscribeFromStream)
-- | Build the request package for a pending persist action. The command
--   byte and the payload depend on the action: 0xC8 for create, 0xCE for
--   update, 0xCA for delete. The action's id becomes the correlation id and
--   the credentials come from the settings.
createPersistActionPkg :: Settings -> PendingPersistAction -> Package
createPersistActionPkg sett (PendingPersistAction aId grp strm typ _) =
    case typ of
        PersistCreate cfg ->
            mkPackage 0xC8
                (encodeMessage (_createPersistentSubscription grp strm cfg))
        PersistUpdate cfg ->
            mkPackage 0xCE
                (encodeMessage (_updatePersistentSubscription grp strm cfg))
        PersistDelete ->
            mkPackage 0xCA
                (encodeMessage (_deletePersistentSubscription grp strm))
  where
    -- Shared envelope: only the command byte and the payload vary.
    mkPackage cmd body =
        Package
        { packageCmd         = cmd
        , packageCorrelation = aId
        , packageData        = runPut body
        , packageCred        = s_credentials sett
        }
-- | Result of 'dropError': an ongoing subscription that the server dropped.
data Dropped =
    forall s. Push s =>
    Dropped
    { droppedReason :: !DropReason
      -- ^ Why the subscription was dropped.
    , droppedSub :: !(Subscription s)
      -- ^ The dropped subscription.
    , droppedId :: !UUID
      -- ^ Its correlation id, used to remove it from the 'Manager'.
    }
-- | Recognize a package (command 0xC4) dropping an ongoing subscription.
--
--   Returns 'Nothing' when the command differs, when the correlation id is
--   not an ongoing subscription, or when the payload cannot be decoded. A
--   missing reason defaults to 'D_Unsubscribed'.
dropError :: Package -> Manager -> Maybe Dropped
dropError Package{..} Manager{..} =
    if packageCmd /= 0xC4
    then Nothing
    else
        case M.lookup packageCorrelation _ongoings of
            Nothing -> Nothing
            Just (OnGoing _ sub) ->
                let build msg =
                        Dropped
                        { droppedReason =
                              fromMaybe D_Unsubscribed (getField (dropReason msg))
                        , droppedSub    = sub
                        , droppedId     = packageCorrelation
                        }
                in fmap build (maybeDecodeMessage packageData)
-- | 'Nothing' for the empty text, 'Just' the text otherwise.
nonEmptyText :: Text -> Maybe Text
nonEmptyText t
    | t == ""   = Nothing
    | otherwise = Just t
-- | Recognize a package answering a pending persist action and translate
--   the server's result into the outcome to hand to the action's callback.
--
--   A result is produced only when the correlation id belongs to a pending
--   action, the command matches the action kind (0xC9 for create, 0xCF for
--   update, 0xCB for delete) and the payload can be decoded.
onPersistActionConfirmed :: Package -> Manager -> Maybe PersistActionConfirmed
onPersistActionConfirmed Package{..} Manager{..} =
    case M.lookup packageCorrelation _pendingPersistActions of
        Nothing -> Nothing
        Just (PendingPersistAction _ grp stream typ cb) ->
            let -- Wrap an outcome with the action's id and callback.
                confirm outcome =
                    PersistActionConfirmed
                    { _pacId     = packageCorrelation
                    , _pacResult = outcome
                    , _pacCB     = cb
                    }

                -- Error outcomes shared by the three kinds of answer. An
                -- empty reason text is treated as no reason at all.
                failed txt =
                    Left $ peristentCreationFailure grp stream
                                                    (nonEmptyText =<< txt)
                exists  = Left $ persistentCreationExists grp stream
                missing = Left $ persistentDoesNotExist grp stream
                denied  = Left $ persistentAccessDenied stream

                createOutcome msg =
                    case getField (cpscResult msg) of
                        CPS_Success       -> Right ()
                        CPS_Fail          -> failed (getField (cpscReason msg))
                        CPS_AlreadyExists -> exists
                        CPS_AccessDenied  -> denied

                updateOutcome msg =
                    case getField (upscResult msg) of
                        UPS_Success      -> Right ()
                        UPS_Fail         -> failed (getField (upscReason msg))
                        UPS_DoesNotExist -> missing
                        UPS_AccessDenied -> denied

                deleteOutcome msg =
                    case getField (dpscResult msg) of
                        DPS_Success      -> Right ()
                        DPS_Fail         -> failed (getField (dpscReason msg))
                        DPS_DoesNotExist -> missing
                        DPS_AccessDenied -> denied
            in case (packageCmd, typ) of
                   (0xC9, PersistCreate _) ->
                       fmap (confirm . createOutcome)
                            (maybeDecodeMessage packageData)
                   (0xCF, PersistUpdate _) ->
                       fmap (confirm . updateOutcome)
                            (maybeDecodeMessage packageData)
                   (0xCB, PersistDelete) ->
                       fmap (confirm . deleteOutcome)
                            (maybeDecodeMessage packageData)
                   _ -> Nothing
-- | Remove an answered persist action from the manager's pending actions.
persistActionConfirmed :: PersistActionConfirmed -> Manager -> Manager
persistActionConfirmed pc mgr =
    mgr { _pendingPersistActions =
              M.delete (_pacId pc) (_pendingPersistActions mgr)
        }
-- | Record a new subscription request among the pending ones, keyed by its
--   id.
subscribeRequest :: Pending -> Manager -> Manager
subscribeRequest req@(Pending uuid _ _) mgr =
    mgr { _pendings = M.insert uuid req (_pendings mgr) }
-- | Remove a dropped subscription from the ongoing ones.
dropped :: Dropped -> Manager -> Manager
dropped d mgr = mgr { _ongoings = M.delete (droppedId d) (_ongoings mgr) }
-- | Move a subscription from the pending requests to the ongoing ones.
confirmed :: RegisterSub -> Manager -> Manager
confirmed (RegisterSub uuid typ sub) mgr =
    mgr { _pendings = M.delete uuid (_pendings mgr)
        , _ongoings = M.insert uuid (OnGoing typ sub) (_ongoings mgr)
        }
-- | Record a new persist action among the pending ones, keyed by its id.
newPersistAction :: PendingPersistAction -> Manager -> Manager
newPersistAction ppa mgr =
    mgr { _pendingPersistActions =
              M.insert (_ppaId ppa) ppa (_pendingPersistActions mgr)
        }
-- | 'InvalidOperation' error for a failed persist action on the given group
--   and stream, with the reason appended when there is one. Despite its
--   name it is also used for failed updates and deletions.
peristentCreationFailure :: Text
                         -> Text
                         -> Maybe Text
                         -> OperationException
peristentCreationFailure group stream m_reason =
    InvalidOperation $ mconcat
        [ "Subscription group "
        , group
        , " on stream "
        , stream
        , " failed"
        , maybe "" (" reason: " <>) m_reason
        ]
-- | 'InvalidOperation' error saying that the group already exists on the
--   stream.
persistentCreationExists :: Text -> Text -> OperationException
persistentCreationExists group stream =
    InvalidOperation $ mconcat
        [ "Subscription group "
        , group
        , " on stream "
        , stream
        , " already exists."
        ]
-- | 'InvalidOperation' error saying that the group does not exist on the
--   stream.
persistentDoesNotExist :: Text -> Text -> OperationException
persistentDoesNotExist group stream =
    InvalidOperation $ mconcat
        [ "Subscription group "
        , group
        , " on stream "
        , stream
        , " doesn't exist."
        ]
-- | 'AccessDenied' error for a persist action refused on the given stream.
persistentAccessDenied :: Text -> OperationException
persistentAccessDenied stream =
    AccessDenied ("Write access denied for stream " <> stream)