{-|
    Module      :  Control.ERNet.Blocks.Control.Basic
    Description :  processes with purely synchronisation effects 
    Copyright   :  (c) Michal Konecny
    License     :  BSD3

    Maintainer  :  mik@konecny.aow.cz
    Stability   :  experimental
    Portability :  portable

    A collection of processes whose main purpose is 
    to synchronise other processes and have little
    semantical value.
-}
module Control.ERNet.Blocks.Control.Basic 
(
    joinStepValProcess,
    splitSyncProcess,
    biasedSplitSyncProcess,
    switchMultiProcess,
    improverIxSimpleProcess,
    improverNoIxSimpleProcess
)
where

import Control.ERNet.Foundations.Protocol 
import Control.ERNet.Foundations.Protocol.StandardCombinators
import qualified Control.ERNet.Foundations.Event.Logger as LG 
import qualified Control.ERNet.Foundations.Channel as CH 
import Control.ERNet.Foundations.Process 

import Control.Concurrent as Concurrent
import Control.Concurrent.STM as STM
import Data.Number.ER.Misc.STM

import Data.Typeable

{-|
    This process joins information from two channels ("step", "val") 
    in such a way that it acts as a splitter of responsibilities 
    for its multi-threaded failure-enabled result channel as follows:
    
    * The "step" channel provides the timing and effort information for responses.
    
    * The "val" channel provides values without significant blocking.
    
    While the process is waiting for a response from the step channel,
    any queries are put on hold until the response comes.
    
    * If the step channel responds with indication of failure, then
    all pending queries are answered as failed.
    
    * If the step channel responds with ok, then all the pending queries
    are forwarded to the value channel and answered asap.
    No new queries are accepted during such forwarding stage.
     
-}
joinStepValProcess ::
    (QAProtocol q a, Show q, Show a, 
     CH.Channel sIn sOut sInAnyProt sOutAnyProt) =>
    ERProcessName {-^ process identifier (string) -} ->
    ChannelType {-^ value input channel type -} ->
    a {-^ example answer on value channel to help compiler work out the protocol -} ->
    ERProcess sInAnyProt sOutAnyProt
joinStepValProcess defName chtp sampleAnswer =
    ERProcess defName deploy [chTChanges (chTIx chTUnit), chtp] [chTChanges (chTIx chtp)]
    where
    deploy deployName [stepCHA, valCHA] [resCHA] _ =
        do
        stateTV <- newTVarIO [] -- list of pending queries
        forkIO $ forwarder stateTV
        accumulator stateTV
        where
        resCH = 
            CH.castOut "ERNet.Blocks.Control.Basic: joinStepValProcess: resCH: " 
                resCHA
        stepCH =
            CH.castIn "ERNet.Blocks.Control.Basic: joinStepValProcess: stepCH: " 
                stepCHA
        valCH =
            CH.castIn "ERNet.Blocks.Control.Basic: joinStepValProcess: valCH: " 
                valCHA
        
        accumulator stateTV =
            do
            qryData@(resQryId, qry) <- CH.waitForQuery resCH
            -- explore the kind of query:
            case qry of
                QAChangesQWhenNew _ _ ->
                    do
                    -- add this query to the pending list:
                    atomically $ modifyTVar stateTV $ (:) qryData
                    return ()
                QAChangesQ (QAIxQ ix q) ->
                    do
                    -- forward and answer this immediately:
                    valQryId <- CH.makeQuery resCH resQryId valCH q
                    a <- CH.waitForAnswer resCH resQryId valCH valQryId
                    let _ = [a, sampleAnswer] -- indicate the protocol on valCH
                    CH.answerQuery False resCH (resQryId, QAChangesANew $ QAIxA a)
            accumulator stateTV
            
        forwarder stateTV =
            do
            -- wait until pending list non-empty:
            (qryId, qry) <- atomically $ 
                do
                pendingList <- readTVar stateTV
                case pendingList of
                    [] -> retry
                    (qryData : _) -> return qryData
            -- explore the query:
            stepQryId <- case qry of
                QAChangesQWhenNew _ (QAIxQ ix q) ->
                    -- make a query to the step channel:
                    CH.makeQuery resCH qryId stepCH $ QAChangesQWhenNew 0 (QAIxQ ix QAUnitQ)
            -- wait for step answer signalling that value is ready:
            ans <- CH.waitForAnswer resCH qryId stepCH stepQryId
            -- obtain the current list of pending queries and empty the shared list:
            pendingList <- atomically $ modifyTVarGetOldVal stateTV (const [])
            -- investigate whether to indicate failure:
            case ans of
                QAChangesAGivenUp -> 
                    answerAllGivenUp pendingList
                _ ->
                    forwardAll pendingList
            -- go back to the beginning:
            forwarder stateTV
            
        answerAllGivenUp pendingList =
            do
            mapM_ (\ (qryId, _) -> CH.answerQuery False resCH (qryId, QAChangesAGivenUp))  
                pendingList
                
        forwardAll pendingList =
            do
            -- forward all pending queries:
            valQryIds <-
                mapM (\ (qryId, (QAChangesQWhenNew _ (QAIxQ ix q))) -> CH.makeQuery resCH qryId valCH q)
                     pendingList
            -- wait for all answers in turn:
            fwdAnswers <-
                mapM (\ (valQryId, (qryId, _)) -> CH.waitForAnswer resCH qryId valCH valQryId)  
                     (zip valQryIds pendingList)
            -- forward all these answers:
            mapM_ (\ (ans, (qryId, _)) -> CH.answerQuery False resCH (qryId, QAChangesANew $ QAIxA $ ans))  
                (zip fwdAnswers pendingList)

