Copyright | (c) Justin Le 2019 |
---|---|
License | BSD3 |
Maintainer | justin@jle.im |
Stability | experimental |
Portability | non-portable |
Safe Haskell | None |
Language | Haskell2010 |
Base API for Pipe
. See documentation for Pipe
, .|
, and runPipe
for information on usage.
A "prelude" of useful pipes can be found in Data.Conduino.Combinators.
Why a stream processing library?
A stream processing library is a way to stream processors in a composable way: instead of defining your entire stream processing function as a single recursive loop with some global state, instead think about each "stage" of the process, and isolate each state to its own segment. Each component can contain its own isolated state:
>>>
runPipePure $ sourceList [1..10]
.| scan (+) 0 .| sinkList [1,3,6,10,15,21,28,36,45,55]
All of these components have internal "state":
sourceList
keeps track of "which" item in the list to yield nextscan
keeps track of the current running sumsinkList
keeps track of all items that have been seen so far, as a list
They all work together without knowing any other component's internal state, so you can write your total streaming function without concerning yourself, at each stage, with the entire part.
In addition, there are useful functions to "combine" stream processors:
zipSink
combines sinks in an "and" sort of way: combine two sinks in parallel and finish when all finish.altSink
combines sinks in an "or" sort of way: combine two sinks in parallel and finish when any of them finishzipSource
combines sources in parallel and collate their outputs.
Stream processing libraries are also useful for streaming composition of monadic effects (like IO or State), as well.
Synopsis
- data Pipe i o u m a
- (.|) :: Monad m => Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m r
- runPipe :: Monad m => Pipe () Void u m a -> m a
- runPipePure :: Pipe () Void Void Identity a -> a
- awaitEither :: Pipe i o u m (Either u i)
- await :: Pipe i o u m (Maybe i)
- awaitWith :: (i -> Pipe i o u m u) -> Pipe i o u m u
- awaitSurely :: Pipe i o Void m i
- awaitForever :: (i -> Pipe i o u m a) -> Pipe i o u m u
- yield :: o -> Pipe i o u m ()
- (&|) :: Monad m => Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m (v, r)
- (|.) :: Monad m => Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m v
- fuseBoth :: Monad m => Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m (v, r)
- fuseUpstream :: Monad m => Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m v
- fuseBothMaybe :: Monad m => Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m (Maybe v, r)
- squeezePipe :: Monad m => Pipe i o u m a -> m ([o], Either (i -> Pipe i o u m a) a)
- squeezePipeEither :: Monad m => Pipe i o u m a -> m ([o], Either (Either u i -> Pipe i o u m a) a)
- feedPipe :: Monad m => [i] -> Pipe i o u m a -> m ([o], Either (i -> Pipe i o u m a) ([i], a))
- feedPipeEither :: Monad m => [i] -> Pipe i o u m a -> m ([o], Either (Either u i -> Pipe i o u m a) ([i], a))
- mapInput :: (i -> j) -> Pipe j o u m a -> Pipe i o u m a
- mapOutput :: (p -> o) -> Pipe i p u m a -> Pipe i o u m a
- mapUpRes :: (u -> v) -> Pipe i o v m a -> Pipe i o u m a
- trimapPipe :: (i -> j) -> (p -> o) -> (u -> v) -> Pipe j p v m a -> Pipe i o u m a
- hoistPipe :: (Monad m, Monad n) => (forall x. m x -> n x) -> Pipe i o u m a -> Pipe i o u n a
- feedbackPipe :: Monad m => Pipe x x u m a -> Pipe x x u m a
- newtype ZipSource m a = ZipSource {
- getZipSource :: Pipe () a Void m ()
- unconsZipSource :: Monad m => ZipSource m a -> m (Maybe (Maybe a, ZipSource m a))
- zipSource :: Monad m => Pipe () (a -> b) u m () -> Pipe () a v m () -> Pipe () b w m ()
- newtype ZipSink i u m a = ZipSink {
- getZipSink :: Pipe i Void u m a
- zipSink :: Monad m => Pipe i Void u m (a -> b) -> Pipe i Void u m a -> Pipe i Void u m b
- altSink :: Monad m => Pipe i Void u m a -> Pipe i Void u m a -> Pipe i Void u m a
- toListT :: Applicative m => Pipe () o u m () -> ListT m (Maybe o)
- fromListT :: Monad m => ListT m (Maybe o) -> Pipe i o u m ()
- pattern PipeList :: Monad m => ListT m (Maybe a) -> Pipe () a u m ()
- withSource :: Pipe () o u m () -> (Maybe (o, m r) -> m r) -> m r
- genSource :: (forall r. (Maybe (o, m r) -> m r) -> m r) -> Pipe i o u m ()
Documentation
Similar to a conduit from the conduit package.
For a
, you have:Pipe
i o u m a
i
: Type of input stream (the things you canawait
)o
: Type of output stream (the things youyield
)u
: Type of the result of the upstream pipe (Outputted when upstream pipe terminates)m
: Underlying monad (the things you canlift
)a
: Result type when pipe terminates (outputted when finished, withpure
orreturn
)
Some specializations:
- If
i
is()
, the pipe is a source --- it doesn't need anything to produce items. It will pump out items on its own, for pipes downstream to receive and process. - If
o
isVoid
, the pipe is a sink --- it will neveryield
anything downstream. It will consume items from things upstream, and produce a result (a
) if and when it terminates. - If
u
isVoid
, then the pipe's upstream is limitless, and never terminates. This means that you can useawaitSurely
instead ofawait
, to get await a value that is guaranteed to come. You'll get ani
instead of a
.Maybe
i - If
a
isVoid
, then the pipe never terminates --- it will keep on consuming and/or producing values forever. If this is a sink, it means that the sink will never terminate, and sorunPipe
will also never terminate. If it is a source, it means that if you chain something downstream with.|
, that downstream pipe can useawaitSurely
to guarantee something being passed down.
Applicative and Monadic sequencing of pipes chains by exhaustion.
do pipeX pipeY pipeZ
is a pipe itself, that behaves like pipeX
until it terminates, then
pipeY
until it terminates, then pipeZ
until it terminates. The
Monad
instance allows you to choose "which pipe to behave like next"
based on the terminating result of a previous pipe.
do x <- pipeX pipeBasedOn x
Usually you would use it by chaining together pipes with
.|
and then running the result with
runPipe
.
runPipe
$ someSource.|
somePipe .| someOtherPipe .| someSink
See .|
and runPipe
for more information
on usage.
For a "prelude" of commonly used Pipe
s, see
Data.Conduino.Combinators.
Instances
(.|) :: Monad m => Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m r infixr 2 Source #
The main operator for chaining pipes together. pipe1 .| pipe2
will
connect the output of pipe1
to the input of pipe2
.
Running a pipe will draw from pipe2
, and if pipe2
ever asks for
input (with await
or something similar), it will block until pipe1
outputs something (or signals termination).
The structure of a full pipeline usually looks like:
runPipe
$ someSource.|
somePipe .| someOtherPipe .| someSink
Where you route a source into a series of pipes, which eventually ends
up at a sink. runPipe
will then produce the result of that sink.
runPipe :: Monad m => Pipe () Void u m a -> m a Source #
Run a pipe that is both a source and a sink (an "effect") into the effect that it represents.
Usually you wouild construct this using something like:
runPipe
$ someSource.|
somePipe .| someOtherPipe .| someSink
runPipe
will produce the result of that final sink.
Some common errors you might receive:
i
is not()
: If you give a pipe where the first parameter ("input") is not()
, it means that your pipe is not a producer. Pre-compose it (using.|
) with a producer of the type you need.
For example, if you have a myPipe ::
, this is
a pipe that is awaiting Pipe
Int
o u m aInt
s from upstream. Pre-compose with
a producer of Int
s, like
, in order to be able to run it.sourceList
[1,2,3] .|
myPipe
o
is notVoid
: If you give a pipe where the second parameter ("output") is notVoid
, it means that your pipe is not a consumer. Post-compose it (using.|
) with a consumer of the type you need.
For example, if you have myPipe ::
, this is
a pipe that is yielding Pipe
i Int
u m aInt
s downstream that are going unhandled.
Post-compose it a consumer of Int
s, like myPipe
, in order to be able to run it..|
foldl
(+) 0
If you just want to ignore all downstream yields, post-compose with
sinkNull
.
Primitives
awaitEither :: Pipe i o u m (Either u i) Source #
Await on upstream output. Will block until it receives an i
(expected input type) or a u
if the upstream pipe terminates.
await :: Pipe i o u m (Maybe i) Source #
Await input from upstream. Will block until upstream yield
s.
Will return Nothing
if the upstream pipe finishes and terminates.
If the upstream pipe never terminates, then you can use awaitSurely
to
guarantee a result.
awaitSurely :: Pipe i o Void m i Source #
Await input from upstream where the upstream pipe is guaranteed to never terminate.
A common type error will occur if u
(upstream pipe result type) is not
Void
-- it might be ()
or some non-Void
type. This means that the
upstream pipe terminates, so awaiting cannot be assured.
In that case, either change your upstream pipe to be one that never
terminates (which is most likely not possible), or use await
instead
of awaitSurely
.
awaitForever :: (i -> Pipe i o u m a) -> Pipe i o u m u Source #
A useful utility function over repeated await
s. Will repeatedly
await
and then continue with the given pipe whenever the upstream pipe
yields.
Can be used to implement many pipe combinators:
map
f =awaitForever
$ x ->yield
(f x)
Special chaining
(&|) :: Monad m => Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m (v, r) infixr 2 Source #
Like .|
, but get the result of both pipes on termination, instead
of just the second. This means that p &| q
will only terminate with a result when
both p
and q
terminate. (Typically, p .| q
would terminate as soon as
q
terminates.)
Since: 0.2.1.0
(|.) :: Monad m => Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m v infixr 2 Source #
Like .|
, but keep the result of the first pipe, instead of the
second. This means that p |. q
will only terminate with a result when
both p
and q
terminate. (Typically, p .| q
would terminate as soon as
q
terminates.)
Since: 0.2.1.0
fuseBoth :: Monad m => Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m (v, r) Source #
Useful prefix version of &|
.
Since: 0.2.1.0
fuseUpstream :: Monad m => Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m v Source #
Useful prefix version of |.
.
Since: 0.2.1.0
Incremental running
squeezePipeEither :: Monad m => Pipe i o u m a -> m ([o], Either (Either u i -> Pipe i o u m a) a) Source #
Squeeze a pipe by extracting all output that can be extracted before
any input is requested. Returns a Left
if the pipe eventually does
request input (as a continuation on the new input, or a terminating u
value), or a Right
if the pipe terminates with a value before ever
asking for input.
Since: 0.2.1.0
feedPipe :: Monad m => [i] -> Pipe i o u m a -> m ([o], Either (i -> Pipe i o u m a) ([i], a)) Source #
Repeatedly run squeezePipe
by giving it items from an input list.
Returns the outputs observed, and Left
if the input list was exhausted
with more input expected, or Right
if the pipe terminated, with the
leftover inputs and output result.
Since: 0.2.1.0
feedPipeEither :: Monad m => [i] -> Pipe i o u m a -> m ([o], Either (Either u i -> Pipe i o u m a) ([i], a)) Source #
Repeatedly run squeezePipeEither
by giving it items from an input
list. Returns the outputs observed, and Left
if the input list was
exhausted with more input expected (or a u
terminating upstream
value), or Right
if the pipe terminated, with the leftover inputs and
output result.
Since: 0.2.1.0
Pipe transformers
mapInput :: (i -> j) -> Pipe j o u m a -> Pipe i o u m a Source #
(Contravariantly) map over the expected input type.
mapOutput :: (p -> o) -> Pipe i p u m a -> Pipe i o u m a Source #
Map over the downstream output type.
If you want to map over the result type, use fmap
.
mapUpRes :: (u -> v) -> Pipe i o v m a -> Pipe i o u m a Source #
(Contravariantly) map over the upstream result type.
trimapPipe :: (i -> j) -> (p -> o) -> (u -> v) -> Pipe j p v m a -> Pipe i o u m a Source #
Map over the input type, output type, and upstream result type.
If you want to map over the result type, use fmap
.
hoistPipe :: (Monad m, Monad n) => (forall x. m x -> n x) -> Pipe i o u m a -> Pipe i o u n a Source #
Transform the underlying monad of a pipe.
Note that if you are trying to work with monad transformers, this is probably not what you want. See Data.Conduino.Lift for tools for working with underlying monad transformers.
feedbackPipe :: Monad m => Pipe x x u m a -> Pipe x x u m a Source #
Loop a pipe into itself.
- Will feed all output back to the input
- Will only ask for input upstream if output is stalled.
- Yields all outputted values downstream, effectively duplicating them.
Since: 0.2.1.0
Wrappers
newtype ZipSource m a Source #
A newtype wrapper over a source (
) that gives it an
alternative Pipe
() o Void
Applicative
and Alternative
instance, matching "ListT
done right".
<*>
will pair up each output that the sources produce: if you await
a value from downstream, it will wait until both paired sources yield
before passing them on together.
<|>
will completely exhaust the first source before moving on to the
next source.
ZipSource
is effectively equivalent to "ListT done right", the true
List Monad transformer. <|>
is concatentation. You can use this type
with lift
to lift a yielding action and <|>
to sequence yields to
implement the pattern described in
http://www.haskellforall.com/2014/11/how-to-build-library-agnostic-streaming.html,
where you can write streaming producers in a polymorphic way, and have
it run with pipes, conduit, etc.
The main difference is that its Applicative
instance ("zipping") is
different from the traditional Applicative
instance for ListT
("all combinations"). Effectively this becomes like a "zipping"
Applicative
instance for ListT
.
If you want a Monad
(or MonadIO
) instance,
use ListT
instead, and convert using 'toListT'/'fromListT' or the
PipeList
pattern/constructor.
ZipSource | |
|
Instances
MonadTrans ZipSource Source # | |
Defined in Data.Conduino | |
Functor (ZipSource m) Source # | |
Monad m => Applicative (ZipSource m) Source # | |
Defined in Data.Conduino | |
Monad m => Alternative (ZipSource m) Source # | |
unconsZipSource :: Monad m => ZipSource m a -> m (Maybe (Maybe a, ZipSource m a)) Source #
ZipSource
is effectively ListT
returning a Maybe
. As such, you
can use unconsZipSource
to "peel off" the first yielded item, if it
exists, and return the "rest of the list".
zipSource :: Monad m => Pipe () (a -> b) u m () -> Pipe () a v m () -> Pipe () b w m () Source #
Takes two sources and runs them in parallel, collating their outputs.
Since: 0.2.1.0
newtype ZipSink i u m a Source #
A newtype wrapper over a sink (
) that gives it an
alternative Pipe
i Void
Applicative
and Alternative
instance.
<*>
will distribute input over both sinks, and output a final result
once both sinks finish.
<|>
will distribute input over both sinks, and output a final result
as soon as one or the other finishes.
ZipSink | |
|
Instances
MonadTrans (ZipSink i u) Source # | |
Defined in Data.Conduino | |
Functor (ZipSink i u m) Source # | |
Monad m => Applicative (ZipSink i u m) Source # |
|
Defined in Data.Conduino pure :: a -> ZipSink i u m a # (<*>) :: ZipSink i u m (a -> b) -> ZipSink i u m a -> ZipSink i u m b # liftA2 :: (a -> b -> c) -> ZipSink i u m a -> ZipSink i u m b -> ZipSink i u m c # (*>) :: ZipSink i u m a -> ZipSink i u m b -> ZipSink i u m b # (<*) :: ZipSink i u m a -> ZipSink i u m b -> ZipSink i u m a # | |
Monad m => Alternative (ZipSink i u m) Source # |
|
zipSink :: Monad m => Pipe i Void u m (a -> b) -> Pipe i Void u m a -> Pipe i Void u m b Source #
Distribute input to both sinks, and finishes with the final result once both finish.
Forms an identity with pure
.
altSink :: Monad m => Pipe i Void u m a -> Pipe i Void u m a -> Pipe i Void u m a Source #
Distribute input to both sinks, and finishes with the result of the one that finishes first.