-- Communicating Haskell Processes. -- Copyright (c) 2008, University of Kent. -- All rights reserved. -- -- Redistribution and use in source and binary forms, with or without -- modification, are permitted provided that the following conditions are -- met: -- -- * Redistributions of source code must retain the above copyright -- notice, this list of conditions and the following disclaimer. -- * Redistributions in binary form must reproduce the above copyright -- notice, this list of conditions and the following disclaimer in the -- documentation and/or other materials provided with the distribution. -- * Neither the name of the University of Kent nor the names of its -- contributors may be used to endorse or promote products derived from -- this software without specific prior written permission. -- -- THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS -- IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, -- THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR -- PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR -- CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, -- EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, -- PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR -- PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF -- LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING -- NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS -- SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. module Control.Concurrent.CHP.Parallel (runParallel, runParallel_, (<||>), (<|*|>), ForkingT, forking, fork) where import Control.Concurrent import Control.Concurrent.STM #if __GLASGOW_HASKELL__ >= 609 -- I can't figure out the new Exception system in GHC 6.10 and how I catch all -- exceptions (see GHC bug #2655), so I'm just going to use the old system: import qualified Control.OldException as C #else import qualified Control.Exception as C #endif import Control.Monad.Error import Control.Monad.Reader import Control.Monad.State import Data.List import Data.Maybe import Data.Ord import System.IO import Control.Concurrent.CHP.Base import Control.Concurrent.CHP.Traces.Base -- | This type-class supports parallel composition of processes. You may use -- the 'runParallel' function to run a list of processes, or the '<||>' operator -- to run just a pair of processes. -- -- In each case, the composition waits for all processes to finish, either -- successfully or with poison. At the end of this, if /any/ process -- exited with poison, the composition will \"rethrow\" this poison. If all -- the processes completed successfully, the results will be returned. If -- you want to ignore poison from the sub-processes, use an empty poison -- handler and 'onPoisonTrap' with each branch. -- | Runs the given list of processes in parallel, and then returns a list -- of the results, in the same order as the processes given. Will only return -- when all the inner processes have completed. runParallel :: [CHP a] -> CHP [a] runParallel = runParallelPoison -- | A useful operator for performing a two process equivalent of 'runParallel' -- that gives the return values back as a pair rather than a list. This also -- allows the values to have different types (<||>) :: CHP a -> CHP b -> CHP (a, b) (<||>) p q = do [x, y] <- runParallel [liftM Left p, liftM Right q] combine x y where combine :: Monad m => Either a b -> Either a b -> m (a, b) combine (Left x) (Right y) = return (x, y) combine (Right y) (Left x) = return (x, y) -- An extra case to keep the compiler happy: combine _ _ = error "Impossible combination values in <|^|>" -- | An operator similar to '<||>' that discards the output (more like an operator -- version of 'runParallel_'). -- -- Added in version 1.1.0. (<|*|>) :: CHP a -> CHP b -> CHP () (<|*|>) p q = runParallel_ [p >> return (), q >> return ()] -- | Runs all the given processes in parallel and discards any output. Does -- not return until all the processes have completed. 'runParallel_' ps is effectively equivalent -- to 'runParallel' ps >> return (). runParallel_ :: [CHP a] -> CHP () runParallel_ procs = runParallel procs >> return () -- We right associate to allow the liftM fst ((readResult) <||> runParallel_ -- workers) pattern infixr <||> -- Doesn't really matter for this operator: infixr <|*|> wrapProcess :: CHP a -> (CHP' (Either PoisonError a) -> IO (Either PoisonError a, st)) -> IO (Maybe (Either st (a, st))) wrapProcess (PoisonT proc) unwrapInner = do let inner = runErrorT proc x <- liftM Just (unwrapInner inner) `C.catch` (\x -> liftIO (hPutStrLn stderr $ "Thread terminated with: " ++ show x) >> return Nothing) case x of Nothing -> return Nothing Just (Left _, st) -> return $ Just $ Left st Just (Right y, st) -> return $ Just $ Right (y, st) -- | Runs all the processes in parallel and returns their results once they -- have all finished. The length and ordering of the results reflects the -- length and ordering of the input runParallelPoison :: forall a. [CHP a] -> CHP [a] runParallelPoison processes = do c <- liftIO $ atomically $ newResultsVar trace <- PoisonT $ lift $ liftTrace get blanks <- liftIO $ blankTraces trace (length processes) liftIO $ mapM_ forkIO [do y <- wrapProcess p $ flip runStateT btr . pullOutStandard C.block $ atomically $ do ys <- readTVar c writeTVar c $ (case y of Nothing -> (n, (Nothing, Nothing)) Just (Right (x,t)) -> (n, (Just x, Just t)) Just (Left t) -> (n, (Nothing, Just t)) ) : ys | (p, btr, n) <- zip3 processes blanks [0..]] results <- liftIO $ atomically $ do xs <- readTVar c if length xs == length processes then return xs else retry let sortedResults = map snd $ sortBy (comparing fst) results PoisonT $ lift $ liftTrace $ mergeSubProcessTraces (mapMaybe snd sortedResults) mapM (maybe throwPoison return . fst) sortedResults where newResultsVar :: STM (TVar [(Integer, (Maybe a, Maybe TraceStore))]) newResultsVar = newTVar [] -- TODO could make the parent only wake up once, with a bit of twiddling -- | A monad transformer used for introducing forking blocks. newtype (Monad m, MonadCHP m) => ForkingT m a = Forking (ReaderT (TVar (Bool, Int)) m a) deriving (Monad, MonadIO, MonadTrans, MonadCHP) -- TODO in future, get forking working with structural traces -- | Executes a forking block. Processes may be forked off inside (using the -- 'fork' function). When the block completes, it waits for all the forked -- off processes to complete before returning the output, as long as none of -- the processes terminated with uncaught poison. If they did, the poison -- is propagated (rethrown). forking :: MonadCHP m => ForkingT m a -> m a -- Like with parallel, this could probably be made a little more efficient forking (Forking m) = do b <- liftIO $ atomically $ newTVar (False, 0) output <- runReaderT m b p <- liftIO $ atomically $ do (p,n) <- readTVar b if n == 0 then return p else retry if p then liftCHP throwPoison else return output -- | Forks off the given process. The process then runs in parallel with this -- code, until the end of the 'forking' block, when all forked-off processes -- are waited for. At that point, once all of the processes have finished, -- if any of them threw poison it is propagated. fork :: MonadCHP m => CHP () -> ForkingT m () fork p = Forking $ do b <- ask liftIO $ atomically $ do (pa, n) <- readTVar b writeTVar b (pa, n + 1) trace <- liftCHP $ PoisonT $ lift $ liftTrace get [blank] <- liftIO $ blankTraces trace 1 liftIO $ forkIO $ do r <- wrapProcess p $ flip runStateT blank . pullOutStandard C.block $ atomically $ do (poisonedAlready, n) <- readTVar b writeTVar b $ (poisonedAlready || isNothing r, n - 1) return ()