streamly-0.9.0: Streaming, dataflow programming and declarative concurrency
Copyright: (c) 2017 Composewell Technologies
License: BSD-3-Clause
Maintainer: streamly@composewell.com
Stability: experimental
Portability: GHC
Safe Haskell: Safe-Inferred
Language: Haskell2010

Streamly.Internal.Data.Stream.Concurrent.Channel

Channel

data Channel m a

A mutable channel to evaluate multiple streams concurrently and provide the combined results as an output stream.

newChannel :: MonadAsync m => (Config -> Config) -> m (Channel m a)

Create a new concurrent stream evaluation channel. The monad state used to run the stream actions is captured from the call site of newChannel.

withChannel :: MonadAsync m => (Config -> Config) -> Stream m a -> (Channel m b -> Stream m a -> Stream m b) -> Stream m b

Allocate a channel and evaluate the stream using the channel and the supplied evaluator function. The evaluator is run in a worker thread.
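A minimal sketch of withChannel, assuming streamly 0.9.0 is installed and that Streamly.Data.Stream and Streamly.Data.Fold supply fromList, fold and toList. The names doubleAll and runDoubled are hypothetical. This evaluator ignores the channel and only transforms the input stream; a real evaluator would typically use the channel to schedule further work.

```haskell
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream
import Streamly.Data.Stream (Stream)
import Streamly.Internal.Data.Stream.Concurrent.Channel
    (Channel, maxBuffer, withChannel)

-- Hypothetical evaluator: ignores the channel and doubles each element.
doubleAll :: Channel IO Int -> Stream IO Int -> Stream IO Int
doubleAll _chan = fmap (* 2)

-- Evaluate the input through a channel whose result buffer holds 16 items.
runDoubled :: IO [Int]
runDoubled =
    Stream.fold Fold.toList
        $ withChannel (maxBuffer 16) (Stream.fromList [1 .. 5]) doubleAll

main :: IO ()
main = runDoubled >>= print
```

Passing `const id` as the evaluator would simply evaluate the input stream asynchronously with respect to its consumer.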

withChannelK :: MonadAsync m => (Config -> Config) -> StreamK m a -> (Channel m b -> StreamK m a -> StreamK m b) -> StreamK m b

Allocate a channel and evaluate the stream using the channel and the supplied evaluator function. The evaluator is run in a worker thread.

fromChannel :: MonadAsync m => Channel m a -> Stream m a

Generate a stream of results from the concurrent evaluations queued on a channel. Evaluation of the channel does not start until this API is called, and it must not be called more than once on a channel. It kicks off evaluation by dispatching concurrent workers and ensures that, as long as there is work queued on the channel, workers are dispatched in proportion to the demand from the consumer.

toChannel :: MonadRunInIO m => Channel m a -> Stream m a -> m ()

Send a stream to a given channel for concurrent evaluation.
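The channel lifecycle described above (create, queue streams, then read results once) can be sketched as follows. This assumes streamly 0.9.0 and uses only the functions documented in this module plus fromList, fold and toList from the public stream and fold modules; collect is a hypothetical name.

```haskell
import Data.List (sort)
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream
import Streamly.Internal.Data.Stream.Concurrent.Channel
    (fromChannel, maxThreads, newChannel, toChannel)

-- Queue two streams on one channel and collect the combined output.
collect :: IO [Int]
collect = do
    -- Create a channel limited to four worker threads.
    chan <- newChannel (maxThreads 4)
    -- Queue the streams; evaluation has not started yet.
    toChannel chan (Stream.fromList [1, 2, 3])
    toChannel chan (Stream.fromList [4, 5, 6])
    -- fromChannel starts evaluation; call it only once per channel.
    Stream.fold Fold.toList (fromChannel chan)

main :: IO ()
main = do
    results <- collect
    -- Arrival order depends on scheduling, so sort before printing.
    print (sort results)
```

The results are sorted before printing because the order in which the two streams' elements arrive is not specified.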

toChannelK :: MonadRunInIO m => Channel m a -> StreamK m a -> m ()

Write a stream to a channel in a non-blocking manner. The stream can then be read back from the channel using fromChannel.

stopChannel :: MonadIO m => Channel m a -> m ()

Stop the channel. Kill all running worker threads.

Configuration

data Config

An abstract type for specifying the configuration parameters of a Channel. Use Config -> Config modifier functions to modify the default configuration. See the individual modifier documentation for default values.

defaultConfig :: Config

The default configuration. Fields prefixed with an _ must not be accessed or updated directly; use the get/set accessor routines instead.

Limits

maxThreads :: Int -> Config -> Config

Specify the maximum number of threads that can be spawned by the channel. A value of 0 resets the thread limit to the default; a negative value means there is no limit. The default value is 1500.

When the actions in a stream are IO bound, having blocking IO calls, this option can be used to control the maximum number of in-flight IO requests. When the actions are CPU bound this option can be used to control the amount of CPU used by the stream.

maxBuffer :: Int -> Config -> Config

Specify the maximum size of the buffer for storing the results from concurrent computations. If the buffer becomes full, we stop spawning more concurrent tasks until there is space in the buffer. A value of 0 resets the buffer size to the default; a negative value means there is no limit. The default value is 1500.

CAUTION! Using an unbounded maxBuffer value (i.e. a negative value) coupled with an unbounded maxThreads value is a recipe for disaster in the presence of infinite or very large streams. In particular, it must not be used when pure is used in ZipAsyncM streams, because pure in applicative zip streams generates an infinite stream, causing unbounded concurrent generation with no limit on the buffer or threads.
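As an illustration of bounding both limits, here is a sketch that composes maxThreads and maxBuffer and passes them to a concurrent map. It assumes streamly 0.9.0, where parMapM and the same Config modifiers are exported from Streamly.Data.Stream.Prelude; fetch and runLimited are hypothetical names, and fetch merely simulates a blocking IO request.

```haskell
import Control.Concurrent (threadDelay)
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream
import qualified Streamly.Data.Stream.Prelude as Concur

-- Hypothetical stand-in for a blocking IO request (10 ms).
fetch :: Int -> IO Int
fetch n = threadDelay 10000 >> pure (n * n)

-- At most 4 requests in flight and at most 8 buffered results.
runLimited :: IO [Int]
runLimited =
    Stream.fold Fold.toList
        $ Concur.parMapM limits fetch (Stream.fromList [1 .. 20])
  where
    -- Config modifiers compose with ordinary function composition.
    limits = Concur.maxThreads 4 . Concur.maxBuffer 8

main :: IO ()
main = runLimited >>= print . length
```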

Rate Control

data Rate

Specifies the stream yield rate in yields per second (Hertz). We keep accumulating yield credits at rateGoal. At any point in time we allow only as many yields as we have accumulated as per rateGoal since the start of time. If the consumer or the producer is slower or faster, the actual rate may fall behind or exceed rateGoal. We try to recover the gap between the two by increasing or decreasing the pull rate from the producer. However, if the gap becomes more than rateBuffer, we try to recover only as much as rateBuffer.

rateLow puts a bound on how low the instantaneous rate can go when recovering the rate gap. In other words, it determines the maximum yield latency. Similarly, rateHigh puts a bound on how high the instantaneous rate can go when recovering the rate gap. In other words, it determines the minimum yield latency. We reduce the latency by increasing concurrency, therefore we can say that it puts an upper bound on concurrency.

If the rateGoal is 0 or negative the stream never yields a value. If the rateBuffer is 0 or negative we do not attempt to recover.

Constructors

Rate 

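Fields

  • rateLow :: Double (the lower rate limit)
  • rateGoal :: Double (the target rate)
  • rateHigh :: Double (the upper rate limit)
  • rateBuffer :: Int (the maximum gap from the goal that we try to recover)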

rate :: Maybe Rate -> Config -> Config

Specify the stream evaluation rate of a channel.

A Nothing value means there is no smart rate control; concurrent execution blocks only if maxThreads or maxBuffer is reached, or there are no more concurrent tasks to execute. This is the default.

When rate (throughput) is specified, concurrent production may be ramped up or down automatically to achieve the specified stream throughput. The specific behavior for different styles of Rate specifications is documented under Rate. The effective maximum production rate achieved by a channel is governed by:

  • The maxThreads limit
  • The maxBuffer limit
  • The maximum rate that the stream producer can achieve
  • The maximum rate that the stream consumer can achieve

Maximum production rate is given by:

\(rate = \frac{maxThreads}{latency}\)

If we know the average latency of the tasks we can set maxThreads accordingly.
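The formula above can be turned around to size maxThreads for a target rate. The following is a worked sketch using only the base library; maxProductionRate and threadsForRate are hypothetical helper names, and latency is taken to be the average task latency in seconds.

```haskell
-- rate = maxThreads / latency, with latency in seconds per task.
maxProductionRate :: Int -> Double -> Double
maxProductionRate threads latency = fromIntegral threads / latency

-- Smallest thread count that can sustain the target rate (yields per second).
threadsForRate :: Double -> Double -> Int
threadsForRate targetRate latency = ceiling (targetRate * latency)

main :: IO ()
main = do
    -- The default 1500 threads with 0.25 s tasks.
    print (maxProductionRate 1500 0.25) -- prints 6000.0
    -- Threads needed for 200 yields per second with 0.25 s tasks.
    print (threadsForRate 200 0.25)     -- prints 50
```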

avgRate :: Double -> Config -> Config

Same as rate (Just $ Rate (r/2) r (2*r) maxBound)

Specifies the average production rate of a stream in number of yields per second (i.e. Hertz). Concurrent production is ramped up or down automatically to achieve the specified average yield rate. The rate can go down to half the specified rate on the lower side and up to double the specified rate on the higher side.

minRate :: Double -> Config -> Config

Same as rate (Just $ Rate r r (2*r) maxBound)

Specifies the minimum rate at which the stream should yield values. As far as possible, the yield rate is never allowed to go below the specified rate. It may go above it at times, up to a limit of double the specified rate.

maxRate :: Double -> Config -> Config

Same as rate (Just $ Rate (r/2) r r maxBound)

Specifies the maximum rate at which the stream should yield values. As far as possible, the yield rate is never allowed to go above the specified rate. It may go below it at times, down to a limit of half the specified rate. This can be useful in applications where certain resource usage must not be allowed to go beyond certain limits.
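A sketch of capping throughput with maxRate, assuming streamly 0.9.0 and the parMapM combinator from Streamly.Data.Stream.Prelude, which accepts the same Config modifiers. runCapped is a hypothetical name and the figure of 20 yields per second is arbitrary; actual timing depends on the scheduler, so none is asserted here.

```haskell
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream
import qualified Streamly.Data.Stream.Prelude as Concur

-- Evaluate ten trivial actions, capped at roughly 20 yields per second.
runCapped :: IO [Int]
runCapped =
    Stream.fold Fold.toList
        $ Concur.parMapM (Concur.maxRate 20) pure (Stream.fromList [1 .. 10])

main :: IO ()
main = runCapped >>= print . length
```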

constRate :: Double -> Config -> Config

Same as rate (Just $ Rate r r r 0)

Specifies a constant yield rate. If for some reason the actual rate goes above or below the specified rate, we do not try to recover it by increasing or decreasing the rate in the future. This can be useful in applications like graphics frame refresh where we need to maintain a constant refresh rate.
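To compare the four presets side by side, the sketch below models them with a local record rather than the library's Rate type, so it needs only the base library. RateModel and the four functions are hypothetical names; the argument order (low, goal, high, buffer) follows the "Same as rate (Just $ Rate ...)" lines above.

```haskell
-- Local model of a rate specification: low, goal, high, buffer.
data RateModel = RateModel
    { modelLow :: Double
    , modelGoal :: Double
    , modelHigh :: Double
    , modelBuffer :: Int
    } deriving (Eq, Show)

avgModel, minModel, maxModel, constModel :: Double -> RateModel
avgModel r = RateModel (r / 2) r (2 * r) maxBound   -- half to double
minModel r = RateModel r r (2 * r) maxBound         -- never below r
maxModel r = RateModel (r / 2) r r maxBound         -- never above r
constModel r = RateModel r r r 0                    -- no recovery

main :: IO ()
main = mapM_ (print . ($ 10)) [avgModel, minModel, maxModel, constModel]
```

For a goal of 10 yields per second, the average preset allows the instantaneous rate to range from 5 to 20, while the constant preset pins all three values to 10 and sets the recovery buffer to 0.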

Stop behavior

data StopWhen

Specify when the Channel should stop.

Constructors

FirstStops

Stop when the first stream ends.

AllStop

Stop when all the streams end.

AnyStops

Stop when any one stream ends.

stopWhen :: StopWhen -> Config -> Config

Specify when the Channel should stop.
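A sketch contrasting AllStop with AnyStops, assuming streamly 0.9.0 and the parList combinator from Streamly.Data.Stream.Prelude, which takes the same Config modifiers. runWith, short and long are hypothetical names. With AllStop every element of both streams is produced; with AnyStops the output may be cut short once one stream ends, so only an upper bound is checked.

```haskell
import Control.Concurrent (threadDelay)
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream
import Streamly.Data.Stream (Stream)
import qualified Streamly.Data.Stream.Prelude as Concur

-- A short stream and a longer, slower one.
short, long :: Stream IO Int
short = Stream.fromList [1, 2, 3]
long = Stream.mapM (\x -> threadDelay 1000 >> pure x) (Stream.fromList [10 .. 100])

-- Combine both streams concurrently under the given stop policy.
runWith :: Concur.StopWhen -> IO [Int]
runWith policy =
    Stream.fold Fold.toList
        $ Concur.parList (Concur.stopWhen policy) [short, long]

main :: IO ()
main = do
    allOut <- runWith Concur.AllStop
    anyOut <- runWith Concur.AnyStops
    print (length allOut)
    print (length anyOut <= length allOut)
```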

Scheduling behavior

eager :: Bool -> Config -> Config

By default, processing of output from the worker threads is given priority over dispatching new workers. More workers are dispatched only when there is no output to process. With eager on, workers are dispatched aggressively as long as there is more work to do, irrespective of whether there is output pending to be processed. However, dispatching may stop if maxThreads or maxBuffer is reached.

Note: This option has no effect when rate has been specified.

Note: Not supported with interleaved.

ordered :: Bool -> Config -> Config

When enabled, the streams may be evaluated concurrently but the results are produced in the same sequence as a serial evaluation would produce.

Note: Not supported with interleaved.
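A sketch of ordered evaluation, assuming streamly 0.9.0 and parMapM from Streamly.Data.Stream.Prelude. The hypothetical slowFirst action makes earlier elements take longer, so without ordered the later elements would tend to finish first; with ordered on, the description above implies the results come out in input order.

```haskell
import Control.Concurrent (threadDelay)
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream
import qualified Streamly.Data.Stream.Prelude as Concur

-- Hypothetical action: element 1 sleeps 50 ms, element 5 sleeps 10 ms.
slowFirst :: Int -> IO Int
slowFirst x = threadDelay ((6 - x) * 10000) >> pure x

-- Map concurrently but emit results in the order of the input.
runOrdered :: IO [Int]
runOrdered =
    Stream.fold Fold.toList
        $ Concur.parMapM (Concur.ordered True) slowFirst (Stream.fromList [1 .. 5])

main :: IO ()
main = runOrdered >>= print
```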

interleaved :: Bool -> Config -> Config

Interleave the streams fairly instead of prioritizing the left stream. This schedules all streams in a round-robin fashion over a limited number of threads.

Note: Can only be used on a finite number of streams.

Note: Not supported with ordered.
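A sketch of interleaved scheduling over a finite list of streams, assuming streamly 0.9.0 and parList from Streamly.Data.Stream.Prelude. runInterleaved is a hypothetical name. The exact interleaving is left to the scheduler, so the sketch only checks that every element arrives.

```haskell
import Data.List (sort)
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream
import qualified Streamly.Data.Stream.Prelude as Concur

-- Schedule two finite streams in round-robin fashion.
runInterleaved :: IO [Int]
runInterleaved =
    Stream.fold Fold.toList
        $ Concur.parList (Concur.interleaved True)
            [ Stream.fromList [0, 2, 4]
            , Stream.fromList [1, 3, 5]
            ]

main :: IO ()
main = do
    results <- runInterleaved
    -- Sort before printing because the arrival order is not asserted here.
    print (sort results)
```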

Diagnostics

inspect :: Bool -> Config -> Config

Print debug information about the Channel when the stream ends.