module Control.ERNet.Blocks.Control.Basic
(
joinStepValProcess,
splitSyncProcess,
biasedSplitSyncProcess,
switchMultiProcess,
improverIxSimpleProcess,
improverNoIxSimpleProcess
)
where
import Control.ERNet.Foundations.Protocol
import Control.ERNet.Foundations.Protocol.StandardCombinators
import qualified Control.ERNet.Foundations.Event.Logger as LG
import qualified Control.ERNet.Foundations.Channel as CH
import Control.ERNet.Foundations.Process
import Control.Concurrent as Concurrent
import Control.Concurrent.STM as STM
import Data.Number.ER.Misc.STM
import Data.Typeable
-- | Process that couples a \"step\" notification channel with a value channel.
--
-- Sockets, as declared in the 'ERProcess' value built below:
--
--   * inputs: a step channel of type @chTChanges (chTIx chTUnit)@ and a
--     value channel of type @chtp@;
--
--   * output: one result channel of type @chTChanges (chTIx chtp)@.
--
-- Behaviour on the result channel:
--
--   * a @QAChangesQ (QAIxQ ix q)@ query is served immediately: @q@ is
--     forwarded to the value channel and its answer is returned wrapped as
--     @QAChangesANew (QAIxA answer)@;
--
--   * a @QAChangesQWhenNew@ query is parked in a shared list.  A second
--     thread asks the step channel @QAChangesQWhenNew 0 (QAIxQ ix QAUnitQ)@
--     (with @ix@ taken from the most recently parked query) and, once that
--     is answered, empties the list.  If the step answer is
--     @QAChangesAGivenUp@, every parked query is answered with
--     @QAChangesAGivenUp@; otherwise each parked query's inner query is
--     forwarded to the value channel and answered with the fresh value.
--
-- @sampleAnswer@ is never evaluated; it only fixes the answer type of the
-- value channel.
joinStepValProcess ::
    (QAProtocol q a, Show q, Show a,
     CH.Channel sIn sOut sInAnyProt sOutAnyProt) =>
    ERProcessName ->
    ChannelType ->
    a ->
    ERProcess sInAnyProt sOutAnyProt
