{-| Module : Control.ERNet.Blocks.Control.Basic Description : processes with purely synchronisation effects Copyright : (c) Michal Konecny License : BSD3 Maintainer : mik@konecny.aow.cz Stability : experimental Portability : portable A collection of processes whose main purpose is to synchronise other processes and have little semantical value. -} module Control.ERNet.Blocks.Control.Basic ( joinStepValProcess, splitSyncProcess, biasedSplitSyncProcess, switchMultiProcess, improverIxSimpleProcess, improverNoIxSimpleProcess ) where import Control.ERNet.Foundations.Protocol import Control.ERNet.Foundations.Protocol.StandardCombinators import qualified Control.ERNet.Foundations.Event.Logger as LG import qualified Control.ERNet.Foundations.Channel as CH import Control.ERNet.Foundations.Process import Control.Concurrent as Concurrent import Control.Concurrent.STM as STM import Data.Number.ER.MiscSTM import Data.Typeable {-| This process joins information from two channels ("step", "val") in such a way that it acts as a splitter of responsibilities for its multi-threaded failure-enabled result channel as follows: * The "step" channel provides the timing and effort information for responses. * The "val" channel provides values without significant blocking. While the process is waiting for a response from the step channel, any queries are put on hold until the response comes. * If the step channel responds with indication of failure, then all pending queries are answered as failed. * If the step channel responds with ok, then all the pending queries are forwarded to the value channel and answered asap. No new queries are accepted during such forwarding stage. 
-}
joinStepValProcess ::
    (QAProtocol q a, Show q, Show a, CH.Channel sIn sOut sInAnyProt sOutAnyProt) =>
    ERProcessName {-^ process identifier (string) -} ->
    ChannelType {-^ value input channel type -} ->
    a {-^ example answer on value channel to help compiler work out the protocol -} ->
    ERProcess sInAnyProt sOutAnyProt
joinStepValProcess defName chtp sampleAnswer =
    ERProcess defName deploy
        [chTChanges (chTIx chTUnit), chtp]
        [chTChanges (chTIx chtp)]
  where
    deploy _ [stepCHA, valCHA] [resCHA] _ = do
        pendingTV <- newTVarIO [] -- queries waiting for the next step answer
        forkIO $ forwarder pendingTV
        accumulator pendingTV
      where
        resCH =
            CH.castOut "ERNet.Blocks.Control.Basic: joinStepValProcess: resCH: " resCHA
        stepCH =
            CH.castIn "ERNet.Blocks.Control.Basic: joinStepValProcess: stepCH: " stepCHA
        valCH =
            CH.castIn "ERNet.Blocks.Control.Basic: joinStepValProcess: valCH: " valCHA

        -- Receives queries on the result channel: "when new" queries are
        -- queued for the forwarder, plain queries are served straight away
        -- from the value channel.
        accumulator pendingTV = do
            incoming@(resQryId, qry) <- CH.waitForQuery resCH
            case qry of
                QAChangesQWhenNew _ _ ->
                    atomically $ modifyTVar pendingTV (incoming :)
                QAChangesQ (QAIxQ _ valQry) -> do
                    valQryId <- CH.makeQuery resCH resQryId valCH valQry
                    valAns <- CH.waitForAnswer resCH resQryId valCH valQryId
                    -- tie the protocol of valCH to the type of the sample answer:
                    let _ = valAns `asTypeOf` sampleAnswer
                    CH.answerQuery False resCH (resQryId, QAChangesANew (QAIxA valAns))
            accumulator pendingTV

        -- Waits for a pending query, asks the step channel on its behalf and
        -- then settles every query that is pending by the time the step
        -- channel has answered.
        forwarder pendingTV = do
            -- block until the pending list is non-empty and peek at its head:
            (qryId, qry) <- atomically $
                readTVar pendingTV >>= \ pending ->
                    case pending of
                        [] -> retry
                        (firstPending : _) -> return firstPending
            -- ask the step channel, reusing the index of the pending query:
            stepQryId <-
                case qry of
                    QAChangesQWhenNew _ (QAIxQ ix _) ->
                        CH.makeQuery resCH qryId stepCH $
                            QAChangesQWhenNew 0 (QAIxQ ix QAUnitQ)
            stepAns <- CH.waitForAnswer resCH qryId stepCH stepQryId
            -- take over the whole pending list, leaving an empty one behind:
            batch <- atomically $ modifyTVarGetOldVal pendingTV (const [])
            case stepAns of
                QAChangesAGivenUp -> answerAllGivenUp batch
                _ -> forwardAll batch
            forwarder pendingTV

        -- Answers every query of the batch with a "given up" answer.
        answerAllGivenUp batch =
            mapM_ giveUp batch
          where
            giveUp (qryId, _) =
                CH.answerQuery False resCH (qryId, QAChangesAGivenUp)

        -- Forwards every query of the batch to the value channel, then
        -- collects the answers in the same order and passes them back.
        forwardAll batch = do
            valQryIds <- mapM askVal batch
            valAnswers <- mapM awaitVal (zip valQryIds batch)
            mapM_ reply (zip valAnswers batch)
          where
            askVal (qryId, QAChangesQWhenNew _ (QAIxQ _ valQry)) =
                CH.makeQuery resCH qryId valCH valQry
            awaitVal (valQryId, (qryId, _)) =
                CH.waitForAnswer resCH qryId valCH valQryId
            reply (valAns, (qryId, _)) =
                CH.answerQuery False resCH (qryId, QAChangesANew (QAIxA valAns))

