module Database.EventStore.Internal.Manager.Operation
( Decision(..)
, OperationParams(..)
, operationNetwork
) where
import qualified Data.Map.Strict as M
import Data.Monoid ((<>))
import Data.Word
import Data.ProtocolBuffers
import Data.Serialize
import Data.UUID
import FRP.Sodium
import System.Random
import Database.EventStore.Internal.Types hiding (Event, newEvent)
import Database.EventStore.Internal.Util.Sodium
newtype Manager = Manager (M.Map UUID Operation)
initManager :: Manager
initManager = Manager M.empty
data Decision
= DoNothing
| EndOperation
| Retry
| Reconnection
| Subscribed
data Operation
= Operation
{ operationCreatePackage :: UUID -> IO Package
, operationInspect :: Package -> IO Decision
}
data OperationParams
= forall req resp. (Encode req, Decode resp) =>
OperationParams
{ opSettings :: !Settings
, opRequestCmd :: !Word8
, opResponseCmd :: !Word8
, opRequest :: IO req
, opSuccess :: resp -> IO Decision
, opFailure :: OperationException -> IO Decision
}
createOperation :: Settings -> OperationParams -> Operation
createOperation sett params =
Operation
{ operationCreatePackage = createPackage sett params
, operationInspect = inspection params
}
createPackage :: Settings -> OperationParams -> UUID -> IO Package
createPackage Settings{..} OperationParams{..} uuid = do
req <- opRequest
let pack = Package
{ packageCmd = opRequestCmd
, packageCorrelation = uuid
, packageData = runPut $ encodeMessage req
, packageCred = s_credentials
}
return pack
inspection :: OperationParams -> Package -> IO Decision
inspection params@OperationParams{..} pack
| found == exp_v = deeperInspection params pack
| otherwise = failed (InvalidServerResponse exp_v found)
where
exp_v = opResponseCmd
failed = opFailure
found = packageCmd pack
deeperInspection :: OperationParams -> Package -> IO Decision
deeperInspection OperationParams{..} pack =
case runGet decodeMessage bytes of
Left e -> failed (ProtobufDecodingError e)
Right msg -> succeed msg
where
failed = opFailure
succeed = opSuccess
bytes = packageData pack
data Register = Register UUID Operation
newtype Remove = Remove UUID
data Response = Response !Package !Operation
operationNetwork :: Settings
-> (Package -> Reactive ())
-> Reactive ()
-> Event Package
-> Reactive (OperationParams -> Reactive ())
operationNetwork sett push_pkg push_reco e_pkg = do
(on_new, push_new) <- newEvent
(on_reg, push_reg) <- newEvent
(on_rem, push_rem) <- newEvent
(on_ret, push_ret) <- newEvent
let mgr_e = fmap register on_reg <>
fmap remove on_rem
mgr_b <- accum initManager mgr_e
let resp_e = filterJust $ snapshot response e_pkg mgr_b
on_new_op = fmap (createOperation sett) on_new <> on_ret
push_reg_io = pushAsync2 $ \uuid op -> push_reg $ Register uuid op
push_rem_io = pushAsync (push_rem . Remove)
push_retry_io = pushAsync2 $ \uuid op -> do
push_rem $ Remove uuid
push_ret op
push_reco_io = pushAsync2 $ \uuid op -> do
push_reco
push_rem $ Remove uuid
push_ret op
push_send_io = pushAsync push_pkg
_ <- listen on_new_op $ \op -> do
uuid <- randomIO
push_reg_io uuid op
_ <- listen resp_e $ \(Response pkg op) -> do
decision <- operationInspect op pkg
let corr_id = packageCorrelation pkg
case decision of
DoNothing -> return ()
EndOperation -> push_rem_io corr_id
Retry -> push_retry_io corr_id op
Reconnection -> push_reco_io corr_id op
_ -> fail unexpectedDecision
_ <- listen on_reg $ \(Register uuid op) ->
operationCreatePackage op uuid >>= push_send_io
return push_new
unexpectedDecision :: String
unexpectedDecision = "Unexpected decision Processor.handlingOperation"
register :: Register -> Manager -> Manager
register (Register uuid op) (Manager m) = Manager $ M.insert uuid op m
remove :: Remove -> Manager -> Manager
remove (Remove uuid) (Manager m) = Manager $ M.delete uuid m
response :: Package -> Manager -> Maybe Response
response pkg (Manager m) = fmap (Response pkg) $ M.lookup corr_id m
where
corr_id = packageCorrelation pkg