pipes-core-0.1.0: Compositional pipelines

Safe Haskell: Safe-Inferred

This library provides a single data type: Pipe.

Pipe is a monad transformer that extends the base monad with the ability to await input from or yield output to other Pipes. Pipes resemble enumeratees in other libraries because they receive an input stream and transform it into a new stream.

Let's introduce our first Pipe, which is a verbose version of the Prelude's take function:

 take' :: Int -> Pipe a a IO ()
 take' n = do
     replicateM_ n $ do
         x <- await
         yield x
     lift $ putStrLn "You shall not pass!"

This Pipe allows the first n values it receives to pass through undisturbed, then it outputs a cute message and shuts down. Shutdown is automatic when you reach the end of the monad. You don't need to send a special signal to connected Pipes to let them know you are done handling input or generating output.

Let's dissect the above Pipe's type to learn a bit about how Pipes work:

      | Input Type | Output Type | Base monad | Return value
 Pipe   a            a             IO           ()

So take' awaits input of type a from upstream Pipes and yields output of type a to downstream Pipes. take' uses IO as its base monad because it invokes the putStrLn function. If we remove the call to putStrLn the compiler infers the following type instead, which is polymorphic in the base monad:

 take' :: (Monad m) => Int -> Pipe a a m ()

Pipes are conservative about using the base monad. In fact, you can only invoke the base monad by using the lift function. If you never use lift, your Pipe will translate into pure code.

Now let's create a function that converts a list into a Pipe by yielding each element of the list:

 fromList :: (Monad m) => [a] -> Pipe () a m ()
 fromList = mapM_ yield

We use () as the input type of the Pipe since it doesn't need any input from an upstream Pipe. You can think of fromList as a one way Pipe that can only deliver output, which makes it suitable for the first stage in a Pipeline. We provide a type synonym for this common case:

 type Producer b m r = Pipe () b m r

You can then rewrite the type signature for fromList as:

 fromList :: (Monad m) => [a] -> Producer a m ()

The compiler would be ok with a polymorphic input type, since without any calls to await it doesn't need to constrain it. However, using () makes it clear in the types that this Pipe is designed to be used as a Producer, and statically prevents a number of mistakes when Pipes are combined.

Producers resemble enumerators in other libraries because they are a data source. Using await in a Producer is not illegal; it simply returns () immediately without blocking.

Now let's create a Pipe that prints every value delivered to it and never terminates:

 printer :: (Show a) => Pipe a Void IO b
 printer = forever $ do
     x <- await
     lift $ print x

The Void in printer's type signature indicates that it never delivers output downstream, so it represents the final stage in a Pipeline. Again, we provide a type synonym for this common case:

 type Consumer a m r = Pipe a Void m r

So we could instead write printer's type as:

 printer :: (Show a) => Consumer a IO b

Consumers resemble iteratees in other libraries because they are a data sink. Consumers never use yield statements.

What makes Pipes useful is the ability to compose them into Pipelines. For that, we provide a >+> operator (and its right-to-left counterpart <+<):

 (>+>) :: Pipe a b m r -> Pipe b c m r -> Pipe a c m r
 (<+<) :: Pipe b c m r -> Pipe a b m r -> Pipe a c m r

For example, here is how you can compose the above Pipes:

 pipeline :: Pipe () Void IO ()
 pipeline = fromList [1..] >+> take' 3 >+> printer

This represents a self-contained Pipeline and we provide a type synonym for this common case:

 type Pipeline m r = Pipe () Void m r

Like many other monad transformers, you convert the Pipe monad back to the base monad using some sort of "run..." function. In this case, we provide a runPipe function:

 runPipe :: Pipeline IO r -> IO r

runPipe is actually more general, since it works with any MonadBaseControl, but we will work with the above simplified signature in this tutorial.

There are also more general versions of runPipe that work in any monad but have no exception-safety guarantees, so they should only be used for Pipes that don't allocate any scarce resources:

 runPurePipe :: (Monad m) => Pipeline m r -> m (Either SomeException r)
 runPurePipe_ :: (Monad m) => Pipeline m r -> m r

runPipe, runPurePipe and runPurePipe_ only work on self-contained Pipelines. We explicitly require () as input type and Void as output type to ensure that the pipeline doesn't await or yield any value. If a Pipe is polymorphic in its input type (for example because it never uses await), then it can always be used as the first stage of a Pipeline. Similarly, a Pipe that is polymorphic in its output type can be used as the final stage.

It is generally good practice to use () (resp. Void) explicitly as the input (resp. output) type of a producer (resp. consumer), since it gives the compiler more information on the intent of the Pipe, and makes some common errors detectable at compile time.

Let's try using runPipe:

>>> runPipe pipeline
1
2
3
You shall not pass!

Fascinating! Our Pipe terminated even though printer never terminates and fromList never terminates when given an infinite list. To illustrate why our Pipe terminated, let's outline the flow control rules for Pipe composition.

  • Execution begins at the most downstream Pipe (printer in our example).
  • If a Pipe awaits input, it blocks and transfers control to the next Pipe upstream until that Pipe yields back a value.
  • If a Pipe yields output, it restores control to the original downstream Pipe that was awaiting its input and binds its result to the return value of the await command.
  • If a Pipe terminates, it terminates every other Pipe composed with it.

The last rule is crucial. If a Pipe terminates then every downstream Pipe depending on its output cannot proceed, and upstream Pipes are never evaluated because the terminated Pipe will not request values from them any longer.

So in our previous example, the Pipeline terminated because take' 3 terminated and brought down the entire Pipeline with it.
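The flow control rules above can be made concrete with a toy model. The sketch below is not the pipes-core implementation: the Pipe constructors, liftP (standing in for lift), and the Trace base monad (the pair monad from base, used as a small writer in place of IO so the output can be inspected purely) are all assumptions made for illustration. It only aims to show how a composition that starts downstream, pulls from upstream on await, and stops at the first termination reproduces the behavior of pipeline.

```haskell
import Control.Monad (ap, forever, liftM, replicateM_)
import Data.Void (Void, absurd)

-- Toy model only: a Pipe is a tree of steps over a base monad m.
data Pipe a b m r
  = Pure r                      -- terminated with a result
  | M (m (Pipe a b m r))        -- run an action in the base monad
  | Await (a -> Pipe a b m r)   -- blocked, waiting for upstream input
  | Yield b (Pipe a b m r)      -- handing a value downstream

instance Monad m => Functor (Pipe a b m) where
  fmap = liftM

instance Monad m => Applicative (Pipe a b m) where
  pure = Pure
  (<*>) = ap
  m *> k = m >>= \_ -> k

instance Monad m => Monad (Pipe a b m) where
  Pure r    >>= f = f r
  M m       >>= f = M (fmap (>>= f) m)
  Await k   >>= f = Await (\a -> k a >>= f)
  Yield b p >>= f = Yield b (p >>= f)

await :: Pipe a b m a
await = Await Pure

yield :: b -> Pipe a b m ()
yield b = Yield b (Pure ())

-- Hypothetical stand-in for 'lift'.
liftP :: Functor m => m r -> Pipe a b m r
liftP = M . fmap Pure

infixr 9 >+>
(>+>) :: Monad m => Pipe a b m r -> Pipe b c m r -> Pipe a c m r
_  >+> Pure r    = Pure r                  -- rule 4: downstream terminated
up >+> M m       = M (fmap (up >+>) m)     -- rule 1: downstream is inspected first
up >+> Yield c p = Yield c (up >+> p)
up >+> Await k   = case up of              -- rule 2: control moves upstream
  Pure r    -> Pure r                      -- rule 4: upstream terminated
  M m       -> M (fmap (>+> Await k) m)
  Await h   -> Await (\a -> h a >+> Await k)
  Yield b p -> p >+> k b                   -- rule 3: the yield resumes downstream

runPipe :: Monad m => Pipe () Void m r -> m r
runPipe (Pure r)    = return r
runPipe (M m)       = m >>= runPipe
runPipe (Await k)   = runPipe (k ())       -- a Producer's await receives ()
runPipe (Yield v _) = absurd v

-- The pair monad from base acts as a tiny writer, standing in for IO.
type Trace = (,) [String]

say :: String -> Pipe a b Trace ()
say s = liftP ([s], ())

fromList :: Monad m => [b] -> Pipe () b m ()
fromList = mapM_ yield

take' :: Int -> Pipe a a Trace ()
take' n = do
  replicateM_ n (await >>= yield)
  say "You shall not pass!"

printer :: Show a => Pipe a Void Trace r
printer = forever (await >>= say . show)

-- The lines the pipeline would have printed, in order.
demoTrace :: [String]
demoTrace = fst (runPipe (fromList [1 :: Int ..] >+> take' 3 >+> printer))
```

In this model demoTrace evaluates to ["1","2","3","You shall not pass!"]: the infinite fromList is only ever forced as far as take' 3 demands, and the Pure produced by take' 3 ends the whole composition.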

Pipes promote loose coupling, allowing you to mix and match them transparently using composition. For example, we can define a new Producer pipe that indefinitely prompts the user for integers:

 prompt :: Producer Int IO a
 prompt = forever $ do
     lift $ putStrLn "Enter a number: "
     n <- read <$> lift getLine
     yield n

Now we can compose it with any of our previous Pipes:

>>> runPipe $ prompt >+> take' 3 >+> printer
Enter a number:
Enter a number:
Enter a number:
You shall not pass!

You can easily "vertically" concatenate Pipes, Producers, and Consumers, all using simple monad sequencing: (>>). For example, here is how you concatenate Producers:

>>> runPipe $ (fromList [1..3] >> fromList [10..12]) >+> printer
1
2
3
10
11
12

Here's how you would concatenate Consumers:

 print' :: (Show a) => Int -> Consumer a IO ()
 print' n = take' n >+> printer

>>> runPipe $ fromList [1..] >+> (print' 3 >> print' 4)
1
2
3
You shall not pass!
4
5
6
7
You shall not pass!

... but the above example is gratuitous because we could have just concatenated the intermediate take' Pipe:

>>> runPipe $ fromList [1..] >+> (take' 3 >> take' 4) >+> printer
1
2
3
You shall not pass!
4
5
6
7
You shall not pass!

Pipe composition imposes an important limitation: You can only compose Pipes that have the same return type. For example, we could write the following function:

 deliver :: (Monad m) => Int -> Consumer a m [a]
 deliver n = replicateM n await

... and we might try to compose it with fromList:

>>> runPipe $ fromList [1..10] >+> deliver 3 -- wrong!

... but this wouldn't type-check, because fromList has a return type of () and deliver has a return type of [Int]. All Pipes in a composition need to have the same return type, since the return value of the composed Pipe is taken from the Pipe that terminates first, and there's no general way to determine which one it is statically.

Fortunately, we don't have to rewrite the fromList function because we can add a return value using vertical concatenation:

>>> runPipe $ (fromList [1..10] >> return []) >+> deliver 3

... although a more idiomatic Haskell version would be:

>>> runPipe $ (fromList [1..10] *> pure Nothing) >+> (Just <$> deliver 3)
Just [1,2,3]

which can be written using the $$ operator:

>>> runPipe $ fromList [1..10] $$ deliver 3
Just [1,2,3]

This forces you to cover all code paths by thinking about what return value you would provide if something were to go wrong. For example, let's say I make a mistake and request more input than fromList can deliver:

>>> runPipe $ fromList [1..10] $$ deliver 99
Nothing

The type system saved me by forcing me to handle all possible ways my program could terminate.
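To see why the two calls differ, here is a minimal sketch of $$ on a toy Pipe. The Pipe type and its constructors are assumptions made for illustration and not the library's definitions, and the signature given to $$ is a guess; its body is simply the expansion shown earlier in the text. Identity is used as the base monad so the results can be inspected purely.

```haskell
import Control.Monad (ap, liftM, replicateM)
import Data.Functor.Identity (runIdentity)
import Data.Void (Void, absurd)

-- Toy model only, not the pipes-core representation.
data Pipe a b m r
  = Pure r
  | M (m (Pipe a b m r))
  | Await (a -> Pipe a b m r)
  | Yield b (Pipe a b m r)

instance Monad m => Functor (Pipe a b m) where
  fmap = liftM

instance Monad m => Applicative (Pipe a b m) where
  pure = Pure
  (<*>) = ap

instance Monad m => Monad (Pipe a b m) where
  Pure r    >>= f = f r
  M m       >>= f = M (fmap (>>= f) m)
  Await k   >>= f = Await (\a -> k a >>= f)
  Yield b p >>= f = Yield b (p >>= f)

await :: Pipe a b m a
await = Await Pure

yield :: b -> Pipe a b m ()
yield b = Yield b (Pure ())

-- Composition: whichever side terminates first supplies the result.
(>+>) :: Monad m => Pipe a b m r -> Pipe b c m r -> Pipe a c m r
_  >+> Pure r    = Pure r
up >+> M m       = M (fmap (up >+>) m)
up >+> Yield c p = Yield c (up >+> p)
up >+> Await k   = case up of
  Pure r    -> Pure r
  M m       -> M (fmap (>+> Await k) m)
  Await h   -> Await (\a -> h a >+> Await k)
  Yield b p -> p >+> k b

-- Assumed signature; the body is the expansion given in the text.
($$) :: Monad m => Pipe x a m r' -> Pipe a y m r -> Pipe x y m (Maybe r)
p $$ c = (p *> pure Nothing) >+> (Just <$> c)

runPipe :: Monad m => Pipe () Void m r -> m r
runPipe (Pure r)    = return r
runPipe (M m)       = m >>= runPipe
runPipe (Await k)   = runPipe (k ())
runPipe (Yield v _) = absurd v

fromList :: Monad m => [b] -> Pipe () b m ()
fromList = mapM_ yield

deliver :: Monad m => Int -> Pipe a Void m [a]
deliver n = replicateM n await

-- The consumer finishes first, so its result is wrapped in Just.
firstThree :: Maybe [Int]
firstThree = runIdentity (runPipe (fromList [1 .. 10] $$ deliver 3))

-- The producer runs dry first, so the composition returns Nothing.
tooMany :: Maybe [Int]
tooMany = runIdentity (runPipe (fromList [1 .. 10] $$ deliver 99))
```

Here firstThree is Just [1,2,3] and tooMany is Nothing: the Maybe in the result type is exactly the record of which side of the composition terminated first.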

Now what if you want to write a Pipe that only reads from its input end (i.e. a Consumer) and returns a list of every value delivered to it when its input Pipe terminates? In Combinators we find:

 consume :: (Monad m) => Consumer a m [a]

but it turns out that it's not possible to write such a Pipe using only the primitives introduced so far, since we need a way to intercept upstream termination and return the current accumulated list of input values before terminating ourselves.

So we need to introduce a new primitive operation:

 tryAwait :: (Monad m) => Pipe a b m (Maybe a)

tryAwait works very similarly to await, with two key differences:

  1. When upstream yields some value x, tryAwait returns Just x.
  2. When upstream terminates, tryAwait returns Nothing instead of terminating the current Pipe immediately.

When tryAwait returns Nothing, the current Pipe has a chance to perform some final actions (typically yield a final value or terminate with a result) before being forcefully shut down. At that stage, further invocations of tryAwait will keep returning Nothing, while using await will terminate the pipe immediately.
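As an illustration of these semantics, the following sketch extends a toy Pipe model with a separate TryAwait step and writes consume on top of it. Everything here is an assumption made for the example: the constructors, the pull helper, and this definition of consume are hypothetical and are not taken from the library, which may implement termination quite differently.

```haskell
import Control.Monad (ap, liftM, replicateM)
import Data.Functor.Identity (runIdentity)
import Data.Void (Void, absurd)

-- Toy model only, not the library's definition.
data Pipe a b m r
  = Pure r
  | M (m (Pipe a b m r))
  | Await (a -> Pipe a b m r)            -- await: terminates along with upstream
  | TryAwait (Maybe a -> Pipe a b m r)   -- tryAwait: sees Nothing once upstream is done
  | Yield b (Pipe a b m r)

instance Monad m => Functor (Pipe a b m) where
  fmap = liftM

instance Monad m => Applicative (Pipe a b m) where
  pure = Pure
  (<*>) = ap

instance Monad m => Monad (Pipe a b m) where
  Pure r     >>= f = f r
  M m        >>= f = M (fmap (>>= f) m)
  Await k    >>= f = Await (\a -> k a >>= f)
  TryAwait k >>= f = TryAwait (\a -> k a >>= f)
  Yield b p  >>= f = Yield b (p >>= f)

await :: Pipe a b m a
await = Await Pure

tryAwait :: Pipe a b m (Maybe a)
tryAwait = TryAwait Pure

yield :: b -> Pipe a b m ()
yield b = Yield b (Pure ())

-- Run upstream until it either terminates or yields a value.
pull :: Functor m
     => Pipe a b m r
     -> (r -> Pipe a c m r)                   -- upstream has terminated
     -> (b -> Pipe a b m r -> Pipe a c m r)   -- upstream yielded a value
     -> Pipe a c m r
pull up onDone onYield = case up of
  Pure r     -> onDone r
  M m        -> M (fmap again m)
  Await h    -> Await (again . h)
  TryAwait h -> TryAwait (again . h)
  Yield b p  -> onYield b p
  where
    again u = pull u onDone onYield

(>+>) :: Monad m => Pipe a b m r -> Pipe b c m r -> Pipe a c m r
_  >+> Pure r     = Pure r
up >+> M m        = M (fmap (up >+>) m)
up >+> Yield c p  = Yield c (up >+> p)
up >+> Await k    = pull up Pure (\b p -> p >+> k b)
up >+> TryAwait k = pull up (\r -> Pure r >+> k Nothing) (\b p -> p >+> k (Just b))

runPipe :: Monad m => Pipe () Void m r -> m r
runPipe (Pure r)     = return r
runPipe (M m)        = m >>= runPipe
runPipe (Await k)    = runPipe (k ())
runPipe (TryAwait k) = runPipe (k (Just ()))
runPipe (Yield v _)  = absurd v

fromList :: Monad m => [b] -> Pipe () b m ()
fromList = mapM_ yield

-- Accumulate input until tryAwait reports that upstream is finished.
consume :: Monad m => Pipe a Void m [a]
consume = go []
  where
    go acc = tryAwait >>= maybe (return (reverse acc)) (\x -> go (x : acc))

-- consume outlives its upstream and returns what it gathered.
collected :: String
collected = runIdentity (runPipe ((fromList "abc" >> return "") >+> consume))

-- A consumer built from plain await is cut off and inherits upstream's result.
cutOff :: String
cutOff = runIdentity (runPipe ((fromList "abc" >> return "upstream") >+> replicateM 5 await))
```

In this model collected is "abc", while cutOff is "upstream": once upstream has terminated, TryAwait keeps receiving Nothing and lets the Pipe finish on its own terms, whereas Await ends the composition with upstream's return value.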

Note that Pipe termination only propagates through composition. To illustrate this, let's use the following example:

 p = do a <+< b
        c

a, b, and c are Pipes, and c shares the same input and output as a <+< b, otherwise we cannot combine them within the same monad. In the above example, either a or b could terminate and bring down the other one since they are composed, but c is guaranteed to continue after a <+< b terminates because it is not composed with them. Conceptually, we can think of this as c automatically taking over the Pipe's channeling responsibilities when a <+< b can no longer continue. There is no need to "restart" the input or output manually as in some other iteratee libraries.

We now turn our attention to a very important feature of pipes: resource finalization.

Say we have the file "test.txt" with the following contents:

 This is a test.
 Don't panic!
 Calm down, please!

... and we wish to lazily read a line at a time from it:

 handleReader' :: Handle -> Producer Text IO ()
 handleReader' h = do
     eof <- lift $ hIsEOF h
     unless eof $ do
         s <- lift $ pack <$> hGetLine h
         yield s
         handleReader' h

Suppose, for the sake of example, that we know in advance how many lines we need to read from the file. We can then use composition and the Monad instance to try to build a resource-efficient version that only reads as many lines as we request:

 read' :: Int -> Producer Text IO ()
 read' n = do
     lift $ putStrLn "Opening file ..."
     h <- lift $ openFile "test.txt" ReadMode
     take' n <+< handleReader' h
     lift $ putStrLn "Closing file ..."
     lift $ hClose h

Now compose!

>>> runPipe $ read' 2 >+> printer
Opening file ...
"This is a test."
"Don't panic!"
You shall not pass!
Closing file ...
>>> runPipe $ read' 99 >+> printer
Opening file ...
"This is a test."
"Don't panic!"
"Calm down, please!"
Closing file ...

In the first example, the pipeline terminates because take' only requested 2 lines. In the second example, it terminates because handleReader' ran out of input. However, in both cases the Pipe never reads more lines than we request and frees "test.txt" immediately when it is no longer needed.

Even more importantly, the file is never opened if we replace printer with a Pipe that never demands input:

>>> runPipe $ read' 2 >+> lift (putStrLn "I don't need input")
I don't need input

However, this read' is not resource-safe in certain situations. For example, take the following pipe:

>>> runPipe $ read' 3 >+> take' 1 >+> printer
Opening file ...
"This is a test."
You shall not pass!

Oh no! Our Pipe didn't properly close our file! take' 1 terminated before read' 3, preventing read' 3 from properly closing "test.txt".

Similarly, any exception thrown during execution of the Pipeline can cause the hClose statement to be skipped, leaking an open handle.

We can force the read' 3 Pipe to always close the file handle regardless of exceptions or premature termination by using the bracket function:

 safeRead' :: Int -> Producer Text IO ()
 safeRead' n = bracket
   (putStrLn "Opening file..." >> openFile "test.txt" ReadMode)
   (\h -> putStrLn "Closing file..." >> hClose h)
   (\h -> handleReader' h >+> take' n)

bracket is similar to the function of the same name in Exception: it takes an action that creates some "resource", a function that disposes of the created resource, and a function that takes the resource and returns a Pipe:

 bracket :: Monad m
         => m r                   -- create resource
         -> (r -> m y)            -- destroy resource
         -> (r -> Pipe a b m x)   -- use resource in a 'Pipe'
         -> Pipe a b m x

Note that the "create" and "destroy" actions operate within the base monad, so it's not possible to use yield and await there.

Using safeRead' instead of read' will now produce the desired behavior:

>>> runPipe $ safeRead' 3 >+> take' 1 >+> printer
Opening file...
"This is a test."
You shall not pass!
Closing file...
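For comparison, the guarantee has the same shape as that of the plain-IO bracket from Control.Exception in base. The sketch below does not use the Pipe version at all; bracketDemo and note are hypothetical names, and an IORef log stands in for the file handle so that the order of events can be inspected.

```haskell
import Control.Exception (ErrorCall (..), bracket, throwIO, try)
import Data.IORef (modifyIORef, newIORef, readIORef)

-- Record the order of acquire / use / release when the body fails.
bracketDemo :: IO [String]
bracketDemo = do
  logRef <- newIORef []
  let note s = modifyIORef logRef (s :)
  result <- try (bracket
                   (note "open")                      -- create resource
                   (\_ -> note "close")               -- destroy resource
                   (\_ -> note "use" >> throwIO (ErrorCall "boom")))
  -- The exception still propagates, but only after the release action ran.
  note (either (\(ErrorCall msg) -> "caught " ++ msg) (\() -> "finished") result)
  reverse <$> readIORef logRef
```

Running bracketDemo yields ["open","use","close","caught boom"]: the release action runs before the exception reaches the handler, which is the behavior safeRead' relies on for closing the file.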

We also provide exception-handling primitives like catch and onException. See Exception for more details on exception handling and a complete list of primitives.

Resource finalization and exception handling work in any base monad, so we provide a Pipe-specific mechanism for throwing exceptions that is not limited to being catchable only in the IO monad:

 throw :: (Monad m, Exception e) => e -> Pipe a b m r

However, exceptions thrown by other means (like error or throw in Exception) can only be caught when the Pipeline is run with runPipe. If you use runPurePipe, such an exception will abruptly terminate the whole Pipeline, and resource finalization will not be guaranteed.