module Database.EventStore.Internal.Manager.Subscription.Driver
( SubDropReason(..)
, SubConnectEvent(..)
, PersistActionException(..)
, ConfirmedAction(..)
, NakAction(..)
, Driver
, newDriver
, submitPackage
, connectToStream
, connectToPersist
, createPersist
, updatePersist
, deletePersist
, ackPersist
, nakPersist
, unsubscribe
, abort
) where
import Data.Int
import Data.Maybe
import ClassyPrelude hiding (group)
import Control.Monad.State
import Data.UUID
import Database.EventStore.Internal.Generator
import Database.EventStore.Internal.Manager.Subscription.Command
import Database.EventStore.Internal.Manager.Subscription.Message
import Database.EventStore.Internal.Manager.Subscription.Model
import Database.EventStore.Internal.Manager.Subscription.Packages
import Database.EventStore.Internal.Manager.Subscription.Types
import Database.EventStore.Internal.Types
data SubConnectEvent
= EventAppeared ResolvedEvent
| Dropped SubDropReason
| SubConfirmed Running
| Unsubscribed
data PersistActionException
= PersistActionFail
| PersistActionAlreadyExist
| PersistActionDoesNotExist
| PersistActionAccessDenied
| PersistActionAborted
deriving (Show, Typeable)
instance Exception PersistActionException
data ConfirmedAction =
ConfirmedAction
{ caId :: !UUID
, caGroup :: !Text
, caStream :: !Text
, caAction :: !PersistAction
}
submitPackage :: Package -> Driver r -> Maybe (r, Driver r)
submitPackage pkg (Driver k) = k (Pkg pkg)
connectToStream :: (SubConnectEvent -> r)
-> Text
-> Bool
-> Driver r
-> (Package, Driver r)
connectToStream c s t (Driver k) = k (Cmd $ ConnectReg c s t)
connectToPersist :: (SubConnectEvent -> r)
-> Text
-> Text
-> Int32
-> Driver r
-> (Package, Driver r)
connectToPersist c g s b (Driver k) = k (Cmd $ ConnectPersist c g s b)
createPersist :: (Either PersistActionException ConfirmedAction -> r)
-> Text
-> Text
-> PersistentSubscriptionSettings
-> Driver r
-> (Package, Driver r)
createPersist c g s ss (Driver k) =
k (Cmd $ ApplyPersistAction c g s (PersistCreate ss))
updatePersist :: (Either PersistActionException ConfirmedAction -> r)
-> Text
-> Text
-> PersistentSubscriptionSettings
-> Driver r
-> (Package, Driver r)
updatePersist c g s ss (Driver k) =
k (Cmd $ ApplyPersistAction c g s (PersistUpdate ss))
deletePersist :: (Either PersistActionException ConfirmedAction -> r)
-> Text
-> Text
-> Driver r
-> (Package, Driver r)
deletePersist c g s (Driver k) =
k (Cmd $ ApplyPersistAction c g s PersistDelete)
ackPersist :: r
-> Running
-> [UUID]
-> Driver r
-> (Package, Driver r)
ackPersist r i evts (Driver k) = k (Cmd $ PersistAck r i evts)
nakPersist :: r
-> Running
-> NakAction
-> Maybe Text
-> [UUID]
-> Driver r
-> (Package, Driver r)
nakPersist r i na mt evts (Driver k) =
k (Cmd $ PersistNak r i na mt evts)
unsubscribe :: Running -> Driver r -> (Package, Driver r)
unsubscribe r (Driver k) = k (Cmd $ Unsubscribe r)
abort :: Driver r -> [r]
abort (Driver k) = k Abort
createRException :: CreatePersistentSubscriptionResult
-> Maybe PersistActionException
createRException CPS_Success = Nothing
createRException CPS_AlreadyExists = Just PersistActionAlreadyExist
createRException CPS_Fail = Just PersistActionFail
createRException CPS_AccessDenied = Just PersistActionAccessDenied
deleteRException :: DeletePersistentSubscriptionResult
-> Maybe PersistActionException
deleteRException DPS_Success = Nothing
deleteRException DPS_DoesNotExist = Just PersistActionDoesNotExist
deleteRException DPS_Fail = Just PersistActionFail
deleteRException DPS_AccessDenied = Just PersistActionAccessDenied
updateRException :: UpdatePersistentSubscriptionResult
-> Maybe PersistActionException
updateRException UPS_Success = Nothing
updateRException UPS_DoesNotExist = Just PersistActionDoesNotExist
updateRException UPS_Fail = Just PersistActionFail
updateRException UPS_AccessDenied = Just PersistActionAccessDenied
data In r a where
Cmd :: Cmd r -> In r (Package, Driver r)
Pkg :: Package -> In r (Maybe (r, Driver r))
Abort :: In r [r]
data Cmd r
= ConnectReg (SubConnectEvent -> r) Text Bool
| ConnectPersist (SubConnectEvent -> r)
Text
Text
Int32
| Unsubscribe Running
| ApplyPersistAction (Either PersistActionException ConfirmedAction -> r)
Text
Text
PersistAction
| PersistAck r Running [UUID]
| PersistNak r
Running
NakAction
(Maybe Text)
[UUID]
cmdSubCallback :: Cmd r -> Maybe (SubConnectEvent -> r)
cmdSubCallback (ConnectReg k _ _) = Just k
cmdSubCallback (ConnectPersist k _ _ _) = Just k
cmdSubCallback _ = Nothing
data Internal r =
Internal
{ _model :: !Model
, _gen :: !Generator
, _reg :: !(HashMap UUID (Cmd r))
}
initInternal :: Generator -> Internal r
initInternal gen = Internal newModel gen mempty
newtype Driver r = Driver (forall a. In r a -> a)
newtype DriverM r m a = DriverM (ReaderT Settings (StateT (Internal r) m) a)
deriving ( Functor
, Applicative
, Monad
, MonadReader Settings
, MonadState (Internal r)
)
instance MonadTrans (DriverM r) where
lift m = DriverM $ lift $ lift m
noop :: DriverM r Maybe a
noop = lift Nothing
modelSubRunning :: UUID -> DriverM r Maybe Running
modelSubRunning uuid = do
model <- gets _model
lift $ querySubscription uuid $ model
modelSubConfirmed :: Monad m => UUID -> Meta -> DriverM r m ()
modelSubConfirmed uuid meta = do
model <- gets _model
let nxt = confirmedSubscription uuid meta model
modify $ \s -> s { _model = nxt }
modelActionConfirmed :: Monad m => UUID -> DriverM r m ()
modelActionConfirmed uuid =
modify $ \s -> s { _model = confirmedAction uuid $ _model s }
modelUnsubscribed :: UUID -> DriverM r Maybe ()
modelUnsubscribed uuid = do
run <- modelSubRunning uuid
model <- gets _model
modify $ \s -> s { _model = unsubscribed run model }
registerDelete :: Monad m => UUID -> DriverM r m ()
registerDelete uuid = do
reg <- gets _reg
let nxtR = deleteMap uuid reg
modify $ \s -> s { _reg = nxtR }
registerAdd :: Monad m => UUID -> Cmd r -> DriverM r m ()
registerAdd uuid cmd = do
reg <- gets _reg
modify $ \s -> s { _reg = insertMap uuid cmd reg }
modelPersistentAction :: UUID -> DriverM r Maybe PendingAction
modelPersistentAction uuid = do
model <- gets _model
lift $ queryPersistentAction uuid model
freshUUID :: Monad m => DriverM r m UUID
freshUUID = do
(uuid, nxtG) <- gets (nextUUID . _gen)
modify $ \s -> s { _gen = nxtG }
return uuid
modelConnectReg :: Monad m => Text -> Bool -> DriverM r m UUID
modelConnectReg stream tos = do
uuid <- freshUUID
model <- gets _model
modify $ \s -> s { _model = connectReg stream tos uuid model }
return uuid
modelConnectPersist :: Monad m => Text -> Text -> Int32 -> DriverM r m UUID
modelConnectPersist group name batch = do
uuid <- freshUUID
model <- gets _model
modify $ \s -> s { _model = connectPersist group name batch uuid model }
return uuid
modelPersistAction :: Monad m
=> Text
-> Text
-> PersistAction
-> DriverM r m UUID
modelPersistAction group name action = do
uuid <- freshUUID
model <- gets _model
modify $ \s -> s { _model = persistAction group name uuid action model }
return uuid
runDriverM :: Monad m
=> Settings
-> Internal r
-> DriverM r m a
-> m (a, Driver r)
runDriverM setts st (DriverM m) = do
(a, nxtSt) <- runStateT (runReaderT m setts) st
return (a, Driver $ handleDriver setts nxtSt)
runDriver :: Settings
-> Internal r
-> DriverM r Identity a
-> (a, Driver r)
runDriver setts st action = runIdentity $ runDriverM setts st action
handleDriver :: Settings -> Internal r -> In r a -> a
handleDriver setts st (Pkg pkg) = do
let corrId = packageCorrelation pkg
cmd <- lookup corrId $ _reg st
let action = handleMsg corrId cmd $ decodeServerMessage pkg
runDriverM setts st action
handleDriver setts st (Cmd cmd) =
let action = handleCmd cmd in
runDriver setts st action
handleDriver _ st Abort = (fmap snd $ mapToList $ _reg st) >>= _F
where
_F (ConnectReg k _ _) = [k $ Dropped SubAborted]
_F (ConnectPersist k _ _ _) = [k $ Dropped SubAborted]
_F (ApplyPersistAction k _ _ _) = [k $ Left PersistActionAborted]
_F _ = []
handleMsg :: UUID -> Cmd r -> ServerMessage -> DriverM r Maybe r
handleMsg corrId = go
where
go (ConnectReg k _ _) (EventAppearedMsg evt) = do
_ <- modelSubRunning corrId
return $ k $ EventAppeared evt
go (ConnectPersist k _ _ _) (PersistentEventAppearedMsg evt) = do
_ <- modelSubRunning corrId
return $ k $ EventAppeared evt
go (ConnectReg k _ _) (ConfirmationMsg lcp len) = do
let meta = RegularMeta lcp len
modelSubConfirmed corrId meta
run <- modelSubRunning corrId
return $ k $ SubConfirmed run
go (ConnectPersist k _ _ _) (PersistentConfirmationMsg sid lcp len) = do
let meta = PersistMeta sid lcp len
modelSubConfirmed corrId meta
run <- modelSubRunning corrId
return $ k $ SubConfirmed run
go cmd (PersistentCreatedMsg res) =
confirmPAction cmd $ createRException res
go cmd (PersistentUpdatedMsg res) =
confirmPAction cmd $ updateRException res
go cmd (PersistentDeletedMsg res) =
confirmPAction cmd $ deleteRException res
go cmd (DroppedMsg reason) = do
modelUnsubscribed corrId
registerDelete corrId
let evt =
case reason of
SubUnsubscribed -> Unsubscribed
_ -> Dropped reason
k <- lift $ cmdSubCallback cmd
return $ k evt
go cmd (BadRequestMsg msg) =
go cmd (DroppedMsg $ SubServerError msg)
go cmd (NotAuthenticatedMsg msg) =
go cmd (DroppedMsg $ SubNotAuthenticated msg)
go cmd (NotHandledMsg reason info) =
go cmd (DroppedMsg $ SubNotHandled reason info)
go cmd (UnknownMsg pkgCmdM) = do
k <- lift $ cmdSubCallback cmd
let msgM = fmap (\c -> "unknown command: " <> tshow c) pkgCmdM
return $ k $ Dropped $ SubServerError msgM
go cmd _ = do
k <- lift $ cmdSubCallback cmd
let msg = "Logic error in Subscription Driver (the impossible happened)"
return $ k $ Dropped $ SubClientError msg
confirmPAction :: Cmd r
-> Maybe PersistActionException
-> DriverM r Maybe r
confirmPAction (ApplyPersistAction k g n c) em = do
_ <- modelPersistentAction corrId
modelActionConfirmed corrId
registerDelete corrId
let evt = ConfirmedAction corrId g n c
case em of
Just e -> return $ k $ Left e
Nothing -> return $ k $ Right evt
confirmPAction _ _ = noop
handleCmd :: Monad m => Cmd r -> DriverM r m Package
handleCmd cmd@(ConnectReg _ s tos) = do
setts <- ask
uuid <- modelConnectReg s tos
registerAdd uuid cmd
return $ createConnectRegularPackage setts uuid s tos
handleCmd cmd@(ConnectPersist _ gn n b) = do
setts <- ask
uuid <- modelConnectPersist gn n b
registerAdd uuid cmd
return $ createConnectPersistPackage setts uuid gn n b
handleCmd (Unsubscribe r) = do
setts <- ask
return $ createUnsubscribePackage setts $ runningUUID r
handleCmd cmd@(ApplyPersistAction _ gn n a) = do
setts <- ask
uuid <- modelPersistAction gn n a
registerAdd uuid cmd
return $ createPersistActionPackage setts uuid gn n a
handleCmd (PersistAck _ run evts) = do
setts <- ask
let RunningPersist _ _ _ _ sid _ _ = run
uuid = runningUUID run
return $ createAckPackage setts uuid sid evts
handleCmd (PersistNak _ run na r evts) = do
setts <- ask
let RunningPersist _ _ _ _ sid _ _ = run
uuid = runningUUID run
return $ createNakPackage setts uuid sid na r evts
newDriver :: forall r. Settings -> Generator -> Driver r
newDriver setts gen = Driver $ handleDriver setts (initInternal gen)