module Database.EventStore.Internal.Manager.Subscription.Driver
( SubDropReason(..)
, SubConnectEvent(..)
, PersistActionException(..)
, ConfirmedAction(..)
, NakAction(..)
, Driver
, newDriver
, submitPackage
, connectToStream
, connectToPersist
, createPersist
, updatePersist
, deletePersist
, ackPersist
, nakPersist
, unsubscribe
, abort
) where
import Control.Exception
import Data.Int
import Data.Maybe
import Data.Typeable
import Data.ByteString
import qualified Data.HashMap.Strict as H
import Data.Serialize
import Data.ProtocolBuffers
import Data.Text
import Data.UUID
import Database.EventStore.Internal.Generator
import Database.EventStore.Internal.Manager.Subscription.Message
import Database.EventStore.Internal.Manager.Subscription.Model
import Database.EventStore.Internal.Manager.Subscription.Packages
import Database.EventStore.Internal.Types
-- | Notifications handed to the callback supplied when connecting to a
--   stream (regular or persistent).
data SubConnectEvent
    = EventAppeared ResolvedEvent
      -- ^ An event was received on the subscription.
    | Dropped SubDropReason
      -- ^ The subscription ended, for the given reason.
    | SubConfirmed Running
      -- ^ The subscription was confirmed; carries its running handle.
-- | Reason reported to a subscriber when its subscription is dropped.
data SubDropReason
    = SubUnsubscribed
      -- ^ The subscription was unsubscribed.
    | SubAccessDenied
      -- ^ Access to the stream was denied.
    | SubNotFound
      -- ^ The subscription target was not found.
    | SubPersistDeleted
      -- ^ The persistent subscription was deleted.
    | SubAborted
      -- ^ The driver was aborted while the subscription was registered.
    deriving (Eq, Show)
-- | Failure outcomes of a create, update or delete action on a persistent
--   subscription.
data PersistActionException
    = PersistActionFail
      -- ^ The action failed.
    | PersistActionAlreadyExist
      -- ^ The persistent subscription already exists.
    | PersistActionDoesNotExist
      -- ^ The persistent subscription doesn't exist.
    | PersistActionAccessDenied
      -- ^ Access was denied for the action.
    | PersistActionAborted
      -- ^ The driver was aborted before the action was confirmed.
    deriving (Show, Typeable)

-- | Lets a 'PersistActionException' be thrown and caught as an exception.
instance Exception PersistActionException
-- | Describes a persistent subscription action that has been confirmed.
data ConfirmedAction
    = ConfirmedAction
      { caId     :: !UUID
        -- ^ Correlation id of the confirmed action.
      , caGroup  :: !Text
        -- ^ Subscription group name.
      , caStream :: !Text
        -- ^ Stream name.
      , caAction :: !PersistAction
        -- ^ The action that was confirmed.
      }
-- | Hands an incoming 'Package' to the driver. Yields 'Nothing' when the
--   driver produces no result for that package, otherwise the callback
--   result together with the next driver.
submitPackage :: Package -> Driver r -> Maybe (r, Driver r)
submitPackage incoming (Driver step) = step $ Pkg incoming
-- | Registers a regular subscription request. Returns the 'Package' to send
--   and the next driver.
connectToStream :: (SubConnectEvent -> r) -- ^ Callback for subscription events.
                -> Text                   -- ^ Stream name.
                -> Bool                   -- ^ Forwarded to the connect package
                                          --   (presumably resolve-link-tos;
                                          --   confirm against the package
                                          --   builder).
                -> Driver r
                -> (Package, Driver r)
connectToStream callback stream tos (Driver step) =
    step (Cmd (ConnectReg callback stream tos))
-- | Registers a persistent subscription connection request. Returns the
--   'Package' to send and the next driver.
connectToPersist :: (SubConnectEvent -> r) -- ^ Callback for subscription events.
                 -> Text                   -- ^ Group name.
                 -> Text                   -- ^ Stream name.
                 -> Int32                  -- ^ Forwarded to the connect package
                                           --   (presumably a buffer size;
                                           --   confirm against the package
                                           --   builder).
                 -> Driver r
                 -> (Package, Driver r)
