{-# LANGUAGE RankNTypes, NamedFieldPuns, BangPatterns, ExistentialQuantification #-}
{-# OPTIONS_GHC -Wall -fno-warn-name-shadowing -fwarn-unused-imports #-}

-- | This module provides a monad @Par@, for speeding up pure
-- computations using parallel processors.  It cannot be used for
-- speeding up computations that use IO (for that, see
-- @Control.Concurrent@).  The result of a given @Par@ computation is
-- always the same - ie. it is deterministic, but the computation may
-- be performed more quickly if there are processors available to
-- share the work.
--
-- For example, the following program fragment computes the values of
-- @(f x)@ and @(g x)@ in parallel, and returns a pair of their results:
--
-- >  runPar $ do
-- >      fx <- pval (f x)  -- start evaluating (f x)
-- >      gx <- pval (g x)  -- start evaluating (g x)
-- >      a <- get fx       -- wait for fx
-- >      b <- get gx       -- wait for gx
-- >      return (a,b)      -- return results
--
-- @Par@ can be used for specifying pure parallel computations in
-- which the order of the computation is not known beforehand.
-- The programmer specifies how information flows from one
-- part of the computation to another, but not the order in which
-- computations will be evaluated at runtime.  Information flow is
-- described using "variables" called @IVar@s, which support 'put' and
-- 'get' operations.  For example, suppose you have a problem that
-- can be expressed as a network with four nodes, where @b@ and @c@
-- require the value of @a@, and @d@ requires the value of @b@ and @c@:
--
-- >                       a
-- >                      / \
-- >                     b   c
-- >                      \ /
-- >                       d
--
-- Then you could express this in the @Par@ monad like this:
--
-- >   runPar $ do
-- >       [a,b,c,d] <- sequence [new,new,new,new]
-- >       fork $ do x <- get a; put b (x+1)
-- >       fork $ do x <- get a; put c (x+2)
-- >       fork $ do x <- get b; y <- get c; put d (x+y)
-- >       fork $ do put a (3 :: Int)
-- >       get d
--
-- The result of the above computation is always 9.  The 'get' operation
-- waits until its input is available; multiple 'put's to the same
-- @IVar@ are not allowed, and result in a runtime error.  Values
-- stored in @IVar@s are usually fully evaluated (although there are
-- ways provided to pass lazy values if necessary).
--
-- In the above example, @b@ and @c@ will be evaluated in parallel.
-- In practice the work involved at each node is too small here to see
-- the benefits of parallelism though: typically each node should
-- involve much more work.  The granularity is completely under your
-- control - too small and the overhead of the @Par@ monad will
-- outweigh any parallelism benefits, whereas if the nodes are too
-- large then there might not be enough parallelism to use all the
-- available processors.
--
-- Unlike @Control.Parallel@, in @Control.Monad.Par@ parallelism is
-- not combined with laziness, so sharing and granularity are
-- completely under the control of the programmer.  New units of
-- parallel work are only created by @fork@, @par@, and a few other
-- combinators.
--
-- The implementation is based on a work-stealing scheduler that
-- divides the work as evenly as possible between the available
-- processors at runtime.
--

module Control.Monad.Par (
  -- * The @Par@ monad
  Par,
  runPar,
  fork,

  -- * Communication: @IVar@s
  IVar,
  new, newFull, newFull_,
  get,
  put, put_,

  -- * Operations
  pval,
  spawn, spawn_,
  parMap, parMapM,
  parMapReduceRangeThresh, parMapReduceRange,
  InclusiveRange(..),
  parFor,

  ) where

import Control.Monad.Par.Internal
import Control.DeepSeq
import Data.Traversable
import Control.Monad as M hiding (mapM, sequence, join)
import Prelude hiding (mapM, sequence, head,tail)
import GHC.Conc (numCapabilities)

-- -----------------------------------------------------------------------------

-- | forks a computation to happen in parallel.  The forked
-- computation may exchange values with other computations using
-- @IVar@s.
fork :: Par () -> Par ()
fork child = Par (\k -> Fork (runCont child (const Done)) (k ()))

-- > both a b >> c  ==  both (a >> c) (b >> c)
-- is this useful for anything?
-- both :: Par a -> Par a -> Par a
-- both a b = Par $ \c -> Fork (runCont a c) (runCont b c)

-- -----------------------------------------------------------------------------
-- Derived functions

-- | A variant of 'spawn' that stores its result with 'put_' instead of
-- 'put', so the result is only head-strict rather than fully-strict.
spawn_ :: Par a -> Par (IVar a)
spawn_ action =
  new >>= \var ->
    fork (action >>= put_ var) >> return var

-- | Like 'fork', but returns an @IVar@ that can be used to query the
-- result of the forked computation.
--
-- >  spawn p = do
-- >    r <- new
-- >    fork (p >>= put r)
-- >    return r
--
spawn :: NFData a => Par a -> Par (IVar a)
spawn action =
  new >>= \var ->
    fork (action >>= put var) >> return var

-- | Equivalent to @spawn . return@.
pval :: NFData a => a -> Par (IVar a)
pval = spawn . return

-- -----------------------------------------------------------------------------
-- Parallel maps over Traversable data structures

-- | Applies the given function to each element of a data structure
-- in parallel (fully evaluating the results), and returns a new data
-- structure containing the results.
--
-- > parMap f xs = mapM (pval . f) xs >>= mapM get
--
-- @parMap@ is commonly used for lists, where it has this specialised type:
--
-- > parMap :: NFData b => (a -> b) -> [a] -> Par [b]
--
parMap :: (Traversable t, NFData b) => (a -> b) -> t a -> Par (t b)
parMap f xs = do
  ivars <- mapM (pval . f) xs   -- start one task per element
  mapM get ivars                -- then wait for every result, in order

-- | Like 'parMap', but the function is a @Par@ monad operation.
--
-- > parMapM f xs = mapM (spawn . f) xs >>= mapM get
--
parMapM :: (Traversable t, NFData b) => (a -> Par b) -> t a -> Par (t b)
parMapM f xs = do
  ivars <- mapM (spawn . f) xs  -- start one task per element
  mapM get ivars                -- then wait for every result, in order

{-# SPECIALISE parMap  :: (NFData b) => (a -> b)     -> [a] -> Par [b] #-}
{-# SPECIALISE parMapM :: (NFData b) => (a -> Par b) -> [a] -> Par [b] #-}

-- TODO: Perhaps should introduce a class for the "splittable range" concept.
-- | An inclusive range of 'Int's: @InclusiveRange lo hi@ stands for
-- the indices @[lo..hi]@.
data InclusiveRange = InclusiveRange Int Int

-- | Computes a binary map\/reduce over a finite range.  The range is
-- recursively split into two, the result for each half is computed in
-- parallel, and then the two results are combined.  When the range
-- reaches the threshold size, the remaining elements of the range are
-- computed sequentially.
--
-- For example, the following is a parallel implementation of
--
-- >  foldl (+) 0 (map (^2) [1..10^6])
--
-- >  parMapReduceRangeThresh 100 (InclusiveRange 1 (10^6))
-- >        (\x -> return (x^2))
-- >        (\x y -> return (x+y))
-- >        0
--
-- The initial result seeds the sequential fold of /every/ leaf
-- sub-range, so it should be an identity for the combining function
-- if the answer is to be independent of how the range gets split.
--
-- A negative threshold is treated as @0@ (split all the way down to
-- single elements).  An empty range (upper bound below lower bound)
-- yields the initial result.
parMapReduceRangeThresh
   :: NFData a
      => Int                            -- ^ threshold
      -> InclusiveRange                 -- ^ range over which to calculate
      -> (Int -> Par a)                 -- ^ compute one result
      -> (a -> a -> Par a)              -- ^ combine two results (associative)
      -> a                              -- ^ initial result
      -> Par a
parMapReduceRangeThresh threshold (InclusiveRange lo0 hi0) fn binop init
  = loop lo0 hi0
  where
    -- A one-element range has width 0, which can never be <= a negative
    -- threshold; without this clamp the split below would recurse on the
    -- same one-element range forever.
    cutoff = max 0 threshold

    loop lo hi
      | hi - lo <= cutoff = foldM step init [lo .. hi]
      | otherwise = do
          -- Midpoint written as an offset from lo rather than (lo+hi)/2.
          let mid = lo + ((hi - lo) `quot` 2)
          rght <- spawn $ loop (mid + 1) hi   -- right half runs in parallel
          l <- loop lo mid                    -- left half runs here
          r <- get rght
          l `binop` r

    -- One sequential step: map the index, then fold it into the accumulator.
    step acc i = do
      x <- fn i
      acc `binop` x

-- How many tasks per process should we aim for.  Higher numbers
-- improve load balance but put more pressure on the scheduler.
auto_partition_factor :: Int
auto_partition_factor = 4

-- | \"Auto-partitioning\" version of 'parMapReduceRangeThresh' that chooses the threshold based on
--    the size of the range and the number of processors.
parMapReduceRange :: NFData a => InclusiveRange -> (Int -> Par a) -> (a -> a -> Par a) -> a -> Par a
parMapReduceRange (InclusiveRange start end) fn binop init =
    reduce (length pieces) pieces
  where
    -- The whole range, pre-cut into a fixed number of contiguous segments.
    pieces = splitInclusiveRange (auto_partition_factor * numCapabilities) (start, end)

    -- Sequential map/fold over a single segment, seeded with @init@.
    foldSegment (lo, hi) = foldM step init [lo .. hi]
    step acc i = do
      x <- fn i
      acc `binop` x

    -- Binary reduction over the list of segments; the first argument is
    -- the length of the list, carried along to avoid recomputing it.
    reduce 1 [seg] = foldSegment seg
    reduce count chunk = do
      let half          = count `quot` 2
          (front, back) = splitAt half chunk
      frontVar <- spawn (reduce half front)     -- first half in parallel
      backRes  <- reduce (count - half) back    -- second half here
      frontRes <- get frontVar
      frontRes `binop` backRes

-- TODO: A version that works for any splittable input domain.  In this case
-- the "threshold" is a predicate on inputs.
-- parMapReduceRangeGeneric :: (inp -> Bool) -> (inp -> Maybe (inp,inp)) -> inp ->

-- Experimental:

-- | Parallel for-loop over an inclusive range.  Semantically equivalent
-- to
--
-- > parFor (InclusiveRange n m) f = forM_ [n..m] f
--
-- except that the implementation will split the work into an
-- unspecified number of subtasks in an attempt to gain parallelism.
-- The exact number of subtasks is chosen at runtime, and is probably
-- a small multiple of the available number of processors.
--
-- Strictly speaking the semantics of 'parFor' depends on the
-- number of processors, and its behaviour is therefore not
-- deterministic.  However, a good rule of thumb is to not have any
-- interdependencies between the elements; if this rule is followed
-- then @parFor@ has deterministic semantics.  One easy way to follow
-- this rule is to only use 'put' or 'put_' in @f@, never 'get'.
parFor :: InclusiveRange -> (Int -> Par ()) -> Par ()
parFor (InclusiveRange start end) body =
 do
    let -- Segments are inclusive, 'for_' takes an exclusive upper bound.
        run (x,y) = for_ x (y+1) body
        -- Same tasks-per-processor factor as 'parMapReduceRange'.
        range_segments = splitInclusiveRange (auto_partition_factor*numCapabilities) (start,end)

    vars <- M.forM range_segments (\ pr -> spawn_ (run pr))
    -- Wait for every subtask before returning.
    M.mapM_ get vars

-- | Cut the inclusive range @(start,end)@ into consecutive inclusive
-- segments that together cover it exactly.  The first @remain@
-- segments hold one more element than the rest, where @remain@ is the
-- remainder of dividing the range length by the number of pieces.
--
-- When there are fewer elements than pieces, the surplus segments are
-- empty and are returned in the form @(k, k-1)@.  An inverted range
-- (@end < start - 1@) is treated as empty, so every segment it yields
-- has that same @(k, k-1)@ shape; callers such as 'parFor' rely on
-- this to never hand 'for_' a start beyond its end.  A piece count
-- below 1 is treated as 1 to avoid dividing by zero.
splitInclusiveRange :: Int -> (Int, Int) -> [(Int, Int)]
splitInclusiveRange pieces (start,end) =
  map largepiece [0 .. remain-1] ++
  map smallpiece [remain .. n-1]
 where
   n   = max 1 pieces
   len = max 0 (end - start + 1) -- inclusive [start,end]; empty if inverted
   (portion, remain) = len `quotRem` n
   -- The first @remain@ segments carry portion+1 elements each.
   largepiece i =
       let offset = start + (i * (portion + 1))
       in (offset, offset + portion)
   -- The remaining segments carry portion elements, shifted past the
   -- @remain@ extra elements consumed by the large segments.
   smallpiece i =
       let offset = start + (i * portion) + remain
       in (offset, offset + portion - 1)

-- My own forM for numeric ranges (not requiring deforestation optimizations).
-- Inclusive start, exclusive end.  Calling it with start > end is an error.
{-# INLINE for_ #-}
for_ :: Monad m => Int -> Int -> (Int -> m ()) -> m ()
for_ start end _fn | start > end = error "for_: start is greater than end"
for_ start end fn = loop start
  where
   loop !i | i == end  = return ()
           | otherwise = do fn i; loop (i+1)