{-|
Module      :  Control.ERNet.Blocks.Basic
Description :  generic processes and process templates
Copyright   :  (c) Michal Konecny
License     :  BSD3

Maintainer  :  mik@konecny.aow.cz
Stability   :  experimental
Portability :  portable

Definitions of a few universally useful simple network processes
and process templates.
-}
module Control.ERNet.Blocks.Basic
(
    constantProcess,
    constantChangedProcess,
    constantStatefulProcess,
    passThroughProcess,
    maybePassThroughProcess,
    passThroughStatefulProcess,
    passThroughBinaryStatefulProcess,
    passThroughBinaryProcess,
    rateProcess,
    precProcess
)
where

import Control.ERNet.Foundations.Protocol
import Control.ERNet.Foundations.Protocol.StandardCombinators
import qualified Control.ERNet.Foundations.Event.Logger as LG
import qualified Control.ERNet.Foundations.Channel as CH
import Control.ERNet.Foundations.Process

import Data.Number.ER.BasicTypes

import Data.Maybe
import Data.Typeable
import qualified Data.Map as Map

import Control.Concurrent as Concurrent
import Control.Concurrent.STM as STM
import Data.Number.ER.Misc.STM

{-|
A generic stateless process with no inputs.

It is 'constantStatefulProcess' specialised to the unit state:
every query is answered by applying the given function to it.
-}
constantProcess ::
    (QAProtocol q a, CH.Channel sIn sOut sInAnyProt sOutAnyProt) =>
    ERProcessName {-^ process identifier (string) -} ->
    (q -> a) {-^ function to answer queries -} ->
    ChannelType {-^ result channel type -} ->
    ERProcess sInAnyProt sOutAnyProt
constantProcess defName responderFn =
    constantStatefulProcess defName statelessResponder ()
    where
    -- The state is ignored and never replaced ('Nothing').  The 'True' is
    -- the flag that 'constantStatefulProcess' hands to 'CH.answerQuery'
    -- together with the answer.
    statelessResponder _state (_qryId, qry) =
        ((True, responderFn qry), Nothing)

{-|
A generic process with no inputs that answers using a ChTChanges protocol.
-}
constantChangedProcess ::
    (QAProtocol q a, CH.Channel sIn sOut sInAnyProt sOutAnyProt) =>
    ERProcessName {-^ process identifier (string) -} ->
    (q -> a) {-^ function to answer queries (without the ChTChanges wrapper) -} ->
    ChannelType {-^ result channel type -} ->
    ERProcess sInAnyProt sOutAnyProt
constantChangedProcess defName responderFn chtp =
    constantProcess defName answerChangesQuery chtp
    where
    -- Only a plain query produces an answer from 'responderFn'; the
    -- "if new" and "when new" forms are answered with 'QAChangesASame'
    -- and 'QAChangesAGivenUp' respectively.
    answerChangesQuery changesQry =
        case changesQry of
            QAChangesQ qry -> QAChangesANew (responderFn qry)
            QAChangesQIfNew _ _ -> QAChangesASame
            QAChangesQWhenNew _ _ -> QAChangesAGivenUp

{-|
A generic stateful process with no inputs.

The process waits for queries on its single result channel and answers
each of them in a newly forked thread.  The state lives in a 'TVar';
computing an answer and (optionally) replacing the state happen
in one STM transaction.
-}
constantStatefulProcess ::
    (QAProtocol q a, CH.Channel sIn sOut sInAnyProt sOutAnyProt) =>
    ERProcessName {-^ process identifier (string) -} ->
    (s -> (QueryId, q) -> ((Bool, a), Maybe s))
        {-^ function to answer queries and transform state;
            the 'Bool' is passed on to 'CH.answerQuery' with the answer,
            @Nothing@ leaves the state unchanged -} ->
    s {-^ initial state -} ->
    ChannelType {-^ result channel type -} ->
    ERProcess sInAnyProt sOutAnyProt