{-|
    This process provides two channels (primary, secondary) split off
    from one source channel.

    The primary channel is a clean forward of the source channel.
    The secondary channel can use a slightly different protocol than the primary channel.

    Any query on the secondary channel will be blocked until
    a matching query is received and processed on the primary channel.
    (The user must supply a function that decides whether or not the queries are matching.)

    Whenever a query is being answered on the primary channel, all queries pending on the
    secondary channel that are matching this one will be replied at the same time
    using an answer derived from the answer on the primary channel.
-}
biasedSplitSyncProcess ::
    (QAProtocol q1 a1, QAProtocol q2 a2, CH.Channel sIn sOut sInAnyProt sOutAnyProt) =>
    ERProcessName {-^ process identifier (string) -} ->
    ChannelType {-^ primary and input channel type -} ->
    ChannelType {-^ secondary channel type -} ->
    (q2 -> q1 -> Bool) {-^ decision whether a secondary query matches a primary query -} ->
    (q2 -> a1 -> a2) {-^ translation of primary answer to a secondary answer -} ->
    ERProcess sInAnyProt sOutAnyProt
biasedSplitSyncProcess defName chtp1 chtp2 qry2MatchQry1 getAns2 =
    ERProcess defName deploy [chtp1] [chtp1, chtp2]
  where
    deploy _ [inCHA] [primCHA, secCHA] _ = do
        stateTV <- newTVarIO [] -- list of pending queries on secondary channel
        forkIO $ accumulator stateTV
        forwarder stateTV
      where
        inCH =
            CH.castIn "ERNet.Blocks.Control.Basic: biasedSplitSyncProcess: inCH: " inCHA
        primCH =
            CH.castOut "ERNet.Blocks.Control.Basic: biasedSplitSyncProcess: primCH: " primCHA
        secCH =
            CH.castOut "ERNet.Blocks.Control.Basic: biasedSplitSyncProcess: secCH: " secCHA

        -- Collects queries arriving on the secondary channel; they stay
        -- pending until the forwarder finds a matching primary query.
        accumulator stateTV = do
            qryData <- CH.waitForQuery secCH
            -- add this query to the pending list:
            atomically $ modifyTVar stateTV $ (:) qryData
            -- and so on:
            accumulator stateTV

        -- Serves the primary channel by forwarding to the source channel and
        -- then answers all pending secondary queries that match the primary
        -- query just served.
        forwarder stateTV = do
            (primQryId, qry) <- CH.waitForQuery primCH
            -- forward:
            inQryId <- CH.makeQuery primCH primQryId inCH qry
            ans <- CH.waitForAnswer primCH primQryId inCH inQryId
            -- answer on primary channel:
            CH.answerQuery False primCH (primQryId, ans)
            -- extract all matching queries on secondary channel:
            secQrys <- atomically $ do
                pendingList <- readTVar stateTV
                let (matchingQrys, prunedPendingList) = analyseList qry pendingList
                writeTVar stateTV prunedPendingList
                return matchingQrys
            -- answer all these queries (the results are not needed):
            mapM_ (answerSecQry ans) secQrys
            -- go back to the beginning:
            forwarder stateTV
          where
            -- Splits the pending secondary queries into those matching the
            -- given primary query and the rest; both result lists come out
            -- in reverse order relative to the input list.
            analyseList qry1 = go [] []
              where
                go matching rest [] = (matching, rest)
                go matching rest (qry2Data@(_, qry2) : others)
                    | qry2MatchQry1 qry2 qry1 =
                        go (qry2Data : matching) rest others
                    | otherwise =
                        go matching (qry2Data : rest) others
            -- Answers one secondary query with an answer derived from the
            -- primary answer.
            answerSecQry ans1 (qry2Id, qry2) =
                CH.answerQuery False secCH (qry2Id, getAns2 qry2 ans1)

