-- Communicating Haskell Processes.
-- Copyright (c) 2008, University of Kent.
-- All rights reserved.
-- 
-- Redistribution and use in source and binary forms, with or without
-- modification, are permitted provided that the following conditions are
-- met:
--
--  * Redistributions of source code must retain the above copyright
--    notice, this list of conditions and the following disclaimer.
--  * Redistributions in binary form must reproduce the above copyright
--    notice, this list of conditions and the following disclaimer in the
--    documentation and/or other materials provided with the distribution.
--  * Neither the name of the University of Kent nor the names of its
--    contributors may be used to endorse or promote products derived from
--    this software without specific prior written permission.
--
-- THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
-- IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
-- THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
-- PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
-- CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
-- EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
-- PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
-- PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
-- LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
-- NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
-- SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

-- | A collection of common processes that are useful when plumbing
-- together a process network.  All the processes here rethrow poison when
-- it is encountered, as this gives the user maximum flexibility (they can
-- let it propagate, or ignore it).
--
-- The names here overlap with standard Prelude names.  This is
-- deliberate, as the processes act in a similar manner to the
-- corresponding Prelude versions.  It is expected that you will do
-- something like:
--
-- > import qualified Control.Concurrent.CHP.Common as Common
--
-- or:
--
-- > import qualified Control.Concurrent.CHP.Common as CHP
--
-- to circumvent this problem.
module Control.Concurrent.CHP.Common where

import Control.Monad
import qualified Data.Traversable as Traversable
import Prelude (Bool, Maybe(..), Enum, Ord, ($), (<), Int, otherwise, (.))
import qualified Prelude

import Control.Concurrent.CHP

-- | The identity process: repeatedly reads a value from the input channel
-- and writes that same value, unchanged, to the output channel.  Because the
-- value is held between the read and the write, inserting this process into
-- a network effectively adds a single-place buffer.
--
-- If poison is encountered, both channels are poisoned and the poison is
-- rethrown.
id :: Chanin a -> Chanout a -> CHP ()
id in_ out = forever (readChannel in_ >>= writeChannel out)
  `onPoisonRethrow` (poison in_ >> poison out)

-- | Like 'id', but the value is passed on during an extended rendezvous, so
-- no buffering is added to the network.  This is simply a 'tap' with exactly
-- one output channel.
--
-- extId is a unit of the associative operator '|->|'.
extId :: Chanin a -> Chanout a -> CHP ()
extId in_ = tap in_ . (: [])

-- | Repeatedly waits for an input and, during the extended rendezvous on
-- that input, writes the value to /every/ output channel in list order.
--
-- The typical use is a wire-tap: the value goes both to its normal recipient
-- (without introducing buffering) and to a listener that wants to examine
-- it.  The listener is hidden from the other processes, in that it does not
-- visibly change the semantics for them.  Note that the outputs are written
-- sequentially, so if the listener comes first in the list and is slow to
-- accept the value, the later recipients will not receive it until the
-- listener has taken it.
--
-- If poison is encountered, the input and all outputs are poisoned and the
-- poison is rethrown.
tap :: Chanin a -> [Chanout a] -> CHP ()
tap in_ outs = forever (extReadChannel in_ sendToAll)
  `onPoisonRethrow` (poison in_ >> poisonAll outs)
  where
    -- Runs inside the extended read: offer the value to each output in turn.
    sendToAll x = mapM_ (\out -> writeChannel out x) outs

-- | Writes the given value (the prefix) once on the output channel, and from
-- then on behaves exactly like 'id', forwarding everything read from the
-- input channel.
--
-- If poison is encountered at any point, both channels are poisoned and the
-- poison is rethrown.
prefix :: a -> Chanin a -> Chanout a -> CHP ()
prefix x in_ out = sendThenForward `onPoisonRethrow` (poison in_ >> poison out)
  where
    sendThenForward = do
      writeChannel out x
      id in_ out

-- | Repeatedly reads a value and writes out its successor, as computed by
-- 'Prelude.succ'.  This is 'map' specialised to 'Prelude.succ'.
succ :: Enum a => Chanin a -> Chanout a -> CHP ()
succ in_ out = map Prelude.succ in_ out

