streamly-0.8.1.1: Dataflow programming and declarative concurrency
Copyright(c) 2017 Composewell Technologies
LicenseBSD-3-Clause
Maintainerstreamly@composewell.com
Stabilityexperimental
PortabilityGHC
Safe HaskellNone
LanguageHaskell2010

Streamly.Internal.Data.SVar.Type

Description

 
Synopsis

Parent child communication

data ChildEvent a Source #

Events that a child thread may send to a parent thread.

data AheadHeapEntry (t :: (Type -> Type) -> Type -> Type) m a Source #

Sorting out-of-turn outputs in a heap for Ahead style streams

SVar

newtype Count Source #

Constructors

Count Int64 

Instances

Instances details
Bounded Count Source # 
Instance details

Defined in Streamly.Internal.Data.SVar.Type

Enum Count Source # 
Instance details

Defined in Streamly.Internal.Data.SVar.Type

Eq Count Source # 
Instance details

Defined in Streamly.Internal.Data.SVar.Type

Methods

(==) :: Count -> Count -> Bool #

(/=) :: Count -> Count -> Bool #

Integral Count Source # 
Instance details

Defined in Streamly.Internal.Data.SVar.Type

Num Count Source # 
Instance details

Defined in Streamly.Internal.Data.SVar.Type

Ord Count Source # 
Instance details

Defined in Streamly.Internal.Data.SVar.Type

Methods

compare :: Count -> Count -> Ordering #

(<) :: Count -> Count -> Bool #

(<=) :: Count -> Count -> Bool #

(>) :: Count -> Count -> Bool #

(>=) :: Count -> Count -> Bool #

max :: Count -> Count -> Count #

min :: Count -> Count -> Count #

Read Count Source # 
Instance details

Defined in Streamly.Internal.Data.SVar.Type

Real Count Source # 
Instance details

Defined in Streamly.Internal.Data.SVar.Type

Methods

toRational :: Count -> Rational #

Show Count Source # 
Instance details

Defined in Streamly.Internal.Data.SVar.Type

Methods

showsPrec :: Int -> Count -> ShowS #

show :: Count -> String #

showList :: [Count] -> ShowS #

data Limit Source #

Constructors

Unlimited 
Limited Word 

Instances

Instances details
Eq Limit Source # 
Instance details

Defined in Streamly.Internal.Data.SVar.Type

Methods

(==) :: Limit -> Limit -> Bool #

(/=) :: Limit -> Limit -> Bool #

Ord Limit Source # 
Instance details

Defined in Streamly.Internal.Data.SVar.Type

Methods

compare :: Limit -> Limit -> Ordering #

(<) :: Limit -> Limit -> Bool #

(<=) :: Limit -> Limit -> Bool #

(>) :: Limit -> Limit -> Bool #

(>=) :: Limit -> Limit -> Bool #

max :: Limit -> Limit -> Limit #

min :: Limit -> Limit -> Limit #

Show Limit Source # 
Instance details

Defined in Streamly.Internal.Data.SVar.Type

Methods

showsPrec :: Int -> Limit -> ShowS #

show :: Limit -> String #

showList :: [Limit] -> ShowS #

data SVarStyle Source #

Identify the type of the SVar. Two computations using the same style can be scheduled on the same SVar.

Instances

Instances details
Eq SVarStyle Source # 
Instance details

Defined in Streamly.Internal.Data.SVar.Type

Show SVarStyle Source # 
Instance details

Defined in Streamly.Internal.Data.SVar.Type

data WorkerInfo Source #

An SVar or a Stream Var is a conduit to the output from multiple streams running concurrently and asynchronously. An SVar can be thought of as an asynchronous IO handle. We can write any number of streams to an SVar in a non-blocking manner and then read them back at any time at any pace. The SVar would run the streams asynchronously and accumulate results. An SVar may not really execute the stream completely and accumulate all the results. However, it ensures that the reader can read the results at whatever paces it wants to read. The SVar monitors and adapts to the consumer's pace.

An SVar is a mini scheduler, it has an associated workLoop that holds the stream tasks to be picked and run by a pool of worker threads. It has an associated output queue where the output stream elements are placed by the worker threads. A outputDoorBell is used by the worker threads to intimate the consumer thread about availability of new results in the output queue. More workers are added to the SVar by fromStreamVar on demand if the output produced is not keeping pace with the consumer. On bounded SVars, workers block on the output queue to provide throttling of the producer when the consumer is not pulling fast enough. The number of workers may even get reduced depending on the consuming pace.

New work is enqueued either at the time of creation of the SVar or as a result of executing the parallel combinators i.e. <| and <|> when the already enqueued computations get evaluated. See joinStreamVarAsync.

data PushBufferPolicy Source #

Buffering policy for persistent push workers (in ParallelT). In a pull style SVar (in AsyncT, AheadT etc.), the consumer side dispatches workers on demand, workers terminate if the buffer is full or if the consumer is not cosuming fast enough. In a push style SVar, a worker is dispatched only once, workers are persistent and keep pushing work to the consumer via a bounded buffer. If the buffer becomes full the worker either blocks, or it can drop an item from the buffer to make space.

Pull style SVars are useful in lazy stream evaluation whereas push style SVars are useful in strict left Folds.

XXX Maybe we can separate the implementation in two different types instead of using a common SVar type.

State threaded around the stream

data Rate Source #

Specifies the stream yield rate in yields per second (Hertz). We keep accumulating yield credits at rateGoal. At any point of time we allow only as many yields as we have accumulated as per rateGoal since the start of time. If the consumer or the producer is slower or faster, the actual rate may fall behind or exceed rateGoal. We try to recover the gap between the two by increasing or decreasing the pull rate from the producer. However, if the gap becomes more than rateBuffer we try to recover only as much as rateBuffer.

rateLow puts a bound on how low the instantaneous rate can go when recovering the rate gap. In other words, it determines the maximum yield latency. Similarly, rateHigh puts a bound on how high the instantaneous rate can go when recovering the rate gap. In other words, it determines the minimum yield latency. We reduce the latency by increasing concurrency, therefore we can say that it puts an upper bound on concurrency.

If the rateGoal is 0 or negative the stream never yields a value. If the rateBuffer is 0 or negative we do not attempt to recover.

Since: 0.5.0 (Streamly)

Since: 0.8.0

Constructors

Rate 

Fields

data State t m a Source #

Default State

Type cast

adaptState :: State t m a -> State t n b Source #

Adapt the stream state from one type to another.

State accessors

setMaxThreads :: Int -> State t m a -> State t m a Source #

setMaxBuffer :: Int -> State t m a -> State t m a Source #

setStreamRate :: Maybe Rate -> State t m a -> State t m a Source #

setStreamLatency :: Int -> State t m a -> State t m a Source #

setYieldLimit :: Maybe Int64 -> State t m a -> State t m a Source #

setInspectMode :: State t m a -> State t m a Source #