constantStatefulProcess defName responderTransferFn initState chtp =
    ERProcess defName deploy [] [chtp]
    where
    deploy _ [] [resCHA] _ =
        newTVarIO initState >>= serveQueries
        where
        resCH =
            CH.castOut
                "ERNet.Blocks.Basic: constantStatefulProcess: resCH: "
                resCHA
        -- Endless loop: pick up the next query and answer it concurrently.
        serveQueries stateTV =
            do
            request <- CH.waitForQuery resCH
            _ <- forkIO $ answerOne stateTV request
            serveQueries stateTV
        answerOne stateTV (qryId, qry) =
            do
            (useCache, ans) <- atomically stepState
            CH.answerQuery useCache resCH (qryId, ans)
            where
            -- Read the state, compute the answer and store the new state
            -- if the transfer function supplied one.
            stepState =
                do
                state <- readTVar stateTV
                let (ansData, maybeNewState) =
                        responderTransferFn state (qryId, qry)
                maybe (return ()) (writeTVar stateTV) maybeNewState
                return ansData

{-|
A process with one input and one output socket.
Upon receiving a query or an answer related to a previously received query,
the process uses the provided functions to decide whether to answer the
query, make a new query or wait until the state meets a certain condition.
When the condition is met, the query in question is processed again,
starting from the query-handling function.

Several simpler processes are defined as specialisations of this one.
-}
passThroughStatefulProcess ::
    (QAProtocol q1 a1, QAProtocol q2 a2, CH.Channel sIn sOut sInAnyProt sOutAnyProt) =>
    ERProcessName ->
    (s -> (QueryId, q1) -> (ERProcessAction s q2 a1, Maybe s))
        {-^ what to do with a query: forward, reply or wait -} ->
    (s -> (QueryId, q1) -> (q2,a2) -> (ERProcessAction s q2 a1, Maybe s))
        {-^ what to do with an answer: another query, reply or wait -} ->
    s {-^ initial state -} ->
    ChannelType {-^ argument channel type -} ->
    ChannelType {-^ result channel type -} ->
    ERProcess sInAnyProt sOutAnyProt
passThroughStatefulProcess defName qryStFn ansStFn initState chtpIn chtpRes =
    ERProcess defName deploy [chtpIn] [chtpRes]
    where
    deploy _deployName [argCHA] [resCHA] _ =
        newTVarIO initState >>= acceptQueries
        where
        resCH =
            CH.castOut
                "ERNet.Blocks.Basic: passThroughStatefulProcess: resCH: "
                resCHA
        argCH =
            CH.castIn
                "ERNet.Blocks.Basic: passThroughStatefulProcess: argCH: "
                argCHA
        -- Endless loop: each incoming query is handled in its own thread.
        acceptQueries stateTV =
            do
            incoming <- CH.waitForQuery resCH
            _ <- forkIO $ handleQuery stateTV incoming
            acceptQueries stateTV
        -- Start (or restart) processing of a query with the query function.
        handleQuery stateTV qryData@(_, _) =
            do
            action <-
                atomically $
                    transition stateTV (\ state -> qryStFn state qryData)
            perform stateTV qryData action
        -- Carry out the action chosen by one of the transfer functions.
        perform stateTV qryData@(resQryId, _) action =
            case action of
                ERProcessActionRetryWhen triggerCond ->
                    do
                    -- block until the shared state satisfies the condition
                    atomically $
                        do
                        state <- readTVar stateTV
                        if triggerCond state then return () else retry
                    handleQuery stateTV qryData
                ERProcessActionQuery qry2 ->
                    do
                    argQryId <- CH.makeQuery resCH resQryId argCH qry2
                    ans2 <- CH.waitForAnswer resCH resQryId argCH argQryId
                    nextAction <-
                        atomically $
                            transition stateTV
                                (\ state -> ansStFn state qryData (qry2, ans2))
                    perform stateTV qryData nextAction
                ERProcessActionAnswer useCache ans ->
                    CH.answerQuery useCache resCH (resQryId, ans)
        -- Apply a transfer function to the current state, store the new
        -- state if one was produced and return the chosen action.
        transition stateTV stepFn =
            do
            state <- readTVar stateTV
            let (action, maybeNewState) = stepFn state
            maybe (return ()) (writeTVar stateTV) maybeNewState
            return action

