- module Control.Pipe.Common
- module Control.Pipe.Monoidal
Tutorial
This library provides a single data type: Pipe.
Pipe is a monad transformer that extends the base monad with the ability
to await input from or yield output to other Pipes. Pipes resemble
enumeratees in other libraries because they receive an input stream and
transform it into a new stream.
Let's introduce our first Pipe, which is a verbose version of the Prelude's
take function:
take' :: Int -> Pipe a a IO ()
take' n = do
    replicateM_ n $ do
        x <- await
        yield x
    lift $ putStrLn "You shall not pass!"
This Pipe allows the first n values it receives to pass through
undisturbed, then it outputs a cute message and shuts down. Shutdown is
automatic when you reach the end of the monad. You don't need to send a
special signal to connected Pipes to let them know you are done handling
input or generating output.
Let's dissect the above Pipe's type to learn a bit about how Pipes work:
     | Input Type | Output Type | Base monad | Return value
Pipe   a            a             IO           ()
So take' awaits input of type a from upstream Pipes and yields output of
type a to downstream Pipes. take' uses IO as its base monad because it
invokes the putStrLn function. If we remove the call to putStrLn, the
compiler infers the following type instead, which is polymorphic in the
base monad:
take' :: (Monad m) => Int -> Pipe a a m ()
Pipes are conservative about using the base monad. In fact, you can only
invoke the base monad by using the lift function. If you never use lift,
your Pipe will translate into pure code.
Now let's create a function that converts a list into a Pipe by yielding
each element of the list:
fromList :: (Monad m) => [a] -> Pipe () a m ()
fromList = mapM_ yield
We use () as the input type of the Pipe since it doesn't need any input
from an upstream Pipe. You can think of fromList as a one-way Pipe that can
only deliver output, which makes it suitable for the first stage in a
Pipeline. We provide a type synonym for this common case:
type Producer b m r = Pipe () b m r
You can then rewrite the type signature for fromList as:
fromList :: (Monad m) => [a] -> Producer a m ()
The compiler would be ok with a polymorphic input type, since without any
calls to await it doesn't need to constrain it. However, using () makes it
clear in the types that this Pipe is designed to be used as a Producer, and
statically prevents a number of mistakes when Pipes are combined.
Producers resemble enumerators in other libraries because they are a data
source. It is not illegal to use await in a Producer; it just returns ()
immediately without blocking.
Now let's create a Pipe that prints every value delivered to it and never
terminates:
printer :: (Show a) => Pipe a Void IO b
printer = forever $ do
    x <- await
    lift $ print x
The Void in printer's type signature indicates that it never delivers
output downstream, so it represents the final stage in a Pipeline. Again,
we provide a type synonym for this common case:
type Consumer a m r = Pipe a Void m r
So we could instead write printer's type as:
printer :: (Show a) => Consumer a IO b
Consumers resemble iteratees in other libraries because they are a data
sink. Consumers never use yield statements.
What makes Pipes useful is the ability to compose them into Pipelines.
For that, we provide a >+> operator (and its right-to-left counterpart
<+<):
(>+>) :: Pipe a b m r -> Pipe b c m r -> Pipe a c m r
(<+<) :: Pipe b c m r -> Pipe a b m r -> Pipe a c m r
For example, here is how you can compose the above Pipes:
pipeline :: Pipe () Void IO ()
pipeline = fromList [1..] >+> take' 3 >+> printer
This represents a self-contained Pipeline and we provide a type synonym
for this common case:
type Pipeline m r = Pipe () Void m r
Like many other monad transformers, you convert the Pipe monad back to the
base monad using some sort of "run..." function. In this case, we provide a
runPipe function:
runPipe :: Pipeline IO r -> IO r
runPipe is actually more general, since it works with any MonadBaseControl,
but we will work with the above simplified signature in this tutorial.
There are also more general versions of runPipe which work in any monad,
but don't have any exception-safety guarantees, so they should only be used
for Pipes that don't allocate any scarce resources.
runPurePipe :: (Monad m) => Pipeline m r -> m (Either SomeException r)
runPurePipe_ :: (Monad m) => Pipeline m r -> m r
runPipe, runPurePipe and runPurePipe_ only work on self-contained
Pipelines. We explicitly require () as input type and Void as output type
to ensure that the pipeline doesn't await or yield any value. If a Pipe is
polymorphic in its input type (for example because it never uses await),
then it can always be used as the first stage of a Pipeline. Similarly, a
Pipe that is polymorphic in its output type can be used as the final stage.
It is generally good practice to use () (resp. Void) explicitly as the
input (resp. output) type of a producer (resp. consumer), since it gives the
compiler more information on the intent of the Pipe, and makes some common
errors detectable at compile time.
Let's try using runPipe:
>>> runPipe pipeline
1
2
3
You shall not pass!
Fascinating! Our Pipe terminated even though printer never terminates and
fromList never terminates when given an infinite list. To illustrate why
our Pipe terminated, let's outline the flow control rules for Pipe
composition.
- Execution begins at the most downstream Pipe (printer in our example).
- If a Pipe awaits input, it blocks and transfers control to the next Pipe
  upstream until that Pipe yields back a value.
- If a Pipe yields output, it restores control to the original downstream
  Pipe that was awaiting its input and binds its result to the return value
  of the await command.
- If a Pipe terminates, it terminates every other Pipe composed with it.
The last rule is crucial. If a Pipe terminates then every downstream Pipe
depending on its output cannot proceed, and upstream Pipes are never
evaluated because the terminated Pipe will not request values from them any
longer.
So in our previous example, the Pipeline terminated because take' 3
terminated and brought down the entire Pipeline with it.
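The flow control rules above can be modelled in a few dozen lines. What
follows is a toy sketch under stated assumptions, not this library's
implementation: the four-constructor Pipe type, liftP (standing in for
lift), say, and the Log base monad (a pair that accumulates messages, used
in place of IO so that the result can be inspected) are all made up for
illustration, and the model has no exception handling or finalization.

```haskell
import Control.Monad (ap, forever, liftM, replicateM_)
import Data.Void (Void, absurd)

-- Toy model only; the real library's representation may differ.
data Pipe a b m r
  = Pure r                       -- finished with a result
  | Await (a -> Pipe a b m r)    -- blocked, waiting for an input
  | Yield b (Pipe a b m r)       -- an output, plus the rest of the pipe
  | Lift (m (Pipe a b m r))      -- an action in the base monad

instance Monad m => Functor (Pipe a b m) where
  fmap = liftM

instance Monad m => Applicative (Pipe a b m) where
  pure  = Pure
  (<*>) = ap

instance Monad m => Monad (Pipe a b m) where
  Pure r    >>= f = f r
  Await k   >>= f = Await (\a -> k a >>= f)
  Yield b p >>= f = Yield b (p >>= f)
  Lift m    >>= f = Lift (liftM (>>= f) m)

await :: Pipe a b m a
await = Await Pure

yield :: b -> Pipe a b m ()
yield b = Yield b (Pure ())

-- Hypothetical stand-in for the transformer's 'lift'.
liftP :: Monad m => m r -> Pipe a b m r
liftP = Lift . liftM Pure

-- Composition inspects the downstream pipe first (rule 1).
(>+>) :: Monad m => Pipe a b m r -> Pipe b c m r -> Pipe a c m r
up >+> down = case down of
  Pure r    -> Pure r                        -- rule 4: downstream finished
  Yield c d -> Yield c (up >+> d)
  Lift m    -> Lift (liftM (up >+>) m)
  Await k   -> case up of                    -- rule 2: control goes upstream
    Pure r    -> Pure r                      -- rule 4: upstream finished
    Yield b u -> u >+> k b                   -- rule 3: resume downstream
    Lift m    -> Lift (liftM (>+> down) m)
    Await h   -> Await (\a -> h a >+> down)

runPipe :: Monad m => Pipe () Void m r -> m r
runPipe p = case p of
  Pure r    -> return r
  Await k   -> runPipe (k ())
  Yield v _ -> absurd v
  Lift m    -> m >>= runPipe

-- Base monad used instead of IO: a pair whose first half collects messages.
type Log = (,) [String]

say :: String -> Pipe a b Log ()
say s = liftP ([s], ())

fromList :: Monad m => [a] -> Pipe () a m ()
fromList = mapM_ yield

take' :: Int -> Pipe a a Log ()
take' n = do
  replicateM_ n (await >>= yield)
  say "You shall not pass!"

printer :: Show a => Pipe a Void Log r
printer = forever (await >>= say . show)

pipeline :: Pipe () Void Log ()
pipeline = fromList [1 :: Int ..] >+> take' 3 >+> printer
```

In this model, fst (runPipe pipeline) evaluates to
["1","2","3","You shall not pass!"]: the infinite list is only consumed on
demand, and the Pure case of take' 3 ends the whole composition.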
Pipes promote loose coupling, allowing you to mix and match them
transparently using composition. For example, we can define a new Producer
pipe that indefinitely prompts the user for integers:
prompt :: Producer Int IO a
prompt = forever $ do
    lift $ putStrLn "Enter a number: "
    n <- read <$> lift getLine
    yield n
Now we can compose it with any of our previous Pipes:
>>> runPipe $ prompt >+> take' 3 >+> printer
Enter a number:
1<Enter>
1
Enter a number:
2<Enter>
2
Enter a number:
3<Enter>
3
You shall not pass!
You can easily "vertically" concatenate Pipes, Producers, and Consumers,
all using simple monad sequencing: (>>). For example, here is how you
concatenate Producers:
>>> runPipe $ (fromList [1..3] >> fromList [10..12]) >+> printer
1
2
3
10
11
12
Here's how you would concatenate Consumers:
print' :: (Show a) => Int -> Consumer a IO ()
print' n = take' n >+> printer
>>> runPipe $ fromList [1..] >+> (print' 3 >> print' 4)
1
2
3
You shall not pass!
4
5
6
7
You shall not pass!
... but the above example is gratuitous because we could have just
concatenated the intermediate take' Pipe:
>>> runPipe $ fromList [1..] >+> (take' 3 >> take' 4) >+> printer
1
2
3
You shall not pass!
4
5
6
7
You shall not pass!
Pipe composition imposes an important limitation: You can only compose
Pipes that have the same return type. For example, we could write the
following function:
deliver :: (Monad m) => Int -> Consumer a m [a]
deliver n = replicateM n await
... and we might try to compose it with fromList:
>>> runPipe $ fromList [1..10] >+> deliver 3 -- wrong!
... but this wouldn't type-check, because fromList has a return type of ()
and deliver has a return type of [Int]. All Pipes in a composition need to
have the same return type, since the return value of the composed Pipe is
taken from the Pipe that terminates first, and there's no general way to
determine which one it is statically.
Fortunately, we don't have to rewrite the fromList function because we can
add a return value using vertical concatenation:
>>> runPipe $ (fromList [1..10] >> return []) >+> deliver 3
[1,2,3]
... although a more idiomatic Haskell version would be:
>>> runPipe $ (fromList [1..10] *> pure Nothing) >+> (Just <$> deliver 3)
Just [1,2,3]
which can be written using the $$ operator:
>>> runPipe $ fromList [1..10] $$ deliver 3
Just [1,2,3]
This forces you to cover all code paths by thinking about what return value
you would provide if something were to go wrong. For example, let's say I
make a mistake and request more input than fromList can deliver:
>>> runPipe $ fromList [1..10] $$ deliver 99
Nothing
The type system saved me by forcing me to handle all possible ways my program could terminate.
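To see why the Maybe wrapper covers both outcomes, here is a small
effect-free sketch. It is an assumption-laden toy rather than the library's
code: the three-constructor Pipe type has no base monad, and feed is a
hypothetical helper that spells out the "*> pure Nothing" and "Just <$>"
pattern shown above; it is not the definition of $$.

```haskell
import Control.Monad (ap, liftM, replicateM)
import Data.Void (Void, absurd)

-- Toy, effect-free pipe; NOT the library's representation.
data Pipe a b r
  = Pure r                     -- finished with a result
  | Await (a -> Pipe a b r)    -- waiting for an input
  | Yield b (Pipe a b r)       -- an output, plus the rest of the pipe

instance Functor (Pipe a b) where
  fmap = liftM

instance Applicative (Pipe a b) where
  pure  = Pure
  (<*>) = ap

instance Monad (Pipe a b) where
  Pure r    >>= f = f r
  Await k   >>= f = Await (\a -> k a >>= f)
  Yield b p >>= f = Yield b (p >>= f)

await :: Pipe a b a
await = Await Pure

yield :: b -> Pipe a b ()
yield b = Yield b (Pure ())

-- The result comes from whichever side reaches Pure first.
(>+>) :: Pipe a b r -> Pipe b c r -> Pipe a c r
up >+> down = case down of
  Pure r    -> Pure r
  Yield c d -> Yield c (up >+> d)
  Await k   -> case up of
    Pure r    -> Pure r
    Yield b u -> u >+> k b
    Await h   -> Await (\a -> h a >+> down)

runPipe :: Pipe () Void r -> r
runPipe p = case p of
  Pure r    -> r
  Await k   -> runPipe (k ())
  Yield v _ -> absurd v

fromList :: [a] -> Pipe () a ()
fromList = mapM_ yield

deliver :: Int -> Pipe a Void [a]
deliver n = replicateM n await

-- Hypothetical helper: Nothing if the producer ends first,
-- Just the consumer's result otherwise.
feed :: Pipe () b () -> Pipe b Void r -> Pipe () Void (Maybe r)
feed producer consumer = (producer >> return Nothing) >+> fmap Just consumer
```

With these definitions, runPipe (feed (fromList [1 .. 10 :: Int]) (deliver 3))
is Just [1,2,3], while asking for 99 elements yields Nothing because the
producer's Pure Nothing is reached first.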
Now what if you want to write a Pipe that only reads from its input end
(i.e. a Consumer) and returns a list of every value delivered to it when
its input Pipe terminates? In Combinators we find:
consume :: (Monad m) => Consumer a m [a]
but it turns out that it's not possible to write such a Pipe using only
the primitives introduced so far, since we need a way to intercept upstream
termination and return the current accumulated list of input values before
terminating ourselves.
So we need to introduce a new primitive operation:
tryAwait :: (Monad m) => Pipe a b m (Maybe a)
tryAwait works very similarly to await, with two key differences:
- When upstream yields some value x, tryAwait returns Just x.
- When upstream terminates, tryAwait returns Nothing instead of terminating
  the current Pipe immediately.
When tryAwait returns Nothing, the current Pipe has a chance to perform
some final actions (typically yield a final value or terminate with a
result) before being forcefully shut down. At that stage, further
invocations of tryAwait will keep returning Nothing, while using await will
terminate the pipe immediately.
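A minimal sketch of how tryAwait makes consume expressible, again in an
effect-free toy model that is an assumption of this example and not the
library's implementation. Here the only input primitive is TryAwait, and
composition answers Nothing once upstream has finished. Unlike the library,
this toy never shuts a pipe down forcefully, so a downstream pipe that
ignores Nothing would loop forever.

```haskell
import Control.Monad (ap, liftM)
import Data.Void (Void, absurd)

-- Toy, effect-free pipe; NOT the library's representation.
data Pipe a b r
  = Pure r
  | TryAwait (Maybe a -> Pipe a b r)   -- Nothing means "upstream is gone"
  | Yield b (Pipe a b r)

instance Functor (Pipe a b) where
  fmap = liftM

instance Applicative (Pipe a b) where
  pure  = Pure
  (<*>) = ap

instance Monad (Pipe a b) where
  Pure r     >>= f = f r
  TryAwait k >>= f = TryAwait (\ma -> k ma >>= f)
  Yield b p  >>= f = Yield b (p >>= f)

tryAwait :: Pipe a b (Maybe a)
tryAwait = TryAwait Pure

yield :: b -> Pipe a b ()
yield b = Yield b (Pure ())

(>+>) :: Pipe a b r -> Pipe b c r -> Pipe a c r
up >+> down = case down of
  Pure r     -> Pure r
  Yield c d  -> Yield c (up >+> d)
  TryAwait k -> case up of
    Pure r     -> Pure r >+> k Nothing     -- keep answering Nothing
    Yield b u  -> u >+> k (Just b)
    TryAwait h -> TryAwait (\ma -> h ma >+> down)

runPipe :: Pipe () Void r -> r
runPipe p = case p of
  Pure r     -> r
  TryAwait k -> runPipe (k (Just ()))
  Yield v _  -> absurd v

fromList :: [a] -> Pipe () a ()
fromList = mapM_ yield

-- Accumulate every input; return the list once upstream terminates.
consume :: Pipe a b [a]
consume = go []
  where
    go acc = tryAwait >>= maybe (return (reverse acc)) (\x -> go (x : acc))
```

In this model, runPipe ((fromList [1, 2, 3 :: Int] >> return []) >+> consume)
evaluates to [1,2,3]: the result comes from consume, which observes Nothing
and returns its accumulator.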
Note that Pipe termination only propagates through composition. To
illustrate this, let's use the following example:
p = do
    a <+< b
    c
a, b, and c are Pipes, and c shares the same input and output as a <+< b,
otherwise we cannot combine them within the same monad. In the above
example, either a or b could terminate and bring down the other one since
they are composed, but c is guaranteed to continue after a <+< b terminates
because it is not composed with them. Conceptually, we can think of this as
c automatically taking over the Pipe's channeling responsibilities when
a <+< b can no longer continue. There is no need to "restart" the input or
output manually as in some other iteratee libraries.
We now turn our attention to a very important feature of pipes: resource finalization.
Say we have the file "test.txt" with the following contents:
This is a test.
Don't panic!
Calm down, please!
... and we wish to lazily read a line at a time from it:
handleReader' :: Handle -> Producer Text IO ()
handleReader' h = do
    eof <- lift $ hIsEOF h
    unless eof $ do
        s <- lift $ pack <$> hGetLine h
        yield s
        handleReader' h
Suppose, for the sake of example, that we know in advance how many lines we
need to read from the file. We can then use composition and the Monad
instance to try to build a resource-efficient version that only reads as
many lines as we request:
read' :: Int -> Producer Text IO ()
read' n = do
    lift $ putStrLn "Opening file ..."
    h <- lift $ openFile "test.txt" ReadMode
    take' n <+< handleReader' h
    lift $ putStrLn "Closing file ..."
    lift $ hClose h
Now compose!
>>> runPipe $ read' 2 >+> printer
Opening file ...
"This is a test."
"Don't panic!"
You shall not pass!
Closing file ...
>>> runPipe $ read' 99 >+> printer
Opening file ...
"This is a test."
"Don't panic!"
"Calm down, please!"
Closing file ...
In the first example, the pipeline terminates because take' only requested
2 lines. In the second example, it terminates because handleReader' ran out
of input. However, in both cases the Pipe never reads more lines than we
request and frees "test.txt" immediately when it is no longer needed.
Even more importantly, the file is never opened if we replace printer with
a Pipe that never demands input:
>>> runPipe $ read' 2 >+> lift (putStrLn "I don't need input")
I don't need input
However, this read' is not resource-safe in certain situations. For
example, take the following pipe:
>>> runPipe $ read' 3 >+> take' 1 >+> printer
Opening file ...
"This is a test."
You shall not pass!
Oh no! Our Pipe didn't properly close our file! take' 1 terminated before
read' 3, preventing read' 3 from properly closing "test.txt". Similarly,
any exception thrown during execution of the Pipeline can cause the hClose
statement to be skipped, leaking an open handle.
We can force the read' 3 Pipe to always close the file handle regardless
of exceptions or premature termination by using the bracket function:
safeRead' :: Int -> Producer Text IO ()
safeRead' n = bracket
    (putStrLn "Opening file..." >> openFile "test.txt" ReadMode)
    (\h -> putStrLn "Closing file..." >> hClose h)
    (\h -> handleReader' h >+> take' n)
bracket is similar to the function of the same name in Exception: it takes
an action that creates some "resource", a function that disposes of the
created resource, and a function which takes the resource and returns a
Pipe:
bracket :: Monad m
        => m r                   -- create resource
        -> (r -> m y)            -- destroy resource
        -> (r -> Pipe a b m x)   -- use resource in a 'Pipe'
        -> Pipe a b m x
Note that the "create" and "destroy" actions operate within the base
monad, so it's not possible to use yield and await there.
Using safeRead' instead of read' will now produce the desired behavior:
>>> runPipe $ safeRead' 3 >+> take' 1 >+> printer
Opening file...
"This is a test."
You shall not pass!
Closing file...
We also provide exception-handling primitives like catch and onException.
See Exception for more details on exception handling and a complete list of
primitives.
Resource finalization and exception handling functionalities work in any
base monad, so we provide a Pipe-specific mechanism for throwing exceptions
which does not suffer from the limitation of only being catchable in the IO
monad:
throw :: (Monad m, Exception e) => e -> Pipe a b m r
However, exceptions thrown by other means (like error or throw in
Exception) can only be caught when the Pipeline is run with runPipe. If you
use runPurePipe, such an exception will abruptly terminate the whole
Pipeline, and resource finalization will not be guaranteed.
Implementation
module Control.Pipe.Common
module Control.Pipe.Monoidal