-- Communicating Haskell Processes.
-- Copyright (c) 2008, University of Kent.
-- All rights reserved.
--
-- Redistribution and use in source and binary forms, with or without
-- modification, are permitted provided that the following conditions are
-- met:
--
--  * Redistributions of source code must retain the above copyright
--    notice, this list of conditions and the following disclaimer.
--  * Redistributions in binary form must reproduce the above copyright
--    notice, this list of conditions and the following disclaimer in the
--    documentation and/or other materials provided with the distribution.
--  * Neither the name of the University of Kent nor the names of its
--    contributors may be used to endorse or promote products derived from
--    this software without specific prior written permission.
--
-- THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
-- IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
-- THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
-- PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
-- CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
-- EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
-- PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
-- PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
-- LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
-- NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
-- SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

-- | A collection of common processes that are useful when plumbing
-- together a process network.  All the processes here rethrow poison when
-- it is encountered, as this gives the user maximum flexibility (they can
-- let it propagate, or ignore it).
--
-- The names here overlap with standard Prelude names.
-- This is deliberate, as the processes act in a similar manner to the
-- corresponding Prelude versions.  It is expected that you will do
-- something like:
--
-- > import qualified Control.Concurrent.CHP.Common as Common
--
-- or:
--
-- > import qualified Control.Concurrent.CHP.Common as CHP
--
-- to circumvent this problem.
module Control.Concurrent.CHP.Common where

import Control.Monad
import qualified Data.Traversable as Traversable
import Prelude (Bool, Maybe(..), Enum, Ord, ($), (<), Int, otherwise, (.))
import qualified Prelude

import Control.Concurrent.CHP

-- | Repeatedly reads a value from the input channel and writes it,
-- unchanged, to the output channel.  Placing this process in a network
-- effectively adds a single-place buffer.
--
-- If poison is encountered, both channel ends are poisoned and the poison
-- is rethrown.
id :: Chanin a -> Chanout a -> CHP ()
id in_ out =
  forever (readChannel in_ >>= writeChannel out)
    `onPoisonRethrow` (poison in_ >> poison out)

-- | Repeatedly forwards each value onwards in an extended rendezvous.  This
-- is like 'id' but does not add any buffering to your network.
--
-- extId is a unit of the associative operator '|->|'.
extId :: Chanin a -> Chanout a -> CHP ()
-- Implemented as a 'tap' that has exactly one output channel.
extId in_ out = tap in_ [out]

-- | A process that waits for an input, then sends it out on /all/ its output
-- channels (in order) during an extended rendezvous.  This is often used to
-- send the output on to both the normal recipient (without introducing
-- buffering) and also to a listener process that wants to examine the value.
-- If the listener process is first in the list, and does not take the input
-- immediately, the value will not be sent to the other recipients until it
-- does.  The name of the process derives from the notion of a wire-tap, since
-- the listener is hidden from the other processes (it does not visibly change
-- the semantics for them).
tap :: Chanin a -> [Chanout a] -> CHP ()
tap in_ outs =
  forever (extReadChannel in_ sendToAll)
    `onPoisonRethrow` (poison in_ >> poisonAll outs)
  where
    -- The extended action of the read: write the value to each output
    -- channel, in list order.
    sendToAll x = mapM_ (\c -> writeChannel c x) outs

-- | Writes the given value (the prefix) to the output once, and from then
-- on behaves like 'id'.  On poison, both channel ends are poisoned and the
-- poison is rethrown.
prefix :: a -> Chanin a -> Chanout a -> CHP ()
prefix x in_ out =
  (do writeChannel out x
      id in_ out
  ) `onPoisonRethrow` (poison in_ >> poison out)

-- | Repeatedly reads a value and sends out its successor (as computed by
-- 'Prelude.succ').
succ :: Enum a => Chanin a -> Chanout a -> CHP ()
succ in_ out = map Prelude.succ in_ out

-- | Repeatedly reads a value and then writes it, in parallel, to every one
-- of the given output channels.  On poison, the input and all the outputs
-- are poisoned and the poison is rethrown.
parDelta :: Chanin a -> [Chanout a] -> CHP ()
parDelta in_ outs =
  forever (readChannel in_ >>= broadcast)
    `onPoisonRethrow` (poison in_ >> poisonAll outs)
  where
    -- One write per output channel, all run in parallel.
    broadcast x = runParallel_ [writeChannel c x | c <- outs]

-- | Repeatedly reads a value, applies the given function to it, and sends
-- the result out.  Note that the transformation is not applied strictly, so
-- don't assume that this process will actually perform the computation.
map :: (a -> b) -> Chanin a -> Chanout b -> CHP ()
map f in_ out =
  (forever $ do
      x <- readChannel in_
      -- (f x) is passed on unevaluated.
      writeChannel out (f x)
  ) `onPoisonRethrow` (poison in_ >> poison out)

-- | Repeatedly reads a value and applies the given predicate to it: the
-- value is sent on if the predicate returns True, and discarded if it
-- returns False.
filter :: (a -> Bool) -> Chanin a -> Chanout a -> CHP ()
filter f in_ out =
  forever (readChannel in_ >>= forwardIfAccepted)
    `onPoisonRethrow` (poison in_ >> poison out)
  where
    forwardIfAccepted x
      | f x = writeChannel out x
      | otherwise = return ()

