Safe Haskell | Safe |
---|---|
Language | Haskell2010 |
- data PipeF a b x
- type Pipe a b = FreeT (PipeF a b)
- data C
- type Producer b = Pipe () b
- type Consumer b = Pipe b C
- type Pipeline = Pipe () C
- await :: Monad m => Pipe a b m a
- yield :: Monad m => b -> Pipe a b m ()
- pipe :: Monad m => (a -> b) -> Pipe a b m r
- (<+<) :: Monad m => Pipe b c m r -> Pipe a b m r -> Pipe a c m r
- (>+>) :: Monad m => Pipe a b m r -> Pipe b c m r -> Pipe a c m r
- idP :: Monad m => Pipe a a m r
- newtype PipeC m r a b = PipeC {}
- runPipe :: Monad m => Pipeline m r -> m r
Types
The Pipe
type is strongly inspired by Mario Blazevic's Coroutine
type in
his concurrency article from Issue 19 of The Monad Reader and is formulated
in the exact same way.
His Coroutine
type is actually a free monad transformer (i.e. FreeT
)
and his InOrOut
functor corresponds to PipeF
.
The base functor for the Pipe
type
type Pipe a b = FreeT (PipeF a b) Source #
The base type for pipes
a
- The type of input received from upstream pipesb
- The type of output delivered to downstream pipesm
- The base monadr
- The type of the return value
Create Pipes
yield
and await
are the only two primitives you need to create pipes.
Since Pipe a b m
is a monad, you can assemble yield
and await
statements using ordinary do
notation. Since Pipe a b
is also a monad
transformer, you can use lift
to invoke the base monad. For example, you
could write a pipe stage that requests permission before forwarding any
output:
check :: (Show a) => Pipe a a IO r check = forever $ do x <- await lift $ putStrLn $ "Can '" ++ (show x) ++ "' pass?" ok <- read <$> lift getLine when ok (yield x)
await :: Monad m => Pipe a b m a Source #
Wait for input from upstream.
await
blocks until input is available from upstream.
pipe :: Monad m => (a -> b) -> Pipe a b m r Source #
Convert a pure function into a pipe
pipe f = forever $ do x <- await yield (f x)
Compose Pipes
Pipe
s form a Category
, meaning that you can compose Pipe
s using
(<+<
) and also define an identity Pipe
: idP
. These satisfy the
category laws:
idP <+< p = p p <+< idP = p (p1 <+< p2) <+< p3 = p1 <+< (p2 <+< p3)
Pipe
composition binds the output of the upstream Pipe
to the input of
the downstream Pipe
. Like Haskell functions, Pipe
s are lazy, meaning
that upstream Pipe
s are only evaluated as far as necessary to generate
enough input for downstream Pipe
s. If any Pipe
terminates, it also
terminates every Pipe
composed with it.
(>+>) :: Monad m => Pipe a b m r -> Pipe b c m r -> Pipe a c m r infixl 9 Source #
Corresponds to (>>>
) from Control.Category
Run Pipes
Note that you can also unwrap a Pipe
a single step at a time using
runFreeT
(since Pipe
is just a type synonym for a free monad
transformer). This will take you to the next external await
or yield
statement. This means that a closed Pipeline
will unwrap to a single
step, in which case you would have been better served by runPipe
.
runPipe :: Monad m => Pipeline m r -> m r Source #
Run the Pipe
monad transformer, converting it back into the base monad.
runPipe
imposes two conditions:
- The pipe's input, if any, is trivially satisfiable (i.e.
()
) - The pipe does not
yield
any output
The latter restriction makes runPipe
less polymorphic than it could be,
and I settled on the restriction for three reasons:
- It prevents against accidental data loss.
- It protects against silent failures
- It prevents wastefully draining a scarce resource by gratuitously driving it to completion
If you believe that discarding output is the appropriate behavior, you can specify this by explicitly feeding your output to a pipe that gratuitously discards it:
runPipe $ forever await <+< p