joinStepValProcess defName chtp sampleAnswer =
    ERProcess defName deploy [chTChanges (chTIx chTUnit), chtp] [chTChanges (chTIx chtp)]
  where
    deploy _deployName [stepCHA, valCHA] [resCHA] _ = do
        parkedTV <- newTVarIO []
        -- the step watcher runs in its own thread; this thread serves queries
        forkIO $ stepWatcher parkedTV
        serveQueries parkedTV
      where
        resCH =
            CH.castOut "ERNet.Blocks.Control.Basic: joinStepValProcess: resCH: "
                resCHA
        stepCH =
            CH.castIn "ERNet.Blocks.Control.Basic: joinStepValProcess: stepCH: "
                stepCHA
        valCH =
            CH.castIn "ERNet.Blocks.Control.Basic: joinStepValProcess: valCH: "
                valCHA

        -- Endless loop receiving queries on the result channel.
        serveQueries parkedTV = do
            incoming@(resQryId, qry) <- CH.waitForQuery resCH
            case qry of
                QAChangesQWhenNew _ _ -> do
                    -- park the query; the step watcher will answer it
                    atomically $ modifyTVar parkedTV (incoming :)
                    return ()
                QAChangesQ (QAIxQ _ valQry) -> do
                    valQryId <- CH.makeQuery resCH resQryId valCH valQry
                    val <- CH.waitForAnswer resCH resQryId valCH valQryId
                    -- ties the value channel's answer type to sampleAnswer's type
                    let _ = [val, sampleAnswer]
                    CH.answerQuery False resCH (resQryId, QAChangesANew $ QAIxA val)
            serveQueries parkedTV

        -- Endless loop: wait for a parked query, wait for the next step
        -- answer, then resolve everything parked by that moment.
        stepWatcher parkedTV = do
            -- blocks (via retry) while nothing is parked; peeks at the head
            -- of the list without removing it
            (qryId, qry) <- atomically $ do
                parked <- readTVar parkedTV
                case parked of
                    [] -> retry
                    (newest : _) -> return newest
            stepQryId <- case qry of
                QAChangesQWhenNew _ (QAIxQ ix _) ->
                    CH.makeQuery resCH qryId stepCH $ QAChangesQWhenNew 0 (QAIxQ ix QAUnitQ)
            stepAns <- CH.waitForAnswer resCH qryId stepCH stepQryId
            -- take the whole list and leave an empty one behind
            batch <- atomically $ modifyTVarGetOldVal parkedTV (const [])
            case stepAns of
                QAChangesAGivenUp -> answerAllGivenUp batch
                _ -> forwardAll batch
            stepWatcher parkedTV

        -- Answer every query of the batch with QAChangesAGivenUp.
        answerAllGivenUp batch =
            mapM_ giveUp batch
          where
            giveUp (qryId, _) =
                CH.answerQuery False resCH (qryId, QAChangesAGivenUp)

        -- Three passes over the batch: issue all value queries, collect all
        -- their answers, then answer all parked queries.
        forwardAll batch = do
            valQryIds <- mapM askValue batch
            vals <- mapM awaitValue (zip valQryIds batch)
            mapM_ deliver (zip vals batch)
          where
            askValue (qryId, QAChangesQWhenNew _ (QAIxQ _ valQry)) =
                CH.makeQuery resCH qryId valCH valQry
            awaitValue (valQryId, (qryId, _)) =
                CH.waitForAnswer resCH qryId valCH valQryId
            deliver (val, (qryId, _)) =
                CH.answerQuery False resCH (qryId, QAChangesANew $ QAIxA $ val)
-- | Process that shares one input channel between a primary and a secondary
-- output channel, driven only by the primary one.
--
-- Sockets, as declared in the 'ERProcess' value built below:
--
--   * input: one channel of type @chtp1@;
--
--   * outputs: a primary channel of type @chtp1@ and a secondary channel of
--     type @chtp2@.
--
-- Queries on the secondary channel are never forwarded; they are only
-- collected in a shared list.  Each query on the primary channel is
-- forwarded to the input channel and its answer is passed back.  After that,
-- every collected secondary query @q2@ for which @qry2MatchQry1 q2 q1@ holds
-- (@q1@ being the primary query just served) is removed from the list and
-- answered with @getAns2 q2 a1@, where @a1@ is the primary answer.
-- Secondary queries that do not match stay in the list for later rounds.
biasedSplitSyncProcess ::
    (QAProtocol q1 a1, QAProtocol q2 a2, CH.Channel sIn sOut sInAnyProt sOutAnyProt) =>
    ERProcessName ->
    ChannelType ->
    ChannelType ->
    (q2 -> q1 -> Bool) ->
    (q2 -> a1 -> a2) ->
    ERProcess sInAnyProt sOutAnyProt
