pipes-2.5.0: Compositional pipelines

Safe HaskellSafe




The Pipe type is a monad transformer that enriches the base monad with the ability to await or yield data to and from other Pipes.



The Pipe type is strongly inspired by Mario Blazevic's Coroutine type in his concurrency article from Issue 19 of The Monad Reader.

data Pipe a b m r Source #

The base type for pipes

  • a - The type of input received from upstream pipes
  • b - The type of output delivered to downstream pipes
  • m - The base monad
  • r - The type of the return value


Await (a -> Pipe a b m r) 
Yield b (Pipe a b m r) 
M (m (Pipe a b m r)) 
Pure r 


MonadTrans (Pipe a b) Source # 


lift :: Monad m => m a -> Pipe a b m a #

Monad m => Monad (Pipe a b m) Source # 


(>>=) :: Pipe a b m a -> (a -> Pipe a b m b) -> Pipe a b m b #

(>>) :: Pipe a b m a -> Pipe a b m b -> Pipe a b m b #

return :: a -> Pipe a b m a #

fail :: String -> Pipe a b m a #

Monad m => Functor (Pipe a b m) Source # 


fmap :: (a -> b) -> Pipe a b m a -> Pipe a b m b #

(<$) :: a -> Pipe a b m b -> Pipe a b m a #

Monad m => Applicative (Pipe a b m) Source # 


pure :: a -> Pipe a b m a #

(<*>) :: Pipe a b m (a -> b) -> Pipe a b m a -> Pipe a b m b #

(*>) :: Pipe a b m a -> Pipe a b m b -> Pipe a b m b #

(<*) :: Pipe a b m a -> Pipe a b m b -> Pipe a b m a #

data C Source #

The empty type, denoting a 'C'losed end

type Producer b = Pipe () b Source #

A pipe that produces values

type Consumer b = Pipe b C Source #

A pipe that consumes values

type Pipeline = Pipe () C Source #

A self-contained pipeline that is ready to be run

Create Pipes

yield and await are the only two primitives you need to create pipes. Since Pipe a b m is a monad, you can assemble yield and await statements using ordinary do notation. Since Pipe a b is also a monad transformer, you can use lift to invoke the base monad. For example, you could write a pipe stage that requests permission before forwarding any output:

check :: (Show a) => Pipe a a IO r
check = forever $ do
    x <- await
    lift $ putStrLn $ "Can '" ++ (show x) ++ "' pass?"
    ok <- read <$> lift getLine
    when ok (yield x)

await :: Pipe a b m a Source #

Wait for input from upstream.

await blocks until input is available from upstream.

yield :: b -> Pipe a b m () Source #

Deliver output downstream.

yield restores control back upstream and binds the result to await.

pipe :: Monad m => (a -> b) -> Pipe a b m r Source #

Convert a pure function into a pipe

pipe f = forever $ do
    x <- await
    yield (f x)

Compose Pipes

Pipes form a Category, meaning that you can compose Pipes using (<+<) and also define an identity Pipe: idP. These satisfy the category laws:

idP <+< p = p

p <+< idP = p

(p1 <+< p2) <+< p3 = p1 <+< (p2 <+< p3)

Pipe composition binds the output of the upstream Pipe to the input of the downstream Pipe. Like Haskell functions, Pipes are lazy, meaning that upstream Pipes are only evaluated as far as necessary to generate enough input for downstream Pipes. If any Pipe terminates, it also terminates every Pipe composed with it.

(<+<) :: Monad m => Pipe b c m r -> Pipe a b m r -> Pipe a c m r infixr 9 Source #

Corresponds to (<<<)/(.) from Control.Category

(>+>) :: Monad m => Pipe a b m r -> Pipe b c m r -> Pipe a c m r infixl 9 Source #

Corresponds to (>>>) from Control.Category

idP :: Monad m => Pipe a a m r Source #

Corresponds to id from Control.Category

newtype PipeC m r a b Source #

Pipes form a Category instance when you rearrange the type variables





Monad m => Category * (PipeC m r) Source # 


id :: cat a a #

(.) :: cat b c -> cat a b -> cat a c #

Run Pipes

Note that you can also unwrap a Pipe a single step at a time using runFreeT (since Pipe is just a type synonym for a free monad transformer). This will take you to the next external await or yield statement. This means that a closed Pipeline will unwrap to a single step, in which case you would have been better served by runPipe.

runPipe :: Monad m => Pipeline m r -> m r Source #

Run the Pipe monad transformer, converting it back into the base monad.

runPipe imposes two conditions:

  • The pipe's input, if any, is trivially satisfiable (i.e. ())
  • The pipe does not yield any output

The latter restriction makes runPipe less polymorphic than it could be, and I settled on the restriction for three reasons:

  • It prevents against accidental data loss.
  • It protects against silent failures
  • It prevents wastefully draining a scarce resource by gratuitously driving it to completion

If you believe that discarding output is the appropriate behavior, you can specify this by explicitly feeding your output to a pipe that gratuitously discards it:

runPipe $ forever await <+< p