{-|
    This process provides two channels (primary, secondary) 
    split off from one source channel. The primary channel
    is a clean forward of the source channel.  The secondary
    channel can use a slightly different protocol than the primary channel.
      
    Any query on the secondary channel will be blocked until a matching
    query is received and processed on the primary channel.  (The user must
    supply a function that decides whether or not the queries are matching.)
    
    Whenever a query is being answered on the primary channel, 
    all queries pending on the secondary channel that are
    matching this one will be replied at the same time 
    using the an answer derived from the answer on the primary channel.
-}
biasedSplitSyncProcess ::
    (QAProtocol q1 a1, QAProtocol q2 a2, CH.Channel sIn sOut sInAnyProt sOutAnyProt) =>
    ERProcessName {-^ process identifier (string) -} ->
    ChannelType {-^ primary and input channel type -} ->
    ChannelType {-^ secondary channel type -} ->
--    a1 {-^ example answer on value channel to help compiler work out the protocol -} ->
    (q2 -> q1 -> Bool) {-^ decision whether a secondary query matches a primary query -} ->
    (q2 -> a1 -> a2) {-^ translation of primary answer to a secondary answer -} ->
    ERProcess sInAnyProt sOutAnyProt
biasedSplitSyncProcess defName chtp1 chtp2 qry2MatchQry1 getAns2 =
    ERProcess defName deploy [chtp1] [chtp1, chtp2]
    where
    deploy deployName [inCHA] [primCHA, secCHA] _ =
        do
        stateTV <- newTVarIO [] -- list of pending queries on secondary channel
        forkIO $ accumulator stateTV
        forwarder stateTV
        where
        inCH =
            CH.castIn "ERNet.Blocks.Control.Basic: joinStepValProcess: inCH: " 
                inCHA
        primCH = 
            CH.castOut "ERNet.Blocks.Control.Basic: biasedSplitSyncProcess: primCH: " 
                primCHA
        secCH = 
            CH.castOut "ERNet.Blocks.Control.Basic: biasedSplitSyncProcess: secCH: " 
                secCHA
        
