streamly-0.8.0: Dataflow programming and declarative concurrency
Copyright: (c) 2017 Composewell Technologies
License: BSD3
Maintainer: streamly@composewell.com
Stability: experimental
Portability: GHC
Safe Haskell: None
Language: Haskell2010

Streamly.Internal.Data.Stream.SVar


Unfold streams from SVar

Usually the SVar is used to concurrently evaluate multiple actions in a stream. Many worker threads push their results to the SVar, and a single puller pulls them from the SVar, generating the evaluated stream.

                 input stream
                      |
    <-----------------|<--------worker
    |  exceptions     |
output stream <------SVar<------worker
                      |
                      |<--------worker

The puller itself schedules the worker threads based on demand. Exceptions are propagated from the worker threads to the puller.
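The worker/puller arrangement above can be modelled with nothing but the base library. The following is a minimal sketch, not streamly's implementation: the names Event and evalConcurrently are hypothetical, a plain Chan stands in for the SVar, and every worker is started eagerly rather than being scheduled on demand by the puller.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Exception (SomeException, throwIO, try)
import Control.Monad (forM_)

-- | What a worker places on the shared channel (hypothetical type).
data Event a
    = Yield a               -- ^ the action produced a result
    | Failed SomeException  -- ^ the action threw an exception

-- | Evaluate every action in its own worker thread and collect the
-- results in arrival order. An exception raised in any worker is
-- rethrown in the calling (puller) thread.
evalConcurrently :: [IO a] -> IO [a]
evalConcurrently actions = do
    chan <- newChan
    -- Workers: run the action and push either its result or its
    -- exception to the shared channel.
    forM_ actions $ \act -> forkIO $ do
        r <- try act
        writeChan chan (either Failed Yield r)
    -- Puller: read exactly one event per worker.
    pull chan (length actions)
  where
    pull _ 0 = return []
    pull chan n = do
        ev <- readChan chan
        case ev of
            Yield x  -> (x :) <$> pull chan (n - 1)
            Failed e -> throwIO e
```

Because results are collected in arrival order, the output order may differ from the order of the input actions.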

fromSVar :: (MonadAsync m, IsStream t) => SVar Stream m a -> t m a

Generate a stream from an SVar. An unevaluated stream can be pushed to an SVar using toSVar. As we pull a stream from the SVar, the input stream gets evaluated concurrently. The evaluation depends on the SVar style and on the configuration parameters, e.g. those set using the maxBuffer/maxThreads combinators.

fromSVarD :: MonadAsync m => SVar t m a -> Stream m a

Like fromSVar but generates a StreamD style stream instead of CPS.

Fold streams to SVar

toSVar :: (IsStream t, MonadAsync m) => SVar Stream m a -> t m a -> m ()

Write a stream to an SVar in a non-blocking manner. The stream is evaluated concurrently as it is read back from the SVar using fromSVar.

toSVarParallel :: MonadAsync m => State t m a -> SVar t m a -> Stream m a -> m ()

Fold the supplied stream to the SVar asynchronously using the Parallel concurrency style.

Concurrent folds

To run folds concurrently, we need to decouple the fold execution from the stream production. We use the SVar to do that: a single worker pushes the stream elements to the SVar, and on the consumer side a fold driver pulls the values and folds them.

Fold worker <------SVar<------input stream
    |  exceptions  |
    --------------->

We need a channel for pushing exceptions from the fold worker to the stream pusher. The stream may be pushed to multiple folds at the same time. For that we need one SVar per fold:

Fold worker <------SVar<---
                   |       |
Fold worker <------SVar<------input stream
                   |       |
Fold worker <------SVar<---

Unlike in the case of concurrent stream evaluation, the puller does not drive the scheduling and concurrent execution of the stream. The stream is simply pushed by the stream producer at its own rate. The fold worker just pulls it and folds it.

Note: If the stream pusher terminates due to an exception, we do not actively terminate the fold. It gets cleaned up by the GC.
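The one-channel-per-fold layout described above can be modelled with the base library alone. This is a rough sketch under simplifying assumptions, not the SVar implementation: foldAllConcurrently is a hypothetical name, each fold is a plain step function with an initial accumulator, a Chan stands in for each SVar, and the exception channel from the fold worker back to the producer is left out.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Monad (forM, forM_)

-- | Run each left fold (step function, initial accumulator) in its own
-- thread. The producer pushes every input element to every fold through
-- a dedicated channel, then signals end of input with Nothing.
foldAllConcurrently :: [(b -> a -> b, b)] -> [a] -> IO [b]
foldAllConcurrently folds input = do
    -- One channel and one result slot per fold.
    consumers <- forM folds $ \(step, initial) -> do
        chan <- newChan
        result <- newEmptyMVar
        let loop acc = do
                msg <- readChan chan
                case msg of
                    Just x  -> loop $! step acc x
                    Nothing -> putMVar result acc
        _ <- forkIO (loop initial)
        return (chan, result)
    -- The producer pushes at its own rate; the folds do not schedule it.
    forM_ input $ \x ->
        forM_ consumers $ \(chan, _) -> writeChan chan (Just x)
    forM_ consumers $ \(chan, _) -> writeChan chan Nothing
    -- Wait for every fold to finish and collect the results in order.
    mapM (takeMVar . snd) consumers
```

For example, foldAllConcurrently [((+), 0), (\n _ -> n + 1, 0)] [1 .. 10] computes the sum and the length of the input in two separate fold threads.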

fromConsumer :: MonadAsync m => SVar Stream m a -> m Bool

Poll for events sent by the fold consumer to the stream pusher. The fold consumer can send a Stop event or an exception. When a Stop is received this function returns True. If an exception is received then it throws the exception.

pushToFold :: MonadAsync m => SVar Stream m a -> a -> m Bool

Push values from a stream to a fold worker via an SVar. Before pushing a value to the SVar it polls for events received from the fold consumer. If a Stop event is received then it returns True, otherwise False. Propagates exceptions received from the fold consumer.
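The poll-then-push protocol can be illustrated with a small base-only model. Everything here is a hypothetical stand-in rather than the SVar internals: FoldChannel, ConsumerEvent, sendEvent, pollConsumer, pushTo and pushed are invented names, and two IORefs play the roles of the consumer-to-pusher event queue and the element buffer. The sketch assumes that a value is not pushed once a stop has been observed.

```haskell
import Control.Exception (SomeException, throwIO)
import Data.IORef (IORef, atomicModifyIORef', newIORef, readIORef)

-- | Events the fold consumer can send back to the pusher.
data ConsumerEvent = Stop | Failure SomeException

-- | A toy stand-in for the SVar.
data FoldChannel a = FoldChannel
    { events :: IORef [ConsumerEvent]  -- ^ pending events, oldest first
    , buffer :: IORef [a]              -- ^ pushed elements, newest first
    }

newFoldChannel :: IO (FoldChannel a)
newFoldChannel = FoldChannel <$> newIORef [] <*> newIORef []

-- | Consumer side: report a stop or a failure to the pusher.
sendEvent :: FoldChannel a -> ConsumerEvent -> IO ()
sendEvent ch ev = atomicModifyIORef' (events ch) (\evs -> (evs ++ [ev], ()))

-- | Drain pending events. Returns True on Stop, rethrows a failure,
-- and returns False when nothing is pending.
pollConsumer :: FoldChannel a -> IO Bool
pollConsumer ch = do
    pending <- atomicModifyIORef' (events ch) (\evs -> ([], evs))
    case pending of
        []              -> return False
        (Stop : _)      -> return True
        (Failure e : _) -> throwIO e

-- | Poll first, then push. True means the consumer has stopped and
-- the element was not pushed.
pushTo :: FoldChannel a -> a -> IO Bool
pushTo ch x = do
    stopped <- pollConsumer ch
    if stopped
        then return True
        else do
            atomicModifyIORef' (buffer ch) (\xs -> (x : xs, ()))
            return False

-- | Elements pushed so far, in push order.
pushed :: FoldChannel a -> IO [a]
pushed ch = reverse <$> readIORef (buffer ch)
```

A producer loop would call pushTo for each element and stop feeding the fold as soon as it returns True.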

teeToSVar :: (IsStream t, MonadAsync m) => SVar Stream m a -> t m a -> t m a

Tap a stream and send the elements to the specified SVar in addition to yielding them again. The SVar runs a fold consumer. Elements are tapped and sent to the SVar until the fold finishes. Any exceptions from the fold evaluation are propagated in the current thread.

------input stream---------output stream----->
                   /|\   |
        exceptions  |    |  input
                    |   \|/
                    ----SVar
                         |
                        Fold
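The tapping behaviour in the diagram can be sketched over plain lists. This is a simplified model, not the streamly code: teeTo and limitedConsumer are hypothetical names, a list stands in for the stream, and the push action is assumed to return True once the fold has finished, after which the remaining elements pass through untapped.

```haskell
import Data.IORef (atomicModifyIORef', newIORef, readIORef)

-- | Yield every input element while also offering it to a consumer.
-- The push action returns True once the consumer has stopped; from
-- then on the remaining elements are passed through without tapping.
teeTo :: (a -> IO Bool) -> [a] -> IO [a]
teeTo push = go
  where
    go [] = return []
    go (x : xs) = do
        stopped <- push x
        if stopped
            then return (x : xs)
            else (x :) <$> go xs

-- | A toy consumer that accepts at most n elements. Returns the push
-- action and an action that reads what was consumed, in order.
limitedConsumer :: Int -> IO (a -> IO Bool, IO [a])
limitedConsumer n = do
    ref <- newIORef []
    let push x = atomicModifyIORef' ref $ \xs ->
            if length xs >= n
                then (xs, True)       -- consumer finished; x is not taken
                else (x : xs, False)
        consumed = reverse <$> readIORef ref
    return (push, consumed)
```

The output of teeTo always equals its input; only the consumer's view is truncated when the fold finishes early.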

newFoldSVar :: (IsStream t, MonadAsync m) => State Stream m a -> (t m a -> m b) -> m (SVar Stream m a)

Create a Fold style SVar that runs a supplied fold function as the consumer. Any elements sent to the SVar are consumed by the supplied fold function.

newFoldSVarF :: MonadAsync m => State t m a -> Fold m a b -> m (SVar t m a)

Like newFoldSVar except that it uses a Fold instead of a fold function.