This module defines `Source` and `Sink` types and `pipe` functions that create them. The method `get` on `Source`
abstracts away `Control.Concurrent.Coroutine.await`, and the method `put` on `Sink` is a higher-level abstraction of
`Control.Concurrent.Coroutine.SuspensionFunctors.yield`. With this arrangement, a single coroutine can yield values
to multiple sinks and await values from multiple sources with no need to change the
`Control.Concurrent.Coroutine.Coroutine` functor; the only requirement is for each functor of the sources and sinks
the coroutine uses to be an `Control.Concurrent.Coroutine.AncestorFunctor` of the coroutine's functor. For example,
a coroutine *zip* that takes two sources and one sink would be declared like this:

zip :: forall m a1 a2 a3 d x y. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => Source m a1 x -> Source m a2 y -> Sink m a3 (x, y) -> Coroutine d m ()

Sources, sinks, and coroutines communicating through them are all created using the `pipe` function or one of its
variants. They effectively split the current coroutine into a producer-consumer coroutine pair. The producer gets a
new `Sink` to write to and the consumer a new `Source` to read from, in addition to all the streams that are visible
in the original coroutine. The following function, for example, uses the *zip* coroutine above to add together the
values from two Integer sources:

    add :: forall m a1 a2 a3 d. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d)
        => Source m a1 Integer -> Source m a2 Integer -> Sink m a3 Integer -> Coroutine d m ()
    add source1 source2 sink = do
      pipe
        (\pairSink -> zip source1 source2 pairSink)                -- producer coroutine
        (\pairSource -> mapStream (uncurry (+)) pairSource sink)   -- consumer coroutine
      return ()

