scc-0.8: Streaming component combinators

Safe Haskell: None

Control.Concurrent.SCC.Streams

Description

This module defines Source and Sink types and pipe functions that create them. The method get on Source abstracts away await, and the method put on Sink is a higher-level abstraction of yield. With this arrangement, a single coroutine can yield values to multiple sinks and await values from multiple sources with no need to change the Coroutine functor. The only requirement is that each functor of the sources and sinks the coroutine uses must be an AncestorFunctor of the coroutine's own functor. For example, a coroutine that takes two sources and one sink might be declared like this:

 zip :: forall m a1 a2 a3 d x y. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d)
        => Source m a1 [x] -> Source m a2 [y] -> Sink m a3 [(x, y)] -> Coroutine d m ()

Sources, sinks, and coroutines communicating through them are all created using the pipe function or one of its variants. They effectively split the current coroutine into a producer-consumer coroutine pair. The producer gets a new Sink to write to and the consumer a new Source to read from, in addition to all the streams they inherit from the current coroutine. The following function, for example, uses the zip coroutine declared above to add together the pairs of values from two Integer sources:

 add :: forall m a1 a2 a3 d. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d)
        => Source m a1 [Integer] -> Source m a2 [Integer] -> Sink m a3 [Integer] -> Coroutine d m ()
 add source1 source2 sink = do pipe
                                   (\pairSink-> zip source1 source2 pairSink)                         -- producer
                                   (\pairSource-> mapStream (List.map $ uncurry (+)) pairSource sink) -- consumer
                               return ()
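
For intuition, the effect of add on the stream contents can be compared with a plain list computation. The sketch below uses only the Prelude and is not the scc API; addLists is a hypothetical name, and it assumes that the zip coroutine pairs values positionally the way the Prelude zip does.

```haskell
-- List analogue of the `add` pipeline; Prelude only, not the scc API.
-- `addLists` is a hypothetical name introduced for illustration.
addLists :: [Integer] -> [Integer] -> [Integer]
addLists xs ys = map (uncurry (+)) (zip xs ys)  -- zip stage, then the mapping stage

main :: IO ()
main = print (addLists [1, 2, 3] [10, 20, 30])  -- prints [11,22,33]
```

In the coroutine version the intermediate list of pairs never has to exist as a whole: the producer and the consumer exchange it chunk by chunk through the pipe.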

Sink and Source types

data Sink m a x

A Sink can be used to yield output from any nested Coroutine computation whose functor provably descends from the functor a. It's the write-only end of a communication channel created by pipe.

data Source m a x

A Source can be used to read input into any nested Coroutine computation whose functor provably descends from the functor a. It's the read-only end of a communication channel created by pipe.

class (Functor a, Functor d) => AncestorFunctor a d

Class of functors that can be lifted.

Instances

(Functor a, ChildFunctor d, d' ~ Parent d, AncestorFunctor a d') => AncestorFunctor a d
Functor a => AncestorFunctor a a 

Sink and Source constructors

pipe :: forall m a a1 a2 x r1 r2. (Monad m, Monoid x, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)

The pipe function splits the computation into two concurrent parts, producer and consumer. The producer is given a Sink to put values into, and the consumer a Source to get those values from. Once producer and consumer both complete, pipe returns their paired results.
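
The producer/consumer split can be illustrated with a toy, pure model. The sketch below uses only base and is not how scc implements pipe: Producer, Consumer, pipeModel, produceAll, and sumAll are all hypothetical names, and the rule that output is dropped once the consumer has finished is an assumption made for the model.

```haskell
-- A toy, pure model of the producer/consumer split performed by pipe.
-- Base only; all names here are hypothetical and not part of scc.

-- A producer either yields a value and continues, or finishes with a result.
data Producer x r = Yield x (Producer x r) | ProducerDone r

-- A consumer either awaits the next value (Nothing at end of input) or finishes.
data Consumer x r = Await (Maybe x -> Consumer x r) | ConsumerDone r

-- Run both sides until each has finished, then pair their results.
pipeModel :: Producer x r1 -> Consumer x r2 -> (r1, r2)
pipeModel (Yield x p) (Await k)                 = pipeModel p (k (Just x))  -- hand over one value
pipeModel p@(ProducerDone _) (Await k)          = pipeModel p (k Nothing)   -- signal end of input
pipeModel (Yield _ p) c@(ConsumerDone _)        = pipeModel p c             -- consumer gone: drop output
pipeModel (ProducerDone r1) (ConsumerDone r2)   = (r1, r2)

-- Producer that yields every list element and returns how many it yielded.
produceAll :: [x] -> Producer x Int
produceAll = go 0
  where
    go n []       = ProducerDone n
    go n (y : ys) = Yield y (go (n + 1) ys)

-- Consumer that sums everything it receives and stops at end of input.
sumAll :: Consumer Integer Integer
sumAll = go 0
  where
    go acc = Await (maybe (ConsumerDone acc) (go . (acc +)))

main :: IO ()
main = print (pipeModel (produceAll [1, 2, 3]) sumAll)  -- prints (3,6)
```

The pair returned by pipeModel corresponds to the (r1, r2) result of pipe: the first component comes from the producer and the second from the consumer.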

pipeP :: forall m a a1 a2 x r1 r2. (MonadParallel m, Monoid x, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)

The pipeP function is equivalent to pipe, except it runs the producer and the consumer in parallel.

pipeG :: forall m a a1 a2 x r1 r2. (Monad m, Monoid x, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => PairBinder m -> (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)

A generic version of pipe. The first argument is used to combine two computation steps.

nullSink :: forall m a x. (Monad m, Monoid x) => Sink m a x

A disconnected sink that consumes and ignores all data put into it.

Operations on sinks and sources

Singleton operations

get :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a [x] -> Coroutine d m (Maybe x)

Function get tries to get a single value from the given Source argument. The intervening Coroutine computations suspend all the way to the pipe function invocation that created the source. The function returns Nothing if the argument source is empty.

getWith :: forall m a d x. (Monad m, FactorialMonoid x, AncestorFunctor a d) => Source m a x -> (x -> Coroutine d m ()) -> Coroutine d m ()

Invokes its first argument with the value it gets from the source, if there is any to get.

getPrime :: forall m a d x. (Monad m, FactorialMonoid x, AncestorFunctor a d) => Source m a x -> Coroutine d m x

Tries to get a minimal, i.e., prime, prefix from the given Source argument. The intervening Coroutine computations suspend all the way to the pipe function invocation that created the source. The function returns mempty if the argument source is empty.

peek :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a [x] -> Coroutine d m (Maybe x)

Function peek acts the same way as get, but doesn't actually consume the value from the source; sequential calls to peek will always return the same value.
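
The difference between get and peek can be modelled on a plain list that stands in for the remaining input. This is a base-only sketch rather than the scc API; getModel and peekModel are hypothetical names that return the remaining input explicitly instead of keeping it inside a Source.

```haskell
-- List-backed model of get and peek; base only, not the scc API.
-- getModel and peekModel are hypothetical names.

-- Return the first value, if any, together with the remaining input.
getModel :: [x] -> (Maybe x, [x])
getModel []       = (Nothing, [])
getModel (y : ys) = (Just y, ys)

-- Return the first value, if any, and leave the input untouched.
peekModel :: [x] -> (Maybe x, [x])
peekModel []         = (Nothing, [])
peekModel ys@(y : _) = (Just y, ys)

main :: IO ()
main = do
  let input            = "abc"
      (p1, afterPeek1) = peekModel input
      (p2, afterPeek2) = peekModel afterPeek1
      (g1, afterGet)   = getModel afterPeek2
  print (p1, p2, g1)  -- prints (Just 'a',Just 'a',Just 'a')
  print afterGet      -- prints "bc"
```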

put :: forall m a d x. (Monad m, AncestorFunctor a d) => Sink m a [x] -> x -> Coroutine d m ()

This function puts a value into the given Sink. The intervening Coroutine computations suspend up to the pipe invocation that has created the argument sink.

tryPut :: forall m a d x. (Monad m, AncestorFunctor a d) => Sink m a [x] -> x -> Coroutine d m Bool

Like put, but returns a Bool that indicates whether the sink is still active.

Lifting functions

liftSink :: forall m a d x. (Monad m, AncestorFunctor a d) => Sink m a x -> Sink m d x

Converts a Sink on the ancestor functor a into a sink on the descendant functor d.

liftSource :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Source m d x

Converts a Source on the ancestor functor a into a source on the descendant functor d.

Bulk operations

Fetching and moving data

pour :: forall m a1 a2 d x. (Monad m, Monoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => Source m a1 x -> Sink m a2 x -> Coroutine d m Bool

Copies all data from the source argument into the sink argument. The result indicates if there was any chunk to copy.

pour_ :: forall m a1 a2 d x. (Monad m, Monoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => Source m a1 x -> Sink m a2 x -> Coroutine d m ()

Copies all data from the source argument into the sink argument, like pour but ignoring the result.

tee :: forall m a1 a2 a3 d x. (Monad m, Monoid x, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => Source m a1 x -> Sink m a2 x -> Sink m a3 x -> Coroutine d m ()

tee is similar to pour except it distributes every input value from its source argument into both of its sink arguments.

teeSink :: forall m a1 a2 a3 x. (Monad m, AncestorFunctor a1 a3, AncestorFunctor a2 a3) => Sink m a1 x -> Sink m a2 x -> Sink m a3 x

Every value put into a teeSink result sink goes into both of its argument sinks: put (teeSink s1 s2) x is equivalent to put s1 x >> put s2 x. The putChunk method returns the list of values that couldn't fit into the second sink.

getAll :: forall m a d x. (Monad m, Monoid x, AncestorFunctor a d) => Source m a x -> Coroutine d m x

getAll consumes and returns all data generated by the source.

putAll :: forall m a d x. (Monad m, MonoidNull x, AncestorFunctor a d) => x -> Sink m a x -> Coroutine d m x

putAll puts an entire list into its sink argument. If the coroutine fed by the sink dies, the remainder of the argument list is returned.

putChunk :: Sink m a x -> forall d. AncestorFunctor a d => x -> Coroutine d m x

This method puts a portion of the producer's output into the Sink. The intervening Coroutine computations suspend up to the pipe invocation that has created the argument sink. The method returns the suffix of the argument that could not make it into the sink because of the sibling coroutine's death.

getParsed :: forall m a d p x y. (Monad m, Monoid x, Monoid y, AncestorFunctor a d) => Parser p x y -> Source m a x -> Coroutine d m y

Consumes input from the source as long as the parser accepts it.

getRead :: forall m a d x y. (Monad m, Monoid x, AncestorFunctor a d) => Reader x (y -> y) y -> Source m a x -> Coroutine d m y

Consumes input from the source as long as the reader accepts it.

getWhile :: forall m a d x. (Monad m, FactorialMonoid x, AncestorFunctor a d) => (x -> Bool) -> Source m a x -> Coroutine d m x

Consumes values from the source as long as each satisfies the predicate, then returns their list.

getUntil :: forall m a d x. (Monad m, FactorialMonoid x, AncestorFunctor a d) => (x -> Bool) -> Source m a x -> Coroutine d m (x, Maybe x)

Consumes values from the source until one of them satisfies the predicate or the source is emptied, then returns the pair of the list of preceding values and maybe the one value that satisfied the predicate. The latter is not consumed.
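
On plain lists, getWhile and getUntil behave much like span and break. The following base-only sketch is a list model, not the scc API: getWhileModel and getUntilModel are hypothetical names, they test list elements where the real functions test the factors of a FactorialMonoid, and they return the remaining input explicitly to show what has and has not been consumed.

```haskell
import Data.Maybe (listToMaybe)

-- List models of getWhile and getUntil; base only, hypothetical names.

-- Returns (consumed prefix, remaining input).
getWhileModel :: (x -> Bool) -> [x] -> ([x], [x])
getWhileModel = span

-- Returns ((preceding values, first matching value if any), remaining input).
-- The matching value stays at the head of the remaining input.
getUntilModel :: (x -> Bool) -> [x] -> (([x], Maybe x), [x])
getUntilModel p xs = ((prefix, listToMaybe rest), rest)
  where
    (prefix, rest) = break p xs

main :: IO ()
main = do
  print (getWhileModel even [2, 4, 5, 6 :: Int])   -- prints ([2,4],[5,6])
  print (getUntilModel (> 3) [1, 2, 5, 6 :: Int])  -- prints (([1,2],Just 5),[5,6])
```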

pourRead :: forall m a1 a2 d x y. (Monad m, MonoidNull x, MonoidNull y, AncestorFunctor a1 d, AncestorFunctor a2 d) => Reader x y y -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()

Like pour, copies data from the source to the sink, but only as long as the reader accepts it.

pourParsed :: forall m p a1 a2 d x y. (Monad m, MonoidNull x, MonoidNull y, AncestorFunctor a1 d, AncestorFunctor a2 d) => Parser p x y -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()

Parses the input data using the given parser and copies the results to output.

pourWhile :: forall m a1 a2 d x. (Monad m, FactorialMonoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Bool) -> Source m a1 x -> Sink m a2 x -> Coroutine d m ()

Like pour, copies data from the source to the sink, but only as long as it satisfies the predicate.

pourUntil :: forall m a1 a2 d x. (Monad m, FactorialMonoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Bool) -> Source m a1 x -> Sink m a2 x -> Coroutine d m (Maybe x)

Like pour, copies data from the source to the sink, but only until one value satisfies the predicate. That value is returned rather than copied.

type Reader x py y = x -> Reading x py y

data Reading x py y

Constructors

Final x y

Final result chunk with the unconsumed portion of the input

Advance (Reader x py y) y py

A part of the result, together with the reader of more input and the result to use at EOF

Deferred (Reader x py y) y

Reader of more input, plus the result if there isn't any.

data ReadingResult x py y

Constructors

ResultPart py (Reader x py y)

A part of the result with the reader of more input

FinalResult y

Final result chunk

Stream transformations

markDown :: forall m a x mark. (Monad m, MonoidNull x) => Sink m a x -> Sink m a [(x, mark)]

A sink mark-down transformation: the marks are removed from each chunk.

markUpWith :: forall m a x mark. (Monad m, Monoid x) => mark -> Sink m a [(x, mark)] -> Sink m a x

A sink mark-up transformation: every chunk going into the sink is accompanied by the given value.

mapSink :: forall m a x y. Monad m => (x -> y) -> Sink m a [y] -> Sink m a [x]

An equivalent of map that works on a Sink instead of a list. The argument function is applied to every value before it's written to the sink argument.

mapStream :: forall m a1 a2 d x y. (Monad m, FactorialMonoid x, Monoid y, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()

mapStream is like pour, except that it applies the argument function to each value from the source before passing the result into the sink.

mapMaybeStream :: forall m a1 a2 d x y. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Maybe y) -> Source m a1 [x] -> Sink m a2 [y] -> Coroutine d m ()

concatMapStream :: forall m a1 a2 d x y. (Monad m, Monoid y, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> y) -> Source m a1 [x] -> Sink m a2 y -> Coroutine d m ()

mapStreamChunks :: forall m a1 a2 d x y. (Monad m, Monoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()

Like mapStream except it runs the argument function on whole chunks read from the input.

mapAccumStreamChunks :: forall m a1 a2 d x y acc. (Monad m, Monoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => (acc -> x -> (acc, y)) -> acc -> Source m a1 x -> Sink m a2 y -> Coroutine d m acc

Like mapAccumStream except it runs the argument function on whole chunks read from the input.

foldStream :: forall m a d x acc. (Monad m, FactorialMonoid x, AncestorFunctor a d) => (acc -> x -> acc) -> acc -> Source m a x -> Coroutine d m acc

Similar to foldl, but reads the values from a Source instead of a list.

mapAccumStream :: forall m a1 a2 d x y acc. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (acc -> x -> (acc, y)) -> acc -> Source m a1 [x] -> Sink m a2 [y] -> Coroutine d m acc

mapAccumStream is similar to mapAccumL except it reads the values from a Source instead of a list and writes the mapped values into a Sink instead of returning another list.

concatMapAccumStream :: forall m a1 a2 d x y acc. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (acc -> x -> (acc, [y])) -> acc -> Source m a1 [x] -> Sink m a2 [y] -> Coroutine d m acc

concatMapAccumStream is a love child of concatMapStream and mapAccumStream: it threads the accumulator like the latter, but its argument function returns not a single value, but a list of values to write into the sink.
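
The list counterpart of this combination can be written with mapAccumL from Data.List. The sketch below is a base-only analogue, not the scc API; concatMapAccumL and expand are hypothetical names. The returned list plays the role of the values written into the sink, and the first component of the pair is the final accumulator.

```haskell
import Data.List (mapAccumL)

-- List analogue of concatMapAccumStream; base only, hypothetical name.
-- Threads the accumulator left to right and concatenates the per-element outputs.
concatMapAccumL :: (acc -> x -> (acc, [y])) -> acc -> [x] -> (acc, [y])
concatMapAccumL f acc xs = fmap concat (mapAccumL f acc xs)

-- Example step function: repeat each character as many times as its position.
expand :: Int -> Char -> (Int, String)
expand n c = (n + 1, replicate n c)

main :: IO ()
main = print (concatMapAccumL expand 1 "abc")  -- prints (4,"abbccc")
```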

partitionStream :: forall m a1 a2 a3 d x. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => (x -> Bool) -> Source m a1 [x] -> Sink m a2 [x] -> Sink m a3 [x] -> Coroutine d m ()

Equivalent to partition. Takes a Source instead of a list argument and partitions its contents into the two Sink arguments.

Monadic stream transformations

mapMStream :: forall m a1 a2 d x y. (Monad m, FactorialMonoid x, Monoid y, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Coroutine d m y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()

mapMStream is similar to mapM. It draws the values from a Source instead of a list, writes the mapped values to a Sink, and returns a Coroutine.

mapMStream_ :: forall m a d x r. (Monad m, FactorialMonoid x, AncestorFunctor a d) => (x -> Coroutine d m r) -> Source m a x -> Coroutine d m ()

mapMStream_ is similar to mapM_ except it draws the values from a Source instead of a list and works with Coroutine instead of an arbitrary monad.

mapMStreamChunks_ :: forall m a d x r. (Monad m, Monoid x, AncestorFunctor a d) => (x -> Coroutine d m r) -> Source m a x -> Coroutine d m ()

Like mapMStream_ except it runs the argument function on whole chunks read from the input.

filterMStream :: forall m a1 a2 d x. (Monad m, FactorialMonoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Coroutine d m Bool) -> Source m a1 x -> Sink m a2 x -> Coroutine d m ()

An equivalent of filterM. Draws the values from a Source instead of a list, writes the filtered values to a Sink, and returns a Coroutine.

foldMStream :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> x -> Coroutine d m acc) -> acc -> Source m a [x] -> Coroutine d m acc

foldMStream is similar to foldM except it draws the values from a Source instead of a list and works with Coroutine instead of an arbitrary monad.

foldMStream_ :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> x -> Coroutine d m acc) -> acc -> Source m a [x] -> Coroutine d m ()

A variant of foldMStream that discards the final result value.

unfoldMStream :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> Coroutine d m (Maybe (x, acc))) -> acc -> Sink m a [x] -> Coroutine d m acc

unfoldMStream is a version of unfoldr that writes the generated values into a Sink instead of returning a list.
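
Unlike unfoldr, the generator here runs in a monad and the final accumulator is returned. A list-returning model of that shape might look as follows. It uses only base and is not the scc implementation; unfoldMModel and countdown are hypothetical names, and the model assumes that the accumulator returned is the one on which the generator produced Nothing.

```haskell
-- List-returning model of unfoldMStream in an arbitrary monad; base only.
-- unfoldMModel and countdown are hypothetical names.
unfoldMModel :: Monad m => (acc -> m (Maybe (x, acc))) -> acc -> m ([x], acc)
unfoldMModel step = go
  where
    go acc = do
      next <- step acc
      case next of
        Nothing        -> return ([], acc)  -- generator finished: keep the last accumulator
        Just (x, acc') -> do
          (xs, final) <- go acc'
          return (x : xs, final)

-- Example generator: count down to zero.
countdown :: Int -> IO (Maybe (Int, Int))
countdown n
  | n <= 0    = return Nothing
  | otherwise = return (Just (n, n - 1))

main :: IO ()
main = unfoldMModel countdown 3 >>= print  -- prints ([3,2,1],0)
```

In the streaming version the generated values are written into the Sink as they are produced, so only the accumulator is returned.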

unmapMStream_ :: forall m a d x. (Monad m, AncestorFunctor a d) => Coroutine d m (Maybe x) -> Sink m a [x] -> Coroutine d m ()

unmapMStream_ is the opposite of mapMStream_; it takes a Sink instead of a Source argument and writes the generated values into it.

unmapMStreamChunks_ :: forall m a d x. (Monad m, MonoidNull x, AncestorFunctor a d) => Coroutine d m x -> Sink m a x -> Coroutine d m ()

Like unmapMStream_ but writing whole chunks of generated data into the argument sink.

zipWithMStream :: forall m a1 a2 a3 d x y z. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => (x -> y -> Coroutine d m z) -> Source m a1 [x] -> Source m a2 [y] -> Sink m a3 [z] -> Coroutine d m ()

zipWithMStream is similar to zipWithM except it draws the values from two Source arguments instead of two lists, sends the results into a Sink, and works with Coroutine instead of an arbitrary monad.
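
For comparison, this is how the list function zipWithM from Control.Monad behaves. The example runs in the Maybe monad rather than in Coroutine and does not use scc; safeDiv is a hypothetical helper introduced for illustration.

```haskell
import Control.Monad (zipWithM)

-- Hypothetical helper: integer division that fails on a zero divisor.
safeDiv :: Int -> Int -> Maybe Int
safeDiv _ 0 = Nothing
safeDiv x y = Just (x `div` y)

main :: IO ()
main = do
  print (zipWithM safeDiv [10, 20, 30] [2, 5, 3])  -- prints Just [5,4,10]
  print (zipWithM safeDiv [10, 20] [2, 0])         -- prints Nothing
```

zipWithMStream follows the same pattern, with the two lists replaced by Source arguments and the resulting list replaced by values sent into a Sink.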

parZipWithMStream :: forall m a1 a2 a3 d x y z. (MonadParallel m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => (x -> y -> Coroutine d m z) -> Source m a1 [x] -> Source m a2 [y] -> Sink m a3 [z] -> Coroutine d m ()

parZipWithMStream is equivalent to zipWithMStream, but it consumes the two sources in parallel.