-- Communicating Haskell Processes.
-- Copyright (c) 2008, University of Kent.
-- All rights reserved.
-- 
-- Redistribution and use in source and binary forms, with or without
-- modification, are permitted provided that the following conditions are
-- met:
--
--  * Redistributions of source code must retain the above copyright
--    notice, this list of conditions and the following disclaimer.
--  * Redistributions in binary form must reproduce the above copyright
--    notice, this list of conditions and the following disclaimer in the
--    documentation and/or other materials provided with the distribution.
--  * Neither the name of the University of Kent nor the names of its
--    contributors may be used to endorse or promote products derived from
--    this software without specific prior written permission.
--
-- THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
-- IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
-- THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
-- PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
-- CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
-- EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
-- PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
-- PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
-- LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
-- NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
-- SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


-- | Provides an instance of Arrow for process pipelines.  As described in
-- the original paper on arrows, they can be used to represent stream processing,
-- so CHP seemed like a possible fit for an arrow.
-- 
-- Whether this is /actually/ an instance of Arrow depends on technicalities.
--  This can be demonstrated with the arrow law @arr id >>> f = f = f >>> arr
-- id@.  Whether CHP satisfies this arrow law depends on the definition of
-- equality.
--
-- * If equality means that given the same input value, both arrows produce the
-- same corresponding output value, this is an arrow.
--
-- * If equality means you give the arrows the same single input and wait for the single output,
-- and the output is the same, this is an arrow.
--
-- * If equality means that you can feed the arrows lots of inputs (one after
-- the other) and the behaviour should be the same with regards to communication,
-- this is not an arrow.
--
-- The problem lies in the buffering inherent in arrows.  Imagine if @f@ is
-- a single function.  @f@ is effectively a buffer of one.  You can feed it
-- a single value, but no more than that until you read its output.  However,
-- if you have @arr id >>> f@, that can accept two inputs (one held by the
-- @arr id@ process and one held by @f@) before you must accept the output.
--
-- I am fairly confident that the arrow laws are satisfied for the
-- definition of equality that given the same single input, they will
-- produce the same single output.  If you don't worry too much about the
-- behavioural difference, and just take arrows as another way to wire
-- together a certain class of process network, you should do fine.
--
-- Added in version 1.0.2.
module Control.Concurrent.CHP.Arrow (ProcessPipeline, runPipeline, arrowProcess, arrStrict,
  ProcessPipelineLabel, runPipelineLabel, arrowProcessLabel, arrLabel, arrStrictLabel,
  (*>>>*), (*<<<*), (*&&&*), (*****)) where

-- I have got this module to work on GHC 6.8 and 6.10 by following the CPP-variant
-- instructions on this page: http://haskell.org/haskellwiki/Upgrading_packages

#if __GLASGOW_HASKELL__ >= 609
import Control.Category
import Prelude hiding ((.), id)
#endif

import Control.Arrow
#if __GLASGOW_HASKELL__ < 610
                      hiding (pure)
#endif
import Control.Monad
import Control.Parallel.Strategies

import Control.Concurrent.CHP
import qualified Control.Concurrent.CHP.Common as CHP
import Control.Concurrent.CHP.Utils

-- | ProcessPipelineLabel is a version of 'ProcessPipeline' that allows the processes
-- to be labelled, and thus in turn for the channels connecting the processes to
-- be automatically labelled.  ProcessPipelineLabel is not an instance of Arrow,
-- but it does have a lot of similarly named functions for working with it.  This
-- awkwardness is due to the extra Show constraints on the connectors that allow
-- the arrow's contents to appear in traces.
--
-- If you don't use traces, use 'ProcessPipeline'.  If you do use traces, and want
-- to have better labels on the process and values used in your arrows, consider
-- switching to ProcessPipelineLabel.
--
-- ProcessPipelineLabel and all the functions that use it, were added in version
-- 1.5.0.
data ProcessPipelineLabel a b = ProcessPipelineLabel
  { runPipelineLabel :: Chanin a -> Chanout b -> CHP ()
    -- ^ Like 'runPipeline' but for 'ProcessPipelineLabel'
  , _pipelineLabels :: (String, String)
  }

-- | Like 'arrowProcess', but allows the process to be labelled.  The same
-- warnings as 'arrowProcess' apply.
arrowProcessLabel :: String -> (Chanin a -> Chanout b -> CHP ()) -> ProcessPipelineLabel a b
arrowProcessLabel l p = ProcessPipelineLabel p (l, l)

-- | Like 'arr' for 'ProcessPipeline', but allows the process to be labelled.
arrLabel :: String -> (a -> b) -> ProcessPipelineLabel a b
arrLabel l = arrowProcessLabel l . CHP.map

-- | Like 'arrStrict', but allows the process to be labelled.
arrStrictLabel :: NFData b => String -> (a -> b) -> ProcessPipelineLabel a b
arrStrictLabel l = arrowProcessLabel l . CHP.map'

