Safe Haskell | Safe |
---|---|
Language | Haskell2010 |

- data PipeF a b x
- type Pipe a b = FreeT (PipeF a b)
- data C
- type Producer b = Pipe () b
- type Consumer b = Pipe b C
- type Pipeline = Pipe () C
- await :: Monad m => Pipe a b m a
- yield :: Monad m => b -> Pipe a b m ()
- pipe :: Monad m => (a -> b) -> Pipe a b m r
- (<+<) :: Monad m => Pipe b c m r -> Pipe a b m r -> Pipe a c m r
- (>+>) :: Monad m => Pipe a b m r -> Pipe b c m r -> Pipe a c m r
- idP :: Monad m => Pipe a a m r
- newtype PipeC m r a b = PipeC {}
- runPipe :: Monad m => Pipeline m r -> m r

# Types

The `Pipe` type is strongly inspired by Mario Blazevic's `Coroutine` type in his concurrency article from Issue 19 of The Monad Reader and is formulated in the exact same way.

His `Coroutine` type is actually a free monad transformer (i.e. `FreeT`) and his `InOrOut` functor corresponds to `PipeF`.

The base functor for the `Pipe` type

type Pipe a b = FreeT (PipeF a b)

The base type for pipes

- `a`: The type of input received from upstream pipes
- `b`: The type of output delivered to downstream pipes
- `m`: The base monad
- `r`: The type of the return value
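To make the "free monad transformer over a base functor" formulation concrete, here is a minimal, self-contained sketch using only `base`. It is not the library's source: the constructor names `Await`, `Yield`, `Return` and `Wrap`, the `FreeF` helper type, and the `firstStep`, `echo` and `counter` definitions are all assumptions introduced for illustration. Only the names `FreeT`, `runFreeT`, `PipeF`, `Pipe`, `await` and `yield` come from the text above.

```haskell
import Control.Monad (ap, liftM)
import Data.Functor.Identity (Identity, runIdentity)

-- One layer of a free monad transformer: a final result or one functor step.
-- (FreeF, Return and Wrap are assumed names.)
data FreeF f r x = Return r | Wrap (f x)

newtype FreeT f m r = FreeT { runFreeT :: m (FreeF f r (FreeT f m r)) }

instance (Functor f, Monad m) => Functor (FreeT f m) where
  fmap = liftM

instance (Functor f, Monad m) => Applicative (FreeT f m) where
  pure = FreeT . return . Return
  (<*>) = ap

instance (Functor f, Monad m) => Monad (FreeT f m) where
  m >>= k = FreeT $ do
    step <- runFreeT m
    case step of
      Return r -> runFreeT (k r)
      Wrap fx  -> return (Wrap (fmap (>>= k) fx))

-- Base functor: a pipe either waits for an 'a' or emits a 'b'.
-- (The constructor names are assumed.)
data PipeF a b x = Await (a -> x) | Yield b x

instance Functor (PipeF a b) where
  fmap g (Await k)   = Await (g . k)
  fmap g (Yield b x) = Yield b (g x)

type Pipe a b = FreeT (PipeF a b)

await :: Monad m => Pipe a b m a
await = FreeT (return (Wrap (Await return)))

yield :: Monad m => b -> Pipe a b m ()
yield b = FreeT (return (Wrap (Yield b (return ()))))

-- Hypothetical helper: unwrap one layer and describe what the pipe does first.
firstStep :: Show b => Pipe a b Identity r -> String
firstStep p = case runIdentity (runFreeT p) of
  Return _         -> "return"
  Wrap (Await _)   -> "await"
  Wrap (Yield b _) -> "yield " ++ show b

echo :: Pipe Int Int Identity ()
echo = await >>= yield

counter :: Pipe () Int Identity ()
counter = yield 1 >> yield 2

main :: IO ()
main = mapM_ putStrLn [firstStep echo, firstStep counter]
```

In this sketch `firstStep echo` is `"await"` and `firstStep counter` is `"yield 1"`, which shows that a pipe is a chain of `PipeF` layers interleaved with the base monad.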

# Create Pipes

`yield` and `await` are the only two primitives you need to create pipes. Since `Pipe a b m` is a monad, you can assemble `yield` and `await` statements using ordinary `do` notation. Since `Pipe a b` is also a monad transformer, you can use `lift` to invoke the base monad. For example, you could write a pipe stage that requests permission before forwarding any output:

    check :: (Show a) => Pipe a a IO r
    check = forever $ do
        x <- await
        lift $ putStrLn $ "Can '" ++ (show x) ++ "' pass?"
        ok <- read <$> lift getLine
        when ok (yield x)

await :: Monad m => Pipe a b m a

Wait for input from upstream.

`await` blocks until input is available from upstream.

pipe :: Monad m => (a -> b) -> Pipe a b m r

Convert a pure function into a pipe

    pipe f = forever $ do
        x <- await
        yield (f x)

# Compose Pipes

`Pipe`s form a `Category`, meaning that you can compose `Pipe`s using (`<+<`) and also define an identity `Pipe`: `idP`. These satisfy the category laws:

    idP <+< p = p
    p <+< idP = p
    (p1 <+< p2) <+< p3 = p1 <+< (p2 <+< p3)

`Pipe` composition binds the output of the upstream `Pipe` to the input of the downstream `Pipe`. Like Haskell functions, `Pipe`s are lazy, meaning that upstream `Pipe`s are only evaluated as far as necessary to generate enough input for downstream `Pipe`s. If any `Pipe` terminates, it also terminates every `Pipe` composed with it.
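The laziness and termination behaviour described above can be observed in a small model. The sketch below is not the library's implementation: it replaces the `FreeT` encoding with a direct recursive data type so that it runs with `base` alone. The constructor names, the `liftP` and `say` helpers, the `Log` base monad, and the `source`, `sink` and `eventLog` definitions are all assumptions made for illustration. The base monad is the writer-like pair monad `(,) [String]`, so the sequence of effects is available as an ordinary list.

```haskell
import Control.Monad (ap, liftM, replicateM_)

-- Simplified stand-in for the library's Pipe (a direct type, not FreeT).
data Pipe a b m r
  = Pure r                     -- finished with a result
  | M (m (Pipe a b m r))       -- run an action in the base monad
  | Await (a -> Pipe a b m r)  -- blocked until upstream supplies input
  | Yield b (Pipe a b m r)     -- hand one output downstream, then continue

instance Monad m => Functor (Pipe a b m) where
  fmap = liftM

instance Monad m => Applicative (Pipe a b m) where
  pure = Pure
  (<*>) = ap

instance Monad m => Monad (Pipe a b m) where
  Pure r    >>= f = f r
  M m       >>= f = M (liftM (>>= f) m)
  Await k   >>= f = Await (\a -> k a >>= f)
  Yield b p >>= f = Yield b (p >>= f)

data C  -- uninhabited: a closed pipeline can never yield

await :: Monad m => Pipe a b m a
await = Await Pure

yield :: Monad m => b -> Pipe a b m ()
yield b = Yield b (Pure ())

-- Hypothetical helper standing in for `lift`.
liftP :: Monad m => m r -> Pipe a b m r
liftP = M . liftM Pure

-- Downstream-driven composition: upstream runs only when downstream awaits,
-- and either side returning ends the whole composite.
(<+<) :: Monad m => Pipe b c m r -> Pipe a b m r -> Pipe a c m r
Yield c p1 <+< p2         = Yield c (p1 <+< p2)
M m1       <+< p2         = M (liftM (<+< p2) m1)
Pure r     <+< _          = Pure r
Await f1   <+< Yield b p2 = f1 b <+< p2
p1         <+< Await f2   = Await (\a -> p1 <+< f2 a)
p1         <+< M m2       = M (liftM (p1 <+<) m2)
_          <+< Pure r     = Pure r

runPipe :: Monad m => Pipe () C m r -> m r
runPipe (Pure r)    = return r
runPipe (M m)       = m >>= runPipe
runPipe (Await k)   = runPipe (k ())
runPipe (Yield _ p) = runPipe p  -- unreachable, because C has no values

type Log = (,) [String]  -- writer-like base monad that records messages

say :: String -> Pipe a b Log ()
say s = liftP ([s], ())

-- An infinite upstream producer.
source :: Pipe () Int Log ()
source = mapM_ (\n -> say ("producing " ++ show n) >> yield n) [1 ..]

-- A downstream consumer that stops after two inputs.
sink :: Pipe Int C Log ()
sink = replicateM_ 2 (await >>= \n -> say ("consumed " ++ show n))

eventLog :: [String]
eventLog = fst (runPipe (sink <+< source))

main :: IO ()
main = mapM_ putStrLn eventLog
```

In this model the log alternates "producing 1", "consumed 1", "producing 2", "consumed 2" and then stops: the infinite `source` is only advanced when `sink` awaits, and it is abandoned as soon as `sink` returns.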

(>+>) :: Monad m => Pipe a b m r -> Pipe b c m r -> Pipe a c m r infixl 9

Corresponds to (`>>>`) from `Control.Category`

# Run Pipes

Note that you can also unwrap a `Pipe` a single step at a time using `runFreeT` (since `Pipe` is just a type synonym for a free monad transformer). This will take you to the next *external* `await` or `yield` statement. This means that a closed `Pipeline` will unwrap to a single step, in which case you would have been better served by `runPipe`.

runPipe :: Monad m => Pipeline m r -> m r

Run the `Pipe` monad transformer, converting it back into the base monad.

`runPipe` imposes two conditions:

- The pipe's input, if any, is trivially satisfiable (i.e. `()`)
- The pipe does not `yield` any output

The latter restriction makes `runPipe` less polymorphic than it could be, and I settled on the restriction for three reasons:

- It protects against accidental data loss.
- It protects against silent failures.
- It prevents wastefully draining a scarce resource by gratuitously driving it to completion.

If you believe that discarding output is the appropriate behavior, you can specify this by explicitly feeding your output to a pipe that gratuitously discards it:

    runPipe $ forever await <+< p
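The effect of this idiom can be seen in a small model. As a sketch only, the code below uses a simplified direct data type in place of the library's `FreeT`-based `Pipe`. The constructor names, the `liftP` helper, the `Log` base monad and the producer `p` are assumptions introduced for illustration. Composing `forever await` downstream of `p` gives a closed pipeline that `runPipe` accepts, at the cost of driving `p` all the way to completion.

```haskell
import Control.Monad (ap, forever, liftM)

-- Simplified stand-in for the library's Pipe (a direct type, not FreeT).
data Pipe a b m r
  = Pure r
  | M (m (Pipe a b m r))
  | Await (a -> Pipe a b m r)
  | Yield b (Pipe a b m r)

instance Monad m => Functor (Pipe a b m) where
  fmap = liftM

instance Monad m => Applicative (Pipe a b m) where
  pure = Pure
  (<*>) = ap

instance Monad m => Monad (Pipe a b m) where
  Pure r    >>= f = f r
  M m       >>= f = M (liftM (>>= f) m)
  Await k   >>= f = Await (\a -> k a >>= f)
  Yield b q >>= f = Yield b (q >>= f)

data C  -- uninhabited output type of a closed pipeline

await :: Monad m => Pipe a b m a
await = Await Pure

yield :: Monad m => b -> Pipe a b m ()
yield b = Yield b (Pure ())

-- Hypothetical helper standing in for `lift`.
liftP :: Monad m => m r -> Pipe a b m r
liftP = M . liftM Pure

-- Downstream-driven composition, as in the laziness description above.
(<+<) :: Monad m => Pipe b c m r -> Pipe a b m r -> Pipe a c m r
Yield c p1 <+< p2         = Yield c (p1 <+< p2)
M m1       <+< p2         = M (liftM (<+< p2) m1)
Pure r     <+< _          = Pure r
Await f1   <+< Yield b p2 = f1 b <+< p2
p1         <+< Await f2   = Await (\a -> p1 <+< f2 a)
p1         <+< M m2       = M (liftM (p1 <+<) m2)
_          <+< Pure r     = Pure r

runPipe :: Monad m => Pipe () C m r -> m r
runPipe (Pure r)    = return r
runPipe (M m)       = m >>= runPipe
runPipe (Await k)   = runPipe (k ())
runPipe (Yield _ q) = runPipe q  -- unreachable, because C has no values

type Log = (,) [String]  -- writer-like base monad that records messages

-- A producer that still has output, so `runPipe p` alone would not type-check.
p :: Pipe () Int Log ()
p = mapM_ (\n -> liftP (["read " ++ show n], ()) >> yield n) [1, 2, 3]

-- Explicitly discard everything p yields.
drained :: [String]
drained = fst (runPipe (forever await <+< p))

main :: IO ()
main = mapM_ putStrLn drained
```

In this model all three "read" messages are logged: the discarding stage never stops asking for input, so the pipeline only ends when `p` itself returns.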