Safe Haskell | Safe-Inferred |
---|
- Introduction
- Bidirectionality
- Type Synonyms
- Request and Respond
- Composition
- The Proxy Class
- Interleaving Effects
- Mixing Base Monads
- Utilities
- Mix Monads and Composition
- ListT
- Folds
- Resource Management
- Extensions
- Error handling
- Local state
- Branching, zips, and merges
- Proxy Transformers
- Proxy Morphism Laws
- Functors on Proxies
- Conclusion
- Appendix
This module provides a brief introductory tutorial in the "Introduction" section followed by a lengthy discussion of the library's design and idioms.
I've condensed all the code examples into a self-contained code block in the Appendix section that you can use to follow along.
Introduction
The pipes
library replaces lazy IO
with a safe, elegant, and
theoretically principled alternative. Use this library if you:
- want to write high-performance streaming programs,
- believe that lazy
IO
was a bad idea, - enjoy composing modular and reusable components, or
- love theory and elegant code.
This library unifies many kinds of streaming abstractions, all of which are
special cases of "proxies" (The pipes
name is a legacy of one such
abstraction).
Let's begin with the simplest Proxy
: a Producer
. The following
Producer
lazily streams lines from a Handle
import Control.Monad import Control.Proxy import System.IO -- Produces Strings ---+----------+ -- | | -- v v lines' :: (Proxy p) => Handle -> () -> Producer p String IO () lines' h () = runIdentityP loop where loop = do eof <- lift $ hIsEOF h if eof then return () else do str <- lift $ hGetLine h respond str -- Produce the string loop -- Ignore the 'runIdentityP' and '()' for now
But why limit ourselves to streaming lines from some file? Why not lazily generate values from an industrious user?
-- Uses 'IO' as the base monad --+ -- | -- v promptInt :: (Proxy p) => () -> Producer p Int IO r promptInt () = runIdentityP $ forever $ do lift $ putStrLn "Enter an Integer:" n <- lift readLn -- 'lift' invokes an action in the base monad respond n
Now we need to hook our Producer
s up to a Consumer
. The following
Consumer
endlessly request
s a stream of Show
able values and print
s
them:
-- Consumes 'a's ---+----------+ +-- Never terminates, so -- | | | the return value is -- v v v polymorphic printer :: (Proxy p, Show a) => () -> Consumer p a IO r printer () = runIdentityP $ forever $ do a <- request () -- Consume a value lift $ putStrLn "Received a value:" lift $ print a
You can compose a Producer
and a Consumer
using (>->
), which produces
a runnable Session
:
-- Self-contained session ---+ +--+-- These must match -- | | | each component -- v v v lines' h >-> printer :: (Proxy p) => () -> Session p IO () promptInt >-> printer :: (Proxy p) => () -> Session p IO r
(>->
) connects each request
in printer
with a respond
in
lines'
or promptInt
.
Finally, you use runProxy
to run the Session
and convert it back to the
base monad. First we'll try our lines'
Producer
, which will stream
lines from the following file:
$ cat test.txt Line 1 Line 2 Line 3
The following program never brings more than a single line into memory (not that it matters for such a small file):
>>>
withFile "test.txt" ReadMode $ \h -> runProxy $ lines' h >-> printer
Received a value: "Line 1" Received a value: "Line 2" Received a value: "Line 3"
Similarly, we can lazily stream user input, requesting values from the user only when we need them:
>>>
runProxy $ promptInt >-> printer :: IO r
Enter an Integer: 1<Enter> Received a value: 1 Enter an Integer: 5<Enter> Received a value: 5 ...
The last example proceeds endlessly until we hit Ctrl-C
to interrupt it.
We would like to limit the number of iterations, so lets define an
intermediate Proxy
that behaves like a verbose take
. I will call it a
Pipe
(this library's namesake) since values flow through it:
-- 'a's flow in ---+ +--- 'a's flow out -- | | -- v v take' :: (Proxy p) => Int -> () -> Pipe p a a IO () take' n () = runIdentityP $ do replicateM_ n $ do a <- request () respond a lift $ putStrLn "You shall not pass!"
This Pipe
forwards the first n
values it receives undisturbed, then it
outputs a cute message. You can compose it between the Producer
and
Consumer
using (>->
):
>>>
runProxy $ promptInt >-> take' 2 >-> printer :: IO ()
Enter an Integer: 9<Enter> Received a value: 9 Enter an Integer: 2<Enter> Received a value: 2 You shall not pass!
When take' 2
terminates, it brings down every Proxy
composed with it.
Notice how promptInt
behaves lazily and only respond
s with as many
values as we request
. We request
ed exactly two values, so it only
prompts the user twice.
We can already spot several improvements upon traditional lazy IO
:
- You can define your own lazy components that have nothing to do with files
-
pipes
never usesunsafePerformIO
and never violates referential transparency. - You don't need strictness hacks to ensure the proper ordering of effects
- You can interleave effects in downstream stages, too
However, this library can offer even more than that!
Bidirectionality
So far we've only defined proxies that send information downstream in the
direction of the (>->
) arrow. However, we don't need to limit ourselves
to unidirectional communication and we can enhance these proxies with the
ability to send information upstream with each request
that determines
how upstream stages respond
.
For example, Client
s generalize Consumer
s because they can supply an
argument other than ()
with each request
. The following Client
sends three request
s upstream, each of which provides an Int
argument
and expects a Bool
result
:
-- Sends out 'Int's ---+ +-- Receives back 'Bool's -- | | -- v v threeReqs :: (Proxy p) => () -> Client p Int Bool IO () threeReqs () = runIdentityP $ forM_ [1, 3, 1] $ \argument -> do lift $ putStrLn $ "Client Sends: " ++ show (argument :: Int) result <- request argument lift $ putStrLn $ "Client Receives: " ++ show (result :: Bool) lift $ putStrLn "*"
Notice how Client
s use "request argument
" instead of
"request ()
". This sends "argument
" upstream to parametrize the
request
.
Server
s similarly generalize Producer
s because they receive arguments
other than ()
. The following Server
receives Int
requests and
responds with Bool
s:
-- Receives 'Int's ---+ +--- Replies with 'Bool's -- | | -- v v comparer :: (Proxy p) => Int -> Server p Int Bool IO r comparer = runIdentityK loop where loop argument = do lift $ putStrLn $ "Server Receives: " ++ show (argument :: Int) let result = argument > 2 lift $ putStrLn $ "Server Sends: " ++ show (result :: Bool) nextArgument <- respond result loop nextArgument
Notice how Server
s receive their first argument as a parameter and bind
each subsequent argument using respond
. This library provides a
combinator which abstracts away this common pattern:
foreverK :: (Monad m) => (a -> m a) -> a -> m b foreverK f = loop where loop argument = do nextArgument <- f argument loop nextArgument -- or: foreverK f = f >=> foreverK f -- = f >=> f >=> f >=> f >=> ...
We can use this to simplify the comparer
Server
:
comparer = runIdentityK $ foreverK $ \argument -> do lift $ putStrLn $ "Server Receives: " ++ show argument let result = argument > 2 lift $ putStrLn $ "Server Sends: " ++ show result respond result
... which looks just like the way you might write a server's main loop in another programming language.
You can compose a Server
and Client
using (>->
), and this also returns
a runnable Session
:
comparer >-> threeReqs :: (Proxy p) => () -> Session p IO ()
Running this executes the client-server session:
>>>
runProxy $ comparer >-> threeReqs :: IO ()
Client Sends: 1 Server Receives: 1 Server Sends: False Client Receives: False * Client Sends: 3 Server Receives: 3 Server Sends: True Client Receives: True * Client Sends: 1 Server Receives: 1 Server Sends: False Client Receives: False *
Proxy
s generalize Pipe
s because they allow information to flow upstream.
The following Proxy
caches request
s to reduce the load on the Server
if the request matches a previous one:
import qualified Data.Map as M -- 'p' is the Proxy, as the (Proxy p) constraint indicates cache :: (Proxy p, Ord key) => key -> p key val key val IO r cache = runIdentityK (loop M.empty) where loop _map key = case M.lookup key _map of Nothing -> do val <- request key key2 <- respond val loop (M.insert key val _map) key2 Just val -> do lift $ putStrLn "Used cache!" key2 <- respond val loop _map key2
You can compose the cache
Proxy
between the Server
and Client
using
(>->
):
>>>
runProxy $ comparer >-> cache >-> threeReqs
Client Sends: 1 Server Receives: 1 Server Sends: False Client Receives: False * Client Sends: 3 Server Receives: 3 Server Sends: True Client Receives: True * Client Sends: 1 Used cache! Client Receives: False *
This bidirectional flow of information separates pipes
from other
streaming libraries which are unable to model Client
s, Server
s, or
Proxy
s. Using pipes
you can define interfaces to RPC interfaces, REST
architectures, message buses, chat clients, web servers, network protocols
... you name it!
Type Synonyms
You might wonder why (>->
) accepts Producer
s, Consumer
s, Pipe
s,
Client
s, Server
s, and Proxy
s. It turns out that these type-check
because they are all type synonyms that expand to the following central
type:
(Proxy p) => p a' a b' b m r
Like the name suggests, a Proxy
exposes two interfaces: an upstream
interface and a downstream interface. Each interface can both send and
receive values:
Upstream | Downstream +---------+ | | a' <== <== b' | Proxy | a ==> ==> b | | +---------+
Proxies are monad transformers that enrich the base monad with the ability to send or receive values upstream or downstream:
| Sends | Receives | Receives | Sends | Base | Return | Upstream | Upstream | Downstream | Downstream | Monad | Value p a' a b' b m r
We can selectively close certain inputs or outputs to generate specialized proxies.
For example, a Producer
is a Proxy
that can only output values to its
downstream interface:
Upstream | Downstream +----------+ | | C <== <== () | Producer | () ==> ==> b | | +----------+ type Producer p b m r = p C () () b m r -- The 'C' type is uninhabited, so it 'C'loses an output end
A Consumer
is a Proxy
that can only receive values on its upstream
interface:
Upstream | Downstream +----------+ | | () <== <== () | Consumer | a ==> ==> C | | +----------+ type Consumer p a m r = p () a () C m r
A Pipe
is a Proxy
that can only receive values on its upstream interface
and send values on its downstream interface:
Upstream | Downstream +--------+ | | () <== <== () | Pipe | a ==> ==> b | | +--------+ type Pipe p a b m r = p () a () b m r
When we compose proxies, the type system ensures that their input and output types match:
promptInt >-> take' 2 >-> printer +-----------+ +---------+ +---------+ | | | | | | C <== <== () <== <== () <== <== () | | | | | | | promptInt | | take' 2 | | printer | | | | | | | () ==> ==> Int ==> ==> Int ==> ==> C | | | | | | +-----------+ +---------+ +---------+
Composition fuses these into a new Proxy
that has both ends closed, which
is a Session
:
+-----------------------------------+ | | C <== <== () | | | promptInt >-> take' 2 >-> printer | | | () ==> ==> C | | +-----------------------------------+ type Session p m r = p C () () C m r
A Client
is a Proxy
that only uses its upstream interface:
Upstream | Downstream +----------+ | | a' <== <== () | Client | a ==> ==> C | | +----------+ type Client p a' a m r = p a' a () C m r
A Server
is a Proxy
that only uses its downstream interface:
Upstream | Downstream +----------+ | | C <== <== b' | Server | () ==> ==> b | | +----------+ type Server p b' b m r = p C () b' b m r
The compiler ensures that the types match when we compose Server
s,
Proxy
s, and Client
s.
comparer >-> cache >-> threeReqs +----------+ +-------+ +-----------+ | | | | | | C <== <== Int <== <== Int <== <== () | | | | | | | comparer | | cache | | threeReqs | | | | | | | () ==> ==> Bool ==> ==> Bool ==> ==> C | | | | | | +----------+ +-------+ +-----------+
This similarly fuses into a Session
:
+----------------------------------+ | | C <== <== () | | | comparer >-> cache >-> threeReqs | | | () ==> ==> C | | +----------------------------------+
pipes
encourages substantial code reuse by implementing all abstractions
as type synonyms on top of a single type class: Proxy
. This makes your
life easier because:
- You only use one composition operator: (
>->
) - You can mix multiple abstractions together as long as the types match
Request and Respond
There are only two ways to interact with other proxies: request
and
respond
. Let's examine their type signatures to understand how they
work:
request :: (Monad m, Proxy p) => a' -> p a' a b' b m a ^ ^ | | Argument --+ Result --+
request
sends an argument of type a'
upstream, and binds a result of
type a
. Whenever you request
, you block until upstream respond
s with
a value.
respond :: (Monad m, Proxy p) => b -> p a' a b' b m b' ^ ^ | | Result --+ Next Argument --+
respond
replies with a result of type b
, and then binds the next
argument of type b'
. Whenever you respond
, you block until downstream
request
s a new value.
Wait, if respond
always binds the next argument, where does the first
argument come from? Well, it turns out that every Proxy
receives this
initial argument as an ordinary parameter, as if they all began blocked on
a respond
statement.
We can see this if we take all the previous proxies we defined and fully
expand every type synonym. The initial argument of each Proxy
matches
the type parameter corresponding to the return value of respond
:
These +-- Columns ---+ | Match | v v promptInt :: (Proxy p) => () -> p C () () Int IO r printer :: (Proxy p, Show a) => () -> p () a () C IO r take' :: (Proxy p) => Int -> () -> p () a () a IO () comparer :: (Proxy p) => Int -> p C () Int Bool IO r cache :: (Proxy p, Ord key) => key -> p key val key val IO r
You can also study the type of composition, which follows this same pattern.
Composition requires two Proxy
s blocked on a respond
, and produces a new
Proxy
similarly blocked on a respond
:
(>->) :: (Monad m, Proxy p) => (b' -> p a' a b' b m r) -> (c' -> p b' b c' c m r) -> (c' -> p a' a c' c m r) ^ ^ | These | +---Match----+
This is why Producer
s, Consumer
s, Pipe
s and Client
s all take ()
as their initial argument, because their corresponding respond
commands
all have a return value of ()
.
This library also provides (>~>
), which is the dual of the (>->
)
composition operator. (>~>
) composes two Proxy
s blocked on a request
and returns a new Proxy
blocked on a request
:
(>~>) :: (Monad m, Proxy p) => (a -> p a' a b' b m r) -> (b -> p b' b c' c m r) -> (a -> p a' a c' c m r)
Conceptually, (>->
) composes pull-based systems and (>~>
) composes
push-based systems.
In fact, if you went back through the previous code and systematically replaced every:
... then everything would still work and produce identical behavior, except the compiler would now infer the symmetric types with all interfaces reversed. We can therefore conclude the obvious: pull-based systems are symmetric to push-based systems.
Since these two composition operators are perfectly symmetric, I arbitrarily
standardize on using (>->
) and I provide all standard library proxies
blocked on respond
so that they work with (>->
). This gives behavior
more familiar to Haskell programmers that work with lazy pull-based
functions. I only include the (>~>
) composition operator for theoretical
completeness.
Composition
When we compose (p1 >-> p2)
, composition ensures that p1
's downstream
interface matches p2
's upstream interface. This follows from the type of
(>->
):
(>->) :: (Monad m, Proxy p) => (b' -> p a' a b' b m r) -> (c' -> p b' b c' c m r) -> (c' -> p a' a c' c m r)
Diagramatically, this looks like:
p1 >-> p2 +--------+ +--------+ | | | | a' <== <== b' <== <== c' | p1 | | p2 | a ==> ==> b ==> ==> c | | | | +--------+ +--------+
p1
's downstream (b', b)
interface matches p2
's upstream (b', b)
interface, so composition connects them on this shared interface. This
fuses away the (b', b)
interface, leaving behind p1
's upstream (a', a)
interface and p2
's downstream (c', c)
interface:
+-----------------+ | | a' <== <== c' | p1 >-> p2 | a ==> ==> c | | +-----------------+
Proxy composition has the very nice property that it is associative, meaning that it behaves the exact same way no matter how you group composition:
(p1 >-> p2) >-> p3 = p1 >-> (p2 >-> p3)
... so you can safely elide the parentheses:
p1 >-> p2 >-> p3
Also, we can define a 'T
'ransparent Proxy
that auto-forwards values
both ways:
idT :: (Monad m, Proxy p) => a' -> p a' a a' a m r idT = runIdentityK loop where loop a' = do a <- request a' a'2 <- respond a loop a'2 -- or: idT = runIdentityK $ foreverK $ request >=> respond -- = runIdentityK $ request >=> respond >=> request >=> respond ...
Diagramatically, this looks like:
+-----+ | | a' <======== a' <- All values pass | idT | straight through a ========> a <- immediately | | +-----+
Transparency means that:
idT >-> p = p p >-> idT = p
In other words, idT
is an identity of composition.
This means that proxies form a true Category
where (>->
) is composition
and idT
is the identity. The associativity law and the two
identity laws are just the Category
laws. The objects of the category are
the Proxy
interfaces.
These Category
laws guarantee the following important properties:
The Proxy Class
All the proxy code we wrote was generic over the Proxy
type class, which
defines the four central operations of this library's API:
- (
->>
): "Pointful" pull-based composition, from which the point-free (>->
) operator derives - (
>>~
): "Pointful" push-based composition, from which the point-free (>~>
) operator derives -
request
: Request input from upstream -
respond
: Respond with output to downstream
pipes
defines everything in terms of these four operations, which is
why all the library's utilities are polymorphic over the Proxy
type class.
Let's look at some example instances of the Proxy
type class:
instance Proxy ProxyFast -- Fastest implementation instance Proxy ProxyCorrect -- Strict monad transformer laws
These two types provide the two alternative base implementations:
-
ProxyFast
: This runs significantly faster on pure code segments and employs several rewrite rules to optimize your code into the equivalent hand-tuned code. -
ProxyCorrect
: This uses a monad transformer implementation that is correct by construction, but runs about 8x slower on pure code segments. However, forIO
-bound code, the performance gap is small.
These two implementations differ only in the runProxy
function that they
export, which is how the compiler selects which Proxy
implementation to
use.
Control.Proxy automatically selects the fast implementation for you, but you can always choose the correct implementation instead by replacing Control.Proxy with the following two imports:
import Control.Proxy.Core -- Everything except the base implementation import Control.Proxy.Core.Correct -- The alternative base implementation
These are not the only instances of the Proxy
type class! This library
also provides several "proxy transformers", which are like monad
transformers except that they also correctly lift the Proxy
type class:
instance (Proxy p) => Proxy (IdentityP p) instance (Proxy p) => Proxy (EitherP e p) instance (Proxy p) => Proxy (MaybeP p) instance (Proxy p) => Proxy (ReaderP i p) instance (Proxy p) => Proxy (StateP s p) instance (Proxy p) => Proxy (WriterP w p)
All of the Proxy
code we wrote so far also works seamlessly with all of
these proxy transformers. The Proxy
class abstracts over the
implementation details and extensions so that you can reuse the same library
code for any feature set.
This polymorphism comes at a price: you must embed your Proxy
code in at
least one proxy transformer if you want clean type class constraints. If
you don't use extensions then you embed your code in the identity proxy
transformer: IdentityP
. This is why all the examples use runIdentityP
or runIdentityK
to embed their code in IdentityP
. Control.Proxy.Class
provides a longer discussion on this subject.
Without this IdentityP
embedding, the compiler infers uglier constraints,
which are also significantly less polymorphic. We can show this by
removing the runIdentityP
call from promptInt
and see what type the
compiler infers:
promptInt () = forever $ do lift $ putStrLn "Enter an Integer:" n <- lift readLn respond n
>>>
:t promptInt -- I've cleaned up the inferred type
promptInt :: (Monad (Producer p Int IO), MonadTrans (Producer p Int), Proxy p) => () -> Producer p Int IO r
All Proxy
instances are already monads and monad transformers, but the
compiler cannot infer that without the IdentityP
embedding. When we embed
promptInt
in IdentityP
, the compiler collapses the Monad
and
MonadTrans
constraints into the Proxy
constraint.
Fortunately, you do not pay any performance price for this IdentityP
embedding or the type class polymorphism. Your polymorphic code will still
run very rapidly, as fast as if you had specialized it to a concrete
Proxy
instance without the IdentityP
embedding. I've taken great care
to ensure that all optimizations and rewrite rules always see through these
abstractions without any assistance on your part.
Interleaving Effects
When you compose two proxies, you interleave their effects in the base monad. The following two proxies demonstrate this interleaving of effects:
downstream :: (Proxy p) => () -> Consumer p () IO () downstream () = runIdentityP $ do lift $ print 1 request () -- Switch to upstream lift $ print 3 request () -- Switch to upstream upstream :: (Proxy p) => () -> Producer p () IO () upstream () = runIdentityP $ do lift $ print 2 respond () -- Switch to downstream lift $ print 4
Control.Proxy.Class enumerates the Proxy
laws, which equationally
define how all Proxy
instances must behave. These laws require that
(upstream >-> downstream)
must reduce to the following:
upstream >-> downstream -- This is true no matter what feature = -- set or 'Proxy' instance you select \() -> lift $ do print 1 print 2 print 3 print 4
Conceptually, runProxy
just applies this to ()
and removes the lift
:
runProxy $ upstream >-> downstream = do print 1 print 2 print 3 print 4
Let's test this:
>>>
runProxy $ upstream >-> downstream
1 2 3 4
The Proxy
laws let you reason about how proxies interleave effects without
knowing any specifics about the underlying implementation. Intuitively, the
Proxy
laws say that:
-
request
blocks until upstreamrespond
s -
respond
blocks until downstreamrequest
s - If a
Proxy
terminates, it terminates everyProxy
composed with it
Several of the utilities in Control.Proxy.Prelude.Base use these
equational laws to rigorously prove things about their behavior. For
example, consider the mapD
proxy, which applies a function f
to all
values flowing downstream:
mapD :: (Monad m, Proxy p) => (a -> b) -> x -> p x a x b m r mapD f = runIdentityK loop where loop x = do a <- request x x2 <- respond (f a) loop x2 -- or: mapD f = runIdentityK $ foreverK $ request >=> respond . f
We can use the Proxy
laws to prove that:
mapD f >-> mapD g = mapD (g . f) mapD id = idT
... which is what we expect. We can fuse two consecutive mapD
s into one
by composing their functions, and mapping id
does nothing at all, just
like the identity proxy: idT
.
In fact, these are just the functor laws in disguise, where mapD
defines a
functor between the category of Haskell function composition and the
category of Proxy
composition. Control.Proxy.Prelude.Base is full of
utilities like this that are simultaneously practical and theoretically
elegant.
Mixing Base Monads
Composition can't interleave two proxies if their base monads do not
match. For instance, I might try to modify promptInt
to use
EitherT String
to report the error instead of using exceptions:
import Control.Monad.Trans.Either -- from the "either" package import Safe (readMay) -- from the "safe" package promptInt2 :: (Proxy p) => () -> Producer p Int (EitherT String IO) r promptInt2 () = runIdentityP $ forever $ do str <- lift $ lift $ do putStrLn "Enter an Integer:" getLine case readMay str of Nothing -> lift $ left "Could not read an Integer" Just n -> respond n
However, if I try to compose it with printer
, I receive a type error:
>>>
runEitherT $ runProxy $ promptInt2 >-> printer
<interactive>:2:40: Couldn't match expected type `EitherT String IO' with actual type `IO' ...
The type error says that promptInt2
uses (EitherT String IO)
for its
base monad, but printer
uses IO
for its base monad, so composition can't
interleave their effects.
You can easily fix this using the hoist
function from the mmorph
package, which transforms the base monad of any monad transformer that
implements MFunctor
. Since all proxies implement MFunctor
you can use
hoist
from MFunctor
to lift
one proxy's base monad to match another
proxy's base monad, like so:
>>>
runEitherT $ runProxy $ promptInt2 >-> (hoist lift . printer)
Enter an Integer: Hello<Enter> Left "Could not read an Integer"
This library provides three syntactic conveniences for making this easier to write.
First, (.
) has higher precedence than (>->
), so you can drop the
parentheses:
>>>
runEitherT $ runProxy $ promptInt2 >-> hoist lift . printer
...
Second, lift
is such a common argument to hoist
that I provide the
raise
function:
raise = hoist lift
>>>
runEitherT $ runProxy $ promptInt2 >-> raise . printer
...
Third, I provide the hoistK
and raiseK
functions in case you think
composition looks ugly:
hoistK f = (hoist f .) raiseK = (raise .)
>>>
runEitherT $ runProxy $ promptInt2 >-> raiseK printer
...
For more information on using MFunctor
, consult the tutorial in the
Control.Monad.Morph
module from the mmorph
package.
Utilities
The Control.Proxy.Prelude heirarchy provides several utility functions for common tasks. We can redefine the previous example functions just by composing these utilities.
For example, readLnS
reads values from user input:
readLnS :: (Proxy p, Read a) => () -> Producer p a IO r
... so we can read Int
s just by specializing its type:
readIntS :: (Proxy p) => () -> Producer p Int IO r readIntS = readLnS
The S
suffix indicates that it belongs in the 'S
'erver position.
(takeB_ n)
allows at most n
values to pass through it in 'B
'oth
directions:
takeB_ :: (Monad m, Proxy p) => Int -> a' -> p a' a a' a m ()
takeB_
has a more general type than take'
because it allows any type of
value to flow upstream.
printD
prints all values flowing 'D
'ownstream:
printD :: (Proxy p, Show a) => x -> p x a x a IO r
printD
has a more general type than our original printer
because it
forwards all values further downstream after print
ing them. This means
that you could use it as an intermediate stage as well. However, printD
still type-checks as the most downstream stage, too, since runProxy
just
discards any unused outbound values.
These utilities do not clash with the Prelude namespace or common libraries because they all end with a capital letter suffix that indicates their directionality:
- '
D
' suffix: interacts with values flowing 'D
'ownstream - '
U
' suffix: interacts with values flowing 'U
'pstream - '
B
' suffix: interacts with values flowing 'B
'oth ways (or: 'B
'idirectional) - '
S
' suffix: belongs furthest upstream in the 'S
'erver position - '
C
' suffix: belongs furthest downstream in the 'C
'lient position
We can assemble these functions into a silent version of our previous
Session
:
>>>
runProxy $ readIntS >-> takeB_ 2 >-> printD
4<Enter> 4 39<Enter> 39
Fortunately, we don't have to give up our previous useful diagnostics.
We can use execU
, which executes an action each time values flow upstream
through it, and execD
, which executes an action each time values flow
downstream through it:
promptInt :: (Proxy p) => () -> Producer p Int IO r promptInt = readLnS >-> execU (putStrLn "Enter an Integer:") printer :: (Proxy p, Show a) => x -> p x a x a IO r printer = execD (putStrLn "Received a value:") >-> printD
Similarly, we can build our old take'
on top of takeB_
:
take' :: (Proxy p) => Int -> a' -> p a' a a' a IO () take' n a' = runIdentityP $ do -- Remember, we need 'runIdentityP' if takeB_ n a' -- we use 'do' notation or 'lift' lift $ putStrLn "You shall not pass!"
>>>
runProxy $ promptInt >-> take' 2 >-> printer
<Exact same behavior>
Or perhaps I want to skip user input for testing and mock promptInt
by
replacing it with a predefined set of values:
>>>
runProxy $ fromListS [4, 37, 1] >-> take' 2 >-> printer
Received a value: 4 Received a value: 37
What about our original lines'
function? That's just stdinS
:
stdinS :: (Proxy p) => () -> Producer p String IO ()
You could hand-write loops that accomplish these same tasks, but proxies let you:
- Rapidly swap in and out components for testing, debugging, and fast prototyping
- Factor out common patterns into modular components
- Mix and match simple stages to build sophisticated programs
This compositional programming style emphasizes building a library of reusable components and connecting them like Unix pipes to assemble the desired streaming program.
Mix Monads and Composition
Composition isn't the only way to assemble proxies. You can also sequence
predefined proxies using do
notation to generate more elaborate behaviors.
Most commonly, you will sequence sources to combine their outputs, very
similar to how the Unix cat
utility behaves:
threeSources () = do source1 () source2 () source3 () -- or: threeSources = source1 >=> source2 >=> source3
As a concrete example, we could create a Producer
where our first source
presets the first few values and then we let the user take over to generate
the remaining values:
source1 :: (Proxy p) => () -> Producer p Int IO r source1 () = runIdentityP $ do fromListS [4, 4] () -- Source 1 readLnS () -- Source 2 -- or: source1 = runIdentityK (fromListS [4, 4] >=> readLnS)
>>>
runProxy $ source1 >-> printD
4 4 70<Enter> 70 34<Enter> 34 ...
What if we only want the user to provide three values? We can
selectively throttle it with takeB_
:
source2 :: (Proxy p) => () -> Producer p Int IO () source2 () = runIdentityP $ do fromListS [4, 4] () (readLnS >-> takeB_ 3) () -- You can compose inside a do block! -- or: source2 = runIdentityK (fromListS [4, 4] >=> (readLnS >-> takeB_ 3))
Notice that composition works inside of a do
block! This is a very handy
trick!
>>>
runProxy $ source2 >-> printD
4 4 56<Enter> 56 41<Enter> 41 80<Enter> 80
You can also concatenate sinks, too:
sink1 :: (Proxy p) => () -> Pipe p Int Int IO () sink1 () = runIdentityP $ do (takeB_ 3 >-> printD) () -- Sink 1 (takeWhileD (< 4) >-> printD) () -- Sink 2 -- or: sink1 = (takeB_ 3 >-> printD) >=> (takeWhileD (< 4) >-> printD)
>>>
runProxy $ source2 >-> sink1
4 -- The first sink 4 -- handles these 68<Enter> -- 68 1<Enter> -- The second sink 1 -- handles these 5<Enter> --
... but the above example is gratuitous because you can simply concatenate the intermediate stages:
sink2 :: (Proxy p) => () -> Pipe p Int Int IO () sink2 = intermediate >-> printD where intermediate () = runIdentityP $ do takeB_ 3 () -- Intermediate stage 1 takeWhileD (< 4) () -- Intermediate stage 2 -- or: sink2 = (takeB_ 3 >=> takeWhileD (< 4)) >-> printD
>>>
runProxy $ source2 >-> sink2
<Exact same behavior>
These examples demonstrate the two principal ways to combine proxies:
- "Vertical" composition, using (
>=>
) from the Kleisli category - "Horizontal" composition: using (
>->
) from the Proxy category
You assemble most proxies simply by composing them in one or both of these two categories.
ListT
Proxies generalize lists by allowing you to interleave effects between list
elements, but you might be surprised to learn that they also generalize the
list monad, too! Control.Proxy.ListT provides the machinery necessary to
convert back and forth between proxies and a ListT
-like monad transformer
that binds proxy outputs.
For example, let's say that we want to select elements from two separate lists, except interleaving side effects, too:
-- +-- ListT that will compile to a 'Producer' -- | -- v pairs :: (ListT p) => () -> ProduceT p IO (Int, Int) pairs () = do x <- rangeS 1 3 -- Select a number betwen 1 and 3 lift $ putStrLn $ "Currently using: " ++ show x y <- eachS [4,6,8] -- Select one of 4, 6, or 8 return (x, y)
We can compile the above ProduceT
code to a Producer
using
runRespondK
:
-- runRespondK's type is actually more general runRespondK :: (() -> ProduceT p m b) -> () -> Producer p b m () runRespondK pairs :: (ListT p) => () -> Producer p (Int, Int) IO ()
The return value of the ProduceT
becomes the output of the corresponding
Producer
, which produces one output for each permutation of elements that
we could have selected:
>>>
runProxy $ runRespondK pairs >-> printD
Currently using: 1 (1,4) (1,6) (1,8) Currently using: 2 (2,4) (2,6) (2,8) Currently using: 3 (3,4) (3,6) (3,8)
This works the other way around, too! You can wrap any Producer
in the
RespondT
newtype to bind its output in the ProduceT
monad:
pairs2 :: (ListT p) => () -> ProduceT p IO (Int, Int) pairs2 () = do x <- RespondT $ runIdentityP $ do respond 1 lift $ putStrLn "Here" respond 2 y <- RespondT $ runIdentityP $ do respond 3 lift $ putStrLn "There" respond 4 return (x, y)
>>>
runProxy $ runRespondK pairs2 >-> printD
(1,3) There (1,4) Here (2,3) There (2,4)
In fact, this is how eachS
and rangeS
are implemented:
eachS :: (Monad m, ListT p) => [b] -> ProduceT p m b eachS xs = RespondT (fromList xs ()) rangeS :: (Enum b, Monad m, Ord b, ListT p) => b -> b -> ProduceT p m b rangeS n1 n2 = RespondT (enumFromS n1 n2 ())
ProduceT
is actually a special case of RespondT
, related by the
following type synonym:
type ProduceT p = RespondT p C () ()
Moreover, you can bind anything within RespondT
, not just Producer
s.
For example, you can bind Pipe
s within the more general RespondT
monad:
pairs3 :: (ListT p) => () -> RespondT p () Int () IO (Int, Int) pairs3 () = do x <- RespondT $ runIdentityP $ replicateM_ 2 $ do a <- request () lift $ putStrLn $ "Received " ++ show a respond a y <- RespondT $ runIdentityP $ replicateM_ 3 $ do a <- request () lift $ putStrLn $ "Received " ++ show a respond a return (x, y)
... and you will get a Pipe
back when you runRespondK
the final result:
runRespondK pairs3 :: ListT p => () -> Pipe p Int (Int, Int) IO ()
>>>
runProxy $ enumFromS 1 >-> runRespondK pairs3 >-> printD
Received 1 Received 2 (1,2) Received 3 (1,3) Received 4 (1,4) Received 5 Received 6 (5,6) Received 7 (5,7) Received 8 (5,8)
Proxies actually form two symmetric ListT
-like monad transformers: one
binds elements output from the proxy's downstream interface and one binds
elements output from the proxy's upstream interface. To distinguish them,
I call the downstream one RespondT
and the upstream one RequestT
.
Control.Proxy.ListT contains the ListT
class, which provides the the
underlying operators for both RespondT
and RequestT
:
You don't need to use these operators directly, since you can instead just
wrap your proxies in the RespondT
or RequestT
monad transformers and
they will translate the binds in those monads to the above operators under
the hood.
If those are the bind operators, though, then where are the corresponding
return
equivalents? Why, they are just respond
and request
!
-- return for 'RespondT' return b = RespondT (respond b)
-- return for 'RequestT' return a' = RequestT (request a')
Therefore, we can use the monad laws to reason about how respond
interacts
with RespondT
(and, dually, how request
interacts with RequestT
):
do x' <- RespondT (respond x) = do f x f x' do x <- m = do m RespondT (respond x)
RequestT
and RespondT
are correct by construction, meaning that they
always satisfy the monad and monad transformer without exception, unlike
ListT
from transformers
. In other words, they behave like two
symmetric implementations of "ListT done right".
Folds
You can fold a stream of values in two ways, both of which use the base monad:
- Use
WriterT
in the base monad andtell
the values to fold - Use
StateT
in the base monad andput
strict values
WriterT
is more elegant in principle but leaks space for a large number of
values to fold. StateT
does not leak space if you keep the accumulator
strict, but is less elegant and doesn't guarantee write-only behavior. To
remedy this, I am currently working on a stricter WriterT
implementation
that does not leak space to add to the transformers
package.
Control.Proxy.Prelude.Base provides several common folds using WriterT
as the base monad, such as:
-
lengthD
: Count how many values flow downstream
lengthD :: (Monad m, Proxy p) => x -> p x a x a (WriterT (Sum Int) m) r
-
toListD
: Fold the values flowing downstream into a list.
toListD :: (Monad m, Proxy p) => x -> p x a x a (WriterT [a] m) r
-
anyD
: Determine whether any values satisfy the predicate
anyD :: (Monad m, Proxy p) => (a -> Bool) -> x -> p x a x a (WriterT Any m) r
These WriterT
versions demonstrate how the elegant approach should work in
principle and they should be okay for folding a medium number of values
until I release the fixed WriterT
. If space leaks cause problems, you can
temporarily rewrite the WriterT
folds using the following two strict
StateT
folds:
-
foldlD'
: Strictly fold values flowing downstream
foldlD' :: (Monad m, Proxy p) => (b -> a -> b) -> x -> p x a x a (StateT b m) r
-
foldlU'
: Strictly fold values flowing upstream
foldU' :: (Monad m, Proxy p) => (b -> a' -> b) -> a' -> p a' x a' x (StateT b m) r
Now, let's try these folds out and see if we can build a list from user input:
>>>
runWriterT $ runProxy $ raiseK promptInt >-> takeB_ 3 >-> toListD
Enter an Integer: 1<Enter> Enter an Integer: 66<Enter> Enter an Integer: 5<Enter> ((),[1,66,5])
Notice that promptInt
uses IO
as its base monad, but toListD
uses
(WriterT [Int] m)
as its base monad, so I use raiseK
to get the base
monads to match.
You can insert these folds anywhere in the middle of a pipeline and they still work:
>>>
runWriterT $ runProxy $ fromListS [5, 7, 4] >-> lengthD >-> raiseK printD
5 7 4 ((),Sum {getSum = 3})
You can also run multiple folds at the same time just by adding more
WriterT
layers to your base monad:
>>>
runWriterT $ runWriterT $ runProxy $ fromListS [9, 10] >-> anyD even >-> raiseK sumD
(((),Any {getAny = True}),Sum {getSum = 19}
I designed certain special folds to terminate the Session
early if they
can compute their result prematurely, in order to draw as little input as
possible. These folds end with an underscore, such as headD_
, which
terminates the stream once it receives an input:
headD_ :: (Monad m, Proxy p) => x -> p x a x a (WriterT (First a) m) ()
>>>
runWriterT $ runProxy $ fromListS [3, 4, 9] >-> raiseK printD >-> headD_
3 ((),First {getFirst = Just 3})
Compare this to headD
without underscore, which folds the entire input:
>>>
runWriterT $ runProxy $ fromListS [3, 4, 9] >-> raiseK printD >-> headD
3 4 9 ((),First {getFirst = Just 3})
Use the versions that don't prematurely terminate if you are running multiple folds or if you want to continue to use the rest of the input when the fold is done. Use the versions that do prematurely terminate if collecting that single fold is the entire purpose of the session.
Resource Management
This core library provides utilities for lazily streaming from resources,
but does not provide utilities for lazily managing resource allocation and
deallocation. To frame the problem, let's assume that we try to be clever
and write a streaming utility that lazily opens a file only in response to
a request
, such as the following Producer
:
readFileS :: () -> FilePath -> () -> Producer p String IO () readFileS file () = runIdentityP $ do h <- lift $ openFile file ReadMode lift $ putStrLn "Opening file" hGetLineS h () lift $ putStrLn "Closing file" lift $ hClose h
This works well if we fully demand the file:
>>>
runProxy $ readFileS "test.txt" >-> printD
Opening file "Line 1" "Line 2" "Line 3" Closing file
This also works well if we never demand the file at all, in which case we never open it:
>>>
runProxy $ readFileS "test.txt" >-> return
-- Outputs nothing
But it gives exactly the wrong behavior if we partially demand the file:
>>>
runProxy $ readFileS "test.txt" >-> takeB_ 1 >-> printD
Opening file "Line 1"
Notice that this does not close the file, because once takeB_ 1
terminates
it terminates the entire Session
and readFileS
does not get a chance to
finalize the file.
The pipes-safe
library solves this problem by providing resource
management abstractions built on top of pipes
and offers several other
nice features:
- It is completely exception safe, even against asynchronous exceptions
- It is backwards compatible with "unmanaged" ordinary proxies
Backwards compatibility means that you don't need to buy in to the
pipes-safe
way of doing things. This matters because another common
approach is to just open all resources before running the session and close
them all afterward. For example,, if I wanted to emulate the Unix cp
command, streaming one line at a time, I might write:
cp :: FilePath -> FilePath -> IO () cp inFile outFile = withFile inFile ReadMode $ \hIn -> withFile outFile WriteMode $ \hOut -> runProxy $ hGetLineS hIn >-> hPutStrLnD hOut
Some people prefer that approach because it:
- is straightforward, and
- can reuse functions from
Control.Exception
The disadvantage is that this does not lazily allocate resources, nor does
this promptly deallocate them. Also, there is no way to recover from
exceptions and resume the Session
. On the other hand, pipes-safe
lets
you do all of these.
Fortunately, you can choose whichever approach you prefer and rest assured
that the two approaches safely interoperate. Control.Proxy.Safe.Tutorial
from the pipes-safe
package provides a separate tutorial on how to:
- extend
pipes
with resource management, - handle exceptions natively within proxies, and
- interoperate with unmanaged code.
Extensions
This library provides several extensions that add features on top of the
base Proxy
API. These extensions behave like monad transformers, except
that they also lift the Proxy
class through the extension so that the
extended proxy can still request
, respond
, and compose with other
proxies:
instance (Proxy p) => Proxy (IdentityP p) -- Equivalent to IdentityT instance (Proxy p) => Proxy (EitherP e p) -- Equivalent to EitherT instance (Proxy p) => Proxy (MaybeP p) -- Equivalent to MaybeT instance (Proxy p) => Proxy (StateP s p) -- Equivalent to StateT instance (Proxy p) => Proxy (WriterP w p) -- Equivalent to WriterT
Each of these proxy transformers provides the same API as the equivalent monad transformer (sometimes even more). The following sections show some common problems that these proxy transformers solve.
Error handling
Our previous promptInt
example suffered from one major flaw:
promptInt2 :: (Proxy p) => () -> Producer p Int (EitherT String IO) r promptInt2 () = runIdentityP $ forever $ do str <- lift $ lift $ do putStrLn "Enter an Integer:" getLine case readMay str of Nothing -> lift $ left "Could not read an Integer" Just n -> respond n
There is no way to recover from the error and resume streaming data. You
can only handle the Left
value after using runProxy
, but by then it is
too late.
We can solve this by switching the order of the two monad transformers, but
using EitherP
this time instead of EitherT
:
import qualified Control.Proxy.Trans.Either as E -- Proxy transformers play -- nice with type synonyms --+ -- | -- v promptInt3 :: (Proxy p) => () -> Producer (E.EitherP String p) Int IO r -- i.e. (Proxy p) => () -> EitherP String p C () () Int IO r promptInt3 () = forever $ do str <- lift $ do putStrLn "Enter an Integer:" getLine case readMay str of Nothing -> E.throw "Could not read an Integer" Just n -> respond n
This example does not need runIdentityP
(nor would that type-check)
because the EitherP
proxy transformer gives the compiler enough
information to generalize the constraints.
We've swapped the order of the transformers, so now we use runEitherK
first to unwrap the EitherP
followed by runProxy
.
runEitherK :: (q -> EitherP p a' a b' b m r) -> (q -> p a' a b' b m (Either e r))
>>>
runProxy $ E.runEitherK $ promptInt3 >-> printer :: IO (Either String r)
Enter an Integer: Hello<Enter> Left "Could not read an Integer"
Notice how we can directly compose printer
with promptInt
.
This works because printer
's base proxy type is completely polymorphic
over the Proxy
type class and doesn't use any features specific to any
proxy transformers:
'p' type-checks as anything --+ that implements 'Proxy' | v printer :: (Proxy p, Show a) => () -> Consumer p a IO r
This means that you can compose printer
with anything that implements the
Proxy
type class, including EitherP
, without any lifting.
EitherP
lets us catch and handle errors locally without disturbing other
proxies. For example, I can define a heartbeat function that just restarts
a given proxy each time it raises an error:
heartbeat :: (Proxy p) => E.EitherP String p a' a b' b IO r -> E.EitherP String p a' a b' b IO r heartbeat p = p `E.catch` \err -> do lift $ putStrLn err -- Print the error heartbeat p -- Restart 'p'
This uses the catch
function from Control.Proxy.Trans.Either, which
lets you catch and handle errors locally without disturbing other proxies.
>>>
runProxy $ E.runEitherK $ (heartbeat . promptInt3) >-> takeB_ 2 >-> printer
Enter an Integer: Hello<Enter> Could not read an Integer Enter an Integer 8 Received a value: 8 Enter an Integer 0 Received a value: 0
It's very easy to prove that EitherP
has only a local effect. In fact,
we can run it locally on a subset of the pipeline like so:
>>>
runProxy $ (E.runEitherK $ heartbeat . promptInt3 >-> takeB_ 2) >-> printer
Proxy transformers do not use the base monad at all, so you can use them to isolate effects from other proxies, as the next section demonstrates.
Local state
The StateP
proxy lets you embed local state into any Proxy
computation.
For example, we might want to gratuitously use state to generate successive
numbers:
import qualified Control.Proxy.Trans.State as S increment :: (Monad m, Proxy p) => () -> Producer (S.StateP Int p) Int m r increment () = forever $ do n <- S.get respond n S.put (n + 1)
We could then embed it locally into any Proxy
, such as the following one:
numbers :: (Monad m, Proxy p) => () -> Producer p Int m () numbers () = runIdentityP $ do (takeB_ 5 <-< S.evalStateK 10 increment) () S.evalStateK 1 (takeB_ 3 <-< increment) () -- This works, too!
>>>
runProxy $ numbers >-> printD
10 11 12 13 14 1 2 3
We can also prove the effect is local even when you directly compose two
StateP
proxies before running them. Let's define a stateful consumer:
increment2 :: (Proxy p) => () -> Consumer (S.StateP Int p) Int IO r increment2 () = forever $ do nOurs <- S.get nTheirs <- request () lift $ print (nTheirs, nOurs) S.put (nOurs + 2)
.. and hook it up directly to increment
:
>>>
runProxy $ S.evalStateK 0 $ increment >-> takeB_ 3 >-> increment2
(0, 0) (1, 2) (2, 4)
They each share the same initial state, but they isolate their own side effects completely from each other.
Branching, zips, and merges
So far we've only considered linear chains of proxies, but pipes
allows
you to branch these chains and generate more sophisticated topologies. The
trick is to simply nest the Proxy
monad transformer within itself.
For example, if I want to zip two inputs, I can just define the following triply nested proxy:
zipD :: (Monad m, Proxy p1, Proxy p2, Proxy p3) => () -> Consumer p1 a (Consumer p2 b (Producer p3 (a, b) m)) r zipD () = runIdentityP . hoist (runIdentityP . hoist runIdentityP) $ forever $ do -- Yes, this 'runIdentityP' mess is necessary. Sorry! a <- request () -- Request from the outer 'Consumer' b <- lift $ request () -- Request from the inner 'Consumer' lift $ lift $ respond (a, b) -- Respond to the 'Producer'
zipD
behaves analogously to a curried function. We partially apply it to
each layer using composition and runProxyK
or runProxy
:
-- 1st application p1 = runProxyK $ zipD <-< fromListS [1..3] -- 2nd application p2 = runProxyK $ p1 <-< fromListS [4..] -- 3rd application p3 = runProxy $ printD <-< p2
>>>
p3
(1,4) (2,5) (3,6)
You can use this trick to fork output, too:
fork :: (Monad m, Proxy p1, Proxy p2, Proxy p3) => () -> Consumer p1 a (Producer p2 a (Producer p3 a m)) r fork () = runIdentityP . hoist (runIdentityP . hoist runIdentityP) $ forever $ do a <- request () -- Request output from the 'Consumer' lift $ respond a -- Send output to the outer 'Producer' lift $ lift $ respond a -- Send output to the inner 'Producer'
Again, we just keep partially applying it until it is fully applied:
-- 1st application p1 = runProxyK $ fork <-< fromListS [1..3] -- 2nd application p2 = runProxyK $ raiseK printD <-< mapD (> 2) <-< p1 -- 3rd application p3 = runProxy $ printD <-< mapD show <-< p2
>>>
p3
False "1" False "2" True "3"
You can even merge or fork proxies that use entirely different feature sets:
p1 = runProxyK $ S.evalStateK 0 $ fork <-< increment p2 = runProxyK $ raiseK printD <-< mapD (+ 10) <-< p1 p3 = runProxy $ E.runEitherK $ printD <-< (takeB_ 3 >=> E.throw) <-< p2
>>>
p3
10 0 11 1 12 2 Left ()
We just forked a (StateP p1)
proxy and read out the result in both a
generic p2
proxy and an (EitherP p3)
proxy. That's pretty crazy, but it
gives you a sense of how versatile and robust proxies can be.
You can implement arbitrary branching topologies using this trick. However, I want to mention a few caveats:
- The intermediate partially applied type signatures will be ugly as sin. I warned you.
- You cannot implement cyclic topologies (and cyclic topologies do not make sense for proxies anyway)
- You cannot use this trick to implement a polymorphic zip function of the following form:
zip' -- You can't define this :: (Monad m, Proxy p) => (() -> Producer p a m r) -> (() -> Producer p b m r) -> (() -> Producer p (a, b) m r)
Partial application requires selecting a Proxy
instance, which is why you
cannot define zip'
. You can define a zip'
specialized to a concrete
Proxy
instance, but I don't really recommend doing that since you should
always strive to write polymorphic proxies to avoid locking your user into
a particular feature set.
With those caveats out of the way, this approach affords many indispensable features that other approaches do not allow:
- It does not require extending the
Proxy
type class - It handles almost every branching scenario, including more complicated situations like concurrent interleavings
- You can branch and merge mixtures of
Server
s,Client
s, andProxy
s - You can branch and merge heterogeneous feature sets
- It is completely polymorphic over the
Proxy
class and uses no implementation-specific details
Proxy Transformers
There is one last scenario that you will eventually encounter: mixing
proxies that have incompatible proxy transformer stacks. You solve this the
exact same way you mix different monad transformer stacks, except that
instead of using lift
and hoist
you use liftP
and hoistP
.
For example, we might want to mix promptInt3
and increment2
:
promptInt3 :: (Proxy p) => () -> Producer (EitherP String p) Int IO r increment2 :: (Proxy p) => () -> Consumer (StateP Int p) Int IO r
Unfortunately, they use two different feature sets so neither one is fully
polymorphic over the Proxy
class and we cannot directly compose them.
Fortunately, all proxy transformers implement the ProxyTrans
class,
analogous to the MonadTrans
class for transformers:
class ProxyTrans t where liftP :: (Monad m, Proxy p) => p a' a b' b m r -> t p a' a b' b m r -- mapP is slightly more elegant mapP :: (Monad m, Proxy p, ProxyTrans t) => (q -> p a' a b' b m r) -> (q -> t p a' a b' b m r) mapP = (liftP .)
It's very easy to use. Just use mapP
to lift one proxy transformer to
match another one. For example, we can mapP
increment2
to match
promptInt3
:
promptInt3 :: (Proxy stateP) => () -> Producer (EitherP String stateP ) Int IO r mapP increment2 :: (Proxy p, ProxyTrans eitherP) => () -> Consumer (eitherP (StateP Int p)) Int IO r promptInt3 >-> mapP increment2 :: (Proxy p) => () -> Session (EitherP String (StateP Int p)) IO r
mapP
creates a new ProxyTrans
layer that type-checks as EitherP
, and
StateP Int p
type-checks as the Proxy
in promptInt3
's signature.
>>>
runProxy $ S.evalStateK 0 $ E.runEitherK $ promptInt3 >-> mapP increment2
Enter an Integer: 4<Enter> (4, 0) Enter an Integer: 5<Enter> (5, 2) Enter an Integer: Hello<Enter> Left "Could not read an Integer"
... or we could instead mapP
promptInt3
to match increment2
and switch
the order of the two proxy transformers:
mapP promptInt3 :: (Proxy p, ProxyTrans stateP) => () -> Producer (stateP (EitherP String p)) Int IO r increment2 :: (Proxy eitherP) => () -> Consumer (StateP Int eitherP ) Int IO r mapP promptInt3 >-> increment2 :: (Proxy p) => () -> Session (StateP Int (EitherP String p)) IO r
mapP
creates a new ProxyTrans
layer that type-checks as StateP
, and
EitherP Int p
type-checks as the Proxy
in increment2
's signature.
>>>
runProxy $ E.runEitherK $ S.evalStateK 0 $ mapP promptInt3 >-> increment2
Enter an Integer: 4<Enter> (4, 0) Enter an Integer: 5<Enter> (5, 2) Enter an Integer: Hello<Enter> Left "Could not read an Integer"
Like monad transformers, proxy transformers lift a base Monad
instance
to an extended Monad
instance and liftP
exactly mirrors the lift
function from MonadTrans
. liftP
takes some base proxy, p
, that
implements Monad
and "lift"s it to an extended proxy, (t p)
, that also
implements Monad
.
This means you can seamlessly mix effects from different proxy transformer
layers just by using liftP
to access inner layers:
twoLayers :: (Proxy p) => () -> Consumer (E.EitherP String (S.StateP Int p)) Int IO r twoLayers () = forever $ do a <- request () if (a >= 0) then liftP $ S.modify (+ a) else E.throw "Negative number"
>>>
runProxy $ S.runStateK 0 $ E.runEitherK $ fromListS [1, 2, -4] >-> twoLayers
(Left "Negative number",3)
This exactly resembles how you use lift
to access inner layers of a monad
transformer stack.
Proxy Morphism Laws
Monad transformers impose certain laws to ensure that lift
behaves
correctly. These are known as the "monad morphism laws":
(lift .) (f >=> g) = (lift .) f >=> (lift .) g (lift .) return = return
If you convert these laws to do
notation, they just say:
do x <- lift m = lift $ do x <- m lift (f x) f x lift (return r) = return r
Proxy transformers require the exact same laws to ensure that they lift the
base monad to the extended monad correctly. Just replace lift
with
liftP
:
(liftP .) (f >=> g) = (liftP .) f >=> (liftP .) g (liftP .) return = return
The only difference is mapP
sweetens these laws a little bit:
mapP = (liftP .) mapP (f >=> g) = mapP f >=> mapP g -- These are functor laws! mapP return = return
However, proxy transformers do one extra thing above and beyond ordinary
monad transformers. Proxy transformers lift the Proxy
type class, meaning
that if the base type implements Proxy
, so does the extended type.
This means that we need a set of laws that guarantee that the proxy
transformer lifts the Proxy
instance correctly. I call these laws the
"proxy morphism laws":
mapP (f >-> g) = mapP f >-> mapP g -- These are functor laws, too! mapP idT = idT
In other words, a proxy transformer defines a functor from the base composition to the extended composition! Neat!
But we're not even done, because proxies actually form three other categories, only one of which I have mentioned so far, and proxy transformers lift these three other categories correctly, too:
-- The push-based category mapP (f >~> g) = mapP f >~> mapP g mapP coidT = coidT
-- The upstream ListT Kleisli category mapP (f \>\ g) = mapP f \>\ mapP g mapP request = request
-- The downstream ListT Kleisli category mapP (f />/ g) = mapP f />/ mapP g mapP respond = respond
I want to highlight two of the above laws:
mapP request = request mapP respond = respond
We can translate those back to liftP
to get:
liftP (request a') = request a' liftP (respond b) = respond b
In other words, request
and respond
in the extended proxy must behave
exactly the same as lifting request
and respond
from the base proxy.
All the proxy transformers in this library obey the proxy morphism laws,
which ensure that liftP
/ mapP
always do "the right thing".
Functors on Proxies
Proxy transformers also implement hoistP
from the PFunctor
class in
Control.Proxy.Morph. This exactly parallels hoist
from
Control.Monad.Morph
.
You will most commonly use hoistP
to insert arbitrary proxy transformer
layers to get two mismatched proxy transformer stacks to type-check.
For example, consider the following two very different proxy transformer stacks:
p1 :: (Monad m, Proxy p) => a' -> StateP s (ReaderP i p) a' a a' a m a' p2 :: (Monad m, Proxy p) => a' -> MaybeP (WriterP w p) a' a a' a m a'
I can normalize them to use same proxy transformer stack by judiciously
inserting extra proxy transformer layers using a combination of hoistP
and liftP
:
p1' :: (Monad m, Proxy p) => a' -> StateP s (MaybeP (ReaderP i (WriterP w p))) a' a a' a m a' p1' = hoistP liftP . p1 p2' :: (Monad m, Proxy p) => a' -> StateP s (MaybeP (ReaderP i (WriterP w p))) a' a a' a m a' p2' = liftP . hoistP liftP . p2
Now that I've made them agree on a common proxy transformer stack, I can sequence them or compose them:
pSequence :: (Proxy p) => a' -> StateP s (MaybeP (ReaderP i (WriterP w p))) a' a a' a m a' pSequence = p1' >=> p2' pCompose :: (Proxy p) => a' -> StateP s (MaybeP (ReaderP i (WriterP w p))) a' a a' a m a' pCompose = p1' >-> p2'
Conclusion
The pipes
library emphasizes the reuse of a small set of core abstractions
grounded in theory to implement all functionality:
- Monads
- Proxies: (
>->
),request
, andrespond
- Monad Transformers and Functors on Monads:
lift
andhoist
- Proxy Transformers and Functors on Proxies:
liftP
andhoistP
However, I don't expect everybody to immediately understand how so few
primitives can implement such a wide variety of features. This tutorial
gives a taste of how many interesting ways you can combine these few
abstractions, but these examples barely scratch the surface, despite this
tutorial's length. So if you don't know how to implement something using
pipes
, just ask me and I will be happy to help.
Appendix
I've provided the full code for the above examples here so you can easily play with the examples yourself:
import Control.Monad import Control.Proxy hiding (zipD) import System.IO import qualified Data.Map as M import Control.Monad.Trans.Either import Safe (readMay) import qualified Control.Proxy.Trans.Either as E import qualified Control.Proxy.Trans.State as S lines' :: (Proxy p) => Handle -> () -> Producer p String IO () lines' h () = runIdentityP loop where loop = do eof <- lift $ hIsEOF h if eof then return () else do str <- lift $ hGetLine h respond str -- Produce the string loop promptInt :: (Proxy p) => () -> Producer p Int IO r promptInt () = runIdentityP $ forever $ do lift $ putStrLn "Enter an Integer:" n <- lift readLn -- 'lift' invokes an action in the base monad respond n {- promptInt = readLnS >-> execU (putStrLn "Enter an Integer:") -} printer :: (Proxy p, Show a) => () -> Consumer p a IO r printer () = runIdentityP $ forever $ do a <- request () -- Consume a value lift $ putStrLn "Received a value:" lift $ print a {- printer :: (Proxy p, Show a) => x -> p x a x a IO r printer = execD (putStrLn "Received a value:") >-> printD -} take' :: (Proxy p) => Int -> () -> Pipe p a a IO () take' n a' = runIdentityP $ do -- Remember, we need 'runIdentityP' if takeB_ n a' -- we use 'do' notation or 'lift' lift $ putStrLn "You shall not pass!" threeReqs :: (Proxy p) => () -> Client p Int Bool IO () threeReqs () = runIdentityP $ forM_ [1, 3, 1] $ \argument -> do lift $ putStrLn $ "Client Sends: " ++ show (argument :: Int) result <- request argument lift $ putStrLn $ "Client Receives: " ++ show (result :: Bool) lift $ putStrLn "*" comparer :: (Proxy p) => Int -> Server p Int Bool IO r comparer = runIdentityK $ foreverK $ \argument -> do lift $ putStrLn $ "Server Receives: " ++ show argument let result = argument > 2 lift $ putStrLn $ "Server Sends: " ++ show result respond result cache :: (Proxy p, Ord key) => key -> p key val key val IO r cache = runIdentityK (loop M.empty) where loop _map key = case M.lookup key _map of Nothing -> do val <- request key key2 <- respond val loop (M.insert key val _map) key2 Just val -> do lift $ putStrLn "Used cache!" key2 <- respond val loop _map key2 downstream :: (Proxy p) => () -> Consumer p () IO () downstream () = runIdentityP $ do lift $ print 1 request () -- Switch to upstream lift $ print 3 request () -- Switch to upstream upstream :: (Proxy p) => () -> Producer p () IO () upstream () = runIdentityP $ do lift $ print 2 respond () -- Switch to downstream lift $ print 4 promptInt2 :: (Proxy p) => () -> Producer p Int (EitherT String IO) r promptInt2 () = runIdentityP $ forever $ do str <- lift $ lift $ do putStrLn "Enter an Integer:" getLine case readMay str of Nothing -> lift $ left "Could not read an Integer" Just n -> respond n readIntS :: (Proxy p) => () -> Producer p Int IO r readIntS = readLnS source1 :: (Proxy p) => () -> Producer p Int IO r source1 () = runIdentityP $ do fromListS [4, 4] () -- Source 1 readLnS () -- Source 2 source2 :: (Proxy p) => () -> Producer p Int IO () source2 () = runIdentityP $ do fromListS [4, 4] () (readLnS >-> takeB_ 3) () -- You can compose inside a do block! sink1 :: (Proxy p) => () -> Pipe p Int Int IO () sink1 () = runIdentityP $ do (takeB_ 3 >-> printD) () -- Sink 1 (takeWhileD (< 4) >-> printD) () -- Sink 2 sink2 :: (Proxy p) => () -> Pipe p Int Int IO () sink2 = intermediate >-> printD where intermediate () = runIdentityP $ do takeB_ 3 () -- Intermediate stage 1 takeWhileD (< 4) () -- Intermediate stage 2 pairs :: (ListT p) => () -> ProduceT p IO (Int, Int) pairs () = do x <- rangeS 1 3 -- Select a number betwen 1 and 3 lift $ putStrLn $ "Currently using: " ++ show x y <- eachS [4,6,8] -- Select one of 4, 6, or 8 return (x, y) pairs2 :: (ListT p) => () -> ProduceT p IO (Int, Int) pairs2 () = do x <- RespondT $ runIdentityP $ do respond 1 lift $ putStrLn "Here" respond 2 y <- RespondT $ runIdentityP $ do respond 3 lift $ putStrLn "There" respond 4 return (x, y) pairs3 :: (ListT p) => () -> RespondT p () Int () IO (Int, Int) pairs3 () = do x <- RespondT $ runIdentityP $ replicateM_ 2 $ do a <- request () lift $ putStrLn $ "Received " ++ show a respond a y <- RespondT $ runIdentityP $ replicateM_ 3 $ do a <- request () lift $ putStrLn $ "Received " ++ show a respond a return (x, y) readFileS :: (Proxy p) => FilePath -> () -> Producer p String IO () readFileS file () = runIdentityP $ do h <- lift $ openFile file ReadMode lift $ putStrLn "Opening file" hGetLineS h () lift $ putStrLn "Closing file" lift $ hClose h cp :: FilePath -> FilePath -> IO () cp inFile outFile = withFile inFile ReadMode $ \hIn -> withFile outFile WriteMode $ \hOut -> runProxy $ hGetLineS hIn >-> hPutStrLnD hOut promptInt3 :: (Proxy p) => () -> Producer (E.EitherP String p) Int IO r promptInt3 () = forever $ do str <- lift $ do putStrLn "Enter an Integer:" getLine case readMay str of Nothing -> E.throw "Could not read an Integer" Just n -> respond n heartbeat :: (Proxy p) => E.EitherP String p a' a b' b IO r -> E.EitherP String p a' a b' b IO r heartbeat p = p `E.catch` \err -> do lift $ putStrLn err -- Print the error heartbeat p -- Restart 'p' increment :: (Monad m, Proxy p) => () -> Producer (S.StateP Int p) Int m r increment () = forever $ do n <- S.get respond n S.put (n + 1) numbers :: (Monad m, Proxy p) => () -> Producer p Int m () numbers () = runIdentityP $ do (takeB_ 5 <-< S.evalStateK 10 increment) () S.evalStateK 1 (takeB_ 3 <-< increment) () -- This works, too! increment2 :: (Proxy p) => () -> Consumer (S.StateP Int p) Int IO r increment2 () = forever $ do nOurs <- S.get nTheirs <- request () lift $ print (nTheirs, nOurs) S.put (nOurs + 2) zipD :: (Monad m, Proxy p1, Proxy p2, Proxy p3) => () -> Consumer p1 a (Consumer p2 b (Producer p3 (a, b) m)) r zipD () = runIdentityP . hoist (runIdentityP . hoist runIdentityP) $ forever $ do -- Yes, this 'runIdentityP' mess is necessary. Sorry! a <- request () -- Request from the outer 'Consumer' b <- lift $ request () -- Request from the inner 'Consumer' lift $ lift $ respond (a, b) -- Respond to the 'Producer' p1 = runProxyK $ zipD <-< fromListS [1..3] p2 = runProxyK $ p1 <-< fromListS [4..] p3 = runProxy $ printD <-< p2 fork :: (Monad m, Proxy p1, Proxy p2, Proxy p3) => () -> Consumer p1 a (Producer p2 a (Producer p3 a m)) r fork () = runIdentityP . hoist (runIdentityP . hoist runIdentityP) $ forever $ do a <- request () -- Request output from the 'Consumer' lift $ respond a -- Send output to the outer 'Producer' lift $ lift $ respond a -- Send output to the inner 'Producer' {- p1 = runProxyK $ fork <-< fromListS [1..3] p2 = runProxyK $ raiseK printD <-< mapD (> 2) <-< p1 p3 = runProxy $ printD <-< mapD show <-< p2 -} {- p1 = runProxyK $ S.evalStateK 0 $ fork <-< increment p2 = runProxyK $ raiseK printD <-< mapD (+ 10) <-< p1 p3 = runProxy $ E.runEitherK $ printD <-< (takeB_ 3 >=> E.throw) <-< p2 -} twoLayers :: (Proxy p) => () -> Consumer (E.EitherP String (S.StateP Int p)) Int IO r twoLayers () = forever $ do a <- request () if (a >= 0) then liftP $ S.modify (+ a) else E.throw "Negative number"