-- Communicating Haskell Processes.
-- Copyright (c) 2008, University of Kent.
-- All rights reserved.
-- 
-- Redistribution and use in source and binary forms, with or without
-- modification, are permitted provided that the following conditions are
-- met:
--
--  * Redistributions of source code must retain the above copyright
--    notice, this list of conditions and the following disclaimer.
--  * Redistributions in binary form must reproduce the above copyright
--    notice, this list of conditions and the following disclaimer in the
--    documentation and/or other materials provided with the distribution.
--  * Neither the name of the University of Kent nor the names of its
--    contributors may be used to endorse or promote products derived from
--    this software without specific prior written permission.
--
-- THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
-- IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
-- THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
-- PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
-- CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
-- EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
-- PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
-- PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
-- LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
-- NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
-- SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


-- | Provides an instance of 'Arrow' for process pipelines.  As described in
-- the original paper on arrows, arrows can be used to represent stream
-- processing, so CHP seemed a possible fit for an arrow instance.
-- 
-- Whether this is /actually/ an instance of Arrow depends on technicalities.
--  Consider the arrow law @arr id >>> f = f = f >>> arr id@: whether CHP
-- satisfies it depends on the definition of equality.
--
-- * If equality means that given the same input value, both arrows produce the
-- same corresponding output value, this is an arrow.
--
-- * If equality means you give both arrows the same single input, wait for
-- the single output, and the outputs are the same, this is an arrow.
--
-- * If equality means that you can feed the arrows many inputs (one after
-- the other) and the communication behaviour must be the same, this is not
-- an arrow.
--
-- The problem lies in the buffering inherent in these process pipelines.
--  Imagine that @f@ is a single process.  @f@ is effectively a buffer of one:
-- you can feed it a single value, but no more than that until you read its
-- output.  However, @arr id >>> f@ can accept two inputs (one held by the
-- @arr id@ process and one held by @f@) before you must accept an output.
--
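-- For example, with a hypothetical one-in, one-out process @g@ wrapped via
-- 'arrowProcess', the following two pipelines compute the same outputs but
-- accept different numbers of pending inputs:
--
-- > p1 = runPipeline (arrowProcess g)             -- accepts one input before an output must be taken
-- > p2 = runPipeline (arr id >>> arrowProcess g)  -- accepts two
--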
-- I am fairly confident that the arrow laws are satisfied under the
-- definition of equality where, given the same single input, both sides
-- produce the same single output.  If you don't worry too much about the
-- behavioural difference, and just take arrows as another way to wire
-- together a certain class of process network, you should do fine.
--
-- Added in version 1.0.2.
module Control.Concurrent.CHP.Arrow (ProcessPipeline, runPipeline, arrowProcess) where

-- I have got this module to work on GHC 6.8 and 6.10 by following the CPP-variant
-- instructions on this page: http://haskell.org/haskellwiki/Upgrading_packages

#if __GLASGOW_HASKELL__ >= 609
import Control.Category
import Prelude hiding ((.), id)
#endif

import Control.Arrow
#if __GLASGOW_HASKELL__ < 610
                      hiding (pure)
#endif
import Control.Monad

import Control.Concurrent.CHP
import qualified Control.Concurrent.CHP.Common as CHP
import Control.Concurrent.CHP.Utils

-- | The type that is an instance of 'Arrow' for process pipelines.  See 'runPipeline'.
data ProcessPipeline a b = ProcessPipeline
  { runPipeline :: Chanin a -> Chanout b -> CHP ()
    -- ^ Given a 'ProcessPipeline' (formed using its 'Arrow' instance) and
    -- the channels to plug into the ends of the pipeline, returns the process
    -- representing the pipeline.
    --
    -- The pipeline will run forever (until poisoned) and you must run it in
    -- parallel with whatever is feeding it the inputs and reading off the outputs.
    --  Imagine that you want a process pipeline that takes in a pair of numbers,
    -- doubles the first and adds one to the second.  You could encode this
    -- in an arrow using:
    -- 
    -- > runPipeline (arr (*2) *** arr (+1))
    --
    -- Arrows are most useful when you already have processes written that
    -- transform data and you want to wire them together easily.  The arrow
    -- notation is probably easier for that than declaring all the channels
    -- yourself and composing everything in parallel.
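    --
    -- As a fuller sketch (ignoring shutdown and poison handling), one way to
    -- wire the pipeline between a producer and a consumer, all run in parallel,
    -- would be:
    --
    -- > example :: CHP ()
    -- > example = do
    -- >   c <- oneToOneChannel
    -- >   d <- oneToOneChannel
    -- >   runParallel_
    -- >     [ forever $ writeChannel (writer c) ((3, 4) :: (Int, Int))
    -- >     , runPipeline (arr (*2) *** arr (+1)) (reader c) (writer d)
    -- >     , forever $ readChannel (reader d) >> return ()
    -- >     ]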
  }

-- | Wraps a process in the data type needed for it to form part of an
-- arrow.
--
-- Any process you apply this to should produce exactly one output per
-- input, or else odd behaviour (including deadlock) will result.  So, for
-- example, /don't/ use @arrowProcess ('Control.Concurrent.CHP.Common.filter'
-- ...)@ or @arrowProcess 'Control.Concurrent.CHP.Common.stream'@.
--
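-- For example, wrapping a process you have already written (here,
-- @myTransformer@ is a hypothetical process of type
-- @Chanin Int -> Chanout String -> CHP ()@ that emits exactly one output per
-- input):
--
-- > pipe :: ProcessPipeline Int String
-- > pipe = arr (*2) >>> arrowProcess myTransformer
--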
-- Added in version 1.1.0.
arrowProcess :: (Chanin a -> Chanout b -> CHP ()) -> ProcessPipeline a b
arrowProcess = ProcessPipeline

#if __GLASGOW_HASKELL__ >= 609
instance Category ProcessPipeline where
  (ProcessPipeline q) . (ProcessPipeline p) = ProcessPipeline (p |->| q)
  id = ProcessPipeline CHP.id
#endif

instance Arrow ProcessPipeline where
#if __GLASGOW_HASKELL__ < 609
  (ProcessPipeline p) >>> (ProcessPipeline q) = ProcessPipeline (p |->| q)
#endif
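  -- arr lifts a pure function to a process that applies it to every value
  -- passing through the pipeline.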
  arr = ProcessPipeline . CHP.map

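  -- Split each incoming pair: route the first component through p, carry the
  -- second along a separate channel, then re-pair the results with CHP.join.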
  first (ProcessPipeline p) = ProcessPipeline $ \in_ out -> do
    c <- newChannel
    c' <- newChannel
    d <- newChannel
    runParallel_
      [ CHP.split in_ (writer c) (writer d)
      , p (reader c) (writer c')
      , CHP.join (,) (reader c') (reader d) out
      ]

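  -- As 'first', but the second component goes through p while the first is
  -- carried along unchanged.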
  second (ProcessPipeline p) = ProcessPipeline $ \in_ out -> do
    c <- newChannel
    c' <- newChannel
    d <- newChannel
    runParallel_
      [ CHP.split in_ (writer d) (writer c)
      , p (reader c) (writer c')
      , CHP.join (,) (reader d) (reader c') out
      ]

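  -- Split each incoming pair, run p on the first component and q on the
  -- second in parallel, then re-pair their outputs.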
  (ProcessPipeline p) *** (ProcessPipeline q) = ProcessPipeline $ \in_ out -> do
    c <- newChannel
    c' <- newChannel
    d <- newChannel
    d' <- newChannel
    runParallel_
      [ CHP.split in_ (writer c) (writer d)
      , p (reader c) (writer c')
      , q (reader d) (writer d')
      , CHP.join (,) (reader c') (reader d') out
      ]

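  -- Send every input to both p and q (via CHP.parDelta), then pair up their
  -- outputs.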
  (ProcessPipeline p) &&& (ProcessPipeline q) = ProcessPipeline $ \in_ out -> do
    c <- newChannel
    c' <- newChannel
    d <- newChannel
    d' <- newChannel
    runParallel_
      [ CHP.parDelta in_ [writer c, writer d]
      , p (reader c) (writer c')
      , q (reader d) (writer d')
      , CHP.join (,) (reader c') (reader d') out
      ]

instance ArrowChoice ProcessPipeline where
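  -- Left-tagged values are sent through p and re-tagged on the way out;
  -- Right-tagged values pass straight through to the output.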
  left (ProcessPipeline p) = ProcessPipeline $ \in_ out -> do
    c <- oneToOneChannel
    d <- oneToOneChannel
    (forever $ do x <- readChannel in_
                  case x of
                    Left l -> do writeChannel (writer c) l
                                 l' <- readChannel (reader d)
                                 writeChannel out (Left l')
                    Right r -> writeChannel out (Right r)
     ) <||> p (reader c) (writer d)
    return ()

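  -- As 'left', but with the roles swapped: Right-tagged values go through p,
  -- Left-tagged values pass straight through.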
  right (ProcessPipeline p) = ProcessPipeline $ \in_ out -> do
    c <- oneToOneChannel
    d <- oneToOneChannel
    (forever $ do x <- readChannel in_
                  case x of
                    Right r -> do writeChannel (writer c) r
                                  r' <- readChannel (reader d)
                                  writeChannel out (Right r')
                    Left l -> writeChannel out (Left l)
     ) <||> p (reader c) (writer d)
    return ()

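  -- Route each Left value through p and each Right value through q; both
  -- produce the same output type, so the result is forwarded untagged.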
  (ProcessPipeline p) ||| (ProcessPipeline q)
    = ProcessPipeline $ \in_ out -> do
        c <- oneToOneChannel
        c' <- oneToOneChannel
        d <- oneToOneChannel
        d' <- oneToOneChannel
        runParallel_
          [ forever $ do x <- readChannel in_
                         x' <- case x of
                                 Left l -> do writeChannel (writer c) l
                                              readChannel (reader c')
                                 Right r -> do writeChannel (writer d) r
                                               readChannel (reader d')
                         writeChannel out x'
          , p (reader c) (writer c')
          , q (reader d) (writer d')
          ]

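  -- As '(|||)', but the output keeps its Left/Right tag according to which
  -- of p or q processed it.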
  (ProcessPipeline p) +++ (ProcessPipeline q)
    = ProcessPipeline $ \in_ out -> do
        c <- oneToOneChannel
        c' <- oneToOneChannel
        d <- oneToOneChannel
        d' <- oneToOneChannel
        runParallel_
          [ forever $ do x <- readChannel in_
                         x' <- case x of
                                 Left l -> do writeChannel (writer c) l
                                              l' <- readChannel (reader c')
                                              return (Left l')
                                 Right r -> do writeChannel (writer d) r
                                               r' <- readChannel (reader d')
                                               return (Right r')
                         writeChannel out x'
          , p (reader c) (writer c')
          , q (reader d) (writer d')
          ]