Package: conduit-concurrent-map-0.1.1
Module:  Data.Conduit.ConcurrentMap

concurrentMapM_

Concurrent, order-preserving conduit mapping function.

Like Data.Conduit.mapM, but runs in parallel with the given number of threads, returns outputs in the order of inputs (like mapM, no reordering), and allows defining a bounded-size output buffer for elements of type b to maintain high parallelism despite head-of-line blocking.

Because of the no-reordering guarantee, there is head-of-line blocking: when the conduit has to process a long-running computation and a short-running computation in parallel, the result of the short one cannot be yielded before the long one is done. Unless we buffer the queued result somewhere, the thread that finished the short-running computation is blocked and sits idle (low utilisation).

To cope with this, concurrentMapM_ numThreads workerOutputBufferSize f gives each thread workerOutputBufferSize output slots to store bs while they are blocked.

Use the convenience function concurrentMapM_numCaps when f is CPU-bound.

workerOutputBufferSize must be >= 1.

The workerOutputBufferSize keeps the memory usage of the conduit bounded, namely to numThreads * (workerOutputBufferSize + 1) many bs at any given time (the + 1 is for the ones currently being processed). With concurrentMapM_numCaps, numThreads is getNumCapabilities.

To achieve maximum parallelism/utilisation, choose workerOutputBufferSize ideally as the time factor between the fastest and slowest f that will likely pass through the conduit; for example, if most fs take 3 seconds, but some take 15 seconds, choose workerOutputBufferSize = 5 to avoid an earlier 15-second f blocking a later 3-second f.

The threads inside the conduit evaluate the results of f to WHNF, as in !b <- f a. If any lazy data structure is involved, make f itself deepseq its result so that it is evaluated inside the conduit (fully in parallel), rather than having its lazy parts evaluated after being yielded.

As the fs run concurrently, they cannot depend on each other's monadic state. This is enforced by the MonadUnliftIO constraint (Control.Monad.IO.Unlift, from unliftio-core), which means the function cannot be used with e.g. StateT.

Properties:

* Ordering / head-of-line blocking for outputs: The bs come out in the same order as their corresponding as came in (the parallelism doesn't change the order).

* Bounded memory: The conduit holds at most numThreads * (workerOutputBufferSize + 1) bs.

* Full utilisation: The conduit tries to keep all cores busy as much as it can. It always tries to await if there is a free core, and yields only when it has to in order to free a core. It also ensures that a worker running for longer than the others does not prevent other free workers from starting new work, except when the workerOutputBufferSize bound on buffered b elements has been reached.

* Prompt starting: The conduit starts processing each awaited value immediately; it does not batch up multiple awaits before starting.

* Async exception safety: When the conduit is killed, the worker threads are killed too.

Example:

    -- Assumed imports for this snippet:
    --   import qualified Data.ByteString.Char8 as BS8
    --   import qualified Data.Conduit.List as CL
    --   import Control.Concurrent (threadDelay)

    puts :: (MonadIO m) => String -> m () -- for non-interleaved output
    puts s = liftIO $ BS8.putStrLn (BS8.pack s)

    runConduitRes
      (  CL.sourceList [1..6]
      -- 4 threads; the buffer size 5 is an illustrative value, the argument is required
      .| concurrentMapM_ 4 5 (\i -> liftIO $ do
           puts (show i ++ " before")
           threadDelay (i * 1000000)
           puts (show i ++ " after")
           return (i*2))
      .| CL.consume
      )

concurrentMapM_numCaps

concurrentMapM_ with the number of threads set to getNumCapabilities (GHC.Conc.Sync, from base). Useful when f is CPU-bound.

If f is IO-bound, you probably want to use concurrentMapM_ with an explicitly given number of threads instead.
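The memory bound and the buffer-size rule of thumb described above can be worked through with plain arithmetic. The following is a minimal sketch using only base; maxHeldOutputs and suggestedBufferSize are hypothetical helper names and are not part of the package, and rounding the time factor up is an assumption made here.

```haskell
-- Hypothetical helper: upper bound on the number of outputs held at once.
-- Each thread has workerOutputBufferSize slots plus one element in progress.
maxHeldOutputs :: Int -> Int -> Int
maxHeldOutputs numThreads workerOutputBufferSize =
  numThreads * (workerOutputBufferSize + 1)

-- Hypothetical helper: time factor between the slowest and the fastest f,
-- rounded up (an assumption), and never below the required minimum of 1.
suggestedBufferSize :: Double -> Double -> Int
suggestedBufferSize fastestSecs slowestSecs =
  max 1 (ceiling (slowestSecs / fastestSecs))

main :: IO ()
main = do
  -- The 3-second / 15-second case from the text
  let buf = suggestedBufferSize 3 15
  print buf                    -- prints 5
  print (maxHeldOutputs 4 buf) -- prints 24
```

With 4 threads and a buffer size of 5, the conduit would hold at most 24 outputs at any time under this bound.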
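Because results are only evaluated to WHNF, a worker that returns a lazy structure may leave most of the work to whoever consumes it downstream. One possible way to force the work inside the worker threads is sketched below, assuming the deepseq package is available; multiples is a hypothetical workload, and the thread count 4 and buffer size 2 are arbitrary illustrative values.

```haskell
import           Control.DeepSeq (force)
import           Control.Exception (evaluate)
import           Control.Monad.IO.Class (liftIO)
import           Data.Conduit (runConduitRes, (.|))
import           Data.Conduit.ConcurrentMap (concurrentMapM_)
import qualified Data.Conduit.List as CL

-- Hypothetical workload: a lazily built list.
multiples :: Int -> [Int]
multiples n = map (* n) [1 .. 1000]

main :: IO ()
main = do
  results <- runConduitRes $
       CL.sourceList [1 .. 8 :: Int]
       -- evaluate . force fully evaluates the list in the worker thread,
       -- so no unevaluated thunks are yielded downstream.
    .| concurrentMapM_ 4 2 (\n -> liftIO (evaluate (force (multiples n))))
    .| CL.consume
  print (map sum results)
```

Without force, only the outermost list constructor would be evaluated inside the conduit, and the remaining elements would be computed sequentially by the consumer.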