- data Sink m a x
- data Source m a x
- type SinkFunctor a x = EitherFunctor a (Yield x)
- type SourceFunctor a x = EitherFunctor a (Await (Maybe x))
- class (Functor a, Functor d) => AncestorFunctor a d
- pipe :: forall m a a1 a2 x r1 r2. (Monad m, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)
- pipeP :: forall m a a1 a2 x r1 r2. (MonadParallel m, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)
- pipePS :: forall m a a1 a2 x r1 r2. (MonadParallel m, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => Bool -> (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)
- nullSink :: forall m a x. Monad m => Sink m a x
- nullSource :: forall m a x. Monad m => Source m a x
- get :: Source m a x -> forall d. AncestorFunctor a d => Coroutine d m (Maybe x)
- put :: Sink m a x -> forall d. AncestorFunctor a d => x -> Coroutine d m ()
- getWith :: forall m a d x. (Monad m, AncestorFunctor a d) => (x -> Coroutine d m ()) -> Source m a x -> Coroutine d m ()
- liftSink :: forall m a d x. (Monad m, AncestorFunctor a d) => Sink m a x -> Sink m d x
- liftSource :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Source m d x
- pour :: forall m a1 a2 d x. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => Source m a1 x -> Sink m a2 x -> Coroutine d m ()
- tee :: forall m a1 a2 a3 d x. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => Source m a1 x -> Sink m a2 x -> Sink m a3 x -> Coroutine d m ()
- teeSink :: forall m a1 a2 a3 x. (Monad m, AncestorFunctor a1 a3, AncestorFunctor a2 a3) => Sink m a1 x -> Sink m a2 x -> Sink m a3 x
- teeSource :: forall m a1 a2 a3 x. (Monad m, AncestorFunctor a1 a3, AncestorFunctor a2 a3) => Sink m a1 x -> Source m a2 x -> Source m a3 x
- mapStream :: forall m a1 a2 d x y. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
- mapSource :: forall m a x y. Monad m => (x -> y) -> Source m a x -> Source m a y
- mapSink :: forall m a x y. Monad m => (x -> y) -> Sink m a y -> Sink m a x
- mapMStream :: forall m a1 a2 d x y. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Coroutine d m y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
- mapMSource :: forall m a x y. Monad m => (forall d. AncestorFunctor a d => x -> Coroutine d m y) -> Source m a x -> Source m a y
- mapMSink :: forall m a x y. Monad m => (forall d. AncestorFunctor a d => x -> Coroutine d m y) -> Sink m a y -> Sink m a x
- mapMStream_ :: forall m a d x. (Monad m, AncestorFunctor a d) => (x -> Coroutine d m ()) -> Source m a x -> Coroutine d m ()
- mapMaybeStream :: forall m a1 a2 d x y. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Maybe y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
- mapMaybeSink :: forall m a x y. Monad m => (x -> Maybe y) -> Sink m a y -> Sink m a x
- mapMaybeSource :: forall m a x y. Monad m => (x -> Maybe y) -> Source m a x -> Source m a y
- filterMStream :: forall m a1 a2 d x. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Coroutine d m Bool) -> Source m a1 x -> Sink m a2 x -> Coroutine d m ()
- filterMSource :: forall m a x y. Monad m => (forall d. AncestorFunctor a d => x -> Coroutine d m Bool) -> Source m a x -> Source m a x
- filterMSink :: forall m a x y. Monad m => (forall d. AncestorFunctor a d => x -> Coroutine d m Bool) -> Sink m a x -> Sink m a x
- foldStream :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> x -> acc) -> acc -> Source m a x -> Coroutine d m acc
- foldMStream :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> x -> Coroutine d m acc) -> acc -> Source m a x -> Coroutine d m acc
- foldMStream_ :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> x -> Coroutine d m acc) -> acc -> Source m a x -> Coroutine d m ()
- mapAccumStream :: forall m a1 a2 d x y acc. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (acc -> x -> (acc, y)) -> acc -> Source m a1 x -> Sink m a2 y -> Coroutine d m acc
- partitionStream :: forall m a1 a2 a3 d x. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => (x -> Bool) -> Source m a1 x -> Sink m a2 x -> Sink m a3 x -> Coroutine d m ()
- unfoldMStream :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> Coroutine d m (Maybe (x, acc))) -> acc -> Sink m a x -> Coroutine d m acc
- unmapMStream_ :: forall m a d x. (Monad m, AncestorFunctor a d) => Coroutine d m (Maybe x) -> Sink m a x -> Coroutine d m ()
- zipWithMStream :: forall m a1 a2 a3 d x y z. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => (x -> y -> Coroutine d m z) -> Source m a1 x -> Source m a2 y -> Sink m a3 z -> Coroutine d m ()
- parZipWithMStream :: forall m a1 a2 a3 d x y z. (MonadParallel m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => (x -> y -> Coroutine d m z) -> Source m a1 x -> Source m a2 y -> Sink m a3 z -> Coroutine d m ()
- getList :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Coroutine d m [x]
- putList :: forall m a d x. (Monad m, AncestorFunctor a d) => [x] -> Sink m a x -> Coroutine d m ()
- putQueue :: forall m a d x. (Monad m, AncestorFunctor a d) => Seq x -> Sink m a x -> Coroutine d m ()
- cond :: a -> a -> Bool -> a

# Sink and Source types

type SinkFunctor a x = EitherFunctor a (Yield x)

type SourceFunctor a x = EitherFunctor a (Await (Maybe x))

class (Functor a, Functor d) => AncestorFunctor a d

Class of functors that can be lifted.

Instances:

- instance (d' ~ Parent d, Functor a, ChildFunctor d, AncestorFunctor a d') => AncestorFunctor a d
- instance Functor a => AncestorFunctor a a

# Sink and Source constructors

pipe :: forall m a a1 a2 x r1 r2. (Monad m, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)

pipeP :: forall m a a1 a2 x r1 r2. (MonadParallel m, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)

pipePS :: forall m a a1 a2 x r1 r2. (MonadParallel m, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => Bool -> (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)

nullSink :: forall m a x. Monad m => Sink m a x

A disconnected sink that ignores all values `put` into it.

nullSource :: forall m a x. Monad m => Source m a x

An empty source whose `get` always returns Nothing.

# Operations on sinks and sources

## Singleton operations

getWith :: forall m a d x. (Monad m, AncestorFunctor a d) => (x -> Coroutine d m ()) -> Source m a x -> Coroutine d m ()

Invokes its first argument with the value it gets from the source, if there is any to get.

## Lifting functions

liftSink :: forall m a d x. (Monad m, AncestorFunctor a d) => Sink m a x -> Sink m d x

Converts a `Sink` on the ancestor functor *a* into a sink on the descendant functor *d*.

liftSource :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Source m d x

Converts a `Source` on the ancestor functor *a* into a source on the descendant functor *d*.

## Bulk operations

pour :: forall m a1 a2 d x. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => Source m a1 x -> Sink m a2 x -> Coroutine d m ()

`pour` copies all data from the *source* argument into the *sink* argument.

tee :: forall m a1 a2 a3 d x. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => Source m a1 x -> Sink m a2 x -> Sink m a3 x -> Coroutine d m ()

teeSink :: forall m a1 a2 a3 x. (Monad m, AncestorFunctor a1 a3, AncestorFunctor a2 a3) => Sink m a1 x -> Sink m a2 x -> Sink m a3 x

teeSource :: forall m a1 a2 a3 x. (Monad m, AncestorFunctor a1 a3, AncestorFunctor a2 a3) => Sink m a1 x -> Source m a2 x -> Source m a3 x

mapStream :: forall m a1 a2 d x y. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()

mapMStream :: forall m a1 a2 d x y. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Coroutine d m y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()

`mapMStream` is similar to `mapM`. It draws the values from a `Source` instead of a list, writes the
mapped values to a `Sink`, and returns a `Coroutine`.

mapMSource :: forall m a x y. Monad m => (forall d. AncestorFunctor a d => x -> Coroutine d m y) -> Source m a x -> Source m a y

mapMSink :: forall m a x y. Monad m => (forall d. AncestorFunctor a d => x -> Coroutine d m y) -> Sink m a y -> Sink m a x

mapMStream_ :: forall m a d x. (Monad m, AncestorFunctor a d) => (x -> Coroutine d m ()) -> Source m a x -> Coroutine d m ()

`mapMStream_` is similar to `mapM_` except it draws the values from a `Source` instead of a list and
works with `Coroutine` instead of an arbitrary monad.

mapMaybeStream :: forall m a1 a2 d x y. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Maybe y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()

`mapMaybeStream` is to `mapStream` like `mapMaybe` is to `map`.

mapMaybeSink :: forall m a x y. Monad m => (x -> Maybe y) -> Sink m a y -> Sink m a x

`mapMaybeSink` is to `mapSink` like `mapMaybe` is to `map`.

mapMaybeSource :: forall m a x y. Monad m => (x -> Maybe y) -> Source m a x -> Source m a y

`mapMaybeSource` is to `mapSource` like `mapMaybe` is to `map`.

filterMStream :: forall m a1 a2 d x. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Coroutine d m Bool) -> Source m a1 x -> Sink m a2 x -> Coroutine d m ()

filterMSource :: forall m a x y. Monad m => (forall d. AncestorFunctor a d => x -> Coroutine d m Bool) -> Source m a x -> Source m a x

filterMSink :: forall m a x y. Monad m => (forall d. AncestorFunctor a d => x -> Coroutine d m Bool) -> Sink m a x -> Sink m a x

foldStream :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> x -> acc) -> acc -> Source m a x -> Coroutine d m acc

