scc-0.5: Streaming component combinators
Control.Concurrent.SCC.Streams
Contents
Sink and Source types
Sink and Source constructors
Operations on sinks and sources
Singleton operations
Lifting functions
Bulk operations
Utility functions
Description

This module defines Source and Sink types and pipe functions that create them. The method get on Source abstracts away Control.Concurrent.Coroutine.await, and the method put on Sink is a higher-level abstraction of Control.Concurrent.Coroutine.SuspensionFunctors.yield. With this arrangement, a single coroutine can yield values to multiple sinks and await values from multiple sources with no need to change the Control.Concurrent.Coroutine.Coroutine functor; the only requirement is that the functor of each source and sink the coroutine uses be a Control.Concurrent.Coroutine.AncestorFunctor of the coroutine's functor. For example, a coroutine zip that takes two sources and one sink would be declared like this:

 zip :: forall m a1 a2 a3 d x y. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d)
        => Source m a1 x -> Source m a2 y -> Sink m a3 (x, y) -> Coroutine d m ()

Sources, sinks, and coroutines communicating through them are all created using the pipe function or one of its variants. They effectively split the current coroutine into a producer-consumer coroutine pair. The producer gets a new Sink to write to and the consumer a new Source to read from, in addition to all the streams that are visible in the original coroutine. The following function, for example, uses the zip coroutine above to add together the values from two Integer sources:

 add :: forall m a1 a2 a3 d. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d)
        => Source m a1 Integer -> Source m a2 Integer -> Sink m a3 Integer -> Coroutine d m ()
 add source1 source2 sink = do pipe
                                  (\pairSink-> zip source1 source2 pairSink)              -- producer coroutine
                                  (\pairSource-> mapStream (uncurry (+)) pairSource sink) -- consumer coroutine
                               return ()
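As a point of comparison only, the same computation on plain lists can be sketched with the Prelude functions zip and map. This is an analogy using base alone, not code from the scc package, and the name addLists is hypothetical:

```haskell
-- List analogue of the add coroutine above (illustration only, not scc code).
addLists :: [Integer] -> [Integer] -> [Integer]
addLists xs ys = map (uncurry (+)) (zip xs ys)

main :: IO ()
main = print (addLists [1, 2, 3] [10, 20, 30])  -- prints [11,22,33]
```

In the streaming version, the intermediate list of pairs is replaced by the channel that pipe creates between the two coroutines.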
Synopsis
data Sink m a x
data Source m a x
type SinkFunctor a x = EitherFunctor a (Yield x)
type SourceFunctor a x = EitherFunctor a (Await (Maybe x))
class (Functor a, Functor d) => AncestorFunctor a d
pipe :: forall m a a1 a2 x r1 r2. (Monad m, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)
pipeP :: forall m a a1 a2 x r1 r2. (MonadParallel m, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)
pipePS :: forall m a a1 a2 x r1 r2. (MonadParallel m, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => Bool -> (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)
nullSink :: forall m a x. Monad m => Sink m a x
nullSource :: forall m a x. Monad m => Source m a x
get :: Source m a x -> forall d. AncestorFunctor a d => Coroutine d m (Maybe x)
put :: Sink m a x -> forall d. AncestorFunctor a d => x -> Coroutine d m ()
getWith :: forall m a d x. (Monad m, AncestorFunctor a d) => (x -> Coroutine d m ()) -> Source m a x -> Coroutine d m ()
liftSink :: forall m a d x. (Monad m, AncestorFunctor a d) => Sink m a x -> Sink m d x
liftSource :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Source m d x
pour :: forall m a1 a2 d x. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => Source m a1 x -> Sink m a2 x -> Coroutine d m ()
tee :: forall m a1 a2 a3 d x. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => Source m a1 x -> Sink m a2 x -> Sink m a3 x -> Coroutine d m ()
teeSink :: forall m a1 a2 a3 x. (Monad m, AncestorFunctor a1 a3, AncestorFunctor a2 a3) => Sink m a1 x -> Sink m a2 x -> Sink m a3 x
teeSource :: forall m a1 a2 a3 x. (Monad m, AncestorFunctor a1 a3, AncestorFunctor a2 a3) => Sink m a1 x -> Source m a2 x -> Source m a3 x
mapStream :: forall m a1 a2 d x y. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
mapSource :: forall m a x y. Monad m => (x -> y) -> Source m a x -> Source m a y
mapSink :: forall m a x y. Monad m => (x -> y) -> Sink m a y -> Sink m a x
mapMStream :: forall m a1 a2 d x y. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Coroutine d m y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
mapMSource :: forall m a x y. Monad m => (forall d. AncestorFunctor a d => x -> Coroutine d m y) -> Source m a x -> Source m a y
mapMSink :: forall m a x y. Monad m => (forall d. AncestorFunctor a d => x -> Coroutine d m y) -> Sink m a y -> Sink m a x
mapMStream_ :: forall m a d x. (Monad m, AncestorFunctor a d) => (x -> Coroutine d m ()) -> Source m a x -> Coroutine d m ()
mapMaybeStream :: forall m a1 a2 d x y. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Maybe y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
mapMaybeSink :: forall m a x y. Monad m => (x -> Maybe y) -> Sink m a y -> Sink m a x
mapMaybeSource :: forall m a x y. Monad m => (x -> Maybe y) -> Source m a x -> Source m a y
filterMStream :: forall m a1 a2 d x. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Coroutine d m Bool) -> Source m a1 x -> Sink m a2 x -> Coroutine d m ()
filterMSource :: forall m a x y. Monad m => (forall d. AncestorFunctor a d => x -> Coroutine d m Bool) -> Source m a x -> Source m a x
filterMSink :: forall m a x y. Monad m => (forall d. AncestorFunctor a d => x -> Coroutine d m Bool) -> Sink m a x -> Sink m a x
foldStream :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> x -> acc) -> acc -> Source m a x -> Coroutine d m acc
foldMStream :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> x -> Coroutine d m acc) -> acc -> Source m a x -> Coroutine d m acc
foldMStream_ :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> x -> Coroutine d m acc) -> acc -> Source m a x -> Coroutine d m ()
mapAccumStream :: forall m a1 a2 d x y acc. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (acc -> x -> (acc, y)) -> acc -> Source m a1 x -> Sink m a2 y -> Coroutine d m acc
partitionStream :: forall m a1 a2 a3 d x. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => (x -> Bool) -> Source m a1 x -> Sink m a2 x -> Sink m a3 x -> Coroutine d m ()
unfoldMStream :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> Coroutine d m (Maybe (x, acc))) -> acc -> Sink m a x -> Coroutine d m acc
unmapMStream_ :: forall m a d x. (Monad m, AncestorFunctor a d) => Coroutine d m (Maybe x) -> Sink m a x -> Coroutine d m ()
zipWithMStream :: forall m a1 a2 a3 d x y z. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => (x -> y -> Coroutine d m z) -> Source m a1 x -> Source m a2 y -> Sink m a3 z -> Coroutine d m ()
parZipWithMStream :: forall m a1 a2 a3 d x y z. (MonadParallel m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => (x -> y -> Coroutine d m z) -> Source m a1 x -> Source m a2 y -> Sink m a3 z -> Coroutine d m ()
getList :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Coroutine d m [x]
putList :: forall m a d x. (Monad m, AncestorFunctor a d) => [x] -> Sink m a x -> Coroutine d m ()
putQueue :: forall m a d x. (Monad m, AncestorFunctor a d) => Seq x -> Sink m a x -> Coroutine d m ()
cond :: a -> a -> Bool -> a
Sink and Source types
data Sink m a x
A Sink can be used to yield values from any nested Coroutine computation whose functor provably descends from the functor a. It's the write-only end of a communication channel created by pipe.
data Source m a x
A Source can be used to read values into any nested Coroutine computation whose functor provably descends from the functor a. It's the read-only end of a communication channel created by pipe.
type SinkFunctor a x = EitherFunctor a (Yield x)
type SourceFunctor a x = EitherFunctor a (Await (Maybe x))
class (Functor a, Functor d) => AncestorFunctor a d
Class of functors that can be lifted.
Instances
(d' ~ Parent d, Functor a, ChildFunctor d, AncestorFunctor a d') => AncestorFunctor a d
Functor a => AncestorFunctor a a
Sink and Source constructors
pipe :: forall m a a1 a2 x r1 r2. (Monad m, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)
The pipe function splits the computation into two concurrent parts, producer and consumer. The producer is given a Sink to put values into, and the consumer a Source to get those values from. Once the producer and the consumer both complete, pipe returns their paired results.
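The producer/consumer split can be pictured with a small toy model. The sketch below is not the scc implementation and uses none of its types; Producer, Consumer, pipeModel, fromList and sumAll are all hypothetical names, and the decision to drop values the consumer never reads is an assumption of the model rather than documented pipe behaviour.

```haskell
-- Toy model of a producer/consumer pair; NOT the scc implementation.

-- A producer either finishes with a result or yields a value and continues.
data Producer x r = Done r | Yield x (Producer x r)

-- A consumer either finishes with a result or awaits the next value;
-- Nothing signals that the producer has finished.
data Consumer x r = Result r | Await (Maybe x -> Consumer x r)

-- Run both sides to completion and pair their results.
pipeModel :: Producer x r1 -> Consumer x r2 -> (r1, r2)
pipeModel (Done r1)   (Result r2) = (r1, r2)
pipeModel (Done r1)   (Await k)   = pipeModel (Done r1) (k Nothing)
pipeModel (Yield _ p) (Result r2) = pipeModel p (Result r2)  -- model's choice: drop unread values
pipeModel (Yield x p) (Await k)   = pipeModel p (k (Just x))

-- Producer: yields every list element, then returns how many it yielded.
fromList :: [x] -> Producer x Int
fromList = go 0
  where
    go n []       = Done n
    go n (x : xs) = Yield x (go (n + 1) xs)

-- Consumer: sums everything it receives until it sees Nothing.
sumAll :: Consumer Integer Integer
sumAll = go 0
  where
    go acc = Await (maybe (Result acc) (\x -> go (acc + x)))

example :: (Int, Integer)
example = pipeModel (fromList [1, 2, 3]) sumAll

main :: IO ()
main = print example  -- prints (3,6)
```

The pair (3,6) mirrors the (r1, r2) result of pipe: the first component comes from the producer, the second from the consumer.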
pipeP :: forall m a a1 a2 x r1 r2. (MonadParallel m, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)
The pipeP function is equivalent to pipe, except it runs the producer and the consumer in parallel.
pipePS :: forall m a a1 a2 x r1 r2. (MonadParallel m, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => Bool -> (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)
The pipePS function acts either as pipeP or as pipe, depending on its Bool argument parallel.
nullSink :: forall m a x. Monad m => Sink m a x
A disconnected sink that ignores all values put into it.
nullSource :: forall m a x. Monad m => Source m a x
An empty source whose get always returns Nothing.
Operations on sinks and sources
Singleton operations
get :: Source m a x -> forall d. AncestorFunctor a d => Coroutine d m (Maybe x)
Function get tries to get a value from the given Source argument. The intervening Coroutine computations suspend all the way to the pipe function invocation that created the source. The function returns Nothing if the argument source is empty.
put :: Sink m a x -> forall d. AncestorFunctor a d => x -> Coroutine d m ()
This function puts a value into the given Sink. The intervening Coroutine computations suspend up to the pipe invocation that has created the argument sink.
getWith :: forall m a d x. (Monad m, AncestorFunctor a d) => (x -> Coroutine d m ()) -> Source m a x -> Coroutine d m ()
Invokes its first argument with the value it gets from the source, if there is any to get.
Lifting functions
liftSink :: forall m a d x. (Monad m, AncestorFunctor a d) => Sink m a x -> Sink m d x
Converts a Sink on the ancestor functor a into a sink on the descendant functor d.
liftSource :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Source m d x
Converts a Source on the ancestor functor a into a source on the descendant functor d.
Bulk operations
pour :: forall m a1 a2 d x. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => Source m a1 x -> Sink m a2 x -> Coroutine d m ()
pour copies all data from the source argument into the sink argument.
tee :: forall m a1 a2 a3 d x. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => Source m a1 x -> Sink m a2 x -> Sink m a3 x -> Coroutine d m ()
tee is similar to pour, except it distributes every input value from its source argument into both of its sink arguments.
teeSink :: forall m a1 a2 a3 x. (Monad m, AncestorFunctor a1 a3, AncestorFunctor a2 a3) => Sink m a1 x -> Sink m a2 x -> Sink m a3 x
Every value put into the sink returned by teeSink goes into both of its argument sinks: put (teeSink s1 s2) x is equivalent to put s1 x >> put s2 x.
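The equivalence stated above can be illustrated with a toy model in which a sink is simply an IO action that accepts a value. SinkModel, teeSinkModel and logTo are hypothetical names for this sketch; the real Sink type works inside Coroutine, not IO.

```haskell
import Data.IORef (IORef, modifyIORef, newIORef, readIORef)

-- Toy stand-in for a sink: an action that accepts a value (not the scc Sink type).
type SinkModel x = x -> IO ()

-- Model of teeSink: every value goes to the first sink, then to the second.
teeSinkModel :: SinkModel x -> SinkModel x -> SinkModel x
teeSinkModel s1 s2 x = s1 x >> s2 x

-- A sink that appends each value to a log held in an IORef.
logTo :: IORef [x] -> SinkModel x
logTo ref x = modifyIORef ref (++ [x])

runExample :: IO ([Int], [Int])
runExample = do
  ref1 <- newIORef []
  ref2 <- newIORef []
  mapM_ (teeSinkModel (logTo ref1) (logTo ref2)) [1, 2, 3]
  (,) <$> readIORef ref1 <*> readIORef ref2

main :: IO ()
main = runExample >>= print  -- prints ([1,2,3],[1,2,3])
```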
teeSource :: forall m a1 a2 a3 x. (Monad m, AncestorFunctor a1 a3, AncestorFunctor a2 a3) => Sink m a1 x -> Source m a2 x -> Source m a3 x
The Source returned by teeSource writes every value read from its argument source into the argument sink before providing it back.
mapStream :: forall m a1 a2 d x y. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
mapStream is like pour, except that it applies the argument function to each value before passing it into the sink.
mapSource :: forall m a x y. Monad m => (x -> y) -> Source m a x -> Source m a y
An equivalent of map that works on a Source instead of a list. The argument function is applied to every value after it's read from the source argument.
mapSink :: forall m a x y. Monad m => (x -> y) -> Sink m a y -> Sink m a x
An equivalent of map that works on a Sink instead of a list. The argument function is applied to every value before it's written to the sink argument.
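Note the direction of the types: mapping a function of type x -> y over a sink of y gives a sink of x, because the function runs before the write. A minimal sketch with a toy sink type shows why. SinkModel, putModel and mapSinkModel are hypothetical names, and the model runs in IO rather than Coroutine.

```haskell
import Data.IORef (modifyIORef, newIORef, readIORef)

-- Toy stand-in for a sink: a wrapper around a write action (not the scc Sink type).
newtype SinkModel x = SinkModel { putModel :: x -> IO () }

-- The function is applied first, then the result is written to the original sink.
mapSinkModel :: (x -> y) -> SinkModel y -> SinkModel x
mapSinkModel f (SinkModel p) = SinkModel (p . f)

runExample :: IO [String]
runExample = do
  ref <- newIORef []
  let strings = SinkModel (\s -> modifyIORef ref (++ [s]))
      ints    = mapSinkModel show strings :: SinkModel Int
  mapM_ (putModel ints) [1, 2, 3]
  readIORef ref

main :: IO ()
main = runExample >>= print  -- prints ["1","2","3"]
```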
mapMStream :: forall m a1 a2 d x y. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Coroutine d m y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
mapMStream is similar to mapM. It draws the values from a Source instead of a list, writes the mapped values to a Sink, and returns a Coroutine.
mapMSource :: forall m a x y. Monad m => (forall d. AncestorFunctor a d => x -> Coroutine d m y) -> Source m a x -> Source m a y
An equivalent of mapM that works on a Source instead of a list. Similar to mapSource, except the function argument is monadic and may perform effects.
mapMSink :: forall m a x y. Monad m => (forall d. AncestorFunctor a d => x -> Coroutine d m y) -> Sink m a y -> Sink m a x
An equivalent of mapM that works on a Sink instead of a list. Similar to mapSink, except the function argument is monadic and may perform effects.
mapMStream_ :: forall m a d x. (Monad m, AncestorFunctor a d) => (x -> Coroutine d m ()) -> Source m a x -> Coroutine d m ()
mapMStream_ is similar to mapM_ except it draws the values from a Source instead of a list and works with Coroutine instead of an arbitrary monad.
mapMaybeStream :: forall m a1 a2 d x y. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Maybe y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
mapMaybeStream is to mapStream as mapMaybe is to map.
mapMaybeSink :: forall m a x y. Monad m => (x -> Maybe y) -> Sink m a y -> Sink m a x
mapMaybeSink is to mapSink as mapMaybe is to map.
mapMaybeSource :: forall m a x y. Monad m => (x -> Maybe y) -> Source m a x -> Source m a y
mapMaybeSource is to mapSource as mapMaybe is to map.
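For readers who want the list-level behaviour these three functions are compared to, here is a short reminder using Data.Maybe.mapMaybe from base. It shows only the list function; the names parsed and attempts are hypothetical.

```haskell
import Data.Maybe (mapMaybe)
import Text.Read (readMaybe)

-- map keeps one result per input, including the failures.
attempts :: [Maybe Int]
attempts = map readMaybe ["1", "x", "3"]

-- mapMaybe drops the inputs for which the function returns Nothing.
parsed :: [Int]
parsed = mapMaybe readMaybe ["1", "x", "3"]

main :: IO ()
main = do
  print attempts  -- prints [Just 1,Nothing,Just 3]
  print parsed    -- prints [1,3]
```

By analogy, the stream variants pass along only the values for which the argument function returns Just.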
filterMStream :: forall m a1 a2 d x. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Coroutine d m Bool) -> Source m a1 x -> Sink m a2 x -> Coroutine d m ()
An equivalent of filterM. Draws the values from a Source instead of a list, writes the filtered values to a Sink, and returns a Coroutine.
filterMSource :: forall m a x y. Monad m => (forall d. AncestorFunctor a d => x -> Coroutine d m Bool) -> Source m a x -> Source m a x
An equivalent of filterM; filters a Source instead of a list.
filterMSink :: forall m a x y. Monad m => (forall d. AncestorFunctor a d => x -> Coroutine d m Bool) -> Sink m a x -> Sink m a x
An equivalent of filterM; filters a Sink instead of a list.
foldStream :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> x -> acc) -> acc -> Source m a x -> Coroutine d m acc
Similar to foldl, but reads the values from a Source instead of a list.
foldMStream :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> x -> Coroutine d m acc) -> acc -> Source m a x -> Coroutine d m acc
foldMStream is similar to foldM except it draws the values from a Source instead of a list and works with Coroutine instead of an arbitrary monad.
foldMStream_ :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> x -> Coroutine d m acc) -> acc -> Source m a x -> Coroutine d m ()
A version of foldMStream that ignores the final result value.
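The list functions that the fold family is modelled on behave as in the following sketch. It uses foldl from the Prelude and foldM from Control.Monad, with Maybe standing in for the monad; total and checkedTotal are hypothetical names, and no scc code is involved.

```haskell
import Control.Monad (foldM)

-- Pure left fold: the list counterpart of foldStream.
total :: Integer
total = foldl (+) 0 [1, 2, 3]

-- Monadic left fold: the list counterpart of foldMStream.
-- Here the step function fails on negative input.
checkedTotal :: [Integer] -> Maybe Integer
checkedTotal = foldM step 0
  where
    step acc x
      | x < 0     = Nothing
      | otherwise = Just (acc + x)

main :: IO ()
main = do
  print total                      -- prints 6
  print (checkedTotal [1, 2, 3])   -- prints Just 6
  print (checkedTotal [1, -2, 3])  -- prints Nothing
```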
mapAccumStream :: forall m a1 a2 d x y acc. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (acc -> x -> (acc, y)) -> acc -> Source m a1 x -> Sink m a2 y -> Coroutine d m acc
mapAccumStream is similar to mapAccumL except it reads the values from a Source instead of a list and writes the mapped values into a Sink instead of returning another list.
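As a reminder of the list function being referenced, the sketch below computes running totals with Data.List.mapAccumL. The name runningTotals is hypothetical. Judging by its signature, mapAccumStream returns the final accumulator as the coroutine result while the mapped values go to the sink.

```haskell
import Data.List (mapAccumL)

-- The accumulator is threaded left to right; each step also emits an output.
runningTotals :: (Integer, [Integer])
runningTotals = mapAccumL step 0 [1, 2, 3]
  where
    step acc x = (acc + x, acc + x)

main :: IO ()
main = print runningTotals  -- prints (6,[1,3,6])
```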
partitionStream :: forall m a1 a2 a3 d x. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => (x -> Bool) -> Source m a1 x -> Sink m a2 x -> Sink m a3 x -> Coroutine d m ()
Equivalent to partition. Takes a Source instead of a list argument and partitions its contents into the two Sink arguments.
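For reference, this is how Data.List.partition behaves on a list. The example shows only the list function; which of the two sinks receives the values that satisfy the predicate is not stated here, so the sketch makes no claim about it. The name evensAndOdds is hypothetical.

```haskell
import Data.List (partition)

-- partition returns the elements satisfying the predicate first,
-- and the remaining elements second.
evensAndOdds :: ([Int], [Int])
evensAndOdds = partition even [1 .. 6]

main :: IO ()
main = print evensAndOdds  -- prints ([2,4,6],[1,3,5])
```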
unfoldMStream :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> Coroutine d m (Maybe (x, acc))) -> acc -> Sink m a x -> Coroutine d m acc
unfoldMStream is a version of unfoldr that writes the generated values into a Sink instead of returning a list.
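The signature shows one difference from Data.List.unfoldr: unfoldMStream also returns a final acc value. The following list-level sketch illustrates both shapes. The helper unfoldWithSeed is hypothetical and written only for this comparison; it assumes the returned value is the seed at which generation stopped.

```haskell
import Data.List (unfoldr)

-- Generator step: count down to zero, then stop.
step :: Int -> Maybe (Int, Int)
step 0 = Nothing
step n = Just (n, n - 1)

-- Plain unfoldr returns only the generated values.
countdown :: [Int]
countdown = unfoldr step 3

-- Hypothetical variant that also returns the seed at which the step stopped.
unfoldWithSeed :: (acc -> Maybe (x, acc)) -> acc -> ([x], acc)
unfoldWithSeed f seed = case f seed of
  Nothing         -> ([], seed)
  Just (x, seed') -> let (xs, final) = unfoldWithSeed f seed'
                     in (x : xs, final)

main :: IO ()
main = do
  print countdown                -- prints [3,2,1]
  print (unfoldWithSeed step 3)  -- prints ([3,2,1],0)
```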
unmapMStream_ :: forall m a d x. (Monad m, AncestorFunctor a d) => Coroutine d m (Maybe x) -> Sink m a x -> Coroutine d m ()
unmapMStream_ is the opposite of mapMStream_; it takes a Sink instead of a Source argument and writes the generated values into it.
zipWithMStream :: forall m a1 a2 a3 d x y z. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => (x -> y -> Coroutine d m z) -> Source m a1 x -> Source m a2 y -> Sink m a3 z -> Coroutine d m ()
zipWithMStream is similar to zipWithM except it draws the values from two Source arguments instead of two lists, sends the results into a Sink, and works with Coroutine instead of an arbitrary monad.
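The list function referenced here, Control.Monad.zipWithM, can be seen in action below, with Maybe standing in for the monad. safeDiv and quotients are hypothetical names used only for illustration.

```haskell
import Control.Monad (zipWithM)

-- A combining function with an effect: division that fails on a zero divisor.
safeDiv :: Int -> Int -> Maybe Int
safeDiv _ 0 = Nothing
safeDiv x y = Just (x `div` y)

quotients :: Maybe [Int]
quotients = zipWithM safeDiv [10, 20, 30] [2, 5, 3]

main :: IO ()
main = do
  print quotients                         -- prints Just [5,4,10]
  print (zipWithM safeDiv [1, 2] [1, 0])  -- prints Nothing
```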
parZipWithMStream :: forall m a1 a2 a3 d x y z. (MonadParallel m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => (x -> y -> Coroutine d m z) -> Source m a1 x -> Source m a2 y -> Sink m a3 z -> Coroutine d m ()
parZipWithMStream is equivalent to zipWithMStream, but it consumes the two sources in parallel.
getList :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Coroutine d m [x]
getList returns the list of all values generated by the source.
putList :: forall m a d x. (Monad m, AncestorFunctor a d) => [x] -> Sink m a x -> Coroutine d m ()
putList puts the entire list into its sink argument.
putQueue :: forall m a d x. (Monad m, AncestorFunctor a d) => Seq x -> Sink m a x -> Coroutine d m ()
Like putList, except it puts the contents of the given Seq into the sink.
Utility functions
cond :: a -> a -> Bool -> a
A utility function wrapping if-then-else, useful for handling monadic truth values.
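A minimal sketch of how such a function helps with monadic truth values follows. The definition cond' below is a local stand-in, not the library's source, and it assumes that the first argument is the result for True and the second the result for False.

```haskell
-- Local stand-in for cond; the argument order (True branch first) is an assumption.
cond' :: a -> a -> Bool -> a
cond' x y test = if test then x else y

-- Because the Bool comes last, cond' can be mapped over a monadic test
-- without naming the intermediate result.
describe :: Functor f => f Bool -> f String
describe = fmap (cond' "yes" "no")

main :: IO ()
main = do
  print (describe (Just True))   -- prints Just "yes"
  print (describe (Just False))  -- prints Just "no"
```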
Produced by Haddock version 2.7.2