pipes-2.4.0: Compositional pipelines

Safe Haskell: None




The Pipe type is a monad transformer that enriches the base monad with the ability to await or yield data to and from other Pipes.



The Pipe type is strongly inspired by Mario Blazevic's Coroutine type in his concurrency article from Issue 19 of The Monad Reader and is formulated in the exact same way.

His Coroutine type is actually a free monad transformer (i.e. FreeT) and his InOrOut functor corresponds to PipeF.

data PipeF a b x

The base functor for the Pipe type


Constructors

Await (a -> x)
Yield b x

Instances

Functor (PipeF a b)

type Pipe a b = FreeT (PipeF a b)

The base type for pipes. The synonym lists only a and b; applied in full, a pipe has the type Pipe a b m r:

  • a - The type of input received from upstream pipes
  • b - The type of output delivered to downstream pipes
  • m - The base monad
  • r - The type of the return value
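As a rough illustration of this formulation, the sketch below rebuilds the Pipe synonym on top of a hand-rolled free monad transformer using only base. The FreeF and FreeT definitions and the Return and Wrap constructor names are assumptions made for the example and may differ from the library's own module; outputs is a hypothetical helper that is not part of this API.

```haskell
import Control.Monad (ap, liftM)
import Data.Functor.Identity (Identity, runIdentity)

-- One layer of a free monad transformer: a final result or one functor step.
-- (Assumed shape for this sketch; the library's definition may differ.)
data FreeF f r x = Return r | Wrap (f x)

newtype FreeT f m r = FreeT { runFreeT :: m (FreeF f r (FreeT f m r)) }

instance (Functor f, Monad m) => Functor (FreeT f m) where
    fmap = liftM

instance (Functor f, Monad m) => Applicative (FreeT f m) where
    pure = FreeT . return . Return
    (<*>) = ap

instance (Functor f, Monad m) => Monad (FreeT f m) where
    m >>= k = FreeT $ do
        step <- runFreeT m
        case step of
            Return r -> runFreeT (k r)                   -- finished: continue with k
            Wrap fx  -> return (Wrap (fmap (>>= k) fx))  -- push k under the step

-- The base functor, as documented above.
data PipeF a b x = Await (a -> x) | Yield b x

instance Functor (PipeF a b) where
    fmap f (Await k)   = Await (f . k)
    fmap f (Yield b x) = Yield b (f x)

type Pipe a b = FreeT (PipeF a b)

-- Each primitive is a single PipeF step that then returns.
await :: Monad m => Pipe a b m a
await = FreeT (return (Wrap (Await return)))

yield :: Monad m => b -> Pipe a b m ()
yield b = FreeT (return (Wrap (Yield b (return ()))))

-- Hypothetical helper: step through a pure pipe with runFreeT, feeding ()
-- to every await and collecting everything it yields.
outputs :: Pipe () b Identity r -> [b]
outputs p = case runIdentity (runFreeT p) of
    Return _            -> []
    Wrap (Await k)      -> outputs (k ())
    Wrap (Yield b next) -> b : outputs next
```

Loaded into GHCi, outputs (yield 'a' >> yield 'b') walks the pipe one step at a time and collects the two yielded characters in order.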

data C

The empty type, denoting a 'C'losed end

type Producer b = Pipe () b

A pipe that produces values

type Consumer b = Pipe b C

A pipe that consumes values

type Pipeline = Pipe () C

A self-contained pipeline that is ready to be run

Create Pipes

yield and await are the only two primitives you need to create pipes. Since Pipe a b m is a monad, you can assemble yield and await statements using ordinary do notation. Since Pipe a b is also a monad transformer, you can use lift to invoke the base monad. For example, you could write a pipe stage that requests permission before forwarding any output:

 check :: (Show a) => Pipe a a IO r
 check = forever $ do
     x <- await
     lift $ putStrLn $ "Can '" ++ (show x) ++ "' pass?"
     ok <- read <$> lift getLine
     when ok (yield x)

await :: Monad m => Pipe a b m a

Wait for input from upstream.

await blocks until input is available from upstream.

yield :: Monad m => b -> Pipe a b m ()

Deliver output downstream.

yield restores control back downstream and binds the yielded value as the result of the downstream pipe's pending await.

pipe :: Monad m => (a -> b) -> Pipe a b m r

Convert a pure function into a pipe

 pipe f = forever $ do
     x <- await
     yield (f x)

Compose Pipes

Pipes form a Category, meaning that you can compose Pipes using (<+<) and also define an identity Pipe: idP. These satisfy the category laws:

 idP <+< p = p

 p <+< idP = p

 (p1 <+< p2) <+< p3 = p1 <+< (p2 <+< p3)

Pipe composition binds the output of the upstream Pipe to the input of the downstream Pipe. Like Haskell functions, Pipes are lazy, meaning that upstream Pipes are only evaluated as far as necessary to generate enough input for downstream Pipes. If any Pipe terminates, it also terminates every Pipe composed with it.
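The behavior described above (downstream-driven laziness and termination that propagates through composition) can be sketched with a simplified model. The plain data type below is a stand-in for the library's FreeT-based Pipe, not its actual definition, and fromList, takeP and outputs are hypothetical helpers introduced only for the example.

```haskell
import Control.Monad (ap, liftM)
import Data.Functor.Identity (Identity, runIdentity)

-- Simplified stand-in for Pipe (the library builds it from FreeT and PipeF).
data Pipe a b m r
    = Pure r                      -- finished with a result
    | M (m (Pipe a b m r))        -- run an effect in the base monad
    | Await (a -> Pipe a b m r)   -- wait for an input from upstream
    | Yield b (Pipe a b m r)      -- hand an output downstream

instance Monad m => Functor (Pipe a b m) where
    fmap = liftM

instance Monad m => Applicative (Pipe a b m) where
    pure = Pure
    (<*>) = ap

instance Monad m => Monad (Pipe a b m) where
    Pure r    >>= k = k r
    M m       >>= k = M (liftM (>>= k) m)
    Await f   >>= k = Await (\a -> f a >>= k)
    Yield b p >>= k = Yield b (p >>= k)

await :: Monad m => Pipe a b m a
await = Await Pure

yield :: Monad m => b -> Pipe a b m ()
yield b = Yield b (Pure ())

infixr 9 <+<

-- Composition is driven by the downstream (left) pipe: the upstream pipe is
-- inspected only when downstream awaits, and either side returning ends both.
(<+<) :: Monad m => Pipe b c m r -> Pipe a b m r -> Pipe a c m r
Pure r    <+< _  = Pure r
Yield c d <+< up = Yield c (d <+< up)
M m       <+< up = M (liftM (<+< up) m)
Await f   <+< up = case up of
    Pure r     -> Pure r
    Yield b u' -> f b <+< u'
    M m        -> M (liftM (Await f <+<) m)
    Await g    -> Await (\a -> Await f <+< g a)

idP :: Monad m => Pipe a a m r
idP = Await (\a -> Yield a idP)

-- Hypothetical helpers for the demonstration.
fromList :: Monad m => [b] -> Pipe () b m ()
fromList = mapM_ yield

takeP :: Monad m => Int -> Pipe a a m ()
takeP n
    | n <= 0    = return ()
    | otherwise = do
        x <- await
        yield x
        takeP (n - 1)

-- Collect what a pure producer yields, feeding () to any await.
outputs :: Pipe () b Identity r -> [b]
outputs (Pure _)    = []
outputs (M m)       = outputs (runIdentity m)
outputs (Await f)   = outputs (f ())
outputs (Yield b p) = b : outputs p
```

In this model, outputs (takeP 2 <+< fromList [1 ..]) finishes even though the upstream list is infinite, because upstream is only advanced when takeP awaits. Likewise, idP <+< fromList "abc" stops once the upstream list is exhausted, although idP on its own never returns.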

(<+<) :: Monad m => Pipe b c m r -> Pipe a b m r -> Pipe a c m r

Corresponds to (<<<)/(.) from Control.Category

(>+>) :: Monad m => Pipe a b m r -> Pipe b c m r -> Pipe a c m r

Corresponds to (>>>) from Control.Category

idP :: Monad m => Pipe a a m r

Corresponds to id from Control.Category

newtype PipeC m r a b

Pipes form a Category instance when you rearrange the type variables




Constructors

PipeC

unPipeC :: Pipe a b m r

Instances

Monad m => Category (PipeC m r)

Run Pipes

Note that you can also unwrap a Pipe a single step at a time using runFreeT (since Pipe is just a type synonym for a free monad transformer). Each step takes you to the next external await or yield statement. Consequently, a closed Pipeline will unwrap to a single step, in which case you would have been better served by runPipe.

runPipe :: Monad m => Pipeline m r -> m r

Run the Pipe monad transformer, converting it back into the base monad.

runPipe imposes two conditions:

  • The pipe's input, if any, is trivially satisfiable (i.e. ())
  • The pipe does not yield any output

The latter restriction makes runPipe less polymorphic than it could be, and I settled on the restriction for three reasons:

  • It prevents accidental data loss.
  • It protects against silent failures.
  • It prevents wastefully draining a scarce resource by gratuitously driving it to completion.

If you believe that discarding output is the appropriate behavior, you can specify this by explicitly feeding your output to a pipe that gratuitously discards it:

 runPipe $ forever await <+< p
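To make the trade-off concrete, here is a minimal sketch of runPipe and the explicit discard idiom. It uses the same kind of simplified stand-in for Pipe (a plain data type rather than the library's FreeT-based one), models C with Data.Void, and uses liftP in place of lift. The names liftP, countingSource, drain and drainedCount are hypothetical and exist only for this example.

```haskell
import Control.Monad (ap, forever, liftM)
import Data.IORef (IORef, modifyIORef, newIORef, readIORef)
import Data.Void (Void, absurd)

-- Simplified stand-in for Pipe (the library builds it from FreeT and PipeF).
data Pipe a b m r
    = Pure r
    | M (m (Pipe a b m r))
    | Await (a -> Pipe a b m r)
    | Yield b (Pipe a b m r)

instance Monad m => Functor (Pipe a b m) where
    fmap = liftM

instance Monad m => Applicative (Pipe a b m) where
    pure = Pure
    (<*>) = ap
    m *> k = m >>= \_ -> k

instance Monad m => Monad (Pipe a b m) where
    Pure r    >>= k = k r
    M m       >>= k = M (liftM (>>= k) m)
    Await f   >>= k = Await (\a -> f a >>= k)
    Yield b p >>= k = Yield b (p >>= k)

type C = Void              -- the closed end, modeled here with Void
type Pipeline = Pipe () C

await :: Monad m => Pipe a b m a
await = Await Pure

yield :: Monad m => b -> Pipe a b m ()
yield b = Yield b (Pure ())

-- Stand-in for lift: run one action in the base monad.
liftP :: Monad m => m r -> Pipe a b m r
liftP = M . liftM Pure

(<+<) :: Monad m => Pipe b c m r -> Pipe a b m r -> Pipe a c m r
Pure r    <+< _  = Pure r
Yield c d <+< up = Yield c (d <+< up)
M m       <+< up = M (liftM (<+< up) m)
Await f   <+< up = case up of
    Pure r     -> Pure r
    Yield b u' -> f b <+< u'
    M m        -> M (liftM (Await f <+<) m)
    Await g    -> Await (\a -> Await f <+< g a)

-- Inputs are trivially satisfied with (); a yield is impossible because
-- the output type has no values.
runPipe :: Monad m => Pipeline m r -> m r
runPipe (Pure r)    = return r
runPipe (M m)       = m >>= runPipe
runPipe (Await f)   = runPipe (f ())
runPipe (Yield c _) = absurd c

-- Hypothetical source: bump a counter (standing in for reading from a
-- scarce resource) before yielding each element.
countingSource :: IORef Int -> [b] -> Pipe () b IO ()
countingSource ref = mapM_ (\x -> liftP (modifyIORef ref (+ 1)) >> yield x)

-- The idiom from the text: discard every output explicitly.
drain :: Pipe () b IO r -> IO r
drain p = runPipe (forever await <+< p)

-- Number of elements pulled out of the source while draining it.
drainedCount :: [b] -> IO Int
drainedCount xs = do
    ref <- newIORef 0
    drain (countingSource ref xs)
    readIORef ref
```

In this model, passing countingSource ref "abc" directly to runPipe is rejected by the type checker because its output type is Char rather than C. Wrapping it in drain type-checks, and drainedCount shows that doing so drives the source all the way to completion, which is exactly the cost the restriction asks you to opt into explicitly.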