-- | Repeatedly reads a value from the input channel and then writes it to
-- all of the given output channels in parallel; the next value is not read
-- until the parallel writes have finished.
--
-- If poison is encountered, the input and every output are poisoned and the
-- poison is rethrown.
parDelta :: Chanin a -> [Chanout a] -> CHP ()
parDelta in_ outs = forever broadcast
  `onPoisonRethrow` (poison in_ >> mapM_ poison outs)
  where
    -- One read followed by one parallel write per output channel.
    broadcast = readChannel in_ >>= \x ->
      runParallel_ [writeChannel out x | out <- outs]

-- | Repeatedly reads a value, applies the given function to it, and writes
-- the result to the output channel.
--
-- The function application is not forced here: what is written is the
-- unevaluated application, so do not rely on this process to actually
-- perform the computation.
--
-- If poison is encountered, both channels are poisoned and the poison is
-- rethrown.
map :: (a -> b) -> Chanin a -> Chanout b -> CHP ()
map f in_ out = (forever $ do
  x <- readChannel in_
  writeChannel out (f x))
  `onPoisonRethrow` (poison in_ >> poison out)

-- | Repeatedly reads a value and applies the given predicate to it.  Values
-- for which the predicate returns 'Prelude.True' are written to the output
-- channel; values for which it returns 'Prelude.False' are discarded.
--
-- If poison is encountered, both channels are poisoned and the poison is
-- rethrown.
filter :: (a -> Bool) -> Chanin a -> Chanout a -> CHP ()
filter f in_ out = forever (readChannel in_ >>= forwardIfWanted)
  `onPoisonRethrow` (poison in_ >> poison out)
  where
    forwardIfWanted x
      | f x = writeChannel out x
      | otherwise = return ()

-- | Repeatedly reads a whole Traversable container from the input channel
-- and writes its elements to the output channel one at a time, in the order
-- in which 'Traversable.mapM' visits them.  Lists and Maybe are instances of
-- Traversable, so this can be used for those.  Note that @Data.Set.Set@ is
-- only Foldable, not Traversable, so a set has to be converted (for example
-- to a list) before it can be sent to this process.
--
-- If poison is encountered, both channels are poisoned and the poison is
-- rethrown.
stream :: Traversable.Traversable t => Chanin (t a) -> Chanout a -> CHP ()
stream in_ out = forever (readChannel in_ >>= Traversable.mapM (writeChannel out))
  `onPoisonRethrow` (poison in_ >> poison out)

-- | Repeatedly chooses (using 'alt') between reads on all of the input
-- channels, and writes whichever value was read to the output channel.
--
-- If poison is encountered, every input and the output are poisoned and the
-- poison is rethrown.
merger :: [Chanin a] -> Chanout a -> CHP ()
merger ins out = forever (writeChannel out =<< alt [readChannel c | c <- ins])
  `onPoisonRethrow` (poisonAll ins >> poison out)

-- | Writes the given value to the channel the given number of times, and
-- then finishes.  A count of zero or less results in no writes at all.
--
-- If poison is encountered, the channel is poisoned and the poison is
-- rethrown.
replicate :: Int -> a -> Chanout a -> CHP ()
replicate n x c = sendAll `onPoisonRethrow` poison c
  where
    -- n copies of the same write action, run one after another.
    sendAll = Prelude.sequence_ (Prelude.replicate n (writeChannel c x))

-- | Writes the same value to the given channel over and over again, until
-- poisoned (at which point the channel is poisoned and the poison is
-- rethrown).  Similar to the white-hole processes in some other frameworks.
repeat :: a -> Chanout a -> CHP ()
repeat x c = forever sendOnce `onPoisonRethrow` poison c
  where
    sendOnce = writeChannel c x

-- | Reads values from the channel over and over again, throwing each one
-- away, until poisoned (at which point the channel is poisoned and the
-- poison is rethrown).  Similar to the black-hole processes in some other
-- frameworks.
consume :: Chanin a -> CHP ()
consume c = forever discardOne `onPoisonRethrow` poison c
  where
    discardOne = readChannel c >> return ()

