streamly-0.9.0: Streaming, dataflow programming and declarative concurrency
Copyright: (c) 2017 Composewell Technologies
License: BSD-3-Clause
Maintainer: streamly@composewell.com
Stability: experimental
Portability: GHC
Safe Haskell: Safe-Inferred
Language: Haskell2010

Streamly.Internal.Data.Stream.Channel.Types

Description

A Channel is a place where streams join and new streams start. This module defines low level data structures and functions to build channels. For concrete Channels see the Channel modules of specific stream types.

A Channel is a conduit to the output from multiple streams running concurrently and asynchronously. A channel can be thought of as an asynchronous IO handle. We can write any number of streams to a channel in a non-blocking manner and then read them back at any time at any pace. The channel would run the streams asynchronously and accumulate results. A channel may not really execute the stream completely and accumulate all the results. However, it ensures that the reader can read the results at whatever pace it wants to read. The channel monitors and adapts to the consumer's pace.

A channel is a mini scheduler. It has an associated workLoop that holds the stream tasks to be picked and run by a pool of worker threads, and an associated output queue where the worker threads place the output stream elements. The worker threads use an outputDoorBell to notify the consumer thread that new results are available in the output queue. fromChannel adds more workers to the channel on demand if the output produced is not keeping pace with the consumer. On bounded channels, workers block on the output queue, which throttles the producer when the consumer is not pulling fast enough. The number of workers may also be reduced, depending on the consumer's pace.
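The scheduler described above can be illustrated with a toy model built only from `base`. This is a minimal sketch, not the library's implementation: `ToyChannel`, its fields, and the helper functions are hypothetical names, the worker pool has a fixed size (the real channel adds and removes workers on demand), and there is no bounded-buffer throttling.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (MVar, newEmptyMVar, takeMVar, tryPutMVar)
import Control.Monad (replicateM_)
import Data.IORef (IORef, newIORef, atomicModifyIORef')

-- | Hypothetical toy channel; names are illustrative only.
data ToyChannel a = ToyChannel
    { toyWorkQueue   :: IORef [IO a]  -- tasks waiting to be picked by workers
    , toyOutputQueue :: IORef [a]     -- results, newest first
    , toyDoorBell    :: MVar ()       -- full when new output may be available
    }

newToyChannel :: [IO a] -> IO (ToyChannel a)
newToyChannel tasks =
    ToyChannel <$> newIORef tasks <*> newIORef [] <*> newEmptyMVar

-- | Worker loop: pick a task, run it, queue the result, ring the doorbell.
-- The worker exits when the work queue is empty.
toyWorker :: ToyChannel a -> IO ()
toyWorker ch = do
    next <- atomicModifyIORef' (toyWorkQueue ch) $ \ts ->
        case ts of
            []         -> ([], Nothing)
            (t : rest) -> (rest, Just t)
    case next of
        Nothing -> pure ()
        Just task -> do
            r <- task
            atomicModifyIORef' (toyOutputQueue ch) $ \out -> (r : out, ())
            -- Non-blocking ring: if the bell is already full, the consumer
            -- has not drained yet and will see this result when it does.
            _ <- tryPutMVar (toyDoorBell ch) ()
            toyWorker ch

-- | Consumer: wait on the doorbell and drain the output queue until the
-- expected number of results has been read.
toyRead :: ToyChannel a -> Int -> IO [a]
toyRead ch = go []
  where
    go acc remaining
        | remaining <= 0 = pure acc
        | otherwise = do
            takeMVar (toyDoorBell ch)
            out <- atomicModifyIORef' (toyOutputQueue ch) $ \o -> ([], o)
            go (acc ++ reverse out) (remaining - length out)

-- | Run the tasks on a fixed pool of workers and collect every result.
-- Assumes nWorkers >= 1 and that no task throws; otherwise the consumer
-- would wait forever in this sketch.
runToyChannel :: Int -> [IO a] -> IO [a]
runToyChannel nWorkers tasks = do
    ch <- newToyChannel tasks
    replicateM_ nWorkers (forkIO (toyWorker ch))
    toyRead ch (length tasks)
```

For example, `runToyChannel 3 (map (pure . (* 2)) [1 .. 5 :: Int])` returns the five doubled values in whatever order the workers finished; with a single worker the results arrive in task order.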

Synopsis

Types

newtype Count Source #

Constructors

Count Int64 

Instances

Instances details
Bounded Count Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Channel.Types

Enum Count Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Channel.Types

Num Count Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Channel.Types

Read Count Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Channel.Types

Integral Count Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Channel.Types

Real Count Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Channel.Types

Methods

toRational :: Count -> Rational #

Show Count Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Channel.Types

Methods

showsPrec :: Int -> Count -> ShowS #

show :: Count -> String #

showList :: [Count] -> ShowS #

Eq Count Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Channel.Types

Methods

(==) :: Count -> Count -> Bool #

(/=) :: Count -> Count -> Bool #

Ord Count Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Channel.Types

Methods

compare :: Count -> Count -> Ordering #

(<) :: Count -> Count -> Bool #

(<=) :: Count -> Count -> Bool #

(>) :: Count -> Count -> Bool #

(>=) :: Count -> Count -> Bool #

max :: Count -> Count -> Count #

min :: Count -> Count -> Count #

data Limit Source #

Constructors

Unlimited 
Limited Word 

Instances

Instances details
Show Limit Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Channel.Types

Methods

showsPrec :: Int -> Limit -> ShowS #

show :: Limit -> String #

showList :: [Limit] -> ShowS #

Eq Limit Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Channel.Types

Methods

(==) :: Limit -> Limit -> Bool #

(/=) :: Limit -> Limit -> Bool #

Ord Limit Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Channel.Types

Methods

compare :: Limit -> Limit -> Ordering #

(<) :: Limit -> Limit -> Bool #

(<=) :: Limit -> Limit -> Bool #

(>) :: Limit -> Limit -> Bool #

(>=) :: Limit -> Limit -> Bool #

max :: Limit -> Limit -> Limit #

min :: Limit -> Limit -> Limit #

data ThreadAbort Source #

Channel driver throws this exception to all active workers to clean up the channel.

Constructors

ThreadAbort 
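The cleanup mechanism described above, where a driver throws an exception to a worker thread, can be sketched with the standard asynchronous exception API in `base`. This is an illustration under assumptions: `AbortSketch` and `abortOneWorker` are hypothetical stand-ins, not the library's `ThreadAbort` or its channel driver.

```haskell
import Control.Concurrent (forkIO, threadDelay, throwTo)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Exception (Exception, handle)
import Control.Monad (forever)

-- | Hypothetical stand-in for an abort exception.
data AbortSketch = AbortSketch deriving Show

instance Exception AbortSketch

-- | Fork a worker that would run forever, abort it from the driver
-- thread, and return the message the worker reports from its handler.
abortOneWorker :: IO String
abortOneWorker = do
    started <- newEmptyMVar
    done <- newEmptyMVar
    tid <- forkIO $
        -- The handler plays the role of the worker's cleanup code.
        handle (\AbortSketch -> putMVar done "cleaned up") $ do
            putMVar started ()
            forever (threadDelay 1000000)
    -- Wait until the handler is installed before aborting the worker.
    takeMVar started
    throwTo tid AbortSketch
    takeMVar done
```

A driver with several workers would call `throwTo` once per active thread id; this sketch aborts a single worker to keep the control flow easy to follow.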

data ChildEvent a Source #

Events that a child thread may send to a parent thread.

Stats

Rate Control

data WorkerInfo Source #

We measure the individual worker latencies to estimate the number of workers needed or the amount of time we have to sleep between dispatches to achieve a particular rate when controlled pace mode is used.

Constructors

WorkerInfo 

Fields

data YieldRateInfo Source #

Rate control.

Constructors

YieldRateInfo 

Fields

  • svarLatencyTarget :: NanoSecond64
     
  • svarLatencyRange :: LatencyRange
     
  • svarRateBuffer :: Int
     
  • svarGainedLostYields :: IORef Count

    [LOCKING] Unlocked access. Modified by the consumer thread and unsafely read by the worker threads
  • svarAllTimeLatency :: IORef (Count, AbsTime)

    Actual latency/throughput as seen from the consumer side. We count the yields and the time it took to generate those yields. This is used to increase or decrease the number of workers needed to achieve the desired rate. The idle time of workers is adjusted in this, so that we only account for the rate when the consumer actually demands data. XXX interval latency is enough, we can move this under diagnostics build. [LOCKING] Unlocked access. Modified by the consumer thread and unsafely read by the worker threads

  • workerBootstrapLatency :: Maybe NanoSecond64
     
  • workerPollingInterval :: IORef Count

    After how many yields the worker should update the latency information. If the latency is high, this count is kept lower and vice versa. XXX If the latency suddenly becomes too high this count may remain too high for a long time; in such cases the consumer can change it. 0 means no latency computation. XXX this is derivable from workerMeasuredLatency, can be removed. [LOCKING] Unlocked access. Modified by the consumer thread and unsafely read by the worker threads

  • workerPendingLatency :: IORef (Count, Count, NanoSecond64)

    These are the in-progress latency stats maintained by the workers, which we empty into the workerCollectedLatency stats at certain intervals, whenever we process the stream elements yielded in this period. The first count is all yields, the second count is only those yields for which the latency was measured to be non-zero (note that if the timer resolution is low the measured latency may be zero, e.g. on the JS platform). [LOCKING] Locked access. Modified by the consumer thread as well as worker threads. Workers modify it periodically based on workerPollingInterval and not on every yield to reduce the locking overhead. (allYieldCount, yieldCount, timeTaken)

  • workerCollectedLatency :: IORef (Count, Count, NanoSecond64)

    This is the second level stat, which is an accumulation from the workerPendingLatency stats. We keep accumulating latencies in this bucket until we have stats for a sufficient period and then we reset it to start collecting for the next period and retain the computed average latency for the last period in workerMeasuredLatency. [LOCKING] Unlocked access. Modified by the consumer thread and unsafely read by the worker threads (allYieldCount, yieldCount, timeTaken)

  • workerMeasuredLatency :: IORef NanoSecond64

    Latency as measured by workers, aggregated for the last period. [LOCKING] Unlocked access. Modified by the consumer thread and unsafely read by the worker threads
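The three latency fields above form a small pipeline: pending stats are folded into a collected bucket, and once enough time has been covered the bucket is reset and its average is kept as the measured latency. The pure sketch below illustrates that flow. The `(allYieldCount, yieldCount, timeTaken)` layout follows the field descriptions, but the function names, the use of plain `Int64` nanoseconds, and the `minNanos` threshold for a "sufficient period" are assumptions; the page does not say how the library decides these.

```haskell
import Data.Int (Int64)

-- | (allYieldCount, yieldCount, timeTakenNanos), mirroring the tuple
-- layout described for the pending and collected latency fields.
type LatencyStats = (Int64, Int64, Int64)

-- | Fold a batch of pending stats into the collected bucket.
collectStats :: LatencyStats -> LatencyStats -> LatencyStats
collectStats (a1, y1, t1) (a2, y2, t2) = (a1 + a2, y1 + y2, t1 + t2)

-- | Average latency per measured yield, if any yield was measured.
averageLatency :: LatencyStats -> Maybe Int64
averageLatency (_, yields, nanos)
    | yields > 0 = Just (nanos `div` yields)
    | otherwise  = Nothing

-- | Once the bucket covers at least minNanos (a hypothetical threshold),
-- reset it and return the average for the finished period; otherwise
-- keep collecting.
rollOver :: Int64 -> LatencyStats -> (LatencyStats, Maybe Int64)
rollOver minNanos stats@(_, _, nanos)
    | nanos >= minNanos = ((0, 0, 0), averageLatency stats)
    | otherwise         = (stats, Nothing)
```

For instance, collecting `(10, 8, 800)` and `(5, 2, 200)` gives `(15, 10, 1000)`, whose average is 100 nanoseconds per measured yield.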

Output queue

Yield Limit

decrementYieldLimit :: Maybe (IORef Count) -> IO Bool Source #

A worker decrements the yield limit before it executes an action. However, the action may not result in an element being yielded; in that case we have to increment the yield limit.

Note that we need it to be an Int type so that we have the ability to undo a decrement that takes it below zero.
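The decrement-then-undo protocol described above can be sketched with an atomic counter. This is a sketch under assumptions: it uses a plain `Int64` in place of `Count`, and `tryTakeYield` and `giveBackYield` are hypothetical names chosen so as not to suggest they are the library's definitions. A signed counter is what makes the undo safe: a failed decrement may take the value below zero, and the matching increment restores it.

```haskell
import Data.Int (Int64)
import Data.IORef (IORef, newIORef, readIORef, atomicModifyIORef')

-- | Decrement the limit and report whether a yield was available
-- before the decrement. Nothing means there is no limit.
tryTakeYield :: Maybe (IORef Int64) -> IO Bool
tryTakeYield Nothing = pure True
tryTakeYield (Just ref) = do
    before <- atomicModifyIORef' ref (\n -> (n - 1, n))
    pure (before >= 1)

-- | Undo one decrement, for an action that yielded nothing or for a
-- decrement that found the limit already exhausted.
giveBackYield :: Maybe (IORef Int64) -> IO ()
giveBackYield Nothing = pure ()
giveBackYield (Just ref) = atomicModifyIORef' ref (\n -> (n + 1, ()))

-- | Walk through one successful take, one failed take, and the undo.
demoYieldLimit :: IO (Bool, Bool, Int64)
demoYieldLimit = do
    ref <- newIORef 1
    first <- tryTakeYield (Just ref)   -- succeeds, counter is now 0
    second <- tryTakeYield (Just ref)  -- fails, counter is now -1
    giveBackYield (Just ref)           -- undo the failed take, counter is 0
    final <- readIORef ref
    pure (first, second, final)
```

With an unsigned counter the second decrement in `demoYieldLimit` would wrap around instead of reaching -1, and the undo could not bring the limit back to a meaningful value.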

Configuration

data Rate Source #

Specifies the stream yield rate in yields per second (Hertz). We keep accumulating yield credits at rateGoal. At any point of time we allow only as many yields as we have accumulated as per rateGoal since the start of time. If the consumer or the producer is slower or faster, the actual rate may fall behind or exceed rateGoal. We try to recover the gap between the two by increasing or decreasing the pull rate from the producer. However, if the gap becomes more than rateBuffer we try to recover only as much as rateBuffer.

rateLow puts a bound on how low the instantaneous rate can go when recovering the rate gap. In other words, it determines the maximum yield latency. Similarly, rateHigh puts a bound on how high the instantaneous rate can go when recovering the rate gap. In other words, it determines the minimum yield latency. We reduce the latency by increasing concurrency, therefore we can say that it puts an upper bound on concurrency.

If the rateGoal is 0 or negative the stream never yields a value. If the rateBuffer is 0 or negative we do not attempt to recover.
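The credit accounting described above can be made concrete with a little arithmetic. This is a simplified sketch: `allowedYields` and `recoverableGap` are hypothetical helpers, and treating the buffer as a symmetric clamp on the gap (both when behind and when ahead) is an assumption, since the text only says that recovery is limited to rateBuffer.

```haskell
-- | Yields permitted so far when credits accumulate at goal yields per
-- second for elapsed seconds. A goal of 0 or less never permits a yield.
allowedYields :: Double -> Double -> Int
allowedYields goal elapsed
    | goal <= 0 = 0
    | otherwise = floor (goal * elapsed)

-- | How many yields we try to recover. Positive means the stream is
-- behind the goal, negative means it is ahead. The gap is limited to
-- the buffer, and a buffer of 0 or less disables recovery.
recoverableGap :: Int -> Int -> Int -> Int
recoverableGap buffer allowed actual
    | buffer <= 0 = 0
    | otherwise   = max (negate buffer) (min buffer (allowed - actual))
```

For example, at a goal of 10 yields per second, 2.5 seconds of credit allows 25 yields. If only 10 were produced, the gap is 15, but with a buffer of 5 we try to recover only 5.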

Constructors

Rate 

Fields

data StopWhen Source #

Specify when the Channel should stop.

Constructors

FirstStops

Stop when the first stream ends.

AllStop

Stop when all the streams end.

AnyStops

Stop when any one stream ends.

data Config Source #

An abstract type for specifying the configuration parameters of a Channel. Use Config -> Config modifier functions to modify the default configuration. See the individual modifier documentation for default values.

Default config

magicMaxBuffer :: Word Source #

A magical value for the buffer size arrived at by running the smallest possible task and measuring the optimal value of the buffer for that. This is obviously dependent on hardware; this figure is based on a 2.2 GHz Intel Core i7 processor.

defaultConfig :: Config Source #

The fields prefixed by an _ are not to be accessed or updated directly but via smart accessor APIs. Use get/set routines instead of directly accessing the Config fields

Set config

maxThreads :: Int -> Config -> Config Source #

Specify the maximum number of threads that can be spawned by the channel. A value of 0 resets the thread limit to default, a negative value means there is no limit. The default value is 1500.

When the actions in a stream are IO bound, having blocking IO calls, this option can be used to control the maximum number of in-flight IO requests. When the actions are CPU bound this option can be used to control the amount of CPU used by the stream.

maxBuffer :: Int -> Config -> Config Source #

Specify the maximum size of the buffer for storing the results from concurrent computations. If the buffer becomes full we stop spawning more concurrent tasks until there is space in the buffer. A value of 0 resets the buffer size to default, a negative value means there is no limit. The default value is 1500.

CAUTION! Using an unbounded maxBuffer value (i.e. a negative value) coupled with an unbounded maxThreads value is a recipe for disaster in the presence of infinite streams, or very large streams. In particular, it must not be used when pure is used in ZipAsyncM streams, as pure in applicative zip streams generates an infinite stream, causing unbounded concurrent generation with no limit on the buffer or threads.
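The conventions for maxThreads and maxBuffer (0 resets to the default, a negative value removes the limit, the default is 1500) can be modelled with a small self-contained mirror of the configuration. This is a sketch, not the library's `Config`: `ToyConfig`, its fields, and the setter names are hypothetical, and only the `Limit` constructors and the documented defaults are taken from this page.

```haskell
-- | Local mirror of the Limit type documented on this page.
data Limit = Unlimited | Limited Word
    deriving (Eq, Show)

-- | Hypothetical configuration record holding only the two limits.
data ToyConfig = ToyConfig
    { toyMaxThreads :: Limit
    , toyMaxBuffer  :: Limit
    } deriving (Eq, Show)

-- | Documented default for both limits.
defaultLimit :: Limit
defaultLimit = Limited 1500

toyDefaultConfig :: ToyConfig
toyDefaultConfig = ToyConfig defaultLimit defaultLimit

-- | Interpret the Int argument the way the modifiers are documented:
-- negative means no limit, 0 resets to the default.
toLimit :: Int -> Limit
toLimit n
    | n < 0     = Unlimited
    | n == 0    = defaultLimit
    | otherwise = Limited (fromIntegral n)

setMaxThreads :: Int -> ToyConfig -> ToyConfig
setMaxThreads n cfg = cfg { toyMaxThreads = toLimit n }

setMaxBuffer :: Int -> ToyConfig -> ToyConfig
setMaxBuffer n cfg = cfg { toyMaxBuffer = toLimit n }

-- | The combination the caution above warns about.
isUnboundedHazard :: ToyConfig -> Bool
isUnboundedHazard cfg =
    toyMaxThreads cfg == Unlimited && toyMaxBuffer cfg == Unlimited
```

Because every setter has the shape `ToyConfig -> ToyConfig`, settings compose with ordinary function composition, for example `(setMaxThreads 4 . setMaxBuffer 100) toyDefaultConfig`. A check such as `isUnboundedHazard` is one way an application could guard against the unbounded combination before running an infinite stream.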

inspect :: Bool -> Config -> Config Source #

Print debug information about the Channel when the stream ends.

eager :: Bool -> Config -> Config Source #

By default, processing of output from the worker threads is given priority over dispatching new workers. More workers are dispatched only when there is no output to process. With eager on, workers are dispatched aggressively as long as there is more work to do, irrespective of whether there is output pending to be processed. However, dispatching may stop if maxThreads or maxBuffer is reached.

Note: This option has no effect when rate has been specified.

Note: Not supported with interleaved.

stopWhen :: StopWhen -> Config -> Config Source #

Specify when the Channel should stop.

ordered :: Bool -> Config -> Config Source #

When enabled, the streams may be evaluated concurrently but the results are produced in the same sequence as a serial evaluation would produce.

Note: Not supported with interleaved.

interleaved :: Bool -> Config -> Config Source #

Interleave the streams fairly instead of prioritizing the left stream. This schedules all streams in a round robin fashion over a limited number of threads.

Note: Can only be used on a finite number of streams.

Note: Not supported with ordered.
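The difference between left-biased and round robin scheduling can be pictured with plain lists. This sketch shows only the order in which elements are visited, with no concurrency involved; `roundRobin` and `leftBiased` are hypothetical helpers, and exhausting each list in turn is an extreme stand-in for prioritizing the left stream.

```haskell
import Data.List (transpose)

-- | Visit the lists in round robin order: the first element of each
-- list, then the second element of each list, and so on. Exhausted
-- lists simply drop out of the rotation.
roundRobin :: [[a]] -> [a]
roundRobin = concat . transpose

-- | Left-biased order: exhaust the leftmost list before moving on.
leftBiased :: [[a]] -> [a]
leftBiased = concat
```

For the input `[[1, 2, 3], [10, 20], [100]]`, `roundRobin` gives `[1, 10, 100, 2, 20, 3]` while `leftBiased` gives `[1, 2, 3, 10, 20, 100]`. The sketch also hints at why the number of streams must be finite: a round robin pass has to reach every stream before it returns to the first one.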

rate :: Maybe Rate -> Config -> Config Source #

Specify the stream evaluation rate of a channel.

A Nothing value means there is no smart rate control, concurrent execution blocks only if maxThreads or maxBuffer is reached, or there are no more concurrent tasks to execute. This is the default.

When rate (throughput) is specified, concurrent production may be ramped up or down automatically to achieve the specified stream throughput. The specific behavior for different styles of Rate specifications is documented under Rate. The effective maximum production rate achieved by a channel is governed by:

  • The maxThreads limit
  • The maxBuffer limit
  • The maximum rate that the stream producer can achieve
  • The maximum rate that the stream consumer can achieve

Maximum production rate is given by:

\(rate = \frac{maxThreads}{latency}\)

If we know the average latency of the tasks we can set maxThreads accordingly.
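The formula above can be turned around to size maxThreads for a target rate. A minimal sketch, assuming latency is the average task latency in seconds and ignoring the maxBuffer, producer, and consumer limits listed earlier; `maxProductionRate` and `threadsForRate` are hypothetical helper names.

```haskell
-- | Maximum production rate in yields per second for a given number of
-- threads and an average task latency in seconds.
maxProductionRate :: Int -> Double -> Double
maxProductionRate threads latency = fromIntegral threads / latency

-- | Smallest number of threads whose maximum production rate reaches
-- the target rate at the given average latency.
threadsForRate :: Double -> Double -> Int
threadsForRate targetRate latency = ceiling (targetRate * latency)
```

For example, if each task takes 0.25 seconds on average, 10 threads can sustain at most 40 yields per second, so a target of 40 yields per second needs maxThreads set to at least 10, and a target of 41 needs 11.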

avgRate :: Double -> Config -> Config Source #

Same as rate (Just $ Rate (r/2) r (2*r) maxBound)

Specifies the average production rate of a stream in number of yields per second (i.e. Hertz). Concurrent production is ramped up or down automatically to achieve the specified average yield rate. The rate can go down to half of the specified rate on the lower side and double of the specified rate on the higher side.

minRate :: Double -> Config -> Config Source #

Same as rate (Just $ Rate r r (2*r) maxBound)

Specifies the minimum rate at which the stream should yield values. As far as possible the yield rate would never be allowed to go below the specified rate, even though it may possibly go above it at times; the upper limit is double the specified rate.

maxRate :: Double -> Config -> Config Source #

Same as rate (Just $ Rate (r/2) r r maxBound)

Specifies the maximum rate at which the stream should yield values. As far as possible the yield rate would never be allowed to go above the specified rate, even though it may possibly go below it at times; the lower limit is half the specified rate. This can be useful in applications where certain resource usage must not be allowed to go beyond certain limits.

constRate :: Double -> Config -> Config Source #

Same as rate (Just $ Rate r r r 0)

Specifies a constant yield rate. If for some reason the actual rate goes above or below the specified rate we do not try to recover it by increasing or decreasing the rate in future. This can be useful in applications like graphics frame refresh where we need to maintain a constant refresh rate.
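The four "Same as" equations above can be compared side by side with a local mirror of `Rate`. This is a sketch under assumptions: the field names come from the prose of the `Rate` documentation, but the field order (low, goal, high, buffer) and the `Double`/`Int` field types are inferred from the equations because this page does not list the fields, and the `...Spec` function names are hypothetical.

```haskell
-- | Local mirror of Rate. Field order and types are assumptions.
data Rate = Rate
    { rateLow    :: Double  -- lowest instantaneous rate allowed
    , rateGoal   :: Double  -- target average rate
    , rateHigh   :: Double  -- highest instantaneous rate allowed
    , rateBuffer :: Int     -- how many yields of gap we try to recover
    } deriving (Eq, Show)

-- | The Rate value each convenience modifier is documented to pass to
-- the rate modifier, for a requested rate r.
avgRateSpec, minRateSpec, maxRateSpec, constRateSpec :: Double -> Rate
avgRateSpec r   = Rate (r / 2) r (2 * r) maxBound
minRateSpec r   = Rate r r (2 * r) maxBound
maxRateSpec r   = Rate (r / 2) r r maxBound
constRateSpec r = Rate r r r 0
```

Reading the values together shows the design: minRate pins the lower bound to the goal, maxRate pins the upper bound to the goal, and constRate pins both and sets the buffer to 0 so that no recovery is attempted.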

Get config

Cleanup

cleanupSVar :: IORef (Set ThreadId) -> IO () Source #

Never called from a worker thread.

Diagnostics

dumpOutputQ :: (Foldable t, Show a1) => IORef (t a2, a1) -> IO String Source #

withDiagMVar :: Bool -> IO String -> String -> IO () -> IO () Source #

MVar diagnostics has some overhead, around 5% on the AsyncT null benchmark. We can keep it on in production to debug problems quickly if and when they happen, but it may result in unexpected output when threads are left hanging until they are GCed because the consumer went away.

concatMapDivK :: Monad m => (StreamK m a -> m ()) -> (a -> StreamK m b) -> StreamK m a -> StreamK m b Source #

concatMapDivK useTail useHead stream divides the stream into a head and a tail, maps a stream generator on the head, and maps an action on the tail of the stream. It returns the stream generated by the head.

Used for concurrent evaluation of streams using a Channel.
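The head and tail division can be pictured with a list-based analogue. This is only an illustration of the shape of the operation: `divideHeadTail` is a hypothetical function over lists rather than StreamK, so the result is returned inside the monad, and in the demo the action on the tail merely records it where a channel would queue it for concurrent evaluation.

```haskell
import Data.IORef (newIORef, readIORef, writeIORef)

-- | List analogue: hand the tail to an action, and return what the
-- generator produces from the head. An empty input produces nothing.
divideHeadTail :: Monad m => ([a] -> m ()) -> (a -> [b]) -> [a] -> m [b]
divideHeadTail _ _ [] = pure []
divideHeadTail useTail useHead (x : xs) = do
    useTail xs
    pure (useHead x)

-- | Record the tail in an IORef and expand the head into two elements.
demoDivide :: IO ([Int], [Int])
demoDivide = do
    queued <- newIORef []
    out <- divideHeadTail (writeIORef queued) (\x -> [x, x * 10]) [1, 2, 3]
    rest <- readIORef queued
    pure (out, rest)
```

Here `demoDivide` returns `([1, 10], [2, 3])`: the output comes from the head only, while the tail `[2, 3]` has been handed off for later processing.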