biasedSplitSyncProcess defName chtp1 chtp2 qry2MatchQry1 getAns2 =
    ERProcess defName deploy [chtp1] [chtp1, chtp2]
  where
    deploy _deployName [inCHA] [primCHA, secCHA] _ = do
        stateTV <- newTVarIO []
        -- secondary queries are collected in a separate thread
        forkIO $ accumulator stateTV
        forwarder stateTV
      where
        inCH =
            CH.castIn "ERNet.Blocks.Control.Basic: biasedSplitSyncProcess: inCH: "
                inCHA
        primCH =
            CH.castOut "ERNet.Blocks.Control.Basic: biasedSplitSyncProcess: primCH: "
                primCHA
        secCH =
            CH.castOut "ERNet.Blocks.Control.Basic: biasedSplitSyncProcess: secCH: "
                secCHA

        -- Endless loop: push every secondary query onto the shared list.
        accumulator stateTV = do
            qryData <- CH.waitForQuery secCH
            atomically $ modifyTVar stateTV $ (:) qryData
            accumulator stateTV

        -- Endless loop: serve one primary query, then answer the matching
        -- secondary queries from the same answer.
        forwarder stateTV = do
            (primQryId, qry) <- CH.waitForQuery primCH
            inQryId <- CH.makeQuery primCH primQryId inCH qry
            ans <- CH.waitForAnswer primCH primQryId inCH inQryId
            CH.answerQuery False primCH (primQryId, ans)
            -- split the pending list in a single transaction so that queries
            -- arriving concurrently are neither lost nor answered twice
            secQrys <- atomically $ do
                pendingList <- readTVar stateTV
                let (matchingQrys, prunedPendingList) = analyseList qry pendingList
                writeTVar stateTV prunedPendingList
                return matchingQrys
            mapM_ (answerSecQry ans) secQrys
            forwarder stateTV

        -- Split pending secondary queries into (matching, not matching) with
        -- respect to the given primary query.  Both result lists come out in
        -- the reverse of their order in the input list.
        analyseList qry1 = go [] []
          where
            go matching pruned [] = (matching, pruned)
            go matching pruned (qry2Data@(_, qry2) : others)
                | qry2MatchQry1 qry2 qry1 = go (qry2Data : matching) pruned others
                | otherwise = go matching (qry2Data : pruned) others

        -- Answer one secondary query using the primary answer.
        answerSecQry ans1 (qry2Id, qry2) =
            CH.answerQuery False secCH (qry2Id, getAns2 qry2 ans1)
-- | Process that fans one input channel out to @n@ output channels of the
-- same type, answering equal queries from a single upstream answer.
--
-- Sockets, as declared in the 'ERProcess' value built below:
--
--   * input: one channel of type @chtp@;
--
--   * outputs: @n@ channels of type @chtp@.
--
-- One thread collects the queries arriving on any output channel in a shared
-- list.  The other thread repeatedly takes the query at the head of that
-- list, forwards it to the input channel, and then answers with the obtained
-- answer every collected query that is equal (@==@) to the forwarded one,
-- removing those from the list.
--
-- @sampleAnswer@ is never evaluated; it only fixes the answer type of the
-- channels.
splitSyncProcess ::
    (QAProtocol q a, CH.Channel sIn sOut sInAnyProt sOutAnyProt) =>
    ERProcessName ->
    ChannelType ->
    Int ->
    a ->
    ERProcess sInAnyProt sOutAnyProt