-- | Repeatedly reads one value from each of its two input channels in
-- parallel, combines the two values with the given function, and writes the
-- result to the output channel.  For example, @join (,) c d@ pairs up the
-- values read from @c@ and @d@ and sends out the pair, @join (&&)@ sends out
-- the conjunction of two boolean values, and @join (==)@ sends out whether
-- the two values read were equal.
--
-- If poison is encountered, both inputs and the output are poisoned and the
-- poison is rethrown.
join :: (a -> b -> c) -> Chanin a -> Chanin b -> Chanout c -> CHP ()
join f in0 in1 out = forever readBothThenSend
  `onPoisonRethrow` (poison in0 >> poison in1 >> poison out)
  where
    readBothThenSend =
      (readChannel in0 <||> readChannel in1) >>= \(x, y) ->
        writeChannel out (f x y)

-- | Repeatedly reads a pair from the input channel and then, in parallel,
-- writes the first component to the first output channel and the second
-- component to the second output channel.
--
-- If poison is encountered, the input and both outputs are poisoned and the
-- poison is rethrown.
--
-- Added in version 1.0.2.
split :: Chanin (a, b) -> Chanout a -> Chanout b -> CHP ()
split in_ outA outB = forever (readChannel in_ >>= sendHalves)
  `onPoisonRethrow` (poison in_ >> poison outA >> poison outB)
  where
    sendHalves (a, b) = writeChannel outA a <||> writeChannel outB b

-- | A sorter process; this is 'sorter'' using '<' as the comparison.
--
-- When it receives its first @Just x@ data item, it keeps it.  When it
-- receives another, it keeps the lower of the two and sends out the other
-- one.  When it receives @Nothing@, it sends out the value it is holding (if
-- any) and then sends out @Nothing@ too.
--
-- The overall effect of chaining these processes together is a sorting
-- pump: inject all the values wrapped in @Just@, then send in a single
-- @Nothing@ to get the results out (in reverse order).
sorter :: Ord a => Chanin (Maybe a) -> Chanout (Maybe a) -> CHP ()
sorter in_ out = sorter' (<) in_ out

-- | Like 'sorter', but with a custom comparison method.  You should pass in
-- the equivalent of less-than: (<).
--
-- The process holds at most one value at a time:
--
-- * On @Just x@ while empty, it stores @x@.
--
-- * On @Just x@ while holding @y@, it keeps whichever of the two is lower
--   according to the comparison and sends the other one out.
--
-- * On @Nothing@ (a flush), it sends out the held value (if there is one),
--   then sends out @Nothing@, and becomes empty again so that the next round
--   of values starts from a clean state.
--
-- If poison is encountered, both channels are poisoned and the poison is
-- rethrown.
sorter' :: forall a. (a -> a -> Bool) -> Chanin (Maybe a) -> Chanout (Maybe a) -> CHP ()
sorter' lt in_ out = internal Nothing `onPoisonRethrow` (poison in_ >> poison out)
  where
    -- The argument is the value currently held (Nothing when empty).
    internal :: Maybe a -> CHP ()
    internal curVal = do newVal <- readChannel in_
                         case (curVal, newVal) of
                           -- Flush, but we're empty: just pass the marker on.
                           (Nothing, Nothing) -> do writeChannel out newVal
                                                    internal Nothing
                           -- Flush: emit our value, then the marker, and
                           -- forget the value.  Holding on to it would make
                           -- it reappear in the next round.
                           (Just _, Nothing) -> do writeChannel out curVal
                                                   writeChannel out newVal
                                                   internal Nothing
                           -- New value, we were empty:
                           (Nothing, Just _) -> internal newVal
                           -- New value, we had one already: keep the lower
                           -- one, pass the other one on.
                           (Just cur, Just new)
                             | new `lt` cur -> do writeChannel out curVal
                                                  internal newVal
                             | otherwise -> do writeChannel out newVal
                                               internal curVal