Control.Concurrent.SCC.Streams

Description
This module defines the Source and Sink types and the pipe functions that create them. The method get on Source
abstracts away Control.Concurrent.Coroutine.await, and the method put on Sink is a higher-level abstraction of
Control.Concurrent.Coroutine.SuspensionFunctors.yield. With this arrangement, a single coroutine can yield values
to multiple sinks and await values from multiple sources with no need to change the
Control.Concurrent.Coroutine.Coroutine functor; the only requirement is that the functor of each source and sink
the coroutine uses be a Control.Concurrent.Coroutine.AncestorFunctor of the coroutine's functor. For example,
a coroutine zip that takes two sources and one sink would be declared like this:
zip :: forall m a1 a2 a3 d x y. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d)
       => Source m a1 x -> Source m a2 y -> Sink m a3 (x, y) -> Coroutine d m ()
Sources, sinks, and coroutines communicating through them are all created using the pipe function or one of its
variants. They effectively split the current coroutine into a producer-consumer coroutine pair. The producer gets a
new Sink to write to and the consumer a new Source to read from, in addition to all the streams that are visible
in the original coroutine. The following function, for example, uses the zip coroutine above to add together the
values from two Integer sources:
add :: forall m a1 a2 a3 d. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d)
       => Source m a1 Integer -> Source m a2 Integer -> Sink m a3 Integer -> Coroutine d m ()
add source1 source2 sink = do
   pipe
      (\pairSink-> zip source1 source2 pairSink)              -- producer coroutine
      (\pairSource-> mapStream (uncurry (+)) pairSource sink) -- consumer coroutine
   return ()
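As a rough list-based analogy (not the library's implementation), the pipeline built by add computes the same values as zipping two lists and summing each pair. The name addLists is hypothetical, and Prelude's list zip stands in for the coroutine zip declared above. Whether the coroutine version stops at the shorter source, as the list version does, is an assumption.

```haskell
-- List-based model of the add coroutine: lists stand in for sources and sinks.
-- addLists is a hypothetical name, not part of Control.Concurrent.SCC.Streams.
addLists :: [Integer] -> [Integer] -> [Integer]
addLists xs ys = map (uncurry (+)) pairs   -- consumer stage: sum each pair
  where pairs = zip xs ys                  -- producer stage: pair up the inputs
```

In GHCi, addLists [1, 2, 3] [10, 20, 30] evaluates to [11,22,33].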
|
|
Synopsis

data Sink m a x
data Source m a x
type SinkFunctor a x = EitherFunctor a (Yield x)
type SourceFunctor a x = EitherFunctor a (Await (Maybe x))
class (Functor a, Functor d) => AncestorFunctor a d
pipe :: forall m a a1 a2 x r1 r2. (Monad m, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)
pipeP :: forall m a a1 a2 x r1 r2. (MonadParallel m, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)
pipePS :: forall m a a1 a2 x r1 r2. (MonadParallel m, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => Bool -> (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)
nullSink :: forall m a x. Monad m => Sink m a x
nullSource :: forall m a x. Monad m => Source m a x
get :: Source m a x -> forall d. AncestorFunctor a d => Coroutine d m (Maybe x)
put :: Sink m a x -> forall d. AncestorFunctor a d => x -> Coroutine d m ()
getWith :: forall m a d x. (Monad m, AncestorFunctor a d) => (x -> Coroutine d m ()) -> Source m a x -> Coroutine d m ()
liftSink :: forall m a d x. (Monad m, AncestorFunctor a d) => Sink m a x -> Sink m d x
liftSource :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Source m d x
pour :: forall m a1 a2 d x. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => Source m a1 x -> Sink m a2 x -> Coroutine d m ()
tee :: forall m a1 a2 a3 d x. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => Source m a1 x -> Sink m a2 x -> Sink m a3 x -> Coroutine d m ()
teeSink :: forall m a1 a2 a3 x. (Monad m, AncestorFunctor a1 a3, AncestorFunctor a2 a3) => Sink m a1 x -> Sink m a2 x -> Sink m a3 x
teeSource :: forall m a1 a2 a3 x. (Monad m, AncestorFunctor a1 a3, AncestorFunctor a2 a3) => Sink m a1 x -> Source m a2 x -> Source m a3 x
mapStream :: forall m a1 a2 d x y. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
mapSource :: forall m a x y. Monad m => (x -> y) -> Source m a x -> Source m a y
mapSink :: forall m a x y. Monad m => (x -> y) -> Sink m a y -> Sink m a x
mapMStream :: forall m a1 a2 d x y. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Coroutine d m y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
mapMSource :: forall m a x y. Monad m => (forall d. AncestorFunctor a d => x -> Coroutine d m y) -> Source m a x -> Source m a y
mapMSink :: forall m a x y. Monad m => (forall d. AncestorFunctor a d => x -> Coroutine d m y) -> Sink m a y -> Sink m a x
mapMStream_ :: forall m a d x. (Monad m, AncestorFunctor a d) => (x -> Coroutine d m ()) -> Source m a x -> Coroutine d m ()
mapMaybeStream :: forall m a1 a2 d x y. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Maybe y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
mapMaybeSink :: forall m a x y. Monad m => (x -> Maybe y) -> Sink m a y -> Sink m a x
mapMaybeSource :: forall m a x y. Monad m => (x -> Maybe y) -> Source m a x -> Source m a y
filterMStream :: forall m a1 a2 d x. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Coroutine d m Bool) -> Source m a1 x -> Sink m a2 x -> Coroutine d m ()
filterMSource :: forall m a x y. Monad m => (forall d. AncestorFunctor a d => x -> Coroutine d m Bool) -> Source m a x -> Source m a x
filterMSink :: forall m a x y. Monad m => (forall d. AncestorFunctor a d => x -> Coroutine d m Bool) -> Sink m a x -> Sink m a x
foldStream :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> x -> acc) -> acc -> Source m a x -> Coroutine d m acc
foldMStream :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> x -> Coroutine d m acc) -> acc -> Source m a x -> Coroutine d m acc
foldMStream_ :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> x -> Coroutine d m acc) -> acc -> Source m a x -> Coroutine d m ()
mapAccumStream :: forall m a1 a2 d x y acc. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (acc -> x -> (acc, y)) -> acc -> Source m a1 x -> Sink m a2 y -> Coroutine d m acc
partitionStream :: forall m a1 a2 a3 d x. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => (x -> Bool) -> Source m a1 x -> Sink m a2 x -> Sink m a3 x -> Coroutine d m ()
unfoldMStream :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> Coroutine d m (Maybe (x, acc))) -> acc -> Sink m a x -> Coroutine d m acc
unmapMStream_ :: forall m a d x. (Monad m, AncestorFunctor a d) => Coroutine d m (Maybe x) -> Sink m a x -> Coroutine d m ()
zipWithMStream :: forall m a1 a2 a3 d x y z. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => (x -> y -> Coroutine d m z) -> Source m a1 x -> Source m a2 y -> Sink m a3 z -> Coroutine d m ()
parZipWithMStream :: forall m a1 a2 a3 d x y z. (MonadParallel m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => (x -> y -> Coroutine d m z) -> Source m a1 x -> Source m a2 y -> Sink m a3 z -> Coroutine d m ()
getList :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Coroutine d m [x]
putList :: forall m a d x. (Monad m, AncestorFunctor a d) => [x] -> Sink m a x -> Coroutine d m ()
putQueue :: forall m a d x. (Monad m, AncestorFunctor a d) => Seq x -> Sink m a x -> Coroutine d m ()
cond :: a -> a -> Bool -> a
|
|
|
Sink and Source types
|
|
|
A Sink can be used to yield values from any nested Coroutine computation whose functor provably descends from
the functor a. It's the write-only end of a communication channel created by pipe.
|
|
|
|
A Source can be used to read values into any nested Coroutine computation whose functor provably descends from
the functor a. It's the read-only end of a communication channel created by pipe.
|
|
|
|
|
|
|
|
Class of functors that can be lifted.
|
|
Sink and Source constructors
|
|
|
The pipe function splits the computation into two concurrent parts, producer and consumer. The producer is
given a Sink to put values into, and the consumer a Source to get those values from. Once both the producer and
the consumer complete, pipe returns their paired results.
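The shape of pipe can be sketched with a much simpler IO-based model. Everything named below (ToySink, ToySource, toyPipe, example) is hypothetical and uses only base. Unlike the real pipe, which interleaves the two coroutines, this model runs the producer to completion, buffers its output, and only then runs the consumer.

```haskell
import Data.IORef

-- Hypothetical IO-based stand-ins for the library's Sink and Source types.
type ToySink x   = x -> IO ()
type ToySource x = IO (Maybe x)

-- Simplified model of pipe: buffer everything the producer writes, then let
-- the consumer drain the buffer. The results of both parts are paired.
toyPipe :: (ToySink x -> IO r1) -> (ToySource x -> IO r2) -> IO (r1, r2)
toyPipe producer consumer = do
  buffer <- newIORef []
  r1 <- producer (\x -> modifyIORef buffer (x :))   -- newest value first
  modifyIORef buffer reverse                        -- restore arrival order
  r2 <- consumer (atomicModifyIORef' buffer pop)
  return (r1, r2)
  where
    pop []       = ([], Nothing)                    -- exhausted source
    pop (x : xs) = (xs, Just x)

-- The producer reports how many values it wrote, the consumer their sum.
example :: IO (Int, Int)
example = toyPipe produce consume
  where
    produce sink   = mapM_ sink [1, 2, 3] >> return 3
    consume source = loop 0
      where loop acc = source >>= maybe (return acc) (loop . (acc +))
```

Running example yields (3,6): the first component comes from the producer and the second from the consumer, mirroring the (r1, r2) result of pipe.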
|
|
|
The pipeP function is equivalent to pipe, except it runs the producer and the consumer in parallel.
|
|
|
The pipePS function acts either as pipeP or as pipe, depending on the argument parallel.
|
|
|
A disconnected sink that ignores all values put into it.
|
|
|
An empty source whose get always returns Nothing.
|
|
Operations on sinks and sources
|
|
Singleton operations
|
|
|
Function get tries to get a value from the given Source argument. The intervening Coroutine computations
suspend all the way to the pipe function invocation that created the source. The function returns Nothing if
the argument source is empty.
|
|
|
Function put puts a value into the given Sink argument. The intervening Coroutine computations suspend all the
way to the pipe function invocation that created the sink.
|
|
|
Invokes its first argument with the value it gets from the source, if there is any to get.
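The behaviour of get and getWith on an exhausted source can be illustrated with a small IO-based model. ToySource, listSource, toyGetWith and example are hypothetical names built on base only; they are a sketch of the described behaviour, not the library's code.

```haskell
import Data.IORef

-- Hypothetical stand-in for Source: each read yields Just a value or Nothing.
type ToySource x = IO (Maybe x)

-- Build a toy source that hands out the elements of a list one at a time.
listSource :: [x] -> IO (ToySource x)
listSource xs = do
  ref <- newIORef xs
  return (atomicModifyIORef' ref pop)
  where
    pop []       = ([], Nothing)
    pop (y : ys) = (ys, Just y)

-- Model of getWith: run the action on one value, or do nothing if the
-- source is empty.
toyGetWith :: (x -> IO ()) -> ToySource x -> IO ()
toyGetWith action source = source >>= maybe (return ()) action

-- Calls toyGetWith three times on a two-element source and records what
-- the action saw.
example :: IO [Int]
example = do
  source <- listSource [7, 8]
  seen   <- newIORef []
  let record x = modifyIORef seen (++ [x])
  toyGetWith record source
  toyGetWith record source
  toyGetWith record source   -- the source is empty now, so record is skipped
  readIORef seen
```

Here example returns [7,8]; the third call finds nothing to get and leaves the record untouched.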
|
|
Lifting functions
|
|
|
Converts a Sink on the ancestor functor a into a sink on the descendant functor d.
|
|
|
Converts a Source on the ancestor functor a into a source on the descendant functor d.
|
|
Bulk operations
|
|
|
pour copies all data from the source argument into the sink argument.
|
|
|
tee is similar to pour except it distributes every input value from its source argument into both of its sink
arguments.
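A minimal IO-based sketch of pour and tee follows. The types and functions (ToySink, ToySource, listSource, toyPour, toyTee, example) are hypothetical and rely on base only. The order in which the two sinks receive each value is an assumption of this model.

```haskell
import Data.IORef

-- Hypothetical IO-based stand-ins for the library's Sink and Source types.
type ToySink x   = x -> IO ()
type ToySource x = IO (Maybe x)

-- Build a toy source that hands out the elements of a list one at a time.
listSource :: [x] -> IO (ToySource x)
listSource xs = do
  ref <- newIORef xs
  return (atomicModifyIORef' ref pop)
  where
    pop []       = ([], Nothing)
    pop (y : ys) = (ys, Just y)

-- Model of pour: copy values until the source reports Nothing.
toyPour :: ToySource x -> ToySink x -> IO ()
toyPour source sink =
  source >>= maybe (return ()) (\x -> sink x >> toyPour source sink)

-- Model of tee: pour into a combined sink that forwards to both sinks.
toyTee :: ToySource x -> ToySink x -> ToySink x -> IO ()
toyTee source sink1 sink2 = toyPour source (\x -> sink1 x >> sink2 x)

-- The second sink scales its input so the two outputs are distinguishable.
example :: IO ([Int], [Int])
example = do
  source <- listSource [1, 2, 3]
  out1   <- newIORef []
  out2   <- newIORef []
  toyTee source (\x -> modifyIORef out1 (++ [x]))
                (\x -> modifyIORef out2 (++ [x * 10]))
  (,) <$> readIORef out1 <*> readIORef out2
```

Running example gives ([1,2,3],[10,20,30]): every value from the source reaches both sinks.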
|
|
|
Every value put into a teeSink result sink goes into both of its argument sinks: put (teeSink s1 s2) x is
equivalent to put s1 x >> put s2 x.
|
|
|
The Source returned by teeSource writes every value read from its argument source into the argument sink before
providing it back.
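The two combinators can be modelled in plain IO as follows. All names here (ToySink, ToySource, toyTeeSink, toyTeeSource and the two examples) are hypothetical and use base only; the sketch mirrors the descriptions of teeSink and teeSource rather than the library's implementation.

```haskell
import Data.IORef

-- Hypothetical IO-based stand-ins for the library's Sink and Source types.
type ToySink x   = x -> IO ()
type ToySource x = IO (Maybe x)

-- Model of teeSink: writing to the result writes to sink1, then to sink2.
toyTeeSink :: ToySink x -> ToySink x -> ToySink x
toyTeeSink sink1 sink2 x = sink1 x >> sink2 x

-- Model of teeSource: copy each value read into the sink before returning it.
toyTeeSource :: ToySink x -> ToySource x -> ToySource x
toyTeeSource sink source = do
  next <- source
  mapM_ sink next   -- Maybe is Foldable, so this writes only when a value exists
  return next

-- Both sinks append to the same log; the second one tags its value with +100.
exampleSink :: IO [Int]
exampleSink = do
  out <- newIORef []
  let sink1 x = modifyIORef out (++ [x])
      sink2 x = modifyIORef out (++ [x + 100])
  toyTeeSink sink1 sink2 1
  readIORef out

-- Reads three times from a tapped two-element source.
exampleSource :: IO ([Int], [Maybe Int])
exampleSource = do
  logRef <- newIORef []
  queue  <- newIORef [1, 2]
  let source = atomicModifyIORef' queue pop
      tapped = toyTeeSource (\x -> modifyIORef logRef (++ [x])) source
  reads  <- sequence [tapped, tapped, tapped]
  logged <- readIORef logRef
  return (logged, reads)
  where
    pop []       = ([], Nothing)
    pop (x : xs) = (xs, Just x)
```

exampleSink returns [1,101], and exampleSource returns ([1,2],[Just 1,Just 2,Nothing]): the tap sees exactly the values that were read.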
|
|
|
mapStream is like pour, except it applies the function argument to each value before passing it into the sink.
|
|
|
An equivalent of map that works on a Source instead of a list. The argument function is applied to
every value after it's read from the source argument.
|
|
|
An equivalent of map that works on a Sink instead of a list. The argument function is applied to
every value before it's written to the sink argument.
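Note the direction of the types in the synopsis: mapSource turns a Source of x into a Source of y, while mapSink turns a Sink of y into a Sink of x. A small IO-based model makes the difference concrete. ToySink, ToySource, toyMapSource, toyMapSink and example are hypothetical names using base only.

```haskell
import Data.IORef

-- Hypothetical IO-based stand-ins for the library's Sink and Source types.
type ToySink x   = x -> IO ()
type ToySource x = IO (Maybe x)

-- Model of mapSource: apply f after a value is read.
toyMapSource :: (x -> y) -> ToySource x -> ToySource y
toyMapSource f source = fmap (fmap f) source

-- Model of mapSink: apply f before a value is written. The result accepts
-- x values even though the underlying sink accepts y values.
toyMapSink :: (x -> y) -> ToySink y -> ToySink x
toyMapSink f sink = sink . f

example :: IO (Maybe String, [String])
example = do
  out <- newIORef []
  let stringSink s = modifyIORef out (++ [s])               -- a sink of Strings
      intSink      = toyMapSink show stringSink :: ToySink Int
  intSink 1
  intSink 2
  got     <- toyMapSource show (return (Just (3 :: Int)))   -- a one-shot source
  written <- readIORef out
  return (got, written)
```

example returns (Just "3",["1","2"]): the Int values are converted with show on their way into the String sink and on their way out of the Int source.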
|
|
|
mapMStream is similar to mapM. It draws the values from a Source instead of a list, writes the
mapped values to a Sink, and returns a Coroutine.
|
|
|
An equivalent of mapM that works on a Source instead of a list. Similar to mapSource, except
the function argument is monadic and may perform effects.
|
|
|
An equivalent of mapM that works on a Sink instead of a list. Similar to mapSink, except the
function argument is monadic and may perform effects.
|
|
|
mapMStream_ is similar to mapM_ except it draws the values from a Source instead of a list and
works with Coroutine instead of an arbitrary monad.
|
|
|
mapMaybeStream is to mapStream like mapMaybe is to map.
|
|
|
mapMaybeSink is to mapSink like mapMaybe is to map.
|
|
|
mapMaybeSource is to mapSource like mapMaybe is to map.
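For readers less familiar with the list function these three are modelled on, here is mapMaybe from Data.Maybe at work. The helper name parseAll is hypothetical; the stream variants would drop values in the same way, but read from a Source or write to a Sink instead of using lists.

```haskell
import Data.Maybe (mapMaybe)
import Text.Read (readMaybe)

-- Keep only the strings that parse as Int; failed parses are dropped
-- instead of producing an element.
parseAll :: [String] -> [Int]
parseAll = mapMaybe readMaybe
```

For instance, parseAll ["1", "x", "3"] evaluates to [1,3].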
|
|
|
An equivalent of filterM. Draws the values from a Source instead of a list, writes the filtered
values to a Sink, and returns a Coroutine.
|
|
|
An equivalent of filterM; filters a Source instead of a list.
|
|
|
An equivalent of filterM; filters a Sink instead of a list.
|
|
|
Similar to foldl, but reads the values from a Source instead of a list.
|
|
|
foldMStream is similar to foldM except it draws the values from a Source instead of a list and
works with Coroutine instead of an arbitrary monad.
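The list counterpart, foldM from Control.Monad, shows the kind of effectful step function involved. The sketch below uses the Maybe monad in place of Coroutine, and sumNonNegative is a hypothetical name.

```haskell
import Control.Monad (foldM)

-- Sum a list, aborting with Nothing as soon as a negative value is seen.
-- The step function has the same argument order as the one foldMStream
-- takes: accumulator first, then the next value.
sumNonNegative :: [Int] -> Maybe Int
sumNonNegative = foldM step 0
  where
    step acc x
      | x < 0     = Nothing
      | otherwise = Just (acc + x)
```

sumNonNegative [1, 2, 3] is Just 6, while sumNonNegative [1, -2, 3] is Nothing.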
|
|
|
A version of foldMStream that ignores the final result value.
|
|
|
mapAccumStream is similar to mapAccumL except it reads the values from a Source instead of a list
and writes the mapped values into a Sink instead of returning another list.
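As a reminder of how mapAccumL threads its accumulator, here is a running-total example on lists; runningTotals is a hypothetical name. In the stream version, the second component of each step would go into the Sink and the final accumulator would be the coroutine's result.

```haskell
import Data.List (mapAccumL)

-- Returns the final total together with the list of running totals.
runningTotals :: [Int] -> (Int, [Int])
runningTotals = mapAccumL step 0
  where
    step acc x = let acc' = acc + x in (acc', acc')   -- (new accumulator, output)
```

runningTotals [1, 2, 3, 4] evaluates to (10,[1,3,6,10]).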
|
|
|
Equivalent to partition. Takes a Source instead of a list argument and partitions its contents into
the two Sink arguments.
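The list function partition from Data.List behaves as sketched below; splitEven is a hypothetical name. By analogy one would expect the first Sink to receive the values that satisfy the predicate and the second Sink the rest, but that ordering is an assumption the text above does not state.

```haskell
import Data.List (partition)

-- Values satisfying the predicate go to the first component,
-- all others to the second.
splitEven :: [Int] -> ([Int], [Int])
splitEven = partition even
```

splitEven [1 .. 6] evaluates to ([2,4,6],[1,3,5]).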
|
|
|
unfoldMStream is a version of unfoldr that writes the generated values into a Sink instead of
returning a list.
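The generator passed to unfoldMStream has the same shape as the one unfoldr takes, only monadic: it returns Nothing to stop, or Just (x, acc) with the next value and the next seed. A pure list example, with the hypothetical name countdown:

```haskell
import Data.List (unfoldr)

-- Generate n, n-1, ..., 1 from the seed n.
countdown :: Int -> [Int]
countdown = unfoldr step
  where
    step n
      | n <= 0    = Nothing           -- stop generating
      | otherwise = Just (n, n - 1)   -- (value to emit, next seed)
```

countdown 3 evaluates to [3,2,1]. According to its signature, the stream version additionally returns the final seed as the coroutine's result.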
|
|
|
unmapMStream_ is the opposite of mapMStream_; it takes a Sink instead of a Source argument and writes the
generated values into it.
|
|
|
zipWithMStream is similar to zipWithM except it draws the values from two Source arguments
instead of two lists, sends the results into a Sink, and works with Coroutine instead of an arbitrary monad.
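For comparison, this is zipWithM from Control.Monad on lists, using the Maybe monad in place of Coroutine. The names safeDiv and divideAll are hypothetical.

```haskell
import Control.Monad (zipWithM)

-- Division that fails on a zero divisor instead of throwing.
safeDiv :: Int -> Int -> Maybe Int
safeDiv _ 0 = Nothing
safeDiv x y = Just (x `div` y)

-- Combine the two lists pairwise; any failing pair fails the whole result.
divideAll :: [Int] -> [Int] -> Maybe [Int]
divideAll = zipWithM safeDiv
```

divideAll [10, 20] [2, 5] is Just [5,4], and divideAll [1] [0] is Nothing. In the stream version the combined values would be written to the Sink instead of being collected into a list.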
|
|
|
parZipWithMStream is equivalent to zipWithMStream, but it consumes the two sources in parallel.
|
|
|
getList returns the list of all values generated by the source.
|
|
|
putList puts the entire list into its sink argument.
|
|
|
Like putList, except it puts the contents of the given Seq into the sink.
|
|
Utility functions
|
|
|
A utility function wrapping if-then-else, useful for handling monadic truth values.
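A plausible definition matching the signature cond :: a -> a -> Bool -> a is sketched below under the hypothetical name myCond. It assumes the first argument is the then-branch and the second the else-branch, which the text above does not confirm. Taking the Bool last is what makes it convenient on the right of >>=.

```haskell
-- Hypothetical re-implementation; assumes argument order (then, else, test).
myCond :: a -> a -> Bool -> a
myCond x y test = if test then x else y

-- Pick a follow-up action based on the result of a monadic test.
describe :: IO Bool -> IO String
describe check = check >>= myCond (return "yes") (return "no")
```

With this definition, describe (return True) produces "yes" and describe (return False) produces "no".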
|
|
Produced by Haddock version 2.7.2