| Safe Haskell | Safe-Inferred |
|---|---|
Control.Pipe
Tutorial
This library provides a single data type: Pipe.
Pipe is a monad transformer that extends the base monad with the ability
to await input from or yield output to other Pipes. Pipes resemble
enumeratees in other libraries because they receive an input stream and
transform it into a new stream.
Let's introduce our first Pipe, which is a verbose version of the Prelude's
take function:
take' :: Int -> Pipe a a IO ()
take' n = do
  replicateM_ n $ do
    x <- await
    yield x
  lift $ putStrLn "You shall not pass!"
This Pipe allows the first n values it receives to pass through
undisturbed, then it outputs a cute message and shuts down. Shutdown is
automatic when you reach the end of the monad. You don't need to send a
special signal to connected Pipes to let them know you are done handling
input or generating output.
Let's dissect the above Pipe's type to learn a bit about how Pipes work:
| | Input type | Output type | Base monad | Return value |
|---|---|---|---|---|
| Pipe | a | a | IO | () |
So take' awaits input of type a from upstream Pipes and yields
output of type a to downstream Pipes. take' uses IO as its base
monad because it invokes the putStrLn function. If we remove the call to
putStrLn, the compiler infers the following type instead, which is
polymorphic in the base monad:
take' :: (Monad m) => Int -> Pipe a a m ()
Pipes are conservative about using the base monad. In fact, you can only
invoke the base monad by using the lift function. If you never use
lift, your Pipe will translate into pure code.
Now let's create a function that converts a list into a Pipe by
yielding each element of the list:
fromList :: (Monad m) => [a] -> Pipe () a m ()
fromList = mapM_ yield
We use () as the input type of the Pipe since it doesn't need any input
from an upstream Pipe. You can think of fromList as a one-way Pipe
that can only deliver output, which makes it suitable for the first stage in
a Pipeline. We provide a type synonym for this common case:
type Producer b m r = Pipe () b m r
You can then rewrite the type signature for fromList as:
fromList :: (Monad m) => [a] -> Producer a m ()
The compiler would be ok with a polymorphic input type, since without any
calls to await it doesn't need to constrain it. However, using () makes
it clear in the types that this Pipe is designed to be used as a Producer,
and statically prevents a number of mistakes when Pipes are combined.
Producers resemble enumerators in other libraries because they are a data
source. It is not illegal to use await in a Producer; it just returns
() immediately without blocking.
Now let's create a Pipe that prints every value delivered to it and never
terminates:
printer :: (Show a) => Pipe a Void IO b
printer = forever $ do
  x <- await
  lift $ print x
The Void in printer's type signature indicates that it never delivers
output downstream, so it represents the final stage in a Pipeline. Again,
we provide a type synonym for this common case:
type Consumer a m r = Pipe a Void m r
So we could instead write printer's type as:
printer :: (Show a) => Consumer a IO b
Consumers resemble iteratees in other libraries because they are a data
sink. Consumers never use yield statements.
What makes Pipes useful is the ability to compose them into Pipelines.
For that, we provide a >+> operator (and its right-to-left counterpart
<+<):
(>+>) :: Pipe a b m r -> Pipe b c m r -> Pipe a c m r
(<+<) :: Pipe b c m r -> Pipe a b m r -> Pipe a c m r
For example, here is how you can compose the above Pipes:
pipeline :: Pipe () Void IO ()
pipeline = fromList [1..] >+> take' 3 >+> printer
This represents a self-contained Pipeline and we provide a type synonym
for this common case:
type Pipeline m r = Pipe () Void m r
Like many other monad transformers, you convert the Pipe monad back to the
base monad using some sort of "run..." function. In this case, we
provide a runPipe function:
runPipe :: Pipeline IO r -> IO r
runPipe is actually more general, since it works with any
MonadBaseControl, but we will work with the above simplified signature in
this tutorial.
There are also more general versions of runPipe, which work in
any monad but don't have any exception-safety guarantees, so they should
only be used for Pipes that don't allocate any scarce resources:
runPurePipe :: (Monad m) => Pipeline m r -> m (Either SomeException r)
runPurePipe_ :: (Monad m) => Pipeline m r -> m r
runPipe, runPurePipe and runPurePipe_ only work on self-contained
Pipelines. We explicitly require () as input type and Void as output
type to ensure that the pipeline doesn't await or yield any value. If a
Pipe is polymorphic in its input type (for example because it never uses
await), then it can always be used as the first stage of a Pipeline.
Similarly, a Pipe that is polymorphic in its output type can be used as
the final stage.
It is generally good practice to use () (resp. Void) explicitly as the
input (resp. output) type of a producer (resp. consumer), since it gives the
compiler more information on the intent of the Pipe, and makes some common
errors detectable at compile time.
Let's try using runPipe:
>>> runPipe pipeline
1
2
3
You shall not pass!
Fascinating! Our Pipe terminated even though printer never terminates
and fromList never terminates when given an infinite list. To illustrate
why our Pipe terminated, let's outline the flow control rules for Pipe
composition.
- Execution begins at the most downstream Pipe (printer in our example).
- If a Pipe awaits input, it blocks and transfers control to the next Pipe upstream until that Pipe yields back a value.
- If a Pipe yields output, it restores control to the original downstream Pipe that was awaiting its input and binds its result to the return value of the await command.
- If a Pipe terminates, it terminates every other Pipe composed with it.
The last rule is crucial. If a Pipe terminates then every downstream
Pipe depending on its output cannot proceed, and upstream Pipes are
never evaluated because the terminated Pipe will not request values from
them any longer.
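The four rules can be made concrete with a small, self-contained toy model. This is a sketch under stated assumptions, not pipes-core's real implementation: the constructors Await, Yield, M and Done, and the local definitions of await, yield, lift, (>+>) and runPipe, are hypothetical stand-ins written with base only. Composition inspects the downstream pipe first (rule 1), turns to the upstream pipe only when downstream is blocked on Await (rule 2), hands a yielded value to the waiting continuation (rule 3), and stops as soon as either side reaches Done (rule 4).

```haskell
import Control.Monad (ap, forever, liftM, replicateM_)
import Data.Void (Void, absurd)

-- Toy model of a pipe (hypothetical; not pipes-core's representation).
data Pipe a b m r
  = Await (a -> Pipe a b m r) -- blocked until upstream supplies an a
  | Yield b (Pipe a b m r)    -- offering a b downstream, then continuing
  | M (m (Pipe a b m r))      -- running an action in the base monad
  | Done r                    -- terminated with result r

instance Monad m => Functor (Pipe a b m) where
  fmap = liftM

instance Monad m => Applicative (Pipe a b m) where
  pure = Done
  (<*>) = ap

instance Monad m => Monad (Pipe a b m) where
  Done r >>= k = k r
  Await f >>= k = Await (\x -> f x >>= k)
  Yield b p >>= k = Yield b (p >>= k)
  M m >>= k = M (fmap (>>= k) m)

await :: Pipe a b m a
await = Await Done

yield :: b -> Pipe a b m ()
yield b = Yield b (Done ())

lift :: Monad m => m r -> Pipe a b m r
lift m = M (fmap Done m)

-- Composition: the downstream pipe is always inspected first.
(>+>) :: Monad m => Pipe a b m r -> Pipe b c m r -> Pipe a c m r
up >+> down = case down of
  Done r -> Done r                 -- downstream terminated: stop everything
  Yield c k -> Yield c (up >+> k)  -- downstream output leaves the pipeline
  M m -> M (fmap (up >+>) m)       -- run downstream's effect
  Await f -> case up of            -- downstream is blocked: go upstream
    Done r -> Done r               -- upstream terminated: stop everything
    Yield b k -> k >+> f b         -- resume downstream with the value b
    M m -> M (fmap (>+> down) m)   -- run upstream's effect
    Await g -> Await (\a -> g a >+> down) -- upstream needs input too

-- Only a self-contained pipeline (input (), output Void) can be run.
runPipe :: Monad m => Pipe () Void m r -> m r
runPipe (Done r) = return r
runPipe (M m) = m >>= runPipe
runPipe (Await f) = runPipe (f ()) -- awaiting () never blocks
runPipe (Yield v _) = absurd v     -- unreachable: Void has no values

-- The tutorial's pipes, written against the toy model.
fromList :: Monad m => [a] -> Pipe () a m ()
fromList = mapM_ yield

take' :: Int -> Pipe a a IO ()
take' n = do
  replicateM_ n $ do
    x <- await
    yield x
  lift $ putStrLn "You shall not pass!"

printer :: Show a => Pipe a Void IO r
printer = forever $ do
  x <- await
  lift $ print x
```

In this model, runPipe (fromList [1 ..] >+> take' 3 >+> printer) prints 1, 2, 3 and You shall not pass! and then returns, even though fromList [1 ..] and printer would each run forever on their own. Finalization, tryAwait and exceptions are deliberately left out.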
So in our previous example, the Pipeline terminated because take' 3
terminated and brought down the entire Pipeline with it.
Pipes promote loose coupling, allowing you to mix and match them
transparently using composition. For example, we can define a new
Producer pipe that indefinitely prompts the user for integers:
prompt :: Producer Int IO a
prompt = forever $ do
  lift $ putStrLn "Enter a number: "
  n <- read <$> lift getLine
  yield n
Now we can compose it with any of our previous Pipes:
>>> runPipe $ prompt >+> take' 3 >+> printer
Enter a number:
1<Enter>
1
Enter a number:
2<Enter>
2
Enter a number:
3<Enter>
3
You shall not pass!
You can easily "vertically" concatenate Pipes, Producers, and
Consumers, all using simple monad sequencing: (>>). For example, here
is how you concatenate Producers:
>>> runPipe $ (fromList [1..3] >> fromList [10..12]) >+> printer
1
2
3
10
11
12
Here's how you would concatenate Consumers:
print' :: (Show a) => Int -> Consumer a IO ()
print' n = take' n >+> printer
>>> runPipe $ fromList [1..] >+> (print' 3 >> print' 4)
1
2
3
You shall not pass!
4
5
6
7
You shall not pass!
... but the above example is gratuitous because we could have just
concatenated the intermediate take' Pipe:
>>> runPipe $ fromList [1..] >+> (take' 3 >> take' 4) >+> printer
1
2
3
You shall not pass!
4
5
6
7
You shall not pass!
Pipe composition imposes an important limitation: You can only compose
Pipes that have the same return type. For example, we could write the
following function:
deliver :: (Monad m) => Int -> Consumer a m [a]
deliver n = replicateM n await
... and we might try to compose it with fromList:
>>> runPipe $ fromList [1..10] >+> deliver 3 -- wrong!
... but this wouldn't type-check, because fromList has a return type of
() and deliver has a return type of [Int]. All Pipes in a
composition need to have the same return type, since the return value of the
composed Pipe is taken from the Pipe that terminates first, and there's
no general way to determine which one it is statically.
Fortunately, we don't have to rewrite the fromList function because we can
add a return value using vertical concatenation:
>>> runPipe $ (fromList [1..10] >> return []) >+> deliver 3
[1,2,3]
... although a more idiomatic Haskell version would be:
>>> runPipe $ (fromList [1..10] *> pure Nothing) >+> (Just <$> deliver 3)
Just [1,2,3]
which can be written using the $$ operator:
>>> runPipe $ fromList [1..10] $$ deliver 3
Just [1,2,3]
This forces you to cover all code paths by thinking about what return value
you would provide if something were to go wrong. For example, let's say I
make a mistake and request more input than fromList can deliver:
>>> runPipe $ fromList [1..10] $$ deliver 99
Nothing
The type system saved me by forcing me to handle all possible ways my program could terminate.
Now what if you want to write a Pipe that only reads from its input end
(i.e. a Consumer) and returns a list of every value delivered to it when
its input Pipe terminates? In Combinators we find:
consume :: (Monad m) => Consumer a m [a]
but it turns out that it's not possible to write such a Pipe using only
the primitives introduced so far, since we need a way to intercept upstream
termination and return the current accumulated list of input values before
terminating ourselves.
So we need to introduce a new primitive operation:
tryAwait :: (Monad m) => Pipe a b m (Maybe a)
tryAwait works very similarly to await, with two key differences:
- When upstream yields some value x, tryAwait returns Just x.
- When upstream terminates, tryAwait returns Nothing instead of terminating the current Pipe immediately.
When tryAwait returns Nothing, the current Pipe has a chance to
perform some final actions (typically yield a final value or terminate
with a result) before being forcefully shut down. At that stage, further
invocations of tryAwait will keep returning Nothing, while using await
will terminate the pipe immediately.
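The difference between await and tryAwait, and the Just/Nothing results of $$ shown earlier, can be reproduced with an even smaller toy model that only covers a Consumer reading from fromList xs. This is a hypothetical sketch built on base alone, not pipes-core's implementation: Consumer, feed and the local await, tryAwait, deliver and consume are stand-ins, and the end of the input list plays the role of upstream termination. An await past the end terminates the whole computation (Nothing), while tryAwait turns the same event into an ordinary Nothing value that consume can react to.

```haskell
import Control.Monad (ap, liftM, replicateM)

-- Toy model (hypothetical): a Consumer fed by a finite list standing in
-- for `fromList xs`. The end of the list plays the role of upstream
-- termination; a result of Nothing means upstream terminated first.
newtype Consumer a r = Consumer {runConsumer :: [a] -> Maybe (r, [a])}

instance Functor (Consumer a) where
  fmap = liftM

instance Applicative (Consumer a) where
  pure r = Consumer (\input -> Just (r, input))
  (<*>) = ap

instance Monad (Consumer a) where
  Consumer p >>= k = Consumer $ \input -> case p input of
    Nothing -> Nothing -- termination propagates through the rest
    Just (r, rest) -> runConsumer (k r) rest

-- await: if upstream has terminated, so does this Consumer.
await :: Consumer a a
await = Consumer $ \input -> case input of
  [] -> Nothing
  x : xs -> Just (x, xs)

-- tryAwait: upstream termination becomes an ordinary Nothing value,
-- and asking again keeps returning Nothing.
tryAwait :: Consumer a (Maybe a)
tryAwait = Consumer $ \input -> case input of
  [] -> Just (Nothing, [])
  x : xs -> Just (Just x, xs)

deliver :: Int -> Consumer a [a]
deliver n = replicateM n await

-- consume: collect everything until tryAwait reports termination.
consume :: Consumer a [a]
consume = go []
  where
    go acc = do
      mx <- tryAwait
      case mx of
        Nothing -> return (reverse acc)
        Just x -> go (x : acc)

-- Plays the role of `runPipe (fromList xs $$ c)` in this model.
feed :: [a] -> Consumer a r -> Maybe r
feed xs c = fst <$> runConsumer c xs
```

In this model, feed [1 .. 10] (deliver 3) gives Just [1,2,3], feed [1 .. 10] (deliver 99) gives Nothing, and feed [1 .. 10] consume gives Just [1,2,3,4,5,6,7,8,9,10], because consume stops on the first Nothing from tryAwait instead of being shut down.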
Note that Pipe termination only propagates through composition. To
illustrate this, let's use the following example:
p = do
  a <+< b
  c
a, b, and c are Pipes, and c must have the same input and output types as
a <+< b; otherwise we could not combine them within the same monad. In the
above example, either a or b could terminate and bring down the other
one since they are composed, but c is guaranteed to continue after
a <+< b terminates because it is not composed with them. Conceptually,
we can think of this as c automatically taking over the Pipe's
channeling responsibilities when a <+< b can no longer continue. There
is no need to "restart" the input or output manually as in some other
iteratee libraries.
We now turn our attention to a very important feature of pipes: resource finalization.
Say we have the file "test.txt" with the following contents:
This is a test.
Don't panic!
Calm down, please!
... and we wish to lazily read a line at a time from it:
handleReader' :: Handle -> Producer Text IO ()
handleReader' h = do
  eof <- lift $ hIsEOF h
  unless eof $ do
    s <- lift $ pack <$> hGetLine h
    yield s
    handleReader' h
Suppose, for the sake of example, that we know in advance how many lines we
need to read from the file. We can then use composition and the Monad
instance to try to build a resource-efficient version that only reads as
many lines as we request:
read' :: Int -> Producer Text IO ()
read' n = do
  lift $ putStrLn "Opening file ..."
  h <- lift $ openFile "test.txt" ReadMode
  take' n <+< handleReader' h
  lift $ putStrLn "Closing file ..."
  lift $ hClose h
Now compose!
>>> runPipe $ read' 2 >+> printer
Opening file ...
"This is a test."
"Don't panic!"
You shall not pass!
Closing file ...
>>> runPipe $ read' 99 >+> printer
Opening file ...
"This is a test."
"Don't panic!"
"Calm down, please!"
Closing file ...
In the first example, the pipeline terminates because take' only requested
2 lines. In the second example, it terminates because handleReader' ran out
of input. However, in both cases the Pipe never reads more lines than we
request and frees "test.txt" immediately when it is no longer needed.
Even more importantly, the file is never opened if we replace printer
with a Pipe that never demands input:
>>> runPipe $ read' 2 >+> lift (putStrLn "I don't need input")
I don't need input
However, this read' is not resource-safe in certain situations. For
example, take the following pipe:
>>> runPipe $ read' 3 >+> take' 1 >+> printer
Opening file ...
"This is a test."
You shall not pass!
Oh no! Our Pipe didn't properly close our file! take' 1 terminated
before read' 3, preventing read' 3 from properly closing "test.txt".
Similarly, any exception thrown during execution of the Pipeline can cause
the hClose statement to be skipped, leaking an open handle.
We can force the read' 3 Pipe to always close the file handle regardless
of exceptions or premature termination by using the bracket function:
safeRead' :: Int -> Producer Text IO ()
safeRead' n = bracket
  (putStrLn "Opening file..." >> openFile "test.txt" ReadMode)
  (\h -> putStrLn "Closing file..." >> hClose h)
  (\h -> handleReader' h >+> take' n)
bracket is similar to the homonymous function in Exception: it
takes a function that creates some "resource", a function that disposes of
the created resource, and a function which takes the resource and returns a
Pipe:
bracket :: Monad m
        => m r                   -- create resource
        -> (r -> m y)            -- destroy resource
        -> (r -> Pipe a b m x)   -- use resource in a 'Pipe'
        -> Pipe a b m x
Note that the "create" and "destroy" actions operate within the base
monad, so it's not possible to use yield and await there.
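bracket is described as similar to Control.Exception.bracket from base. For comparison, the sketch below exercises base's bracket in plain IO (not the Pipe version): the use action throws, yet the destroy action still runs before the exception reaches try. The Boom exception type and the bracketDemo name are hypothetical, and an IORef log replaces putStrLn so the order of events can be checked.

```haskell
import Control.Exception (Exception, bracket, throwIO, try)
import Data.IORef (modifyIORef, newIORef, readIORef)

-- Hypothetical exception used only for this demonstration.
data Boom = Boom deriving (Show)

instance Exception Boom

-- Runs a bracketed action whose "use" step fails, returning the event log.
bracketDemo :: IO [String]
bracketDemo = do
  logRef <- newIORef []
  let say s = modifyIORef logRef (++ [s])
  result <- try $
    bracket
      (say "open" >> return "test.txt") -- create the resource
      (\_ -> say "close")               -- destroy it: runs even though "use" throws
      (\name -> do
         say ("use " ++ name)
         throwIO Boom)                  -- use it, then fail
  case result of
    Left Boom -> say "caught Boom"
    Right () -> say "finished normally"
  readIORef logRef
```

bracketDemo returns ["open","use test.txt","close","caught Boom"]. The Pipe version described above additionally runs the destroy action when the Pipe is shut down by composition, as with the premature termination of take' 1 shown earlier, a situation that has no counterpart in plain IO code.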
Using safeRead' instead of read' will now produce the desired behavior:
>>> runPipe $ safeRead' 3 >+> take' 1 >+> printer
Opening file...
"This is a test."
You shall not pass!
Closing file...
We also provide exception-handling primitives like catch and
onException. See Exception for more details on exception
handling and a complete list of primitives.
Resource finalization and exception handling functionality works in any
base monad, so we provide a Pipe-specific mechanism for throwing
exceptions which does not suffer from the limitation of only being catchable
in the IO monad:
throw :: (Monad m, Exception e) => e -> Pipe a b m r
However, exceptions thrown by other means (like error or throw in
Exception) can only be caught when the Pipeline is run with
runPipe. If you use runPurePipe, such an exception will abruptly
terminate the whole Pipeline, and resource finalization will not be
guaranteed.
Implementation
module Control.Pipe.Common
module Control.Pipe.Monoidal