--        _ = CH.answerQuery False inCH (0, sampleAnswer)
        
        accumulator stateTV =
            do
            qryData@(secQryId, qry) <- CH.waitForQuery secCH
            -- add this query to the pending list:
            atomically $ modifyTVar stateTV $ (:) qryData
            -- and so on:
            accumulator stateTV
            
        forwarder stateTV =
            do
            qryData@(primQryId, qry) <- CH.waitForQuery primCH
            -- forward:
            inQryId <- CH.makeQuery primCH primQryId inCH qry
            ans <- CH.waitForAnswer primCH primQryId inCH inQryId 
            -- answer on primary channel:
            CH.answerQuery False primCH (primQryId, ans)
            -- extract all matching queries on secondary channel:
            secQrys <- atomically $ 
                do
                pendingList <- readTVar stateTV
                let (matchingQrys, prunedPendingList) = analyseList qry pendingList
                writeTVar stateTV prunedPendingList
                return matchingQrys
            -- answer all these queries:
            mapM (answerSecQry ans) secQrys
            -- go back to the beginning:
            forwarder stateTV
            where
            analyseList qry1 =
                analyseListAUX [] []
                where
                analyseListAUX prevMatching prevPruned [] = (prevMatching, prevPruned)
                analyseListAUX prevMatching prevPruned (qry2Data@(qry2Id, qry2) : others)
                    | qry2MatchQry1 qry2 qry1 =
                        analyseListAUX (qry2Data : prevMatching) prevPruned others
                    | otherwise =
                        analyseListAUX prevMatching (qry2Data : prevPruned) others             
            answerSecQry ans1 (qry2Id, qry2) =
                do
                CH.answerQuery False secCH (qry2Id, getAns2 qry2 ans1)

{-|
    This process provides multiple copies of one single-threaded channel. 
    
     merges splits a channel into two channels 
    - primary channel and secondary channel.  The primary channel
    is a clean forward of the original channel.  The secondary
    channel can use a slightly different protocol than the primary channel.
      
    Any query on the secondary channel will be blocked until a matching
    query is received and processed on the primary channel.  (The user must
    supply a function that decides whether or not the queries are matching.)
    
    Whenever a query is being answered on the primary channel, 
    all queries pending on the secondary channel that are
    matching this one will be replied at the same time 
    using the an answer derived from the answer on the primary channel.
-}
splitSyncProcess ::
    (QAProtocol q a, CH.Channel sIn sOut sInAnyProt sOutAnyProt) =>
    ERProcessName {-^ process identifier (string) -} ->
    ChannelType {-^ type of all channels -} ->
    Int {-^ number of channels to serve -} ->
    a {-^ example answer on value channel to help compiler work out the protocol -} ->
    ERProcess sInAnyProt sOutAnyProt
splitSyncProcess defName chtp n sampleAnswer =
    ERProcess defName deploy [chtp] (replicate n chtp)
    where
    deploy deployName [inCHA] outCHAs _ =
        do
        stateTV <- newTVarIO [] -- list of pending queries while another one is being processed
        forkIO $ accumulator stateTV
        forwarder stateTV
        where
        inCH = 
            CH.castIn "ERNet.Blocks.Control.Basic: splitSyncProcess: inCH: " 
                inCHA 
        outCHs = 
            map (\(chA, chN) -> 
                    CH.castOut ("ERNet.Blocks.Control.Basic: splitSyncProcess: outCH" ++ show chN ++ ": ") chA) $ 
                zip outCHAs [0..]
        
        accumulator stateTV =
            do
            (chN, (qryId, QueryAnyProt qry_)) <- CH.waitForQueryMulti outCHAs
            let (Just qry) = cast qry_ 
            -- add this query to the pending list:
            atomically $ modifyTVar stateTV $ (:) $ (chN, (qryId, qry))
            -- and so on:
            accumulator stateTV
            
        forwarder stateTV =
            do
            -- wait till list is not empty and extract first pending query:
            (chN, (qryId, qry)) <- atomically $
                do
                pendingList <- readTVar stateTV
                case pendingList of
                    [] -> retry
                    (qryData : _) -> return qryData
            let outCH = outCHs !! chN
            -- forward the query:
            inQryId <- CH.makeQuery outCH qryId inCH qry
            ans <- CH.waitForAnswer outCH qryId inCH inQryId
            let _ = [ans, sampleAnswer] -- indicate the protocol on inCH 
            -- extract all matching queries:
            qrys <- atomically $ 
                do
                pendingList <- readTVar stateTV
                let (matchingQrys, prunedPendingList) = analyseList qry pendingList
                writeTVar stateTV prunedPendingList
                return matchingQrys
            -- answer all these queries:
            mapM (answerQry ans) qrys
            -- go back to the beginning:
            forwarder stateTV
            where
            analyseList qry1 =
                analyseListAUX [] []
                where
                analyseListAUX prevMatching prevPruned [] = (prevMatching, prevPruned)
                analyseListAUX prevMatching prevPruned (qry2Data@(_, (_, qry2)) : others)
                    | qry2 == qry1 =
                        analyseListAUX (qry2Data : prevMatching) prevPruned others
                    | otherwise =
                        analyseListAUX prevMatching (qry2Data : prevPruned) others             
            answerQry ans1 (chN, (qry2Id, qry2)) =
                do
                CH.answerQuery False (outCHs !! chN) (qry2Id, ans1) -- getAns2 qry2 ans1)

