Safe Haskell | None |
---|

This module defines `Source` and `Sink` types and `pipe` functions that create them. The method `get` on `Source`
abstracts away `await`, and the method `put` on `Sink` is a higher-level abstraction of `yield`. With this
arrangement, a single coroutine can yield values to multiple sinks and await values from multiple sources with no
need to change the `Coroutine` functor. The only requirement is that each functor of the sources and sinks the
coroutine uses must be an `AncestorFunctor` of the coroutine's own functor. For example, a coroutine that takes two
sources and one sink might be declared like this:

    zip :: forall m a1 a2 a3 d x y. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d)
           => Source m a1 [x] -> Source m a2 [y] -> Sink m a3 [(x, y)] -> Coroutine d m ()

Sources, sinks, and coroutines communicating through them are all created using the `pipe` function or one of its
variants. They effectively split the current coroutine into a producer-consumer coroutine pair. The producer gets a
new `Sink` to write to and the consumer a new `Source` to read from, in addition to all the streams they inherit from
the current coroutine. The following function, for example, uses the *zip* coroutine declared above to add together
the pairs of values from two Integer sources:

    add :: forall m a1 a2 a3 d. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d)
           => Source m a1 [Integer] -> Source m a2 [Integer] -> Sink m a3 [Integer] -> Coroutine d m ()
    add source1 source2 sink = do
      pipe
        (\pairSink -> zip source1 source2 pairSink)                          -- producer
        (\pairSource -> mapStream (List.map $ uncurry (+)) pairSource sink)  -- consumer
      return ()
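
The producer-consumer split can be pictured with a deliberately simplified, pure model. The sketch below is not the library's implementation: `Producer`, `Consumer`, `pipeModel`, `fromList`, and `sumAll` are hypothetical stand-ins (no monad, no functor nesting) that only show how one side's yields feed the other side's awaits and how both results come back as a pair, as in the result type `(r1, r2)` of `pipe`.

```haskell
-- Hypothetical, simplified stand-ins for illustration only; these are NOT
-- the library's Coroutine, Sink, or Source types.
data Producer x r = Yield x (Producer x r) | ProducerDone r
data Consumer x r = Await (Maybe x -> Consumer x r) | ConsumerDone r

-- Run both sides to completion and pair up their results.
-- A consumer that keeps awaiting after receiving Nothing would loop forever;
-- the consumer in this sketch always stops at end of input.
pipeModel :: Producer x r1 -> Consumer x r2 -> (r1, r2)
pipeModel (ProducerDone r1)  (ConsumerDone r2)  = (r1, r2)
pipeModel p@(ProducerDone _) (Await k)          = pipeModel p (k Nothing)      -- end of input
pipeModel (Yield _ rest)     c@(ConsumerDone _) = pipeModel rest c             -- output is dropped
pipeModel (Yield x rest)     (Await k)          = pipeModel rest (k (Just x))  -- hand the value over

-- A producer that yields every element of a list.
fromList :: [x] -> Producer x ()
fromList = foldr Yield (ProducerDone ())

-- A consumer that adds up everything it receives.
sumAll :: Integer -> Consumer Integer Integer
sumAll acc = Await (maybe (ConsumerDone acc) (\x -> sumAll (acc + x)))
```

With these definitions, `pipeModel (fromList [1, 2, 3]) (sumAll 0)` evaluates to `((), 6)`: the producer's unit result paired with the consumer's sum.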

- data Sink m a x
- data Source m a x
- type SinkFunctor a x = EitherFunctor a (Request x x)
- type SourceFunctor a x = EitherFunctor a (ReadRequest x)
- class (Functor a, Functor d) => AncestorFunctor a d
- pipe :: forall m a a1 a2 x r1 r2. (Monad m, Monoid x, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)
- pipeP :: forall m a a1 a2 x r1 r2. (MonadParallel m, Monoid x, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)
- pipeG :: forall m a a1 a2 x r1 r2. (Monad m, Monoid x, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => PairBinder m -> (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)
- nullSink :: forall m a x. (Monad m, Monoid x) => Sink m a x
- get :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a [x] -> Coroutine d m (Maybe x)
- getWith :: forall m a d x. (Monad m, FactorialMonoid x, AncestorFunctor a d) => Source m a x -> (x -> Coroutine d m ()) -> Coroutine d m ()
- getPrime :: forall m a d x. (Monad m, FactorialMonoid x, AncestorFunctor a d) => Source m a x -> Coroutine d m x
- peek :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a [x] -> Coroutine d m (Maybe x)
- put :: forall m a d x. (Monad m, AncestorFunctor a d) => Sink m a [x] -> x -> Coroutine d m ()
- tryPut :: forall m a d x. (Monad m, AncestorFunctor a d) => Sink m a [x] -> x -> Coroutine d m Bool
- liftSink :: forall m a d x. (Monad m, AncestorFunctor a d) => Sink m a x -> Sink m d x
- liftSource :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Source m d x
- pour :: forall m a1 a2 d x. (Monad m, Monoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => Source m a1 x -> Sink m a2 x -> Coroutine d m Bool
- pour_ :: forall m a1 a2 d x. (Monad m, Monoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => Source m a1 x -> Sink m a2 x -> Coroutine d m ()
- tee :: forall m a1 a2 a3 d x. (Monad m, Monoid x, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => Source m a1 x -> Sink m a2 x -> Sink m a3 x -> Coroutine d m ()
- teeSink :: forall m a1 a2 a3 x. (Monad m, AncestorFunctor a1 a3, AncestorFunctor a2 a3) => Sink m a1 x -> Sink m a2 x -> Sink m a3 x
- getAll :: forall m a d x. (Monad m, Monoid x, AncestorFunctor a d) => Source m a x -> Coroutine d m x
- putAll :: forall m a d x. (Monad m, MonoidNull x, AncestorFunctor a d) => x -> Sink m a x -> Coroutine d m x
- putChunk :: Sink m a x -> forall d. AncestorFunctor a d => x -> Coroutine d m x
- getParsed :: forall m a d p x y. (Monad m, Monoid x, Monoid y, AncestorFunctor a d) => Parser p x y -> Source m a x -> Coroutine d m y
- getRead :: forall m a d x y. (Monad m, Monoid x, AncestorFunctor a d) => Reader x (y -> y) y -> Source m a x -> Coroutine d m y
- getWhile :: forall m a d x. (Monad m, FactorialMonoid x, AncestorFunctor a d) => (x -> Bool) -> Source m a x -> Coroutine d m x
- getUntil :: forall m a d x. (Monad m, FactorialMonoid x, AncestorFunctor a d) => (x -> Bool) -> Source m a x -> Coroutine d m (x, Maybe x)
- pourRead :: forall m a1 a2 d x y. (Monad m, MonoidNull x, MonoidNull y, AncestorFunctor a1 d, AncestorFunctor a2 d) => Reader x y y -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
- pourParsed :: forall m p a1 a2 d x y. (Monad m, MonoidNull x, MonoidNull y, AncestorFunctor a1 d, AncestorFunctor a2 d) => Parser p x y -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
- pourWhile :: forall m a1 a2 d x. (Monad m, FactorialMonoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Bool) -> Source m a1 x -> Sink m a2 x -> Coroutine d m ()
- pourUntil :: forall m a1 a2 d x. (Monad m, FactorialMonoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Bool) -> Source m a1 x -> Sink m a2 x -> Coroutine d m (Maybe x)
- type Reader x py y = x -> Reading x py y
- data Reading x py y
- data ReadingResult x py y
- = ResultPart py (Reader x py y)
- | FinalResult y

- markDown :: forall m a x mark. (Monad m, MonoidNull x) => Sink m a x -> Sink m a [(x, mark)]
- markUpWith :: forall m a x mark. (Monad m, Monoid x) => mark -> Sink m a [(x, mark)] -> Sink m a x
- mapSink :: forall m a x y. Monad m => (x -> y) -> Sink m a [y] -> Sink m a [x]
- mapStream :: forall m a1 a2 d x y. (Monad m, FactorialMonoid x, Monoid y, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
- mapMaybeStream :: forall m a1 a2 d x y. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Maybe y) -> Source m a1 [x] -> Sink m a2 [y] -> Coroutine d m ()
- concatMapStream :: forall m a1 a2 d x y. (Monad m, Monoid y, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> y) -> Source m a1 [x] -> Sink m a2 y -> Coroutine d m ()
- mapStreamChunks :: forall m a1 a2 d x y. (Monad m, Monoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
- mapAccumStreamChunks :: forall m a1 a2 d x y acc. (Monad m, Monoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => (acc -> x -> (acc, y)) -> acc -> Source m a1 x -> Sink m a2 y -> Coroutine d m acc
- foldStream :: forall m a d x acc. (Monad m, FactorialMonoid x, AncestorFunctor a d) => (acc -> x -> acc) -> acc -> Source m a x -> Coroutine d m acc
- mapAccumStream :: forall m a1 a2 d x y acc. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (acc -> x -> (acc, y)) -> acc -> Source m a1 [x] -> Sink m a2 [y] -> Coroutine d m acc
- concatMapAccumStream :: forall m a1 a2 d x y acc. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (acc -> x -> (acc, [y])) -> acc -> Source m a1 [x] -> Sink m a2 [y] -> Coroutine d m acc
- partitionStream :: forall m a1 a2 a3 d x. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => (x -> Bool) -> Source m a1 [x] -> Sink m a2 [x] -> Sink m a3 [x] -> Coroutine d m ()
- mapMStream :: forall m a1 a2 d x y. (Monad m, FactorialMonoid x, Monoid y, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Coroutine d m y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
- mapMStream_ :: forall m a d x r. (Monad m, FactorialMonoid x, AncestorFunctor a d) => (x -> Coroutine d m r) -> Source m a x -> Coroutine d m ()
- mapMStreamChunks_ :: forall m a d x r. (Monad m, Monoid x, AncestorFunctor a d) => (x -> Coroutine d m r) -> Source m a x -> Coroutine d m ()
- filterMStream :: forall m a1 a2 d x. (Monad m, FactorialMonoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Coroutine d m Bool) -> Source m a1 x -> Sink m a2 x -> Coroutine d m ()
- foldMStream :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> x -> Coroutine d m acc) -> acc -> Source m a [x] -> Coroutine d m acc
- foldMStream_ :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> x -> Coroutine d m acc) -> acc -> Source m a [x] -> Coroutine d m ()
- unfoldMStream :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> Coroutine d m (Maybe (x, acc))) -> acc -> Sink m a [x] -> Coroutine d m acc
- unmapMStream_ :: forall m a d x. (Monad m, AncestorFunctor a d) => Coroutine d m (Maybe x) -> Sink m a [x] -> Coroutine d m ()
- unmapMStreamChunks_ :: forall m a d x. (Monad m, MonoidNull x, AncestorFunctor a d) => Coroutine d m x -> Sink m a x -> Coroutine d m ()
- zipWithMStream :: forall m a1 a2 a3 d x y z. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => (x -> y -> Coroutine d m z) -> Source m a1 [x] -> Source m a2 [y] -> Sink m a3 [z] -> Coroutine d m ()
- parZipWithMStream :: forall m a1 a2 a3 d x y z. (MonadParallel m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => (x -> y -> Coroutine d m z) -> Source m a1 [x] -> Source m a2 [y] -> Sink m a3 [z] -> Coroutine d m ()

# Sink and Source types

type SinkFunctor a x = EitherFunctor a (Request x x)

type SourceFunctor a x = EitherFunctor a (ReadRequest x)

class (Functor a, Functor d) => AncestorFunctor a d

Class of functors that can be lifted.

Instances:

- (Functor a, ChildFunctor d, d' ~ Parent d, AncestorFunctor a d') => AncestorFunctor a d
- Functor a => AncestorFunctor a a

# Sink and Source constructors

pipe :: forall m a a1 a2 x r1 r2. (Monad m, Monoid x, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)

pipeP :: forall m a a1 a2 x r1 r2. (MonadParallel m, Monoid x, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)

pipeG :: forall m a a1 a2 x r1 r2. (Monad m, Monoid x, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => PairBinder m -> (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)

A generic version of `pipe`. The first argument is used to combine two computation steps.

nullSink :: forall m a x. (Monad m, Monoid x) => Sink m a x

A disconnected sink that consumes and ignores all data `put` into it.

# Operations on sinks and sources

## Singleton operations

get :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a [x] -> Coroutine d m (Maybe x)

getWith :: forall m a d x. (Monad m, FactorialMonoid x, AncestorFunctor a d) => Source m a x -> (x -> Coroutine d m ()) -> Coroutine d m ()

Invokes its second argument with the value it gets from the source, if there is any to get.

getPrime :: forall m a d x. (Monad m, FactorialMonoid x, AncestorFunctor a d) => Source m a x -> Coroutine d m x

peek :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a [x] -> Coroutine d m (Maybe x)

put :: forall m a d x. (Monad m, AncestorFunctor a d) => Sink m a [x] -> x -> Coroutine d m ()

tryPut :: forall m a d x. (Monad m, AncestorFunctor a d) => Sink m a [x] -> x -> Coroutine d m Bool

Like `put`, but returns a Bool that indicates whether the sink is still active.

## Lifting functions

liftSink :: forall m a d x. (Monad m, AncestorFunctor a d) => Sink m a x -> Sink m d x

Converts a `Sink` on the ancestor functor *a* into a sink on the descendant functor *d*.

liftSource :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Source m d x

Converts a `Source` on the ancestor functor *a* into a source on the descendant functor *d*.

## Bulk operations

### Fetching and moving data

pour :: forall m a1 a2 d x. (Monad m, Monoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => Source m a1 x -> Sink m a2 x -> Coroutine d m Bool

Copies all data from the *source* argument into the *sink* argument. The result indicates if there was any chunk to
copy.

pour_ :: forall m a1 a2 d x. (Monad m, Monoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => Source m a1 x -> Sink m a2 x -> Coroutine d m ()

Copies all data from the *source* argument into the *sink* argument, like `pour` but ignoring the result.

tee :: forall m a1 a2 a3 d x. (Monad m, Monoid x, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => Source m a1 x -> Sink m a2 x -> Sink m a3 x -> Coroutine d m ()

teeSink :: forall m a1 a2 a3 x. (Monad m, AncestorFunctor a1 a3, AncestorFunctor a2 a3) => Sink m a1 x -> Sink m a2 x -> Sink m a3 x

getAll :: forall m a d x. (Monad m, Monoid x, AncestorFunctor a d) => Source m a x -> Coroutine d m x

`getAll` consumes and returns all data generated by the source.

putAll :: forall m a d x. (Monad m, MonoidNull x, AncestorFunctor a d) => x -> Sink m a x -> Coroutine d m x

`putAll` puts an entire list into its *sink* argument. If the coroutine fed by the *sink* dies, the remainder of
the argument list is returned.

putChunk :: Sink m a x -> forall d. AncestorFunctor a d => x -> Coroutine d m x

This method puts a portion of the producer's output into the `Sink`. The intervening `Coroutine` computations
suspend up to the `pipe` invocation that has created the argument sink. The method returns the suffix of the
argument that could not make it into the sink because of the sibling coroutine's death.

getParsed :: forall m a d p x y. (Monad m, Monoid x, Monoid y, AncestorFunctor a d) => Parser p x y -> Source m a x -> Coroutine d m y

Consumes input from the *source* as long as the *parser* accepts it.

getRead :: forall m a d x y. (Monad m, Monoid x, AncestorFunctor a d) => Reader x (y -> y) y -> Source m a x -> Coroutine d m y

Consumes input from the *source* as long as the *reader* accepts it.

getWhile :: forall m a d x. (Monad m, FactorialMonoid x, AncestorFunctor a d) => (x -> Bool) -> Source m a x -> Coroutine d m x

Consumes values from the *source* as long as each satisfies the predicate, then returns their list.

getUntil :: forall m a d x. (Monad m, FactorialMonoid x, AncestorFunctor a d) => (x -> Bool) -> Source m a x -> Coroutine d m (x, Maybe x)

Consumes values from the *source* until one of them satisfies the predicate or the source is emptied, then returns
the pair of the list of preceding values and maybe the one value that satisfied the predicate. The latter is not
consumed.
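
The documented results of `getWhile` and `getUntil` can be pictured with plain lists. The sketch below is a list-based model, not the library's code: `getWhileModel` and `getUntilModel` are hypothetical names, the source is modelled as a list, and the predicate takes a single element, whereas the signatures above apply it to a value of the stream's own type *x*.

```haskell
import Data.Maybe (listToMaybe)

-- List-based models, not the library's code: the Source is modelled as a plain
-- list, and each function also returns what would remain in the source.

-- Values taken while the predicate holds, plus the rest of the source.
getWhileModel :: (x -> Bool) -> [x] -> ([x], [x])
getWhileModel = span

-- Values preceding the first match, maybe the matching value itself, and the
-- rest of the source, which still starts with that unconsumed value.
getUntilModel :: (x -> Bool) -> [x] -> (([x], Maybe x), [x])
getUntilModel p xs = ((before, listToMaybe rest), rest)
  where (before, rest) = break p xs
```

For instance, `getUntilModel even [1, 3, 4, 5]` evaluates to `(([1, 3], Just 4), [4, 5])`: the matching value `4` is reported but remains at the head of the modelled source.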

pourRead :: forall m a1 a2 d x y. (Monad m, MonoidNull x, MonoidNull y, AncestorFunctor a1 d, AncestorFunctor a2 d) => Reader x y y -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()

Like `pour`, copies data from the *source* to the *sink*, but only as long as the *reader* accepts it.

pourParsed :: forall m p a1 a2 d x y. (Monad m, MonoidNull x, MonoidNull y, AncestorFunctor a1 d, AncestorFunctor a2 d) => Parser p x y -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()

Parses the input data using the given parser and copies the results to output.

pourWhile :: forall m a1 a2 d x. (Monad m, FactorialMonoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Bool) -> Source m a1 x -> Sink m a2 x -> Coroutine d m ()

Like `pour`, copies data from the *source* to the *sink*, but only as long as it satisfies the predicate.

pourUntil :: forall m a1 a2 d x. (Monad m, FactorialMonoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Bool) -> Source m a1 x -> Sink m a2 x -> Coroutine d m (Maybe x)

Like `pour`, copies data from the *source* to the *sink*, but only until one value satisfies the predicate. That
value is returned rather than copied.
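
A list-based picture of what `pourWhile` and `pourUntil` write and return may help. This is only a model under stated assumptions, not the library's code: `pourWhileModel` and `pourUntilModel` are hypothetical names, the source is a plain list, and the "sink" is simply the list of copied values.

```haskell
-- List-based models, not the library's code: the list result is what would be
-- written to the sink.

-- Copies values for as long as each one satisfies the predicate.
pourWhileModel :: (x -> Bool) -> [x] -> [x]
pourWhileModel = takeWhile

-- Copies values until one satisfies the predicate; that value is returned
-- in the second component instead of being copied.
pourUntilModel :: (x -> Bool) -> [x] -> ([x], Maybe x)
pourUntilModel p xs = case break p xs of
  (copied, [])        -> (copied, Nothing)
  (copied, found : _) -> (copied, Just found)
```

Here `pourUntilModel (> 2) [1, 2, 3, 4]` evaluates to `([1, 2], Just 3)`, and the second component is `Nothing` when no value matches.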

data Reading x py y

data ReadingResult x py y

Constructors:

- ResultPart py (Reader x py y): A part of the result with the reader of more input
- FinalResult y: Final result chunk

### Stream transformations

markDown :: forall m a x mark. (Monad m, MonoidNull x) => Sink m a x -> Sink m a [(x, mark)]

A sink mark-down transformation: the marks get removed from each chunk.

markUpWith :: forall m a x mark. (Monad m, Monoid x) => mark -> Sink m a [(x, mark)] -> Sink m a x

A sink mark-up transformation: every chunk going into the sink is accompanied by the given value.
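
One plausible reading of the two sink transformations, at the level of a single chunk, is sketched below. It is a model based only on the signatures and descriptions above, not the library's code: `markUpChunk` and `markDownChunk` are hypothetical names, and the exact chunking the library produces is an assumption.

```haskell
-- Chunk-level models, not the library's code. A sink transformer of type
-- Sink m a B -> Sink m a A corresponds to a chunk conversion A -> B applied
-- to everything put into the resulting sink.

-- markUpWith: each incoming chunk is paired with the given mark.
markUpChunk :: mark -> x -> [(x, mark)]
markUpChunk mark chunk = [(chunk, mark)]

-- markDown: the marks are dropped and the marked pieces are joined back up.
markDownChunk :: Monoid x => [(x, mark)] -> x
markDownChunk = mconcat . map fst
```

In this model `markDownChunk (markUpChunk m chunk)` gives back `chunk` for any mark `m`.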

mapStream :: forall m a1 a2 d x y. (Monad m, FactorialMonoid x, Monoid y, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()

mapMaybeStream :: forall m a1 a2 d x y. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Maybe y) -> Source m a1 [x] -> Sink m a2 [y] -> Coroutine d m ()

`mapMaybeStream` is to `mapStream` like `mapMaybe` is to `map`.

concatMapStream :: forall m a1 a2 d x y. (Monad m, Monoid y, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> y) -> Source m a1 [x] -> Sink m a2 y -> Coroutine d m ()

`concatMapStream` is to `mapStream` like `concatMap` is to `map`.

mapStreamChunks :: forall m a1 a2 d x y. (Monad m, Monoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()

Like `mapStream` except it runs the argument function on whole chunks read from the input.
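
The difference between per-item and per-chunk mapping can be sketched with lists. This is a model, not the library's code: `mapStreamModel` and `mapStreamChunksModel` are hypothetical names, a stream is modelled as a list of chunks, and it is an assumption (suggested by the `FactorialMonoid` constraint) that `mapStream` hands its function the smallest pieces of the input one at a time.

```haskell
-- List-based models, not the library's code: a stream is modelled as a list of
-- chunks, and each chunk as a list of items.

-- Per-item mapping: the function sees one single-item piece at a time.
mapStreamModel :: ([x] -> y) -> [[x]] -> [y]
mapStreamModel f chunks = [f [x] | chunk <- chunks, x <- chunk]

-- Per-chunk mapping: the function sees each chunk whole.
mapStreamChunksModel :: ([x] -> y) -> [[x]] -> [y]
mapStreamChunksModel = map
```

Applied to the chunks `["ab", "c"]` with `length` as the function, the per-item model yields `[1, 1, 1]` while the per-chunk model yields `[2, 1]`, so the two only agree for functions that do not depend on chunk boundaries.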

mapAccumStreamChunks :: forall m a1 a2 d x y acc. (Monad m, Monoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => (acc -> x -> (acc, y)) -> acc -> Source m a1 x -> Sink m a2 y -> Coroutine d m acc

Like `mapAccumStream` except it runs the argument function on whole chunks read from the input.

foldStream :: forall m a d x acc. (Monad m, FactorialMonoid x, AncestorFunctor a d) => (acc -> x -> acc) -> acc -> Source m a x -> Coroutine d m acc

mapAccumStream :: forall m a1 a2 d x y acc. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (acc -> x -> (acc, y)) -> acc -> Source m a1 [x] -> Sink m a2 [y] -> Coroutine d m acc

`mapAccumStream` is similar to `mapAccumL` except it reads the values from a `Source` instead of a list
and writes the mapped values into a `Sink` instead of returning another list.

concatMapAccumStream :: forall m a1 a2 d x y acc. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (acc -> x -> (acc, [y])) -> acc -> Source m a1 [x] -> Sink m a2 [y] -> Coroutine d m acc

`concatMapAccumStream` is a love child of `concatMapStream` and `mapAccumStream`: it threads the accumulator like
the latter, but its argument function returns not a single value, but a list of values to write into the sink.
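
The analogy with `mapAccumL` can be made concrete with lists. The sketch below is a list-based model, not the library's code: `mapAccumStreamModel` and `concatMapAccumStreamModel` are hypothetical names, the source is the input list, and the list in the result stands for what would be written to the sink.

```haskell
import Data.List (mapAccumL)

-- List-based models, not the library's code: the source is the input list and
-- the second result component is what would be written to the sink.

-- Threads an accumulator through the input, emitting one value per item.
mapAccumStreamModel :: (acc -> x -> (acc, y)) -> acc -> [x] -> (acc, [y])
mapAccumStreamModel = mapAccumL

-- Same accumulator threading, but every step may emit several values.
concatMapAccumStreamModel :: (acc -> x -> (acc, [y])) -> acc -> [x] -> (acc, [y])
concatMapAccumStreamModel f acc xs = fmap concat (mapAccumL f acc xs)
```

A running sum, `mapAccumStreamModel (\acc x -> (acc + x, acc + x)) 0 [1, 2, 3]`, evaluates to `(6, [1, 3, 6])`, and `concatMapAccumStreamModel (\n x -> (n + 1, replicate n x)) 1 "abc"` evaluates to `(4, "abbccc")`.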

partitionStream :: forall m a1 a2 a3 d x. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => (x -> Bool) -> Source m a1 [x] -> Sink m a2 [x] -> Sink m a3 [x] -> Coroutine d m ()

### Monadic stream transformations

mapMStream :: forall m a1 a2 d x y. (Monad m, FactorialMonoid x, Monoid y, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Coroutine d m y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()

`mapMStream` is similar to `mapM`. It draws the values from a `Source` instead of a list, writes the
mapped values to a `Sink`, and returns a `Coroutine`.

mapMStream_ :: forall m a d x r. (Monad m, FactorialMonoid x, AncestorFunctor a d) => (x -> Coroutine d m r) -> Source m a x -> Coroutine d m ()

`mapMStream_` is similar to `mapM_` except it draws the values from a `Source` instead of a list and
works with `Coroutine` instead of an arbitrary monad.

mapMStreamChunks_ :: forall m a d x r. (Monad m, Monoid x, AncestorFunctor a d) => (x -> Coroutine d m r) -> Source m a x -> Coroutine d m ()

Like `mapMStream_` except it runs the argument function on whole chunks read from the input.

filterMStream :: forall m a1 a2 d x. (Monad m, FactorialMonoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Coroutine d m Bool) -> Source m a1 x -> Sink m a2 x -> Coroutine d m ()

foldMStream :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> x -> Coroutine d m acc) -> acc -> Source m a [x] -> Coroutine d m acc

`foldMStream` is similar to `foldM` except it draws the values from a `Source` instead of a list and
works with `Coroutine` instead of an arbitrary monad.

foldMStream_ :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> x -> Coroutine d m acc) -> acc -> Source m a [x] -> Coroutine d m ()

A variant of `foldMStream` that discards the final result value.
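
Since `foldMStream` is described by analogy with `foldM`, a list-based model in an arbitrary monad shows the shape of both variants. This is a sketch, not the library's code: `foldMStreamModel`, `foldMStreamModel_`, and the example step `addNonNegative` are hypothetical names, and the source is modelled as the input list.

```haskell
import Control.Monad (foldM, void)

-- List-based models in an arbitrary monad, not the library's code: the source
-- is modelled as the input list.

-- Monadic left fold over the modelled source.
foldMStreamModel :: Monad m => (acc -> x -> m acc) -> acc -> [x] -> m acc
foldMStreamModel = foldM

-- Same traversal, with the final accumulator discarded.
foldMStreamModel_ :: Monad m => (acc -> x -> m acc) -> acc -> [x] -> m ()
foldMStreamModel_ step acc xs = void (foldM step acc xs)

-- Example step: add non-negative numbers, fail on a negative one.
addNonNegative :: Integer -> Integer -> Maybe Integer
addNonNegative acc x = if x >= 0 then Just (acc + x) else Nothing
```

In the `Maybe` monad, `foldMStreamModel addNonNegative 0 [1, 2, 3]` evaluates to `Just 6`, the same fold over `[1, -2, 3]` evaluates to `Nothing`, and `foldMStreamModel_ addNonNegative 0 [1, 2, 3]` evaluates to `Just ()`.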

unfoldMStream :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> Coroutine d m (Maybe (x, acc))) -> acc -> Sink m a [x] -> Coroutine d m acc

`unfoldMStream` is a version of `unfoldr` that writes the generated values into a `Sink` instead of
returning a list.
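
The shape of a monadic unfold that also reports the final accumulator, as the result type of `unfoldMStream` suggests, can be sketched with lists. This is a model, not the library's code: `unfoldMStreamModel` and `countDown` are hypothetical names, and the generated list stands for the values that would be written to the sink.

```haskell
-- List-based model in an arbitrary monad, not the library's code: the first
-- result component is what would be written to the sink, the second is the
-- accumulator at the point where the step function returned Nothing.
unfoldMStreamModel :: Monad m => (acc -> m (Maybe (x, acc))) -> acc -> m ([x], acc)
unfoldMStreamModel step acc = do
  next <- step acc
  case next of
    Nothing        -> return ([], acc)
    Just (x, acc') -> do
      (xs, final) <- unfoldMStreamModel step acc'
      return (x : xs, final)

-- Example step in the Maybe monad: count down to zero.
countDown :: Integer -> Maybe (Maybe (Integer, Integer))
countDown n = Just (if n > 0 then Just (n, n - 1) else Nothing)
```

With these definitions, `unfoldMStreamModel countDown 3` evaluates to `Just ([3, 2, 1], 0)`.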

unmapMStream_ :: forall m a d x. (Monad m, AncestorFunctor a d) => Coroutine d m (Maybe x) -> Sink m a [x] -> Coroutine d m ()

`unmapMStream_` is the opposite of `mapMStream_`; it takes a `Sink` instead of a `Source` argument and writes the
generated values into it.

unmapMStreamChunks_ :: forall m a d x. (Monad m, MonoidNull x, AncestorFunctor a d) => Coroutine d m x -> Sink m a x -> Coroutine d m ()

Like `unmapMStream_` but writing whole chunks of generated data into the argument sink.

zipWithMStream :: forall m a1 a2 a3 d x y z. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => (x -> y -> Coroutine d m z) -> Source m a1 [x] -> Source m a2 [y] -> Sink m a3 [z] -> Coroutine d m ()

`zipWithMStream` is similar to `zipWithM` except it draws the values from two `Source` arguments
instead of two lists, sends the results into a `Sink`, and works with `Coroutine` instead of an arbitrary monad.

parZipWithMStream :: forall m a1 a2 a3 d x y z. (MonadParallel m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => (x -> y -> Coroutine d m z) -> Source m a1 [x] -> Source m a2 [y] -> Sink m a3 [z] -> Coroutine d m ()

`parZipWithMStream` is equivalent to `zipWithMStream`, but it consumes the two sources in parallel.