scc-0.5.1: Streaming component combinators

Control.Concurrent.SCC.Streams

Description

This module defines the Source and Sink types and the pipe functions that create them. The method get on Source abstracts away Control.Concurrent.Coroutine.await, and the method put on Sink is a higher-level abstraction of Control.Concurrent.Coroutine.SuspensionFunctors.yield. With this arrangement, a single coroutine can yield values to multiple sinks and await values from multiple sources with no need to change the Control.Concurrent.Coroutine.Coroutine functor. The only requirement is that the functor of each source and sink the coroutine uses be a Control.Concurrent.Coroutine.AncestorFunctor of the coroutine's own functor. For example, a coroutine zip that takes two sources and one sink would be declared like this:

 zip :: forall m a1 a2 a3 d x y. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d)
        => Source m a1 x -> Source m a2 y -> Sink m a3 (x, y) -> Coroutine d m ()

Sources, sinks, and the coroutines communicating through them are all created using the pipe function or one of its variants. These functions effectively split the current coroutine into a producer-consumer coroutine pair. The producer gets a new Sink to write to and the consumer a new Source to read from, in addition to all the streams that are visible in the original coroutine. The following function, for example, uses the zip coroutine above to add together the values from two Integer sources:

 add :: forall m a1 a2 a3 d. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d)
        => Source m a1 Integer -> Source m a2 Integer -> Sink m a3 Integer -> Coroutine d m ()
 add source1 source2 sink = do pipe
                                   (\pairSink -> zip source1 source2 pairSink)              -- producer coroutine
                                   (\pairSource -> mapStream (uncurry (+)) pairSource sink) -- consumer coroutine
                               return ()
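
For intuition only, the same computation over plain lists can be sketched as follows. This uses Prelude's zip and map on lists, not the scc coroutines, and addLists is a hypothetical name introduced for the illustration.

```haskell
-- List analogue of add: pair up the inputs, then sum each pair.
-- Prelude's zip stands in for the zip coroutine and map for mapStream.
addLists :: [Integer] -> [Integer] -> [Integer]
addLists xs ys = map (uncurry (+)) (zip xs ys)
```

In the streaming version, the intermediate list of pairs is replaced by the channel that pipe creates between the producer and the consumer.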

Sink and Source types

data Sink m a x

A Sink can be used to yield values from any nested Coroutine computation whose functor provably descends from the functor a. It's the write-only end of a communication channel created by pipe.

data Source m a x

A Source can be used to read values into any nested Coroutine computation whose functor provably descends from the functor a. It's the read-only end of a communication channel created by pipe.

class (Functor a, Functor d) => AncestorFunctor a d

Class of functor pairs in which a value of the ancestor functor a can be lifted into the descendant functor d.

Instances

(d' ~ Parent d, Functor a, ChildFunctor d, AncestorFunctor a d') => AncestorFunctor a d 
Functor a => AncestorFunctor a a 

Sink and Source constructors

pipe :: forall m a a1 a2 x r1 r2. (Monad m, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)

The pipe function splits the computation into two concurrent parts, a producer and a consumer. The producer is given a Sink to put values into, and the consumer a Source to get those values from. Once the producer and the consumer have both completed, pipe returns their paired results.
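
The hand-over between producer and consumer can be illustrated with a small pure model. This is a sketch under simplifying assumptions, not the library's implementation: Producer, Consumer, pipeModel, fromList and sumAll are hypothetical names, there is no underlying monad, and the model assumes that values yielded after the consumer has finished are simply discarded.

```haskell
-- A producer either finishes with a result or yields a value and continues.
data Producer x r = Done r | Yield x (Producer x r)

-- A consumer either finishes with a result or awaits the next value;
-- it receives Nothing once the producer has finished.
data Consumer x r = Result r | Await (Maybe x -> Consumer x r)

-- Hypothetical model of pipe: pass values across until both sides finish,
-- then pair their results.
pipeModel :: Producer x r1 -> Consumer x r2 -> (r1, r2)
pipeModel producer (Result r2) = (drain producer, r2)
  where
    -- Assumption: once the consumer is done, remaining values are dropped.
    drain (Done r1)   = r1
    drain (Yield _ p) = drain p
pipeModel (Yield x producer') (Await k) = pipeModel producer' (k (Just x))
pipeModel done@(Done _)       (Await k) = pipeModel done (k Nothing)

-- Example producer: yields each list element and returns how many it yielded.
fromList :: [x] -> Producer x Int
fromList = go 0
  where
    go n []       = Done n
    go n (x : xs) = Yield x (go (n + 1) xs)

-- Example consumer: sums everything it receives.
sumAll :: Consumer Integer Integer
sumAll = go 0
  where
    go acc = Await (maybe (Result acc) (\x -> go (acc + x)))
```

With these definitions, pipeModel (fromList [1, 2, 3]) sumAll evaluates to (3, 6): the producer's count paired with the consumer's sum.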

pipeP :: forall m a a1 a2 x r1 r2. (MonadParallel m, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)

The pipeP function is equivalent to pipe, except it runs the producer and the consumer in parallel.

pipePS :: forall m a a1 a2 x r1 r2. (MonadParallel m, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => Bool -> (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)

The pipePS function acts either as pipeP or as pipe, depending on its first argument, the Boolean parallel.

nullSink :: forall m a x. Monad m => Sink m a x

A disconnected sink that ignores all values put into it.

nullSource :: forall m a x. Monad m => Source m a x

An empty source whose get always returns Nothing.

Operations on sinks and sources

Singleton operations

get :: Source m a x -> forall d. AncestorFunctor a d => Coroutine d m (Maybe x)

Function get tries to get a value from the given Source argument. The intervening Coroutine computations suspend all the way to the pipe function invocation that created the source. The function returns Nothing if the argument source is empty.

put :: Sink m a x -> forall d. AncestorFunctor a d => x -> Coroutine d m ()

This function puts a value into the given Sink. The intervening Coroutine computations suspend up to the pipe invocation that has created the argument sink.

getWith :: forall m a d x. (Monad m, AncestorFunctor a d) => (x -> Coroutine d m ()) -> Source m a x -> Coroutine d m ()

Invokes its first argument with the value it gets from the source, if there is any to get.
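
The behaviour of get, put and getWith can be sketched with a deliberately simplified model. The assumptions are significant: Source and Sink below are stand-ins that run in IO and drop the functor parameter, so no coroutine suspension takes place, and listSource and listSink are hypothetical helpers that do not exist in the library.

```haskell
import Data.IORef (newIORef, readIORef, writeIORef, modifyIORef')

-- Simplified stand-ins for the library types: no coroutines, no functor parameter.
newtype Source x = Source { get :: IO (Maybe x) }
newtype Sink x   = Sink   { put :: x -> IO () }

-- Hypothetical helper: a source that hands out the elements of a list one at a time.
listSource :: [x] -> IO (Source x)
listSource xs = do
  ref <- newIORef xs
  return $ Source $ do
    remaining <- readIORef ref
    case remaining of
      []       -> return Nothing
      (y : ys) -> writeIORef ref ys >> return (Just y)

-- Hypothetical helper: a sink that records its input, plus an action to read it back.
listSink :: IO (Sink x, IO [x])
listSink = do
  ref <- newIORef []
  return (Sink (\x -> modifyIORef' ref (x :)), reverse <$> readIORef ref)

-- Model of getWith: run the consumer on the next value, if there is one.
getWith :: (x -> IO ()) -> Source x -> IO ()
getWith consumer source = get source >>= maybe (return ()) consumer
```

In this model, calling getWith (put sink) source moves at most one value from the source to the sink, and does nothing once get starts returning Nothing.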

Lifting functions

liftSink :: forall m a d x. (Monad m, AncestorFunctor a d) => Sink m a x -> Sink m d x

Converts a Sink on the ancestor functor a into a sink on the descendant functor d.

liftSource :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Source m d x

Converts a Source on the ancestor functor a into a source on the descendant functor d.

Bulk operations

pour :: forall m a1 a2 d x. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => Source m a1 x -> Sink m a2 x -> Coroutine d m ()

pour copies all data from the source argument into the sink argument.
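
The copying loop can be sketched in a simplified model. As an assumption made for the sake of a runnable example, Source and Sink here are IO-based stand-ins without the functor parameter, and pourDemo is a hypothetical demonstration function; the library's pour runs in Coroutine instead.

```haskell
import Data.IORef (newIORef, readIORef, writeIORef, modifyIORef')

-- Simplified stand-ins for the library types: no coroutines, no functor parameter.
newtype Source x = Source { get :: IO (Maybe x) }
newtype Sink x   = Sink   { put :: x -> IO () }

-- Model of pour: keep moving values until get reports an empty source.
pour :: Source x -> Sink x -> IO ()
pour source sink = loop
  where
    loop = get source >>= maybe (return ()) (\x -> put sink x >> loop)

-- Hypothetical demonstration: pour a list-backed source into a recording sink.
pourDemo :: [Int] -> IO [Int]
pourDemo input = do
  pending  <- newIORef input
  received <- newIORef []
  let source = Source $ do
        remaining <- readIORef pending
        case remaining of
          []       -> return Nothing
          (y : ys) -> writeIORef pending ys >> return (Just y)
      sink = Sink (\x -> modifyIORef' received (x :))
  pour source sink
  reverse <$> readIORef received
```

In this model, pourDemo returns its input list unchanged, since every value read from the source is written to the sink in order.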

tee :: forall m a1 a2 a3 d x. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => Source m a1 x -> Sink m a2 x -> Sink m a3 x -> Coroutine d m ()

tee is similar to pour, except that it distributes every input value from its source argument into both of its sink arguments.

teeSink :: forall m a1 a2 a3 x. (Monad m, AncestorFunctor a1 a3, AncestorFunctor a2 a3) => Sink m a1 x -> Sink m a2 x -> Sink m a3 x

Every value put into a teeSink result sink goes into both of its argument sinks: put (teeSink s1 s2) x is equivalent to put s1 x >> put s2 x.
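
The stated equivalence can be checked in a simplified model. Note the assumptions: Sink below is an IO-based stand-in without the functor parameter, and recordingSink and teeDemo are hypothetical helpers introduced only for this sketch.

```haskell
import Data.IORef (newIORef, readIORef, modifyIORef')

-- Simplified stand-in for the library type: no coroutines, no functor parameter.
newtype Sink x = Sink { put :: x -> IO () }

-- Model of teeSink: forward each value to the first sink, then to the second.
teeSink :: Sink x -> Sink x -> Sink x
teeSink s1 s2 = Sink (\x -> put s1 x >> put s2 x)

-- Hypothetical helper: a sink that records its input, plus an action to read it back.
recordingSink :: IO (Sink x, IO [x])
recordingSink = do
  ref <- newIORef []
  return (Sink (\x -> modifyIORef' ref (x :)), reverse <$> readIORef ref)

-- Put three characters into a combined sink and report what each side received.
teeDemo :: IO (String, String)
teeDemo = do
  (left, readLeft)   <- recordingSink
  (right, readRight) <- recordingSink
  mapM_ (put (teeSink left right)) "abc"
  (,) <$> readLeft <*> readRight
```

In this model both recording sinks end up holding the same values in the same order.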

teeSource :: forall m a1 a2 a3 x. (Monad m, AncestorFunctor a1 a3, AncestorFunctor a2 a3) => Sink m a1 x -> Source m a2 x -> Source m a3 x

The Source returned by teeSource writes every value read from its argument source into the argument sink before providing it back.

mapStream :: forall m a1 a2 d x y. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()

mapStream is like pour, except that it applies the argument function to each value before passing it into the sink.

mapSource :: forall m a x y. Monad m => (x -> y) -> Source m a x -> Source m a y

An equivalent of map that works on a Source instead of a list. The argument function is applied to every value after it's read from the source argument.

mapSink :: forall m a x y. Monad m => (x -> y) -> Sink m a y -> Sink m a x

An equivalent of map that works on a Sink instead of a list. The argument function is applied to every value before it's written to the sink argument.
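
The signatures of mapSource and mapSink run in opposite directions: a function x -> y turns a Source of x into a Source of y, but a Sink of y into a Sink of x. A simplified model may make this clearer. It assumes IO-based stand-ins for Source and Sink without the functor parameter, and mapDemo is a hypothetical demonstration function.

```haskell
import Data.IORef (newIORef, readIORef, modifyIORef')

-- Simplified stand-ins for the library types: no coroutines, no functor parameter.
newtype Source x = Source { get :: IO (Maybe x) }
newtype Sink x   = Sink   { put :: x -> IO () }

-- Model of mapSource: apply f after a value is read.
mapSource :: (x -> y) -> Source x -> Source y
mapSource f source = Source (fmap (fmap f) (get source))

-- Model of mapSink: apply f before a value is written.
mapSink :: (x -> y) -> Sink y -> Sink x
mapSink f sink = Sink (put sink . f)

-- Hypothetical demonstration of both directions.
mapDemo :: IO (Maybe Int, [String])
mapDemo = do
  out <- newIORef []
  let stringSink = Sink (\s -> modifyIORef' out (s :))
      intSink    = mapSink show stringSink   -- accepts Int, forwards String
      lengths    = mapSource length (Source (return (Just "abc")))
  put intSink (1 :: Int)
  put intSink 2
  firstLength <- get lengths
  written     <- reverse <$> readIORef out
  return (firstLength, written)
```

Here the sink that accepts Int values is built from a sink that accepts String values, which is why the type arguments of mapSink appear swapped relative to mapSource.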

mapMStream :: forall m a1 a2 d x y. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Coroutine d m y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()

mapMStream is similar to mapM. It draws the values from a Source instead of a list, writes the mapped values to a Sink, and returns a Coroutine.

mapMSource :: forall m a x y. Monad m => (forall d. AncestorFunctor a d => x -> Coroutine d m y) -> Source m a x -> Source m a y

An equivalent of mapM that works on a Source instead of a list. Similar to mapSource, except the function argument is monadic and may perform effects.

mapMSink :: forall m a x y. Monad m => (forall d. AncestorFunctor a d => x -> Coroutine d m y) -> Sink m a y -> Sink m a x

An equivalent of mapM that works on a Sink instead of a list. Similar to mapSink, except the function argument is monadic and may perform effects.

mapMStream_ :: forall m a d x. (Monad m, AncestorFunctor a d) => (x -> Coroutine d m ()) -> Source m a x -> Coroutine d m ()

mapMStream_ is similar to mapM_ except it draws the values from a Source instead of a list and works with Coroutine instead of an arbitrary monad.

mapMaybeStream :: forall m a1 a2 d x y. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Maybe y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()

mapMaybeSink :: forall m a x y. Monad m => (x -> Maybe y) -> Sink m a y -> Sink m a x

mapMaybeSink is to mapSink as mapMaybe is to map.

mapMaybeSource :: forall m a x y. Monad m => (x -> Maybe y) -> Source m a x -> Source m a y

filterMStream :: forall m a1 a2 d x. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Coroutine d m Bool) -> Source m a1 x -> Sink m a2 x -> Coroutine d m ()

An equivalent of filterM. Draws the values from a Source instead of a list, writes the filtered values to a Sink, and returns a Coroutine.

filterMSource :: forall m a x y. Monad m => (forall d. AncestorFunctor a d => x -> Coroutine d m Bool) -> Source m a x -> Source m a x

An equivalent of filterM; filters a Source instead of a list.

filterMSink :: forall m a x y. Monad m => (forall d. AncestorFunctor a d => x -> Coroutine d m Bool) -> Sink m a x -> Sink m a x

An equivalent of filterM; filters a Sink instead of a list.

foldStream :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> x -> acc) -> acc -> Source m a x -> Coroutine d m acc

Similar to foldl, but reads the values from a Source instead of a list.

foldMStream :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> x -> Coroutine d m acc) -> acc -> Source m a x -> Coroutine d m acc

foldMStream is similar to foldM except it draws the values from a Source instead of a list and works with Coroutine instead of an arbitrary monad.

foldMStream_ :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> x -> Coroutine d m acc) -> acc -> Source m a x -> Coroutine d m ()

A version of foldMStream that ignores the final result value.

mapAccumStream :: forall m a1 a2 d x y acc. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (acc -> x -> (acc, y)) -> acc -> Source m a1 x -> Sink m a2 y -> Coroutine d m acc

mapAccumStream is similar to mapAccumL except it reads the values from a Source instead of a list and writes the mapped values into a Sink instead of returning another list.
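
As a reminder of the list function being mirrored, here is Data.List.mapAccumL computing running totals. This is a list-only sketch and runningTotals is a hypothetical name. Going by the signature above, the final accumulator would correspond to the result of the coroutine and the output list to the values written into the sink.

```haskell
import Data.List (mapAccumL)

-- Running totals: the accumulator is the sum so far, and each output is that sum.
runningTotals :: [Integer] -> (Integer, [Integer])
runningTotals = mapAccumL step 0
  where
    step acc x = let acc' = acc + x in (acc', acc')
```

For the input [1, 2, 3] this yields (6, [1, 3, 6]): the final accumulator paired with the mapped values.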

partitionStream :: forall m a1 a2 a3 d x. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => (x -> Bool) -> Source m a1 x -> Sink m a2 x -> Sink m a3 x -> Coroutine d m ()

Equivalent to partition. Takes a Source instead of a list argument and partitions its contents into the two Sink arguments.

unfoldMStream :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> Coroutine d m (Maybe (x, acc))) -> acc -> Sink m a x -> Coroutine d m acc

unfoldMStream is a version of unfoldr that writes the generated values into a Sink instead of returning a list.
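
For comparison, the sketch below shows Data.List.unfoldr next to a hypothetical list-based variant, unfoldWithSeed, that also returns the seed at which generation stopped. That the acc result of unfoldMStream is this final seed is an assumption read off the signature, not something the description above states; countdown and countdownList are likewise illustrative names.

```haskell
import Data.List (unfoldr)

-- Step function: emit n and continue with n - 1 until reaching zero.
countdown :: Int -> Maybe (Int, Int)
countdown n = if n <= 0 then Nothing else Just (n, n - 1)

-- Plain unfoldr returns only the generated values.
countdownList :: [Int]
countdownList = unfoldr countdown 3

-- Hypothetical variant that also returns the seed on which the step function gave Nothing.
unfoldWithSeed :: (acc -> Maybe (x, acc)) -> acc -> ([x], acc)
unfoldWithSeed step seed = case step seed of
  Nothing         -> ([], seed)
  Just (x, seed') -> let (xs, final) = unfoldWithSeed step seed' in (x : xs, final)
```

Here countdownList is [3, 2, 1], and unfoldWithSeed countdown 3 is ([3, 2, 1], 0).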

unmapMStream_ :: forall m a d x. (Monad m, AncestorFunctor a d) => Coroutine d m (Maybe x) -> Sink m a x -> Coroutine d m ()

unmapMStream_ is the opposite of mapMStream_; it takes a Sink instead of a Source argument and writes the generated values into it.

zipWithMStream :: forall m a1 a2 a3 d x y z. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => (x -> y -> Coroutine d m z) -> Source m a1 x -> Source m a2 y -> Sink m a3 z -> Coroutine d m ()

zipWithMStream is similar to zipWithM except it draws the values from two Source arguments instead of two lists, sends the results into a Sink, and works with Coroutine instead of an arbitrary monad.

parZipWithMStream :: forall m a1 a2 a3 d x y z. (MonadParallel m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => (x -> y -> Coroutine d m z) -> Source m a1 x -> Source m a2 y -> Sink m a3 z -> Coroutine d m ()

parZipWithMStream is equivalent to zipWithMStream, but it consumes the two sources in parallel.

getList :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Coroutine d m [x]

getList returns the list of all values generated by the source.

putList :: forall m a d x. (Monad m, AncestorFunctor a d) => [x] -> Sink m a x -> Coroutine d m ()

putList puts an entire list into its sink argument.

putQueue :: forall m a d x. (Monad m, AncestorFunctor a d) => Seq x -> Sink m a x -> Coroutine d m ()

Like putList, except it puts the contents of the given Seq into the sink.

Utility functions

cond :: a -> a -> Bool -> a

A utility function wrapping if-then-else, useful for handling monadic truth values.
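
A minimal sketch of how such a function might be defined and used follows. The argument order is an assumption (the first argument is taken to be the result for True), and describe is a hypothetical usage example rather than part of the module.

```haskell
-- Sketch of cond, assuming the first argument is the result for True.
cond :: a -> a -> Bool -> a
cond x y test = if test then x else y

-- Hypothetical usage: branch on a Boolean produced by a monadic computation.
describe :: Monad m => m Bool -> m String
describe check = check >>= cond (return "non-empty") (return "empty")
```

Because the Boolean comes last, cond can be partially applied and placed directly on the right of >>=, avoiding an explicit lambda around if-then-else.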