Safe Haskell | None |
---|---|

Language | Haskell2010 |

This module defines `Source`

and `Sink`

types and `pipe`

functions that create them. The method `get`

on `Source`

abstracts away `await`

, and the method `put`

on `Sink`

is a higher-level
abstraction of `yield`

. With this arrangement, a single coroutine can
yield values to multiple sinks and await values from multiple sources with no need to change the
`Coroutine`

functor. The only requirement is that each functor of the sources and sinks the
coroutine uses must be an `AncestorFunctor`

of the coroutine's own functor. For
example, a coroutine that takes two sources and one sink might be declared like this:

zip :: forall m a1 a2 a3 d x y. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => Source m a1 [x] -> Source m a2 [y] -> Sink m a3 [(x, y)] -> Coroutine d m ()

Sources, sinks, and coroutines communicating through them are all created using the `pipe`

function or one of its
variants. They effectively split the current coroutine into a producer-consumer coroutine pair. The producer gets a
new `Sink`

to write to and the consumer a new `Source`

to read from, in addition to all the streams they inherit from
the current coroutine. The following function, for example, uses the *zip* coroutine declard above to add together
the pairs of values from two Integer sources:

add :: forall m a1 a2 a3 d. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => Source m a1 [Integer] -> Source m a2 [Integer] -> Sink m a3 [Integer] -> Coroutine d m () add source1 source2 sink = do pipe (pairSink-> zip source1 source2 pairSink) -- producer (pairSource-> mapStream (List.map $ uncurry (+)) pairSource sink) -- consumer return ()