{-|
    This process provides multiple copies of one single-threaded channel.

    Queries arriving on any of the copies are put on a pending list.
    One pending query at a time is forwarded to the source channel and,
    once its answer arrives, every pending query equal to the forwarded one
    is answered with that same answer.
-}
splitSyncProcess ::
    (QAProtocol q a, CH.Channel sIn sOut sInAnyProt sOutAnyProt) =>
    ERProcessName {-^ process identifier (string) -} ->
    ChannelType {-^ type of all channels -} ->
    Int {-^ number of channels to serve -} ->
    a {-^ example answer on value channel to help compiler work out the protocol -} ->
    ERProcess sInAnyProt sOutAnyProt
splitSyncProcess defName chtp n sampleAnswer =
    ERProcess defName deploy [chtp] (replicate n chtp)
  where
    deploy _ [inCHA] outCHAs _ = do
        stateTV <- newTVarIO [] -- list of pending queries while another one is being processed
        forkIO $ accumulator stateTV
        forwarder stateTV
      where
        inCH =
            CH.castIn "ERNet.Blocks.Control.Basic: splitSyncProcess: inCH: " inCHA
        outCHs =
            [ CH.castOut
                ("ERNet.Blocks.Control.Basic: splitSyncProcess: outCH" ++ show chN ++ ": ")
                chA
            | (chA, chN) <- zip outCHAs [0..]
            ]

        -- Collects queries from all output channels together with the
        -- number of the channel they arrived on.
        accumulator stateTV = do
            (chN, (qryId, QueryAnyProt qry_)) <- CH.waitForQueryMulti outCHAs
            -- Kept lazy on purpose: a query of an unexpected type fails only
            -- when the query is actually used, but now with a message that
            -- names this process instead of an anonymous pattern failure.
            let qry =
                    maybe
                        (error "ERNet.Blocks.Control.Basic: splitSyncProcess: query has an unexpected type")
                        id
                        (cast qry_)
            -- add this query to the pending list:
            atomically $ modifyTVar stateTV $ (:) (chN, (qryId, qry))
            -- and so on:
            accumulator stateTV

        -- Forwards the query at the head of the pending list to the source
        -- channel and answers all pending queries equal to it.
        forwarder stateTV = do
            -- wait till list is not empty and look at the first pending query
            -- (it stays on the list until analyseList removes it below):
            (chN, (qryId, qry)) <- atomically $ do
                pendingList <- readTVar stateTV
                case pendingList of
                    [] -> retry
                    (qryData : _) -> return qryData
            let outCH = outCHs !! chN
            -- forward the query:
            inQryId <- CH.makeQuery outCH qryId inCH qry
            ans <- CH.waitForAnswer outCH qryId inCH inQryId
            let _ = [ans, sampleAnswer] -- indicate the protocol on inCH
            -- extract all matching queries:
            qrys <- atomically $ do
                pendingList <- readTVar stateTV
                let (matchingQrys, prunedPendingList) = analyseList qry pendingList
                writeTVar stateTV prunedPendingList
                return matchingQrys
            -- answer all these queries (the results are not needed):
            mapM_ (answerQry ans) qrys
            -- go back to the beginning:
            forwarder stateTV
          where
            -- Splits the pending queries into those equal to the given query
            -- and the rest; both result lists come out in reverse order
            -- relative to the input list.
            analyseList qry1 = go [] []
              where
                go matching rest [] = (matching, rest)
                go matching rest (qry2Data@(_, (_, qry2)) : others)
                    | qry2 == qry1 =
                        go (qry2Data : matching) rest others
                    | otherwise =
                        go matching (qry2Data : rest) others
            -- Answers one pending query on the channel it arrived on.
            answerQry ans1 (chN, (qry2Id, _)) =
                CH.answerQuery False (outCHs !! chN) (qry2Id, ans1)

