Copyright | (c) 2017 Composewell Technologies |
---|---|
License | BSD-3-Clause |
Maintainer | streamly@composewell.com |
Stability | experimental |
Portability | GHC |
Safe Haskell | Safe-Inferred |
Language | Haskell2010 |
Non-parallelizable stream combinators like unfoldrM, iterateM etc. can be evaluated concurrently with the stream consumer by using parEval.
Parallelizable combinators like repeatM, replicateM can generate the stream concurrently using parConcatMap.
Synopsis
- type MonadAsync m = (MonadIO m, MonadBaseControl IO m, MonadThrow m)
- data Config
- maxThreads :: Int -> Config -> Config
- maxBuffer :: Int -> Config -> Config
- eager :: Bool -> Config -> Config
- data StopWhen
- stopWhen :: StopWhen -> Config -> Config
- ordered :: Bool -> Config -> Config
- interleaved :: Bool -> Config -> Config
- data Rate = Rate {}
- rate :: Maybe Rate -> Config -> Config
- avgRate :: Double -> Config -> Config
- minRate :: Double -> Config -> Config
- maxRate :: Double -> Config -> Config
- constRate :: Double -> Config -> Config
- inspect :: Bool -> Config -> Config
- parEval :: MonadAsync m => (Config -> Config) -> Stream m a -> Stream m a
- parRepeatM :: MonadAsync m => (Config -> Config) -> m a -> Stream m a
- parReplicateM :: MonadAsync m => (Config -> Config) -> Int -> m a -> Stream m a
- parMapM :: MonadAsync m => (Config -> Config) -> (a -> m b) -> Stream m a -> Stream m b
- parSequence :: MonadAsync m => (Config -> Config) -> Stream m (m a) -> Stream m a
- parTwo :: MonadAsync m => (Config -> Config) -> Stream m a -> Stream m a -> Stream m a
- parZipWithM :: MonadAsync m => (Config -> Config) -> (a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c
- parZipWith :: MonadAsync m => (Config -> Config) -> (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
- parMergeByM :: MonadAsync m => (Config -> Config) -> (a -> a -> m Ordering) -> Stream m a -> Stream m a -> Stream m a
- parMergeBy :: MonadAsync m => (Config -> Config) -> (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a
- parListLazy :: MonadAsync m => [Stream m a] -> Stream m a
- parListOrdered :: MonadAsync m => [Stream m a] -> Stream m a
- parListInterleaved :: MonadAsync m => [Stream m a] -> Stream m a
- parListEager :: MonadAsync m => [Stream m a] -> Stream m a
- parListEagerFst :: MonadAsync m => [Stream m a] -> Stream m a
- parListEagerMin :: MonadAsync m => [Stream m a] -> Stream m a
- parList :: MonadAsync m => (Config -> Config) -> [Stream m a] -> Stream m a
- parApply :: MonadAsync m => (Config -> Config) -> Stream m (a -> b) -> Stream m a -> Stream m b
- parConcat :: MonadAsync m => (Config -> Config) -> Stream m (Stream m a) -> Stream m a
- parConcatMap :: MonadAsync m => (Config -> Config) -> (a -> Stream m b) -> Stream m a -> Stream m b
- parConcatIterate :: MonadAsync m => (Config -> Config) -> (a -> Stream m a) -> Stream m a -> Stream m a
- fromCallback :: MonadAsync m => ((a -> m ()) -> m ()) -> Stream m a
- tapCountD :: MonadAsync m => (a -> Bool) -> (Stream m Int -> m b) -> Stream m a -> Stream m a
- tapCount :: MonadAsync m => (a -> Bool) -> (Stream m Int -> m b) -> Stream m a -> Stream m a
Imports
Imports for example snippets in this module.
>>>
:m
>>>
{-# LANGUAGE FlexibleContexts #-}
>>>
import Control.Concurrent (threadDelay)
>>>
import qualified Streamly.Data.Array as Array
>>>
import qualified Streamly.Data.Fold as Fold
>>>
import qualified Streamly.Data.Parser as Parser
>>>
import qualified Streamly.Internal.Data.Stream as Stream hiding (append2)
>>>
import qualified Streamly.Internal.Data.Stream.Concurrent as Stream
>>>
import Prelude hiding (concatMap, concat, zipWith)
>>>
:{
delay n = do
    threadDelay (n * 1000000)   -- sleep for n seconds
    putStrLn (show n ++ " sec") -- print "n sec"
    return n                    -- IO Int
:}
Types
type MonadAsync m = (MonadIO m, MonadBaseControl IO m, MonadThrow m) Source #
A monad that can perform concurrent or parallel IO operations. Streams that can be composed concurrently require the underlying monad to be MonadAsync.
Configuration
data Config
An abstract type for specifying the configuration parameters of a Channel. Use Config -> Config modifier functions to modify the default configuration. See the individual modifier documentation for default values.
maxThreads :: Int -> Config -> Config Source #
Specify the maximum number of threads that can be spawned by the channel. A value of 0 resets the thread limit to default, a negative value means there is no limit. The default value is 1500.
When the actions in a stream are IO bound, having blocking IO calls, this option can be used to control the maximum number of in-flight IO requests. When the actions are CPU bound this option can be used to control the amount of CPU used by the stream.
maxBuffer :: Int -> Config -> Config Source #
Specify the maximum size of the buffer for storing the results from concurrent computations. If the buffer becomes full we stop spawning more concurrent tasks until there is space in the buffer. A value of 0 resets the buffer size to default, a negative value means there is no limit. The default value is 1500.
CAUTION! Using an unbounded maxBuffer value (i.e. a negative value) coupled with an unbounded maxThreads value is a recipe for disaster in the presence of infinite streams, or very large streams. In particular, it must not be used when pure is used in ZipAsyncM streams, as pure in applicative zip streams generates an infinite stream, causing unbounded concurrent generation with no limit on the buffer or threads.
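As a minimal sketch of keeping both limits bounded on an infinite stream: the module qualifiers below follow the Imports section (with the concurrent module renamed to `Concur` to avoid clashes), and the names `bounded` and `firstSquares` as well as the limits 8 and 64 are illustrative assumptions, not recommendations.

```haskell
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream
import qualified Streamly.Internal.Data.Stream.Concurrent as Concur

-- Config modifiers are plain functions, so several limits compose with (.).
-- The values 8 and 64 are arbitrary illustrative bounds.
bounded :: Concur.Config -> Concur.Config
bounded = Concur.maxThreads 8 . Concur.maxBuffer 64

-- Map concurrently over an infinite stream. Because both limits are finite,
-- the number of worker threads and buffered results stays bounded.
firstSquares :: IO [Int]
firstSquares =
    Stream.fold Fold.toList
        $ Stream.take 5
        $ Concur.parMapM (bounded . Concur.ordered True) (\x -> return (x * x))
        $ Stream.enumerateFrom (1 :: Int)

main :: IO ()
main = firstSquares >>= print
```

The `ordered True` modifier is added only so that the result order is deterministic; it is not required for the limits to take effect.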
eager :: Bool -> Config -> Config Source #
By default, processing of output from the worker threads is given priority over dispatching new workers. More workers are dispatched only when there is no output to process. With eager on, workers are dispatched aggressively as long as there is more work to do, irrespective of whether there is output pending to be processed. However, dispatching may stop if maxThreads or maxBuffer is reached.
Note: This option has no effect when rate has been specified.
Note: Not supported with interleaved.
data StopWhen
Specify when the Channel should stop.
FirstStops | Stop when the first stream ends. |
AllStop | Stop when all the streams end. |
AnyStops | Stop when any one stream ends. |
ordered :: Bool -> Config -> Config Source #
When enabled, the streams may be evaluated concurrently but the results are produced in the same sequence as a serial evaluation would produce.
Note: Not supported with interleaved.
interleaved :: Bool -> Config -> Config Source #
Interleave the streams fairly instead of prioritizing the left stream. This schedules all streams in a round robin fashion over a limited number of threads.
Note: Can only be used on a finite number of streams.
Note: Not supported with ordered.
data Rate
Specifies the stream yield rate in yields per second (Hertz). We keep accumulating yield credits at rateGoal. At any point of time we allow only as many yields as we have accumulated as per rateGoal since the start of time. If the consumer or the producer is slower or faster, the actual rate may fall behind or exceed rateGoal. We try to recover the gap between the two by increasing or decreasing the pull rate from the producer. However, if the gap becomes more than rateBuffer we try to recover only as much as rateBuffer.
rateLow puts a bound on how low the instantaneous rate can go when recovering the rate gap. In other words, it determines the maximum yield latency. Similarly, rateHigh puts a bound on how high the instantaneous rate can go when recovering the rate gap. In other words, it determines the minimum yield latency. We reduce the latency by increasing concurrency, therefore we can say that it puts an upper bound on concurrency.
If the rateGoal is 0 or negative the stream never yields a value. If the rateBuffer is 0 or negative we do not attempt to recover.
rate :: Maybe Rate -> Config -> Config Source #
Specify the stream evaluation rate of a channel.
A Nothing value means there is no smart rate control; concurrent execution blocks only if maxThreads or maxBuffer is reached, or there are no more concurrent tasks to execute. This is the default.
When rate (throughput) is specified, concurrent production may be ramped up or down automatically to achieve the specified stream throughput. The specific behavior for different styles of Rate specifications is documented under Rate. The effective maximum production rate achieved by a channel is governed by:
- The maxThreads limit
- The maxBuffer limit
- The maximum rate that the stream producer can achieve
- The maximum rate that the stream consumer can achieve
Maximum production rate is given by:
\(rate = \frac{maxThreads}{latency}\)
If we know the average latency of the tasks we can set maxThreads accordingly.
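Rearranging the formula above gives maxThreads = rate × latency. The following is a small worked sketch of that arithmetic using only base; the helper name `threadsFor` is hypothetical and is not part of the library.

```haskell
-- Smallest thread count that can sustain a target rate, derived from
--   rate = maxThreads / latency   =>   maxThreads = rate * latency
-- Both arguments are assumptions supplied by the caller:
--   targetRate in yields per second, latencySec in seconds per task.
threadsFor :: Double -> Double -> Int
threadsFor targetRate latencySec = ceiling (targetRate * latencySec)

main :: IO ()
main =
    -- 40 yields per second with tasks that take 0.25 seconds each
    print (threadsFor 40 0.25)
```

For these inputs the program prints 10, so a channel would need at least `maxThreads 10` to have a chance of reaching 40 yields per second, provided the producer and consumer can keep up.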
avgRate :: Double -> Config -> Config Source #
Same as rate (Just $ Rate (r/2) r (2*r) maxBound)
Specifies the average production rate of a stream in number of yields per second (i.e. Hertz). Concurrent production is ramped up or down automatically to achieve the specified average yield rate. The rate can go down to half of the specified rate on the lower side and double of the specified rate on the higher side.
minRate :: Double -> Config -> Config Source #
Same as rate (Just $ Rate r r (2*r) maxBound)
Specifies the minimum rate at which the stream should yield values. As far as possible, the yield rate is never allowed to go below the specified rate, though it may go above it at times; the upper limit is double the specified rate.
maxRate :: Double -> Config -> Config Source #
Same as rate (Just $ Rate (r/2) r r maxBound)
Specifies the maximum rate at which the stream should yield values. As far as possible, the yield rate is never allowed to go above the specified rate, though it may go below it at times; the lower limit is half the specified rate. This can be useful in applications where certain resource usage must not be allowed to go beyond certain limits.
constRate :: Double -> Config -> Config Source #
Same as rate (Just $ Rate r r r 0)
Specifies a constant yield rate. If for some reason the actual rate goes above or below the specified rate we do not try to recover it by increasing or decreasing the rate in future. This can be useful in applications like graphics frame refresh where we need to maintain a constant refresh rate.
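The four shorthands avgRate, minRate, maxRate and constRate differ only in the bounds they pass to rate. The sketch below restates the "Same as" definitions side by side using a local stand-in record, so it runs with base alone. The field order (low, goal, high, buffer) and the field types are inferred from those definitions and are assumptions here, and the `...Spec` names are hypothetical.

```haskell
-- Local stand-in for the library's Rate record, for illustration only.
-- Field names come from the prose; order and types are inferred from
-- expressions such as: Rate (r/2) r (2*r) maxBound
data Rate = Rate
    { rateLow    :: Double  -- lower bound on the instantaneous rate
    , rateGoal   :: Double  -- target rate in yields per second
    , rateHigh   :: Double  -- upper bound on the instantaneous rate
    , rateBuffer :: Int     -- maximum gap we try to recover
    } deriving (Eq, Show)

avgRateSpec, minRateSpec, maxRateSpec, constRateSpec :: Double -> Rate
avgRateSpec   r = Rate (r / 2) r (2 * r) maxBound  -- may dip to r/2 or rise to 2r
minRateSpec   r = Rate r       r (2 * r) maxBound  -- never below r
maxRateSpec   r = Rate (r / 2) r r       maxBound  -- never above r
constRateSpec r = Rate r       r r       0         -- fixed rate, no recovery

main :: IO ()
main = mapM_ (print . ($ 10)) [avgRateSpec, minRateSpec, maxRateSpec, constRateSpec]
```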
inspect :: Bool -> Config -> Config Source #
Print debug information about the Channel when the stream ends.
Combinators
Stream combinators using a concurrent channel
Evaluate
Evaluates a stream concurrently using a channel.
parEval :: MonadAsync m => (Config -> Config) -> Stream m a -> Stream m a Source #
Evaluate a stream asynchronously. In a serial stream, each element of the stream is generated as it is demanded by the consumer. parEval evaluates multiple elements of the stream ahead of time and serves the results from a buffer.
Note that the evaluation requires only one thread as only one stream needs to be evaluated. Therefore, the concurrency options that are relevant to multiple streams do not apply here; for example, the maxThreads, eager, interleaved, ordered and stopWhen options have no effect.
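A minimal sketch of evaluating a non-parallelizable producer ahead of its consumer: each step of the unfold depends on the previous state, so only one worker can run it, but parEval lets that worker fill a buffer while the consumer is busy. The qualifier `Concur`, the names `countdown` and `evaluatedAhead`, and the buffer size 4 are illustrative assumptions.

```haskell
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream
import qualified Streamly.Internal.Data.Stream.Concurrent as Concur

-- A sequential producer: every step needs the state left by the previous one.
countdown :: Stream.Stream IO Int
countdown = Stream.unfoldrM step 5
  where
    step :: Int -> IO (Maybe (Int, Int))
    step n
        | n <= 0 = return Nothing
        | otherwise = return (Just (n, n - 1))

-- Evaluate the producer in a separate thread, buffering up to 4 results.
evaluatedAhead :: IO [Int]
evaluatedAhead =
    Stream.fold Fold.toList $ Concur.parEval (Concur.maxBuffer 4) countdown

main :: IO ()
main = evaluatedAhead >>= print
```

Since a single stream is evaluated, the elements arrive in their original order.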
Generate
Uses a single channel to evaluate all actions.
parRepeatM :: MonadAsync m => (Config -> Config) -> m a -> Stream m a Source #
Definition:
>>>
parRepeatM cfg = Stream.parSequence cfg . Stream.repeat
Generate a stream by repeatedly executing a monadic action forever.
parReplicateM :: MonadAsync m => (Config -> Config) -> Int -> m a -> Stream m a Source #
Generate a stream by concurrently performing a monadic action n times.
Definition:
>>>
parReplicateM cfg n = Stream.parSequence cfg . Stream.replicate n
Example, parReplicateM in the following example executes all the replicated actions concurrently, thus taking only 1 second:
>>>
Stream.fold Fold.drain $ Stream.parReplicateM id 10 $ delay 1
...
Map
Uses a single channel to evaluate all actions.
parMapM :: MonadAsync m => (Config -> Config) -> (a -> m b) -> Stream m a -> Stream m b Source #
Definition:
>>>
parMapM modifier f = Stream.parConcatMap modifier (Stream.fromEffect . f)
Example, the following example finishes in 3 seconds (the longest delay) as all actions run in parallel. Even though results are available out of order, they are ordered due to the config option:
>>>
f x = delay x >> return x
>>>
Stream.fold Fold.toList $ Stream.parMapM (Stream.ordered True) f $ Stream.fromList [3,2,1]
1 sec
2 sec
3 sec
[3,2,1]
parSequence :: MonadAsync m => (Config -> Config) -> Stream m (m a) -> Stream m a Source #
>>>
parSequence modifier = Stream.parMapM modifier id
Combine two
Use a channel for each pair.
parTwo :: MonadAsync m => (Config -> Config) -> Stream m a -> Stream m a -> Stream m a Source #
Binary operation to evaluate two streams concurrently using a channel.
If you want to combine more than two streams you almost always want the parList or parConcat operation instead. The performance of this operation degrades rapidly when more streams are combined as each operation adds one more concurrent channel. On the other hand, parConcat uses a single channel for all streams. However, with this operation you can precisely control the scheduling by creating arbitrary shape expression trees.
Definition:
>>>
parTwo cfg x y = Stream.parList cfg [x, y]
Example, the following code finishes in 4 seconds:
>>>
async = Stream.parTwo id
>>>
stream1 = Stream.fromEffect (delay 4)
>>>
stream2 = Stream.fromEffect (delay 2)
>>>
Stream.fold Fold.toList $ stream1 `async` stream2
2 sec
4 sec
[2,4]
parZipWithM :: MonadAsync m => (Config -> Config) -> (a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c Source #
Evaluates the streams being zipped in threads separate from the consumer. The zip function is evaluated in the consumer thread.
>>>
parZipWithM cfg f m1 m2 = Stream.zipWithM f (Stream.parEval cfg m1) (Stream.parEval cfg m2)
Multi-stream concurrency options won't apply here, see the notes in parEval.
If you want to evaluate the zip function as well in a separate thread, you can use a parEval on parZipWithM.
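The last remark can be sketched as follows: wrapping parZipWithM in parEval moves the zip function out of the consumer thread too. The qualifier `Concur` and the name `sums` are illustrative assumptions; the inputs are small pure lists only to keep the result easy to check.

```haskell
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream
import qualified Streamly.Internal.Data.Stream.Concurrent as Concur

-- The inner parZipWithM evaluates both inputs in their own threads.
-- The outer parEval evaluates the zipped stream, and therefore the zip
-- function, in yet another thread, away from the final consumer.
sums :: IO [Int]
sums =
    Stream.fold Fold.toList
        $ Concur.parEval id
        $ Concur.parZipWithM id (\a b -> return (a + b))
            (Stream.fromList [1, 2, 3])
            (Stream.fromList [10, 20, 30])

main :: IO ()
main = sums >>= print
```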
parZipWith :: MonadAsync m => (Config -> Config) -> (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c Source #
>>>
parZipWith cfg f = Stream.parZipWithM cfg (\a b -> return $ f a b)
>>>
m1 = Stream.fromList [1,2,3]
>>>
m2 = Stream.fromList [4,5,6]
>>>
Stream.fold Fold.toList $ Stream.parZipWith id (,) m1 m2
[(1,4),(2,5),(3,6)]
parMergeByM :: MonadAsync m => (Config -> Config) -> (a -> a -> m Ordering) -> Stream m a -> Stream m a -> Stream m a Source #
Like mergeByM but evaluates both the streams concurrently.
Definition:
>>>
parMergeByM cfg f m1 m2 = Stream.mergeByM f (Stream.parEval cfg m1) (Stream.parEval cfg m2)
parMergeBy :: MonadAsync m => (Config -> Config) -> (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a Source #
Like mergeBy but evaluates both the streams concurrently.
Definition:
>>>
parMergeBy cfg f = Stream.parMergeByM cfg (\a b -> return $ f a b)
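A short usage sketch for parMergeBy, merging two already sorted streams. The qualifier `Concur` and the name `merged` are illustrative assumptions. As with mergeBy, the inputs are expected to be sorted with respect to the comparison function.

```haskell
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream
import qualified Streamly.Internal.Data.Stream.Concurrent as Concur

-- Both inputs are evaluated concurrently; the merge itself picks the
-- smaller head element at each step, so the output stays sorted.
merged :: IO [Int]
merged =
    Stream.fold Fold.toList
        $ Concur.parMergeBy id compare
            (Stream.fromList [1, 3, 5])
            (Stream.fromList [2, 4, 6])

main :: IO ()
main = merged >>= print
```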
List of streams
Shares a single channel across many streams.
parListLazy :: MonadAsync m => [Stream m a] -> Stream m a Source #
Like concat but works on a list of streams.
>>>
parListLazy = Stream.parList id
parListOrdered :: MonadAsync m => [Stream m a] -> Stream m a Source #
Like parListLazy but with ordered on.
>>>
parListOrdered = Stream.parList (Stream.ordered True)
parListInterleaved :: MonadAsync m => [Stream m a] -> Stream m a Source #
Like parListLazy but interleaves the streams fairly instead of prioritizing the left stream. This schedules all streams in a round robin fashion over a limited number of threads.
>>>
parListInterleaved = Stream.parList (Stream.interleaved True)
parListEager :: MonadAsync m => [Stream m a] -> Stream m a Source #
Like parListLazy but with eager on.
>>>
parListEager = Stream.parList (Stream.eager True)
parListEagerFst :: MonadAsync m => [Stream m a] -> Stream m a Source #
Like parListEager but stops the output as soon as the first stream stops.
>>>
parListEagerFst = Stream.parList (Stream.eager True . Stream.stopWhen Stream.FirstStops)
parListEagerMin :: MonadAsync m => [Stream m a] -> Stream m a Source #
Like parListEager but stops the output as soon as any one of the streams stops.
Definition:
>>>
parListEagerMin = Stream.parList (Stream.eager True . Stream.stopWhen Stream.AnyStops)
parList :: MonadAsync m => (Config -> Config) -> [Stream m a] -> Stream m a Source #
Like parConcat but works on a list of streams.
>>>
parList modifier = Stream.parConcat modifier . Stream.fromList
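A minimal usage sketch for parList with an explicit config modifier. The qualifier `Concur` and the name `concatenated` are illustrative assumptions; ordered is switched on so that the output order matches a serial concatenation.

```haskell
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream
import qualified Streamly.Internal.Data.Stream.Concurrent as Concur

-- Evaluate the listed streams concurrently over a single shared channel.
-- With ordered enabled, results are emitted in the same sequence that a
-- serial evaluation would produce.
concatenated :: IO [Int]
concatenated =
    Stream.fold Fold.toList
        $ Concur.parList (Concur.ordered True)
            [ Stream.fromList [1, 2, 3]
            , Stream.fromList [4, 5, 6]
            ]

main :: IO ()
main = concatenated >>= print
```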
Stream of streams
Apply
parApply :: MonadAsync m => (Config -> Config) -> Stream m (a -> b) -> Stream m a -> Stream m b Source #
Apply an argument stream to a function stream concurrently. Uses a shared channel for all individual applications within a stream application.
Concat
Shares a single channel across many streams.
parConcat :: MonadAsync m => (Config -> Config) -> Stream m (Stream m a) -> Stream m a Source #
Evaluate the streams in the input stream concurrently and combine them.
>>>
parConcat modifier = Stream.parConcatMap modifier id
parConcatMap :: MonadAsync m => (Config -> Config) -> (a -> Stream m b) -> Stream m a -> Stream m b Source #
Map each element of the input to a stream and then concurrently evaluate and concatenate the resulting streams. Multiple streams may be evaluated concurrently but earlier streams are preferred. Output from the streams is used as it arrives.
Definition:
>>>
parConcatMap modifier f stream = Stream.parConcat modifier $ fmap f stream
Examples:
>>>
f cfg xs = Stream.fold Fold.toList $ Stream.parConcatMap cfg id $ Stream.fromList xs
The following streams finish in 4 seconds:
>>>
stream1 = Stream.fromEffect (delay 4)
>>>
stream2 = Stream.fromEffect (delay 2)
>>>
stream3 = Stream.fromEffect (delay 1)
>>>
f id [stream1, stream2, stream3]
1 sec
2 sec
4 sec
[1,2,4]
Limiting threads to 2 schedules the third stream only after one of the first two has finished, releasing a thread:
>>>
f (Stream.maxThreads 2) [stream1, stream2, stream3]
...
[2,1,4]
When used with a single thread it behaves like serial concatMap:
>>>
f (Stream.maxThreads 1) [stream1, stream2, stream3]
...
[4,2,1]
>>>
stream1 = Stream.fromList [1,2,3]
>>>
stream2 = Stream.fromList [4,5,6]
>>>
f (Stream.maxThreads 1) [stream1, stream2]
[1,2,3,4,5,6]
Schedule all streams in a round robin fashion over the available threads:
>>>
f cfg xs = Stream.fold Fold.toList $ Stream.parConcatMap (Stream.interleaved True . cfg) id $ Stream.fromList xs
>>>
stream1 = Stream.fromList [1,2,3]
>>>
stream2 = Stream.fromList [4,5,6]
>>>
f (Stream.maxThreads 1) [stream1, stream2]
[1,4,2,5,3,6]
ConcatIterate
parConcatIterate :: MonadAsync m => (Config -> Config) -> (a -> Stream m a) -> Stream m a -> Stream m a Source #
Same as concatIterate but concurrent.
Pre-release
Reactive
fromCallback :: MonadAsync m => ((a -> m ()) -> m ()) -> Stream m a Source #
Supplies a stream generating callback to a callback setter function. Each invocation of the callback results in a value being generated in the resulting stream.
Pre-release
tapCountD :: MonadAsync m => (a -> Bool) -> (Stream m Int -> m b) -> Stream m a -> Stream m a Source #
tapCount :: MonadAsync m => (a -> Bool) -> (Stream m Int -> m b) -> Stream m a -> Stream m a Source #
tapCount predicate fold stream taps the count of those elements in the stream that pass the predicate. The resulting count stream is sent to another thread which folds it using fold.
For example, to print the count of elements processed every second:
>>>
rate = Stream.rollingMap2 (flip (-)) . Stream.delayPost 1
>>>
report = Stream.fold (Fold.drainMapM print) . rate
>>>
tap = Stream.tapCount (const True) report
>>>
go = Stream.fold Fold.drain $ tap $ Stream.enumerateFrom 0
Note: This may not work correctly on 32-bit machines because of Int overflow.
Pre-release