scc-0.4: Streaming component combinators
Control.Concurrent.SCC.Streams
Contents
Sink and Source types
Various pipe functions
Utility functions
Description

This module defines Source and Sink types and pipe functions that create them. The method get on Source abstracts away Control.Concurrent.SCC.Coroutine.await, and the method put on Sink is a higher-level abstraction of Control.Concurrent.SCC.Coroutine.yield. With this arrangement, a single coroutine can yield values to multiple sinks and await values from multiple sources with no need to change the Control.Concurrent.SCC.Coroutine.Coroutine functor. The only requirement is that the functor of each source and sink the coroutine uses be a Control.Concurrent.SCC.Coroutine.AncestorFunctor of the coroutine's functor. For example, a coroutine zip that takes two sources and one sink would be declared like this:

 zip :: forall m a1 a2 a3 d x y. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d)
        => Source m a1 x -> Source m a2 y -> Sink m a3 (x, y) -> Coroutine d m ()

Sources, sinks, and coroutines communicating through them are all created using the pipe function or one of its variants. They effectively split the current coroutine into a producer-consumer coroutine pair. The producer gets a new Sink to write to and the consumer a new Source to read from, in addition to all the streams that are visible in the original coroutine. The following function, for example, uses the zip coroutine above to add together the values from two Integer sources:

 add :: forall m a1 a2 a3 d. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d)
        => Source m a1 Integer -> Source m a2 Integer -> Sink m a3 Integer -> Coroutine d m ()
 add source1 source2 sink = do
    _ <- pipe
           (\pairSink-> zip source1 source2 pairSink)            -- producer coroutine
           (\pairSource-> pourMap (uncurry (+)) pairSource sink) -- consumer coroutine
    return ()
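As a rough analogy only, the same data flow can be written over ordinary lists. The sketch below does not use this library: addLists is a hypothetical name, Prelude's zip stands in for the zip coroutine, and map stands in for pourMap.

```haskell
-- List analogy of the add coroutine above; no scc types are involved.
-- Prelude's zip pairs the inputs, map (uncurry (+)) plays the role of pourMap.
addLists :: [Integer] -> [Integer] -> [Integer]
addLists xs ys = map (uncurry (+)) (zip xs ys)
```

The list version stops at the end of the shorter input, which is a property of Prelude's zip and not a statement about the coroutine version.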
Synopsis
data Sink m a x
data Source m a x
type SinkFunctor a x = EitherFunctor a (TryYield x)
type SourceFunctor a x = EitherFunctor a (Await (Maybe x))
pipe :: forall m a a1 a2 x r1 r2. (Monad m, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)
pipeP :: forall m a a1 a2 x r1 r2. (ParallelizableMonad m, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)
pipePS :: forall m a a1 a2 x r1 r2. (ParallelizableMonad m, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => Bool -> (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)
get' :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Coroutine d m x
getSuccess :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> (x -> Coroutine d m ()) -> Coroutine d m ()
liftSink :: forall m a d x. (Monad m, AncestorFunctor a d) => Sink m a x -> Sink m d x
liftSource :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Source m d x
consumeAndSuppress :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Coroutine d m ()
tee :: forall m a1 a2 a3 d x. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => Source m a1 x -> Sink m a2 x -> Sink m a3 x -> Coroutine d m ()
pour :: forall m a1 a2 d x. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => Source m a1 x -> Sink m a2 x -> Coroutine d m ()
pourMap :: forall m a1 a2 d x y. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
getList :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Coroutine d m [x]
putList :: forall m a d x. (Monad m, AncestorFunctor a d) => [x] -> Sink m a x -> Coroutine d m [x]
putQueue :: forall m a d x. (Monad m, AncestorFunctor a d) => Seq x -> Sink m a x -> Coroutine d m [x]
cond :: a -> a -> Bool -> a
whenNull :: forall a m. Monad m => m [a] -> [a] -> m [a]
Sink and Source types
data Sink m a x
A Sink can be used to yield values from any nested Coroutine computation whose functor provably descends from the functor a. It's the write-only end of a Pipe communication channel.
data Source m a x
A Source can be used to read values into any nested Coroutine computation whose functor provably descends from the functor a. It's the read-only end of a Pipe communication channel.
type SinkFunctor a x = EitherFunctor a (TryYield x)
type SourceFunctor a x = EitherFunctor a (Await (Maybe x))
Various pipe functions
pipe :: forall m a a1 a2 x r1 r2. (Monad m, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)
The pipe function splits the computation into two concurrent parts, producer and consumer. The producer is given a Sink to put values into, and the consumer a Source to get those values from. Once the producer and the consumer have both completed, pipe returns their paired results.
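The producer/consumer split can be illustrated with a small pure model. This is a sketch under stated assumptions, not the library's implementation: Producer, Consumer, pipeModel, yieldAll and sumAll are hypothetical names, there is no underlying monad, and the consumer sees Maybe x in the spirit of the Await (Maybe x) functor, with Nothing marking the end of input.

```haskell
-- Hypothetical stand-ins for the two halves of a pipe; not the library's types.
-- A producer either finishes with a result or yields a value and continues.
data Producer x r = PDone r | PYield x (Producer x r)

-- A consumer either finishes with a result or waits for the next value;
-- Nothing tells it that the producer has finished.
data Consumer x r = CDone r | CAwait (Maybe x -> Consumer x r)

-- Run both halves to completion and pair their results, as pipe does.
pipeModel :: Producer x r1 -> Consumer x r2 -> (r1, r2)
pipeModel (PYield x rest) (CAwait k) = pipeModel rest (k (Just x))
-- Assumption of this model: values yielded after the consumer is done are dropped.
pipeModel (PYield _ rest) done@(CDone _) = pipeModel rest done
pipeModel (PDone r1) consumer = (r1, drain consumer)
  where
    drain (CDone r2) = r2
    drain (CAwait k) = drain (k Nothing)

-- Example producer: yield every element of a list.
yieldAll :: [x] -> Producer x ()
yieldAll = foldr PYield (PDone ())

-- Example consumer: add up everything it receives.
sumAll :: Consumer Integer Integer
sumAll = go 0
  where
    go total = CAwait (maybe (CDone total) (\x -> go (total + x)))
```

In this model, pipeModel (yieldAll [1, 2, 3]) sumAll pairs the producer's unit result with the consumer's total. The real pipe additionally threads the monad m and the parent functor a through both coroutines.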
pipeP :: forall m a a1 a2 x r1 r2. (ParallelizableMonad m, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)
The pipeP function is equivalent to pipe, except the producer and consumer are run in parallel.
pipePS :: forall m a a1 a2 x r1 r2. (ParallelizableMonad m, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => Bool -> (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)
The pipePS function acts either as pipeP or as pipe, depending on the argument parallel.
Utility functions
get' :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Coroutine d m x
Function get' assumes that the argument source is not empty and returns the value the source yields. If the source is empty, the function throws an error.
getSuccess
:: forall m a d x . (Monad m, AncestorFunctor a d)
=> Source m a x
-> (x -> Coroutine d m ())    -- Success continuation
-> Coroutine d m ()
liftSink :: forall m a d x. (Monad m, AncestorFunctor a d) => Sink m a x -> Sink m d x
Converts a Sink on the ancestor functor a into a sink on the descendant functor d.
liftSource :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Source m d x
Converts a Source on the ancestor functor a into a source on the descendant functor d.
consumeAndSuppress :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Coroutine d m ()
consumeAndSuppress consumes the entire source, ignoring the values it generates.
tee :: forall m a1 a2 a3 d x. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => Source m a1 x -> Sink m a2 x -> Sink m a3 x -> Coroutine d m ()
tee is similar to pour, except that it distributes every input value from the source argument into both sink arguments, sink1 and sink2.
pour :: forall m a1 a2 d x. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => Source m a1 x -> Sink m a2 x -> Coroutine d m ()
pour copies all data from the source argument into the sink argument, as long as there is anything to copy and the sink accepts it.
pourMap :: forall m a1 a2 d x y. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
pourMap is like pour, except that it applies the function f to each value from the source before passing it into the sink.
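The copying loop behind pour and pourMap can be sketched with plain IO actions. Everything here is an assumption made for illustration: SourceModel, SinkModel, listSource, collectingSink, pourMapModel and pourModel are hypothetical names, a source is modelled as an action returning Nothing once exhausted, and a sink as a function returning False when it rejects a value.

```haskell
import Data.IORef (IORef, newIORef, readIORef, writeIORef, modifyIORef)

-- Hypothetical stand-ins, not the library's Source and Sink types.
type SourceModel x = IO (Maybe x) -- Nothing once the source is exhausted
type SinkModel x = x -> IO Bool   -- False when the sink rejects the value

-- A source that hands out the elements of a list one at a time.
listSource :: [x] -> IO (SourceModel x)
listSource items = do
  ref <- newIORef items
  return $ do
    remaining <- readIORef ref
    case remaining of
      []       -> return Nothing
      (y : ys) -> writeIORef ref ys >> return (Just y)

-- A sink that accepts everything and appends it to the referenced list.
collectingSink :: IORef [x] -> SinkModel x
collectingSink ref x = modifyIORef ref (++ [x]) >> return True

-- Copy mapped values while the source has data and the sink accepts it.
pourMapModel :: (x -> y) -> SourceModel x -> SinkModel y -> IO ()
pourMapModel f source sink = loop
  where
    loop = do
      next <- source
      case next of
        Nothing -> return ()
        Just x  -> do
          accepted <- sink (f x)
          if accepted then loop else return ()

-- pour is the special case that copies values unchanged.
pourModel :: SourceModel x -> SinkModel x -> IO ()
pourModel = pourMapModel id
```

The loop ends on either of the two conditions named in the description of pour: the source runs dry or the sink stops accepting.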
getList :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Coroutine d m [x]
getList returns the list of all values generated by the source.
putList :: forall m a d x. (Monad m, AncestorFunctor a d) => [x] -> Sink m a x -> Coroutine d m [x]
putList puts the entire list into its sink argument, as long as the sink accepts it. The remainder that wasn't accepted by the sink is the result value.
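The "remainder" result can be illustrated with a pure model of a sink that may start rejecting values. This is only a sketch: SinkModel, boundedSink and putListModel are hypothetical names, and the model assumes that the first rejected element is part of the returned remainder.

```haskell
-- Hypothetical pure stand-in for a sink: Nothing means the value was rejected,
-- Just gives the sink's next state after accepting the value.
newtype SinkModel x = SinkModel (x -> Maybe (SinkModel x))

-- A sink that accepts at most n values and rejects everything after that.
boundedSink :: Int -> SinkModel x
boundedSink n = SinkModel accept
  where
    accept _
      | n > 0     = Just (boundedSink (n - 1))
      | otherwise = Nothing

-- Feed the list into the sink; return whatever the sink did not accept.
putListModel :: [x] -> SinkModel x -> [x]
putListModel [] _ = []
putListModel items@(x : rest) (SinkModel accept) =
  case accept x of
    Nothing   -> items                 -- rejected: x and the rest are left over
    Just next -> putListModel rest next
```

An empty result therefore means the sink accepted every element.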
putQueue :: forall m a d x. (Monad m, AncestorFunctor a d) => Seq x -> Sink m a x -> Coroutine d m [x]
Like putList, except it puts the contents of the given Seq into the sink.
cond :: a -> a -> Bool -> a
A utility function wrapping if-then-else, useful for handling monadic truth values.
whenNull :: forall a m. Monad m => m [a] -> [a] -> m [a]
A utility function, useful for handling monadic list values where an empty list means success.
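The signatures suggest definitions along the following lines. This is a guess consistent with the types and descriptions, not the library's source: condModel and whenNullModel are hypothetical names, and both the argument order of the branches and the behaviour on a non-empty list are assumptions.

```haskell
-- Assumed argument order: the "then" value first, the "else" value second.
condModel :: a -> a -> Bool -> a
condModel whenTrue whenFalse test = if test then whenTrue else whenFalse

-- Assumed behaviour: run the action only when the list is empty (success),
-- otherwise return the non-empty list unchanged.
whenNullModel :: Monad m => m [a] -> [a] -> m [a]
whenNullModel action list = if null list then action else return list
```

Taking the Bool or list as the last argument is what makes both helpers convenient on the right of >>=. For example, test >>= condModel thenAction elseAction chooses between two actions based on a monadic Bool.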
Produced by Haddock version 2.6.0