splitSyncProcess defName chtp n sampleAnswer =
    ERProcess defName deploy [chtp] (replicate n chtp)
  where
    deploy _deployName [inCHA] outCHAs _ = do
        stateTV <- newTVarIO []
        forkIO $ accumulator stateTV
        forwarder stateTV
      where
        inCH =
            CH.castIn "ERNet.Blocks.Control.Basic: splitSyncProcess: inCH: "
                inCHA
        outCHs =
            map (\(chA, chN) ->
                    CH.castOut ("ERNet.Blocks.Control.Basic: splitSyncProcess: outCH" ++ show chN ++ ": ") chA) $
                zip outCHAs [0..]

        -- Endless loop: record (channel number, (query id, query)) for every
        -- query arriving on any of the output channels.
        accumulator stateTV = do
            (chN, (qryId, QueryAnyProt qry_)) <- CH.waitForQueryMulti outCHAs
            -- The query arrives protocol-erased; recover its concrete type.
            -- The binding stays lazy, so a mismatch is reported only when
            -- the query is actually used, now with a message saying where.
            let qry =
                    case cast qry_ of
                        Just typedQry -> typedQry
                        Nothing ->
                            error $
                                "ERNet.Blocks.Control.Basic: splitSyncProcess: outCH"
                                ++ show chN
                                ++ ": query does not have the expected protocol type"
            atomically $ modifyTVar stateTV $ (:) (chN, (qryId, qry))
            accumulator stateTV

        -- Endless loop: forward the query at the head of the pending list
        -- and answer all pending queries equal to it.
        forwarder stateTV = do
            -- blocks (via retry) while nothing is pending; the chosen entry
            -- stays in the list until the pruning transaction below
            (chN, (qryId, qry)) <- atomically $ do
                pendingList <- readTVar stateTV
                case pendingList of
                    [] -> retry
                    (qryData : _) -> return qryData
            let outCH = outCHs !! chN
            inQryId <- CH.makeQuery outCH qryId inCH qry
            ans <- CH.waitForAnswer outCH qryId inCH inQryId
            -- ties the channels' answer type to sampleAnswer's type
            let _ = [ans, sampleAnswer]
            qrys <- atomically $ do
                pendingList <- readTVar stateTV
                let (matchingQrys, prunedPendingList) = analyseList qry pendingList
                writeTVar stateTV prunedPendingList
                return matchingQrys
            mapM_ (answerQry ans) qrys
            forwarder stateTV

        -- Split pending entries into (equal to qry1, different from qry1).
        -- Both result lists come out in the reverse of their input order.
        analyseList qry1 = go [] []
          where
            go matching pruned [] = (matching, pruned)
            go matching pruned (qry2Data@(_, (_, qry2)) : others)
                | qry2 == qry1 = go (qry2Data : matching) pruned others
                | otherwise = go matching (qry2Data : pruned) others

        -- Send the shared answer to the output channel the query came from.
        answerQry ans1 (chN, (qry2Id, _)) =
            CH.answerQuery False (outCHs !! chN) (qry2Id, ans1)
-- | Process that routes each of several result channels to one of two
-- alternative input channels, chosen per query by asking a boolean channel.
--
-- Sockets, as declared in the 'ERProcess' value built below:
--
--   * inputs: a switch channel of type @chTBool@ followed by two groups of
--     channels, each group having the types listed in @chTypes@;
--
--   * outputs: one result channel per element of @chTypes@.
--
-- Every query arriving on result channel number @i@ is handled in a freshly
-- forked thread: the switch channel is asked @QABoolQ@; on a @True@ answer
-- the query is forwarded to the @i@-th channel of the second group,
-- otherwise to the @i@-th channel of the first group.  The obtained answer
-- is passed back on the result channel, with @useCache@ handed on to
-- @CH.answerQueryAnyProt@.
switchMultiProcess ::
    (CH.Channel sIn sOut sInAnyProt sOutAnyProt) =>
    Bool ->
    ERProcessName ->
    [ChannelType] ->
    ERProcess sInAnyProt sOutAnyProt
switchMultiProcess useCache defName chTypes =
    ERProcess defName deploy (chTBool : (chTypes ++ chTypes)) (chTypes)
  where
    deploy _deployName (switchCHA : dataCHAs) resCHAs _ =
        dispatcher
      where
        -- the data inputs come as two consecutive groups, each as long as
        -- the list of result channels
        (firstGroupCHAs, secondGroupCHAs) =
            splitAt (length resCHAs) dataCHAs

        switchCH =
            CH.castIn "ERNet.Blocks.Control.Basic: switchMultiProcess: switchCH: " switchCHA

        -- Endless loop: hand every incoming query to its own thread.
        dispatcher = do
            (chN, qryData) <- CH.waitForQueryMulti resCHAs
            forkIO $ responder chN qryData
            dispatcher

        -- Serve a single query that arrived on result channel number chN.
        responder chN (resQryId, qry) =
            case chTypes !! chN of
                ChannelType _ sampleAnswer -> do
                    resCH <-
                        CH.castOutIO
                            ("ERNet.Blocks.Control.Basic: switchMultiProcess: resCHAs !! " ++ show chN ++ ": ")
                            resCHA
                    -- never executed; only ties resCH's protocol to the one
                    -- packed inside the channel type
                    let _ = CH.answerQuery False resCH (0, sampleAnswer)
                    switchQryId <- CH.makeQuery resCH resQryId switchCH QABoolQ
                    shouldSwitch <- CH.waitForAnswer resCH resQryId switchCH switchQryId
                    let chosenCHAs = if shouldSwitch then secondGroupCHAs else firstGroupCHAs
                        inCHA = chosenCHAs !! chN
                    qryId <-
                        CH.makeQueryAnyProt "ERNet.Blocks.Control.Basic: switchMultiProcess: making query on inCHA: "
                            resCHA resQryId inCHA qry
                    (_, ans) <-
                        CH.waitForAnswerMulti resCHA resQryId [(inCHA, qryId)]
                    CH.answerQueryAnyProt "ERNet.Blocks.Control.Basic: switchMultiProcess: answering query on resCHA: "
                        useCache resCHA (resQryId, ans)
          where
            resCHA = resCHAs !! chN
