module Database.EventStore.Internal.Manager.Operation.Model
( Model
, Transition(..)
, newModel
, pushOperation
, submitPackage
, abort
) where
import Data.Word
import qualified Data.HashMap.Strict as H
import Data.ProtocolBuffers
import Data.Serialize
import Data.UUID
import Database.EventStore.Internal.Generator
import Database.EventStore.Internal.Operation
import Database.EventStore.Internal.Types
data Elem r =
forall a resp. Decode resp =>
Elem
{ _opOp :: Operation a
, _opCmd :: Word8
, _opCont :: resp -> SM a ()
, _opCb :: Either OperationError a -> r
}
data State r =
State
{ _gen :: Generator
, _pending :: H.HashMap UUID (Elem r)
}
initState :: Generator -> State r
initState g = State g H.empty
data Request r
= forall a. New (Operation a) (Either OperationError a -> r)
| Pkg Package
| Abort
data Transition r
= Produce r (Transition r)
| Transmit Package (Transition r)
| Await (Model r)
newtype Model r = Model (Request r -> Maybe (Transition r))
pushOperation :: (Either OperationError a -> r)
-> Operation a
-> Model r
-> Transition r
pushOperation cb op (Model k) = let Just t = k (New op cb) in t
submitPackage :: Package -> Model r -> Maybe (Transition r)
submitPackage pkg (Model k) = k (Pkg pkg)
abort :: Model r -> Transition r
abort (Model k) = let Just t = k Abort in t
runOperation :: Settings
-> (Either OperationError a -> r)
-> Operation a
-> SM a ()
-> State r
-> Transition r
runOperation setts cb op start init_st = go init_st start
where
go st (Return _) = Await $ Model $ handle setts st
go st (Yield a n) = Produce (cb $ Right a) (go st n)
go st (FreshId k) =
let (new_id, nxt_gen) = nextUUID $ _gen st
nxt_st = st { _gen = nxt_gen } in
go nxt_st $ k new_id
go st (SendPkg ci co rq k) =
let (new_uuid, nxt_gen) = nextUUID $ _gen st
pkg = Package
{ packageCmd = ci
, packageCorrelation = new_uuid
, packageData = runPut $ encodeMessage rq
, packageCred = s_credentials setts
}
elm = Elem op co k cb
ps = H.insert new_uuid elm $ _pending st
nxt_st = st { _pending = ps
, _gen = nxt_gen
} in
Transmit pkg (Await $ Model $ handle setts nxt_st)
go st (Failure m) =
case m of
Just e -> Produce (cb $ Left e) (Await $ Model $ handle setts st)
_ -> runOperation setts cb op op st
runPackage :: Settings -> State r -> Package -> Maybe (Transition r)
runPackage setts st Package{..} = do
Elem op resp_cmd cont cb <- H.lookup packageCorrelation $ _pending st
let nxt_ps = H.delete packageCorrelation $ _pending st
nxt_st = st { _pending = nxt_ps }
if resp_cmd /= packageCmd
then
let r = cb $ Left $ InvalidServerResponse resp_cmd packageCmd in
return $ Produce r (Await $ Model $ handle setts nxt_st)
else
case runGet decodeMessage packageData of
Left e ->
let r = cb $ Left $ ProtobufDecodingError e in
return $ Produce r (Await $ Model $ handle setts nxt_st)
Right m -> return $ runOperation setts cb op (cont m) nxt_st
abortOperations :: Settings -> State r -> Transition r
abortOperations setts init_st = go init_st $ H.toList $ _pending init_st
where
go st ((key, Elem _ _ _ k):xs) =
let ps = H.delete key $ _pending st
nxt_st = st { _pending = ps } in
Produce (k $ Left Aborted) $ go nxt_st xs
go st [] = Await $ Model $ handle setts st
newModel :: Settings -> Generator -> Model r
newModel setts g = Model $ handle setts $ initState g
handle :: Settings -> State r -> Request r -> Maybe (Transition r)
handle setts st (New op cb) = Just $ runOperation setts cb op op st
handle setts st (Pkg pkg) = runPackage setts st pkg
handle setts st Abort = Just $ abortOperations setts st