-- Communicating Haskell Processes. -- Copyright (c) 2008, University of Kent. -- All rights reserved. -- -- Redistribution and use in source and binary forms, with or without -- modification, are permitted provided that the following conditions are -- met: -- -- * Redistributions of source code must retain the above copyright -- notice, this list of conditions and the following disclaimer. -- * Redistributions in binary form must reproduce the above copyright -- notice, this list of conditions and the following disclaimer in the -- documentation and/or other materials provided with the distribution. -- * Neither the name of the University of Kent nor the names of its -- contributors may be used to endorse or promote products derived from -- this software without specific prior written permission. -- -- THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS -- IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, -- THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR -- PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR -- CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, -- EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, -- PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR -- PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF -- LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING -- NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS -- SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. module Control.Concurrent.CHP.Parallel (runParallel, runParallel_, (<||>), (<|*|>), runParMapM, runParMapM_, ForkingT, forking, fork) where import Control.Concurrent import Control.Concurrent.STM import qualified Control.Exception.Extensible as C import Control.Monad.Reader import Control.Monad.State import Data.List import Data.Maybe import Data.Ord import Control.Concurrent.CHP.Base import Control.Concurrent.CHP.Traces.Base -- | This type-class supports parallel composition of processes. You may use -- the 'runParallel' function to run a list of processes, or the '<||>' operator -- to run just a pair of processes. -- -- In each case, the composition waits for all processes to finish, either -- successfully or with poison. At the end of this, if /any/ process -- exited with poison, the composition will \"rethrow\" this poison. If all -- the processes completed successfully, the results will be returned. If -- you want to ignore poison from the sub-processes, use an empty poison -- handler and 'onPoisonTrap' with each branch. -- | Runs the given list of processes in parallel, and then returns a list -- of the results, in the same order as the processes given. Will only return -- when all the inner processes have completed. -- -- In version 1.5.0, a bug was introduced such that @runParallel []@ would deadlock; -- this was fixed in version 1.5.1. runParallel :: [CHP a] -> CHP [a] runParallel = runParallelPoison -- | A shorthand for applying mapM in parallel; really the composition of 'runParallel' -- and 'map'. -- -- Added in version 1.5.0. runParMapM :: (a -> CHP b) -> [a] -> CHP [b] runParMapM f = runParallel . map f -- | A shorthand for applying mapM_ in parallel; really the composition of 'runParallel_' -- and 'map'. -- -- Added in version 1.5.0. runParMapM_ :: (a -> CHP b) -> [a] -> CHP () runParMapM_ f = runParallel_ . map f -- | A useful operator for performing a two process equivalent of 'runParallel' -- that gives the return values back as a pair rather than a list. This also -- allows the values to have different types (<||>) :: CHP a -> CHP b -> CHP (a, b) (<||>) p q = do [x, y] <- runParallel [liftM Left p, liftM Right q] combine x y where combine :: Monad m => Either a b -> Either a b -> m (a, b) combine (Left x) (Right y) = return (x, y) combine (Right y) (Left x) = return (x, y) -- An extra case to keep the compiler happy: combine _ _ = error "Impossible combination values in <|^|>" -- | An operator similar to '<||>' that discards the output (more like an operator -- version of 'runParallel_'). -- -- Added in version 1.1.0. (<|*|>) :: CHP a -> CHP b -> CHP () (<|*|>) p q = runParallel_ [p >> return (), q >> return ()] -- | Runs all the given processes in parallel and discards any output. Does -- not return until all the processes have completed. 'runParallel_' ps is effectively equivalent -- to 'runParallel' ps >> return (). -- -- In version 1.5.0, a bug was introduced such that @runParallel_ []@ would deadlock; -- this was fixed in version 1.5.1. runParallel_ :: [CHP a] -> CHP () runParallel_ procs = runParallel procs >> return () -- We right associate to allow the liftM fst ((readResult) <||> runParallel_ -- workers) pattern infixr <||> -- Doesn't really matter for this operator: infixr <|*|> -- | Runs all the processes in parallel and returns their results once they -- have all finished. The length and ordering of the results reflects the -- length and ordering of the input runParallelPoison :: forall a. [CHP a] -> CHP [a] runParallelPoison processes = do resultVar <- liftIO $ atomically $ newManyToOneTVar ((== length processes) . length) (return []) [] trace <- PoisonT $ lift $ liftTrace ask blanks <- liftIO $ blankTraces trace (length processes) liftIO $ mapM_ forkIO [do y <- wrapProcess p $ flip runReaderT btr . pullOutStandard C.block $ atomically $ writeManyToOneTVar ((:) (case y of Nothing -> (n, Nothing) Just (Right x) -> (n, Just x) Just (Left _) -> (n, Nothing) )) resultVar >> return () | (p, btr, n) <- zip3 processes blanks [(0::Int)..]] results <- liftIO $ atomically $ readManyToOneTVar resultVar let sortedResults = map snd $ sortBy (comparing fst) results PoisonT $ lift $ liftTrace $ mergeSubProcessTraces blanks mapM (maybe throwPoison return) sortedResults -- | A monad transformer used for introducing forking blocks. newtype (Monad m, MonadCHP m) => ForkingT m a = Forking (ReaderT (TVar (Bool, Int)) m a) deriving (Monad, MonadIO, MonadTrans, MonadCHP) -- TODO in future, get forking working with structural traces -- | Executes a forking block. Processes may be forked off inside (using the -- 'fork' function). When the block completes, it waits for all the forked -- off processes to complete before returning the output, as long as none of -- the processes terminated with uncaught poison. If they did, the poison -- is propagated (rethrown). forking :: MonadCHP m => ForkingT m a -> m a -- Like with parallel, this could probably be made a little more efficient forking (Forking m) = do b <- liftIO $ atomically $ newTVar (False, 0) output <- runReaderT m b p <- liftIO $ atomically $ do (p,n) <- readTVar b if n == 0 then return p else retry if p then liftCHP throwPoison else return output -- | Forks off the given process. The process then runs in parallel with this -- code, until the end of the 'forking' block, when all forked-off processes -- are waited for. At that point, once all of the processes have finished, -- if any of them threw poison it is propagated. fork :: MonadCHP m => CHP () -> ForkingT m () fork p = Forking $ do b <- ask liftIO $ atomically $ do (pa, n) <- readTVar b writeTVar b (pa, n + 1) trace <- liftCHP $ PoisonT $ lift $ liftTrace ask [blank] <- liftIO $ blankTraces trace 1 liftIO $ forkIO $ do r <- wrapProcess p $ flip runReaderT blank . pullOutStandard C.block $ atomically $ do (poisonedAlready, n) <- readTVar b writeTVar b $ (poisonedAlready || isNothing r, n - 1) return ()