{-|
    This process acts as a "switch" for a group of channels, forwarding information
    from one of two groups of source channels.  The special "switch" channel
    indicates whether to use one or the other. 
-}
switchMultiProcess ::
    (CH.Channel sIn sOut sInAnyProt sOutAnyProt) =>
    Bool {-^ should use channel cache? -} ->
    ERProcessName {-^ process identifier (string) -} ->
    [ChannelType] {-^ switched output channel types -} ->
    ERProcess sInAnyProt sOutAnyProt
switchMultiProcess useCache defName chTypes =
    ERProcess defName deploy (chTBool : (chTypes ++ chTypes)) (chTypes)
    where
    deploy deployName (switchCHA : inCHAs) resCHAs _ =
        dispatcher
        where
        (in1CHAs, in2CHAs) = 
            splitAt (length resCHAs) inCHAs
        switchCH =
            CH.castIn "ERNet.Blocks.Control.Basic: switchMultiProcess: switchCH: " switchCHA 
    
        dispatcher =
            do            
            (chN, qryData) <- CH.waitForQueryMulti resCHAs
            forkIO $ responder chN qryData
            dispatcher
            
        responder chN (resQryId, qry) =
            do
            case chtp of
                ChannelType _ sampleAnswer ->
                    do
                    resCH <- 
                        CH.castOutIO 
                            ("ERNet.Blocks.Control.Basic: switchMultiProcess: resCHAs !! " ++ show chN ++ ": ") 
                            resCHA
                    let _ = CH.answerQuery False resCH (0, sampleAnswer)
                    -- find out whether to switch to secondary source:
                    switchQryId <- CH.makeQuery resCH resQryId switchCH QABoolQ
                    shouldSwitch <- CH.waitForAnswer resCH resQryId switchCH switchQryId
                    case () of 
                        _ -> -- ... persuaded Haskell parser that it is ok to have "where" inside a "do"
                            do
                            -- obtain the answer from either source:
                            qryId <- 
                                CH.makeQueryAnyProt "ERNet.Blocks.Control.Basic: switchMultiProcess: making query on inCHA: " 
                                    resCHA resQryId inCHA qry
                            (_, ans) <- 
                                CH.waitForAnswerMulti resCHA resQryId [(inCHA, qryId)]
                            -- pass it back:
                            CH.answerQueryAnyProt "ERNet.Blocks.Control.Basic: switchMultiProcess: answering query on resCHA: "
                                useCache resCHA (resQryId, ans)
                            where
                            inCHA = inCHAs !! chN
                            inCHAs = if shouldSwitch then in2CHAs else in1CHAs
            where
            chtp = chTypes !! chN
            resCHA = resCHAs !! chN
            
{-|
    This process acts as a simple pass-through 
    + it decreases the effort index of each query
    except for a query with effort index zero
    it asks a special value provider.
    It can cope with several queries in parallel.
-}
improverIxSimpleProcess ::
    (QAProtocol q a, CH.Channel sIn sOut sInAnyProt sOutAnyProt) =>
    ERProcessName {-^ process identifier (string) -} ->
    ChannelType {-^ type shared by all channels -} ->
    a {-^ sample answer to help the type checker -} ->
    ERProcess sInAnyProt sOutAnyProt