connectToPersist callback group stream size (Driver step) =
    step (Cmd (ConnectPersist callback group stream size))
-- | Registers a request to create a persistent subscription. Returns the
--   'Package' to send and the next driver.
createPersist :: (Either PersistActionException ConfirmedAction -> r)
                 -- ^ Callback receiving the outcome of the action.
              -> Text -- ^ Group name.
              -> Text -- ^ Stream name.
              -> PersistentSubscriptionSettings
              -> Driver r
              -> (Package, Driver r)
createPersist callback group stream settings (Driver step) =
    step (Cmd request)
  where
    request = ApplyPersistAction callback group stream (PersistCreate settings)
-- | Registers a request to update a persistent subscription. Returns the
--   'Package' to send and the next driver.
updatePersist :: (Either PersistActionException ConfirmedAction -> r)
                 -- ^ Callback receiving the outcome of the action.
              -> Text -- ^ Group name.
              -> Text -- ^ Stream name.
              -> PersistentSubscriptionSettings
              -> Driver r
              -> (Package, Driver r)
updatePersist callback group stream settings (Driver step) =
    step (Cmd request)
  where
    request = ApplyPersistAction callback group stream (PersistUpdate settings)
-- | Registers a request to delete a persistent subscription. Returns the
--   'Package' to send and the next driver.
deletePersist :: (Either PersistActionException ConfirmedAction -> r)
                 -- ^ Callback receiving the outcome of the action.
              -> Text -- ^ Group name.
              -> Text -- ^ Stream name.
              -> Driver r
              -> (Package, Driver r)
deletePersist callback group stream (Driver step) =
    step (Cmd request)
  where
    request = ApplyPersistAction callback group stream PersistDelete
-- | Builds the acknowledgement 'Package' for the given event ids on a
--   running persistent subscription.
ackPersist :: r        -- ^ Value stored in the command; the driver ignores it
                       --   when building the package.
           -> Running  -- ^ The running subscription.
           -> [UUID]   -- ^ Ids of the events to acknowledge.
           -> Driver r
           -> (Package, Driver r)
ackPersist result running eventIds (Driver step) =
    step . Cmd $ PersistAck result running eventIds
-- | Builds the negative-acknowledgement 'Package' for the given event ids on
--   a running persistent subscription.
nakPersist :: r          -- ^ Value stored in the command; the driver ignores
                         --   it when building the package.
           -> Running    -- ^ The running subscription.
           -> NakAction  -- ^ Action forwarded in the package.
           -> Maybe Text -- ^ Optional text forwarded in the package
                         --   (presumably a reason; confirm against the
                         --   package builder).
           -> [UUID]     -- ^ Ids of the events to nak.
           -> Driver r
           -> (Package, Driver r)
nakPersist result running action reason eventIds (Driver step) =
    step . Cmd $ PersistNak result running action reason eventIds
-- | Builds the unsubscribe 'Package' for a running subscription.
unsubscribe :: Running -> Driver r -> (Package, Driver r)
unsubscribe running (Driver step) = step . Cmd $ Unsubscribe running
-- | Aborts the driver, returning the abort results it produces for the
--   requests it still has registered.
abort :: Driver r -> [r]
abort (Driver step) = step Abort
-- | Translates the result of a persistent subscription creation into the
--   matching 'PersistActionException', or 'Nothing' on success.
createRException :: CreatePersistentSubscriptionResult
                 -> Maybe PersistActionException
createRException result =
    case result of
        CPS_Success       -> Nothing
        CPS_AlreadyExists -> Just PersistActionAlreadyExist
        CPS_Fail          -> Just PersistActionFail
        CPS_AccessDenied  -> Just PersistActionAccessDenied
-- | Translates the result of a persistent subscription deletion into the
--   matching 'PersistActionException', or 'Nothing' on success.
deleteRException :: DeletePersistentSubscriptionResult
                 -> Maybe PersistActionException
