pipes-1.0: Compositional pipelines

Safe Haskell: Safe



This library only provides a single data type: Pipe.

Pipe is a monad transformer that extends the base monad with the ability to await input from or yield output to other Pipes. Pipes resemble enumeratees in other libraries because they receive an input stream and transform it into a new stream.

I'll introduce our first Pipe, which is a verbose version of the Prelude's take function:

take' :: Int -> Pipe a a IO ()
take' n = do
    replicateM_ n $ do
        x <- await
        yield x
    lift $ putStrLn "You shall not pass!"

This Pipe lets the first n values it receives pass through undisturbed, then outputs a cute message and shuts down. Shutdown is automatic when the Pipe reaches the end of its monadic code: you don't need to send a special signal to connected Pipes to let them know you are done handling input or generating output.

Let's dissect the above Pipe's type to learn a bit about how Pipes work:

     | Input Type | Output Type | Base monad | Return value
Pipe | a          | a           | IO         | ()

So take' awaits input of type a from upstream Pipes and yields output of type a to downstream Pipes. take' uses IO as its base monad because it invokes the putStrLn function. If we remove the call to putStrLn, the compiler infers the following type instead, which is polymorphic in the base monad:

take' :: (Monad m) => Int -> Pipe a a m ()

Pipes are conservative about using the base monad. In fact, you can only invoke the base monad by using the lift function from Pipe's MonadTrans instance. If you never use lift, your Pipe will translate into pure code.
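To make the "pure code" claim concrete, here is a minimal self-contained sketch. It does not use the pipes library: the Pipe data type below is a hypothetical toy representation (a free monad over await and yield steps), and takeP and feed are illustrative names. takeP is take' with the lift line removed, so it works over any base monad, and feed runs it over Identity, where it behaves as an ordinary pure function from an input list to an output list.

```haskell
import Control.Monad (ap, replicateM_)
import Data.Functor.Identity (Identity (..))

-- Hypothetical toy Pipe (an illustration, not pipes-1.0's internal representation).
data Pipe a b m r
  = Pure r                    -- finished, with return value r
  | M (m (Pipe a b m r))      -- run one base-monad action, then continue
  | Await (a -> Pipe a b m r) -- block until a value of type a arrives
  | Yield b (Pipe a b m r)    -- hand a value of type b downstream, then continue

instance Monad m => Functor (Pipe a b m) where
  fmap f p = p >>= \r -> Pure (f r)

instance Monad m => Applicative (Pipe a b m) where
  pure = Pure
  (<*>) = ap

instance Monad m => Monad (Pipe a b m) where
  Pure r    >>= f = f r
  M m       >>= f = M (fmap (>>= f) m)
  Await k   >>= f = Await (\x -> k x >>= f)
  Yield b p >>= f = Yield b (p >>= f)

await :: Pipe a b m a
await = Await Pure

yield :: b -> Pipe a b m ()
yield b = Yield b (Pure ())

-- take' without the `lift $ putStrLn ...` line: no base-monad actions at all.
takeP :: Monad m => Int -> Pipe a a m ()
takeP n = replicateM_ n (await >>= yield)

fromList :: Monad m => [a] -> Pipe x a m ()
fromList = mapM_ yield

-- Hypothetical pure driver: feed a list into a Pipe over Identity, collect its output.
feed :: [a] -> Pipe a b Identity r -> [b]
feed _      (Pure _)    = []
feed xs     (M m)       = feed xs (runIdentity m)
feed xs     (Yield b p) = b : feed xs p
feed []     (Await _)   = []             -- input exhausted: stop
feed (x:xs) (Await k)   = feed xs (k x)
```

For example, `feed [1 .. 10] (takeP 3)` evaluates to `[1,2,3]` without touching IO.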

Now let's create a function that converts a list into a Pipe by yielding each element of the list:

fromList :: (Monad m) => [a] -> Pipe Zero a m ()
fromList = mapM_ yield

The Zero in the type signature represents a type with no constructors, and we use it to block the input end of the Pipe so that it can't request any input from an upstream Pipe. You can think of fromList as a one-way Pipe that can only deliver output, which makes it suitable for the first stage in a Pipeline. I provide a type synonym for this common case:

type Producer b m r = Pipe Zero b m r

You can then rewrite the type signature for fromList as:

fromList :: (Monad m) => [a] -> Producer a m ()

Note that you don't have to block the input end with the Zero type. If you let the compiler infer the type, you would get:

fromList :: (Monad m) => [a] -> Pipe b a m ()

The compiler leaves the input type polymorphic because fromList never calls await, so nothing constrains it. I only provide the Zero type as a convenience so that you can intentionally block Pipe ends.

Producers resemble enumerators in other libraries because they are a data source. Producers never use await statements.

Now let's create a Pipe that prints every value delivered to it and never terminates:

printer :: (Show a) => Pipe a Zero IO b
printer = forever $ do
    x <- await
    lift $ print x

The Zero in printer's type signature indicates that it never delivers output downstream, so it represents the final stage in a Pipeline. Again, I provide a type synonym for this common case:

type Consumer a m r = Pipe a Zero m r

So we could instead write printer's type as:

printer :: (Show a) => Consumer a IO b

Consumers resemble iteratees in other libraries because they are a data sink. Consumers never use yield statements.

What distinguishes Pipes from every other iteratee implementation is that they form a Category. Because of this, you can literally compose Pipes into Pipelines. Pipes actually possess two Category instances:

newtype Lazy   m r a b = Lazy   { unLazy   :: Pipe a b m r }
newtype Strict m r a b = Strict { unStrict :: Pipe a b m r }
instance Category (Lazy   m r) where ...
instance Category (Strict m r) where ...

The first Category composes Pipes with Lazy semantics and the second one composes Pipes with Strict semantics. I'll begin by demonstrating Lazy semantics.

For example, you can compose the above Pipes with:

pipeline :: Pipe Zero Zero IO ()
pipeline = unLazy $ Lazy printer . Lazy (take' 3) . Lazy (fromList [1..])

The compiler deduces that the final Pipe must be blocked at both ends, meaning it will never await any input and it will never yield any output. This represents a self-contained Pipeline and I provide a type synonym for this common case:

type Pipeline m r = Pipe Zero Zero m r

Also, I provide convenience operators for composing Pipes without the burden of wrapping and unwrapping newtypes. For example, to compose Pipes using Lazy semantics, just use the <+< operator:

p1 <+< p2 = unLazy $ Lazy p1 <<< Lazy p2 -- (<<<) is the same as (.)
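The newtype-plus-Category arrangement can be sketched in a self-contained way. This is an illustrative reimplementation under stated assumptions, not the library's code: Pipe is a hypothetical toy type, composeLazy and feed are made-up helper names, and idP is the forwarding Pipe that the text later calls identity. Once Lazy has a Category instance, <+< is defined exactly as above, by wrapping, composing with <<<, and unwrapping.

```haskell
import Prelude hiding (id, (.))
import Control.Category (Category (..), (<<<))
import Control.Monad (ap, forever, replicateM_)
import Data.Functor.Identity (Identity (..))

-- Hypothetical toy Pipe (an illustration, not pipes-1.0's internal representation).
data Pipe a b m r
  = Pure r                    -- finished, with return value r
  | M (m (Pipe a b m r))      -- run one base-monad action, then continue
  | Await (a -> Pipe a b m r) -- block until a value of type a arrives
  | Yield b (Pipe a b m r)    -- hand a value of type b downstream, then continue

instance Monad m => Functor (Pipe a b m) where
  fmap f p = p >>= \r -> Pure (f r)

instance Monad m => Applicative (Pipe a b m) where
  pure = Pure
  (<*>) = ap

instance Monad m => Monad (Pipe a b m) where
  Pure r    >>= f = f r
  M m       >>= f = M (fmap (>>= f) m)
  Await k   >>= f = Await (\x -> k x >>= f)
  Yield b p >>= f = Yield b (p >>= f)

await :: Pipe a b m a
await = Await Pure

yield :: b -> Pipe a b m ()
yield b = Yield b (Pure ())

-- Lazy composition: the downstream Pipe runs first; whichever Pipe
-- terminates first ends the composite with its own return value.
composeLazy :: Monad m => Pipe b c m r -> Pipe a b m r -> Pipe a c m r
composeLazy p1 p2 = case p1 of
  Pure r      -> Pure r
  M m         -> M (fmap (\p1' -> composeLazy p1' p2) m)
  Yield c p1' -> Yield c (composeLazy p1' p2)
  Await k     -> case p2 of
    Pure r      -> Pure r
    M m         -> M (fmap (composeLazy p1) m)
    Yield b p2' -> composeLazy (k b) p2'
    Await k'    -> Await (\a -> composeLazy p1 (k' a))

newtype Lazy m r a b = Lazy { unLazy :: Pipe a b m r }

-- The identity Pipe: forwards every value untouched.
idP :: Monad m => Pipe a a m r
idP = forever (await >>= yield)

instance Monad m => Category (Lazy m r) where
  id = Lazy idP
  Lazy p1 . Lazy p2 = Lazy (composeLazy p1 p2)

-- The convenience operator, defined through the newtype as in the text.
(<+<) :: Monad m => Pipe b c m r -> Pipe a b m r -> Pipe a c m r
p1 <+< p2 = unLazy (Lazy p1 <<< Lazy p2)

takeP :: Monad m => Int -> Pipe a a m ()
takeP n = replicateM_ n (await >>= yield)

-- Pure driver used only to observe results.
feed :: [a] -> Pipe a b Identity r -> [b]
feed _      (Pure _)    = []
feed xs     (M m)       = feed xs (runIdentity m)
feed xs     (Yield b p) = b : feed xs p
feed []     (Await _)   = []
feed (x:xs) (Await k)   = feed xs (k x)
```

Composing with `id` on either side leaves `takeP 3` unchanged: both `feed [1 .. 10] (unLazy (Lazy (takeP 3) . id))` and `feed [1 .. 10] (unLazy (id . Lazy (takeP 3)))` give `[1,2,3]`.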

So you can rewrite pipeline as:

pipeline :: Pipeline IO ()
pipeline = printer <+< take' 3 <+< fromList [1..]

Like many other monad transformers, you convert the Pipe monad back to the base monad using some sort of "run..." function. In this case, it's the runPipe function:

runPipe :: (Monad m) => Pipeline m r -> m r

runPipe only works on self-contained Pipelines. This is the only function in the entire library that actually requires the Zero type because it must guarantee that its argument Pipe will never try to await or yield. You don't need to worry about explicitly giving it capped Pipes because self-contained Pipes will automatically have polymorphic input and output ends and runPipe will just assume those ends are Zero.

Let's try using runPipe:

>>> runPipe pipeline
1
2
3
You shall not pass!

Fascinating! Our Pipe terminated even though printer never terminates and fromList never terminates when given an infinite list. To illustrate why our Pipe terminated, let's outline the Pipe flow control rules for Lazy composition:

  • Execution begins at the most downstream Pipe (printer in our example).
  • If a Pipe awaits input, it blocks and transfers control to the next Pipe upstream until that Pipe yields back a value.
  • If a Pipe yields output, it restores control to the original downstream Pipe that was awaiting its input, and the yielded value becomes the result of that Pipe's await command.
  • If a Pipe terminates, it terminates every other Pipe composed with it.

The last rule follows from laziness. If a Pipe terminates then every downstream Pipe depending on its output cannot proceed, and upstream Pipes are never evaluated because the terminated Pipe will not request values from them any longer.

So in our previous example, the Pipeline terminated because take' 3 terminated and brought down the entire Pipeline with it.
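The flow-control rules above can be exercised with a small self-contained sketch. Again this is not the pipes library: Pipe is a hypothetical toy type, Void from base stands in for Zero, and printer and take' log to a Writer instead of printing so that the result can be inspected. Running the same pipeline as before shows that it stops as soon as take' 3 returns, even though printer loops forever and the input list is infinite.

```haskell
import Control.Monad (ap, forever, replicateM_)
import Control.Monad.Trans.Class (MonadTrans (..))
import Control.Monad.Trans.Writer (Writer, execWriter, tell)
import Data.Void (Void)

-- Hypothetical toy Pipe (an illustration, not pipes-1.0's internal representation).
data Pipe a b m r
  = Pure r                    -- finished, with return value r
  | M (m (Pipe a b m r))      -- run one base-monad action, then continue
  | Await (a -> Pipe a b m r) -- block until a value of type a arrives
  | Yield b (Pipe a b m r)    -- hand a value of type b downstream, then continue

instance Monad m => Functor (Pipe a b m) where
  fmap f p = p >>= \r -> Pure (f r)

instance Monad m => Applicative (Pipe a b m) where
  pure = Pure
  (<*>) = ap

instance Monad m => Monad (Pipe a b m) where
  Pure r    >>= f = f r
  M m       >>= f = M (fmap (>>= f) m)
  Await k   >>= f = Await (\x -> k x >>= f)
  Yield b p >>= f = Yield b (p >>= f)

instance MonadTrans (Pipe a b) where
  lift m = M (fmap Pure m)

await :: Pipe a b m a
await = Await Pure

yield :: b -> Pipe a b m ()
yield b = Yield b (Pure ())

-- Lazy composition: the downstream Pipe runs first; whichever Pipe
-- terminates first ends the composite with its own return value.
infixr 9 <+<
(<+<) :: Monad m => Pipe b c m r -> Pipe a b m r -> Pipe a c m r
p1 <+< p2 = case p1 of
  Pure r      -> Pure r
  M m         -> M (fmap (<+< p2) m)
  Yield c p1' -> Yield c (p1' <+< p2)
  Await k     -> case p2 of              -- downstream is blocked: run upstream
    Pure r      -> Pure r
    M m         -> M (fmap (p1 <+<) m)
    Yield b p2' -> k b <+< p2'           -- resume downstream with the value
    Await k'    -> Await (\a -> p1 <+< k' a)

-- Void (from base) stands in for Zero.
type Pipeline m r = Pipe Void Void m r

runPipe :: Monad m => Pipeline m r -> m r
runPipe (Pure r)    = return r
runPipe (M m)       = m >>= runPipe
runPipe (Await _)   = error "runPipe: a Pipeline has no upstream to await from"
runPipe (Yield _ p) = runPipe p  -- no real Void value can ever be yielded

type Log = Writer [String]

printer :: Show a => Pipe a b Log r
printer = forever $ do
  x <- await
  lift (tell [show x])

take' :: Int -> Pipe a a Log ()
take' n = do
  replicateM_ n (await >>= yield)
  lift (tell ["You shall not pass!"])

fromList :: Monad m => [a] -> Pipe x a m ()
fromList = mapM_ yield
```

Here `execWriter (runPipe (printer <+< take' 3 <+< fromList [(1 :: Int) ..]))` produces the log `["1","2","3","You shall not pass!"]`.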

Pipes promote loose coupling, allowing you to mix and match them transparently using composition. For example, we can define a new Producer pipe that indefinitely prompts the user for integers:

prompt :: Producer Int IO a
prompt = forever $ do
    lift $ putStrLn "Enter a number: "
    n <- read <$> lift getLine
    yield n

Now we can compose it with any of our previous Pipes:

>>> runPipe $ printer <+< take' 3 <+< prompt
Enter a number:
Enter a number:
Enter a number:
You shall not pass!

You can easily "vertically" concatenate Pipes, Producers, and Consumers, all using simple monad sequencing: (>>). For example, here is how you concatenate Producers:

>>> runPipe $ printer <+< (fromList [1..3] >> fromList [10..12])
1
2
3
10
11
12

Here's how you would concatenate Consumers:

>>> let { print' :: (Show a) => Int -> Consumer a IO (); print' n = printer <+< take' n }
>>> runPipe $ (print' 3 >> print' 4) <+< fromList [1..]
1
2
3
You shall not pass!
4
5
6
7
You shall not pass!

... but the above example is gratuitous because we could have just concatenated the intermediate take' Pipe:

>>> runPipe $ printer <+< (take' 3 >> take' 4) <+< fromList [1..]
1
2
3
You shall not pass!
4
5
6
7
You shall not pass!
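Vertical concatenation needs nothing beyond the Monad instance, which the following sketch checks using the same kind of hypothetical toy Pipe type (not the library's internals). printer and take' log to a Writer rather than printing, Void stands in for Zero, and the three logs correspond to the concatenated Producers, the concatenated Consumers, and the concatenated take' Pipes.

```haskell
import Control.Monad (ap, forever, replicateM_)
import Control.Monad.Trans.Class (MonadTrans (..))
import Control.Monad.Trans.Writer (Writer, execWriter, tell)
import Data.Void (Void)

-- Hypothetical toy Pipe (an illustration, not pipes-1.0's internal representation).
data Pipe a b m r
  = Pure r                    -- finished, with return value r
  | M (m (Pipe a b m r))      -- run one base-monad action, then continue
  | Await (a -> Pipe a b m r) -- block until a value of type a arrives
  | Yield b (Pipe a b m r)    -- hand a value of type b downstream, then continue

instance Monad m => Functor (Pipe a b m) where
  fmap f p = p >>= \r -> Pure (f r)

instance Monad m => Applicative (Pipe a b m) where
  pure = Pure
  (<*>) = ap

instance Monad m => Monad (Pipe a b m) where
  Pure r    >>= f = f r
  M m       >>= f = M (fmap (>>= f) m)
  Await k   >>= f = Await (\x -> k x >>= f)
  Yield b p >>= f = Yield b (p >>= f)

instance MonadTrans (Pipe a b) where
  lift m = M (fmap Pure m)

await :: Pipe a b m a
await = Await Pure

yield :: b -> Pipe a b m ()
yield b = Yield b (Pure ())

-- Lazy composition: whichever Pipe terminates first ends the composite.
infixr 9 <+<
(<+<) :: Monad m => Pipe b c m r -> Pipe a b m r -> Pipe a c m r
p1 <+< p2 = case p1 of
  Pure r      -> Pure r
  M m         -> M (fmap (<+< p2) m)
  Yield c p1' -> Yield c (p1' <+< p2)
  Await k     -> case p2 of
    Pure r      -> Pure r
    M m         -> M (fmap (p1 <+<) m)
    Yield b p2' -> k b <+< p2'
    Await k'    -> Await (\a -> p1 <+< k' a)

runPipe :: Monad m => Pipe Void Void m r -> m r
runPipe (Pure r)    = return r
runPipe (M m)       = m >>= runPipe
runPipe (Await _)   = error "runPipe: a Pipeline has no upstream to await from"
runPipe (Yield _ p) = runPipe p

type Log = Writer [String]

printer :: Show a => Pipe a b Log r
printer = forever $ do
  x <- await
  lift (tell [show x])

take' :: Int -> Pipe a a Log ()
take' n = do
  replicateM_ n (await >>= yield)
  lift (tell ["You shall not pass!"])

fromList :: Monad m => [a] -> Pipe x a m ()
fromList = mapM_ yield

print' :: Show a => Int -> Pipe a Void Log ()
print' n = printer <+< take' n

-- Each composite is sequenced with plain (>>).
producers, consumers, middles :: [String]
producers = execWriter (runPipe (printer <+< (fromList [1 .. 3 :: Int] >> fromList [10 .. 12])))
consumers = execWriter (runPipe ((print' 3 >> print' 4) <+< fromList [(1 :: Int) ..]))
middles   = execWriter (runPipe (printer <+< (take' 3 >> take' 4) <+< fromList [(1 :: Int) ..]))
```

consumers and middles record the same log, which reflects the point above that concatenating the intermediate take' Pipe is enough.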

Pipe composition imposes an important limitation: You can only compose Pipes that have the same return type. For example, I could write the following function:

deliver :: (Monad m) => Int -> Consumer a m [a]
deliver n = replicateM n await

... and I might try to compose it with fromList:

>>> runPipe $ deliver 3 <+< fromList [1..10] -- wrong!

... but this wouldn't type-check, because fromList has a return type of () and deliver has a return type of [Int]. Lazy composition requires that every Pipe has a return value ready in case it terminates first. This was not a conscious design choice, but rather a requirement of the Category instance.

Fortunately, we don't have to rewrite the fromList function because we can add a return value using vertical concatenation:

>>> runPipe $ deliver 3 <+< (fromList [1..10] >> return [])
[1,2,3]

... although a more idiomatic Haskell version would be:

>>> runPipe $ (Just <$> deliver 3) <+< (fromList [1..10] *> pure Nothing)
Just [1,2,3]

This forces you to cover all code paths by thinking about what return value you would provide if something were to go wrong. For example, let's say I make a mistake and request more input than fromList can deliver:

>>> runPipe $ (Just <$> deliver 99) <+< (fromList [1..10] *> pure Nothing)
Nothing

The type system saved me by forcing me to handle all possible ways my program could terminate.
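The return-type discipline can be checked with a self-contained sketch that uses the same kind of hypothetical toy Pipe (not the library's internals), with Identity as the base monad so that results are plain values and Void standing in for Zero. safeDeliver is a made-up wrapper around the Maybe-returning pipeline above; both sides of <+< must produce a Maybe [Int], so whichever side stops first supplies the answer.

```haskell
import Control.Monad (ap, replicateM)
import Data.Functor.Identity (Identity (..))
import Data.Void (Void)

-- Hypothetical toy Pipe (an illustration, not pipes-1.0's internal representation).
data Pipe a b m r
  = Pure r                    -- finished, with return value r
  | M (m (Pipe a b m r))      -- run one base-monad action, then continue
  | Await (a -> Pipe a b m r) -- block until a value of type a arrives
  | Yield b (Pipe a b m r)    -- hand a value of type b downstream, then continue

instance Monad m => Functor (Pipe a b m) where
  fmap f p = p >>= \r -> Pure (f r)

instance Monad m => Applicative (Pipe a b m) where
  pure = Pure
  (<*>) = ap

instance Monad m => Monad (Pipe a b m) where
  Pure r    >>= f = f r
  M m       >>= f = M (fmap (>>= f) m)
  Await k   >>= f = Await (\x -> k x >>= f)
  Yield b p >>= f = Yield b (p >>= f)

await :: Pipe a b m a
await = Await Pure

yield :: b -> Pipe a b m ()
yield b = Yield b (Pure ())

-- Lazy composition: whichever Pipe terminates first supplies the result,
-- which is why both sides must share the return type r.
infixr 9 <+<
(<+<) :: Monad m => Pipe b c m r -> Pipe a b m r -> Pipe a c m r
p1 <+< p2 = case p1 of
  Pure r      -> Pure r
  M m         -> M (fmap (<+< p2) m)
  Yield c p1' -> Yield c (p1' <+< p2)
  Await k     -> case p2 of
    Pure r      -> Pure r
    M m         -> M (fmap (p1 <+<) m)
    Yield b p2' -> k b <+< p2'
    Await k'    -> Await (\a -> p1 <+< k' a)

runPipe :: Monad m => Pipe Void Void m r -> m r
runPipe (Pure r)    = return r
runPipe (M m)       = m >>= runPipe
runPipe (Await _)   = error "runPipe: a Pipeline has no upstream to await from"
runPipe (Yield _ p) = runPipe p

deliver :: Monad m => Int -> Pipe a b m [a]
deliver n = replicateM n await

fromList :: Monad m => [a] -> Pipe x a m ()
fromList = mapM_ yield

-- Hypothetical wrapper: Just when deliver finishes first, Nothing when the input runs dry.
safeDeliver :: Int -> Maybe [Int]
safeDeliver n =
  runIdentity (runPipe ((Just <$> deliver n) <+< (fromList [1 .. 10] *> pure Nothing)))
```

`safeDeliver 3` is `Just [1,2,3]` and `safeDeliver 99` is `Nothing`. With exactly ten requests, deliver receives its last value before fromList finishes, so `safeDeliver 10` is `Just [1,2,3,4,5,6,7,8,9,10]`.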

Now what if you want to write a Pipe that only reads from its input end (i.e. a Consumer) and returns a list of every value delivered to it when its input Pipe terminates?

toList :: (Monad m) => Consumer a m [a]
toList = ???

You can't write such a Pipe because if its input terminates then it brings down toList with it! This is a good thing because toList as defined is not compositional.

To see why, let's say you somehow got toList to work and the following imaginary code sample worked:

>>> runPipe $ toList <+< (fromList [1..5] >> return [])

toList is defined to return its value when the Pipe immediately upstream (fromList in this case) terminates. This behavior immediately leads to a problem. What if I were to insert the following "identity" Pipe between toList and fromList?

identity = forever $ await >>= yield
-- This is how id in both categories is actually implemented

This Pipe forwards every value untouched, so we would expect inserting it in the middle to have no effect:

>>> runPipe $ toList <+< identity <+< (fromList [1..5] >> return [])
??? -- Oops! Something other than [1,2,3,4,5], perhaps even non-termination

The answer couldn't be [1,2,3,4,5] because toList would monitor identity instead of fromList, and since identity never terminates, toList never terminates. This is what I mean when I say that toList's specified behavior is non-compositional. It only works if it is coupled directly to the desired Pipe and breaks when you introduce intermediate stages.

This fortunate limitation was not an intentional design choice, but rather an inadvertent consequence of enforcing the Category laws when I was implementing Pipe's Category instance. Satisfying the Category laws forces code to be compositional.

Note that a terminated Pipe only brings down Pipes composed with it. To illustrate this, let's use the following example:

p = do
    a <+< b
    c

a, b, and c are Pipes, and c must have the same input and output types as a <+< b; otherwise we could not sequence them within the same monad. In the above example, either a or b could terminate and bring down the other one since they are composed, but c is guaranteed to run after a <+< b terminates because it is not composed with them. Conceptually, we can think of this as c automatically taking over the Pipe's channeling responsibilities when a <+< b can no longer continue. There is no need to "restart" the input or output manually as in some other iteratee libraries.
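A small self-contained sketch shows c taking over after a <+< b stops. It uses a hypothetical toy Pipe type (not the library's internals) and a made-up pure feed driver over Identity. Here a is takeP 2 (take' without the message), b is fromList [1 ..], and c is yield 100.

```haskell
import Control.Monad (ap, replicateM_)
import Data.Functor.Identity (Identity (..))

-- Hypothetical toy Pipe (an illustration, not pipes-1.0's internal representation).
data Pipe a b m r
  = Pure r                    -- finished, with return value r
  | M (m (Pipe a b m r))      -- run one base-monad action, then continue
  | Await (a -> Pipe a b m r) -- block until a value of type a arrives
  | Yield b (Pipe a b m r)    -- hand a value of type b downstream, then continue

instance Monad m => Functor (Pipe a b m) where
  fmap f p = p >>= \r -> Pure (f r)

instance Monad m => Applicative (Pipe a b m) where
  pure = Pure
  (<*>) = ap

instance Monad m => Monad (Pipe a b m) where
  Pure r    >>= f = f r
  M m       >>= f = M (fmap (>>= f) m)
  Await k   >>= f = Await (\x -> k x >>= f)
  Yield b p >>= f = Yield b (p >>= f)

await :: Pipe a b m a
await = Await Pure

yield :: b -> Pipe a b m ()
yield b = Yield b (Pure ())

-- Lazy composition: whichever Pipe terminates first ends the composite.
infixr 9 <+<
(<+<) :: Monad m => Pipe b c m r -> Pipe a b m r -> Pipe a c m r
p1 <+< p2 = case p1 of
  Pure r      -> Pure r
  M m         -> M (fmap (<+< p2) m)
  Yield c p1' -> Yield c (p1' <+< p2)
  Await k     -> case p2 of
    Pure r      -> Pure r
    M m         -> M (fmap (p1 <+<) m)
    Yield b p2' -> k b <+< p2'
    Await k'    -> Await (\a -> p1 <+< k' a)

takeP :: Monad m => Int -> Pipe a a m ()
takeP n = replicateM_ n (await >>= yield)

fromList :: Monad m => [a] -> Pipe x a m ()
fromList = mapM_ yield

feed :: [a] -> Pipe a b Identity r -> [b]
feed _      (Pure _)    = []
feed xs     (M m)       = feed xs (runIdentity m)
feed xs     (Yield b p) = b : feed xs p
feed []     (Await _)   = []
feed (x:xs) (Await k)   = feed xs (k x)

p :: Monad m => Pipe x Int m ()
p = do
  takeP 2 <+< fromList [1 ..]  -- a <+< b: ends once takeP 2 returns
  yield 100                    -- c: not composed with them, so it still runs
```

`feed ([] :: [()]) p` gives `[1,2,100]`: the infinite fromList is abandoned when takeP 2 finishes, and c then continues on the same output end.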

The pipes library, unlike other iteratee libraries, grounds its vertical and horizontal concatenation in mathematics by deriving horizontal concatenation (.) from its Category instance and vertical concatenation (>>) from its Monad instance. This makes it easier to reason about Pipes because you can leverage your intuition about categories and monads to understand their behavior. The only Pipe-specific primitives are the await and yield functions.

Lazy composition has one important defect: resource finalization. Let's say we have the file "test.txt" with the following contents:

This is a test.
Don't panic!
Calm down, please!

... and we wish to lazily read a line at a time from it:

import System.IO

-- hGetLine from System.IO returns a String, so the Producer yields Strings
readFile' :: Handle -> Producer String IO ()
readFile' h = do
    eof <- lift $ hIsEOF h
    if eof
      then return ()
      else do
          s <- lift $ hGetLine h
          yield s
          readFile' h

We can use our Monad and Category instances to generate a resource-efficient version that only reads as many lines as we request:

read' n = do
    lift $ putStrLn "Opening file ..."
    h <- lift $ openFile "test.txt" ReadMode
    take' n <+< readFile' h
    lift $ putStrLn "Closing file ..."
    lift $ hClose h

Now compose!

>>> runPipe $ printer <+< read' 2
Opening file ...
"This is a test."
"Don't panic!"
You shall not pass!
Closing file ...
>>> runPipe $ printer <+< read' 99
Opening file ...
"This is a test."
"Don't panic!"
"Calm down, please!"
Closing file ...

In the first example, take' n <+< readFile' h terminates because take' only requested 2 lines. In the second example, it terminates because readFile' ran out of input. In both cases, however, the Pipe never reads more lines than we request and frees "test.txt" as soon as it is no longer needed.

Even more importantly, the file is never opened if we replace printer with a Pipe that never demands input:

>>> runPipe $ (lift $ putStrLn "I don't need input") <+< read' 2
I don't need input

There is still one problem, though. What if we wrote:

>>> runPipe $ printer <+< take' 1 <+< read' 3
Opening file ...
"This is a test."
You shall not pass!

Oh no! Our Pipe didn't properly close our file! take' 1 terminated before read' 3, preventing read' 3 from properly closing "test.txt". We can force the read' 3 Pipe to close the file by using the discard function:

discard :: (Monad m) => Pipe a b m r
discard = forever await

If we append discard to take' 1, we will drive read' 3 to completion by continuing to pull values from it:

>>> runPipe $ printer <+< (take' 1 >> discard) <+< read' 3
Opening file ...
"This is a test."
You shall not pass!
You shall not pass!
Closing file ...

This allows read' 3 to complete so it can properly finalize itself. I include a convenience operator for this behavior:

p1 <-< p2 = (p1 >> discard) <+< p2
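The finalization difference can be reproduced without touching the file system. The sketch below is self-contained and hypothetical: Pipe is a toy representation, readLog is a made-up stand-in for read' that logs "Opening file ..." and "Closing file ..." to a Writer around an in-memory list of the three lines, and Void stands in for Zero. discard and <-< are defined as in the text.

```haskell
import Control.Monad (ap, forever, replicateM_)
import Control.Monad.Trans.Class (MonadTrans (..))
import Control.Monad.Trans.Writer (Writer, execWriter, tell)
import Data.Void (Void)

-- Hypothetical toy Pipe (an illustration, not pipes-1.0's internal representation).
data Pipe a b m r
  = Pure r                    -- finished, with return value r
  | M (m (Pipe a b m r))      -- run one base-monad action, then continue
  | Await (a -> Pipe a b m r) -- block until a value of type a arrives
  | Yield b (Pipe a b m r)    -- hand a value of type b downstream, then continue

instance Monad m => Functor (Pipe a b m) where
  fmap f p = p >>= \r -> Pure (f r)
instance Monad m => Applicative (Pipe a b m) where
  pure = Pure
  (<*>) = ap
instance Monad m => Monad (Pipe a b m) where
  Pure r    >>= f = f r
  M m       >>= f = M (fmap (>>= f) m)
  Await k   >>= f = Await (\x -> k x >>= f)
  Yield b p >>= f = Yield b (p >>= f)
instance MonadTrans (Pipe a b) where
  lift m = M (fmap Pure m)

await :: Pipe a b m a
await = Await Pure

yield :: b -> Pipe a b m ()
yield b = Yield b (Pure ())

-- Lazy composition: whichever Pipe terminates first ends the composite.
infixr 9 <+<
(<+<) :: Monad m => Pipe b c m r -> Pipe a b m r -> Pipe a c m r
p1 <+< p2 = case p1 of
  Pure r      -> Pure r
  M m         -> M (fmap (<+< p2) m)
  Yield c p1' -> Yield c (p1' <+< p2)
  Await k     -> case p2 of
    Pure r      -> Pure r
    M m         -> M (fmap (p1 <+<) m)
    Yield b p2' -> k b <+< p2'
    Await k'    -> Await (\a -> p1 <+< k' a)

runPipe :: Monad m => Pipe Void Void m r -> m r
runPipe (Pure r)    = return r
runPipe (M m)       = m >>= runPipe
runPipe (Await _)   = error "runPipe: a Pipeline has no upstream to await from"
runPipe (Yield _ p) = runPipe p

type Log = Writer [String]

printer :: Show a => Pipe a b Log r
printer = forever (await >>= \x -> lift (tell [show x]))

take' :: Int -> Pipe a a Log ()
take' n = replicateM_ n (await >>= yield) >> lift (tell ["You shall not pass!"])

fromList :: Monad m => [a] -> Pipe x a m ()
fromList = mapM_ yield

discard :: Monad m => Pipe a b m r
discard = forever await

-- Strict composition: keep pulling from upstream after p1 finishes.
infixr 9 <-<
(<-<) :: Monad m => Pipe b c m r -> Pipe a b m r -> Pipe a c m r
p1 <-< p2 = (p1 >> discard) <+< p2

-- Hypothetical stand-in for read': logs instead of opening a real file.
readLog :: Int -> Pipe x String Log ()
readLog n = do
  lift (tell ["Opening file ..."])
  take' n <+< fromList ["This is a test.", "Don't panic!", "Calm down, please!"]
  lift (tell ["Closing file ..."])

lazyRun, strictRun :: [String]
lazyRun   = execWriter (runPipe (printer <+< (take' 1 <+< readLog 3)))
strictRun = execWriter (runPipe (printer <+< (take' 1 <-< readLog 3)))
```

lazyRun never reaches "Closing file ...", while strictRun does. In strictRun, discard keeps pulling the remaining two lines, so the take' 3 inside readLog 3 also finishes and logs its own "You shall not pass!" before the closing message.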

Interestingly, <-< forms a Category, too, namely the Strict category. This Category draws down all input by default (as the name suggests). I call it the Strict Category because discard resembles seq. discard drives its input to continue until one upstream Pipe terminates and this behavior resembles forcing its input to weak head normal form. If every Pipe drives its input to weak head normal form, you get Strict composition.

Strict composition works terribly on infinite inputs, as you would expect:

>>> runPipe $ printer <-< take' 3 <-< prompt
Enter a number:
Enter a number:
Enter a number:
You shall not pass!
Enter a number:
... <Prompts for input indefinitely and discards it>

Strict composition works best for inputs that are finite and require finalization. Lazy composition works best for inputs that are infinite (and obviously an infinite input never needs finalization).

However, unlike conventional strictness in Haskell, Strict Pipes do not load the entire input into memory. They still stream and immediately handle input just as Lazy Pipes do. The only difference is that they guarantee input finalization (for better or for worse). Also, for Strict Pipelines the return value must come from the most upstream Pipe. Other than that, Strict composition will have the exact same sequence of monadic effects, resource usage, memory profile, and performance.

As with Haskell's own evaluation, you can mix Lazy and Strict composition. Keep in mind, though, that while <+< is associative with itself and <-< is associative with itself, mixtures of them are not associative. Alternatively, you could stick to Lazy composition and sprinkle discard statements wherever you desire strictness. It's up to you. However, when designing library functions, make them Lazy by default, since you can make Lazy code Strict by adding a discard statement, but you can't make Strict code Lazy.
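A concrete instance of the non-associativity, in the same self-contained toy setting (hypothetical Pipe type, Writer log instead of printing, Void for Zero): src is a made-up source that logs "closed" once its three values have all been consumed, takeP is take' without the message, and idP forwards values untouched. Grouping the same three stages differently changes whether src ever reaches its finalizer.

```haskell
import Control.Monad (ap, forever, replicateM_)
import Control.Monad.Trans.Class (MonadTrans (..))
import Control.Monad.Trans.Writer (Writer, execWriter, tell)
import Data.Void (Void)

-- Hypothetical toy Pipe (an illustration, not pipes-1.0's internal representation).
data Pipe a b m r
  = Pure r                    -- finished, with return value r
  | M (m (Pipe a b m r))      -- run one base-monad action, then continue
  | Await (a -> Pipe a b m r) -- block until a value of type a arrives
  | Yield b (Pipe a b m r)    -- hand a value of type b downstream, then continue

instance Monad m => Functor (Pipe a b m) where
  fmap f p = p >>= \r -> Pure (f r)
instance Monad m => Applicative (Pipe a b m) where
  pure = Pure
  (<*>) = ap
instance Monad m => Monad (Pipe a b m) where
  Pure r    >>= f = f r
  M m       >>= f = M (fmap (>>= f) m)
  Await k   >>= f = Await (\x -> k x >>= f)
  Yield b p >>= f = Yield b (p >>= f)
instance MonadTrans (Pipe a b) where
  lift m = M (fmap Pure m)

await :: Pipe a b m a
await = Await Pure

yield :: b -> Pipe a b m ()
yield b = Yield b (Pure ())

-- Lazy composition: whichever Pipe terminates first ends the composite.
(<+<) :: Monad m => Pipe b c m r -> Pipe a b m r -> Pipe a c m r
p1 <+< p2 = case p1 of
  Pure r      -> Pure r
  M m         -> M (fmap (<+< p2) m)
  Yield c p1' -> Yield c (p1' <+< p2)
  Await k     -> case p2 of
    Pure r      -> Pure r
    M m         -> M (fmap (p1 <+<) m)
    Yield b p2' -> k b <+< p2'
    Await k'    -> Await (\a -> p1 <+< k' a)

discard :: Monad m => Pipe a b m r
discard = forever await

-- Strict composition, as defined in the text.
(<-<) :: Monad m => Pipe b c m r -> Pipe a b m r -> Pipe a c m r
p1 <-< p2 = (p1 >> discard) <+< p2

runPipe :: Monad m => Pipe Void Void m r -> m r
runPipe (Pure r)    = return r
runPipe (M m)       = m >>= runPipe
runPipe (Await _)   = error "runPipe: a Pipeline has no upstream to await from"
runPipe (Yield _ p) = runPipe p

type Log = Writer [String]

printer :: Show a => Pipe a b Log r
printer = forever (await >>= \x -> lift (tell [show x]))

takeP :: Monad m => Int -> Pipe a a m ()
takeP n = replicateM_ n (await >>= yield)

idP :: Monad m => Pipe a a m r
idP = forever (await >>= yield)

-- Hypothetical finite source with a finalizer.
src :: Pipe x Int Log ()
src = mapM_ yield [1, 2, 3] >> lift (tell ["closed"])

-- Same three stages, grouped differently (explicit parentheses throughout).
groupedRight, groupedLeft :: [String]
groupedRight = execWriter (runPipe (printer <+< (takeP 1 <+< (idP <-< src))))
groupedLeft  = execWriter (runPipe (printer <+< ((takeP 1 <+< idP) <-< src)))
```

groupedRight logs only `["1"]`, because takeP 1 finishes first under Lazy composition and src is abandoned. groupedLeft logs `["1","closed"]`, because the Strict outer composition drains src after takeP 1 <+< idP returns.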