{-|
A simple process that passes on a translated version of each query
to another process and translates the answers before passing them back.
-}
passThroughProcess ::
    (QAProtocol q1 a1, QAProtocol q2 a2, CH.Channel sIn sOut sInAnyProt sOutAnyProt) =>
    Bool {-^ should use channel cache? -} ->
    ERProcessName ->
    (q1 -> q2) {-^ query translator -} ->
    (q1 -> a2 -> a1) {-^ answer translator -} ->
    ChannelType {-^ argument channel type -} ->
    ChannelType {-^ result channel type -} ->
    ERProcess sInAnyProt sOutAnyProt
passThroughProcess useCache defName preFn postFn =
    passThroughStatefulProcess defName forwardQuery returnAnswer ()
    where
    -- every query is forwarded in translated form; the state never changes
    forwardQuery _ (_, qry) =
        (ERProcessActionQuery $ preFn qry, Nothing)
    -- the answer translator also sees the original (untranslated) query
    returnAnswer _ (_, qry) (_, ans) =
        (ERProcessActionAnswer useCache $ postFn qry ans, Nothing)

{-|
A simple process that either responds with no further queries
or passes on a translated version of the query to another process
and then passes back a translated version of the answer received.
-}
maybePassThroughProcess ::
    (QAProtocol q1 a1, QAProtocol q2 a2, CH.Channel sIn sOut sInAnyProt sOutAnyProt) =>
    Bool {-^ should use channel cache? -} ->
    ERProcessName ->
    (q1 -> Bool) {-^ if True, query should NOT be passed on -} ->
    (q1 -> a1) {-^ responder to use if not passing queries on -} ->
    (q1 -> q2) {-^ translator for queries to pass on -} ->
    (q1 -> a2 -> a1) {-^ translator for passed answers -} ->
    ChannelType {-^ input channel type -} ->
    ChannelType {-^ output channel type -} ->
    ERProcess sInAnyProt sOutAnyProt
maybePassThroughProcess useCache defName noPass noPassResponder preFn postFn =
    passThroughStatefulProcess defName decideOnQuery translateAnswer ()
    where
    decideOnQuery _ (_, qry) =
        if noPass qry
            then
                -- answer directly, without consulting the argument channel
                (ERProcessActionAnswer useCache (noPassResponder qry), Nothing)
            else
                -- pass on the translated query
                (ERProcessActionQuery (preFn qry), Nothing)
    -- reply with the translated answer
    translateAnswer _ (_, qry) (_, ans) =
        (ERProcessActionAnswer useCache (postFn qry ans), Nothing)

{-|
A process passing on information without modification, except
for improving the convergence rate in successive queries.

Each query may refer to a previous query.  When it does, the
query will not be answered until either:

* the answer has improved sufficiently since last time one was given

* the number of queries made in response to this query has reached the given limit

Currently supports only single-threaded querying.
-}
rateProcess ::
    (QAProtocol q a, CH.Channel sIn sOut sInAnyProt sOutAnyProt) =>
    ERProcessName {-^ process identifier (string) -} ->
    (a -> a -> Bool)
        {-^ function to judge whether the improvement is good enough;
            it receives the previously delivered answer and the new one -} ->
    Int {-^ maximum number of attempts to reach desired improvement -} ->
    ChannelType {-^ number channel type -} ->
    ERProcess sInAnyProt sOutAnyProt
