Copyright | (c) 2022 Composewell Technologies |
---|---|
License | BSD-3-Clause |
Maintainer | streamly@composewell.com |
Stability | experimental |
Portability | GHC |
Safe Haskell | Safe-Inferred |
Language | Haskell2010 |
Eval a fold asynchronously in a separate thread.
Use multiple worker folds to fold a stream, then collect the results using another fold or combine them using a monoid. The results can be collected out of order or in order.
Concurrent append: if one fold's buffer becomes full, use the next one.
Concurrent interleave/partition: round robin to n folds.
Concurrent distribute to multiple folds.
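As a conceptual illustration of evaluating a fold in a separate thread, here is a minimal sketch that uses only `base`. It is not how this module is implemented: `foldInThread` is a hypothetical helper, and the `Chan` it uses is unbounded, so it does not model the buffer limits described later on this page.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)

-- Hypothetical helper (not part of streamly): run a strict left fold in a
-- worker thread, feeding it inputs through a channel.
foldInThread :: (b -> a -> b) -> b -> [a] -> IO b
foldInThread step initial inputs = do
    chan <- newChan
    done <- newEmptyMVar
    let worker acc = do
            msg <- readChan chan
            case msg of
                Just x -> worker $! step acc x -- fold one more input
                Nothing -> putMVar done acc    -- end of input: publish result
    _ <- forkIO (worker initial)
    mapM_ (writeChan chan . Just) inputs -- the driver thread only enqueues
    writeChan chan Nothing               -- signal end of input
    takeMVar done                        -- wait for the worker's result

main :: IO ()
main = foldInThread (+) 0 [1 .. 10 :: Int] >>= print -- prints 55
```

The driver never runs the step function itself; it only hands inputs to the worker and waits for the final accumulator.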
Imports
Imports for example snippets in this module.
>>> :m
>>> :set -XFlexibleContexts
>>> import Control.Concurrent (threadDelay)
>>> import qualified Streamly.Data.Array as Array
>>> import qualified Streamly.Data.Fold as Fold
>>> import qualified Streamly.Data.Parser as Parser
>>> import qualified Streamly.Data.Stream as Stream
>>> import qualified Streamly.Internal.Data.Fold.Concurrent as Fold
>>> import Prelude hiding (concatMap, concat)
>>> :{
 delay n = do
     threadDelay (n * 1000000)   -- sleep for n seconds
     putStrLn (show n ++ " sec") -- print "n sec"
     return n                    -- IO Int
:}
Configuration
An abstract type for specifying the configuration parameters of a Channel. Use Config -> Config modifier functions to modify the default configuration. See the individual modifier documentation for default values.
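Because each modifier is an ordinary `Config -> Config` function, several of them can be chained with function composition, and `id` leaves every default in place. The sketch below assumes this module exports a `parEval` combinator that takes such a modifier followed by the fold to evaluate; that name and signature do not appear in this section and should be checked against the installed streamly version. `concurrentSum` is a hypothetical name used only for illustration.

```haskell
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream
import qualified Streamly.Internal.Data.Fold.Concurrent as Fold

-- Hypothetical helper: sum a list with the fold evaluated concurrently.
-- Assumes Fold.parEval :: (Config -> Config) -> Fold m a b -> Fold m a b
-- (with a MonadAsync constraint on m).
concurrentSum :: [Int] -> IO Int
concurrentSum xs =
    Stream.fold (Fold.parEval cfg Fold.sum) (Stream.fromList xs)
  where
    -- Modifiers compose with (.); use id to keep all defaults.
    cfg = Fold.maxBuffer 100 . Fold.inspect False

main :: IO ()
main = concurrentSum [1 .. 10] >>= print
```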
maxBuffer :: Int -> Config -> Config
Specify the maximum size of the buffer for storing the results from concurrent computations. If the buffer becomes full, we stop spawning more concurrent tasks until there is space in the buffer. A value of 0 resets the buffer size to the default; a negative value means there is no limit. The default value is 1500.
CAUTION! Using an unbounded maxBuffer value (i.e. a negative value) coupled with an unbounded maxThreads value is a recipe for disaster in the presence of infinite or very large streams. In particular, it must not be used when pure is used in ZipAsyncM streams, because pure in applicative zip streams generates an infinite stream, causing unbounded concurrent generation with no limit on the buffer or threads.
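The three cases for the maxBuffer argument (negative, zero, positive) can be summarized with a small model. This is only an illustration of the documented rule, written against `base`; the `Limit` type, `defaultMaxBuffer`, and `interpretMaxBuffer` are hypothetical names and are not claimed to match the library's internal representation.

```haskell
-- Hypothetical model of a buffer limit.
data Limit = Unlimited | Limited Int
    deriving (Eq, Show)

-- The documented default buffer size.
defaultMaxBuffer :: Limit
defaultMaxBuffer = Limited 1500

-- Interpret the Int argument the way the maxBuffer documentation describes.
interpretMaxBuffer :: Int -> Limit
interpretMaxBuffer n
    | n < 0 = Unlimited         -- negative: no limit
    | n == 0 = defaultMaxBuffer -- zero: reset to the default
    | otherwise = Limited n     -- positive: explicit bound

main :: IO ()
main = mapM_ (print . interpretMaxBuffer) [-1, 0, 64]
-- prints:
--   Unlimited
--   Limited 1500
--   Limited 64
```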
inspect :: Bool -> Config -> Config
Print debug information about the Channel when the stream ends.
Combinators
Stream combinators using Async channel