-- |
-- Module      : Streamly.Data.Stream.Concurrent
-- Copyright   : (c) 2022 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : released
-- Portability : GHC
--
-- This module provides concurrent streaming abstractions.
--
-- == Programming Tips
--
-- The names in this module do not conflict with other stream modules,
-- therefore, you can import it in the same namespace:
--
-- >>> import qualified Streamly.Data.Stream.Concurrent as Stream

module Streamly.Data.Stream.Concurrent
    (
    -- * Concurrency
    -- $concurrency
      MonadAsync

    -- ** Configuration
    , Config

    -- *** Limits
    , maxThreads
    , maxBuffer

    -- *** Rate Control
    , Rate(..)
    , rate
    , avgRate
    , minRate
    , maxRate
    , constRate

    -- *** Stop behavior
    , StopWhen (..)
    , stopWhen

    -- *** Scheduling behavior
    , eager
    , ordered
    , interleaved

    -- *** Diagnostics
    , inspect

    -- ** Combinators
    -- | Stream combinators using a concurrent channel.

    -- *** Evaluate
    -- | Evaluates a serial stream asynchronously using a concurrency channel.
    , parEval

    -- *** Generate
    -- | Uses a concurrency channel to evaluate multiple actions concurrently.
    , parRepeatM
    , parReplicateM
    , fromCallback

    -- *** Map
    -- | Uses a concurrency channel to evaluate multiple mapped actions
    -- concurrently.
    , parMapM
    , parSequence

    -- *** Combine two
    -- | Uses a channel for each pair.
    , parZipWithM
    , parZipWith
    , parMergeByM
    , parMergeBy

    -- *** List of streams
    -- | Shares a single channel across many streams.
    , parList
    -- , zipWithM
    -- , zipWith

    -- *** Stream of streams
    -- **** Apply
    , parApply

    -- **** Concat
    -- | Shares a single channel across many streams.
    , parConcat
    , parConcatMap

    -- **** ConcatIterate
    , parConcatIterate

    -- *** Observation
    , tapCount
    )
where

import Streamly.Internal.Data.Stream.Concurrent
import Prelude hiding (mapM, sequence, concat, concatMap, zipWith)

-- $concurrency
--
-- == Concurrency Channels
--
-- At a lower level, concurrency is implemented using channels that support
-- concurrent evaluation of streams. We create a channel and add one or more
-- streams to it. The channel evaluates multiple streams concurrently and then
-- generates a single output stream from the results. How the streams are
-- combined depends on the configuration of the channel.
--
-- == Concurrency Primitives
--
-- There are only a few fundamental abstractions for concurrency: 'parEval',
-- 'parConcatMap', and 'parConcatIterate'. All concurrency combinators can be
-- expressed in terms of these.
--
-- 'parEval' evaluates a single stream asynchronously. A worker thread runs
-- the stream and buffers the results, while the consumer of the stream runs in
-- another thread and consumes the results from the buffer, thus decoupling the
-- production and consumption of the stream. This can be used to run different
-- stages of a pipeline concurrently.
--
-- 'parConcatMap' is used to evaluate multiple streams concurrently and combine
-- the results. A stream generator function is mapped over the input stream,
-- all the generated streams are evaluated concurrently, and the results are
-- combined.
--
-- 'parConcatIterate' is like 'parConcatMap' but applies a stream generator
-- function recursively to the output stream. This can be used to traverse trees
-- or graphs.
--
-- == Concurrency Configuration
--
-- Concurrent combinators take a 'Config' argument which controls the
-- concurrent behavior.
-- For example, it can control the maximum number of threads to be used
-- ('maxThreads'), the maximum size of the buffer ('maxBuffer'), how the
-- streams are scheduled with respect to each other ('interleaved'), or how the
-- results are consumed ('ordered').
--
-- Configuration is specified as @Config -> Config@ modifier functions that can
-- be composed together using function composition. For example, to specify the
-- maximum number of threads we can use @parConcatMap (maxThreads 10)@. If we
-- also want to specify the maximum buffer size we can compose the two options:
-- @parConcatMap (maxThreads 10 . maxBuffer 100)@. To use the default
-- configuration, use 'id' as the config modifier, e.g. @parConcatMap id@.
--
-- See the @Configuration@ section and the documentation of the individual
-- configuration options for the default behavior and default values of the
-- configuration parameters.
--
-- == Scheduling behavior
--
-- The most important configuration option controls whether the output of the
-- concurrent execution is consumed in the same order as the corresponding
-- actions in the input stream or as soon as the results arrive. The default is
-- the latter. However, we can enforce the original order by using the
-- 'ordered' option.
--
-- Another important option controls whether the number of worker threads is
-- automatically increased and decreased based on the consumption rate, or
-- threads are started as aggressively as possible until the 'maxThreads' or
-- 'maxBuffer' limits are hit. The default is the former. However, the 'eager'
-- option can be enabled to use the latter behavior. When 'eager' is on, a
-- blocked stream consumer thread has no effect on the scheduling of the
-- available tasks.
--
-- == Concurrent Combinators
--
-- Using the few fundamental concurrency primitives we can implement all the
-- usual streaming combinators with concurrent behavior. Combinators like
Combinators like -- 'unfoldrM', 'iterateM' that are inherently serial can be evaluated -- concurrently with respect to the consumer pipeline using 'parEval'. -- Combinators like 'zipWithM', 'mergeByM' can also use 'parEval' on the input -- streams to evaluate them concurrently before combining. -- -- Combinators like 'repeatM', 'replicateM', 'fromListM', 'sequence', 'mapM' in -- which all actions are independent of each other can be made concurrent using -- the 'parConcatMap' operation. -- -- A concurrent 'repeatM' repeats an action using multiple concurrent -- executions of the action. Similarly, a concurrent 'mapM' performs the mapped -- action in independent threads. -- -- Some common concurrent combinators are provided in this module.