streamly-0.9.0: Streaming, dataflow programming and declarative concurrency
Copyright: (c) 2022 Composewell Technologies
Safe Haskell: Safe-Inferred



Evaluate a fold asynchronously in a separate thread.

Use multiple worker folds to fold a stream, then collect their results using another fold or combine them using a monoid. The results can be collected out-of-order or in-order.

Concurrent append: if one fold's buffer becomes full, use the next one.

Concurrent interleave/partition: round robin to n folds.

Concurrent distribute to multiple folds.



Imports for example snippets in this module.

>>> :m
>>> :set -XFlexibleContexts
>>> import Control.Concurrent (threadDelay)
>>> import qualified Streamly.Data.Array as Array
>>> import qualified Streamly.Data.Fold as Fold
>>> import qualified Streamly.Data.Parser as Parser
>>> import qualified Streamly.Data.Stream as Stream
>>> import qualified Streamly.Internal.Data.Fold.Concurrent as Fold
>>> import Prelude hiding (concatMap, concat)
>>> :{
 delay n = do
     threadDelay (n * 1000000)   -- sleep for n seconds
     putStrLn (show n ++ " sec") -- print "n sec"
     return n                    -- IO Int
:}


data Config

An abstract type for specifying the configuration parameters of a Channel. Use Config -> Config modifier functions to modify the default configuration. See the individual modifier documentation for default values.

maxBuffer :: Int -> Config -> Config

Specify the maximum size of the buffer for storing the results from concurrent computations. If the buffer becomes full, we stop spawning more concurrent tasks until there is space in the buffer. A value of 0 resets the buffer size to the default; a negative value means there is no limit. The default value is 1500.

CAUTION! Using an unbounded maxBuffer value (i.e. a negative value) together with an unbounded maxThreads value is a recipe for disaster in the presence of infinite or very large streams. In particular, it must not be used when pure is used in ZipAsyncM streams, because pure in applicative zip streams generates an infinite stream, causing unbounded concurrent generation with no limit on the buffer or threads.
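As an illustration of a bounded configuration, the following is a minimal sketch that passes a maxBuffer modifier to parEval (documented further down this page). The alias Concur, the helper name boundedSum, and the limit of 16 are assumptions made for this example, not part of the library; the sketch also assumes that, for folds, the limit applies to the items queued for the worker thread.

```haskell
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream
import qualified Streamly.Internal.Data.Fold.Concurrent as Concur

-- Hypothetical helper: sum the input with the fold running in a worker
-- thread. The limit of 16 is an arbitrary, illustrative value; it is
-- assumed to bound the channel's buffer.
boundedSum :: [Int] -> IO Int
boundedSum xs =
    Stream.fold
        (Concur.parEval (Concur.maxBuffer 16) Fold.sum)
        (Stream.fromList xs)

main :: IO ()
main = boundedSum [1 .. 1000] >>= print
```

A small positive limit like this keeps memory use bounded even if the input list were very large, which is the situation the caution above is concerned with.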

inspect :: Bool -> Config -> Config

Print debug information about the Channel when the stream ends.
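Because every modifier has the type Config -> Config, modifiers can be combined with ordinary function composition. The sketch below is one possible way to enable inspect alongside a buffer limit; the names debugConfig and countItems, the alias Concur, and the value 64 are assumptions for illustration only. The exact debug output is not shown because its format is not described here.

```haskell
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream
import qualified Streamly.Internal.Data.Fold.Concurrent as Concur

-- Hypothetical helper: compose two Config modifiers into one.
-- 64 is an arbitrary buffer limit chosen for the example.
debugConfig :: Concur.Config -> Concur.Config
debugConfig = Concur.maxBuffer 64 . Concur.inspect True

-- Count the input items in a worker thread using the composed config.
countItems :: [Int] -> IO Int
countItems xs =
    Stream.fold
        (Concur.parEval debugConfig Fold.length)
        (Stream.fromList xs)

main :: IO ()
main = countItems [1 .. 10] >>= print
```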


Fold combinators using Async channel

eval :: MonadAsync m => Fold m a b -> Fold m a b

Evaluate a fold asynchronously using a channel: the input is buffered and consumed by the fold in a separate thread.

>>> eval = Fold.parEval id

parEval :: MonadAsync m => (Config -> Config) -> Fold m a b -> Fold m a b

Evaluate the fold asynchronously in a worker thread separate from the driver thread.
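A minimal usage sketch, assuming the default configuration (id) is acceptable: two folds are each wrapped with parEval and then combined with Fold.teeWith from Streamly.Data.Fold, so that each one consumes the same input in its own worker thread. The alias Concur and the helper name sumAndCount are assumptions introduced for this example.

```haskell
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream
import qualified Streamly.Internal.Data.Fold.Concurrent as Concur

-- Hypothetical helper: compute the sum and the length of the input,
-- with each fold evaluated in a separate worker thread.
sumAndCount :: [Int] -> IO (Int, Int)
sumAndCount xs =
    Stream.fold
        (Fold.teeWith (,)
            (Concur.parEval id Fold.sum)
            (Concur.parEval id Fold.length))
        (Stream.fromList xs)

main :: IO ()
main = sumAndCount [1 .. 100] >>= print
```

Wrapping only the expensive fold in parEval is also possible; the driver thread then runs the cheap fold itself while the worker handles the costly one.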