-- | Process serving indexed queries from an initial-value channel (index 0)
-- or from an improving channel (any other index).
--
-- Sockets, as declared in the 'ERProcess' value built below:
--
--   * inputs: an initial-value channel and an improvement channel, both of
--     type @chType@;
--
--   * output: one result channel of type @chType@.
--
-- Each query @QAIxQ ix q@ on the result channel is handled in a freshly
-- forked thread:
--
--   * if @ix == 0@, the query is passed unchanged to the initial-value
--     channel;
--
--   * otherwise @QAIxQ (ix - 1) q@ is sent to the improvement channel.
--
-- The answer obtained is passed back on the result channel.
--
-- @sampleAns@ is never evaluated; it only fixes the answer type carried
-- inside @QAIxA@ on the channels.
improverIxSimpleProcess ::
    (QAProtocol q a, CH.Channel sIn sOut sInAnyProt sOutAnyProt) =>
    ERProcessName ->
    ChannelType ->
    a ->
    ERProcess sInAnyProt sOutAnyProt
improverIxSimpleProcess defName chType sampleAns =
    ERProcess defName deploy [chType, chType] [chType]
  where
    deploy _ [initCHA, inCHA] [resCHA] _ =
        dispatcher
      where
        initCH =
            CH.castIn "ERNet.Blocks.Control.Basic: improverIxSimpleProcess: initCH: "
                initCHA
        inCH =
            CH.castIn "ERNet.Blocks.Control.Basic: improverIxSimpleProcess: inCH: "
                inCHA
        resCH =
            CH.castOut "ERNet.Blocks.Control.Basic: improverIxSimpleProcess: resCH: "
                resCHA

        -- The two bindings below are never evaluated; they only force both
        -- inputs to share one protocol and the result channel to carry
        -- answers of the form QAIxA <something of sampleAns's type>.
        _ = [inCH, initCH]
        _ = CH.answerQuery False resCH (0, QAIxA sampleAns)

        -- Endless loop: hand every incoming query to its own thread.
        dispatcher = do
            qryData <- CH.waitForQuery resCH
            forkIO $ responder qryData
            dispatcher

        -- Serve one indexed query.
        responder (resQryId, QAIxQ ix sqry) = do
            ans <-
                if ix == 0
                    then enquire initCH resQryId (QAIxQ ix sqry)
                    -- shift the index down by one for the improvement channel
                    else enquire inCH resQryId (QAIxQ (ix - 1) sqry)
            -- NOTE(review): the flag passed here is True, unlike the other
            -- processes in this module; its meaning is defined by
            -- CH.answerQuery, which is not visible in this file.
            CH.answerQuery True resCH (resQryId, ans)

        -- Forward a query to the given input channel on behalf of the
        -- result-channel query resQryId and wait for its answer.
        enquire ch resQryId qry = do
            fwdQryId <- CH.makeQuery resCH resQryId ch qry
            CH.waitForAnswer resCH resQryId ch fwdQryId
