scc-0.8.2.4: Streaming component combinators

Control.Concurrent.SCC.Streams

Description

This module defines Source and Sink types and pipe functions that create them. The method get on Source abstracts away await, and the method put on Sink is a higher-level abstraction of yield. With this arrangement, a single coroutine can yield values to multiple sinks and await values from multiple sources with no need to change the Coroutine functor. The only requirement is that each functor of the sources and sinks the coroutine uses must be an AncestorFunctor of the coroutine's own functor. For example, a coroutine that takes two sources and one sink might be declared like this:

zip :: forall m a1 a2 a3 d x y. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d)
=> Source m a1 [x] -> Source m a2 [y] -> Sink m a3 [(x, y)] -> Coroutine d m ()


Sources, sinks, and coroutines communicating through them are all created using the pipe function or one of its variants. They effectively split the current coroutine into a producer-consumer coroutine pair. The producer gets a new Sink to write to and the consumer a new Source to read from, in addition to all the streams they inherit from the current coroutine. The following function, for example, uses the zip coroutine declard above to add together the pairs of values from two Integer sources:

add :: forall m a1 a2 a3 d. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d)
=> Source m a1 [Integer] -> Source m a2 [Integer] -> Sink m a3 [Integer] -> Coroutine d m ()
add source1 source2 sink = do pipe
(pairSink-> zip source1 source2 pairSink)                         -- producer
(pairSource-> mapStream (List.map \$ uncurry (+)) pairSource sink) -- consumer
return ()


Synopsis

# Sink and Source types

data Sink m a x Source #

A Sink can be used to yield output from any nested Coroutine computation whose functor provably descends from the functor a. It's the write-only end of a communication channel created by pipe.

data Source m a x Source #

A Source can be used to read input into any nested Coroutine computation whose functor provably descends from the functor a. It's the read-only end of a communication channel created by pipe.

type SinkFunctor a x = Sum a (Request x x) Source #

class (Functor a, Functor d) => AncestorFunctor a d #

Class of functors that can be lifted.

Minimal complete definition

liftFunctor

Instances

 (Functor a, ChildFunctor d, (~) (* -> *) d' (Parent d), AncestorFunctor a d') => AncestorFunctor a d MethodsliftFunctor :: a x -> d x # Functor a => AncestorFunctor a a MethodsliftFunctor :: a x -> a x #

# Sink and Source constructors

pipe :: forall m a a1 a2 x r1 r2. (Monad m, Monoid x, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2) Source #

The pipe function splits the computation into two concurrent parts, producer and consumer. The producer is given a Sink to put values into, and consumer a Source to get those values from. Once producer and consumer both complete, pipe returns their paired results.

pipeP :: forall m a a1 a2 x r1 r2. (MonadParallel m, Monoid x, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2) Source #

The pipeP function is equivalent to pipe, except it runs the producer and the consumer in parallel.

