Safe Haskell | None |
---|---|
Language | Haskell98 |
Functions for concurrent mapping over Conduits.
Synopsis
- concurrentMapM_ :: (MonadUnliftIO m, MonadResource m) => Int -> Int -> (a -> m b) -> ConduitT a b m ()
- concurrentMapM_numCaps :: (MonadUnliftIO m, MonadResource m) => Int -> (a -> m b) -> ConduitT a b m ()
Explicit number of threads
concurrentMapM_ :: (MonadUnliftIO m, MonadResource m) => Int -> Int -> (a -> m b) -> ConduitT a b m ()
Concurrent, order-preserving conduit mapping function.
Like mapM, but runs in parallel with the given number of threads, returns outputs in the order of inputs (like mapM, no reordering), and allows defining a bounded-size output buffer for elements of type b to maintain high parallelism despite head-of-line blocking.
Because of the no-reordering guarantee, there is head-of-line blocking: when the conduit has to process a long-running computation and a short-running computation in parallel, the result of the short one cannot be yielded before the long one is done. Unless we buffer the queued result somewhere, the thread that finished the short-running computation is now blocked and sits idle (low utilisation).
To cope with this, concurrentMapM_ numThreads workerOutputBufferSize f gives each thread workerOutputBufferSize output slots to store b values while it is blocked.
Use the convenience function concurrentMapM_numCaps when f is CPU-bound.
workerOutputBufferSize must be >= 1.
The workerOutputBufferSize keeps the memory usage of the conduit bounded, namely to numThreads * (workerOutputBufferSize + 1) many b values at any given time (the + 1 is for the ones currently being processed).
To achieve maximum parallelism/utilisation, you should ideally choose workerOutputBufferSize as the time factor between the fastest and slowest f that will likely pass through the conduit; for example, if most calls to f take 3 seconds, but some take 15 seconds, choose workerOutputBufferSize = 5 to avoid an earlier 15-second f blocking a later 3-second f.
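As a rough illustration of the sizing rule and the memory bound described above, the following sketch computes both. The helper names suggestedBufferSize and maxHeldOutputs are hypothetical and not part of the library; rounding the time ratio up and clamping it to at least 1 are assumptions made here, and both durations are assumed to be positive.

```haskell
-- Hypothetical helpers for illustration only; not part of the library.

-- | Suggested workerOutputBufferSize: the time factor between the slowest
-- and the fastest expected run of f, rounded up (an assumption) and
-- clamped to the documented minimum of 1.
suggestedBufferSize :: Double -> Double -> Int
suggestedBufferSize fastestSeconds slowestSeconds =
  max 1 (ceiling (slowestSeconds / fastestSeconds))

-- | Upper bound on the number of b values held by the conduit at once:
-- number of threads * (workerOutputBufferSize + 1).
maxHeldOutputs :: Int -> Int -> Int
maxHeldOutputs numThreads workerOutputBufferSize =
  numThreads * (workerOutputBufferSize + 1)
```

With the timings from the text, suggestedBufferSize 3 15 evaluates to 5, and for 4 threads maxHeldOutputs 4 5 evaluates to 24.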
The threads inside the conduit will evaluate the results of f to WHNF, as in !b <- f a, so don't forget to make f itself deepseq the result if there is any lazy data structure involved and you want to make sure that it is evaluated *inside* the conduit (fully in parallel) as opposed to its lazy parts being evaluated after being yielded.
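One possible way to get full evaluation inside the worker threads is to wrap f so that its result is forced before it is returned. The sketch below assumes the result type has an NFData instance (from the deepseq package); the name forcingResult is hypothetical and not part of the library.

```haskell
import Control.DeepSeq (NFData, force)
import Control.Exception (evaluate)
import Control.Monad.IO.Class (MonadIO, liftIO)

-- | Hypothetical wrapper (not part of the library): runs the given
-- function and then evaluates its result to normal form, so that no lazy
-- parts remain to be evaluated after the value has been yielded.
forcingResult :: (MonadIO m, NFData b) => (a -> m b) -> a -> m b
forcingResult f a = do
  b <- f a
  -- 'force b' deep-evaluates b once it is itself demanded;
  -- 'evaluate' demands it here, in the thread running this action.
  liftIO (evaluate (force b))
```

Under these assumptions, passing forcingResult f instead of f to concurrentMapM_ should make the deep evaluation happen in the worker thread that runs f.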
As calls to f happen concurrently, they cannot depend on each other's monadic state. This is enforced by the MonadUnliftIO constraint. This means the function cannot be used with e.g. StateT.
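If the workers do need to share some state, a common workaround (not prescribed by this documentation) is to keep it in a mutable reference that is updated atomically, rather than in StateT. The sketch below uses only base; countingDouble and sequentialDemo are hypothetical names, and the demo uses a plain sequential mapM as a stand-in for the conduit.

```haskell
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.IORef (IORef, atomicModifyIORef', newIORef, readIORef)

-- | Hypothetical worker: doubles its input and counts how many elements
-- have been processed in a shared IORef. The atomic update keeps the
-- counter consistent when several threads run the worker concurrently.
countingDouble :: MonadIO m => IORef Int -> Int -> m Int
countingDouble counter x = do
  liftIO (atomicModifyIORef' counter (\n -> (n + 1, ())))
  pure (x * 2)

-- | Sequential stand-in for the conduit: runs the worker over a small
-- list and returns the results together with the final counter value.
sequentialDemo :: IO ([Int], Int)
sequentialDemo = do
  counter <- newIORef 0
  results <- mapM (countingDouble counter) [1, 2, 3]
  processed <- readIORef counter
  pure (results, processed)
```

A worker of this shape only needs MonadIO, so it can be partially applied to the shared reference and passed as f.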
Properties:
- Ordering / head-of-line blocking for outputs: The b values will come out in the same order as their corresponding a values came in (the parallelism doesn't change the order).
- Bounded memory: The conduit will hold at most numThreads * (workerOutputBufferSize + 1) many b values.
- Full utilisation: The conduit will try to keep all cores busy as much as it can. This means that it will always try to await if there's a free core, and will only yield once it has to in order to make a core free. It also ensures that any worker running for longer than others does not prevent other free workers from starting new work, except when we're at the workerOutputBufferSize output buffer bound of b elements.
- Prompt starting: The conduit will start processing each awaited value immediately; it will not batch up multiple awaits before starting.
- Async exception safety: When the conduit is killed, the worker threads will be killed too.
Example:

    puts :: (MonadIO m) => String -> m () -- for non-interleaved output
    puts s = liftIO $ BS8.putStrLn (BS8.pack s)

    runConduitRes
      ( CL.sourceList [1..6]
        -- 4 threads; the output buffer size of 1 per thread is chosen here for illustration
        .| concurrentMapM_ 4 1 (\i -> liftIO $
             puts (show i ++ " before")
               >> threadDelay (i * 1000000)
               >> puts (show i ++ " after")
               >> return (i*2))
        .| CL.consume
      )
CPU-bound use case
concurrentMapM_numCaps :: (MonadUnliftIO m, MonadResource m) => Int -> (a -> m b) -> ConduitT a b m ()
concurrentMapM_ with the number of threads set to getNumCapabilities.
Useful when f is CPU-bound.
If f is IO-bound, you probably want to use concurrentMapM_ with an explicitly given number of threads instead.
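A minimal sketch of the CPU-bound case follows. It assumes the functions documented here are exported from a module named Data.Conduit.ConcurrentMap and that the conduit package is available; sumOfSquares, inputs and cpuBoundExample are hypothetical names, and the buffer size of 2 is an arbitrary choice.

```haskell
import Data.Conduit (runConduitRes, (.|))
import qualified Data.Conduit.List as CL
-- Assumed module name for the functions documented on this page.
import Data.Conduit.ConcurrentMap (concurrentMapM_numCaps)

-- | Hypothetical CPU-bound function, used only for illustration.
sumOfSquares :: Int -> Int
sumOfSquares n = sum [k * k | k <- [1 .. n]]

-- | Hypothetical inputs of differing cost.
inputs :: [Int]
inputs = [100000, 200000, 300000, 400000]

-- | Maps sumOfSquares over the inputs with one worker thread per
-- capability and an output buffer of 2 slots per thread.
cpuBoundExample :: IO [Int]
cpuBoundExample =
  runConduitRes
    ( CL.sourceList inputs
        -- '$!' forces the Int inside the worker before it is returned.
        .| concurrentMapM_numCaps 2 (\n -> return $! sumOfSquares n)
        .| CL.consume
    )
```

Because outputs keep the order of the inputs, the result should equal map sumOfSquares inputs. For the workers to actually run on several cores, the program would typically be built with GHC's threaded runtime and started with more than one capability.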