rateProcess defName goodEnough maxAttempts chtp =
    passThroughStatefulProcess defName forwardQuery judgeAnswer noAnswerYet chtp chtp
    where
    -- state: (last answer delivered, number of repeated queries so far)
    noAnswerYet = (Nothing, 0)
    -- queries are passed on unchanged and do not touch the state
    forwardQuery _ (_, qry) =
        (ERProcessActionQuery qry, Nothing)
    judgeAnswer (maybePrevAns, attemptNo) (_, qry) (_, ans) =
        case maybePrevAns of
            -- nothing to compare with yet
            Nothing -> deliver
            Just prevAns
                | goodEnough prevAns ans || attemptNo >= maxAttempts - 1 ->
                    deliver
                | otherwise ->
                    -- ask again and increase the attempt counter
                    (ERProcessActionQuery qry, Just (Just prevAns, attemptNo + 1))
        where
        -- Answer without the channel cache, remember the answer for future
        -- comparisons and reset the attempt counter.
        deliver = (ERProcessActionAnswer False ans, Just (Just ans, 0))

{-|
A trivial passthrough process that only:

* reduces prec by 1 in all queries

* ensures that the granularity of all answers is raised to prec
-}
precProcess ::
    (QAProtocol q a, CH.Channel sIn sOut sInAnyProt sOutAnyProt) =>
    Bool {-^ should use channel cache? -} ->
    ERProcessName ->
    ChannelType {-^ in, out channel type -} ->
    a {-^ sample answer (without QAIxA) to identify protocol type -} ->
    ERProcess sInAnyProt sOutAnyProt
precProcess useCache name chtp sampleA =
    passThroughProcess useCache name decrementIx raiseGranularity chtp chtp
    where
    decrementIx (QAIxQ ix qry) =
        QAIxQ (ix - 1) qry
    -- The granularity is derived from the index of the original query,
    -- i.e. the one before 'decrementIx' was applied.
    raiseGranularity (QAIxQ ix qry) ans =
        let
            gran = effIx2gran ix
            _ = qaMatch qry sampleA -- never evaluated; only forces type unification
        in
        qaaSetMinGran gran ans

{-|
A process that passes on a translated version of each query
to one or both of another 2 channels.
When the other channel(s) answer, it analyses the answer(s) and
decides whether to send other queries or answer its original query.

The two argument channels are consulted one after the other: the second
one is queried only after the first one has answered.
-}
passThroughBinaryStatefulProcess ::
    (QAProtocol q1 a1, QAProtocol q2 a2, QAProtocol q a, CH.Channel sIn sOut sInAnyProt sOutAnyProt) =>
    ERProcessName ->
    (s -> (QueryId, q) -> (ERProcessAction s (Maybe q1, Maybe q2) a, Maybe s))
        {-^ what to do with a query: forward, reply or wait -} ->
    (s -> (QueryId, q) -> (Maybe a1, Maybe a2) -> (ERProcessAction s (Maybe q1, Maybe q2) a, Maybe s))
        {-^ what to do with the answer(s): another query, reply or wait;
            a component is @Nothing@ exactly when the corresponding
            channel was not queried -} ->
    s {-^ initial state -} ->
    (ChannelType, ChannelType) {-^ argument channels types -} ->
    ChannelType {-^ result channel type -} ->
    ERProcess sInAnyProt sOutAnyProt
