{-# LANGUAGE BangPatterns #-} {-# LANGUAGE DeriveGeneric #-} {-# LANGUAGE DataKinds #-} {-# LANGUAGE ExistentialQuantification #-} {-# LANGUAGE DeriveDataTypeable #-} {-# LANGUAGE GADTs #-} {-# LANGUAGE MultiWayIf #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE TypeFamilies #-} {-# OPTIONS_GHC -fcontext-stack=26 #-} -------------------------------------------------------------------------------- -- | -- Module : Database.EventStore.Internal.Manager.Subscription -- Copyright : (C) 2014 Yorick Laupa -- License : (see the file LICENSE) -- -- Maintainer : Yorick Laupa -- Stability : provisional -- Portability : non-portable -- -------------------------------------------------------------------------------- module Database.EventStore.Internal.Manager.Subscription where -------------------------------------------------------------------------------- import Control.Concurrent import Control.Exception import Control.Monad.Fix import Data.ByteString (ByteString) import Data.ByteString.Lazy (toStrict) import Data.Foldable import Data.Functor import Data.Int import qualified Data.Map.Strict as M import Data.Maybe import Data.Monoid ((<>)) import Data.Typeable import GHC.Generics (Generic) import Prelude -------------------------------------------------------------------------------- import Data.ProtocolBuffers import Data.Serialize import Data.Text hiding (group) import Data.UUID import FRP.Sodium import System.Random -------------------------------------------------------------------------------- import Database.EventStore.Internal.Operation.ReadStreamEventsOperation import Database.EventStore.Internal.TimeSpan import Database.EventStore.Internal.Types hiding (Event, newEvent) import Database.EventStore.Internal.Util.Sodium -------------------------------------------------------------------------------- data SubscribeToStream = SubscribeToStream { subscribeStreamId :: Required 1 (Value Text) , subscribeResolveLinkTos :: Required 2 (Value Bool) } deriving (Generic, Show) -------------------------------------------------------------------------------- instance Encode SubscribeToStream -------------------------------------------------------------------------------- subscribeToStream :: Text -> Bool -> SubscribeToStream subscribeToStream stream_id res_link_tos = SubscribeToStream { subscribeStreamId = putField stream_id , subscribeResolveLinkTos = putField res_link_tos } -------------------------------------------------------------------------------- data SubscriptionConfirmation = SubscriptionConfirmation { subscribeLastCommitPos :: Required 1 (Value Int64) , subscribeLastEventNumber :: Optional 2 (Value Int32) } deriving (Generic, Show) -------------------------------------------------------------------------------- instance Decode SubscriptionConfirmation -------------------------------------------------------------------------------- data StreamEventAppeared = StreamEventAppeared { streamResolvedEvent :: Required 1 (Message ResolvedEventBuf) } deriving (Generic, Show) -------------------------------------------------------------------------------- instance Decode StreamEventAppeared -------------------------------------------------------------------------------- -- | Represents the reason subscription drop happened. data DropReason = D_Unsubscribed | D_AccessDenied | D_NotFound | D_PersistentSubscriptionDeleted deriving (Enum, Eq, Show) -------------------------------------------------------------------------------- data SubscriptionDropped = SubscriptionDropped { dropReason :: Optional 1 (Enumeration DropReason) } deriving (Generic, Show) -------------------------------------------------------------------------------- instance Decode SubscriptionDropped -------------------------------------------------------------------------------- data UnsubscribeFromStream = UnsubscribeFromStream deriving (Generic, Show) -------------------------------------------------------------------------------- instance Encode UnsubscribeFromStream -------------------------------------------------------------------------------- data CreatePersistentSubscription = CreatePersistentSubscription { cpsGroupName :: Required 1 (Value Text) , cpsStreamId :: Required 2 (Value Text) , cpsResolveLinkTos :: Required 3 (Value Bool) , cpsStartFrom :: Required 4 (Value Int32) , cpsMsgTimeout :: Required 5 (Value Int32) , cpsRecordStats :: Required 6 (Value Bool) , cpsLiveBufSize :: Required 7 (Value Int32) , cpsReadBatchSize :: Required 8 (Value Int32) , cpsBufSize :: Required 9 (Value Int32) , cpsMaxRetryCount :: Required 10 (Value Int32) , cpsPreferRoundRobin :: Required 11 (Value Bool) , cpsChkPtAfterTime :: Required 12 (Value Int32) , cpsChkPtMaxCount :: Required 13 (Value Int32) , cpsChkPtMinCount :: Required 14 (Value Int32) , cpsSubMaxCount :: Required 15 (Value Int32) , cpsNamedConsStrategy :: Optional 16 (Value Text) } deriving (Generic, Show) -------------------------------------------------------------------------------- _createPersistentSubscription :: Text -> Text -> PersistentSubscriptionSettings -> CreatePersistentSubscription _createPersistentSubscription group stream sett = CreatePersistentSubscription { cpsGroupName = putField group , cpsStreamId = putField stream , cpsResolveLinkTos = putField $ psSettingsResolveLinkTos sett , cpsStartFrom = putField $ psSettingsStartFrom sett , cpsMsgTimeout = putField $ ms $ psSettingsMsgTimeout sett , cpsRecordStats = putField $ psSettingsExtraStats sett , cpsLiveBufSize = putField $ psSettingsLiveBufSize sett , cpsReadBatchSize = putField $ psSettingsReadBatchSize sett , cpsBufSize = putField $ psSettingsHistoryBufSize sett , cpsMaxRetryCount = putField $ psSettingsMaxRetryCount sett , cpsPreferRoundRobin = putField False , cpsChkPtAfterTime = putField $ ms $ psSettingsCheckPointAfter sett , cpsChkPtMaxCount = putField $ psSettingsMaxCheckPointCount sett , cpsChkPtMinCount = putField $ psSettingsMinCheckPointCount sett , cpsSubMaxCount = putField $ psSettingsMaxSubsCount sett , cpsNamedConsStrategy = putField $ Just strText } where strText = strategyText $ psSettingsNamedConsumerStrategy sett ms = fromIntegral . timeSpanTotalMillis -------------------------------------------------------------------------------- instance Encode CreatePersistentSubscription -------------------------------------------------------------------------------- data CreatePersistentSubscriptionResult = CPS_Success | CPS_AlreadyExists | CPS_Fail | CPS_AccessDenied deriving (Enum, Eq, Show) -------------------------------------------------------------------------------- data CreatePersistentSubscriptionCompleted = CreatePersistentSubscriptionCompleted { cpscResult :: Required 1 (Enumeration CreatePersistentSubscriptionResult) , cpscReason :: Optional 2 (Value Text) } deriving (Generic, Show) -------------------------------------------------------------------------------- instance Decode CreatePersistentSubscriptionCompleted -------------------------------------------------------------------------------- data DeletePersistentSubscription = DeletePersistentSubscription { dpsGroupName :: Required 1 (Value Text) , dpsStreamId :: Required 2 (Value Text) } deriving (Generic, Show) -------------------------------------------------------------------------------- instance Encode DeletePersistentSubscription -------------------------------------------------------------------------------- _deletePersistentSubscription :: Text -> Text -> DeletePersistentSubscription _deletePersistentSubscription group_name stream_id = DeletePersistentSubscription { dpsGroupName = putField group_name , dpsStreamId = putField stream_id } -------------------------------------------------------------------------------- data DeletePersistentSubscriptionResult = DPS_Success | DPS_DoesNotExist | DPS_Fail | DPS_AccessDenied deriving (Enum, Eq, Show) -------------------------------------------------------------------------------- data DeletePersistentSubscriptionCompleted = DeletePersistentSubscriptionCompleted { dpscResult :: Required 1 (Enumeration DeletePersistentSubscriptionResult) , dpscReason :: Optional 2 (Value Text) } deriving (Generic, Show) -------------------------------------------------------------------------------- instance Decode DeletePersistentSubscriptionCompleted -------------------------------------------------------------------------------- data UpdatePersistentSubscription = UpdatePersistentSubscription { upsGroupName :: Required 1 (Value Text) , upsStreamId :: Required 2 (Value Text) , upsResolveLinkTos :: Required 3 (Value Bool) , upsStartFrom :: Required 4 (Value Int32) , upsMsgTimeout :: Required 5 (Value Int32) , upsRecordStats :: Required 6 (Value Bool) , upsLiveBufSize :: Required 7 (Value Int32) , upsReadBatchSize :: Required 8 (Value Int32) , upsBufSize :: Required 9 (Value Int32) , upsMaxRetryCount :: Required 10 (Value Int32) , upsPreferRoundRobin :: Required 11 (Value Bool) , upsChkPtAfterTime :: Required 12 (Value Int32) , upsChkPtMaxCount :: Required 13 (Value Int32) , upsChkPtMinCount :: Required 14 (Value Int32) , upsSubMaxCount :: Required 15 (Value Int32) , upsNamedConsStrategy :: Optional 16 (Value Text) } deriving (Generic, Show) -------------------------------------------------------------------------------- _updatePersistentSubscription :: Text -> Text -> PersistentSubscriptionSettings -> UpdatePersistentSubscription _updatePersistentSubscription group stream sett = UpdatePersistentSubscription { upsGroupName = putField group , upsStreamId = putField stream , upsResolveLinkTos = putField $ psSettingsResolveLinkTos sett , upsStartFrom = putField $ psSettingsStartFrom sett , upsMsgTimeout = putField $ ms $ psSettingsMsgTimeout sett , upsRecordStats = putField $ psSettingsExtraStats sett , upsLiveBufSize = putField $ psSettingsLiveBufSize sett , upsReadBatchSize = putField $ psSettingsReadBatchSize sett , upsBufSize = putField $ psSettingsHistoryBufSize sett , upsMaxRetryCount = putField $ psSettingsMaxRetryCount sett , upsPreferRoundRobin = putField False , upsChkPtAfterTime = putField $ ms $ psSettingsCheckPointAfter sett , upsChkPtMaxCount = putField $ psSettingsMaxCheckPointCount sett , upsChkPtMinCount = putField $ psSettingsMinCheckPointCount sett , upsSubMaxCount = putField $ psSettingsMaxSubsCount sett , upsNamedConsStrategy = putField $ Just strText } where strText = strategyText $ psSettingsNamedConsumerStrategy sett ms = fromIntegral . timeSpanTotalMillis -------------------------------------------------------------------------------- instance Encode UpdatePersistentSubscription -------------------------------------------------------------------------------- data UpdatePersistentSubscriptionResult = UPS_Success | UPS_DoesNotExist | UPS_Fail | UPS_AccessDenied deriving (Enum, Eq, Show) -------------------------------------------------------------------------------- data UpdatePersistentSubscriptionCompleted = UpdatePersistentSubscriptionCompleted { upscResult :: Required 1 (Enumeration UpdatePersistentSubscriptionResult) , upscReason :: Optional 2 (Value Text) } deriving (Generic, Show) -------------------------------------------------------------------------------- instance Decode UpdatePersistentSubscriptionCompleted -------------------------------------------------------------------------------- data ConnectToPersistentSubscription = ConnectToPersistentSubscription { ctsId :: Required 1 (Value Text) , ctsStreamId :: Required 2 (Value Text) , ctsAllowedInFlightMsgs :: Required 3 (Value Int32) } deriving (Generic, Show) -------------------------------------------------------------------------------- instance Encode ConnectToPersistentSubscription -------------------------------------------------------------------------------- _connectToPersistentSubscription :: Text -> Text -> Int32 -> ConnectToPersistentSubscription _connectToPersistentSubscription sub_id stream_id all_fly_msgs = ConnectToPersistentSubscription { ctsId = putField sub_id , ctsStreamId = putField stream_id , ctsAllowedInFlightMsgs = putField all_fly_msgs } -------------------------------------------------------------------------------- data PersistentSubscriptionAckEvents = PersistentSubscriptionAckEvents { psaeId :: Required 1 (Value Text) , psaeProcessedEvtIds :: Required 2 (Value ByteString) } deriving (Generic, Show) -------------------------------------------------------------------------------- instance Encode PersistentSubscriptionAckEvents -------------------------------------------------------------------------------- persistentSubscriptionAckEvents :: Text -> ByteString -> PersistentSubscriptionAckEvents persistentSubscriptionAckEvents sub_id evt_ids = PersistentSubscriptionAckEvents { psaeId = putField sub_id , psaeProcessedEvtIds = putField evt_ids } -------------------------------------------------------------------------------- data NakAction = NA_Unknown | NA_Park | NA_Retry | NA_Skip | NA_Stop deriving (Enum, Eq, Show) -------------------------------------------------------------------------------- data PersistentSubscriptionNakEvents = PersistentSubscriptionNakEvents { psneId :: Required 1 (Value Text) , psneProcessedEvtIds :: Required 2 (Value ByteString) , psneMsg :: Optional 3 (Value Text) , psneAction :: Required 4 (Enumeration NakAction) } deriving (Generic, Show) -------------------------------------------------------------------------------- instance Encode PersistentSubscriptionNakEvents -------------------------------------------------------------------------------- persistentSubscriptionNakEvents :: Text -> ByteString -> Maybe Text -> NakAction -> PersistentSubscriptionNakEvents persistentSubscriptionNakEvents sub_id evt_ids msg action = PersistentSubscriptionNakEvents { psneId = putField sub_id , psneProcessedEvtIds = putField evt_ids , psneMsg = putField msg , psneAction = putField action } -------------------------------------------------------------------------------- data PersistentSubscriptionConfirmation = PersistentSubscriptionConfirmation { pscLastCommitPos :: Required 1 (Value Int64) , pscId :: Required 2 (Value Text) , pscLastEvtNumber :: Optional 3 (Value Int32) } deriving (Generic, Show) -------------------------------------------------------------------------------- instance Decode PersistentSubscriptionConfirmation -------------------------------------------------------------------------------- data PersistentSubscriptionStreamEventAppeared = PersistentSubscriptionStreamEventAppeared { psseaEvt :: Required 1 (Message ResolvedIndexedEvent) } deriving (Generic, Show) -------------------------------------------------------------------------------- instance Decode PersistentSubscriptionStreamEventAppeared -------------------------------------------------------------------------------- data Sub a where RegularSub :: Text -> Bool -> Sub Regular PersistentSub :: Text -> Text -> Int32 -> Sub Persistent -------------------------------------------------------------------------------- data Pending = forall s. Push s => Pending { _penId :: !UUID , _penSub :: !(Sub s) , _penCb :: Subscription s -> IO () } -------------------------------------------------------------------------------- data Confirmed = forall s. Push s => Confirmed { _conId :: !UUID , _conTyp :: !(Sub s) , _conCB :: Subscription s -> IO () , _conSub :: !(IO (Subscription s)) } -------------------------------------------------------------------------------- data OnGoing = forall s. Push s => OnGoing { _ongTyp :: !(Sub s) , _ongSub :: !(Subscription s) } -------------------------------------------------------------------------------- -- | Value's type returned when calling 'subNextEvent' type family NextEvent a :: * where NextEvent Regular = Either DropReason ResolvedEvent NextEvent Persistent = Either DropReason ResolvedEvent NextEvent Catchup = Either CatchupError ResolvedEvent -------------------------------------------------------------------------------- -- | Represents a subscription to a stream. data Subscription a = Subscription { subStreamId :: !Text -- ^ The name of the stream to which the subscription is subscribed. , subUnsubscribe :: !(IO ()) -- ^ Asynchronously unsubscribe from the the stream. , subNextEvent :: !(IO (NextEvent a)) -- ^ Awaits for the next event. , subIsSubscribedToAll :: !Bool -- ^ True if this subscription is to $all stream. , _subInternal :: !a } -------------------------------------------------------------------------------- -- | Internal use only because we all know that lawless type-classes are bad. -- But Haskell clearly lacks of a proper module system. So meanwhile, we use -- type-class as a (hacky) way to have a bit of modularity. class Push a where _pushEvt :: a -> Either DropReason ResolvedEvent -> IO () -------------------------------------------------------------------------------- _subPushEvt :: Push a => Subscription a -> Either DropReason ResolvedEvent -> IO () _subPushEvt = _pushEvt . _subInternal -------------------------------------------------------------------------------- -- | Represents a subscription that is directly identifiable. 'Regular' and -- 'Persistent' fit that description while 'Catchup' doesn't. Because -- 'Catchup' reads all events from a particular checkpoint and when it's -- finished, it issues a subscription request. class Identifiable a where _getId :: a -> UUID _getLastCommitPos :: a -> Int64 _getLastEventNumber :: a -> Maybe Int32 -------------------------------------------------------------------------------- -- | Gets the ID of the subscription. subId :: Identifiable a => Subscription a -> UUID subId = _getId . _subInternal -------------------------------------------------------------------------------- -- | The last commit position seen on the subscription (if this a subscription -- to $all stream). subLastCommitPos :: Identifiable a => Subscription a -> Int64 subLastCommitPos = _getLastCommitPos . _subInternal -------------------------------------------------------------------------------- -- | The last event number seen on the subscription (if this is a subscription -- to a single stream). subLastEventNumber :: Identifiable a => Subscription a -> Maybe Int32 subLastEventNumber = _getLastEventNumber . _subInternal -------------------------------------------------------------------------------- -- | Represents a subscription to a single stream or $all stream in the -- EventStore. data Regular = Regular { _regId :: !UUID , _regResolveLinkTos :: !Bool , _regLastCommitPos :: !Int64 , _regLastEventNumber :: !(Maybe Int32) , _regChan :: !(Chan (Either DropReason ResolvedEvent)) } -------------------------------------------------------------------------------- instance Identifiable Regular where _getId = _regId _getLastCommitPos = _regLastCommitPos _getLastEventNumber = _regLastEventNumber -------------------------------------------------------------------------------- instance Push Regular where _pushEvt reg = writeChan (_regChan reg) -------------------------------------------------------------------------------- -- | Determines whether or not any link events encontered in the stream will be -- resolved. subResolveLinkTos :: Subscription Regular -> Bool subResolveLinkTos Subscription { _subInternal = reg } = _regResolveLinkTos reg -------------------------------------------------------------------------------- -- | Errors that could arise during a catch-up subscription. 'Text' value -- represents the stream name. data CatchupError = CatchupStreamDeleted Text | CatchupUnexpectedStreamStatus Text ReadStreamResult | CatchupSubscriptionDropReason Text DropReason deriving (Show, Typeable) -------------------------------------------------------------------------------- instance Exception CatchupError -------------------------------------------------------------------------------- -- | Represents catch-up subscription. data Catchup = Catchup { _catchupSub :: MVar (Subscription Regular) } -------------------------------------------------------------------------------- -- | Waits until 'Catchup' subscription catch-up its stream. waitTillCatchup :: Subscription Catchup -> IO () waitTillCatchup Subscription { _subInternal = Catchup mvar } = do _ <- readMVar mvar return () -------------------------------------------------------------------------------- -- | Non blocking version of `waitTillCatchup`. hasCaughtUp :: Subscription Catchup -> IO Bool hasCaughtUp Subscription { _subInternal = Catchup mvar } = fmap isJust $ tryReadMVar mvar -------------------------------------------------------------------------------- -- | Represents a persistent subscription. data Persistent = Persistent { _persistId :: !UUID , _persistChan :: !(Chan (Either DropReason ResolvedEvent)) , _persistSubId :: !Text , _persistGroup :: !Text , _persistLastCPos :: !Int64 , _persistLastENum :: !(Maybe Int32) , _persistAckCmd :: AckCmd -> IO () } -------------------------------------------------------------------------------- instance Identifiable Persistent where _getId = _persistId _getLastCommitPos = _persistLastCPos _getLastEventNumber = _persistLastENum -------------------------------------------------------------------------------- instance Push Persistent where _pushEvt p = writeChan (_persistChan p) -------------------------------------------------------------------------------- -- | Acknowledges those event ids have been successfully processed. notifyEventsProcessed :: Subscription Persistent -> [UUID] -> IO () notifyEventsProcessed sub eids = _persistAckCmd p (AckCmd eids) where p = _subInternal sub -------------------------------------------------------------------------------- -- | Acknowledges those event ids have failed to be processed successfully. notifyEventsFailed :: Subscription Persistent -> NakAction -> Maybe Text -> [UUID] -> IO () notifyEventsFailed sub act msg eids = _persistAckCmd p (NakCmd act msg eids) where p = _subInternal sub -------------------------------------------------------------------------------- data PersistAction = PersistCreate PersistentSubscriptionSettings | PersistUpdate PersistentSubscriptionSettings | PersistDelete -------------------------------------------------------------------------------- data AckCmd = AckCmd [UUID] | NakCmd NakAction (Maybe Text) [UUID] -------------------------------------------------------------------------------- data PendingPersistAction = PendingPersistAction { _ppaId :: !UUID , _ppaGroup :: !Text , _ppaStream :: !Text , _ppaTyp :: !PersistAction , _ppaCB :: Either OperationException () -> IO () } -------------------------------------------------------------------------------- data PersistActionConfirmed = PersistActionConfirmed { _pacId :: !UUID , _pacResult :: !(Either OperationException ()) , _pacCB :: Either OperationException () -> IO () } -------------------------------------------------------------------------------- data Manager = Manager { _pendings :: !(M.Map UUID Pending) , _ongoings :: !(M.Map UUID OnGoing) , _pendingPersistActions :: !(M.Map UUID PendingPersistAction) } -------------------------------------------------------------------------------- initManager :: Manager initManager = Manager { _pendings = M.empty , _ongoings = M.empty , _pendingPersistActions = M.empty } -------------------------------------------------------------------------------- -------------------------------------------------------------------------------- maybeDecodeMessage :: Decode a => ByteString -> Maybe a maybeDecodeMessage bytes = case runGet decodeMessage bytes of Right a -> Just a _ -> Nothing -------------------------------------------------------------------------------- unsafeDecodeMessage :: Decode a => ByteString -> a unsafeDecodeMessage bytes = case runGet decodeMessage bytes of Right a -> a Left e -> error $ "decoding error: " ++ e -------------------------------------------------------------------------------- data Appeared = forall s. Push s => Appeared { _appSub :: !(Subscription s) , _appEvt :: !ResolvedEvent } -------------------------------------------------------------------------------- onEventAppeared :: Package -> Manager -> Maybe Appeared onEventAppeared Package{..} Manager{..} = case M.lookup packageCorrelation _ongoings of Just (OnGoing typ sub) -> case (packageCmd, typ) of (0xC2, RegularSub _ _ ) -> let msg = unsafeDecodeMessage packageData evt = getField $ streamResolvedEvent msg in Just $ Appeared sub (newResolvedEventFromBuf evt) (0xC7, PersistentSub _ _ _) -> let msg = unsafeDecodeMessage packageData evt = getField $ psseaEvt msg in Just $ Appeared sub (newResolvedEvent evt) _ -> Nothing _ -> Nothing -------------------------------------------------------------------------------- confirmSub :: (UUID -> IO ()) -> (UUID -> Text -> AckCmd -> IO ()) -> Package -> Manager -> Maybe Confirmed confirmSub unsub ackF Package{..} Manager{..} = case M.lookup packageCorrelation _pendings of Just (Pending _ typ cb) -> case (packageCmd, typ) of (0xC1, RegularSub stream tos) -> let !msg = unsafeDecodeMessage packageData lcp = getField $ subscribeLastCommitPos msg len = getField $ subscribeLastEventNumber msg in Just $ Confirmed packageCorrelation typ cb $ do chan <- newChan let reg = Regular { _regId = packageCorrelation , _regResolveLinkTos = tos , _regLastCommitPos = lcp , _regLastEventNumber = len , _regChan = chan } return Subscription { subStreamId = stream , subUnsubscribe = unsub packageCorrelation , subNextEvent = readChan chan , subIsSubscribedToAll = stream == "" , _subInternal = reg } (0xC6, PersistentSub grp stream _) -> let !msg = unsafeDecodeMessage packageData lcp = getField $ pscLastCommitPos msg sid = getField $ pscId msg len = getField $ pscLastEvtNumber msg in Just $ Confirmed packageCorrelation typ cb $ do chan <- newChan let pes = Persistent { _persistId = packageCorrelation , _persistChan = chan , _persistSubId = sid , _persistGroup = grp , _persistLastCPos = lcp , _persistLastENum = len , _persistAckCmd = \cmd -> ackF packageCorrelation sid cmd } return Subscription { subStreamId = stream , subUnsubscribe = unsub packageCorrelation , subNextEvent = readChan chan , subIsSubscribedToAll = False , _subInternal = pes } _ -> Nothing _ -> Nothing -------------------------------------------------------------------------------- -- Events -------------------------------------------------------------------------------- data Subscribe = Subscribe { _subId :: !UUID , _subCallback :: Subscription Regular -> IO () , _subStream :: !Text , _subResolveLinkTos :: !Bool } -------------------------------------------------------------------------------- data RegisterSub = forall s. Push s => RegisterSub UUID (Sub s) (Subscription s) -------------------------------------------------------------------------------- -- Commands -------------------------------------------------------------------------------- data SubCommand = forall s. Push s => SubscribeTo (Sub s) (Subscription s -> IO ()) | SubmitPersistAction Text Text PersistAction (Either OperationException () -> IO ()) -------------------------------------------------------------------------------- subscriptionNetwork :: Settings -> (Package -> Reactive ()) -> Event Package -> Reactive (SubCommand -> IO ()) subscriptionNetwork sett push_pkg e_pkg = do -- When a subscription request has been submitted by the user. (on_sub, push_sub) <- newEvent -- When a subscription has been confirmed by EventStore and we succesfully -- create a `forall s. Push s => Subscription s` object. (on_reg_sub, push_reg_sub) <- newEvent -- When a persist action has been emitted by the user. (on_persist_action, push_persist_action) <- newEvent let push_pkg_io = pushAsync push_pkg push_ack_cmd uuid sid cmd = push_pkg_io $ createAckCmdPackage sett uuid sid cmd mgr_b <- mfix $ \mgr_b -> do let send_unsub = push_pkg_io . createUnsubscribePackage sett on_con_sub = filterJust $ snapshot (confirmSub send_unsub push_ack_cmd) e_pkg mgr_b on_drop = filterJust $ snapshot dropError e_pkg mgr_b on_persist_action_cfrm = filterJust $ snapshot onPersistActionConfirmed e_pkg mgr_b mgr_e = fmap confirmed on_reg_sub <> fmap subscribeRequest on_sub <> fmap newPersistAction on_persist_action <> fmap dropped on_drop <> fmap persistActionConfirmed on_persist_action_cfrm _ <- listen on_drop $ \(Dropped reason sub _) -> _subPushEvt sub (Left reason) _ <- listen on_persist_action_cfrm $ \(PersistActionConfirmed _ res k) -> k res _ <- listen on_con_sub $ \(Confirmed uuid typ cb action) -> do sub <- action _ <- forkIO $ sync $ push_reg_sub (RegisterSub uuid typ sub) cb sub accum initManager mgr_e let on_app = filterJust $ snapshot onEventAppeared e_pkg mgr_b runSubCommand (SubscribeTo typ cb) = do uuid <- randomIO let sub = Pending { _penId = uuid , _penSub = typ , _penCb = cb } void $ forkIO $ sync $ push_sub sub runSubCommand (SubmitPersistAction group stream typ cb) = do uuid <- randomIO let action = PendingPersistAction { _ppaId = uuid , _ppaGroup = group , _ppaStream = stream , _ppaTyp = typ , _ppaCB = cb } void $ forkIO $ sync $ push_persist_action action _ <- listen on_sub (push_pkg_io . createSubscriptionPackage sett) _ <- listen on_persist_action (push_pkg_io . createPersistActionPkg sett) _ <- listen on_app $ \(Appeared sub evt) -> _subPushEvt sub (Right evt) return runSubCommand -------------------------------------------------------------------------------- createSubscriptionPackage :: Settings -> Pending -> Package createSubscriptionPackage sett (Pending uuid typ _) = case typ of RegularSub stream tos -> createConnectRegularPackage sett uuid stream tos PersistentSub grp str bufSize -> createConnectPersistPackage sett uuid grp str bufSize -------------------------------------------------------------------------------- createConnectRegularPackage :: Settings -> UUID -> Text -> Bool -> Package createConnectRegularPackage Settings{..} uuid stream tos = Package { packageCmd = 0xC0 , packageCorrelation = uuid , packageData = runPut $ encodeMessage msg , packageCred = s_credentials } where msg = subscribeToStream stream tos -------------------------------------------------------------------------------- createConnectPersistPackage :: Settings -> UUID -> Text -> Text -> Int32 -> Package createConnectPersistPackage Settings{..} uuid group stream bufSize = Package { packageCmd = 0xC5 , packageCorrelation = uuid , packageData = runPut $ encodeMessage msg , packageCred = s_credentials } where msg = _connectToPersistentSubscription group stream bufSize -------------------------------------------------------------------------------- createAckCmdPackage :: Settings -> UUID -> Text -> AckCmd -> Package createAckCmdPackage sett uuid sid cmd = case cmd of AckCmd eids -> createAckPackage sett uuid sid eids NakCmd act msg eids -> createNakPackage sett uuid sid act msg eids -------------------------------------------------------------------------------- createAckPackage :: Settings -> UUID -> Text -> [UUID] -> Package createAckPackage Settings{..} corr sid eids = Package { packageCmd = 0xCC , packageCorrelation = corr , packageData = runPut $ encodeMessage msg , packageCred = s_credentials } where bytes = toStrict $ foldMap toByteString eids msg = persistentSubscriptionAckEvents sid bytes -------------------------------------------------------------------------------- createNakPackage :: Settings -> UUID -> Text -> NakAction -> Maybe Text -> [UUID] -> Package createNakPackage Settings{..} corr sid act txt eids = Package { packageCmd = 0xCD , packageCorrelation = corr , packageData = runPut $ encodeMessage msg , packageCred = s_credentials } where bytes = toStrict $ foldMap toByteString eids msg = persistentSubscriptionNakEvents sid bytes txt act -------------------------------------------------------------------------------- createUnsubscribePackage :: Settings -> UUID -> Package createUnsubscribePackage Settings{..} uuid = Package { packageCmd = 0xC3 , packageCorrelation = uuid , packageData = runPut $ encodeMessage UnsubscribeFromStream , packageCred = s_credentials } -------------------------------------------------------------------------------- createPersistActionPkg :: Settings -> PendingPersistAction -> Package createPersistActionPkg Settings{..} (PendingPersistAction aId grp strm typ _) = Package { packageCmd = cmd , packageCorrelation = aId , packageData = runPut msg , packageCred = s_credentials } where msg = case typ of PersistCreate sett -> encodeMessage $ _createPersistentSubscription grp strm sett PersistUpdate sett -> encodeMessage $ _updatePersistentSubscription grp strm sett PersistDelete -> encodeMessage $ _deletePersistentSubscription grp strm cmd = case typ of PersistCreate _ -> 0xC8 PersistUpdate _ -> 0xCE PersistDelete -> 0xCA -------------------------------------------------------------------------------- data Dropped = forall s. Push s => Dropped { droppedReason :: !DropReason , droppedSub :: !(Subscription s) , droppedId :: !UUID } -------------------------------------------------------------------------------- dropError :: Package -> Manager -> Maybe Dropped dropError Package{..} Manager{..} | packageCmd == 0xC4 = do OnGoing _ sub <- M.lookup packageCorrelation _ongoings msg <- maybeDecodeMessage packageData let reason = fromMaybe D_Unsubscribed $ getField $ dropReason msg return Dropped { droppedReason = reason , droppedSub = sub , droppedId = packageCorrelation } | otherwise = Nothing -------------------------------------------------------------------------------- nonEmptyText :: Text -> Maybe Text nonEmptyText "" = Nothing nonEmptyText t = Just t -------------------------------------------------------------------------------- onPersistActionConfirmed :: Package -> Manager -> Maybe PersistActionConfirmed onPersistActionConfirmed Package{..} Manager{..} = case M.lookup packageCorrelation _pendingPersistActions of Just (PendingPersistAction _ grp stream typ cb) -> case (packageCmd, typ) of (0xC9, PersistCreate _) -> do msg <- maybeDecodeMessage packageData let res = getField $ cpscResult msg reason = nonEmptyText =<< getField (cpscReason msg) ret = case res of CPS_Success -> Right () CPS_Fail -> Left $ peristentCreationFailure grp stream reason CPS_AlreadyExists -> Left $ persistentCreationExists grp stream CPS_AccessDenied -> Left $ persistentAccessDenied stream pac = PersistActionConfirmed { _pacId = packageCorrelation , _pacResult = ret , _pacCB = cb } return pac (0xCF, PersistUpdate _) -> do msg <- maybeDecodeMessage packageData let res = getField $ upscResult msg reason = nonEmptyText =<< getField (upscReason msg) ret = case res of UPS_Success -> Right () UPS_Fail -> Left $ peristentCreationFailure grp stream reason UPS_DoesNotExist -> Left $ persistentDoesNotExist grp stream UPS_AccessDenied -> Left $ persistentAccessDenied stream pac = PersistActionConfirmed { _pacId = packageCorrelation , _pacResult = ret , _pacCB = cb } return pac (0xCB, PersistDelete) -> do msg <- maybeDecodeMessage packageData let res = getField $ dpscResult msg reason = nonEmptyText =<< getField (dpscReason msg) ret = case res of DPS_Success -> Right () DPS_Fail -> Left $ peristentCreationFailure grp stream reason DPS_DoesNotExist -> Left $ persistentDoesNotExist grp stream DPS_AccessDenied -> Left $ persistentAccessDenied stream pac = PersistActionConfirmed { _pacId = packageCorrelation , _pacResult = ret , _pacCB = cb } return pac _ -> Nothing _ -> Nothing -------------------------------------------------------------------------------- -- Model -------------------------------------------------------------------------------- persistActionConfirmed :: PersistActionConfirmed -> Manager -> Manager persistActionConfirmed pc s@Manager{..} = s { _pendingPersistActions = M.delete (_pacId pc) _pendingPersistActions } -------------------------------------------------------------------------------- subscribeRequest :: Pending -> Manager -> Manager subscribeRequest p@(Pending uuid _ _) s@Manager{..} = s { _pendings = M.insert uuid p _pendings } -------------------------------------------------------------------------------- dropped :: Dropped -> Manager -> Manager dropped d s@Manager{..} = s { _ongoings = M.delete (droppedId d) _ongoings } -------------------------------------------------------------------------------- confirmed :: RegisterSub -> Manager -> Manager confirmed (RegisterSub uuid typ sub) s@Manager{..} = s { _pendings = M.delete uuid _pendings , _ongoings = M.insert uuid (OnGoing typ sub) _ongoings } -------------------------------------------------------------------------------- newPersistAction :: PendingPersistAction -> Manager -> Manager newPersistAction ppa@PendingPersistAction{..} s@Manager{..} = s { _pendingPersistActions = M.insert _ppaId ppa _pendingPersistActions } -------------------------------------------------------------------------------- -------------------------------------------------------------------------------- peristentCreationFailure :: Text -> Text -> Maybe Text -> OperationException peristentCreationFailure group stream m_reason = InvalidOperation msg where msg = "Subscription group " <> group <> " on stream " <> stream <> " failed" <> reasonTxt reasonTxt = foldMap (" reason: " <>) m_reason -------------------------------------------------------------------------------- persistentCreationExists :: Text -> Text -> OperationException persistentCreationExists group stream = InvalidOperation msg where msg = "Subscription group " <> group <> " on stream " <> stream <> " already exists." -------------------------------------------------------------------------------- persistentDoesNotExist :: Text -> Text -> OperationException persistentDoesNotExist group stream = InvalidOperation msg where msg = "Subscription group " <> group <> " on stream " <> stream <> " doesn't exist." -------------------------------------------------------------------------------- persistentAccessDenied :: Text -> OperationException persistentAccessDenied stream = AccessDenied msg where msg = "Write access denied for stream " <> stream