foldMStream :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> x -> Coroutine d m acc) -> acc -> Source m a x -> Coroutine d m acc

`foldMStream` is similar to `foldM` except it draws the values from a `Source` instead of a list and
works with `Coroutine` instead of an arbitrary monad.

foldMStream_ :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> x -> Coroutine d m acc) -> acc -> Source m a x -> Coroutine d m ()

A version of `foldMStream` that ignores the final result value.

mapAccumStream :: forall m a1 a2 d x y acc. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (acc -> x -> (acc, y)) -> acc -> Source m a1 x -> Sink m a2 y -> Coroutine d m acc

`mapAccumStream` is similar to `mapAccumL` except it reads the values from a `Source` instead of a list
and writes the mapped values into a `Sink` instead of returning another list.

partitionStream :: forall m a1 a2 a3 d x. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => (x -> Bool) -> Source m a1 x -> Sink m a2 x -> Sink m a3 x -> Coroutine d m ()

unfoldMStream :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> Coroutine d m (Maybe (x, acc))) -> acc -> Sink m a x -> Coroutine d m acc

`unfoldMStream` is a version of `unfoldr` that writes the generated values into a `Sink` instead of
returning a list.

unmapMStream_ :: forall m a d x. (Monad m, AncestorFunctor a d) => Coroutine d m (Maybe x) -> Sink m a x -> Coroutine d m ()

`unmapMStream_` is the opposite of `mapMStream_`; it takes a `Sink` instead of a `Source` argument and writes the
generated values into it.

zipWithMStream :: forall m a1 a2 a3 d x y z. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => (x -> y -> Coroutine d m z) -> Source m a1 x -> Source m a2 y -> Sink m a3 z -> Coroutine d m ()

`zipWithMStream` is similar to `zipWithM` except it draws the values from two `Source` arguments
instead of two lists, sends the results into a `Sink`, and works with `Coroutine` instead of an arbitrary monad.

parZipWithMStream :: forall m a1 a2 a3 d x y z. (MonadParallel m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => (x -> y -> Coroutine d m z) -> Source m a1 x -> Source m a2 y -> Sink m a3 z -> Coroutine d m ()

`parZipWithMStream` is equivalent to `zipWithMStream`, but it consumes the two sources in parallel.

getList :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Coroutine d m [x]

`getList` returns the list of all values generated by the source.

putList :: forall m a d x. (Monad m, AncestorFunctor a d) => [x] -> Sink m a x -> Coroutine d m ()

`putList` puts the entire list into its *sink* argument.

putQueue :: forall m a d x. (Monad m, AncestorFunctor a d) => Seq x -> Sink m a x -> Coroutine d m ()