conduit-concurrent-map-0.1.1: Concurrent, order-preserving mapping Conduit

Safe Haskell: None




Functions for concurrent mapping over Conduits.


Explicit number of threads

concurrentMapM_ :: (MonadUnliftIO m, MonadResource m) => Int -> Int -> (a -> m b) -> ConduitT a b m ()

Concurrent, order-preserving conduit mapping function.

Like mapM, but runs in parallel with the given number of threads, returns outputs in the order of inputs (like mapM, no reordering), and allows defining a bounded size output buffer for elements of type b to maintain high parallelism despite head-of-line blocking.

Because of the no-reordering guarantee, there is head-of-line blocking: when the conduit has to process a long-running computation and a short-running computation in parallel, the result of the short one cannot be yielded before the long one is done. Unless we buffer the finished result somewhere, the thread that finished the short-running computation is now blocked and sits idle (low utilisation).

To cope with this, concurrentMapM_ numThreads workerOutputBufferSize f gives each thread workerOutputBufferSize output slots to store bs while they are blocked.

Use the convenience function concurrentMapM_numCaps when f is CPU-bound.

workerOutputBufferSize must be >= 1.

The workerOutputBufferSize keeps the memory usage of the conduit bounded, namely to numThreads * (workerOutputBufferSize + 1) many bs at any given time (the + 1 is for the ones currently being processed).

To achieve maximum parallelism/utilisation, you should choose workerOutputBufferSize ideally as the time factor between the fastest and slowest f that will likely pass through the conduit; for example, if most fs take 3 seconds, but some take 15 seconds, choose workerOutputBufferSize = 5 to avoid an earlier 15-second f blocking a later 3-second f.
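The sizing rule and the memory bound can be written down as plain arithmetic. The sketch below is only an illustration: suggestedBufferSize and maxHeldOutputs are hypothetical helper names, not part of conduit-concurrent-map, and the bound is computed from the thread count passed to the conduit.

```haskell
-- Hypothetical helpers for illustration; not part of conduit-concurrent-map.

-- | Rule of thumb from the text: the time factor between the slowest and
-- the typical run time of f, rounded up, and never below the minimum of 1.
suggestedBufferSize :: Double -> Double -> Int
suggestedBufferSize typicalSeconds slowestSeconds =
  max 1 (ceiling (slowestSeconds / typicalSeconds))

-- | Upper bound on the number of bs held at any time: each thread has
-- workerOutputBufferSize slots plus the one element it is processing.
maxHeldOutputs :: Int -> Int -> Int
maxHeldOutputs numThreads workerOutputBufferSize =
  numThreads * (workerOutputBufferSize + 1)

main :: IO ()
main = do
  print (suggestedBufferSize 3 15)  -- prints 5
  print (maxHeldOutputs 4 5)        -- prints 24
```

With 4 threads and the buffer size of 5 from the example, at most 24 bs are held at once.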

The threads inside the conduit will evaluate the results of the f to WHNF, as in !b <- f a, so don't forget to make f itself deepseq the result if there is any lazy data structure involved and you want to make sure that they are evaluated *inside* the conduit (fully in parallel) as opposed to the lazy parts of them being evaluated after being yielded.
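One way to force results fully inside the worker is to wrap the function before passing it to the conduit. This is a minimal sketch assuming the deepseq package; fullyEvaluated is a hypothetical helper name, not something the library provides.

```haskell
import Control.DeepSeq (NFData, force)
import Control.Exception (evaluate)
import Control.Monad.IO.Class (MonadIO, liftIO)

-- | Hypothetical helper: lift a pure function into m so that its result
-- is evaluated to normal form before the action returns, not just to WHNF.
fullyEvaluated :: (MonadIO m, NFData b) => (a -> b) -> a -> m b
fullyEvaluated g a = liftIO (evaluate (force (g a)))

main :: IO ()
main = do
  -- Every list element is computed here, not later by the consumer.
  xs <- fullyEvaluated (map (* 2)) [1, 2, 3 :: Int]
  print xs  -- prints [2,4,6]
```

A call such as concurrentMapM_ numThreads workerOutputBufferSize (fullyEvaluated g) would then do the whole evaluation of g's result on the worker threads.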

As fs happen concurrently, they cannot depend on each other's monadic state. This is enforced by the MonadUnliftIO constraint. This means the function cannot be used with e.g. StateT.
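If the fs do need to share state, one common alternative to StateT is an explicit mutable reference that is updated atomically. The sketch below assumes that approach; countingDouble is a hypothetical worker function, and plain mapM stands in for the conduit so that the block has no dependencies beyond base.

```haskell
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.IORef (IORef, atomicModifyIORef', newIORef, readIORef)

-- | Hypothetical worker: doubles its input and counts processed items in a
-- shared IORef. The atomic update keeps the count correct under concurrency.
countingDouble :: MonadIO m => IORef Int -> Int -> m Int
countingDouble counter x = liftIO $ do
  atomicModifyIORef' counter (\n -> (n + 1, ()))
  return (x * 2)

main :: IO ()
main = do
  counter <- newIORef 0
  -- mapM is used here only as a sequential stand-in for the conduit.
  ys <- mapM (countingDouble counter) [1, 2, 3]
  n <- readIORef counter
  print (ys, n)  -- prints ([2,4,6],3)
```

Because countingDouble only needs MonadIO, countingDouble counter could be passed as the f argument in any monad that satisfies the conduit's constraints.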


  • Ordering / head of line blocking for outputs: The bs will come out in the same order as their corresponding as came in (the parallelism doesn't change the order).
  • Bounded memory: The conduit will hold at most numThreads * (workerOutputBufferSize + 1) many bs at any time.
  • Full utilisation: The conduit will try to keep all cores busy as much as it can. This means that it will always try to await if there is a free core, and will only yield once it has to in order to free a core. It also ensures that a worker running for longer than the others does not prevent other free workers from starting new work, except when the workerOutputBufferSize bound on buffered b elements has been reached.
  • Prompt starting: The conduit will start processing each awaited value immediately; it will not batch up multiple awaits before starting.
  • Async exception safety: When the conduit is killed, the worker threads will be killed too.


puts :: (MonadIO m) => String -> m () -- for non-interleaved output
puts s = liftIO $ BS8.putStrLn (BS8.pack s)
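-- 4 worker threads; the second argument (workerOutputBufferSize) is set to 2 here as an arbitrary value >= 1
runConduitRes (CL.sourceList [1..6] .| concurrentMapM_ 4 2 (\i -> liftIO $ puts (show i ++ " before") >> threadDelay (i * 1000000) >> puts (show i ++ " after") >> return (i*2)) .| CL.consume )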
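A self-contained variant of the example above, with the delays reversed so that later inputs finish first, can illustrate the ordering guarantee. This is a sketch under a few assumptions: the module name Data.Conduit.ConcurrentMap, the conduit >= 1.3 API for runConduitRes and (.|), and arbitrary values for the thread count and buffer size. doubledInOrder is a hypothetical name.

```haskell
import Control.Concurrent (threadDelay)
import Control.Monad.IO.Class (liftIO)
import Data.Conduit (runConduitRes, (.|))
import Data.Conduit.ConcurrentMap (concurrentMapM_)
import qualified Data.Conduit.List as CL

-- | Doubles 1..6 on 4 threads with 2 output buffer slots per thread.
-- Earlier inputs sleep longer, so later inputs finish first.
doubledInOrder :: IO [Int]
doubledInOrder =
  runConduitRes $
    CL.sourceList [1 .. 6]
      .| concurrentMapM_ 4 2 (\i -> liftIO $ do
           threadDelay ((7 - i) * 100000)  -- 0.6 s for input 1, 0.1 s for input 6
           return (i * 2))
      .| CL.consume

main :: IO ()
main = doubledInOrder >>= print
```

Given the no-reordering guarantee, the printed list is expected to be [2,4,6,8,10,12] even though the computations complete in a different order.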

CPU-bound use case

concurrentMapM_numCaps :: (MonadUnliftIO m, MonadResource m) => Int -> (a -> m b) -> ConduitT a b m ()

concurrentMapM_ with the number of threads set to getNumCapabilities.

Useful when f is CPU-bound.

If f is IO-bound, you probably want to use concurrentMapM_ with an explicitly given number of threads instead.
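The relationship between the two functions can be restated as a short sketch. mapWithAllCaps is a hypothetical name that mirrors the description above; it is not the library's actual implementation, and the module name Data.Conduit.ConcurrentMap is assumed.

```haskell
import Control.Concurrent (getNumCapabilities)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.IO.Unlift (MonadUnliftIO)
import Control.Monad.Trans.Resource (MonadResource)
import Data.Conduit (ConduitT, runConduitRes, (.|))
import Data.Conduit.ConcurrentMap (concurrentMapM_)
import qualified Data.Conduit.List as CL

-- | Hypothetical re-statement of concurrentMapM_numCaps: use one worker
-- thread per capability and pass the buffer size through unchanged.
mapWithAllCaps
  :: (MonadUnliftIO m, MonadResource m)
  => Int -> (a -> m b) -> ConduitT a b m ()
mapWithAllCaps workerOutputBufferSize f = do
  numThreads <- liftIO getNumCapabilities
  concurrentMapM_ numThreads workerOutputBufferSize f

main :: IO ()
main = do
  -- A CPU-bound f: sum a range, forced to WHNF before returning.
  ys <- runConduitRes $
    CL.sourceList [1 .. 10 :: Int]
      .| mapWithAllCaps 2 (\n -> return $! sum [1 .. n * 100000])
      .| CL.consume
  print (length ys)  -- prints 10
```

Note that getNumCapabilities returns 1 unless the program is built with -threaded and run with more capabilities (for example via +RTS -N), so those settings matter for CPU-bound use.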