-- | Streams all items in a Traversable container out in the order given by
-- 'Traversable.mapM' on the output channel (one at a time).  Lists and Maybe
-- are both instances of Traversable, so this can be used for those (note
-- that Set is Foldable but not Traversable).
stream :: Traversable.Traversable t => Chanin (t a) -> Chanout a -> CHP ()
stream in_ out =
  -- Each container read from the input is traversed with
  -- 'Traversable.mapM', writing its elements to the output one by one.
  forever (readChannel in_ >>= Traversable.mapM (writeChannel out))
    `onPoisonRethrow` (poison in_ >> poison out)

-- | Repeatedly waits for a value from any one of its input channels and
-- sends that value out on the output channel.  On poison, all the inputs
-- and the output are poisoned and the poison is rethrown.
merger :: [Chanin a] -> Chanout a -> CHP ()
merger ins out =
  (forever $ do
      -- Choose between reads on all of the input channels.
      x <- alt [readChannel c | c <- ins]
      writeChannel out x
  ) `onPoisonRethrow` (poisonAll ins >> poison out)

-- | Sends out the specified value on the given channel the specified number
-- of times, then finishes.
replicate :: Int -> a -> Chanout a -> CHP ()
replicate n x c = sendCopies `onPoisonRethrow` poison c
  where
    sendCopies = replicateM_ n (writeChannel c x)

-- | Keeps sending the same value on the given channel until poisoned.
-- Similar to the white-hole processes in some other frameworks.
repeat :: a -> Chanout a -> CHP ()
repeat x c = forever (writeChannel c x) `onPoisonRethrow` poison c

-- | Keeps reading values from the channel and discarding them until
-- poisoned.  Similar to the black-hole processes in some other frameworks.
consume :: Chanin a -> CHP ()
consume c = forever (readChannel c) `onPoisonRethrow` poison c

-- | Forever reads a value from both its input channels in parallel, then joins
-- the two values using the given function and sends them out again.  For
-- example, @join (,) c d@ will pair the values read from @c@ and @d@ and send
-- out the pair on the output channel, whereas @join (&&)@ will send out the
-- conjunction of two boolean values, @join (==)@ will read two values and
-- output whether they are equal or not, etc.
join :: (a -> b -> c) -> Chanin a -> Chanin b -> Chanout c -> CHP ()
join f in0 in1 out =
  (forever $ do
      -- Both inputs are read in parallel before anything is written.
      (x, y) <- readChannel in0 <||> readChannel in1
      writeChannel out $ f x y
  ) `onPoisonRethrow` (poison in0 >> poison in1 >> poison out)

-- | Forever reads a pair from its input channel, then in parallel sends out
-- the first and second parts of the pair on its output channels.
--
-- Added in version 1.0.2.
split :: Chanin (a, b) -> Chanout a -> Chanout b -> CHP ()
split in_ outA outB =
  (forever $ do
      (a, b) <- readChannel in_
      writeChannel outA a <||> writeChannel outB b
  ) `onPoisonRethrow` (poison in_ >> poison outA >> poison outB)

-- | A sorter process.  When it receives its first @Just x@ data item, it keeps
-- it.  When it receives a second, it keeps the lowest of the two, and sends
-- out the other one.  When it receives Nothing, it sends out its data value,
-- then sends Nothing too, and becomes empty again.  The overall effect when
-- chaining these things together is a sorting pump.  You inject all the
-- values with Just, then send in a single Nothing to get the results out (in
-- reverse order).
sorter :: Ord a => Chanin (Maybe a) -> Chanout (Maybe a) -> CHP ()
sorter = sorter' (<)

-- | Like sorter, but with a custom comparison method.  You should pass in
-- the equivalent of less-than: (<).
--
-- On poison, both channel ends are poisoned and the poison is rethrown.
sorter' :: forall a. (a -> a -> Bool) -> Chanin (Maybe a) -> Chanout (Maybe a) -> CHP ()
sorter' lt in_ out = internal Nothing `onPoisonRethrow` (poison in_ >> poison out)
  where
    -- The argument is the value currently held (Nothing when empty).
    internal :: Maybe a -> CHP ()
    internal curVal = do
      newVal <- readChannel in_
      case (curVal, newVal) of
        -- Flush, but we're empty: just pass the flush marker on.
        (Nothing, Nothing) -> do
          writeChannel out newVal
          internal Nothing
        -- Flush: emit the held value followed by the flush marker.  The
        -- value has now been handed on, so continue empty; keeping it
        -- would make a later flush emit the same value again.
        (Just _, Nothing) -> do
          writeChannel out curVal
          writeChannel out newVal
          internal Nothing
        -- New value, we were empty: keep it.
        (Nothing, Just _) -> internal newVal
        -- New value, we had one already: keep the lower of the two and
        -- send the other one onwards.
        (Just cur, Just new)
          | new `lt` cur -> do
              writeChannel out curVal
              internal newVal
          | otherwise -> do
              writeChannel out newVal
              internal curVal