deleteRException result =
    case result of
        DPS_Success      -> Nothing
        DPS_DoesNotExist -> Just PersistActionDoesNotExist
        DPS_Fail         -> Just PersistActionFail
        DPS_AccessDenied -> Just PersistActionAccessDenied
-- | Translates the result of a persistent subscription update into the
--   matching 'PersistActionException', or 'Nothing' on success.
updateRException :: UpdatePersistentSubscriptionResult
                 -> Maybe PersistActionException
updateRException result =
    case result of
        UPS_Success      -> Nothing
        UPS_DoesNotExist -> Just PersistActionDoesNotExist
        UPS_Fail         -> Just PersistActionFail
        UPS_AccessDenied -> Just PersistActionAccessDenied
-- | Converts a wire-level 'DropReason' into the 'SubDropReason' reported to
--   subscribers.
toSubDropReason :: DropReason -> SubDropReason
toSubDropReason reason =
    case reason of
        D_Unsubscribed                  -> SubUnsubscribed
        D_NotFound                      -> SubNotFound
        D_AccessDenied                  -> SubAccessDenied
        D_PersistentSubscriptionDeleted -> SubPersistDeleted
-- | Inputs accepted by a 'Driver'. The second type parameter is the type of
--   the answer the driver gives for that input.
data In r a where
    -- | A user command; answered with the 'Package' to send and the next
    --   driver.
    Cmd :: Cmd r -> In r (Package, Driver r)

    -- | A package received from the server; answered with an optional
    --   callback result and the next driver.
    Pkg :: Package -> In r (Maybe (r, Driver r))

    -- | Abort request; answered with a list of callback results.
    Abort :: In r [r]
-- | Commands a user can submit to a 'Driver'.
data Cmd r
    = ConnectReg (SubConnectEvent -> r) Text Bool
      -- ^ Connect to a regular subscription: callback, stream name and a
      --   flag forwarded to the connect package.
    | ConnectPersist (SubConnectEvent -> r) Text Text Int32
      -- ^ Connect to a persistent subscription: callback, group name,
      --   stream name and a number forwarded to the connect package.
    | Unsubscribe Running
      -- ^ Unsubscribe a running subscription.
    | ApplyPersistAction (Either PersistActionException ConfirmedAction -> r)
                         Text
                         Text
                         PersistAction
      -- ^ Create, update or delete a persistent subscription: callback,
      --   group name, stream name and the action to apply.
    | PersistAck r Running [UUID]
      -- ^ Acknowledge events on a persistent subscription.
    | PersistNak r Running NakAction (Maybe Text) [UUID]
      -- ^ Negatively acknowledge events on a persistent subscription.
-- | Internal state threaded through a 'Driver'.
data State r
    = State
      { _model :: !Model
        -- ^ Subscription model.
      , _gen   :: !Generator
        -- ^ Generator used to draw fresh correlation ids.
      , _reg   :: !(H.HashMap UUID (Cmd r))
        -- ^ Registered commands, keyed by correlation id.
      }
-- | Initial driver state: a new model, the supplied generator and an empty
--   registry.
initState :: Generator -> State r
initState gen =
    State { _model = newModel
          , _gen   = gen
          , _reg   = H.empty
          }
-- | A subscription driver: a function that answers any 'In' input with the
--   answer type that input dictates. @r@ is the type produced by the
--   user-supplied callbacks.
newtype Driver r =
    Driver (forall a. In r a -> a)