improverIxSimpleProcess defName chType sampleAns =
    ERProcess defName deploy [chType, chType] [chType]
    where
    deploy _ [initCHA, inCHA] [resCHA] _ =
        do
        dispatcher
        where
        initCH =
            CH.castIn "ERNet.Blocks.Control.Basic: improverNoIxSimpleProcess: initCH: "
                initCHA
        inCH =
            CH.castIn "ERNet.Blocks.Control.Basic: improverNoIxSimpleProcess: inCH: "
                inCHA
        resCH =
            CH.castOut "ERNet.Blocks.Control.Basic: improverNoIxSimpleProcess: resCH: "
                resCHA
        _ = [inCH, initCH]
        _ = CH.answerQuery False resCH (0, QAIxA sampleAns)

        dispatcher =
            do
            qryData <- CH.waitForQuery resCH
            forkIO $ responder qryData
            dispatcher
            
        responder (resQryId, QAIxQ ix sqry) =
            do            
            ans <- 
                case ix == 0 of
                    True -> enquire initCH resQryId (QAIxQ ix sqry)
                    False -> enquire inCH resQryId (QAIxQ (ix - 1) sqry)
            CH.answerQuery True resCH (resQryId, ans)
            
        enquire ch resQryId qry =
            do
            fwdQryId <- CH.makeQuery resCH resQryId ch qry
            CH.waitForAnswer resCH resQryId ch fwdQryId

{-|
    This process acts as a simple pass-through + it 
    remembers its last answer and provides it on another channel.
    It initialises its memory from a special value provider.  
-}
improverNoIxSimpleProcess ::
    (QAProtocol q a, CH.Channel sIn sOut sInAnyProt sOutAnyProt) =>
    ERProcessName {-^ process identifier (string) -} ->
    ChannelType {-^ type shared by all channels -} ->
    a {-^ sample answer to help the type checker -} ->
    ERProcess sInAnyProt sOutAnyProt
improverNoIxSimpleProcess defName chType sampleAns =
    ERProcess defName deploy [chType, chType] [chType, chType]
    where
    deploy _ [initCHA, inCHA] [resCHA, resCurrCHA] _ =
        do
        stateTV <- newTVarIO Nothing 
        forkIO $ currentValServer stateTV
        responder stateTV
        where
        initCH =
            CH.castIn "ERNet.Blocks.Control.Basic: improverNoIxSimpleProcess: initCH: "
                initCHA
        inCH =
            CH.castIn "ERNet.Blocks.Control.Basic: improverNoIxSimpleProcess: inCH: "
                inCHA
        resCH =
            CH.castOut "ERNet.Blocks.Control.Basic: improverNoIxSimpleProcess: resCH: "
                resCHA
        resCurrCH =
            CH.castOut "ERNet.Blocks.Control.Basic: improverNoIxSimpleProcess: resCurrCH: "
                resCurrCHA
        _ = [resCH, resCurrCH]
        _ = [inCH, initCH]
        _ = CH.answerQuery False resCH (0, sampleAns)
    
        currentValServer stateTV =
            do
            (qryId, qry) <- CH.waitForQuery resCurrCH
            mans <- atomically $ readTVar stateTV
            ans <- case mans of
                Nothing -> 
                    do
                    initQryId <- CH.makeQuery resCurrCH qryId initCH qry
                    ans <- CH.waitForAnswer resCurrCH qryId initCH initQryId
                    atomically $ writeTVar stateTV $ Just ans
                    return ans
                Just ans ->
                    return ans                    
            CH.answerQuery False resCurrCH (qryId, ans)
            currentValServer stateTV
                
        responder stateTV =
            do            
            (resQryId, qry) <- CH.waitForQuery resCH
            fwdQryId <- CH.makeQuery resCH resQryId inCH qry
            ans <- CH.waitForAnswer resCH resQryId inCH fwdQryId
            CH.answerQuery False resCH (resQryId, ans)
            atomically $ writeTVar stateTV $ Just ans
            responder stateTV