Copyright | (c) 2017 Composewell Technologies |
---|---|
License | BSD-3-Clause |
Maintainer | streamly@composewell.com |
Stability | experimental |
Portability | GHC |
Safe Haskell | Safe-Inferred |
Language | Haskell2010 |
Synopsis
- minThreadDelay :: NanoSecond64
- collectLatency :: Bool -> SVarStats -> YieldRateInfo -> Bool -> IO (Count, AbsTime, NanoSecond64)
- addThread :: MonadIO m => IORef (Set ThreadId) -> ThreadId -> m ()
- delThread :: MonadIO m => IORef (Set ThreadId) -> ThreadId -> m ()
- modifyThread :: MonadIO m => IORef (Set ThreadId) -> MVar () -> ThreadId -> m ()
- allThreadsDone :: MonadIO m => IORef (Set ThreadId) -> m Bool
- recordMaxWorkers :: MonadIO m => IORef Int -> SVarStats -> m ()
- dumpSVarStats :: Bool -> Maybe YieldRateInfo -> SVarStats -> IO String
- newtype Count = Count Int64
- data Limit
- data ThreadAbort = ThreadAbort
- data ChildEvent a
- data SVarStats = SVarStats {}
- newSVarStats :: IO SVarStats
- data WorkerInfo = WorkerInfo {}
- data LatencyRange = LatencyRange {}
- data YieldRateInfo = YieldRateInfo {
- svarLatencyTarget :: NanoSecond64
- svarLatencyRange :: LatencyRange
- svarRateBuffer :: Int
- svarGainedLostYields :: IORef Count
- svarAllTimeLatency :: IORef (Count, AbsTime)
- workerBootstrapLatency :: Maybe NanoSecond64
- workerPollingInterval :: IORef Count
- workerPendingLatency :: IORef (Count, Count, NanoSecond64)
- workerCollectedLatency :: IORef (Count, Count, NanoSecond64)
- workerMeasuredLatency :: IORef NanoSecond64
- newRateInfo :: Config -> IO (Maybe YieldRateInfo)
- readOutputQRaw :: IORef ([ChildEvent a], Int) -> Maybe SVarStats -> IO ([ChildEvent a], Int)
- readOutputQBasic :: IORef ([ChildEvent a], Int) -> IO ([ChildEvent a], Int)
- ringDoorBell :: IORef Bool -> MVar () -> IO ()
- decrementYieldLimit :: Maybe (IORef Count) -> IO Bool
- incrementYieldLimit :: Maybe (IORef Count) -> IO ()
- data Rate = Rate {}
- data StopWhen
- data Config
- magicMaxBuffer :: Word
- defaultConfig :: Config
- maxThreads :: Int -> Config -> Config
- maxBuffer :: Int -> Config -> Config
- maxYields :: Maybe Int64 -> Config -> Config
- inspect :: Bool -> Config -> Config
- eager :: Bool -> Config -> Config
- stopWhen :: StopWhen -> Config -> Config
- ordered :: Bool -> Config -> Config
- interleaved :: Bool -> Config -> Config
- boundThreads :: Bool -> Config -> Config
- rate :: Maybe Rate -> Config -> Config
- avgRate :: Double -> Config -> Config
- minRate :: Double -> Config -> Config
- maxRate :: Double -> Config -> Config
- constRate :: Double -> Config -> Config
- getMaxThreads :: Config -> Limit
- getMaxBuffer :: Config -> Limit
- getStreamRate :: Config -> Maybe Rate
- getStreamLatency :: Config -> Maybe NanoSecond64
- setStreamLatency :: Int -> Config -> Config
- getYieldLimit :: Config -> Maybe Count
- getInspectMode :: Config -> Bool
- getEagerDispatch :: Config -> Bool
- getStopWhen :: Config -> StopWhen
- getOrdered :: Config -> Bool
- getInterleaved :: Config -> Bool
- getBound :: Config -> Bool
- cleanupSVar :: IORef (Set ThreadId) -> IO ()
- dumpCreator :: Show a => a -> String
- dumpOutputQ :: (Foldable t, Show a1) => IORef (t a2, a1) -> IO String
- dumpDoorBell :: Show a => MVar a -> IO String
- dumpNeedDoorBell :: Show a => IORef a -> IO String
- dumpRunningThreads :: Show a => IORef a -> IO String
- dumpWorkerCount :: Show a => IORef a -> IO String
- withDiagMVar :: Bool -> IO String -> String -> IO () -> IO ()
- printSVar :: IO String -> String -> IO ()
- data Work
- estimateWorkers :: Limit -> Count -> Count -> NanoSecond64 -> NanoSecond64 -> NanoSecond64 -> LatencyRange -> Work
- isBeyondMaxRate :: Limit -> IORef Int -> YieldRateInfo -> IO Bool
- workerRateControl :: Limit -> IORef Int -> YieldRateInfo -> WorkerInfo -> IO Bool
- sendWithDoorBell :: IORef ([ChildEvent a], Int) -> MVar () -> ChildEvent a -> IO Int
- sendYield :: Limit -> Limit -> IORef Int -> Maybe WorkerInfo -> Maybe YieldRateInfo -> IORef ([ChildEvent a], Int) -> MVar () -> ChildEvent a -> IO Bool
- sendStop :: IORef Int -> Maybe WorkerInfo -> Maybe YieldRateInfo -> IORef ([ChildEvent a], Int) -> MVar () -> IO ()
- handleChildException :: IORef ([ChildEvent a], Int) -> MVar () -> SomeException -> IO ()
Latency collection
minThreadDelay :: NanoSecond64 Source #
This is a magic number and it is overloaded, and used at several places to achieve batching:
- If we have to sleep to slow down, this is the minimum period that we accumulate before we sleep. Also, workers do not stop until this much sleep time is accumulated.
- Collected latencies are computed and transferred to measured latency after a minimum of this period.
collectLatency :: Bool -> SVarStats -> YieldRateInfo -> Bool -> IO (Count, AbsTime, NanoSecond64) Source #
Thread accounting
allThreadsDone :: MonadIO m => IORef (Set ThreadId) -> m Bool Source #
This is safe even if we are adding more threads concurrently because if
a child thread is adding another thread then anyway workerThreads
will
not be empty.
Diagnostics
dumpSVarStats :: Bool -> Maybe YieldRateInfo -> SVarStats -> IO String Source #
Types
Instances
Bounded Count Source # | |
Enum Count Source # | |
Defined in Streamly.Internal.Data.Channel.Types | |
Num Count Source # | |
Read Count Source # | |
Integral Count Source # | |
Real Count Source # | |
Defined in Streamly.Internal.Data.Channel.Types toRational :: Count -> Rational # | |
Show Count Source # | |
Eq Count Source # | |
Ord Count Source # | |
data ThreadAbort Source #
Channel driver throws this exception to all active workers to clean up the channel.
Instances
Exception ThreadAbort Source # | |
Defined in Streamly.Internal.Data.Channel.Types | |
Show ThreadAbort Source # | |
Defined in Streamly.Internal.Data.Channel.Types showsPrec :: Int -> ThreadAbort -> ShowS # show :: ThreadAbort -> String # showList :: [ThreadAbort] -> ShowS # |
data ChildEvent a Source #
Events that a child thread may send to a parent thread.
Stats
SVarStats | |
|
Rate Control
data WorkerInfo Source #
We measure the individual worker latencies to estimate the number of workers needed or the amount of time we have to sleep between dispatches to achieve a particular rate when controlled pace mode is used.
WorkerInfo | |
|
data LatencyRange Source #
Instances
Show LatencyRange Source # | |
Defined in Streamly.Internal.Data.Channel.Types showsPrec :: Int -> LatencyRange -> ShowS # show :: LatencyRange -> String # showList :: [LatencyRange] -> ShowS # |
data YieldRateInfo Source #
Rate control.
YieldRateInfo | |
|
newRateInfo :: Config -> IO (Maybe YieldRateInfo) Source #
Output queue
readOutputQRaw :: IORef ([ChildEvent a], Int) -> Maybe SVarStats -> IO ([ChildEvent a], Int) Source #
readOutputQBasic :: IORef ([ChildEvent a], Int) -> IO ([ChildEvent a], Int) Source #
Yield Limit
decrementYieldLimit :: Maybe (IORef Count) -> IO Bool Source #
A worker decrements the yield limit before it executes an action. However, the action may not result in an element being yielded, in that case we have to increment the yield limit.
Note that we need it to be an Int type so that we have the ability to undo a decrement that takes it below zero.
Configuration
Specifies the stream yield rate in yields per second (Hertz
).
We keep accumulating yield credits at rateGoal
. At any point of time we
allow only as many yields as we have accumulated as per rateGoal
since the
start of time. If the consumer or the producer is slower or faster, the
actual rate may fall behind or exceed rateGoal
. We try to recover the gap
between the two by increasing or decreasing the pull rate from the producer.
However, if the gap becomes more than rateBuffer
we try to recover only as
much as rateBuffer
.
rateLow
puts a bound on how low the instantaneous rate can go when
recovering the rate gap. In other words, it determines the maximum yield
latency. Similarly, rateHigh
puts a bound on how high the instantaneous
rate can go when recovering the rate gap. In other words, it determines the
minimum yield latency. We reduce the latency by increasing concurrency,
therefore we can say that it puts an upper bound on concurrency.
If the rateGoal
is 0 or negative the stream never yields a value.
If the rateBuffer
is 0 or negative we do not attempt to recover.
Specify when the Channel
should stop.
FirstStops | Stop when the first stream ends. |
AllStop | Stop when all the streams end. |
AnyStops | Stop when any one stream ends. |
An abstract type for specifying the configuration parameters of a
Channel
. Use Config -> Config
modifier functions to modify the default
configuration. See the individual modifier documentation for default values.
Default config
magicMaxBuffer :: Word Source #
A magical value for the buffer size arrived at by running the smallest possible task and measuring the optimal value of the buffer for that. This is obviously dependent on hardware; this figure is based on a 2.2GHz Intel Core i7 processor.
defaultConfig :: Config Source #
The fields prefixed by an _ are not to be accessed or updated directly but via smart accessor APIs. Use get/set routines instead of directly accessing the Config fields.
Set config
maxThreads :: Int -> Config -> Config Source #
Specify the maximum number of threads that can be spawned by the channel. A value of 0 resets the thread limit to default, a negative value means there is no limit. The default value is 1500.
When the actions in a stream are IO bound, having blocking IO calls, this option can be used to control the maximum number of in-flight IO requests. When the actions are CPU bound this option can be used to control the amount of CPU used by the stream.
maxBuffer :: Int -> Config -> Config Source #
Specify the maximum size of the buffer for storing the results from concurrent computations. If the buffer becomes full we stop spawning more concurrent tasks until there is space in the buffer. A value of 0 resets the buffer size to default, a negative value means there is no limit. The default value is 1500.
CAUTION! using an unbounded maxBuffer
value (i.e. a negative value)
coupled with an unbounded maxThreads
value is a recipe for disaster in
presence of infinite streams, or very large streams. Especially, it must
not be used when pure
is used in ZipAsyncM
streams as pure
in
applicative zip streams generates an infinite stream causing unbounded
concurrent generation with no limit on the buffer or threads.
inspect :: Bool -> Config -> Config Source #
Print debug information about the Channel
when the stream ends.
eager :: Bool -> Config -> Config Source #
By default, processing of output from the worker threads is given priority
over dispatching new workers. More workers are dispatched only when there is
no output to process. When eager
is set to True
, workers are dispatched
aggressively as long as there is more work to do irrespective of whether
there is output pending to be processed by the stream consumer. However,
dispatching may stop if maxThreads
or maxBuffer
is reached.
Note: This option has no effect when rate has been specified.
Note: Not supported with interleaved
.
ordered :: Bool -> Config -> Config Source #
When enabled, the streams may be evaluated concurrently but the results are produced in the same sequence as a serial evaluation would produce.
Note: Not supported with interleaved
.
interleaved :: Bool -> Config -> Config Source #
Interleave the streams fairly instead of prioritizing the left stream. This schedules all streams in a round robin fashion over a limited number of threads.
Note: Can only be used on a finite number of streams.
Note: Not supported with ordered
.
boundThreads :: Bool -> Config -> Config Source #
Spawn bound threads (i.e., spawn threads using forkOS
instead of
forkIO
). The default value is False
.
Currently, this takes effect only for concurrent folds.
rate :: Maybe Rate -> Config -> Config Source #
Specify the stream evaluation rate of a channel.
A Nothing
value means there is no smart rate control, concurrent execution
blocks only if maxThreads
or maxBuffer
is reached, or there are no more
concurrent tasks to execute. This is the default.
When rate (throughput) is specified, concurrent production may be ramped
up or down automatically to achieve the specified stream throughput. The
specific behavior for different styles of Rate
specifications is
documented under Rate
. The effective maximum production rate achieved by
a channel is governed by:
- The
maxThreads
limit - The
maxBuffer
limit - The maximum rate that the stream producer can achieve
- The maximum rate that the stream consumer can achieve
Maximum production rate is given by:
\(rate = \frac{maxThreads}{latency}\)
If we know the average latency of the tasks we can set maxThreads
accordingly.
avgRate :: Double -> Config -> Config Source #
Same as rate (Just $ Rate (r/2) r (2*r) maxBound)
Specifies the average production rate of a stream in number of yields
per second (i.e. Hertz
). Concurrent production is ramped up or down
automatically to achieve the specified average yield rate. The rate can
go down to half of the specified rate on the lower side and double of
the specified rate on the higher side.
minRate :: Double -> Config -> Config Source #
Same as rate (Just $ Rate r r (2*r) maxBound)
Specifies the minimum rate at which the stream should yield values. As far as possible the yield rate would never be allowed to go below the specified rate, even though it may possibly go above it at times; the upper limit is double the specified rate.
maxRate :: Double -> Config -> Config Source #
Same as rate (Just $ Rate (r/2) r r maxBound)
Specifies the maximum rate at which the stream should yield values. As far as possible the yield rate would never be allowed to go above the specified rate, even though it may possibly go below it at times; the lower limit is half the specified rate. This can be useful in applications where certain resource usage must not be allowed to go beyond certain limits.
constRate :: Double -> Config -> Config Source #
Same as rate (Just $ Rate r r r 0)
Specifies a constant yield rate. If for some reason the actual rate goes above or below the specified rate we do not try to recover it by increasing or decreasing the rate in future. This can be useful in applications like graphics frame refresh where we need to maintain a constant refresh rate.
Get config
getMaxThreads :: Config -> Limit Source #
getMaxBuffer :: Config -> Limit Source #
getInspectMode :: Config -> Bool Source #
getEagerDispatch :: Config -> Bool Source #
getStopWhen :: Config -> StopWhen Source #
getOrdered :: Config -> Bool Source #
getInterleaved :: Config -> Bool Source #
Cleanup
Diagnostics
dumpCreator :: Show a => a -> String Source #
withDiagMVar :: Bool -> IO String -> String -> IO () -> IO () Source #
MVar diagnostics has some overhead - around 5% on the AsyncT null benchmark. We can keep it on in production to debug problems quickly if and when they happen, but it may result in unexpected output when threads are left hanging until they are GCed because the consumer went away.
estimateWorkers :: Limit -> Count -> Count -> NanoSecond64 -> NanoSecond64 -> NanoSecond64 -> LatencyRange -> Work Source #
isBeyondMaxRate :: Limit -> IORef Int -> YieldRateInfo -> IO Bool Source #
workerRateControl :: Limit -> IORef Int -> YieldRateInfo -> WorkerInfo -> IO Bool Source #
CAUTION! this also updates the yield count and therefore should be called only when we are actually yielding an element.
Send Events
sendWithDoorBell :: IORef ([ChildEvent a], Int) -> MVar () -> ChildEvent a -> IO Int Source #
sendYield :: Limit -> Limit -> IORef Int -> Maybe WorkerInfo -> Maybe YieldRateInfo -> IORef ([ChildEvent a], Int) -> MVar () -> ChildEvent a -> IO Bool Source #
Returns whether the worker should continue (True) or stop (False).
sendStop :: IORef Int -> Maybe WorkerInfo -> Maybe YieldRateInfo -> IORef ([ChildEvent a], Int) -> MVar () -> IO () Source #
handleChildException :: IORef ([ChildEvent a], Int) -> MVar () -> SomeException -> IO () Source #