-- | Creates a subscription 'Driver' starting from an empty model and an
--   empty registry.
--
--   The driver answers three kinds of input (see 'In'):
--
--   * 'Pkg': a package from the server. The command registered under the
--     package's correlation id is looked up and, depending on the package
--     command byte, the registered callback is applied to the matching
--     event. 'Nothing' is returned when no command is registered under that
--     id, the payload can't be decoded, the model doesn't know the
--     subscription or action, the command byte isn't handled, or the
--     registered command is not of the kind the command byte implies.
--
--   * 'Cmd': a user command. Returns the 'Package' to send and the next
--     driver.
--
--   * 'Abort': returns the abort results for every registered connection or
--     persistent action.
newDriver :: forall r. Settings -> Generator -> Driver r
newDriver setts gen = Driver $ go (initState gen)
  where
    go :: forall a. State r -> In r a -> a
    -- Incoming package; the whole equation runs in the 'Maybe' monad.
    go st@State{..} (Pkg Package{..}) = do
        elm <- H.lookup packageCorrelation _reg
        case packageCmd of
            -- Event for a regular subscription.
            0xC2 -> do
                _   <- querySubscription packageCorrelation _model
                k   <- regularCallback elm
                msg <- maybeDecodeMessage packageData
                let e   = getField $ streamResolvedEvent msg
                    evt = newResolvedEventFromBuf e
                    app = EventAppeared evt
                return (k app, Driver $ go st)
            -- Event for a persistent subscription.
            0xC7 -> do
                _   <- querySubscription packageCorrelation _model
                k   <- persistCallback elm
                msg <- maybeDecodeMessage packageData
                let e   = getField $ psseaEvt msg
                    evt = newResolvedEvent e
                    app = EventAppeared evt
                return (k app, Driver $ go st)
            -- Confirmation of a regular subscription.
            0xC1 -> do
                k   <- regularCallback elm
                msg <- maybeDecodeMessage packageData
                let lcp   = getField $ subscribeLastCommitPos msg
                    len   = getField $ subscribeLastEventNumber msg
                    meta  = RegularMeta lcp len
                    nxt_m = confirmedSubscription packageCorrelation meta _model
                run <- querySubscription packageCorrelation nxt_m
                let nxt_st = st { _model = nxt_m }
                    evt    = SubConfirmed run
                return (k evt, Driver $ go nxt_st)
            -- Confirmation of a persistent subscription.
            0xC6 -> do
                k   <- persistCallback elm
                msg <- maybeDecodeMessage packageData
                let lcp   = getField $ pscLastCommitPos msg
                    sid   = getField $ pscId msg
                    len   = getField $ pscLastEvtNumber msg
                    meta  = PersistMeta sid lcp len
                    nxt_m = confirmedSubscription packageCorrelation meta _model
                run <- querySubscription packageCorrelation nxt_m
                let nxt_st = st { _model = nxt_m }
                    evt    = SubConfirmed run
                return (k evt, Driver $ go nxt_st)
            -- Completion of a create / update / delete persistent action.
            0xC9 -> confirmPAction elm (getField . cpscResult) createRException
            0xCF -> confirmPAction elm (getField . upscResult) updateRException
            0xCB -> confirmPAction elm (getField . dpscResult) deleteRException
            -- Subscription dropped: forget it in both model and registry.
            0xC4 -> do
                run <- querySubscription packageCorrelation _model
                msg <- maybeDecodeMessage packageData
                let reason  = fromMaybe D_Unsubscribed $ getField
                                                       $ dropReason msg
                    nxt_m   = unsubscribed run _model
                    dreason = toSubDropReason reason
                    evt     = Dropped dreason
                    nxt_reg = H.delete packageCorrelation _reg
                    nxt_st  = st { _model = nxt_m
                                 , _reg   = nxt_reg }
                case elm of
                    ConnectReg k _ _       -> return (k evt, Driver $ go nxt_st)
                    ConnectPersist k _ _ _ -> return (k evt, Driver $ go nxt_st)
                    _                      -> Nothing
            _ -> Nothing
      where
        -- Total callback extractors. The registry may hold a command of a
        -- different kind than the package command implies; returning
        -- 'Nothing' ignores such a package instead of yielding a result
        -- that raises a pattern-match failure once forced.
        regularCallback :: Cmd r -> Maybe (SubConnectEvent -> r)
        regularCallback (ConnectReg k _ _) = Just k
        regularCallback _                  = Nothing

        persistCallback :: Cmd r -> Maybe (SubConnectEvent -> r)
        persistCallback (ConnectPersist k _ _ _) = Just k
        persistCallback _                        = Nothing

        -- Handles the completion of a persistent action: decodes the
        -- payload, extracts its result with @fd@, maps it to an optional
        -- exception with @em@ and applies the registered callback. The
        -- action is removed from both the model and the registry.
        confirmPAction :: Decode m
                       => Cmd r
                       -> (m -> e)
                       -> (e -> Maybe PersistActionException)
                       -> Maybe (r, Driver r)
        confirmPAction (ApplyPersistAction k g n c) fd em = do
            msg <- maybeDecodeMessage packageData
            _   <- queryPersistentAction packageCorrelation _model
            let nxt_m  = confirmedAction packageCorrelation _model
                nxt_rg = H.delete packageCorrelation _reg
                nxt_st = st { _model = nxt_m
                            , _reg   = nxt_rg
                            }
                evt    = ConfirmedAction packageCorrelation g n c
            case em $ fd msg of
                Just e  -> return (k $ Left e, Driver $ go nxt_st)
                Nothing -> return (k $ Right evt, Driver $ go nxt_st)
        confirmPAction _ _ _ = Nothing
    -- User command: build the package to send and, for commands that await
    -- an answer, register the command under a fresh correlation id.
    go st@State{..} (Cmd cmd) =
        case cmd of
            ConnectReg _ s tos ->
                let (u, nxt_g) = nextUUID _gen
                    pkg        = createConnectRegularPackage setts u s tos
                    nxt_m      = connectReg s tos u _model
                    nxt_st     = st { _model = nxt_m
                                    , _gen   = nxt_g
                                    , _reg   = H.insert u cmd _reg } in
                (pkg, Driver $ go nxt_st)
            ConnectPersist _ gn n b ->
                let (u, nxt_g) = nextUUID _gen
                    pkg        = createConnectPersistPackage setts u gn n b
                    nxt_m      = connectPersist gn n b u _model
                    nxt_st     = st { _model = nxt_m
                                    , _gen   = nxt_g
                                    , _reg   = H.insert u cmd _reg } in
                (pkg, Driver $ go nxt_st)
            -- The state is left untouched here; the registry entry is only
            -- removed when the drop package (0xC4) is handled.
            Unsubscribe r ->
                let pkg = createUnsubscribePackage setts $ runningUUID r in
                (pkg, Driver $ go st)
            ApplyPersistAction _ gn n a ->
                let (u, nxt_g) = nextUUID _gen
                    pkg        = createPersistActionPackage setts u gn n a
                    nxt_m      = persistAction gn n u a _model
                    nxt_st     = st { _model = nxt_m
                                    , _gen   = nxt_g
                                    , _reg   = H.insert u cmd _reg } in
                (pkg, Driver $ go nxt_st)
            -- NOTE(review): the 'RunningPersist' pattern below is lazy and
            -- partial. If @run@ was built with another constructor, forcing
            -- the package raises a pattern-match failure. The return type
            -- leaves no room to report that, so it is left as is.
            PersistAck _ run evts ->
                let RunningPersist _ _ _ _ sid _ _ = run
                    u   = runningUUID run
                    pkg = createAckPackage setts u sid evts in
                (pkg, Driver $ go st)
            -- NOTE(review): same partial 'RunningPersist' pattern as above.
            PersistNak _ run na r evts ->
                let RunningPersist _ _ _ _ sid _ _ = run
                    u   = runningUUID run
                    pkg = createNakPackage setts u sid na r evts in
                (pkg, Driver $ go st)
    -- Abort: notify every registered connection and pending persistent
    -- action; other registered commands produce nothing.
    go st Abort = (H.elems $ _reg st) >>= _F
      where
        _F (ConnectReg k _ _)           = [k $ Dropped SubAborted]
        _F (ConnectPersist k _ _ _)     = [k $ Dropped SubAborted]
        _F (ApplyPersistAction k _ _ _) = [k $ Left PersistActionAborted]
        _F _                            = []
-- | Decodes a message from raw bytes, yielding 'Nothing' when decoding
--   fails.
maybeDecodeMessage :: Decode a => ByteString -> Maybe a
maybeDecodeMessage = either (const Nothing) Just . runGet decodeMessage