streamly-0.8.3: Dataflow programming and declarative concurrency
Copyright(c) 2017 Composewell Technologies
LicenseBSD-3-Clause
Maintainerstreamly@composewell.com
Stabilityexperimental
PortabilityGHC
Safe HaskellSafe-Inferred
LanguageHaskell2010

Streamly.Internal.Data.Stream.SVar.Eliminate

Description

Eliminate a stream by distributing it to multiple SVars concurrently.

Synopsis

Concurrent Function Application

toSVarParallel :: MonadAsync m => State t m a -> SVar t m a -> Stream m a -> m () Source #

Fold the supplied stream to the SVar asynchronously using Parallel concurrency style. {-# INLINE [1] toSVarParallel #-}

Concurrent folds

To run folds concurrently, we need to decouple the fold execution from the stream production. We use the SVar to do that, we have a single worker pushing the stream elements to the SVar and on the consumer side a fold driver pulls the values and folds them.

Fold worker <------SVar<------input stream
    |  exceptions  |
    --------------->

We need a channel for pushing exceptions from the fold worker to the stream pusher. The stream may be pushed to multiple folds at the same time. For that we need one SVar per fold:

Fold worker <------SVar<---
                   |       |
Fold worker <------SVar<------input stream
                   |       |
Fold worker <------SVar<---

Unlike in case concurrent stream evaluation, the puller does not drive the scheduling and concurrent execution of the stream. The stream is simply pushed by the stream producer at its own rate. The fold worker just pulls it and folds it.

Note: If the stream pusher terminates due to an exception, we do not actively terminate the fold. It gets cleaned up by the GC.

newFoldSVar :: MonadAsync m => State Stream m a -> (SerialT m a -> m b) -> m (SVar Stream m a) Source #

Create a Fold style SVar that runs a supplied fold function as the consumer. Any elements sent to the SVar are consumed by the supplied fold function.

newFoldSVarF :: MonadAsync m => State t m a -> Fold m a b -> m (SVar t m a) Source #

Like newFoldSVar except that it uses a Fold instead of a fold function.

fromConsumer :: MonadAsync m => SVar Stream m a -> m Bool Source #

Poll for events sent by the fold consumer to the stream pusher. The fold consumer can send a Stop event or an exception. When a Stop is received this function returns True. If an exception is recieved then it throws the exception.

pushToFold :: MonadAsync m => SVar Stream m a -> a -> m Bool Source #

Push values from a stream to a fold worker via an SVar. Before pushing a value to the SVar it polls for events received from the fold consumer. If a stop event is received then it returns True otherwise false. Propagates exceptions received from the fold consumer.

teeToSVar :: MonadAsync m => SVar Stream m a -> SerialT m a -> SerialT m a Source #

Tap a stream and send the elements to the specified SVar in addition to yielding them again. The SVar runs a fold consumer. Elements are tapped and sent to the SVar until the fold finishes. Any exceptions from the fold evaluation are propagated in the current thread.

------input stream---------output stream----->
                   /|\   |
        exceptions  |    |  input
                    |   \|/
                    ----SVar
                         |
                        Fold