| Safe Haskell | Safe |
|---|---|
| Language | Haskell98 |
Control.Pipe.Core
Description
- data Pipe a b m r
- data C
- type Producer b = Pipe () b
- type Consumer b = Pipe b C
- type Pipeline = Pipe () C
- await :: Pipe a b m a
- yield :: b -> Pipe a b m ()
- pipe :: Monad m => (a -> b) -> Pipe a b m r
- (<+<) :: Monad m => Pipe b c m r -> Pipe a b m r -> Pipe a c m r
- (>+>) :: Monad m => Pipe a b m r -> Pipe b c m r -> Pipe a c m r
- idP :: Monad m => Pipe a a m r
- newtype PipeC m r a b = PipeC {}
- runPipe :: Monad m => Pipeline m r -> m r
Types
The Pipe type is strongly inspired by Mario Blazevic's Coroutine type in
his concurrency article from Issue 19 of The Monad Reader.
The base type for pipes
- a - The type of input received from upstream pipes
- b - The type of output delivered to downstream pipes
- m - The base monad
- r - The type of the return value
Create Pipes
yield and await are the only two primitives you need to create pipes.
Since Pipe a b m is a monad, you can assemble yield and await
statements using ordinary do notation. Since Pipe a b is also a monad
transformer, you can use lift to invoke the base monad. For example, you
could write a pipe stage that requests permission before forwarding any
output:

    check :: (Show a) => Pipe a a IO r
    check = forever $ do
        x <- await
        lift $ putStrLn $ "Can '" ++ (show x) ++ "' pass?"
        ok <- read <$> lift getLine
        when ok (yield x)

await :: Pipe a b m a
Wait for input from upstream.
await blocks until input is available from upstream.
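As a rough illustration of building pipes from the two primitives, the sketch below defines one producer and one consumer. The names `fromList` and `printer` are hypothetical and are not exported by this module; the sketch assumes that the version of the library documented here is installed and that `lift` is imported from `Control.Monad.Trans.Class`.

```haskell
import Control.Monad (forever)
import Control.Monad.Trans.Class (lift)
import Control.Pipe.Core

-- Hypothetical producer: yields each element of a list downstream,
-- then returns ().
fromList :: Monad m => [b] -> Producer b m ()
fromList = mapM_ yield

-- Hypothetical consumer: prints every value it receives from upstream.
-- It never returns on its own, so its return type stays polymorphic.
printer :: Show a => Consumer a IO r
printer = forever $ do
    x <- await
    lift (print x)
```

Neither definition does anything until it is composed with other pipes into a Pipeline and run.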
pipe :: Monad m => (a -> b) -> Pipe a b m r
Convert a pure function into a pipe

    pipe f = forever $ do
        x <- await
        yield (f x)

Compose Pipes
Pipes form a Category, meaning that you can compose Pipes using
(<+<) and also define an identity Pipe: idP. These satisfy the
category laws:

    idP <+< p = p
    p <+< idP = p
    (p1 <+< p2) <+< p3 = p1 <+< (p2 <+< p3)

Pipe composition binds the output of the upstream Pipe to the input of
the downstream Pipe. Like Haskell functions, Pipes are lazy, meaning
that upstream Pipes are only evaluated as far as necessary to generate
enough input for downstream Pipes. If any Pipe terminates, it also
terminates every Pipe composed with it.
(>+>) :: Monad m => Pipe a b m r -> Pipe b c m r -> Pipe a c m r infixl 9
Corresponds to (>>>) from Control.Category
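The following sketch illustrates both composition operators together with the laziness and termination behavior described above. `countFrom` and `takeN` are hypothetical helpers written for this example, not part of the module, and the sketch assumes that the version of the library documented here is installed.

```haskell
import Control.Monad (replicateM)
import Control.Pipe.Core

-- Hypothetical infinite producer: yields n, n + 1, n + 2, ...
countFrom :: Monad m => Int -> Producer Int m r
countFrom n = yield n >> countFrom (n + 1)

-- Hypothetical consumer: awaits exactly k inputs and returns them.
takeN :: Monad m => Int -> Consumer a m [a]
takeN k = replicateM k await

-- Right-to-left composition with (<+<). The infinite producer is only
-- driven far enough to satisfy the three awaits of takeN; when takeN
-- returns, the whole pipeline terminates with its result, [2,4,6].
firstThreeDoubled :: IO [Int]
firstThreeDoubled = runPipe $ takeN 3 <+< pipe (* 2) <+< countFrom 1

-- The same pipeline written left to right with (>+>).
firstThreeDoubled' :: IO [Int]
firstThreeDoubled' = runPipe $ countFrom 1 >+> pipe (* 2) >+> takeN 3
```

All three stages must agree on the return type, here `[Int]`, which is why `countFrom` and `pipe` leave `r` polymorphic.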
Run Pipes
Note that you can also unwrap a Pipe a single step at a time using
runFreeT (since Pipe is just a type synonym for a free monad
transformer). This will take you to the next external await or yield
statement. This means that a closed Pipeline will unwrap to a single
step, in which case you would have been better served by runPipe.
runPipe :: Monad m => Pipeline m r -> m r
Run the Pipe monad transformer, converting it back into the base monad.
runPipe imposes two conditions:
- The pipe's input, if any, is trivially satisfiable (i.e. ())
- The pipe does not yield any output
The latter restriction makes runPipe less polymorphic than it could be,
but I settled on it for three reasons:
- It protects against accidental data loss.
- It protects against silent failures.
- It prevents wastefully draining a scarce resource by gratuitously driving it to completion.
If you believe that discarding output is the appropriate behavior, you can specify this by explicitly feeding your output to a pipe that gratuitously discards it:

    runPipe $ forever await <+< p
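
A minimal sketch of this discarding pattern follows. The names `discard`, `noisy`, and `runDiscarding` are hypothetical and chosen only for illustration; the sketch assumes that the version of the library documented here is installed.

```haskell
import Control.Monad (forever)
import Control.Monad.Trans.Class (lift)
import Control.Pipe.Core

-- Hypothetical name for the consumer that awaits forever and throws
-- away every input it receives.
discard :: Monad m => Consumer b m r
discard = forever await

-- Hypothetical producer that logs each value before yielding it, so
-- that its effects stay visible even though its output is discarded.
noisy :: Producer Int IO ()
noisy = mapM_ step [1 .. 3]
  where
    step n = do
        lift (putStrLn ("produced " ++ show n))
        yield n

-- Explicitly drive the producer to completion, discarding its output.
runDiscarding :: IO ()
runDiscarding = runPipe $ discard <+< noisy
```

Because `discard` never stops awaiting, the pipeline ends only when `noisy` returns, so the decision to drain the producer completely is visible in the code.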