-- | Process that forwards queries to an improvement channel and remembers
-- the latest answer, which it serves on a separate \"current value\" channel.
--
-- Sockets, as declared in the 'ERProcess' value built below:
--
--   * inputs: an initial-value channel and an improvement channel, both of
--     type @chType@;
--
--   * outputs: a result channel and a current-value channel, both of type
--     @chType@.
--
-- Two threads share a 'TVar' holding @Maybe@ the latest answer:
--
--   * the responder forwards each result-channel query to the improvement
--     channel, passes the answer back and stores it in the 'TVar';
--
--   * the current-value server answers each current-value query with the
--     stored answer if there is one; otherwise it forwards the query to the
--     initial-value channel and stores that answer, unless the responder has
--     stored an answer in the meantime, in which case the responder's answer
--     is kept and returned.
--
-- @sampleAns@ is never evaluated; it only fixes the answer type of the
-- channels.
improverNoIxSimpleProcess ::
    (QAProtocol q a, CH.Channel sIn sOut sInAnyProt sOutAnyProt) =>
    ERProcessName ->
    ChannelType ->
    a ->
    ERProcess sInAnyProt sOutAnyProt
improverNoIxSimpleProcess defName chType sampleAns =
    ERProcess defName deploy [chType, chType] [chType, chType]
  where
    deploy _ [initCHA, inCHA] [resCHA, resCurrCHA] _ = do
        stateTV <- newTVarIO Nothing
        forkIO $ currentValServer stateTV
        responder stateTV
      where
        initCH =
            CH.castIn "ERNet.Blocks.Control.Basic: improverNoIxSimpleProcess: initCH: "
                initCHA
        inCH =
            CH.castIn "ERNet.Blocks.Control.Basic: improverNoIxSimpleProcess: inCH: "
                inCHA
        resCH =
            CH.castOut "ERNet.Blocks.Control.Basic: improverNoIxSimpleProcess: resCH: "
                resCHA
        resCurrCH =
            CH.castOut "ERNet.Blocks.Control.Basic: improverNoIxSimpleProcess: resCurrCH: "
                resCurrCHA

        -- The three bindings below are never evaluated; they only force all
        -- four channels to share one protocol whose answer type is that of
        -- sampleAns.
        _ = [resCH, resCurrCH]
        _ = [inCH, initCH]
        _ = CH.answerQuery False resCH (0, sampleAns)

        -- Endless loop serving the current-value channel.
        currentValServer stateTV = do
            (qryId, qry) <- CH.waitForQuery resCurrCH
            stored <- atomically $ readTVar stateTV
            ans <- case stored of
                Just known ->
                    return known
                Nothing -> do
                    initQryId <- CH.makeQuery resCurrCH qryId initCH qry
                    initAns <- CH.waitForAnswer resCurrCH qryId initCH initQryId
                    -- The responder thread may have stored an answer while
                    -- we were waiting on initCH; re-check inside the same
                    -- transaction as the write so that the initial answer
                    -- never replaces it.
                    atomically $ do
                        latest <- readTVar stateTV
                        case latest of
                            Just newer ->
                                return newer
                            Nothing -> do
                                writeTVar stateTV (Just initAns)
                                return initAns
            CH.answerQuery False resCurrCH (qryId, ans)
            currentValServer stateTV

        -- Endless loop serving the result channel from the improvement
        -- channel and recording each answer as the current value.
        responder stateTV = do
            (resQryId, qry) <- CH.waitForQuery resCH
            fwdQryId <- CH.makeQuery resCH resQryId inCH qry
            ans <- CH.waitForAnswer resCH resQryId inCH fwdQryId
            CH.answerQuery False resCH (resQryId, ans)
            atomically $ writeTVar stateTV $ Just ans
            responder stateTV