pipeG :: forall m a a1 a2 x r1 r2. (Monad m, Monoid x, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => PairBinder m -> (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2) Source #

A generic version of pipe. The first argument is used to combine two computation steps.

nullSink :: forall m a x. (Monad m, Monoid x) => Sink m a x Source #

A disconnected sink that consumes and ignores all data put into it.

# Operations on sinks and sources

## Singleton operations

get :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a [x] -> Coroutine d m (Maybe x) Source #

Function get tries to get a single value from the given Source argument. The intervening Coroutine computations suspend all the way to the pipe function invocation that created the source. The function returns Nothing if the argument source is empty.

getWith :: forall m a d x. (Monad m, FactorialMonoid x, AncestorFunctor a d) => Source m a x -> (x -> Coroutine d m ()) -> Coroutine d m () Source #

Invokes its first argument with the value it gets from the source, if there is any to get.

getPrime :: forall m a d x. (Monad m, FactorialMonoid x, AncestorFunctor a d) => Source m a x -> Coroutine d m x Source #

Tries to get a minimal, i.e., prime, prefix from the given Source argument. The intervening Coroutine computations suspend all the way to the pipe function invocation that created the source. The function returns mempty if the argument source is empty.

peek :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a [x] -> Coroutine d m (Maybe x) Source #

Function peek acts the same way as get, but doesn't actually consume the value from the source; sequential calls to peek will always return the same value.

put :: forall m a d x. (Monad m, AncestorFunctor a d) => Sink m a [x] -> x -> Coroutine d m () Source #

This function puts a value into the given Sink. The intervening Coroutine computations suspend up to the pipe invocation that has created the argument sink.

tryPut :: forall m a d x. (Monad m, AncestorFunctor a d) => Sink m a [x] -> x -> Coroutine d m Bool Source #

Like put, but returns a Bool that determines if the sink is still active.

## Lifting functions

liftSink :: forall m a d x. (Monad m, AncestorFunctor a d) => Sink m a x -> Sink m d x Source #

Converts a Sink on the ancestor functor a into a sink on the descendant functor d.

liftSource :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Source m d x Source #

Converts a Source on the ancestor functor a into a source on the descendant functor d.

## Bulk operations

### Fetching and moving data

pour :: forall m a1 a2 d x. (Monad m, Monoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => Source m a1 x -> Sink m a2 x -> Coroutine d m Bool Source #

Copies all data from the source argument into the sink argument. The result indicates if there was any chunk to copy.

pour_ :: forall m a1 a2 d x. (Monad m, Monoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => Source m a1 x -> Sink m a2 x -> Coroutine d m () Source #

Copies all data from the source argument into the sink argument, like pour but ignoring the result.

tee :: forall m a1 a2 a3 d x. (Monad m, Monoid x, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => Source m a1 x -> Sink m a2 x -> Sink m a3 x -> Coroutine d m () Source #

tee is similar to pour except it distributes every input value from its source argument into its both sink arguments.

teeSink :: forall m a1 a2 a3 x. (Monad m, AncestorFunctor a1 a3, AncestorFunctor a2 a3) => Sink m a1 x -> Sink m a2 x -> Sink m a3 x Source #

Every value put into a teeSink result sink goes into its both argument sinks: put (teeSink s1 s2) x is equivalent to put s1 x >> put s2 x. The putChunk method returns the list of values that couldn't fit into the second sink.

getAll :: forall m a d x. (Monad m, Monoid x, AncestorFunctor a d) => Source m a x -> Coroutine d m x Source #

getAll consumes and returns all data generated by the source.

putAll :: forall m a d x. (Monad m, MonoidNull x, AncestorFunctor a d) => x -> Sink m a x -> Coroutine d m x Source #

putAll puts an entire list into its sink argument. If the coroutine fed by the sink dies, the remainder of the argument list is returned.

putChunk :: Sink m a x -> forall d. AncestorFunctor a d => x -> Coroutine d m x Source #

This method puts a portion of the producer's output into the Sink. The intervening Coroutine computations suspend up to the pipe invocation that has created the argument sink. The method returns the suffix of the argument that could not make it into the sink because of the sibling coroutine's death.

getParsed :: forall m a d p x y. (Monad m, Monoid x, Monoid y, AncestorFunctor a d) => Parser p x y -> Source m a x -> Coroutine d m y Source #

Consumes inputs from the source as long as the parser accepts it.

getRead :: forall m a d x y. (Monad m, Monoid x, AncestorFunctor a d) => Reader x (y -> y) y -> Source m a x -> Coroutine d m y Source #

Consumes input from the source as long as the reader accepts it.

getWhile :: forall m a d x. (Monad m, FactorialMonoid x, AncestorFunctor a d) => (x -> Bool) -> Source m a x -> Coroutine d m x Source #

Consumes values from the source as long as each satisfies the predicate, then returns their list.

getUntil :: forall m a d x. (Monad m, FactorialMonoid x, AncestorFunctor a d) => (x -> Bool) -> Source m a x -> Coroutine d m (x, Maybe x) Source #

Consumes values from the source until one of them satisfies the predicate or the source is emptied, then returns the pair of the list of preceding values and maybe the one value that satisfied the predicate. The latter is not consumed.

pourRead :: forall m a1 a2 d x y. (Monad m, MonoidNull x, MonoidNull y, AncestorFunctor a1 d, AncestorFunctor a2 d) => Reader x y y -> Source m a1 x -> Sink m a2 y -> Coroutine d m () Source #

Like pour, copies data from the source to the sink, but only as long as it satisfies the predicate.

pourParsed :: forall m p a1 a2 d x y. (Monad m, MonoidNull x, MonoidNull y, AncestorFunctor a1 d, AncestorFunctor a2 d) => Parser p x y -> Source m a1 x -> Sink m a2 y -> Coroutine d m () Source #

Parses the input data using the given parser and copies the results to output.

pourWhile :: forall m a1 a2 d x. (Monad m, FactorialMonoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Bool) -> Source m a1 x -> Sink m a2 x -> Coroutine d m () Source #

Like pour, copies data from the source to the sink, but only as long as it satisfies the predicate.

pourUntil :: forall m a1 a2 d x. (Monad m, FactorialMonoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Bool) -> Source m a1 x -> Sink m a2 x -> Coroutine d m (Maybe x) Source #

Like pour, copies data from the source to the sink, but only until one value satisfies the predicate. That value is returned rather than copied.

type Reader x py y = x -> Reading x py y #

data Reading x py y :: * -> * -> * -> * #

Constructors

 Final x y Final result chunk with the unconsumed portion of the input Advance (Reader x py y) y py A part of the result with the reader of more input and the EOF Deferred (Reader x py y) y Reader of more input, plus the result if there isn't any.

data ReadingResult x py y :: * -> * -> * -> * #

Constructors

 ResultPart py (Reader x py y) A part of the result with the reader of more input FinalResult y Final result chunk

### Stream transformations

markDown :: forall m a x mark. (Monad m, MonoidNull x) => Sink m a x -> Sink m a [(x, mark)] Source #

A sink mark-down transformation: the marks get removed off each chunk.

markUpWith :: forall m a x mark. (Monad m, Monoid x) => mark -> Sink m a [(x, mark)] -> Sink m a x Source #

A sink mark-up transformation: every chunk going into the sink is accompanied by the given value.

mapSink :: forall m a x y. Monad m => (x -> y) -> Sink m a [y] -> Sink m a [x] Source #

An equivalent of map that works on a Sink instead of a list. The argument function is applied to every value vefore it's written to the sink argument.

mapStream :: forall m a1 a2 d x y. (Monad m, FactorialMonoid x, Monoid y, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m () Source #

mapStream is like pour that applies the function f to each argument before passing it into the sink.

mapMaybeStream :: forall m a1 a2 d x y. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Maybe y) -> Source m a1 [x] -> Sink m a2 [y] -> Coroutine d m () Source #

mapMaybeStream is to mapStream like mapMaybe is to map.

concatMapStream :: forall m a1 a2 d x y. (Monad m, Monoid y, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> y) -> Source m a1 [x] -> Sink m a2 y -> Coroutine d m () Source #

concatMapStream is to mapStream like concatMap is to map.

mapStreamChunks :: forall m a1 a2 d x y. (Monad m, Monoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m () Source #

Like mapStream except it runs the argument function on whole chunks read from the input.

mapAccumStreamChunks :: forall m a1 a2 d x y acc. (Monad m, Monoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => (acc -> x -> (acc, y)) -> acc -> Source m a1 x -> Sink m a2 y -> Coroutine d m acc Source #

Like mapAccumStream except it runs the argument function on whole chunks read from the input.

foldStream :: forall m a d x acc. (Monad m, FactorialMonoid x, AncestorFunctor a d) => (acc -> x -> acc) -> acc -> Source m a x -> Coroutine d m acc Source #

Similar to foldl, but reads the values from a Source instead of a list.

mapAccumStream :: forall m a1 a2 d x y acc. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (acc -> x -> (acc, y)) -> acc -> Source m a1 [x] -> Sink m a2 [y] -> Coroutine d m acc Source #

mapAccumStream is similar to mapAccumL except it reads the values from a Source instead of a list and writes the mapped values into a Sink instead of returning another list.

concatMapAccumStream :: forall m a1 a2 d x y acc. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (acc -> x -> (acc, [y])) -> acc -> Source m a1 [x] -> Sink m a2 [y] -> Coroutine d m acc Source #

concatMapAccumStream is a love child of concatMapStream and mapAccumStream: it threads the accumulator like the latter, but its argument function returns not a single value, but a list of values to write into the sink.

partitionStream :: forall m a1 a2 a3 d x. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => (x -> Bool) -> Source m a1 [x] -> Sink m a2 [x] -> Sink m a3 [x] -> Coroutine d m () Source #

Equivalent to partition. Takes a Source instead of a list argument and partitions its contents into the two Sink arguments.

mapMStream :: forall m a1 a2 d x y. (Monad m, FactorialMonoid x, Monoid y, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Coroutine d m y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m () Source #

mapMStream is similar to mapM. It draws the values from a Source instead of a list, writes the mapped values to a Sink, and returns a Coroutine.

mapMStream_ :: forall m a d x r. (Monad m, FactorialMonoid x, AncestorFunctor a d) => (x -> Coroutine d m r) -> Source m a x -> Coroutine d m () Source #

mapMStream_ is similar to mapM_ except it draws the values from a Source instead of a list and works with Coroutine instead of an arbitrary monad.

mapMStreamChunks_ :: forall m a d x r. (Monad m, Monoid x, AncestorFunctor a d) => (x -> Coroutine d m r) -> Source m a x -> Coroutine d m () Source #

Like mapMStream_ except it runs the argument function on whole chunks read from the input.

filterMStream :: forall m a1 a2 d x. (Monad m, FactorialMonoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Coroutine d m Bool) -> Source m a1 x -> Sink m a2 x -> Coroutine d m () Source #

An equivalent of filterM. Draws the values from a Source instead of a list, writes the filtered values to a Sink, and returns a Coroutine.

foldMStream :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> x -> Coroutine d m acc) -> acc -> Source m a [x] -> Coroutine d m acc Source #

foldMStream is similar to foldM except it draws the values from a Source instead of a list and works with Coroutine instead of an arbitrary monad.

foldMStream_ :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> x -> Coroutine d m acc) -> acc -> Source m a [x] -> Coroutine d m () Source #

A variant of foldMStream that discards the final result value.

unfoldMStream :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> Coroutine d m (Maybe (x, acc))) -> acc -> Sink m a [x] -> Coroutine d m acc Source #

unfoldMStream is a version of unfoldr that writes the generated values into a Sink instead of returning a list.

unmapMStream_ :: forall m a d x. (Monad m, AncestorFunctor a d) => Coroutine d m (Maybe x) -> Sink m a [x] -> Coroutine d m () Source #

unmapMStream_ is opposite of mapMStream_; it takes a Sink instead of a Source argument and writes the generated values into it.

unmapMStreamChunks_ :: forall m a d x. (Monad m, MonoidNull x, AncestorFunctor a d) => Coroutine d m x -> Sink m a x -> Coroutine d m () Source #

Like unmapMStream_ but writing whole chunks of generated data into the argument sink.

zipWithMStream :: forall m a1 a2 a3 d x y z. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => (x -> y -> Coroutine d m z) -> Source m a1 [x] -> Source m a2 [y] -> Sink m a3 [z] -> Coroutine d m () Source #

zipWithMStream is similar to zipWithM except it draws the values from two Source arguments instead of two lists, sends the results into a Sink, and works with Coroutine instead of an arbitrary monad.

parZipWithMStream :: forall m a1 a2 a3 d x y z. (MonadParallel m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => (x -> y -> Coroutine d m z) -> Source m a1 [x] -> Source m a2 [y] -> Sink m a3 [z] -> Coroutine d m () Source #

parZipWithMStream is equivalent to zipWithMStream, but it consumes the two sources in parallel.