{-|
    This process acts as a "switch" for a group of channels,
    forwarding information from one of two groups of source channels.
    The special "switch" channel indicates whether to use one or the other:
    the second group is used when the switch channel answers true,
    the first group otherwise.
-}
switchMultiProcess ::
    (CH.Channel sIn sOut sInAnyProt sOutAnyProt) =>
    Bool {-^ should use channel cache? -} ->
    ERProcessName {-^ process identifier (string) -} ->
    [ChannelType] {-^ switched output channel types -} ->
    ERProcess sInAnyProt sOutAnyProt
switchMultiProcess useCache defName chTypes =
    ERProcess defName deploy (chTBool : (chTypes ++ chTypes)) chTypes
  where
    deploy _ (switchCHA : inCHAs) resCHAs _ =
        dispatcher
      where
        -- the input channels after the switch channel are the first group
        -- followed by the second group, each as long as the output list:
        (in1CHAs, in2CHAs) = splitAt (length resCHAs) inCHAs
        switchCH =
            CH.castIn "ERNet.Blocks.Control.Basic: switchMultiProcess: switchCH: " switchCHA

        -- Waits for a query on any output channel and serves it in a
        -- separate thread.
        dispatcher = do
            (chN, qryData) <- CH.waitForQueryMulti resCHAs
            forkIO $ responder chN qryData
            dispatcher

        -- Serves one query that arrived on output channel number chN.
        responder chN (resQryId, qry) =
            case chtp of
                ChannelType _ sampleAnswer -> do
                    resCH <-
                        CH.castOutIO
                            ("ERNet.Blocks.Control.Basic: switchMultiProcess: resCHAs !! " ++ show chN ++ ": ")
                            resCHA
                    -- never executed; only fixes the protocol of resCH:
                    let _ = CH.answerQuery False resCH (0, sampleAnswer)
                    -- find out whether to switch to secondary source:
                    switchQryId <- CH.makeQuery resCH resQryId switchCH QABoolQ
                    shouldSwitch <- CH.waitForAnswer resCH resQryId switchCH switchQryId
                    let srcCHAs = if shouldSwitch then in2CHAs else in1CHAs
                        srcCHA = srcCHAs !! chN
                    -- obtain the answer from either source:
                    qryId <-
                        CH.makeQueryAnyProt
                            "ERNet.Blocks.Control.Basic: switchMultiProcess: making query on inCHA: "
                            resCHA resQryId srcCHA qry
                    (_, ans) <- CH.waitForAnswerMulti resCHA resQryId [(srcCHA, qryId)]
                    -- pass it back:
                    CH.answerQueryAnyProt
                        "ERNet.Blocks.Control.Basic: switchMultiProcess: answering query on resCHA: "
                        useCache resCHA (resQryId, ans)
          where
            chtp = chTypes !! chN
            resCHA = resCHAs !! chN

{-|
    This process acts as a simple pass-through that decreases the effort index
    of each query by one before forwarding it.
    A query with effort index zero is instead forwarded unchanged
    to a special value provider.

    It can cope with several queries in parallel.
-}
improverIxSimpleProcess ::
    (QAProtocol q a, CH.Channel sIn sOut sInAnyProt sOutAnyProt) =>
    ERProcessName {-^ process identifier (string) -} ->
    ChannelType {-^ type shared by all channels -} ->
    a {-^ sample answer to help the type checker -} ->
    ERProcess sInAnyProt sOutAnyProt