- data Sink m a x
- data Source m a x
- type SinkFunctor a x = Sum a (Request x x)
- type SourceFunctor a x = Sum a (ReadRequest x)
- class (Functor a, Functor d) => AncestorFunctor a d
- pipe :: forall m a a1 a2 x r1 r2. (Monad m, Monoid x, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)
- pipeP :: forall m a a1 a2 x r1 r2. (MonadParallel m, Monoid x, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)
- pipeG :: forall m a a1 a2 x r1 r2. (Monad m, Monoid x, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => PairBinder m -> (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)
- nullSink :: forall m a x. (Monad m, Monoid x) => Sink m a x
- get :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a [x] -> Coroutine d m (Maybe x)
- getWith :: forall m a d x. (Monad m, FactorialMonoid x, AncestorFunctor a d) => Source m a x -> (x -> Coroutine d m ()) -> Coroutine d m ()
- getPrime :: forall m a d x. (Monad m, FactorialMonoid x, AncestorFunctor a d) => Source m a x -> Coroutine d m x
- peek :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a [x] -> Coroutine d m (Maybe x)
- put :: forall m a d x. (Monad m, AncestorFunctor a d) => Sink m a [x] -> x -> Coroutine d m ()
- tryPut :: forall m a d x. (Monad m, AncestorFunctor a d) => Sink m a [x] -> x -> Coroutine d m Bool
- liftSink :: forall m a d x. (Monad m, AncestorFunctor a d) => Sink m a x -> Sink m d x
- liftSource :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Source m d x
- pour :: forall m a1 a2 d x. (Monad m, Monoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => Source m a1 x -> Sink m a2 x -> Coroutine d m Bool
- pour_ :: forall m a1 a2 d x. (Monad m, Monoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => Source m a1 x -> Sink m a2 x -> Coroutine d m ()
- tee :: forall m a1 a2 a3 d x. (Monad m, Monoid x, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => Source m a1 x -> Sink m a2 x -> Sink m a3 x -> Coroutine d m ()
- teeSink :: forall m a1 a2 a3 x. (Monad m, AncestorFunctor a1 a3, AncestorFunctor a2 a3) => Sink m a1 x -> Sink m a2 x -> Sink m a3 x
- getAll :: forall m a d x. (Monad m, Monoid x, AncestorFunctor a d) => Source m a x -> Coroutine d m x
- putAll :: forall m a d x. (Monad m, MonoidNull x, AncestorFunctor a d) => x -> Sink m a x -> Coroutine d m x
- putChunk :: Sink m a x -> forall d. AncestorFunctor a d => x -> Coroutine d m x
- getParsed :: forall m a d p x y. (Monad m, Monoid x, Monoid y, AncestorFunctor a d) => Parser p x y -> Source m a x -> Coroutine d m y
- getRead :: forall m a d x y. (Monad m, Monoid x, AncestorFunctor a d) => Reader x (y -> y) y -> Source m a x -> Coroutine d m y
- getWhile :: forall m a d x. (Monad m, FactorialMonoid x, AncestorFunctor a d) => (x -> Bool) -> Source m a x -> Coroutine d m x
- getUntil :: forall m a d x. (Monad m, FactorialMonoid x, AncestorFunctor a d) => (x -> Bool) -> Source m a x -> Coroutine d m (x, Maybe x)
- pourRead :: forall m a1 a2 d x y. (Monad m, MonoidNull x, MonoidNull y, AncestorFunctor a1 d, AncestorFunctor a2 d) => Reader x y y -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
- pourParsed :: forall m p a1 a2 d x y. (Monad m, MonoidNull x, MonoidNull y, AncestorFunctor a1 d, AncestorFunctor a2 d) => Parser p x y -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
- pourWhile :: forall m a1 a2 d x. (Monad m, FactorialMonoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Bool) -> Source m a1 x -> Sink m a2 x -> Coroutine d m ()
- pourUntil :: forall m a1 a2 d x. (Monad m, FactorialMonoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Bool) -> Source m a1 x -> Sink m a2 x -> Coroutine d m (Maybe x)
- type Reader x py y = x -> Reading x py y
- data Reading x py y :: * -> * -> * -> *
- data ReadingResult x py y :: * -> * -> * -> *
- = ResultPart py (Reader x py y)
- | FinalResult y

- markDown :: forall m a x mark. (Monad m, MonoidNull x) => Sink m a x -> Sink m a [(x, mark)]
- markUpWith :: forall m a x mark. (Monad m, Monoid x) => mark -> Sink m a [(x, mark)] -> Sink m a x
- mapSink :: forall m a x y. Monad m => (x -> y) -> Sink m a [y] -> Sink m a [x]
- mapStream :: forall m a1 a2 d x y. (Monad m, FactorialMonoid x, Monoid y, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
- mapMaybeStream :: forall m a1 a2 d x y. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Maybe y) -> Source m a1 [x] -> Sink m a2 [y] -> Coroutine d m ()
- concatMapStream :: forall m a1 a2 d x y. (Monad m, Monoid y, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> y) -> Source m a1 [x] -> Sink m a2 y -> Coroutine d m ()
- mapStreamChunks :: forall m a1 a2 d x y. (Monad m, Monoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
- mapAccumStreamChunks :: forall m a1 a2 d x y acc. (Monad m, Monoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => (acc -> x -> (acc, y)) -> acc -> Source m a1 x -> Sink m a2 y -> Coroutine d m acc
- foldStream :: forall m a d x acc. (Monad m, FactorialMonoid x, AncestorFunctor a d) => (acc -> x -> acc) -> acc -> Source m a x -> Coroutine d m acc
- mapAccumStream :: forall m a1 a2 d x y acc. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (acc -> x -> (acc, y)) -> acc -> Source m a1 [x] -> Sink m a2 [y] -> Coroutine d m acc
- concatMapAccumStream :: forall m a1 a2 d x y acc. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (acc -> x -> (acc, [y])) -> acc -> Source m a1 [x] -> Sink m a2 [y] -> Coroutine d m acc
- partitionStream :: forall m a1 a2 a3 d x. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => (x -> Bool) -> Source m a1 [x] -> Sink m a2 [x] -> Sink m a3 [x] -> Coroutine d m ()
- mapMStream :: forall m a1 a2 d x y. (Monad m, FactorialMonoid x, Monoid y, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Coroutine d m y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
- mapMStream_ :: forall m a d x r. (Monad m, FactorialMonoid x, AncestorFunctor a d) => (x -> Coroutine d m r) -> Source m a x -> Coroutine d m ()
- mapMStreamChunks_ :: forall m a d x r. (Monad m, Monoid x, AncestorFunctor a d) => (x -> Coroutine d m r) -> Source m a x -> Coroutine d m ()
- filterMStream :: forall m a1 a2 d x. (Monad m, FactorialMonoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Coroutine d m Bool) -> Source m a1 x -> Sink m a2 x -> Coroutine d m ()
- foldMStream :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> x -> Coroutine d m acc) -> acc -> Source m a [x] -> Coroutine d m acc
- foldMStream_ :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> x -> Coroutine d m acc) -> acc -> Source m a [x] -> Coroutine d m ()
- unfoldMStream :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> Coroutine d m (Maybe (x, acc))) -> acc -> Sink m a [x] -> Coroutine d m acc
- unmapMStream_ :: forall m a d x. (Monad m, AncestorFunctor a d) => Coroutine d m (Maybe x) -> Sink m a [x] -> Coroutine d m ()
- unmapMStreamChunks_ :: forall m a d x. (Monad m, MonoidNull x, AncestorFunctor a d) => Coroutine d m x -> Sink m a x -> Coroutine d m ()
- zipWithMStream :: forall m a1 a2 a3 d x y z. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => (x -> y -> Coroutine d m z) -> Source m a1 [x] -> Source m a2 [y] -> Sink m a3 [z] -> Coroutine d m ()
- parZipWithMStream :: forall m a1 a2 a3 d x y z. (MonadParallel m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => (x -> y -> Coroutine d m z) -> Source m a1 [x] -> Source m a2 [y] -> Sink m a3 [z] -> Coroutine d m ()

# Sink and Source types

type SinkFunctor a x = Sum a (Request x x) Source #

type SourceFunctor a x = Sum a (ReadRequest x) Source #

class (Functor a, Functor d) => AncestorFunctor a d #

Class of functors that can be lifted.

(Functor a, ChildFunctor d, (~) (* -> *) d' (Parent d), AncestorFunctor a d') => AncestorFunctor a d | |

Functor a => AncestorFunctor a a | |

# Sink and Source constructors

pipe :: forall m a a1 a2 x r1 r2. (Monad m, Monoid x, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2) Source #

pipeP :: forall m a a1 a2 x r1 r2. (MonadParallel m, Monoid x, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2) Source #

pipeG :: forall m a a1 a2 x r1 r2. (Monad m, Monoid x, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => PairBinder m -> (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2) Source #

A generic version of `pipe`

. The first argument is used to combine two computation steps.

nullSink :: forall m a x. (Monad m, Monoid x) => Sink m a x Source #

A disconnected sink that consumes and ignores all data `put`

into it.

# Operations on sinks and sources

## Singleton operations

get :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a [x] -> Coroutine d m (Maybe x) Source #

getWith :: forall m a d x. (Monad m, FactorialMonoid x, AncestorFunctor a d) => Source m a x -> (x -> Coroutine d m ()) -> Coroutine d m () Source #

Invokes its first argument with the value it gets from the source, if there is any to get.

getPrime :: forall m a d x. (Monad m, FactorialMonoid x, AncestorFunctor a d) => Source m a x -> Coroutine d m x Source #

peek :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a [x] -> Coroutine d m (Maybe x) Source #

put :: forall m a d x. (Monad m, AncestorFunctor a d) => Sink m a [x] -> x -> Coroutine d m () Source #

tryPut :: forall m a d x. (Monad m, AncestorFunctor a d) => Sink m a [x] -> x -> Coroutine d m Bool Source #

Like `put`

, but returns a Bool that determines if the sink is still active.

## Lifting functions

liftSink :: forall m a d x. (Monad m, AncestorFunctor a d) => Sink m a x -> Sink m d x Source #

Converts a `Sink`

on the ancestor functor *a* into a sink on the descendant functor *d*.

liftSource :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Source m d x Source #

Converts a `Source`

on the ancestor functor *a* into a source on the descendant functor *d*.

## Bulk operations

### Fetching and moving data

pour :: forall m a1 a2 d x. (Monad m, Monoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => Source m a1 x -> Sink m a2 x -> Coroutine d m Bool Source #

Copies all data from the *source* argument into the *sink* argument. The result indicates if there was any chunk to
copy.

pour_ :: forall m a1 a2 d x. (Monad m, Monoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => Source m a1 x -> Sink m a2 x -> Coroutine d m () Source #

Copies all data from the *source* argument into the *sink* argument, like `pour`

but ignoring the result.

tee :: forall m a1 a2 a3 d x. (Monad m, Monoid x, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => Source m a1 x -> Sink m a2 x -> Sink m a3 x -> Coroutine d m () Source #

teeSink :: forall m a1 a2 a3 x. (Monad m, AncestorFunctor a1 a3, AncestorFunctor a2 a3) => Sink m a1 x -> Sink m a2 x -> Sink m a3 x Source #

getAll :: forall m a d x. (Monad m, Monoid x, AncestorFunctor a d) => Source m a x -> Coroutine d m x Source #

`getAll`

consumes and returns all data generated by the source.

putAll :: forall m a d x. (Monad m, MonoidNull x, AncestorFunctor a d) => x -> Sink m a x -> Coroutine d m x Source #

`putAll`

puts an entire list into its *sink* argument. If the coroutine fed by the *sink* dies, the remainder of
the argument list is returned.

putChunk :: Sink m a x -> forall d. AncestorFunctor a d => x -> Coroutine d m x Source #

This method puts a portion of the producer's output into the `Sink`

. The intervening `Coroutine`

computations
suspend up to the `pipe`

invocation that has created the argument sink. The method returns the suffix of the
argument that could not make it into the sink because of the sibling coroutine's death.

getParsed :: forall m a d p x y. (Monad m, Monoid x, Monoid y, AncestorFunctor a d) => Parser p x y -> Source m a x -> Coroutine d m y Source #

Consumes inputs from the *source* as long as the *parser* accepts it.

getRead :: forall m a d x y. (Monad m, Monoid x, AncestorFunctor a d) => Reader x (y -> y) y -> Source m a x -> Coroutine d m y Source #

Consumes input from the *source* as long as the *reader* accepts it.

getWhile :: forall m a d x. (Monad m, FactorialMonoid x, AncestorFunctor a d) => (x -> Bool) -> Source m a x -> Coroutine d m x Source #

Consumes values from the *source* as long as each satisfies the predicate, then returns their list.

getUntil :: forall m a d x. (Monad m, FactorialMonoid x, AncestorFunctor a d) => (x -> Bool) -> Source m a x -> Coroutine d m (x, Maybe x) Source #

Consumes values from the *source* until one of them satisfies the predicate or the source is emptied, then returns
the pair of the list of preceding values and maybe the one value that satisfied the predicate. The latter is not
consumed.

pourRead :: forall m a1 a2 d x y. (Monad m, MonoidNull x, MonoidNull y, AncestorFunctor a1 d, AncestorFunctor a2 d) => Reader x y y -> Source m a1 x -> Sink m a2 y -> Coroutine d m () Source #

Like `pour`

, copies data from the *source* to the *sink*, but only as long as it satisfies the predicate.

pourParsed :: forall m p a1 a2 d x y. (Monad m, MonoidNull x, MonoidNull y, AncestorFunctor a1 d, AncestorFunctor a2 d) => Parser p x y -> Source m a1 x -> Sink m a2 y -> Coroutine d m () Source #

Parses the input data using the given parser and copies the results to output.

pourWhile :: forall m a1 a2 d x. (Monad m, FactorialMonoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Bool) -> Source m a1 x -> Sink m a2 x -> Coroutine d m () Source #

Like `pour`

, copies data from the *source* to the *sink*, but only as long as it satisfies the predicate.

pourUntil :: forall m a1 a2 d x. (Monad m, FactorialMonoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Bool) -> Source m a1 x -> Sink m a2 x -> Coroutine d m (Maybe x) Source #

Like `pour`

, copies data from the *source* to the *sink*, but only until one value satisfies the predicate. That
value is returned rather than copied.

data ReadingResult x py y :: * -> * -> * -> * #

ResultPart py (Reader x py y) | A part of the result with the reader of more input |

FinalResult y | Final result chunk |

### Stream transformations

markDown :: forall m a x mark. (Monad m, MonoidNull x) => Sink m a x -> Sink m a [(x, mark)] Source #

A sink mark-down transformation: the marks get removed off each chunk.

markUpWith :: forall m a x mark. (Monad m, Monoid x) => mark -> Sink m a [(x, mark)] -> Sink m a x Source #

A sink mark-up transformation: every chunk going into the sink is accompanied by the given value.

mapStream :: forall m a1 a2 d x y. (Monad m, FactorialMonoid x, Monoid y, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m () Source #

mapMaybeStream :: forall m a1 a2 d x y. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Maybe y) -> Source m a1 [x] -> Sink m a2 [y] -> Coroutine d m () Source #

`mapMaybeStream`

is to `mapStream`

like `mapMaybe`

is to `map`

.

concatMapStream :: forall m a1 a2 d x y. (Monad m, Monoid y, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> y) -> Source m a1 [x] -> Sink m a2 y -> Coroutine d m () Source #

`concatMapStream`

is to `mapStream`

like `concatMap`

is to `map`

.

mapStreamChunks :: forall m a1 a2 d x y. (Monad m, Monoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m () Source #

Like `mapStream`

except it runs the argument function on whole chunks read from the input.

mapAccumStreamChunks :: forall m a1 a2 d x y acc. (Monad m, Monoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => (acc -> x -> (acc, y)) -> acc -> Source m a1 x -> Sink m a2 y -> Coroutine d m acc Source #

Like `mapAccumStream`

except it runs the argument function on whole chunks read from the input.

foldStream :: forall m a d x acc. (Monad m, FactorialMonoid x, AncestorFunctor a d) => (acc -> x -> acc) -> acc -> Source m a x -> Coroutine d m acc Source #

mapAccumStream :: forall m a1 a2 d x y acc. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (acc -> x -> (acc, y)) -> acc -> Source m a1 [x] -> Sink m a2 [y] -> Coroutine d m acc Source #

`mapAccumStream`

is similar to `mapAccumL`

except it reads the values from a `Source`

instead of a list
and writes the mapped values into a `Sink`

instead of returning another list.

concatMapAccumStream :: forall m a1 a2 d x y acc. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (acc -> x -> (acc, [y])) -> acc -> Source m a1 [x] -> Sink m a2 [y] -> Coroutine d m acc Source #

`concatMapAccumStream`

is a love child of `concatMapStream`

and `mapAccumStream`

: it threads the accumulator like
the latter, but its argument function returns not a single value, but a list of values to write into the sink.

partitionStream :: forall m a1 a2 a3 d x. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => (x -> Bool) -> Source m a1 [x] -> Sink m a2 [x] -> Sink m a3 [x] -> Coroutine d m () Source #

### Monadic stream transformations

mapMStream :: forall m a1 a2 d x y. (Monad m, FactorialMonoid x, Monoid y, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Coroutine d m y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m () Source #

`mapMStream`

is similar to `mapM`

. It draws the values from a `Source`

instead of a list, writes the
mapped values to a `Sink`

, and returns a `Coroutine`

.

mapMStream_ :: forall m a d x r. (Monad m, FactorialMonoid x, AncestorFunctor a d) => (x -> Coroutine d m r) -> Source m a x -> Coroutine d m () Source #

`mapMStream_`

is similar to `mapM_`

except it draws the values from a `Source`

instead of a list and
works with `Coroutine`

instead of an arbitrary monad.

mapMStreamChunks_ :: forall m a d x r. (Monad m, Monoid x, AncestorFunctor a d) => (x -> Coroutine d m r) -> Source m a x -> Coroutine d m () Source #

Like `mapMStream_`

except it runs the argument function on whole chunks read from the input.

filterMStream :: forall m a1 a2 d x. (Monad m, FactorialMonoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Coroutine d m Bool) -> Source m a1 x -> Sink m a2 x -> Coroutine d m () Source #

foldMStream :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> x -> Coroutine d m acc) -> acc -> Source m a [x] -> Coroutine d m acc Source #

`foldMStream`

is similar to `foldM`

except it draws the values from a `Source`

instead of a list and
works with `Coroutine`

instead of an arbitrary monad.

foldMStream_ :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> x -> Coroutine d m acc) -> acc -> Source m a [x] -> Coroutine d m () Source #

A variant of `foldMStream`

that discards the final result value.

unfoldMStream :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> Coroutine d m (Maybe (x, acc))) -> acc -> Sink m a [x] -> Coroutine d m acc Source #

`unfoldMStream`

is a version of `unfoldr`

that writes the generated values into a `Sink`

instead of
returning a list.

unmapMStream_ :: forall m a d x. (Monad m, AncestorFunctor a d) => Coroutine d m (Maybe x) -> Sink m a [x] -> Coroutine d m () Source #

`unmapMStream_`

is opposite of `mapMStream_`

; it takes a `Sink`

instead of a `Source`

argument and writes the
generated values into it.

unmapMStreamChunks_ :: forall m a d x. (Monad m, MonoidNull x, AncestorFunctor a d) => Coroutine d m x -> Sink m a x -> Coroutine d m () Source #

Like `unmapMStream_`

but writing whole chunks of generated data into the argument sink.

zipWithMStream :: forall m a1 a2 a3 d x y z. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => (x -> y -> Coroutine d m z) -> Source m a1 [x] -> Source m a2 [y] -> Sink m a3 [z] -> Coroutine d m () Source #

`zipWithMStream`

is similar to `zipWithM`

except it draws the values from two `Source`

arguments
instead of two lists, sends the results into a `Sink`

, and works with `Coroutine`

instead of an arbitrary monad.

parZipWithMStream :: forall m a1 a2 a3 d x y z. (MonadParallel m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => (x -> y -> Coroutine d m z) -> Source m a1 [x] -> Source m a2 [y] -> Sink m a3 [z] -> Coroutine d m () Source #

`parZipWithMStream`

is equivalent to `zipWithMStream`

, but it consumes the two sources in parallel.