-- | The '(>>>)' arrow combinator, for 'ProcessPipelineLabel'.
(*>>>*) :: Show b => ProcessPipelineLabel a b -> ProcessPipelineLabel b c
  -> ProcessPipelineLabel a c
(*>>>*) (ProcessPipelineLabel p (pl, pr)) (ProcessPipelineLabel q (ql, qr))
  = ProcessPipelineLabel (p |->|^ (pr ++ "->" ++ ql, q)) (pl, qr)

-- | The '(<<<)' arrow combinator, for 'ProcessPipelineLabel'.
(*<<<*) :: Show b => ProcessPipelineLabel b c -> ProcessPipelineLabel a b
  -> ProcessPipelineLabel a c
(*<<<*) = flip (*>>>*)

-- | The '(&&&)' arrow combinator, for 'ProcessPipelineLabel'.
(*&&&*) :: (Show b, Show c, Show c') => ProcessPipelineLabel b c -> ProcessPipelineLabel b c' -> ProcessPipelineLabel b (c, c')
(*&&&*) (ProcessPipelineLabel p (pl, pr))
        (ProcessPipelineLabel q (ql, qr))
  = ProcessPipelineLabel proc (mix pl ql, mix pr qr)
  where
    mix a b = "(" ++ a ++ "*&&&*" ++ b ++ ")"
    proc input output
       = do deltaP <- oneToOneChannel' $ chanLabel $ pl ++ ".in"
            deltaQ <- oneToOneChannel' $ chanLabel $ ql ++ ".in"
            joinP <- oneToOneChannel' $ chanLabel $ pr ++ ".out"
            joinQ <- oneToOneChannel' $ chanLabel $ qr ++ ".out"
            runParallel_
              [CHP.parDelta input (writers [deltaP, deltaQ])
              ,p (reader deltaP) (writer joinP)
              ,q (reader deltaQ) (writer joinQ)
              ,CHP.join (,) (reader joinP) (reader joinQ) output
              ]

-- | The '(***)' arrow combinator, for 'ProcessPipelineLabel'.
(*****) :: (Show b, Show b', Show c, Show c') => ProcessPipelineLabel b c -> ProcessPipelineLabel b' c'
  -> ProcessPipelineLabel (b, b') (c, c')
(*****) (ProcessPipelineLabel p (pl, pr))
        (ProcessPipelineLabel q (ql, qr))
  = ProcessPipelineLabel proc (mix pl ql, mix pr qr)
  where
    mix a b = "(" ++ a ++ "*****" ++ b ++ ")"
    proc input output
       = do deltaP <- oneToOneChannel' $ chanLabel $ mix pl ql ++ "->" ++ pl
            deltaQ <- oneToOneChannel' $ chanLabel $ mix pl ql ++ "->" ++ ql
            joinP <- oneToOneChannel' $ chanLabel $ pr ++ "->" ++ mix pr qr
            joinQ <- oneToOneChannel' $ chanLabel $ qr ++ "->" ++ mix pr qr
            runParallel_
              [CHP.split input (writer deltaP) (writer deltaQ)
              ,p (reader deltaP) (writer joinP)
              ,q (reader deltaQ) (writer joinQ)
              ,CHP.join (,) (reader joinP) (reader joinQ) output
              ]


-- | The type that is an instance of 'Arrow' for process pipelines.  See 'runPipeline'.
data ProcessPipeline a b = ProcessPipeline
  { runPipeline :: Chanin a -> Chanout b -> CHP ()
    -- ^ Given a 'ProcessPipeline' (formed using its 'Arrow' instance) and
    -- the channels to plug into the ends of the pipeline, returns the process
    -- representing the pipeline.
    --
    -- The pipeline will run forever (until poisoned) and you must run it in
    -- parallel to whatever is feeding it the inputs and reading off the outputs.
    --  Imagine that you want a process pipeline that takes in a pair of numbers,
    -- doubles the first and adds one to the second.  You could encode this
    -- in an arrow using:
    -- 
    -- > runPipeline (arr (*2) *** arr (+1))
    --
    -- Arrows are more useful where you already have processes written that
    -- process data and you want to easily wire them together.  The arrow notation
    -- is probably easier for doing that than declaring all the channels yourself
    -- and composing everything in parallel.
  }

-- | Adds a wrapper that forms this process into the right data type to be
-- part of an arrow.
--
-- Any process you apply this to should produce exactly one output per
-- input, or else you will find odd behaviour resulting (including deadlock).
--  So for example, /don't/ use @arrowProcess ('Control.Concurrent.CHP.Common.filter'
-- ...)@ or @arrowProcess 'Control.Concurrent.CHP.Common.stream'@ inside any arrow combinators
-- other than >>> and <<<.
--
-- Added in version 1.1.0
arrowProcess :: (Chanin a -> Chanout b -> CHP ()) -> ProcessPipeline a b
arrowProcess = ProcessPipeline

-- | Like the arr function of the ProcessPipeline arrow instance, but fully evaluates
-- the result before sending it.  If you are building process pipelines with arrows to
-- try and get some parallel speed-up, you should try this function instead of
-- arr itself.
-- 
-- Added in version 1.3.2
arrStrict :: NFData b => (a -> b) -> ProcessPipeline a b
arrStrict = ProcessPipeline . CHP.map'

#if __GLASGOW_HASKELL__ >= 609
instance Category ProcessPipeline where
  (ProcessPipeline q) . (ProcessPipeline p) = ProcessPipeline (p |->| q)
  id = ProcessPipeline CHP.id
#endif

instance Arrow ProcessPipeline where
#if __GLASGOW_HASKELL__ < 609
  (ProcessPipeline p) >>> (ProcessPipeline q) = ProcessPipeline (p |->| q)
#endif
  arr = ProcessPipeline . CHP.map

  first (ProcessPipeline p) = ProcessPipeline $ \in_ out -> do
    c <- newChannel
    c' <- newChannel
    d <- newChannel
    runParallel_
      [ CHP.split in_ (writer c) (writer d)
      , p (reader c) (writer c')
      , CHP.join (,) (reader c') (reader d) out
      ]

  second (ProcessPipeline p) = ProcessPipeline $ \in_ out -> do
    c <- newChannel
    c' <- newChannel
    d <- newChannel
    runParallel_
      [ CHP.split in_ (writer d) (writer c)
      , p (reader c) (writer c')
      , CHP.join (,) (reader d) (reader c') out
      ]

  (ProcessPipeline p) *** (ProcessPipeline q) = ProcessPipeline $ \in_ out -> do
    c <- newChannel
    c' <- newChannel
    d <- newChannel
    d' <- newChannel
    runParallel_
      [ CHP.split in_ (writer c) (writer d)
      , p (reader c) (writer c')
      , q (reader d) (writer d')
      , CHP.join (,) (reader c') (reader d') out
      ]

  (ProcessPipeline p) &&& (ProcessPipeline q) = ProcessPipeline $ \in_ out -> do
    c <- newChannel
    c' <- newChannel
    d <- newChannel
    d' <- newChannel
    runParallel_
      [ CHP.parDelta in_ [writer c, writer d]
      , p (reader c) (writer c')
      , q (reader d) (writer d')
      , CHP.join (,) (reader c') (reader d') out
      ]

instance ArrowChoice ProcessPipeline where
  left (ProcessPipeline p) = ProcessPipeline $ \in_ out -> do
    c <- oneToOneChannel
    d <- oneToOneChannel
    (forever $ do x <- readChannel in_
                  case x of
                    Left l -> do writeChannel (writer c) l
                                 l' <- readChannel (reader d)
                                 writeChannel out (Left l')
                    Right r -> writeChannel out (Right r)
     ) <||> p (reader c) (writer d)
    return ()

  right (ProcessPipeline p) = ProcessPipeline $ \in_ out -> do
    c <- oneToOneChannel
    d <- oneToOneChannel
    (forever $ do x <- readChannel in_
                  case x of
                    Right r -> do writeChannel (writer c) r
                                  r' <- readChannel (reader d)
                                  writeChannel out (Right r')
                    Left l -> writeChannel out (Left l)
     ) <||> p (reader c) (writer d)
    return ()

  (ProcessPipeline p) ||| (ProcessPipeline q)
    = ProcessPipeline $ \in_ out -> do
        c <- oneToOneChannel
        c' <- oneToOneChannel
        d <- oneToOneChannel
        d' <- oneToOneChannel
        runParallel_
          [ forever $ do x <- readChannel in_
                         x' <- case x of
                                 Left l -> do writeChannel (writer c) l
                                              readChannel (reader c')
                                 Right r -> do writeChannel (writer d) r
                                               readChannel (reader d')
                         writeChannel out x'
          , p (reader c) (writer c')
          , q (reader d) (writer d')
          ]

  (ProcessPipeline p) +++ (ProcessPipeline q)
    = ProcessPipeline $ \in_ out -> do
        c <- oneToOneChannel
        c' <- oneToOneChannel
        d <- oneToOneChannel
        d' <- oneToOneChannel
        runParallel_
          [ forever $ do x <- readChannel in_
                         x' <- case x of
                                 Left l -> do writeChannel (writer c) l
                                              l' <- readChannel (reader c')
                                              return (Left l')
                                 Right r -> do writeChannel (writer d) r
                                               r' <- readChannel (reader d')
                                               return (Right r')
                         writeChannel out x'
          , p (reader c) (writer c')
          , q (reader d) (writer d')
          ]