{-# LANGUAGE DeriveDataTypeable #-} {-# LANGUAGE GADTs #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE Rank2Types #-} {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE ScopedTypeVariables #-} -------------------------------------------------------------------------------- -- | -- Module : Database.EventStore.Internal.Manager.Subscription.Driver -- Copyright : (C) 2015 Yorick Laupa -- License : (see the file LICENSE) -- -- Maintainer : Yorick Laupa -- Stability : provisional -- Portability : non-portable -- -- Subscription model driver. It drivers the model accordingly depending on the -- 'Package' or commands submitted to it. -------------------------------------------------------------------------------- module Database.EventStore.Internal.Manager.Subscription.Driver ( SubDropReason(..) , SubConnectEvent(..) , PersistActionException(..) , ConfirmedAction(..) , NakAction(..) , Driver , newDriver , submitPackage , connectToStream , connectToPersist , createPersist , updatePersist , deletePersist , ackPersist , nakPersist , unsubscribe , abort ) where -------------------------------------------------------------------------------- import Data.Int import Data.Maybe -------------------------------------------------------------------------------- import ClassyPrelude hiding (group) import Control.Monad.State import Data.UUID -------------------------------------------------------------------------------- import Database.EventStore.Internal.Generator import Database.EventStore.Internal.Manager.Subscription.Command import Database.EventStore.Internal.Manager.Subscription.Message import Database.EventStore.Internal.Manager.Subscription.Model import Database.EventStore.Internal.Manager.Subscription.Packages import Database.EventStore.Internal.Manager.Subscription.Types import Database.EventStore.Internal.Types -------------------------------------------------------------------------------- -- | Set of events that can occurs during a subscription lifetime. data SubConnectEvent = EventAppeared ResolvedEvent -- ^ A wild event appeared ! | Dropped SubDropReason -- ^ The subscription connection dropped. | SubConfirmed Running -- ^ Subscription connection is confirmed. It means that subscription can -- receive events from the server. | Unsubscribed -------------------------------------------------------------------------------- -- | Enumerates all persistent action exceptions. data PersistActionException = PersistActionFail -- ^ The action failed. | PersistActionAlreadyExist -- ^ Happens when creating a persistent subscription on a stream with a -- group name already taken. | PersistActionDoesNotExist -- ^ An operation tried to do something on a persistent subscription or a -- stream that don't exist. | PersistActionAccessDenied -- ^ The current user is not allowed to operate on the supplied stream or -- persistent subscription. | PersistActionAborted -- ^ That action has been aborted because the user shutdown the connection -- to the server or the connection to the server is no longer possible. deriving (Show, Typeable) -------------------------------------------------------------------------------- instance Exception PersistActionException -------------------------------------------------------------------------------- -- | Emitted when a persistent action has been carried out successfully. data ConfirmedAction = ConfirmedAction { caId :: !UUID -- ^ Action id. , caGroup :: !Text -- ^ Subscription group name. , caStream :: !Text -- ^ Stream name. , caAction :: !PersistAction -- ^ Persistent action type. } -------------------------------------------------------------------------------- -- | Submits a 'Package' to a subscription driver. If the 'Package' was -- processed by the driver, it will return a final value and a new driver with -- its internal state updated accordingly. submitPackage :: Package -> Driver r -> Maybe (r, Driver r) submitPackage pkg (Driver k) = k (Pkg pkg) -------------------------------------------------------------------------------- -- | Starts a regular subscription connection. It returns the associated -- 'Package' and updates driver internal state. connectToStream :: (SubConnectEvent -> r) -> Text -- ^ Stream name. -> Bool -- ^ Resolve Link TOS -> Driver r -> (Package, Driver r) connectToStream c s t (Driver k) = k (Cmd $ ConnectReg c s t) -------------------------------------------------------------------------------- -- | Starts a persistent subscription connection. It returns the associated -- 'Package' and updates driver internal state. connectToPersist :: (SubConnectEvent -> r) -> Text -- ^ Group name. -> Text -- ^ Stream name. -> Int32 -- ^ Buffer size. -> Driver r -> (Package, Driver r) connectToPersist c g s b (Driver k) = k (Cmd $ ConnectPersist c g s b) -------------------------------------------------------------------------------- -- | Creates a persistent subscription. It returns the associated 'Package' and -- updates driver internal state. createPersist :: (Either PersistActionException ConfirmedAction -> r) -> Text -- ^ Group name. -> Text -- ^ Stream name. -> PersistentSubscriptionSettings -> Driver r -> (Package, Driver r) createPersist c g s ss (Driver k) = k (Cmd $ ApplyPersistAction c g s (PersistCreate ss)) -------------------------------------------------------------------------------- -- | Updates a persistent subscription. It returns the associated 'Package' and -- updates driver internal state. updatePersist :: (Either PersistActionException ConfirmedAction -> r) -> Text -- ^ Group name. -> Text -- ^ Stream name. -> PersistentSubscriptionSettings -> Driver r -> (Package, Driver r) updatePersist c g s ss (Driver k) = k (Cmd $ ApplyPersistAction c g s (PersistUpdate ss)) -------------------------------------------------------------------------------- -- | Deletes a persistent subscription. It returns the associated 'Package' and -- updates driver internal state. deletePersist :: (Either PersistActionException ConfirmedAction -> r) -> Text -- ^ Group name. -> Text -- ^ Stream name. -> Driver r -> (Package, Driver r) deletePersist c g s (Driver k) = k (Cmd $ ApplyPersistAction c g s PersistDelete) -------------------------------------------------------------------------------- -- | Given a persistent subscription, acknowledges a set of events have been -- successfully processed. It returns the associated 'Package' and updates -- driver internal state. ackPersist :: r -> Running -> [UUID] -- ^ Event ids. -> Driver r -> (Package, Driver r) ackPersist r i evts (Driver k) = k (Cmd $ PersistAck r i evts) -------------------------------------------------------------------------------- -- | Given a persistent subscription, indicates a set of event haven't been -- processed correctly. It returns the associated 'Package' and updates driver -- internal state. nakPersist :: r -> Running -> NakAction -> Maybe Text -- ^ Reason. -> [UUID] -- ^ Event ids. -> Driver r -> (Package, Driver r) nakPersist r i na mt evts (Driver k) = k (Cmd $ PersistNak r i na mt evts) -------------------------------------------------------------------------------- -- | Unsubscribe from a subscription. unsubscribe :: Running -> Driver r -> (Package, Driver r) unsubscribe r (Driver k) = k (Cmd $ Unsubscribe r) -------------------------------------------------------------------------------- -- | Aborts every pending action. abort :: Driver r -> [r] abort (Driver k) = k Abort -------------------------------------------------------------------------------- -- EventStore result mappers: -- ========================= -- EventStore protocol has several values that means the exact same thing. Those -- functions convert a specific EventStore to uniform result type common to all -- persistent actions. -------------------------------------------------------------------------------- createRException :: CreatePersistentSubscriptionResult -> Maybe PersistActionException createRException CPS_Success = Nothing createRException CPS_AlreadyExists = Just PersistActionAlreadyExist createRException CPS_Fail = Just PersistActionFail createRException CPS_AccessDenied = Just PersistActionAccessDenied -------------------------------------------------------------------------------- deleteRException :: DeletePersistentSubscriptionResult -> Maybe PersistActionException deleteRException DPS_Success = Nothing deleteRException DPS_DoesNotExist = Just PersistActionDoesNotExist deleteRException DPS_Fail = Just PersistActionFail deleteRException DPS_AccessDenied = Just PersistActionAccessDenied -------------------------------------------------------------------------------- updateRException :: UpdatePersistentSubscriptionResult -> Maybe PersistActionException updateRException UPS_Success = Nothing updateRException UPS_DoesNotExist = Just PersistActionDoesNotExist updateRException UPS_Fail = Just PersistActionFail updateRException UPS_AccessDenied = Just PersistActionAccessDenied -------------------------------------------------------------------------------- -------------------------------------------------------------------------------- -- | Type of inputs handled by the 'Subscription' driver. data In r a where -- A command consists of receiving some parameters, updating the -- 'Subscription' model accordingly and thus modifying driver internal -- state. Cmd :: Cmd r -> In r (Package, Driver r) -- A 'Package' has been submitted to the 'Subscription' driver. If the -- driver recognize that 'Package', it returns a final value and update -- the driver internal state. Pkg :: Package -> In r (Maybe (r, Driver r)) -- Aborts every pending action. Abort :: In r [r] -------------------------------------------------------------------------------- -- | Set of commands handled by the driver. data Cmd r = ConnectReg (SubConnectEvent -> r) Text Bool -- ^ Creates a regular 'Subscription' connection. When a 'SubConnectEvent' -- has arrived, the driver will use the provided callback and emit a -- final value. It holds a stream name and `Resolve Link TOS` setting. | ConnectPersist (SubConnectEvent -> r) Text Text Int32 -- ^ Creates a persistent 'Subscription' connection. When a -- 'SubConnectEvent' has arrived, the driver will use the provided -- callback and emit a final value. It holds a group name, stream -- name and a buffer size. | Unsubscribe Running -- ^ Unsubscribe from a subscription. | ApplyPersistAction (Either PersistActionException ConfirmedAction -> r) Text Text PersistAction -- ^ Creates a persistent action. Depending of the failure or the success -- of that action, the driver will use the provided callback to emit a -- final value. It hols a group name, a stream name and a persistent -- action. | PersistAck r Running [UUID] -- ^ Acks a set of Event 'UUID' to notify those events have been correctly -- handled. It holds a 'Running' subscription and a set of `UUID` -- representing event id. When the ack would be confirmed the driver -- will return the supplied final value. | PersistNak r Running NakAction (Maybe Text) [UUID] -- ^ Naks a set of Event 'UUID' to notify those events haven't been -- handled correctly. it holds a 'Running' subscription, a 'NakAction', -- an optional reason and a set of event ids. When the nak would be -- confirmed, the driver will return the provided final value. -------------------------------------------------------------------------------- cmdSubCallback :: Cmd r -> Maybe (SubConnectEvent -> r) cmdSubCallback (ConnectReg k _ _) = Just k cmdSubCallback (ConnectPersist k _ _ _) = Just k cmdSubCallback _ = Nothing -------------------------------------------------------------------------------- -- | Driver internal state. data Internal r = Internal { _model :: !Model -- ^ Subscription model. , _gen :: !Generator -- ^ 'UUID' generator. , _reg :: !(HashMap UUID (Cmd r)) -- ^ Holds ongoing commands. When stored, it means an action hasn't been -- confirmed yet. } -------------------------------------------------------------------------------- initInternal :: Generator -> Internal r initInternal gen = Internal newModel gen mempty -------------------------------------------------------------------------------- -- | Subscription driver state machine. newtype Driver r = Driver (forall a. In r a -> a) -------------------------------------------------------------------------------- newtype DriverM r m a = DriverM (ReaderT Settings (StateT (Internal r) m) a) deriving ( Functor , Applicative , Monad , MonadReader Settings , MonadState (Internal r) ) -------------------------------------------------------------------------------- instance MonadTrans (DriverM r) where lift m = DriverM $ lift $ lift m -------------------------------------------------------------------------------- noop :: DriverM r Maybe a noop = lift Nothing -------------------------------------------------------------------------------- modelSubRunning :: UUID -> DriverM r Maybe Running modelSubRunning uuid = do model <- gets _model lift $ querySubscription uuid $ model -------------------------------------------------------------------------------- modelSubConfirmed :: Monad m => UUID -> Meta -> DriverM r m () modelSubConfirmed uuid meta = do model <- gets _model let nxt = confirmedSubscription uuid meta model modify $ \s -> s { _model = nxt } -------------------------------------------------------------------------------- modelActionConfirmed :: Monad m => UUID -> DriverM r m () modelActionConfirmed uuid = modify $ \s -> s { _model = confirmedAction uuid $ _model s } -------------------------------------------------------------------------------- modelUnsubscribed :: UUID -> DriverM r Maybe () modelUnsubscribed uuid = do run <- modelSubRunning uuid model <- gets _model modify $ \s -> s { _model = unsubscribed run model } -------------------------------------------------------------------------------- registerDelete :: Monad m => UUID -> DriverM r m () registerDelete uuid = do reg <- gets _reg let nxtR = deleteMap uuid reg modify $ \s -> s { _reg = nxtR } -------------------------------------------------------------------------------- registerAdd :: Monad m => UUID -> Cmd r -> DriverM r m () registerAdd uuid cmd = do reg <- gets _reg modify $ \s -> s { _reg = insertMap uuid cmd reg } -------------------------------------------------------------------------------- modelPersistentAction :: UUID -> DriverM r Maybe PendingAction modelPersistentAction uuid = do model <- gets _model lift $ queryPersistentAction uuid model -------------------------------------------------------------------------------- freshUUID :: Monad m => DriverM r m UUID freshUUID = do (uuid, nxtG) <- gets (nextUUID . _gen) modify $ \s -> s { _gen = nxtG } return uuid -------------------------------------------------------------------------------- modelConnectReg :: Monad m => Text -> Bool -> DriverM r m UUID modelConnectReg stream tos = do uuid <- freshUUID model <- gets _model modify $ \s -> s { _model = connectReg stream tos uuid model } return uuid -------------------------------------------------------------------------------- modelConnectPersist :: Monad m => Text -> Text -> Int32 -> DriverM r m UUID modelConnectPersist group name batch = do uuid <- freshUUID model <- gets _model modify $ \s -> s { _model = connectPersist group name batch uuid model } return uuid -------------------------------------------------------------------------------- modelPersistAction :: Monad m => Text -> Text -> PersistAction -> DriverM r m UUID modelPersistAction group name action = do uuid <- freshUUID model <- gets _model modify $ \s -> s { _model = persistAction group name uuid action model } return uuid -------------------------------------------------------------------------------- runDriverM :: Monad m => Settings -> Internal r -> DriverM r m a -> m (a, Driver r) runDriverM setts st (DriverM m) = do (a, nxtSt) <- runStateT (runReaderT m setts) st return (a, Driver $ handleDriver setts nxtSt) -------------------------------------------------------------------------------- runDriver :: Settings -> Internal r -> DriverM r Identity a -> (a, Driver r) runDriver setts st action = runIdentity $ runDriverM setts st action -------------------------------------------------------------------------------- -- Driver main state machine. handleDriver :: Settings -> Internal r -> In r a -> a handleDriver setts st (Pkg pkg) = do let corrId = packageCorrelation pkg cmd <- lookup corrId $ _reg st let action = handleMsg corrId cmd $ decodeServerMessage pkg runDriverM setts st action handleDriver setts st (Cmd cmd) = let action = handleCmd cmd in runDriver setts st action handleDriver _ st Abort = (fmap snd $ mapToList $ _reg st) >>= _F where _F (ConnectReg k _ _) = [k $ Dropped SubAborted] _F (ConnectPersist k _ _ _) = [k $ Dropped SubAborted] _F (ApplyPersistAction k _ _ _) = [k $ Left PersistActionAborted] _F _ = [] -------------------------------------------------------------------------------- -- | Handles 'Package's coming from the server. handleMsg :: UUID -> Cmd r -> ServerMessage -> DriverM r Maybe r handleMsg corrId = go where go (ConnectReg k _ _) (EventAppearedMsg evt) = do _ <- modelSubRunning corrId return $ k $ EventAppeared evt go (ConnectPersist k _ _ _) (PersistentEventAppearedMsg evt) = do _ <- modelSubRunning corrId return $ k $ EventAppeared evt go (ConnectReg k _ _) (ConfirmationMsg lcp len) = do let meta = RegularMeta lcp len modelSubConfirmed corrId meta run <- modelSubRunning corrId return $ k $ SubConfirmed run go (ConnectPersist k _ _ _) (PersistentConfirmationMsg sid lcp len) = do let meta = PersistMeta sid lcp len modelSubConfirmed corrId meta run <- modelSubRunning corrId return $ k $ SubConfirmed run go cmd (PersistentCreatedMsg res) = confirmPAction cmd $ createRException res go cmd (PersistentUpdatedMsg res) = confirmPAction cmd $ updateRException res go cmd (PersistentDeletedMsg res) = confirmPAction cmd $ deleteRException res go cmd (DroppedMsg reason) = do modelUnsubscribed corrId registerDelete corrId let evt = case reason of SubUnsubscribed -> Unsubscribed _ -> Dropped reason k <- lift $ cmdSubCallback cmd return $ k evt go cmd (BadRequestMsg msg) = go cmd (DroppedMsg $ SubServerError msg) go cmd (NotAuthenticatedMsg msg) = go cmd (DroppedMsg $ SubNotAuthenticated msg) go cmd (NotHandledMsg reason info) = go cmd (DroppedMsg $ SubNotHandled reason info) go cmd (UnknownMsg pkgCmdM) = do k <- lift $ cmdSubCallback cmd let msgM = fmap (\c -> "unknown command: " <> tshow c) pkgCmdM return $ k $ Dropped $ SubServerError msgM go cmd _ = do k <- lift $ cmdSubCallback cmd let msg = "Logic error in Subscription Driver (the impossible happened)" return $ k $ Dropped $ SubClientError msg confirmPAction :: Cmd r -> Maybe PersistActionException -> DriverM r Maybe r confirmPAction (ApplyPersistAction k g n c) em = do _ <- modelPersistentAction corrId modelActionConfirmed corrId registerDelete corrId let evt = ConfirmedAction corrId g n c case em of Just e -> return $ k $ Left e Nothing -> return $ k $ Right evt confirmPAction _ _ = noop -------------------------------------------------------------------------------- -- | Handles commands coming from the user. handleCmd :: Monad m => Cmd r -> DriverM r m Package handleCmd cmd@(ConnectReg _ s tos) = do setts <- ask uuid <- modelConnectReg s tos registerAdd uuid cmd return $ createConnectRegularPackage setts uuid s tos handleCmd cmd@(ConnectPersist _ gn n b) = do setts <- ask uuid <- modelConnectPersist gn n b registerAdd uuid cmd return $ createConnectPersistPackage setts uuid gn n b handleCmd (Unsubscribe r) = do setts <- ask return $ createUnsubscribePackage setts $ runningUUID r handleCmd cmd@(ApplyPersistAction _ gn n a) = do setts <- ask uuid <- modelPersistAction gn n a registerAdd uuid cmd return $ createPersistActionPackage setts uuid gn n a handleCmd (PersistAck _ run evts) = do setts <- ask let RunningPersist _ _ _ _ sid _ _ = run uuid = runningUUID run return $ createAckPackage setts uuid sid evts handleCmd (PersistNak _ run na r evts) = do setts <- ask let RunningPersist _ _ _ _ sid _ _ = run uuid = runningUUID run return $ createNakPackage setts uuid sid na r evts -------------------------------------------------------------------------------- -- | Creates a new subscription 'Driver' state machine. newDriver :: forall r. Settings -> Generator -> Driver r newDriver setts gen = Driver $ handleDriver setts (initInternal gen)