module Control.ERNet.Blocks.Basic
(
constantProcess,
constantChangedProcess,
constantStatefulProcess,
passThroughProcess,
maybePassThroughProcess,
passThroughStatefulProcess,
passThroughBinaryStatefulProcess,
passThroughBinaryProcess,
rateProcess,
precProcess
)
where
import Control.ERNet.Foundations.Protocol
import Control.ERNet.Foundations.Protocol.StandardCombinators
import qualified Control.ERNet.Foundations.Event.Logger as LG
import qualified Control.ERNet.Foundations.Channel as CH
import Control.ERNet.Foundations.Process
import Data.Number.ER.BasicTypes
import Data.Maybe
import Data.Typeable
import qualified Data.Map as Map
import Control.Concurrent as Concurrent
import Control.Concurrent.STM as STM
import Data.Number.ER.MiscSTM
constantProcess ::
(QAProtocol q a, CH.Channel sIn sOut sInAnyProt sOutAnyProt) =>
ERProcessName ->
(q -> a) ->
ChannelType ->
ERProcess sInAnyProt sOutAnyProt
constantProcess defName responderFn =
constantStatefulProcess defName responderTransferFn ()
where
responderTransferFn _ (_, q) =
(responderFn q, Nothing)
constantChangedProcess ::
(QAProtocol q a, CH.Channel sIn sOut sInAnyProt sOutAnyProt) =>
ERProcessName ->
(q -> a) ->
ChannelType ->
ERProcess sInAnyProt sOutAnyProt
constantChangedProcess defName responderFn chtp =
constantProcess defName responderNoChangesFn chtp
where
responderNoChangesFn (QAChangesQ q) =
(QAChangesANew (responderFn q))
responderNoChangesFn (QAChangesQIfNew _ _) =
QAChangesASame
responderNoChangesFn (QAChangesQWhenNew _ _) =
QAChangesAGivenUp
constantStatefulProcess ::
(QAProtocol q a, CH.Channel sIn sOut sInAnyProt sOutAnyProt) =>
ERProcessName ->
(s -> (QueryId, q) -> (a, Maybe s)) ->
s ->
ChannelType ->
ERProcess sInAnyProt sOutAnyProt
constantStatefulProcess defName responderTransferFn initState chtp =
ERProcess defName deploy [] [chtp]
where
deploy _ [] [resCHA] _ =
do
stateTV <- newTVarIO initState
dispatcher stateTV
where
resCH =
CH.castOut
"ERNet.Blocks.Basic: constantStatefulProcess: resCH: "
resCHA
dispatcher stateTV =
do
qryData <- CH.waitForQuery resCH
forkIO $ responder stateTV qryData
dispatcher stateTV
responder stateTV (qryId, qry) =
do
ans <- atomically updateState
CH.answerQuery True resCH (qryId, ans)
where
updateState =
do
state <- readTVar stateTV
case responderTransferFn state (qryId, qry) of
(ans, Nothing) -> return ans
(ans, Just newState) ->
do
writeTVar stateTV newState
return ans
passThroughStatefulProcess ::
(QAProtocol q1 a1, QAProtocol q2 a2, CH.Channel sIn sOut sInAnyProt sOutAnyProt) =>
Bool ->
ERProcessName ->
(s -> (QueryId, q1) -> (Either q2 a1, Maybe s)) ->
(s -> (QueryId, q1) -> a2 -> (Either q2 a1, Maybe s)) ->
s ->
ChannelType ->
ChannelType ->
ERProcess sInAnyProt sOutAnyProt
passThroughStatefulProcess useCache defName qryStFn ansStFn initState chtpIn chtpRes =
ERProcess defName deploy [chtpIn] [chtpRes]
where
deploy deployName [argCHA] [resCHA] _ =
do
stateTV <- newTVarIO initState
dispatcher stateTV
where
resCH =
CH.castOut "ERNet.Blocks.Basic: passThroughStatefulProcess: resCH: "
resCHA
argCH =
CH.castIn "ERNet.Blocks.Basic: passThroughStatefulProcess: argCH: "
argCHA
dispatcher stateTV =
do
qryData <- CH.waitForQuery resCH
forkIO $ responder stateTV qryData
dispatcher stateTV
responder stateTV qryData@(resQryId, qry) =
do
qry2OrAns <- atomically $ processQuery stateTV qryData
proceed stateTV qryData qry2OrAns
proceed stateTV qryData@(resQryId, qry) (Left qry2) =
do
argQryId <- CH.makeQuery resCH resQryId argCH qry2
ans2 <- CH.waitForAnswer resCH resQryId argCH argQryId
qry2OrAns <- atomically $ processAnswer stateTV qryData ans2
proceed stateTV qryData qry2OrAns
proceed stateTV qryData@(resQryId, qry) (Right ans) =
CH.answerQuery useCache resCH (resQryId, ans)
processQuery stateTV qryData =
do
state <- readTVar stateTV
let (qry2OrAns, maybeNewState) = qryStFn state qryData
maybeWriteTVar stateTV maybeNewState
return qry2OrAns
processAnswer stateTV qryData ans2 =
do
state <- readTVar stateTV
let (qry2OrAns, maybeNewState) = ansStFn state qryData ans2
maybeWriteTVar stateTV maybeNewState
return qry2OrAns
maybeWriteTVar stateTV maybeNewState =
case maybeNewState of
Just newState -> writeTVar stateTV newState
Nothing -> return ()
passThroughProcess ::
(QAProtocol q1 a1, QAProtocol q2 a2, CH.Channel sIn sOut sInAnyProt sOutAnyProt) =>
Bool ->
ERProcessName ->
(q1 -> q2) ->
(q1 -> a2 -> a1) ->
ChannelType ->
ChannelType ->
ERProcess sInAnyProt sOutAnyProt
passThroughProcess useCache defName preFn postFn =
passThroughStatefulProcess useCache defName preStFn postStFn ()
where
preStFn _ (_, q) = (Left (preFn q), Nothing)
postStFn _ (_, q) a = (Right (postFn q a), Nothing)
maybePassThroughProcess ::
(QAProtocol q1 a1, QAProtocol q2 a2, CH.Channel sIn sOut sInAnyProt sOutAnyProt) =>
Bool ->
ERProcessName ->
(q1 -> Bool)
->
(q1 -> a1)
->
(q1 -> q2)
->
(q1 -> a2 -> a1)
->
ChannelType ->
ChannelType ->
ERProcess sInAnyProt sOutAnyProt
maybePassThroughProcess useCache defName noPass noPassResponder preFn postFn =
passThroughStatefulProcess useCache defName qryStFn ansStFn ()
where
qryStFn _ (_, q)
| noPass q =
(Right (noPassResponder q), Nothing)
| otherwise =
(Left (preFn q), Nothing)
ansStFn _ (_, q) a =
(Right (postFn q a), Nothing)
rateProcess ::
(QAProtocol q a, CH.Channel sIn sOut sInAnyProt sOutAnyProt) =>
ERProcessName ->
(a -> a -> Bool) ->
Int ->
ChannelType ->
ERProcess sInAnyProt sOutAnyProt
rateProcess defName goodEnough maxAttepts chtp =
passThroughStatefulProcess False defName qryStFn ansStFn initState chtp chtp
where
initState = (Nothing, 0)
qryStFn _ qryData@(_, qry) =
(Left qry, Nothing)
ansStFn (Nothing, _) qryData ans =
(Right ans, Just (Just ans, 0))
ansStFn (Just prevAns, prevAttemptNo) qryData@(_, qry) ans
| goodEnough prevAns ans || prevAttemptNo >= maxAttepts 1 =
(Right ans, Just (Just ans, 0))
| otherwise =
(Left qry, Just (Just prevAns, prevAttemptNo + 1))
precProcess ::
(QAProtocol q a, CH.Channel sIn sOut sInAnyProt sOutAnyProt) =>
Bool ->
ERProcessName ->
ChannelType ->
a ->
ERProcess sInAnyProt sOutAnyProt
precProcess useCache name chtp sampleA =
passThroughProcess useCache name lowerIx setMinGranAux chtp chtp
where
lowerIx (QAIxQ ix q) =
(QAIxQ (ix 1) q)
setMinGranAux (QAIxQ ix q) a =
qaaSetMinGran g a
where
g = effIx2gran ix
_ = qaMatch q sampleA
passThroughBinaryStatefulProcess ::
(QAProtocol q1 a1, QAProtocol q2 a2, QAProtocol q a,
CH.Channel sIn sOut sInAnyProt sOutAnyProt) =>
Bool ->
ERProcessName ->
(s -> (QueryId, q) -> (Either (Maybe q1, Maybe q2) a, Maybe s))
->
(s -> (QueryId, q) -> (Maybe a1, Maybe a2) -> (Either (Maybe q1, Maybe q2) a, Maybe s))
->
s ->
(ChannelType, ChannelType) ->
ChannelType ->
ERProcess sInAnyProt sOutAnyProt
passThroughBinaryStatefulProcess useCache defName qryStFn ansStFn initState (chtpIn1, chtpIn2) chtpRes =
ERProcess defName deploy [chtpIn1, chtpIn2] [chtpRes]
where
deploy deployName [arg1CHA, arg2CHA] [resCHA] _ =
do
stateTV <- newTVarIO initState
dispatcher stateTV
where
resCH =
CH.castOut "ERNet.Blocks.Basic: passThroughBinaryStatefulProcess: resCH: "
resCHA
arg1CH =
CH.castIn "ERNet.Blocks.Basic: passThroughBinaryStatefulProcess: arg1CH: "
arg1CHA
arg2CH =
CH.castIn "ERNet.Blocks.Basic: passThroughBinaryStatefulProcess: arg2CH: "
arg2CHA
dispatcher stateTV =
do
qryData <- CH.waitForQuery resCH
forkIO $ responder stateTV qryData
dispatcher stateTV
responder stateTV qryData@(resQryId, qry) =
do
qry12OrAns <- atomically $ processQuery stateTV qryData
proceed stateTV qryData qry12OrAns
proceed stateTV qryData@(resQryId, qry) (Left (mqry1, mqry2)) =
do
mans1 <- maybeQuerySync arg1CH mqry1
mans2 <- maybeQuerySync arg2CH mqry2
qry12OrAns <- atomically $ processAnswer stateTV qryData (mans1, mans2)
proceed stateTV qryData qry12OrAns
where
maybeQuerySync argCH Nothing = return Nothing
maybeQuerySync argCH (Just qry) =
do
argQryId <- CH.makeQuery resCH resQryId argCH qry
ans <- CH.waitForAnswer resCH resQryId argCH argQryId
return $ Just ans
proceed stateTV qryData@(resQryId, qry) (Right ans) =
CH.answerQuery useCache resCH (resQryId, ans)
processQuery stateTV qryData =
do
state <- readTVar stateTV
let (qry12OrAns, maybeNewState) = qryStFn state qryData
maybeWriteTVar stateTV maybeNewState
return qry12OrAns
processAnswer stateTV qryData (mans1, mans2) =
do
state <- readTVar stateTV
let (qry12OrAns, maybeNewState) = ansStFn state qryData (mans1, mans2)
maybeWriteTVar stateTV maybeNewState
return qry12OrAns
maybeWriteTVar stateTV maybeNewState =
case maybeNewState of
Just newState -> writeTVar stateTV newState
Nothing -> return ()
passThroughBinaryProcess ::
(QAProtocol q1 a1, QAProtocol q2 a2, QAProtocol q a,
CH.Channel sIn sOut sInAnyProt sOutAnyProt) =>
Bool ->
ERProcessName ->
(q -> (q1,q2)) ->
(q -> (a1,a2) -> a) ->
(ChannelType, ChannelType) ->
ChannelType ->
ERProcess sInAnyProt sOutAnyProt
passThroughBinaryProcess useCache defName preFn postFn =
passThroughBinaryStatefulProcess useCache defName preStFn postStFn ()
where
preStFn _ (_, q) = (Left (Just q1, Just q2), Nothing)
where
(q1, q2) = preFn q
postStFn _ (_, q) (Just a1, Just a2) = (Right (postFn q (a1, a2)), Nothing)