pipes-1.0: Compositional pipelines





data Pipe a b m r Source

The base type for pipes

The type of input received from upstream pipes
The type of output delivered to downstream pipes
The base monad
The type of the monad's final result

The Pipe type is partly inspired by Mario Blazevic's Coroutine in his concurrency article from Issue 19 of The Monad Reader and partly inspired by the Trace data type from A Language Based Approach to Unifying Events and Threads.


MonadTrans (Pipe a b) 
Monad m => Monad (Pipe a b m) 
Monad m => Functor (Pipe a b m) 
Monad m => Applicative (Pipe a b m) 

data Zero Source

A data type with no exposed constructors

type Producer b m r = Pipe Zero b m rSource

A pipe that can only produce values

type Consumer a m r = Pipe a Zero m rSource

A pipe that can only consume values

type Pipeline m r = Pipe Zero Zero m rSource

A self-contained pipeline that is ready to be run

Create Pipes

yield and await are the only two primitives you need to create Pipes. Because Pipe is a monad, you can assemble them using ordinary do notation. Since Pipe is also a monad transformer, you can use lift to invoke the base monad. For example:

 check :: Pipe a a IO r
 check = forever $ do
     x <- await
     lift $ putStrLn $ "Can " ++ (show x) ++ " pass?"
     ok <- lift $ read <$> getLine
     when ok (yield x)

await :: Pipe a b m aSource

Wait for input from upstream within the Pipe monad:

await blocks until input is ready.

yield :: b -> Pipe a b m ()Source

Pass output downstream within the Pipe monad:

yield blocks until the output has been received.

pipe :: Monad m => (a -> b) -> Pipe a b m rSource

Convert a pure function into a pipe

 pipe = forever $ do
     x <- await
     yield (f x)

discard :: Monad m => Pipe a b m rSource

The discard pipe silently discards all input fed to it.

Compose Pipes

There are two possible category implementations for Pipe:

Lazy composition
  • Use as little input as possible
  • Ideal for infinite input streams that never need finalization
Strict composition
  • Use as much input as possible
  • Ideal for finite input streams that need finalization

Both category implementations enforce the category laws:

  • Composition is associative (within each instance). This is not merely associativity of monadic effects, but rather true associativity. The result of composition produces identical composite Pipes regardless of how you group composition.
  • id is the identity Pipe. Composing a Pipe with id returns the original pipe.

Both categories prioritize downstream effects over upstream effects.

newtype Lazy m r a b Source




unLazy :: Pipe a b m r


Monad m => Category (Lazy m r) 

newtype Strict m r a b Source




unStrict :: Pipe a b m r


Monad m => Category (Strict m r) 

Compose Pipes

I provide convenience functions for composition that take care of newtype wrapping and unwrapping. For example:

 p1 <+< p2 = unLazy $ Lazy p1 <<< Lazy p2

<+< and <-< correspond to <<< from Control.Category

>+> and >+> correspond to >>> from Control.Category

<+< and >+> use Lazy composition (Mnemonic: + for optimistic evaluation)

<-< and >-> use Strict composition (Mnemonic: - for pessimistic evaluation)

However, the above operators won't work with id because they work on Pipes whereas id is a newtype on a Pipe. However, both Category instances share the same id implementation:

 instance Category (Lazy m r) where
     id = Lazy $ pipe id
 instance Category (Strict m r) where
     id = Strict $ pipe id

So if you need an identity Pipe that works with the above convenience operators, you can use idP which is just pipe id.

(<+<), (<-<) :: Monad m => Pipe b c m r -> Pipe a b m r -> Pipe a c m rSource

(>+>), (>->) :: Monad m => Pipe a b m r -> Pipe b c m r -> Pipe a c m rSource

idP :: Monad m => Pipe a a m rSource

Run Pipes

runPipe :: Monad m => Pipeline m r -> m rSource

Run the Pipe monad transformer, converting it back into the base monad

runPipe will not work on a pipe that has loose input or output ends. If your pipe is still generating unhandled output, handle it. I choose not to automatically discard output for you, because that is only one of many ways to deal with unhandled output.