pipes-2.4.0: Compositional pipelines

Control.Pipe.Core

Description

The Pipe type is a monad transformer that enriches the base monad with the ability to await or yield data to and from other Pipes.

Synopsis

# Types

The Pipe type is strongly inspired by Mario Blazevic's Coroutine type in his concurrency article from Issue 19 of The Monad Reader and is formulated in the exact same way.

His Coroutine type is actually a free monad transformer (i.e. FreeT) and his InOrOut functor corresponds to PipeF.

data PipeF a b x Source #

The base functor for the Pipe type

Constructors

 Await (a -> x) Yield b x

Instances

 Functor (PipeF a b) Source # Methodsfmap :: (a -> b) -> PipeF a b a -> PipeF a b b #(<$) :: a -> PipeF a b b -> PipeF a b a # type Pipe a b = FreeT (PipeF a b) Source # The base type for pipes • a - The type of input received from upstream pipes • b - The type of output delivered to downstream pipes • m - The base monad • r - The type of the return value data C Source # The empty type, denoting a 'C'losed end type Producer b = Pipe () b Source # A pipe that produces values type Consumer b = Pipe b C Source # A pipe that consumes values type Pipeline = Pipe () C Source # A self-contained pipeline that is ready to be run # Create Pipes yield and await are the only two primitives you need to create pipes. Since Pipe a b m is a monad, you can assemble yield and await statements using ordinary do notation. Since Pipe a b is also a monad transformer, you can use lift to invoke the base monad. For example, you could write a pipe stage that requests permission before forwarding any output: check :: (Show a) => Pipe a a IO r check = forever$ do
x <- await
lift $putStrLn$ "Can '" ++ (show x) ++ "' pass?"
ok <- read <$> lift getLine when ok (yield x) await :: Monad m => Pipe a b m a Source # Wait for input from upstream. await blocks until input is available from upstream. yield :: Monad m => b -> Pipe a b m () Source # Deliver output downstream. yield restores control back upstream and binds the result to await. pipe :: Monad m => (a -> b) -> Pipe a b m r Source # Convert a pure function into a pipe pipe f = forever$ do
x <- await
yield (f x)

# Compose Pipes

Pipes form a Category, meaning that you can compose Pipes using (<+<) and also define an identity Pipe: idP. These satisfy the category laws:

idP <+< p = p

p <+< idP = p

(p1 <+< p2) <+< p3 = p1 <+< (p2 <+< p3)

Pipe composition binds the output of the upstream Pipe to the input of the downstream Pipe. Like Haskell functions, Pipes are lazy, meaning that upstream Pipes are only evaluated as far as necessary to generate enough input for downstream Pipes. If any Pipe terminates, it also terminates every Pipe composed with it.

(<+<) :: Monad m => Pipe b c m r -> Pipe a b m r -> Pipe a c m r infixr 9 Source #

Corresponds to (<<<)/(.) from Control.Category

(>+>) :: Monad m => Pipe a b m r -> Pipe b c m r -> Pipe a c m r infixl 9 Source #

Corresponds to (>>>) from Control.Category

idP :: Monad m => Pipe a a m r Source #

Corresponds to id from Control.Category

newtype PipeC m r a b Source #

Pipes form a Category instance when you rearrange the type variables

Constructors

 PipeC FieldsunPipeC :: Pipe a b m r

Instances

 Monad m => Category * (PipeC m r) Source # Methodsid :: cat a a #(.) :: cat b c -> cat a b -> cat a c #

# Run Pipes

Note that you can also unwrap a Pipe a single step at a time using runFreeT (since Pipe is just a type synonym for a free monad transformer). This will take you to the next external await or yield statement. This means that a closed Pipeline will unwrap to a single step, in which case you would have been better served by runPipe.

runPipe :: Monad m => Pipeline m r -> m r Source #

Run the Pipe monad transformer, converting it back into the base monad.

runPipe imposes two conditions:

• The pipe's input, if any, is trivially satisfiable (i.e. ())
• The pipe does not yield any output

The latter restriction makes runPipe less polymorphic than it could be, and I settled on the restriction for three reasons:

• It prevents against accidental data loss.
• It protects against silent failures
• It prevents wastefully draining a scarce resource by gratuitously driving it to completion

If you believe that discarding output is the appropriate behavior, you can specify this by explicitly feeding your output to a pipe that gratuitously discards it:

runPipe \$ forever await <+< p