-- Communicating Haskell Processes. -- Copyright (c) 2008--2009, University of Kent. -- All rights reserved. -- -- Redistribution and use in source and binary forms, with or without -- modification, are permitted provided that the following conditions are -- met: -- -- * Redistributions of source code must retain the above copyright -- notice, this list of conditions and the following disclaimer. -- * Redistributions in binary form must reproduce the above copyright -- notice, this list of conditions and the following disclaimer in the -- documentation and/or other materials provided with the distribution. -- * Neither the name of the University of Kent nor the names of its -- contributors may be used to endorse or promote products derived from -- this software without specific prior written permission. -- -- THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS -- IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, -- THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR -- PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR -- CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, -- EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, -- PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR -- PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF -- LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING -- NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS -- SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -- | A collection of useful functions to use with the library. -- -- The most useful operation is 'pipeline' which you can use to wire up a list -- of processes into a line, and run them. The corresponding '|->|' operator is -- a simple binary version that can be a little more concise. When the pipeline -- has channels going in both directions rather than just one, 'dualPipeline' and/or -- '|<->|' can be used. Several other variants on these functions are also provided, -- including operators to use at the beginning and ends of pipelines. module Control.Concurrent.CHP.Utils where import Control.Monad import Control.Concurrent.CHP -- | Wires given processes up in a forward cycle. That is, the first process -- writes to the second, and receives from the last. It returns the list of -- wired-up processes, which you will almost certainly want to run in parallel. wireCycle :: Channel r w => [r a -> w a -> proc] -> CHP [proc] wireCycle procs = do chan <- newChannel wirePipeline procs (reader chan) (writer chan) -- return [p (reader $ chans !! i) (writer $ chans !! ((i + 1) `mod` n)) | (p, i) <- zip procs [0..]] -- | Like wireCycle, but works with processes that connect with a channel in both -- directions. -- -- This function was added in version 1.4.0. wireDualCycle :: (Channel r w, Channel r' w') => [(r a, w' b) -> (r' b, w a) -> proc] -> CHP [proc] wireDualCycle procs = do c <- newChannel d <- newChannel wireDualPipeline procs (reader c, writer d) (reader d, writer c) -- | Wires the given processes up in a forward pipeline. The first process -- in the list is connected to the given reading channel-end (the first parameter) -- and the writing end of a new channel, A. The second process is wired up -- to the reading end of A, and the writing end of the next new channel, B. -- This proceeds all the way to the end of the list, until the final process -- is wired to the reading end of Z (if you have 27 processes in the list, -- and therefore 26 channels in the middle of them) and the second parameter. -- The list of wired-up processes is returned, which you can then run in parallel. wirePipeline :: forall a r w proc. Channel r w => [r a -> w a -> proc] -> r a -> w a -> CHP [proc] wirePipeline [] _ _ = return [] wirePipeline procs in_ out = do chans <- replicateM (n - 1) newChannel -- return $ map (wire chans) $ zip procs [0..] return $ (\(w, ps) -> head procs in_ w : ps) $ (foldr wireF (out, []) $ zip (tail procs) chans) where n = length procs -- One way of doing it: {- wire :: [OneToOneChannel a] -> (Chanin a -> Chanout a -> CSProcess, Int) -> CSProcess wire cs (p, i) | i == 0 = p in_ (writer $ cs !! 0) | i == n - 1 = p (reader $ cs !! i) out | otherwise = p (reader $ cs !! i) (writer $ cs !! (i + 1)) -} -- A way without indexing (possibly a bit more efficient): wireF :: (r a -> w a -> proc, Chan r w a) -> (w a, [proc]) -> (w a, [proc]) wireF (p, c) (w, ps) = (writer c, p (reader c) w : ps) -- | Like wirePipeline, but works with processes that connect with a channel in both -- directions. -- -- This function was added in version 1.4.0. wireDualPipeline :: forall a b r w r' w' proc. (Channel r w, Channel r' w') => [(r a, w' b) -> (r' b, w a) -> proc] -> (r a, w' b) -> (r' b, w a) -> CHP [proc] wireDualPipeline [] _ _ = return [] wireDualPipeline procs@(first:rest) in_ out = do chans <- replicateM (n - 1) newChannel chans' <- replicateM (n - 1) newChannel return $ (\(w, ps) -> first in_ w : ps) $ (foldr wireF (out, []) $ zip3 rest chans chans') where n = length procs wireF :: ((r a, w' b) -> (r' b, w a) -> proc, Chan r w a, Chan r' w' b) -> ((r' b, w a), [proc]) -> ((r' b, w a), [proc]) wireF (p, c, d) (w, ps) = ((reader d, writer c), p (reader c, writer d) w : ps) -- | A specialised version of 'wirePipeline'. Given a list of processes, composes -- them into an ordered pipeline, that takes the channel-ends for the sticking -- out ends of the pipeline and gives a process that returns a list of their -- results. This is equivalent to 'wirePipeline', with the return value fed -- to 'runParallel'. -- -- Added in version 1.0.2. pipeline :: [Chanin a -> Chanout a -> CHP b] -> Chanin a -> Chanout a -> CHP [b] pipeline procs in_ out = wirePipeline procs in_ out >>= runParallel -- | Like pipeline, but works with processes that connect with a channel in both -- directions. -- -- This function was added in version 1.4.0. dualPipeline :: [(Chanin a, Chanout b) -> (Chanin b, Chanout a) -> CHP c] -> (Chanin a, Chanout b) -> (Chanin b, Chanout a) -> CHP [c] dualPipeline p i o = wireDualPipeline p i o >>= runParallel -- | A specialised version of 'wireCycle'. Given a list of processes, composes -- them into a cycle and runs them all in parallel. This is equivalent to -- 'wireCycle' with the return value fed into 'runParallel'. -- -- Added in version 1.0.2. cycle :: [Chanin a -> Chanout a -> CHP b] -> CHP [b] cycle procs = wireCycle procs >>= runParallel -- | Like cycle, but works with processes that connect with a channel in both -- directions. -- -- This function was added in version 1.4.0. dualCycle :: [(Chanin a, Chanout b) -> (Chanin b, Chanout a) -> CHP c] -> CHP [c] dualCycle p = wireDualCycle p >>= runParallel -- | Process composition. Given two processes, composes them into a pipeline, -- like function composition (but with an opposite ordering). The function -- is associative. Using wirePipeline will be more efficient than @foldl1 -- (|->|)@ for more than two processes. -- -- The type for this process became more specific in version 1.2.0. (|->|) :: (a -> Chanout b -> CHP ()) -> (Chanin b -> c -> CHP ()) -> (a -> c -> CHP ()) (|->|) p q x y = do c <- oneToOneChannel runParallel_ [p x (writer c), q (reader c) y] -- | Like (|->|), but labels the channel and uses show for the traces. -- -- Added in version 1.5.0. (|->|^) :: Show b => (a -> Chanout b -> CHP ()) -> (String, Chanin b -> c -> CHP ()) -> (a -> c -> CHP ()) (|->|^) p (l, q) x y = do c <- oneToOneChannel' $ chanLabel l runParallel_ [p x (writer c), q (reader c) y] -- | Process composition that works with processes that connect with a channel in both -- directions. Like (|->|), but connects a channel in each direction. -- -- This function was added in version 1.4.0. (|<->|) :: (a -> (Chanin b, Chanout c) -> CHP ()) -> ((Chanin c, Chanout b) -> d -> CHP ()) -> (a -> d -> CHP ()) (|<->|) p q x y = do c <- oneToOneChannel d <- oneToOneChannel runParallel_ [p x (reader d, writer c), q (reader c, writer d) y] -- | The reversed version of the other operator. -- -- The type for this process became more specific in version 1.2.0. (|<-|) :: (Chanin b -> c -> CHP ()) -> (a -> Chanout b -> CHP ()) -> (a -> c -> CHP ()) (|<-|) = flip (|->|) -- | A function to use at the start of a pipeline you are chaining together with -- the '|->|' operator. -- Added in version 1.2.0. (->|) :: (Chanout b -> CHP ()) -> (Chanin b -> c -> CHP ()) -> (c -> CHP ()) (->|) p q x = do c <- oneToOneChannel runParallel_ [p (writer c), q (reader c) x] -- | A function to use at the end of a pipeline you are chaining together with -- the '|->|' operator. -- Added in version 1.2.0. (|->) :: (a -> Chanout b -> CHP ()) -> (Chanin b -> CHP ()) -> (a -> CHP ()) (|->) p q x = do c <- oneToOneChannel runParallel_ [p x (writer c), q (reader c)] -- | A function to use at the start of a pipeline you are chaining together with -- the '|<->|' operator. -- Added in version 1.4.0. (|<->) :: (a -> (Chanin b, Chanout c) -> CHP ()) -> ((Chanin c, Chanout b) -> CHP ()) -> (a -> CHP ()) (|<->) p q x = do c <- oneToOneChannel d <- oneToOneChannel runParallel_ [p x (reader d, writer c), q (reader c, writer d)] -- | A function to use at the end of a pipeline you are chaining together with -- the '|<->|' operator. -- Added in version 1.4.0. (<->|) :: ((Chanin b, Chanout c) -> CHP ()) -> ((Chanin c, Chanout b) -> a -> CHP ()) -> (a -> CHP ()) (<->|) p q x = do c <- oneToOneChannel d <- oneToOneChannel runParallel_ [p (reader d, writer c), q (reader c, writer d) x] {- -- Like runParallel, but offers a choice between the leading event of each -- parallel branch such that if any leading event of a parallel branch is -- poisoned, any siblings still waiting for their leading event will also be -- poisoned. Note however that any handlers in the sibling branches will not -- execute, as technically they did not encounter poison. -- -- If all the branches have just one event (e.g. a readChannel), this ensures that -- the parallel composition will not deadlock in the presence of poison. -- -- Added in version 1.5.0. runParallelPoison :: [CHP a] -> CHP [a] runParallelPoison ps = do b <- newBarrierWithLabel "runParallelPoison" -- The barrier can never sync properly, but it can be poisoned: enroll b $ const $ enrollList (replicate (length ps) b) $ \ebs -> runParallel $ zipWith useBar ebs ps where useBar :: EnrolledBarrier -> CHP a -> CHP a useBar b p = (p <-> (syncBarrier b >> throwPoison)) `onPoisonRethrow` (poison b) -- Like runParallel_, but offers a choice between the leading event of each -- parallel branch such that if any leading event of a parallel branch is -- poisoned, any siblings still waiting for their leading event will also be -- poisoned. Note however that any handlers in the sibling branches will not -- execute, as technically they did not encounter poison. -- -- If all the branches have just one event (e.g. a readChannel), this ensures that -- the parallel composition will not deadlock in the presence of poison. -- -- Added in version 1.5.0. runParallelPoison_ :: [CHP a] -> CHP () runParallelPoison_ ps = do b <- newBarrierWithLabel "runParallelPoison" -- The barrier can never sync properly, but it can be poisoned: enroll b $ const $ enrollList (replicate (length ps) b) $ \ebs -> runParallel_ $ zipWith useBar ebs ps where useBar :: EnrolledBarrier -> CHP a -> CHP a useBar b p = (p <-> (syncBarrier b >> throwPoison)) `onPoisonRethrow` (poison b) -}