streamly-0.10.1: Streaming, dataflow programming and declarative concurrency
Copyright    (c) 2017 Composewell Technologies
License      BSD-3-Clause
Maintainer   streamly@composewell.com
Stability    experimental
Portability  GHC
Safe Haskell Safe-Inferred
Language     Haskell2010

Streamly.Internal.Data.SVar

Description

Deprecated: SVar is replaced by Channel.

Synopsis

Adjusting Limits

incrementYieldLimit :: SVar t m a -> IO () Source #

decrementBufferLimit :: SVar t m a -> IO () Source #

incrementBufferLimit :: SVar t m a -> IO () Source #

resetBufferLimit :: SVar t m a -> IO () Source #

Rate Control

data Work Source #

Constructors

BlockWait NanoSecond64 
PartialWorker Count 
ManyWorkers Int Count 

Instances

Show Work Source # 

Defined in Streamly.Internal.Data.SVar.Worker
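
The three constructors encode a rate-control decision: block for some time, dispatch a single worker with a partial yield budget, or dispatch several workers. The sketch below is a hypothetical illustration of how a dispatcher might act on such a value; Count and NanoSecond64 are replaced by plain Int stand-ins and the reading of each constructor is an assumption based on its name, not streamly's actual dispatcher code.

-- Hypothetical sketch: plain Int stand-ins for streamly's internal
-- Count and NanoSecond64 newtypes, for illustration only.
type Count = Int
type NanoSecond64 = Int

data Work
    = BlockWait NanoSecond64   -- assumed: wait this long before dispatching again
    | PartialWorker Count      -- assumed: one worker with a limited yield budget
    | ManyWorkers Int Count    -- assumed: this many workers, total yield budget

-- How a dispatcher loop might act on a rate-control decision.
actOnWork :: Work -> IO ()
actOnWork w = case w of
    BlockWait ns    -> putStrLn ("block for " ++ show ns ++ " ns")
    PartialWorker n -> putStrLn ("dispatch 1 worker, yield limit " ++ show n)
    ManyWorkers k n -> putStrLn ("dispatch " ++ show k ++ " workers, budget " ++ show n)

main :: IO ()
main = mapM_ actOnWork [BlockWait 200000, PartialWorker 10, ManyWorkers 4 1000]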

isBeyondMaxRate :: SVar t m a -> YieldRateInfo -> IO Bool Source #

estimateWorkers :: Limit -> Count -> Count -> NanoSecond64 -> NanoSecond64 -> NanoSecond64 -> LatencyRange -> Work Source #

updateYieldCount :: WorkerInfo -> IO Count Source #

minThreadDelay :: NanoSecond64 Source #

This is a magic number; it is overloaded and used in several places to achieve batching:

  1. If we have to sleep to slow down, this is the minimum period that we accumulate before we sleep; workers do not stop until this much sleep time has accumulated (see the sketch after this list).
  2. Collected latencies are computed and transferred to the measured latency only after at least this much time has elapsed.
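
A minimal, self-contained sketch of the first point: instead of sleeping a tiny amount after every yield, a worker accumulates the delay it owes and sleeps only once the total crosses a threshold. The accumulator, the threshold value and the driver loop below are hypothetical, not streamly's worker code.

import Control.Concurrent (threadDelay)
import Data.IORef

-- Assumed threshold for this sketch, in microseconds (the real
-- minThreadDelay is a NanoSecond64 constant inside streamly).
minDelay :: Int
minDelay = 1000

-- Add the newly owed delay; sleep only when the accumulated total
-- crosses the threshold, then reset the accumulator.
sleepIfDue :: IORef Int -> Int -> IO ()
sleepIfDue owedRef owedNow = do
    owed <- atomicModifyIORef' owedRef (\o -> (o + owedNow, o + owedNow))
    if owed >= minDelay
        then do
            threadDelay owed
            atomicModifyIORef' owedRef (\o -> (o - owed, ()))
        else pure ()

main :: IO ()
main = do
    ref <- newIORef 0
    mapM_ (sleepIfDue ref) [200, 300, 600]  -- sleeps once, after crossing 1000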

workerRateControl :: SVar t m a -> YieldRateInfo -> WorkerInfo -> IO Bool Source #

workerUpdateLatency :: YieldRateInfo -> WorkerInfo -> IO () Source #

Send Events

send :: SVar t m a -> ChildEvent a -> IO Int Source #

This function is used by the producer threads to queue output for the consumer thread to consume. Returns whether the queue has more space.
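
As a rough illustration of that contract, the sketch below models the output queue as an IORef holding a list and a count (the same shape readOutputQBasic reads) and has a producer stop queueing once the buffer reports it is full. The queue representation, the buffer bound and the Bool return value are simplifications for this sketch; the real send operates on an SVar and returns an IO Int.

import Data.IORef

-- Hypothetical stand-in for the SVar output queue: a list of queued
-- items plus their count, as in readOutputQBasic's IORef ([a], Int).
type OutQueue a = IORef ([a], Int)

bufferLimit :: Int
bufferLimit = 16   -- assumed buffer bound for this sketch

-- Enqueue one item and report whether the queue still has room,
-- mirroring the contract described for 'send'.
sendEvent :: OutQueue a -> a -> IO Bool
sendEvent q x =
    atomicModifyIORef' q (\(xs, n) -> ((x : xs, n + 1), n + 1 < bufferLimit))

-- A producer loop that stops once the buffer is full.
produce :: OutQueue Int -> [Int] -> IO ()
produce _ [] = pure ()
produce q (x:xs) = do
    more <- sendEvent q x
    if more then produce q xs else putStrLn "buffer full, producer stops"

main :: IO ()
main = do
    q <- newIORef ([], 0)
    produce q [1 .. 100]
    readIORef q >>= print . snd   -- 16 items queued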

ringDoorBell :: SVar t m a -> IO () Source #

Yield

sendYield :: SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool Source #

sendToProducer :: SVar t m a -> ChildEvent a -> IO Int Source #

Stop

sendStop :: SVar t m a -> Maybe WorkerInfo -> IO () Source #

sendStopToProducer :: MonadIO m => SVar t m a -> m () Source #

Exception

Latency collection

collectLatency :: SVar t m a -> YieldRateInfo -> Bool -> IO (Count, AbsTime, NanoSecond64) Source #

Diagnostics

withDiagMVar :: SVar t m a -> String -> IO () -> IO () Source #

dumpSVar :: SVar t m a -> IO String Source #

printSVar :: SVar t m a -> String -> IO () Source #

Thread accounting

delThread :: MonadIO m => SVar t m a -> ThreadId -> m () Source #

modifyThread :: MonadIO m => SVar t m a -> ThreadId -> m () Source #

allThreadsDone :: MonadIO m => SVar t m a -> m Bool Source #

This is safe even if more threads are being added concurrently: if a child thread is adding another thread, then workerThreads cannot be empty anyway.
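
For illustration, the sketch below uses a plain Set of ThreadIds in an IORef as a stand-in for the SVar's workerThreads field; the representation and the helper names are assumptions made for this sketch only.

import Control.Concurrent (ThreadId)
import Data.IORef
import qualified Data.Set as Set

-- Hypothetical stand-in for the SVar's workerThreads field.
type WorkerSet = IORef (Set.Set ThreadId)

-- Mirrors delThread: a worker removes itself when it is done.
removeWorker :: WorkerSet -> ThreadId -> IO ()
removeWorker ref tid = atomicModifyIORef' ref (\s -> (Set.delete tid s, ()))

-- Mirrors allThreadsDone: the stream is fully drained only when no
-- workers remain registered.
allDone :: WorkerSet -> IO Bool
allDone ref = Set.null <$> readIORef ref

main :: IO ()
main = do
    ref <- newIORef Set.empty
    allDone ref >>= print   -- True: nothing registered yet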

Dispatching

recordMaxWorkers :: MonadIO m => SVar t m a -> m () Source #

pushWorker :: MonadAsync m => Count -> SVar t m a -> m () Source #

pushWorkerPar :: MonadAsync m => SVar t m a -> (Maybe WorkerInfo -> m ()) -> m () Source #

In contrast to pushWorker, which is always called only from the consumer thread, pushWorkerPar can be called concurrently from multiple threads on the producer side. Therefore we need a thread-safe modification of workerThreads. Alternatively, we could use a CreateThread event to avoid a CAS-based modification.
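
A minimal sketch of the kind of thread-safe modification meant here: several forked workers register their own ThreadId concurrently through a CAS-based atomicModifyIORef'. The Set-in-an-IORef representation and the registerSelf helper are assumptions for this sketch, not streamly's actual internals.

import Control.Concurrent (ThreadId, forkIO, myThreadId, threadDelay)
import Data.IORef
import qualified Data.Set as Set

-- Producer-side registration must be thread safe because many workers
-- may register concurrently, unlike the consumer-side pushWorker path
-- where only one thread modifies the set.
registerSelf :: IORef (Set.Set ThreadId) -> IO ()
registerSelf ref = do
    tid <- myThreadId
    atomicModifyIORef' ref (\s -> (Set.insert tid s, ()))  -- CAS-based update

main :: IO ()
main = do
    workers <- newIORef Set.empty
    mapM_ (\_ -> forkIO (registerSelf workers)) [1 .. 4 :: Int]
    threadDelay 100000                      -- crude wait for the forks
    readIORef workers >>= print . Set.size  -- expect 4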

dispatchWorker :: MonadAsync m => Count -> SVar t m a -> m Bool Source #

dispatchWorkerPaced :: MonadAsync m => SVar t m a -> m Bool Source #

sendWorkerWait :: MonadAsync m => (SVar t m a -> IO ()) -> (SVar t m a -> m Bool) -> SVar t m a -> m () Source #

sendFirstWorker :: MonadAsync m => SVar t m a -> t m a -> m (SVar t m a) Source #

sendWorkerDelay :: SVar t m a -> IO () Source #

sendWorkerDelayPaced :: SVar t m a -> IO () Source #

Read Output

readOutputQBasic :: IORef ([ChildEvent a], Int) -> IO ([ChildEvent a], Int) Source #

readOutputQRaw :: SVar t m a -> IO ([ChildEvent a], Int) Source #

readOutputQPaced :: MonadAsync m => SVar t m a -> m [ChildEvent a] Source #

readOutputQBounded :: MonadAsync m => SVar t m a -> m [ChildEvent a] Source #

Postprocess Hook After Reading

postProcessPaced :: MonadAsync m => SVar t m a -> m Bool Source #

postProcessBounded :: MonadAsync m => SVar t m a -> m Bool Source #

Release Resources

cleanupSVar :: SVar t m a -> IO () Source #

cleanupSVarFromWorker :: SVar t m a -> IO () Source #

New SVar

getYieldRateInfo :: State t m a -> IO (Maybe YieldRateInfo) Source #

newSVarStats :: IO SVarStats Source #

Parallel

newParallelVar :: MonadAsync m => SVarStopStyle -> State t m a -> m (SVar t m a) Source #

Ahead

enqueueAhead :: SVar t m a -> IORef ([t m a], Int) -> (RunInIO m, t m a) -> IO () Source #

reEnqueueAhead :: SVar t m a -> IORef ([t m a], Int) -> t m a -> IO () Source #

queueEmptyAhead :: MonadIO m => IORef ([t m a], Int) -> m Bool Source #

dequeueAhead :: MonadIO m => IORef ([t m a], Int) -> m (Maybe (t m a, Int)) Source #

data HeapDequeueResult t m a Source #

Constructors

Clearing 
Waiting Int 
Ready (Entry Int (AheadHeapEntry t m a)) 

dequeueFromHeap :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) -> IO (HeapDequeueResult t m a) Source #
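
To illustrate the three outcomes, the sketch below pattern matches on a simplified, hypothetical mirror of HeapDequeueResult. It drops the real Entry and AheadHeapEntry wrappers, and the meaning attached to each constructor is inferred from its name rather than taken from streamly's code.

-- Hypothetical, simplified mirror of HeapDequeueResult.
data DequeueResult a
    = Clearing'      -- assumed: another thread is already clearing the heap
    | Waiting' Int   -- assumed: the required sequence number is not on the heap yet
    | Ready' Int a   -- assumed: the entry at the expected sequence number

handleResult :: Show a => DequeueResult a -> IO ()
handleResult r = case r of
    Clearing'   -> putStrLn "heap is being cleared elsewhere; back off"
    Waiting' sn -> putStrLn ("waiting for sequence number " ++ show sn)
    Ready' sn x -> putStrLn ("processing entry " ++ show sn ++ ": " ++ show x)

main :: IO ()
main = mapM_ handleResult [Waiting' 3, Ready' 3 ("item" :: String), Clearing']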

dequeueFromHeapSeq :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) -> Int -> IO (HeapDequeueResult t m a) Source #

requeueOnHeapTop :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) -> Entry Int (AheadHeapEntry t m a) -> Int -> IO () Source #

updateHeapSeq :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) -> Int -> IO () Source #

withIORef :: IORef a -> (a -> IO b) -> IO b Source #

newAheadVar :: MonadAsync m => State t m a -> t m a -> (IORef ([t m a], Int) -> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) -> State t m a -> SVar t m a -> Maybe WorkerInfo -> m ()) -> m (SVar t m a) Source #