Safe Haskell | Safe |
---|---|
Language | Haskell98 |
- data Pipe a b m r
- data C
- type Producer b = Pipe () b
- type Consumer b = Pipe b C
- type Pipeline = Pipe () C
- await :: Pipe a b m a
- yield :: b -> Pipe a b m ()
- pipe :: Monad m => (a -> b) -> Pipe a b m r
- (<+<) :: Monad m => Pipe b c m r -> Pipe a b m r -> Pipe a c m r
- (>+>) :: Monad m => Pipe a b m r -> Pipe b c m r -> Pipe a c m r
- idP :: Monad m => Pipe a a m r
- newtype PipeC m r a b = PipeC {}
- runPipe :: Monad m => Pipeline m r -> m r
Types
The Pipe type is strongly inspired by Mario Blazevic's Coroutine type in his concurrency article from Issue 19 of The Monad Reader.
The base type for pipes
- a: The type of input received from upstream pipes
- b: The type of output delivered to downstream pipes
- m: The base monad
- r: The type of the return value
Create Pipes
yield and await are the only two primitives you need to create pipes. Since Pipe a b m is a monad, you can assemble yield and await statements using ordinary do notation. Since Pipe a b is also a monad transformer, you can use lift to invoke the base monad. For example, you could write a pipe stage that requests permission before forwarding any output:

    check :: (Show a) => Pipe a a IO r
    check = forever $ do
        x <- await
        lift $ putStrLn $ "Can '" ++ (show x) ++ "' pass?"
        ok <- read <$> lift getLine
        when ok (yield x)

await :: Pipe a b m a
Wait for input from upstream.
await blocks until input is available from upstream.
pipe :: Monad m => (a -> b) -> Pipe a b m r
Convert a pure function into a pipe

    pipe f = forever $ do
        x <- await
        yield (f x)
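
As a rough sketch of how yield and await combine in do notation, the following defines a stage that carries state between inputs. The names source, runningSum, and collect are hypothetical helpers, not part of the library. The Control.Pipe import path, and the choice of Writer (from the transformers package) as the base monad so the result can be inspected purely, are assumptions.

```haskell
import Control.Monad (forever)
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Writer (Writer, execWriter, tell)
import Control.Pipe  -- assumed import path for the API documented here

-- Hypothetical producer: yield each list element, then terminate.
source :: Monad m => [b] -> Producer b m ()
source = mapM_ yield

-- Hypothetical stage: emit the running total of everything received so far.
runningSum :: Monad m => Pipe Int Int m r
runningSum = go 0
  where
    go total = do
        x <- await
        let total' = total + x
        yield total'
        go total'

-- Hypothetical consumer: record every input in the Writer log.
collect :: Consumer a (Writer [a]) r
collect = forever $ do
    x <- await
    lift (tell [x])

totals :: [Int]
totals = execWriter . runPipe $ collect <+< runningSum <+< source [1, 2, 3]

main :: IO ()
main = print totals  -- prints [1,3,6]
```

The recursive helper go plays the role that forever plays in pipe, except that it threads the accumulated total through each iteration.
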
Compose Pipes
Pipes form a Category, meaning that you can compose Pipes using (<+<) and also define an identity Pipe: idP. These satisfy the category laws:

    idP <+< p = p
    p <+< idP = p
    (p1 <+< p2) <+< p3 = p1 <+< (p2 <+< p3)

Pipe composition binds the output of the upstream Pipe to the input of the downstream Pipe. Like Haskell functions, Pipes are lazy, meaning that upstream Pipes are only evaluated as far as necessary to generate enough input for downstream Pipes. If any Pipe terminates, it also terminates every Pipe composed with it.
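
The laziness and termination behavior described above can be observed with a small experiment. This is a sketch under assumptions: naturals, takeN, and logger are hypothetical helpers, the Control.Pipe import path is assumed, and Writer from transformers is used as the base monad only to record the order of effects.

```haskell
import Control.Monad (forever, replicateM_)
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Writer (Writer, execWriter, tell)
import Control.Pipe  -- assumed import path for the API documented here

-- Hypothetical infinite source: log each number just before yielding it.
naturals :: Producer Int (Writer [String]) r
naturals = go 1
  where
    go n = do
        lift (tell ["produce " ++ show n])
        yield n
        go (n + 1)

-- Hypothetical stage: forward exactly n values, then terminate.
takeN :: Monad m => Int -> Pipe a a m ()
takeN n = replicateM_ n (await >>= yield)

-- Hypothetical consumer: log every value it receives.
logger :: Show a => Consumer a (Writer [String]) r
logger = forever $ do
    x <- await
    lift (tell ["consume " ++ show x])

events :: [String]
events = execWriter . runPipe $ logger <+< takeN 2 <+< naturals

main :: IO ()
main = mapM_ putStrLn events
```

Although naturals never terminates on its own, the pipeline stops as soon as takeN 2 returns, and the log interleaves as produce 1, consume 1, produce 2, consume 2. No third value is produced, because nothing downstream asks for it.
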
(>+>) :: Monad m => Pipe a b m r -> Pipe b c m r -> Pipe a c m r infixl 9
Corresponds to (>>>) from Control.Category
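
To illustrate the two composition operators side by side, here is a minimal sketch that builds the same pipeline in both directions and once more with idP inserted. The helpers source and sink are hypothetical, the Control.Pipe import path is assumed, and Writer from transformers is an assumed base monad chosen so the outputs can be compared as plain lists.

```haskell
import Control.Monad (forever)
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Writer (Writer, execWriter, tell)
import Control.Pipe  -- assumed import path for the API documented here

-- Hypothetical producer: yield three numbers, then terminate.
source :: Monad m => Producer Int m ()
source = mapM_ yield [1, 2, 3]

-- Hypothetical consumer: record every input in the Writer log.
sink :: Consumer Int (Writer [Int]) r
sink = forever $ do
    x <- await
    lift (tell [x])

rightToLeft, leftToRight, withIdentity :: [Int]
-- Data flows from right to left with (<+<).
rightToLeft  = execWriter . runPipe $ sink <+< pipe (+ 1) <+< source
-- Data flows from left to right with (>+>).
leftToRight  = execWriter . runPipe $ source >+> pipe (+ 1) >+> sink
-- Inserting idP should not change the result (identity law).
withIdentity = execWriter . runPipe $ sink <+< idP <+< pipe (+ 1) <+< source

main :: IO ()
main = mapM_ print [rightToLeft, leftToRight, withIdentity]  -- each is [2,3,4]
```
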
Run Pipes
Note that you can also unwrap a Pipe a single step at a time using runFreeT (since Pipe is just a type synonym for a free monad transformer). This will take you to the next external await or yield statement. This means that a closed Pipeline will unwrap to a single step, in which case you would have been better served by runPipe.
runPipe :: Monad m => Pipeline m r -> m r
Run the Pipe monad transformer, converting it back into the base monad.
runPipe imposes two conditions:
- The pipe's input, if any, is trivially satisfiable (i.e. ())
- The pipe does not yield any output
The latter restriction makes runPipe less polymorphic than it could be, and I settled on the restriction for three reasons:
- It protects against accidental data loss.
- It protects against silent failures.
- It prevents wastefully draining a scarce resource by gratuitously driving it to completion.
If you believe that discarding output is the appropriate behavior, you can specify this by explicitly feeding your output to a pipe that gratuitously discards it:

    runPipe $ forever await <+< p
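
As a sketch of that idiom in context, the example below runs a producer purely for its effects by capping it with a discarding consumer. The names noisy and discard are hypothetical, the Control.Pipe import path is assumed, and Writer from transformers is an assumed base monad used to make the effects visible.

```haskell
import Control.Monad (forever)
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Writer (Writer, execWriter, tell)
import Control.Pipe  -- assumed import path for the API documented here

-- Hypothetical producer: log an effect for each value, then yield it.
noisy :: Producer Int (Writer [String]) ()
noisy = mapM_ step [1, 2, 3]
  where
    step n = do
        lift (tell ["made " ++ show n])
        yield n

-- Hypothetical name for the discarding consumer: await forever, yield nothing.
discard :: Monad m => Consumer a m r
discard = forever await

effects :: [String]
effects = execWriter . runPipe $ discard <+< noisy

main :: IO ()
main = mapM_ putStrLn effects
```

Every value is thrown away, yet the producer still runs to completion, so all three "made" entries appear in the log. Naming the consumer makes the decision to drop output explicit at the call site.
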