improverIxSimpleProcess defName chType sampleAns =
    ERProcess defName deploy [chType, chType] [chType]
  where
    deploy _ [initCHA, inCHA] [resCHA] _ =
        dispatcher
      where
        -- the cast error messages name this process, so that a failing cast
        -- is not attributed to improverNoIxSimpleProcess:
        initCH =
            CH.castIn "ERNet.Blocks.Control.Basic: improverIxSimpleProcess: initCH: " initCHA
        inCH =
            CH.castIn "ERNet.Blocks.Control.Basic: improverIxSimpleProcess: inCH: " inCHA
        resCH =
            CH.castOut "ERNet.Blocks.Control.Basic: improverIxSimpleProcess: resCH: " resCHA
        -- never evaluated; these only make both input channels share one
        -- protocol and fix the protocol of the result channel:
        _ = [inCH, initCH]
        _ = CH.answerQuery False resCH (0, QAIxA sampleAns)

        -- Waits for a query on the result channel and serves it in a
        -- separate thread.
        dispatcher = do
            qryData <- CH.waitForQuery resCH
            forkIO $ responder qryData
            dispatcher

        -- Index zero goes unchanged to the initial value provider; any other
        -- index goes to the main input with the index decreased by one.
        responder (resQryId, QAIxQ ix sqry) = do
            ans <-
                if ix == 0
                    then enquire initCH resQryId (QAIxQ ix sqry)
                    else enquire inCH resQryId (QAIxQ (ix - 1) sqry)
            CH.answerQuery True resCH (resQryId, ans)

        -- Makes a query on the given input channel on behalf of the given
        -- result channel query and waits for its answer.
        enquire ch resQryId qry = do
            fwdQryId <- CH.makeQuery resCH resQryId ch qry
            CH.waitForAnswer resCH resQryId ch fwdQryId

{-|
    This process acts as a simple pass-through that also remembers
    its last answer and provides it on another channel.
    While no answer has been passed through yet, the memory is
    initialised from a special value provider.
-}
improverNoIxSimpleProcess ::
    (QAProtocol q a, CH.Channel sIn sOut sInAnyProt sOutAnyProt) =>
    ERProcessName {-^ process identifier (string) -} ->
    ChannelType {-^ type shared by all channels -} ->
    a {-^ sample answer to help the type checker -} ->
    ERProcess sInAnyProt sOutAnyProt
improverNoIxSimpleProcess defName chType sampleAns =
    ERProcess defName deploy [chType, chType] [chType, chType]
  where
    deploy _ [initCHA, inCHA] [resCHA, resCurrCHA] _ = do
        stateTV <- newTVarIO Nothing -- the last answer, if there is one yet
        forkIO $ currentValServer stateTV
        responder stateTV
      where
        initCH =
            CH.castIn "ERNet.Blocks.Control.Basic: improverNoIxSimpleProcess: initCH: " initCHA
        inCH =
            CH.castIn "ERNet.Blocks.Control.Basic: improverNoIxSimpleProcess: inCH: " inCHA
        resCH =
            CH.castOut "ERNet.Blocks.Control.Basic: improverNoIxSimpleProcess: resCH: " resCHA
        resCurrCH =
            CH.castOut "ERNet.Blocks.Control.Basic: improverNoIxSimpleProcess: resCurrCH: " resCurrCHA
        -- never evaluated; these only make all four channels share one
        -- protocol and tie it to the type of the sample answer:
        _ = [resCH, resCurrCH]
        _ = [inCH, initCH]
        _ = CH.answerQuery False resCH (0, sampleAns)

        -- Serves the "current value" channel from the remembered answer,
        -- asking the initial value provider while nothing is remembered.
        currentValServer stateTV = do
            (qryId, qry) <- CH.waitForQuery resCurrCH
            mans <- atomically $ readTVar stateTV
            ans <-
                case mans of
                    Just ans -> return ans
                    Nothing -> do
                        initQryId <- CH.makeQuery resCurrCH qryId initCH qry
                        initAns <- CH.waitForAnswer resCurrCH qryId initCH initQryId
                        -- The responder may have stored a newer answer while
                        -- we were waiting for the initial value.  Store the
                        -- initial value only if the memory is still empty,
                        -- otherwise keep and use the newer answer.
                        atomically $ do
                            latest <- readTVar stateTV
                            case latest of
                                Nothing -> do
                                    writeTVar stateTV (Just initAns)
                                    return initAns
                                Just newerAns -> return newerAns
            CH.answerQuery False resCurrCH (qryId, ans)
            currentValServer stateTV

        -- Passes queries through to the main input channel and remembers
        -- each answer after sending it back.
        responder stateTV = do
            (resQryId, qry) <- CH.waitForQuery resCH
            fwdQryId <- CH.makeQuery resCH resQryId inCH qry
            ans <- CH.waitForAnswer resCH resQryId inCH fwdQryId
            CH.answerQuery False resCH (resQryId, ans)
            atomically $ writeTVar stateTV $ Just ans
            responder stateTV