Control.Concurrent.SCC.Streams
Contents
Description
This module defines Source and Sink types and pipe functions that create them. The method get on Source
abstracts away Control.Concurrent.Coroutine.await, and the method put on Sink is a higher-level abstraction of
Control.Concurrent.Coroutine.SuspensionFunctors.yield. With this arrangement, a single coroutine can yield values
to multiple sinks and await values from multiple sources with no need to change the
Control.Concurrent.Coroutine.Coroutine functor; the only requirement is for each funtor of the sources and sinks
the coroutine uses to be an Control.Concurrent.Coroutine.AncestorFunctor of the coroutine's functor. For example,
coroutine zip that takes two sources and one sink would be declared like this:
zip :: forall m a1 a2 a3 d x y. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d)
=> Source m a1 x -> Source m a2 y -> Sink m a3 (x, y) -> Coroutine d m ()
Sources, sinks, and coroutines communicating through them are all created using the pipe function or one of its
variants. They effectively split the current coroutine into a producer-consumer coroutine pair. The producer gets a
new Sink to write to and the consumer a new Source to read from, in addition to all the streams that are visible
in the original coroutine. The following function, for example, uses the zip coroutine above to add together the
values from two Integer sources:
add :: forall m a1 a2 a3 d. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d)
=> Source m a1 Integer -> Source m a2 Integer -> Sink m a3 Integer -> Coroutine d m ()
add source1 source2 sink = do pipe
(pairSink-> zip source1 source2 pairSink) -- producer coroutine
(pairSource-> mapStream (uncurry (+)) pairSource sink) -- consumer coroutine
return ()
- data Sink m a x
- data Source m a x
- type SinkFunctor a x = EitherFunctor a (Yield x)
- type SourceFunctor a x = EitherFunctor a (Await (Maybe x))
- class (Functor a, Functor d) => AncestorFunctor a d
- pipe :: forall m a a1 a2 x r1 r2. (Monad m, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)
- pipeP :: forall m a a1 a2 x r1 r2. (MonadParallel m, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)
- pipePS :: forall m a a1 a2 x r1 r2. (MonadParallel m, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => Bool -> (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)
- nullSink :: forall m a x. Monad m => Sink m a x
- nullSource :: forall m a x. Monad m => Source m a x
- get :: Source m a x -> forall d. AncestorFunctor a d => Coroutine d m (Maybe x)
- put :: Sink m a x -> forall d. AncestorFunctor a d => x -> Coroutine d m ()
- getWith :: forall m a d x. (Monad m, AncestorFunctor a d) => (x -> Coroutine d m ()) -> Source m a x -> Coroutine d m ()
- liftSink :: forall m a d x. (Monad m, AncestorFunctor a d) => Sink m a x -> Sink m d x
- liftSource :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Source m d x
- pour :: forall m a1 a2 d x. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => Source m a1 x -> Sink m a2 x -> Coroutine d m ()
- tee :: forall m a1 a2 a3 d x. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => Source m a1 x -> Sink m a2 x -> Sink m a3 x -> Coroutine d m ()
- teeSink :: forall m a1 a2 a3 x. (Monad m, AncestorFunctor a1 a3, AncestorFunctor a2 a3) => Sink m a1 x -> Sink m a2 x -> Sink m a3 x
- teeSource :: forall m a1 a2 a3 x. (Monad m, AncestorFunctor a1 a3, AncestorFunctor a2 a3) => Sink m a1 x -> Source m a2 x -> Source m a3 x
- mapStream :: forall m a1 a2 d x y. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
- mapSource :: forall m a x y. Monad m => (x -> y) -> Source m a x -> Source m a y
- mapSink :: forall m a x y. Monad m => (x -> y) -> Sink m a y -> Sink m a x
- mapMStream :: forall m a1 a2 d x y. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Coroutine d m y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
- mapMSource :: forall m a x y. Monad m => (forall d. AncestorFunctor a d => x -> Coroutine d m y) -> Source m a x -> Source m a y
- mapMSink :: forall m a x y. Monad m => (forall d. AncestorFunctor a d => x -> Coroutine d m y) -> Sink m a y -> Sink m a x
- mapMStream_ :: forall m a d x. (Monad m, AncestorFunctor a d) => (x -> Coroutine d m ()) -> Source m a x -> Coroutine d m ()
- mapMaybeStream :: forall m a1 a2 d x y. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Maybe y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
- mapMaybeSink :: forall m a x y. Monad m => (x -> Maybe y) -> Sink m a y -> Sink m a x
- mapMaybeSource :: forall m a x y. Monad m => (x -> Maybe y) -> Source m a x -> Source m a y
- filterMStream :: forall m a1 a2 d x. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Coroutine d m Bool) -> Source m a1 x -> Sink m a2 x -> Coroutine d m ()
- filterMSource :: forall m a x y. Monad m => (forall d. AncestorFunctor a d => x -> Coroutine d m Bool) -> Source m a x -> Source m a x
- filterMSink :: forall m a x y. Monad m => (forall d. AncestorFunctor a d => x -> Coroutine d m Bool) -> Sink m a x -> Sink m a x
- foldStream :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> x -> acc) -> acc -> Source m a x -> Coroutine d m acc
- foldMStream :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> x -> Coroutine d m acc) -> acc -> Source m a x -> Coroutine d m acc
- foldMStream_ :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> x -> Coroutine d m acc) -> acc -> Source m a x -> Coroutine d m ()
- mapAccumStream :: forall m a1 a2 d x y acc. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (acc -> x -> (acc, y)) -> acc -> Source m a1 x -> Sink m a2 y -> Coroutine d m acc
- partitionStream :: forall m a1 a2 a3 d x. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => (x -> Bool) -> Source m a1 x -> Sink m a2 x -> Sink m a3 x -> Coroutine d m ()
- unfoldMStream :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> Coroutine d m (Maybe (x, acc))) -> acc -> Sink m a x -> Coroutine d m acc
- unmapMStream_ :: forall m a d x. (Monad m, AncestorFunctor a d) => Coroutine d m (Maybe x) -> Sink m a x -> Coroutine d m ()
- zipWithMStream :: forall m a1 a2 a3 d x y z. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => (x -> y -> Coroutine d m z) -> Source m a1 x -> Source m a2 y -> Sink m a3 z -> Coroutine d m ()
- parZipWithMStream :: forall m a1 a2 a3 d x y z. (MonadParallel m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => (x -> y -> Coroutine d m z) -> Source m a1 x -> Source m a2 y -> Sink m a3 z -> Coroutine d m ()
- getList :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Coroutine d m [x]
- putList :: forall m a d x. (Monad m, AncestorFunctor a d) => [x] -> Sink m a x -> Coroutine d m ()
- putQueue :: forall m a d x. (Monad m, AncestorFunctor a d) => Seq x -> Sink m a x -> Coroutine d m ()
- cond :: a -> a -> Bool -> a
Sink and Source types
type SinkFunctor a x = EitherFunctor a (Yield x)Source
type SourceFunctor a x = EitherFunctor a (Await (Maybe x))Source
class (Functor a, Functor d) => AncestorFunctor a d
Class of functors that can be lifted.
Instances
| (d' ~ Parent d, Functor a, ChildFunctor d, AncestorFunctor a d') => AncestorFunctor a d | |
| Functor a => AncestorFunctor a a |
Sink and Source constructors
pipe :: forall m a a1 a2 x r1 r2. (Monad m, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)Source
pipeP :: forall m a a1 a2 x r1 r2. (MonadParallel m, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)Source
pipePS :: forall m a a1 a2 x r1 r2. (MonadParallel m, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => Bool -> (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)Source
nullSink :: forall m a x. Monad m => Sink m a xSource
A disconnected sink that ignores all values put into it.
nullSource :: forall m a x. Monad m => Source m a xSource
An empty source whose get always returns Nothing.
Operations on sinks and sources
Singleton operations
getWith :: forall m a d x. (Monad m, AncestorFunctor a d) => (x -> Coroutine d m ()) -> Source m a x -> Coroutine d m ()Source
Invokes its first argument with the value it gets from the source, if there is any to get.
Lifting functions
liftSink :: forall m a d x. (Monad m, AncestorFunctor a d) => Sink m a x -> Sink m d xSource
Converts a Sink on the ancestor functor a into a sink on the descendant functor d.
liftSource :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Source m d xSource
Converts a Source on the ancestor functor a into a source on the descendant functor d.
Bulk operations
pour :: forall m a1 a2 d x. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => Source m a1 x -> Sink m a2 x -> Coroutine d m ()Source
pour copies all data from the source argument into the sink argument.
tee :: forall m a1 a2 a3 d x. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => Source m a1 x -> Sink m a2 x -> Sink m a3 x -> Coroutine d m ()Source
teeSink :: forall m a1 a2 a3 x. (Monad m, AncestorFunctor a1 a3, AncestorFunctor a2 a3) => Sink m a1 x -> Sink m a2 x -> Sink m a3 xSource
teeSource :: forall m a1 a2 a3 x. (Monad m, AncestorFunctor a1 a3, AncestorFunctor a2 a3) => Sink m a1 x -> Source m a2 x -> Source m a3 xSource
mapStream :: forall m a1 a2 d x y. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()Source
mapMStream :: forall m a1 a2 d x y. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Coroutine d m y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()Source
mapMStream is similar to mapM. It draws the values from a Source instead of a list, writes the
mapped values to a Sink, and returns a Coroutine.
mapMSource :: forall m a x y. Monad m => (forall d. AncestorFunctor a d => x -> Coroutine d m y) -> Source m a x -> Source m a ySource
mapMSink :: forall m a x y. Monad m => (forall d. AncestorFunctor a d => x -> Coroutine d m y) -> Sink m a y -> Sink m a xSource
mapMStream_ :: forall m a d x. (Monad m, AncestorFunctor a d) => (x -> Coroutine d m ()) -> Source m a x -> Coroutine d m ()Source
mapMStream_ is similar to mapM_ except it draws the values from a Source instead of a list and
works with Coroutine instead of an arbitrary monad.
mapMaybeStream :: forall m a1 a2 d x y. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Maybe y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()Source
mapMaybeStream is to mapStream like mapMaybe is to map.
mapMaybeSink :: forall m a x y. Monad m => (x -> Maybe y) -> Sink m a y -> Sink m a xSource
mapMaybeSink is to mapSink like mapMaybe is to map.
mapMaybeSource :: forall m a x y. Monad m => (x -> Maybe y) -> Source m a x -> Source m a ySource
mapMaybeSource is to mapSource like mapMaybe is to map.
filterMStream :: forall m a1 a2 d x. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Coroutine d m Bool) -> Source m a1 x -> Sink m a2 x -> Coroutine d m ()Source
filterMSource :: forall m a x y. Monad m => (forall d. AncestorFunctor a d => x -> Coroutine d m Bool) -> Source m a x -> Source m a xSource
filterMSink :: forall m a x y. Monad m => (forall d. AncestorFunctor a d => x -> Coroutine d m Bool) -> Sink m a x -> Sink m a xSource
foldStream :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> x -> acc) -> acc -> Source m a x -> Coroutine d m accSource
foldMStream :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> x -> Coroutine d m acc) -> acc -> Source m a x -> Coroutine d m accSource
foldMStream is similar to foldM except it draws the values from a Source instead of a list and
works with Coroutine instead of an arbitrary monad.
foldMStream_ :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> x -> Coroutine d m acc) -> acc -> Source m a x -> Coroutine d m ()Source
A version of foldMStream that ignores the final result value.
mapAccumStream :: forall m a1 a2 d x y acc. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (acc -> x -> (acc, y)) -> acc -> Source m a1 x -> Sink m a2 y -> Coroutine d m accSource
mapAccumStream is similar to mapAccumL except it reads the values from a Source instead of a list
and writes the mapped values into a Sink instead of returning another list.
partitionStream :: forall m a1 a2 a3 d x. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => (x -> Bool) -> Source m a1 x -> Sink m a2 x -> Sink m a3 x -> Coroutine d m ()Source
unfoldMStream :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> Coroutine d m (Maybe (x, acc))) -> acc -> Sink m a x -> Coroutine d m accSource
unfoldMStream is a version of unfoldr that writes the generated values into a Sink instead of
returning a list.
unmapMStream_ :: forall m a d x. (Monad m, AncestorFunctor a d) => Coroutine d m (Maybe x) -> Sink m a x -> Coroutine d m ()Source
unmapMStream_ is opposite of mapMStream_; it takes a Sink instead of a Source argument and writes the
generated values into it.
zipWithMStream :: forall m a1 a2 a3 d x y z. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => (x -> y -> Coroutine d m z) -> Source m a1 x -> Source m a2 y -> Sink m a3 z -> Coroutine d m ()Source
zipWithMStream is similar to zipWithM except it draws the values from two Source arguments
instead of two lists, sends the results into a Sink, and works with Coroutine instead of an arbitrary monad.
parZipWithMStream :: forall m a1 a2 a3 d x y z. (MonadParallel m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => (x -> y -> Coroutine d m z) -> Source m a1 x -> Source m a2 y -> Sink m a3 z -> Coroutine d m ()Source
parZipWithMStream is equivalent to zipWithMStream, but it consumes the two sources in parallel.
getList :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Coroutine d m [x]Source
getList returns the list of all values generated by the source.
putList :: forall m a d x. (Monad m, AncestorFunctor a d) => [x] -> Sink m a x -> Coroutine d m ()Source
putList puts entire list into its sink argument.
putQueue :: forall m a d x. (Monad m, AncestorFunctor a d) => Seq x -> Sink m a x -> Coroutine d m ()Source