module Control.ERNet.Blocks.Basic
(
constantProcess,
constantChangedProcess,
constantStatefulProcess,
passThroughProcess,
maybePassThroughProcess,
passThroughStatefulProcess,
passThroughBinaryStatefulProcess,
passThroughBinaryProcess,
rateProcess,
precProcess
)
where
import Control.ERNet.Foundations.Protocol
import Control.ERNet.Foundations.Protocol.StandardCombinators
import qualified Control.ERNet.Foundations.Event.Logger as LG
import qualified Control.ERNet.Foundations.Channel as CH
import Control.ERNet.Foundations.Process
import Data.Number.ER.BasicTypes
import Data.Maybe
import Data.Typeable
import qualified Data.Map as Map
import Control.Concurrent as Concurrent
import Control.Concurrent.STM as STM
import Data.Number.ER.MiscSTM
constantProcess ::
(QAProtocol q a, CH.Channel sIn sOut sInAnyProt sOutAnyProt) =>
ERProcessName ->
(q -> a) ->
ChannelType ->
ERProcess sInAnyProt sOutAnyProt
constantProcess defName responderFn =
constantStatefulProcess defName responderTransferFn ()
where
responderTransferFn _ (_, q) =
((True, responderFn q), Nothing)
constantChangedProcess ::
(QAProtocol q a, CH.Channel sIn sOut sInAnyProt sOutAnyProt) =>
ERProcessName ->
(q -> a) ->
ChannelType ->
ERProcess sInAnyProt sOutAnyProt
constantChangedProcess defName responderFn chtp =
constantProcess defName responderNoChangesFn chtp
where
responderNoChangesFn (QAChangesQ q) =
(QAChangesANew (responderFn q))
responderNoChangesFn (QAChangesQIfNew _ _) =
QAChangesASame
responderNoChangesFn (QAChangesQWhenNew _ _) =
QAChangesAGivenUp
constantStatefulProcess ::
(QAProtocol q a, CH.Channel sIn sOut sInAnyProt sOutAnyProt) =>
ERProcessName ->
(s -> (QueryId, q) -> ((Bool, a), Maybe s)) ->
s ->
ChannelType ->
ERProcess sInAnyProt sOutAnyProt
constantStatefulProcess defName responderTransferFn initState chtp =
ERProcess defName deploy [] [chtp]
where
deploy _ [] [resCHA] _ =
do
stateTV <- newTVarIO initState
dispatcher stateTV
where
resCH =
CH.castOut
"ERNet.Blocks.Basic: constantStatefulProcess: resCH: "
resCHA
dispatcher stateTV =
do
qryData <- CH.waitForQuery resCH
forkIO $ responder stateTV qryData
dispatcher stateTV
responder stateTV (qryId, qry) =
do
(useCache, ans) <- atomically updateState
CH.answerQuery useCache resCH (qryId, ans)
where
updateState =
do
state <- readTVar stateTV
case responderTransferFn state (qryId, qry) of
(ansData, Nothing) -> return ansData
(ansData, Just newState) ->
do
writeTVar stateTV newState
return ansData
passThroughStatefulProcess ::
(QAProtocol q1 a1, QAProtocol q2 a2, CH.Channel sIn sOut sInAnyProt sOutAnyProt) =>
ERProcessName ->
(s -> (QueryId, q1) -> (ERProcessAction s q2 a1, Maybe s)) ->
(s -> (QueryId, q1) -> (q2,a2) -> (ERProcessAction s q2 a1, Maybe s)) ->
s ->
ChannelType ->
ChannelType ->
ERProcess sInAnyProt sOutAnyProt
passThroughStatefulProcess defName qryStFn ansStFn initState chtpIn chtpRes =
ERProcess defName deploy [chtpIn] [chtpRes]
where
deploy deployName [argCHA] [resCHA] _ =
do
stateTV <- newTVarIO initState
dispatcher stateTV
where
resCH =
CH.castOut "ERNet.Blocks.Basic: passThroughStatefulProcess: resCH: "
resCHA
argCH =
CH.castIn "ERNet.Blocks.Basic: passThroughStatefulProcess: argCH: "
argCHA
dispatcher stateTV =
do
qryData <- CH.waitForQuery resCH
forkIO $ responder stateTV qryData
dispatcher stateTV
responder stateTV qryData@(resQryId, qry) =
do
action <- atomically $ processQuery stateTV qryData
proceed stateTV qryData action
proceed stateTV qryData (ERProcessActionRetryWhen triggerCond) =
do
atomically $
do
state <- readTVar stateTV
case triggerCond state of
True -> return ()
False -> retry
responder stateTV qryData
proceed stateTV qryData@(resQryId, qry) (ERProcessActionQuery qry2) =
do
argQryId <- CH.makeQuery resCH resQryId argCH qry2
ans2 <- CH.waitForAnswer resCH resQryId argCH argQryId
qry2OrAns <- atomically $ processAnswer stateTV qryData (qry2,ans2)
proceed stateTV qryData qry2OrAns
proceed stateTV qryData@(resQryId, qry) (ERProcessActionAnswer useCache ans) =
CH.answerQuery useCache resCH (resQryId, ans)
processQuery stateTV qryData =
do
state <- readTVar stateTV
let (qry2OrAns, maybeNewState) = qryStFn state qryData
maybeWriteTVar stateTV maybeNewState
return qry2OrAns
processAnswer stateTV qryData ans2 =
do
state <- readTVar stateTV
let (qry2OrAns, maybeNewState) = ansStFn state qryData ans2
maybeWriteTVar stateTV maybeNewState
return qry2OrAns
maybeWriteTVar stateTV maybeNewState =
case maybeNewState of
Just newState -> writeTVar stateTV newState
Nothing -> return ()
passThroughProcess ::
(QAProtocol q1 a1, QAProtocol q2 a2, CH.Channel sIn sOut sInAnyProt sOutAnyProt) =>
Bool ->
ERProcessName ->
(q1 -> q2) ->
(q1 -> a2 -> a1) ->
ChannelType ->
ChannelType ->
ERProcess sInAnyProt sOutAnyProt
passThroughProcess useCache defName preFn postFn =
passThroughStatefulProcess defName preStFn postStFn ()
where
preStFn _ (_, q) = (ERProcessActionQuery (preFn q), Nothing)
postStFn _ (_, q) (_,a) = (ERProcessActionAnswer useCache (postFn q a), Nothing)
maybePassThroughProcess ::
(QAProtocol q1 a1, QAProtocol q2 a2, CH.Channel sIn sOut sInAnyProt sOutAnyProt) =>
Bool ->
ERProcessName ->
(q1 -> Bool)
->
(q1 -> a1)
->
(q1 -> q2)
->
(q1 -> a2 -> a1)
->
ChannelType ->
ChannelType ->
ERProcess sInAnyProt sOutAnyProt
maybePassThroughProcess useCache defName noPass noPassResponder preFn postFn =
passThroughStatefulProcess defName qryStFn ansStFn ()
where
qryStFn _ (_, q)
| noPass q =
(ERProcessActionAnswer useCache (noPassResponder q), Nothing)
| otherwise =
(ERProcessActionQuery (preFn q), Nothing)
ansStFn _ (_, q) (_, a) =
(ERProcessActionAnswer useCache (postFn q a), Nothing)
rateProcess ::
(QAProtocol q a, CH.Channel sIn sOut sInAnyProt sOutAnyProt) =>
ERProcessName ->
(a -> a -> Bool) ->
Int ->
ChannelType ->
ERProcess sInAnyProt sOutAnyProt
rateProcess defName goodEnough maxAttepts chtp =
passThroughStatefulProcess defName qryStFn ansStFn initState chtp chtp
where
initState = (Nothing, 0)
qryStFn _ qryData@(_, qry) =
(ERProcessActionQuery qry, Nothing)
ansStFn (Nothing, _) qryData (_, ans) =
(ERProcessActionAnswer False ans, Just (Just ans, 0))
ansStFn (Just prevAns, prevAttemptNo) qryData@(_, qry) (_, ans)
| goodEnough prevAns ans || prevAttemptNo >= maxAttepts 1 =
(ERProcessActionAnswer False ans, Just (Just ans, 0))
| otherwise =
(ERProcessActionQuery qry, Just (Just prevAns, prevAttemptNo + 1))
precProcess ::
(QAProtocol q a, CH.Channel sIn sOut sInAnyProt sOutAnyProt) =>
Bool ->
ERProcessName ->
ChannelType ->
a ->
ERProcess sInAnyProt sOutAnyProt
precProcess useCache name chtp sampleA =
passThroughProcess useCache name lowerIx setMinGranAux chtp chtp
where
lowerIx (QAIxQ ix q) =
(QAIxQ (ix 1) q)
setMinGranAux (QAIxQ ix q) a =
qaaSetMinGran g a
where
g = effIx2gran ix
_ = qaMatch q sampleA
passThroughBinaryStatefulProcess ::
(QAProtocol q1 a1, QAProtocol q2 a2, QAProtocol q a,
CH.Channel sIn sOut sInAnyProt sOutAnyProt) =>
ERProcessName ->
(s -> (QueryId, q) -> (ERProcessAction s (Maybe q1, Maybe q2) a, Maybe s))
->
(s -> (QueryId, q) -> (Maybe a1, Maybe a2) -> (ERProcessAction s (Maybe q1, Maybe q2) a, Maybe s))
->
s ->
(ChannelType, ChannelType) ->
ChannelType ->
ERProcess sInAnyProt sOutAnyProt
passThroughBinaryStatefulProcess defName qryStFn ansStFn initState (chtpIn1, chtpIn2) chtpRes =
ERProcess defName deploy [chtpIn1, chtpIn2] [chtpRes]
where
deploy deployName [arg1CHA, arg2CHA] [resCHA] _ =
do
stateTV <- newTVarIO initState
dispatcher stateTV
where
resCH =
CH.castOut "ERNet.Blocks.Basic: passThroughBinaryStatefulProcess: resCH: "
resCHA
arg1CH =
CH.castIn "ERNet.Blocks.Basic: passThroughBinaryStatefulProcess: arg1CH: "
arg1CHA
arg2CH =
CH.castIn "ERNet.Blocks.Basic: passThroughBinaryStatefulProcess: arg2CH: "
arg2CHA
dispatcher stateTV =
do
qryData <- CH.waitForQuery resCH
forkIO $ responder stateTV qryData
dispatcher stateTV
responder stateTV qryData@(resQryId, qry) =
do
qry12OrAns <- atomically $ processQuery stateTV qryData
proceed stateTV qryData qry12OrAns
proceed stateTV qryData (ERProcessActionRetryWhen triggerCond) =
do
atomically $
do
state <- readTVar stateTV
case triggerCond state of
True -> return ()
False -> retry
responder stateTV qryData
proceed stateTV qryData@(resQryId, qry) (ERProcessActionQuery (mqry1, mqry2)) =
do
mans1 <- maybeQuerySync arg1CH mqry1
mans2 <- maybeQuerySync arg2CH mqry2
qry12OrAns <- atomically $ processAnswer stateTV qryData (mans1, mans2)
proceed stateTV qryData qry12OrAns
where
maybeQuerySync argCH Nothing = return Nothing
maybeQuerySync argCH (Just qry) =
do
argQryId <- CH.makeQuery resCH resQryId argCH qry
ans <- CH.waitForAnswer resCH resQryId argCH argQryId
return $ Just ans
proceed stateTV qryData@(resQryId, qry) (ERProcessActionAnswer useCache ans) =
CH.answerQuery useCache resCH (resQryId, ans)
processQuery stateTV qryData =
do
state <- readTVar stateTV
let (qry12OrAns, maybeNewState) = qryStFn state qryData
maybeWriteTVar stateTV maybeNewState
return qry12OrAns
processAnswer stateTV qryData (mans1, mans2) =
do
state <- readTVar stateTV
let (qry12OrAns, maybeNewState) = ansStFn state qryData (mans1, mans2)
maybeWriteTVar stateTV maybeNewState
return qry12OrAns
maybeWriteTVar stateTV maybeNewState =
case maybeNewState of
Just newState -> writeTVar stateTV newState
Nothing -> return ()
passThroughBinaryProcess ::
(QAProtocol q1 a1, QAProtocol q2 a2, QAProtocol q a,
CH.Channel sIn sOut sInAnyProt sOutAnyProt) =>
Bool ->
ERProcessName ->
(q -> (q1,q2)) ->
(q -> (a1,a2) -> a) ->
(ChannelType, ChannelType) ->
ChannelType ->
ERProcess sInAnyProt sOutAnyProt
passThroughBinaryProcess useCache defName preFn postFn =
passThroughBinaryStatefulProcess defName preStFn postStFn ()
where
preStFn _ (_, q) = (ERProcessActionQuery (Just q1, Just q2), Nothing)
where
(q1, q2) = preFn q
postStFn _ (_, q) (Just a1, Just a2) = (ERProcessActionAnswer useCache (postFn q (a1, a2)), Nothing)