Safe Haskell: Safe-Inferred

 data Pipe a b m r
 data C
 type Producer b = Pipe () b
 type Consumer b = Pipe b C
 type Pipeline = Pipe () C
 await :: Pipe a b m a
 yield :: b -> Pipe a b m ()
 pipe :: Monad m => (a -> b) -> Pipe a b m r
 (<+<) :: Monad m => Pipe b c m r -> Pipe a b m r -> Pipe a c m r
 (>+>) :: Monad m => Pipe a b m r -> Pipe b c m r -> Pipe a c m r
 idP :: Monad m => Pipe a a m r
 newtype PipeC m r a b = PipeC {}
 runPipe :: Monad m => Pipeline m r -> m r
Types
The Pipe type is strongly inspired by Mario Blazevic's Coroutine type in his concurrency article from Issue 19 of The Monad Reader.
The base type for pipes

 a: The type of input received from upstream pipes
 b: The type of output delivered to downstream pipes
 m: The base monad
 r: The type of the return value
Create Pipes
yield and await are the only two primitives you need to create pipes. Since Pipe a b m is a monad, you can assemble yield and await statements using ordinary do notation. Since Pipe a b is also a monad transformer, you can use lift to invoke the base monad. For example, you could write a pipe stage that requests permission before forwarding any output:
    check :: (Show a) => Pipe a a IO r
    check = forever $ do
        x <- await
        lift $ putStrLn $ "Can '" ++ (show x) ++ "' pass?"
        ok <- read <$> lift getLine
        when ok (yield x)
Wait for input from upstream.
await blocks until input is available from upstream.
pipe :: Monad m => (a -> b) -> Pipe a b m r
Convert a pure function into a pipe
    pipe f = forever $ do
        x <- await
        yield (f x)
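As a further illustration of assembling await and yield in do notation, here is a minimal sketch. It assumes the version of the pipes package that exports Control.Pipe as documented here, and it takes lift from the transformers package. The names fromList, sumPairs, printer, and demo are hypothetical and are not part of the library. The sketch borrows (<+<) and runPipe, which are described under Compose Pipes and Run Pipes, only to drive the stages.

```haskell
import Control.Monad (forever)
import Control.Monad.Trans.Class (lift)
import Control.Pipe

-- Hypothetical producer: yield each element of a list, then return.
fromList :: Monad m => [b] -> Producer b m ()
fromList = mapM_ yield

-- Hypothetical stage: await two inputs, yield their sum, and repeat.
-- A stage does not have to yield once per await.
sumPairs :: Monad m => Pipe Int Int m r
sumPairs = forever $ do
    x <- await
    y <- await
    yield (x + y)

-- Hypothetical consumer: print every input by lifting into the base monad.
printer :: Show a => Consumer a IO r
printer = forever $ do
    x <- await
    lift (print x)

-- Connect the stages into a closed Pipeline and run it.
demo :: IO ()
demo = runPipe $ printer <+< sumPairs <+< fromList [1, 2, 3, 4]
```

Under the composition semantics this documentation describes, running demo should print 3 and then 7, and then stop once fromList runs out of elements.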
Compose Pipes
Pipes form a Category, meaning that you can compose Pipes using (<+<) and also define an identity Pipe: idP. These satisfy the category laws:
    idP <+< p = p
    p <+< idP = p
    (p1 <+< p2) <+< p3 = p1 <+< (p2 <+< p3)
Pipe composition binds the output of the upstream Pipe to the input of the downstream Pipe. Like Haskell functions, Pipes are lazy, meaning that upstream Pipes are only evaluated as far as necessary to generate enough input for downstream Pipes. If any Pipe terminates, it also terminates every Pipe composed with it.
(>+>) :: Monad m => Pipe a b m r -> Pipe b c m r -> Pipe a c m r
Corresponds to (>>>) from Control.Category
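The laziness and termination behavior described above can be sketched with an infinite upstream and a downstream that stops early. This is only an illustration: it assumes the Control.Pipe API documented here and uses Writer from the transformers package as the base monad so the result can be inspected purely. The names naturals, takeLog, firstDoubles, and firstDoubles' are hypothetical.

```haskell
import Control.Monad (replicateM_)
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Writer.Strict (Writer, execWriter, tell)
import Control.Pipe

-- Hypothetical producer: yields 1, 2, 3, ... and never returns on its own.
naturals :: Monad m => Producer Int m r
naturals = go 1
  where
    go n = yield n >> go (n + 1)

-- Hypothetical consumer: record exactly n inputs in the Writer log, then return.
takeLog :: Int -> Consumer a (Writer [a]) ()
takeLog n = replicateM_ n $ do
    x <- await
    lift (tell [x])

-- Downstream on the left: only three values are ever demanded from naturals,
-- and the return of takeLog ends the whole pipeline.
firstDoubles :: [Int]
firstDoubles = execWriter $ runPipe $ takeLog 3 <+< pipe (* 2) <+< naturals

-- The same pipeline written upstream-first with (>+>).
firstDoubles' :: [Int]
firstDoubles' = execWriter $ runPipe $ naturals >+> pipe (* 2) >+> takeLog 3
```

If composition behaves as documented, both values should be [2, 4, 6] even though naturals is infinite, because the upstream is driven only when the downstream awaits.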
Run Pipes
Note that you can also unwrap a Pipe a single step at a time using runFreeT (since Pipe is just a type synonym for a free monad transformer). This will take you to the next external await or yield statement. This means that a closed Pipeline will unwrap to a single step, in which case you would have been better served by runPipe.
runPipe :: Monad m => Pipeline m r -> m r
Run the Pipe monad transformer, converting it back into the base monad.
runPipe imposes two conditions:
 The pipe's input, if any, is trivially satisfiable (i.e. ())
 The pipe does not yield any output
The latter restriction makes runPipe less polymorphic than it could be, and I settled on the restriction for three reasons:
 It protects against accidental data loss.
 It protects against silent failures.
 It prevents wastefully draining a scarce resource by gratuitously driving it to completion.
If you believe that discarding output is the appropriate behavior, you can specify this by explicitly feeding your output to a pipe that gratuitously discards it:
runPipe $ forever await <+< p
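A slightly fuller sketch of that idiom follows, assuming the Control.Pipe API documented here plus lift from the transformers package. The names noisy, discardAll, and runNoisy are hypothetical. The side effect in noisy stands in for whatever work the upstream pipe performs.

```haskell
import Control.Monad (forever)
import Control.Monad.Trans.Class (lift)
import Control.Pipe

-- Hypothetical producer that performs a visible effect before each yield.
noisy :: Producer Int IO ()
noisy = mapM_ step [1, 2, 3]
  where
    step n = do
        lift (putStrLn ("producing " ++ show n))
        yield n

-- Hypothetical name for the discarding consumer: await forever, yield nothing.
discardAll :: Monad m => Consumer a m r
discardAll = forever await

-- 'runPipe noisy' alone is rejected by the type checker, because the output
-- type of noisy is Int rather than C. Capping it with discardAll closes the
-- pipeline and makes the decision to drop the output explicit.
runNoisy :: IO ()
runNoisy = runPipe $ discardAll <+< noisy
```

Because discardAll keeps awaiting, noisy is driven to completion, which is exactly the behavior the restriction on runPipe asks you to opt into deliberately.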