This library provides a single data type: Pipe.

Pipe is a monad transformer that extends the base monad with the ability to await input from or yield output to other Pipes. Pipes resemble enumeratees in other libraries because they receive an input stream and transform it into a new stream.

Here is a first Pipe, a verbose version of the Prelude's take function:

    take' :: Int -> Pipe a a IO ()
    take' n = do
        replicateM_ n $ do
            x <- await
            yield x
        lift $ putStrLn "You shall not pass!"

This Pipe allows the first n values it receives to pass through undisturbed, then it outputs a cute message and shuts down. Shutdown is automatic when you reach the end of the monad. You don't need to send a special signal to connected Pipes to let them know you are done handling input or generating output.

Let's dissect the above Pipe's type to learn a bit about how Pipes work:

          | Input Type | Output Type | Base monad | Return value
    Pipe    a            a             IO           ()

So take' awaits input of type a from upstream Pipes and yields output of type a to downstream Pipes. take' uses IO as its base monad because it invokes the putStrLn function. If we remove the call to putStrLn the compiler infers the following type instead, which is polymorphic in the base monad:

    take' :: (Monad m) => Int -> Pipe a a m ()

Now let's create a function that converts a list into a Pipe by yielding each element of the list:

    fromList :: (Monad m) => [a] -> Pipe () a m ()
    fromList = mapM_ yield

Note that fromList uses () as the input type of the Pipe since it doesn't need any input from an upstream Pipe. You can think of fromList as a one way Pipe that can only deliver output, which makes it suitable for the first stage in a Pipeline. We provide a type synonym for this common case:

    type Producer b m r = Pipe () b m r

You can then rewrite the type signature for fromList as:

    fromList :: (Monad m) => [a] -> Producer a m ()

The compiler would be ok with a polymorphic input type, since without any await it doesn't need to constrain it. However, using () makes it clear in the types that this Pipe is designed to be used as a Producer, and statically prevents a number of mistakes when Pipes are combined.
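
Now let's create a Pipe that prints every value delivered to it and never terminates:

    printer :: (Show a) => Pipe a Void IO b
    printer = forever $ do
        x <- await
        lift $ print x

The Void output type indicates that printer never delivers output downstream, which makes it suitable for the final stage of a Pipeline. We provide a type synonym for this common case:

    type Consumer a m r = Pipe a Void m r

So we could instead write printer's type as:

    printer :: (Show a) => Consumer a IO b
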
Pipes are connected with the two composition operators:

    (>+>) :: Pipe a b m r -> Pipe b c m r -> Pipe a c m r
    (<+<) :: Pipe b c m r -> Pipe a b m r -> Pipe a c m r

For example, here is how you can compose the above Pipes:

    pipeline :: Pipe () Void IO ()
    pipeline = fromList [1..] >+> take' 3 >+> printer

This represents a self-contained Pipeline and we provide a type synonym for this common case:

    type Pipeline m r = Pipe () Void m r

The runPipe function converts a Pipeline back to the base monad:

    runPipe :: Pipeline IO r -> IO r

runPipe is actually more general, since it works with any MonadBaseControl, but we will work with the above simplified signature in this tutorial.

There are also more general versions of runPipe which work in any monad, but don't have any exception-safety guarantees, so they should only be used for Pipes that don't allocate any scarce resources.

    runPurePipe :: (Monad m) => Pipeline m r -> m (Either SomeException r)
    runPurePipe_ :: (Monad m) => Pipeline m r -> m r

runPipe, runPurePipe and runPurePipe_ only work on self-contained Pipelines. We explicitly require () as input type and Void as output type to ensure that the pipeline doesn't yield any value. If a Pipe is polymorphic in its input type (for example because it never uses await), then it can always be used as the first stage of a Pipeline. Similarly, a Pipe that is polymorphic in its output type can be used as the final stage.

It is generally good practice to use () (resp. Void) explicitly as the input (resp. output) type of a producer (resp. consumer), since it gives the compiler more information on the intent of the Pipe, and makes some common errors detectable at compile time.

Let's try using runPipe:

    >>> runPipe pipeline
    1
    2
    3
    You shall not pass!

Note that our Pipe terminated even though printer never terminates and fromList never terminates when given an infinite list. To illustrate why our Pipe terminated, let's outline the flow control rules for Pipe composition:

- Execution begins at the most downstream Pipe (printer in our example).
- If a Pipe awaits input, it blocks and transfers control to the next Pipe upstream until that Pipe yields back a value.
- If a Pipe yields output, it restores control to the original downstream Pipe that was awaiting its input and binds its result to the return value of the await command.
- If a Pipe terminates, it terminates every other Pipe composed with it.

The last rule is crucial. If a Pipe terminates then every downstream Pipe depending on its output cannot proceed, and upstream Pipes are never evaluated because the terminated Pipe will not request values from them any longer.
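
The flow control rules can be mimicked in a few lines of plain Haskell. What follows is only a toy model written for illustration, not the library's actual implementation: the constructors Pure, M, Await and Yield, the helper collect, and the pure variant of take' (without the putStrLn call) are all assumptions. It shows why composing an infinite fromList with take' still terminates.

```haskell
import Control.Monad (ap, liftM, replicateM_)
import Data.Functor.Identity (Identity, runIdentity)

-- Toy model (an assumption, not the library's definition of Pipe).
data Pipe a b m r
  = Pure r                      -- the pipe has terminated with a result
  | M (m (Pipe a b m r))        -- run an action in the base monad
  | Await (a -> Pipe a b m r)   -- block until upstream delivers a value
  | Yield b (Pipe a b m r)      -- hand a value downstream, then continue

instance Monad m => Functor (Pipe a b m) where
  fmap = liftM

instance Monad m => Applicative (Pipe a b m) where
  pure = Pure
  (<*>) = ap

instance Monad m => Monad (Pipe a b m) where
  Pure r    >>= f = f r
  M m       >>= f = M (liftM (>>= f) m)
  Await k   >>= f = Await (\a -> k a >>= f)
  Yield b p >>= f = Yield b (p >>= f)

await :: Pipe a b m a
await = Await Pure

yield :: b -> Pipe a b m ()
yield b = Yield b (Pure ())

-- Composition is driven by the downstream pipe.
(>+>) :: Monad m => Pipe a b m r -> Pipe b c m r -> Pipe a c m r
up >+> down = case down of
  Pure r    -> Pure r                    -- downstream terminated: stop everything
  Yield c p -> Yield c (up >+> p)
  M m       -> M (liftM (up >+>) m)
  Await k   -> case up of                -- downstream awaits: run upstream
    Pure r    -> Pure r                  -- upstream terminated: stop everything
    Yield b p -> p >+> k b               -- resume downstream with the value
    M m       -> M (liftM (>+> down) m)
    Await k'  -> Await (\a -> k' a >+> down)

fromList :: Monad m => [b] -> Pipe a b m ()
fromList = mapM_ yield

-- Pure variant of take': passes n values through, prints nothing.
take' :: Monad m => Int -> Pipe a a m ()
take' n = replicateM_ n (await >>= yield)

-- Hypothetical helper: gather everything a pure pipe yields.
collect :: Pipe () b Identity r -> [b]
collect (Pure _)    = []
collect (M m)       = collect (runIdentity m)
collect (Await k)   = collect (k ())
collect (Yield b p) = b : collect p

main :: IO ()
main = print (collect (fromList [1 :: Int ..] >+> take' 3))
-- prints [1,2,3]
```

Because composition only ever evaluates the upstream pipe when the downstream one awaits, the infinite list is forced for just three elements before take' reaches Pure and the whole composition stops.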

Producers are not limited to lists. The following Producer prompts the user for a number and yields it:

    prompt :: Producer Int IO a
    prompt = forever $ do
        lift $ putStrLn "Enter a number: "
        n <- read <$> lift getLine
        yield n

Now we can compose it with any of our previous Pipes:

    >>> runPipe $ prompt >+> take' 3 >+> printer
    Enter a number:
    1<Enter>
    1
    Enter a number:
    2<Enter>
    2
    Enter a number:
    3<Enter>
    3
    You shall not pass!

Pipes can also be concatenated "vertically" using ordinary monad sequencing with (>>). Here's how you would concatenate Producers:

    >>> runPipe $ (fromList [1..3] >> fromList [10..12]) >+> printer
    1
    2
    3
    10
    11
    12

Here's how you would concatenate Consumers:

    print' :: (Show a) => Int -> Consumer a IO ()
    print' n = take' n >+> printer

    >>> runPipe $ fromList [1..] >+> (print' 3 >> print' 4)
    1
    2
    3
    You shall not pass!
    4
    5
    6
    7
    You shall not pass!

... but the above example is gratuitous because we could have just concatenated the intermediate take' Pipe:

    >>> runPipe $ fromList [1..] >+> (take' 3 >> take' 4) >+> printer
    1
    2
    3
    You shall not pass!
    4
    5
    6
    7
    You shall not pass!

Pipe composition imposes an important limitation: You can only compose Pipes that have the same return type. For example, we could write the following function:

    deliver :: (Monad m) => Int -> Consumer a m [a]
    deliver n = replicateM n await

... and we might try to compose it with fromList:

    >>> runPipe $ fromList [1..10] >+> deliver 3 -- wrong!

... but this wouldn't type-check, because fromList has a return type of () and deliver has a return type of [a]. All Pipes in a composition need to have the same return type, since the return value of the composed Pipe is taken from the Pipe that terminates first, and there's no general way to determine which one it is statically.

Fortunately, we don't have to rewrite the fromList function because we can add a return value using vertical concatenation:

    >>> runPipe $ (fromList [1..10] >> return []) >+> deliver 3
    [1,2,3]

... although a more idiomatic Haskell version would be:

    >>> runPipe $ (fromList [1..10] *> pure Nothing) >+> (Just <$> deliver 3)
    Just [1,2,3]

which can be written using the $$ operator:

    >>> runPipe $ fromList [1..10] $$ deliver 3
    Just [1,2,3]

This forces you to cover all code paths by thinking about what return value
you would provide if something were to go wrong. For example, let's say I
make a mistake and request more input than
fromList can deliver:

    >>> runPipe $ fromList [1..10] $$ deliver 99
    Nothing

The type system saved me by forcing me to handle all possible ways my program could terminate.

Now suppose we want a Consumer that collects all of its input into a list:

    consume :: (Monad m) => Consumer a m [a]

but it turns out that it's not possible to write such a Pipe using only the primitives introduced so far, since we need a way to intercept upstream termination and return the current accumulated list of input values before terminating. So we need to introduce a new primitive operation:

    tryAwait :: (Monad m) => Pipe a b m (Maybe a)

tryAwait works very similarly to await, with two key differences:

- When upstream yields some value x, tryAwait returns Just x.
- When upstream terminates, tryAwait returns Nothing instead of terminating the current Pipe.

After tryAwait returns Nothing, the current Pipe has a chance to perform some final actions (typically yield a final value or terminate with a result) before being forcefully shut down. At that stage, further invocations of tryAwait will keep returning Nothing, while using await will terminate the pipe immediately.
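
To make the role of tryAwait concrete, here is a minimal sketch of consume on top of a toy model. This is not the library's implementation: the Pipe constructors (in particular TryAwait), the composition operator and the runPure helper below are assumptions made for illustration, and the model deliberately leaves out both plain await and the forced shutdown described above.

```haskell
import Control.Monad (ap, liftM)
import Data.Functor.Identity (Identity, runIdentity)
import Data.Void (Void, absurd)

-- Toy model (an assumption): an awaiting pipe receives 'Nothing'
-- once upstream has terminated.
data Pipe a b m r
  = Pure r
  | M (m (Pipe a b m r))
  | TryAwait (Maybe a -> Pipe a b m r)
  | Yield b (Pipe a b m r)

instance Monad m => Functor (Pipe a b m) where
  fmap = liftM

instance Monad m => Applicative (Pipe a b m) where
  pure = Pure
  (<*>) = ap

instance Monad m => Monad (Pipe a b m) where
  Pure r     >>= f = f r
  M m        >>= f = M (liftM (>>= f) m)
  TryAwait k >>= f = TryAwait (\a -> k a >>= f)
  Yield b p  >>= f = Yield b (p >>= f)

tryAwait :: Pipe a b m (Maybe a)
tryAwait = TryAwait Pure

yield :: b -> Pipe a b m ()
yield b = Yield b (Pure ())

-- Simplified composition: upstream returns (), and once it is done
-- every further tryAwait downstream sees Nothing.
(<+<) :: Monad m => Pipe b c m r -> Pipe a b m () -> Pipe a c m r
Pure r     <+< _  = Pure r
Yield c p  <+< up = Yield c (p <+< up)
M m        <+< up = M (liftM (<+< up) m)
TryAwait k <+< up = case up of
  Pure ()     -> k Nothing <+< Pure ()
  Yield b up' -> k (Just b) <+< up'
  M m         -> M (liftM (TryAwait k <+<) m)
  TryAwait k' -> TryAwait (\a -> TryAwait k <+< k' a)

fromList :: Monad m => [b] -> Pipe a b m ()
fromList = mapM_ yield

-- Accumulate input until upstream terminates, then return it in order.
consume :: Monad m => Pipe a b m [a]
consume = go []
  where
    go acc = tryAwait >>= maybe (return (reverse acc)) (\x -> go (x : acc))

-- Hypothetical runner for pure, self-contained pipelines.
runPure :: Pipe () Void Identity r -> r
runPure (Pure r)     = r
runPure (M m)        = runPure (runIdentity m)
runPure (TryAwait k) = runPure (k (Just ()))
runPure (Yield v _)  = absurd v

main :: IO ()
main = print (runPure (consume <+< fromList [1, 2, 3 :: Int]))
-- prints [1,2,3]
```

The key step is the Pure () branch of the composition: instead of ending the whole pipeline, it hands Nothing to the waiting consumer, which then gets to return its accumulated list.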

Pipe termination only propagates through composition. To illustrate this, let's use the following example:

    p = do
        a <+< b
        c

a, b, and c are Pipes, and c shares the same input and output as a <+< b, otherwise we cannot combine them within the same monad. In the above example, either a or b could terminate and bring down the other one since they are composed, but c is guaranteed to continue after a <+< b terminates because it is not composed with them. Conceptually, we can think of this as c automatically taking over the Pipe's channeling responsibilities when a <+< b can no longer continue. There is no need to "restart" the input or output manually as in some other iteratee libraries.

We now turn our attention to a very important feature of pipes: resource finalization.
Say we have the file "test.txt" with the following contents:

    This is a test.
    Don't panic!
    Calm down, please!

... and we wish to lazily read a line at a time from it:

    handleReader' :: Handle -> Producer Text IO ()
    handleReader' h = do
        eof <- lift $ hIsEOF h
        unless eof $ do
            s <- lift $ pack <$> hGetLine h
            yield s
            handleReader' h

Suppose, for the sake of example, that we know in advance how many lines we need to read from the file. We can then use composition and the Monad instance to try to build a resource-efficient version that only reads as many lines as we request:

    read' :: Int -> Producer Text IO ()
    read' n = do
        lift $ putStrLn "Opening file ..."
        h <- lift $ openFile "test.txt" ReadMode
        take' n <+< handleReader' h
        lift $ putStrLn "Closing file ..."
        lift $ hClose h

    >>> runPipe $ read' 2 >+> printer
    Opening file ...
    "This is a test."
    "Don't panic!"
    You shall not pass!
    Closing file ...

    >>> runPipe $ read' 99 >+> printer
    Opening file ...
    "This is a test."
    "Don't panic!"
    "Calm down, please!"
    Closing file ...

In the first example, the pipeline terminates because take' only requested 2 lines. In the second example, it terminates because handleReader' ran out of input. However, in both cases the Pipe never reads more lines than we request and frees "test.txt" immediately when it is no longer needed.

Even more importantly, the file is never opened if we replace printer with a Pipe that never demands input:

    >>> runPipe $ read' 2 >+> lift (putStrLn "I don't need input")
    I don't need input

However, read' is not resource-safe in certain situations. For example, take the following pipe:

    >>> runPipe $ read' 3 >+> take' 1 >+> printer
    Opening file ...
    "This is a test."
    You shall not pass!

Oh no! Our Pipe didn't properly close our file! take' 1 terminated before read' 3, preventing read' 3 from properly closing "test.txt". Similarly, any exception thrown during execution of the Pipeline can cause the hClose statement to be skipped, leaking an open handle.

We can force the read' Pipe to always close the file handle regardless of exceptions or premature termination by using the bracket function:

    safeRead' :: Int -> Producer Text IO ()
    safeRead' n = bracket
        (putStrLn "Opening file..." >> openFile "test.txt" ReadMode)
        (\h -> putStrLn "Closing file..." >> hClose h)
        (\h -> handleReader' h >+> take' n)

bracket is similar to the homonymous function in Control.Exception. It takes an action that creates some "resource", a function that disposes of the created resource, and a function which takes the resource and returns a Pipe:

    bracket :: Monad m
            => m r                  -- create resource
            -> (r -> m y)           -- destroy resource
            -> (r -> Pipe a b m x)  -- use resource in a 'Pipe'
            -> Pipe a b m x

Using safeRead' instead of read' will now produce the desired behavior:

    >>> runPipe $ safeRead' 3 >+> take' 1 >+> printer
    Opening file...
    "This is a test."
    You shall not pass!
    Closing file...

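
For comparison, here is a small sketch of the plain IO bracket from Control.Exception, which follows the same create/destroy/use shape. It does not use Pipes at all; the record and demo helpers and the string standing in for a file handle are hypothetical names introduced only for this example. The point is that the release action runs even though the "use" step fails.

```haskell
import Control.Exception (ErrorCall (..), bracket, throwIO, try)
import Data.IORef (IORef, modifyIORef, newIORef, readIORef)

-- Hypothetical helper: record an event in a log (newest first).
record :: IORef [String] -> String -> IO ()
record ref s = modifyIORef ref (s :)

-- Acquire a pretend resource, fail while using it, and report
-- the order in which the events happened.
demo :: IO [String]
demo = do
  ref <- newIORef []
  result <- try (bracket
                   (record ref "open" >> return "handle")        -- create
                   (\_ -> record ref "close")                    -- destroy
                   (\_ -> record ref "use" >> throwIO (ErrorCall "boom")))
  case result of
    Left (ErrorCall msg) -> record ref ("caught " ++ msg)
    Right ()             -> record ref "finished"
  reverse <$> readIORef ref

main :: IO ()
main = demo >>= print
-- prints ["open","use","close","caught boom"]
```

The "close" entry appears before the exception reaches the handler, which is the same guarantee safeRead' relies on when a downstream Pipe terminates early.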

Resource finalization and exception handling functionalities work in any base monad, so we provide a Pipe-specific mechanism for throwing exceptions which does not suffer from the limitation of only being catchable in the IO monad:

    throw :: (Monad m, Exception e) => e -> Pipe a b m r

However, exceptions thrown by other means (like those raised through Control.Exception) can only be caught when the Pipeline is run with runPipe. If you use runPurePipe, such an exception will abruptly terminate the whole
Pipeline, and resource finalization will not be