passThroughBinaryStatefulProcess defName qryStFn ansStFn initState (chtpIn1, chtpIn2) chtpRes =
    ERProcess defName deploy [chtpIn1, chtpIn2] [chtpRes]
    where
    deploy _deployName [arg1CHA, arg2CHA] [resCHA] _ =
        newTVarIO initState >>= acceptQueries
        where
        resCH =
            CH.castOut
                "ERNet.Blocks.Basic: passThroughBinaryStatefulProcess: resCH: "
                resCHA
        arg1CH =
            CH.castIn
                "ERNet.Blocks.Basic: passThroughBinaryStatefulProcess: arg1CH: "
                arg1CHA
        arg2CH =
            CH.castIn
                "ERNet.Blocks.Basic: passThroughBinaryStatefulProcess: arg2CH: "
                arg2CHA
        -- Endless loop: each incoming query is handled in its own thread.
        acceptQueries stateTV =
            do
            incoming <- CH.waitForQuery resCH
            _ <- forkIO $ handleQuery stateTV incoming
            acceptQueries stateTV
        -- Start (or restart) processing of a query with the query function.
        handleQuery stateTV qryData@(_, _) =
            do
            action <-
                atomically $
                    advanceState stateTV (\ state -> qryStFn state qryData)
            perform stateTV qryData action
        -- Carry out the action chosen by one of the transfer functions.
        perform stateTV qryData@(resQryId, _) action =
            case action of
                ERProcessActionRetryWhen triggerCond ->
                    do
                    -- block until the shared state satisfies the condition
                    atomically $
                        do
                        state <- readTVar stateTV
                        if triggerCond state then return () else retry
                    handleQuery stateTV qryData
                ERProcessActionQuery (mqry1, mqry2) ->
                    do
                    mans1 <- queryIfAsked resQryId arg1CH mqry1
                    mans2 <- queryIfAsked resQryId arg2CH mqry2
                    nextAction <-
                        atomically $
                            advanceState stateTV
                                (\ state -> ansStFn state qryData (mans1, mans2))
                    perform stateTV qryData nextAction
                ERProcessActionAnswer useCache ans ->
                    CH.answerQuery useCache resCH (resQryId, ans)
        -- Query one argument channel and wait for its answer, or do nothing
        -- when there is no query for that channel.  This has to stay a
        -- function binding so that it can be used at both channel types.
        queryIfAsked _ _ Nothing = return Nothing
        queryIfAsked resQryId argCH (Just qry) =
            do
            argQryId <- CH.makeQuery resCH resQryId argCH qry
            ans <- CH.waitForAnswer resCH resQryId argCH argQryId
            return $ Just ans
        -- Apply a transfer function to the current state, store the new
        -- state if one was produced and return the chosen action.
        advanceState stateTV stepFn =
            do
            state <- readTVar stateTV
            let (action, maybeNewState) = stepFn state
            maybe (return ()) (writeTVar stateTV) maybeNewState
            return action

{-|
A simple process that translates each query into a pair of queries,
one for each of its two argument channels, and combines the two answers
into a single answer before passing it back.
-}
passThroughBinaryProcess ::
    (QAProtocol q1 a1, QAProtocol q2 a2, QAProtocol q a, CH.Channel sIn sOut sInAnyProt sOutAnyProt) =>
    Bool {-^ should use channel cache? -} ->
    ERProcessName ->
    (q -> (q1,q2)) {-^ query translator -} ->
    (q -> (a1,a2) -> a) {-^ answer translator -} ->
    (ChannelType, ChannelType) {-^ argument channels types -} ->
    ChannelType {-^ result channel type -} ->
    ERProcess sInAnyProt sOutAnyProt
passThroughBinaryProcess useCache defName preFn postFn =
    passThroughBinaryStatefulProcess defName preStFn postStFn ()
    where
    -- both argument channels are always queried
    preStFn _ (_, qry) =
        (ERProcessActionQuery (Just qry1, Just qry2), Nothing)
        where
        (qry1, qry2) = preFn qry
    postStFn _ (_, qry) (Just ans1, Just ans2) =
        (ERProcessActionAnswer useCache (postFn qry (ans1, ans2)), Nothing)
    -- Not expected to happen because 'preStFn' always queries both
    -- channels; fail with a recognisable message instead of an anonymous
    -- pattern-match error.
    postStFn _ _ _ =
        error
            "ERNet.Blocks.Basic: passThroughBinaryProcess: postStFn: an argument channel answer is missing"