Copyright | (c) 2014, 2015 Gatlin Johnson <gatlin@niltag.net> |
---|---|
License | GPL-3 |
Maintainer | gatlin@niltag.net |
Stability | experimental |
Safe Haskell | Safe |
Language | Haskell2010 |

Write effect-ful stream processing functions and compose them into a series of tubes.

This exists primarily for my own education. It is updated often as I try things and is probably, at this moment, wrong.

My goals were to

- learn more about iteratees and stream processing; and
- explore the relationships between functions, pairs, sum types, and products.

- type Tube a b = FreeT (TubeF a b)
- newtype TubeF a b k = TubeF { runT :: forall r. ((a -> k) -> r) -> ((b, k) -> r) -> r }

- type Source b m r = forall x. Tube x b m r
- type Sink a m r = forall x. Tube a x m r
- run :: FreeT f m a -> m (FreeF f a (FreeT f m a))
- await :: Monad m => Tube a b m a
- yield :: Monad m => b -> Tube a b m ()
- each :: (Monad m, Foldable t) => t b -> Tube a b m ()
- for :: Monad m => Tube a b m r -> (b -> Tube a c m s) -> Tube a c m r
- (~>) :: Monad m => Tube a b m r -> (b -> Tube a c m s) -> Tube a c m r
- (>-) :: Monad m => Tube a b m r -> (b -> Tube b c m r) -> Tube a c m r
- (><) :: Monad m => Tube a b m r -> Tube b c m r -> Tube a c m r
- (|>) :: Monad m => Tube x b m r -> Sink (Maybe b) m s -> Sink (Maybe b) m s
- (-<) :: Monad m => a -> Sink a m b -> Sink a m b
- liftT :: (MonadTrans t, Monad m) => FreeT f m a -> t m (FreeF f a (FreeT f m a))
- cat :: Monad m => Tube a a m r
- map :: Monad m => (a -> b) -> Tube a b m r
- drop :: Monad m => Int -> Tube a a m r
- take :: Monad m => Int -> Tube a a m ()
- takeWhile :: Monad m => (a -> Bool) -> Tube a a m ()
- filter :: Monad m => (a -> Bool) -> Tube a a m r
- reduce :: Monad m => (x -> a -> x) -> x -> (x -> b) -> Source a m () -> m b
- every :: (Foldable t, Monad m) => t b -> Tube a (Maybe b) m ()
- unyield :: Monad m => FreeT (TubeF x b) m () -> m (Maybe (b, FreeT (TubeF x b) m ()))
- prompt :: Source String IO ()
- mapM :: Monad m => (a -> m b) -> Tube a b m r
- sequence :: Monad m => Tube (m a) a m r
- display :: Sink String IO ()
- type Pump a b = CofreeT (PumpF a b)
- data PumpF a b k = PumpF {}
- mkPump :: Comonad w => w a -> (w a -> (b, w a)) -> (w a -> c -> w a) -> Pump b c w a
- send :: Comonad w => b -> Pump a b w r -> Pump a b w r
- recv :: Comonad w => Pump a b w r -> (a, Pump a b w r)
- pump :: (Comonad w, Monad m) => (x -> y -> r) -> Pump a b w x -> Tube a b m y -> m r
- pumpM :: (Comonad w, Monad m) => (x -> y -> r) -> Pump a b w (m x) -> Tube a b m y -> m r
- meta :: (x -> a -> x) -> x -> (x -> (b, x)) -> Pump b a Identity x
- enumerator :: [a] -> Pump (Maybe a) a Identity [a]
- enumerate :: (Monad m, Comonad w) => Pump (Maybe a) b w r -> Tube c a m ()
- lift :: MonadTrans t => forall m a. Monad m => m a -> t m a
- runFreeT :: FreeT f m a -> m (FreeF f a (FreeT f m a))

# Documentation

A `Tube` is a computation that can yield multiple intermediate values or await intermediate inputs before computing a final result. Any monadic function may be turned into a `Tube`.

`Tube`s may be composed in different ways. For instance, in ghci:

```
>>> run $ for (each [1..4] >< map show) $ lift . putStrLn
1
2
3
4
```

Here, `each` converts a `Foldable` into a `Source` of values; `for` performs a computation with each value. Another example, using two built-in `Tube`s for convenience:

```
>>> run $ prompt >< filter (/= "Die Antwoord") >< map (++ " is bad") >< display
> dubstep
dubstep is bad
> the sun
the sun is bad
> Die Antwoord
> this example
this example is bad
```
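The sketch below makes the roles of `each` and `for` concrete using a pure, simplified model of a tube. It is a plain data type with `Done`, `Await` and `Yield` constructors and has no underlying monad. Because of that, the body passed to `for` can only yield values; it cannot run effects such as `putStrLn`. The names `Tube`, `andThen` and `yields` here are hypothetical and are not the library's API.

```haskell
import Data.Foldable (toList)

-- Hypothetical pure model of a tube: no underlying monad, no FreeT.
data Tube a b r
  = Done r
  | Await (a -> Tube a b r)
  | Yield b (Tube a b r)

-- Like `each`: yield every element of a Foldable in order, then finish.
each :: Foldable t => t b -> Tube a b ()
each = foldr Yield (Done ()) . toList

-- Run one tube, discard its result, then continue with another.
andThen :: Tube a c s -> Tube a c r -> Tube a c r
andThen (Done _)    next = next
andThen (Await k)   next = Await (\a -> andThen (k a) next)
andThen (Yield c t) next = Yield c (andThen t next)

-- Like `for`: every value yielded by the first tube is handed to `body`,
-- whose output is spliced into the stream in its place.
for :: Tube a b r -> (b -> Tube a c s) -> Tube a c r
for (Done r)    _    = Done r
for (Await k)   body = Await (\a -> for (k a) body)
for (Yield b t) body = body b `andThen` for t body

-- Collect everything a source yields (any await is answered with ()).
yields :: Tube () b r -> [b]
yields (Done _)    = []
yields (Yield b t) = b : yields t
yields (Await k)   = yields (k ())
```

For example, `yields (for (each [1, 2, 3]) (\n -> each [n, n * 10]))` evaluates to `[1,10,2,20,3,30]`. Each value from the source is replaced by whatever the body yields for it.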

A few stream processing combinators are provided for mapping, filtering, taking, and other basic operations.

For those times when you want to `reduce` a stream, you can do so like this:

```
>>> reduce (+) 0 id (each [1..10])
55
```

`><` is useful for combining `Tube`s that all have the same return value, most often `()`, simply because every `Source` will have that value.

There is more in the library not covered here, and you are encouraged to take a look around.

type Tube a b = FreeT (TubeF a b)

A `Tube` is a computation which can

- `yield` an intermediate value downstream and suspend execution; and
- `await` a value from upstream, deferring execution until it is received.

Moreover, individual `Tube`s may be freely composed into larger ones, so long as their types match. Thus, one may write small, reusable building blocks and construct efficient stream-processing pipelines.

Since a much better-engineered, more popular, and decidedly more mature library already uses the term "pipes", I have opted instead to think of my work as a series of tubes.

`TubeF` is the union of unary functions and binary products into a single type, here defined with a Boehm-Berarducci encoding.

This type is equivalent to the following:

```
data TubeF a b k
    = Await (a -> k)  -- :: (a -> k) -> TubeF a b k
    | Yield (b , k)   -- :: (b , k) -> TubeF a b k
```

The type signatures for the two value constructors should bear a strong resemblance to the actual type signature of `runT`. Instead of encoding tubes as structures which build up when composed, a `TubeF` is a control flow mechanism which picks one of two provided continuations.
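The equivalence between the encoded `TubeF` and the data declaration can be checked directly. Encoding turns each constructor into a choice of continuation. Decoding hands the two constructors to `runT` as the continuations. The sketch below needs only GHC's `RankNTypes` extension. `TubeD`, `toF`, `fromF` and `describe` are hypothetical names introduced for illustration.

```haskell
{-# LANGUAGE RankNTypes #-}

-- The Boehm-Berarducci encoded functor from the synopsis.
newtype TubeF a b k = TubeF
  { runT :: forall r. ((a -> k) -> r) -> ((b, k) -> r) -> r }

-- The equivalent ordinary sum type (renamed TubeD to avoid a clash).
data TubeD a b k
  = Await (a -> k)
  | Yield (b, k)

-- Encoding: each constructor becomes "call the matching continuation".
toF :: TubeD a b k -> TubeF a b k
toF (Await f)  = TubeF (\onAwait _ -> onAwait f)
toF (Yield bk) = TubeF (\_ onYield -> onYield bk)

-- Decoding: pass the two constructors to runT as the continuations.
fromF :: TubeF a b k -> TubeD a b k
fromF t = runT t Await Yield

-- Mapping over the continuation slot k, without ever building a TubeD.
instance Functor (TubeF a b) where
  fmap g t = TubeF (\onAwait onYield ->
    runT t (\f -> onAwait (g . f)) (\(b, k) -> onYield (b, g k)))

-- Case analysis by continuation-passing: describe one step of a tube.
describe :: Show b => TubeF Int b String -> String
describe t =
  runT t (\f -> "await, then " ++ f 42)
         (\(b, k) -> "yield " ++ show b ++ ", then " ++ k)
```

No pattern matching on constructors is needed to inspect an encoded value. `runT` itself does the branching, which is the "control flow mechanism" described above.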

People using this library should never have to contend with these details, but they are worth mentioning.

# Core infrastructure

(>-) :: Monad m => Tube a b m r -> (b -> Tube b c m r) -> Tube a c m r

Connect a task to a continuation yielding another task; see `><`.

(><) :: Monad m => Tube a b m r -> Tube b c m r -> Tube a c m r infixl 3

Compose two tubes into a new tube.
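One way to picture `><` is the following sketch over a pure, simplified tube type with no underlying monad `m` and no `FreeT`. It assumes a downstream-driven strategy:

- the downstream tube runs until it awaits;
- the upstream tube then runs until it yields a value to hand over;
- whichever side finishes first supplies the shared result `r`.

This is an illustrative model, not the library's implementation. `runPure` and `mapT` are hypothetical helpers.

```haskell
-- Hypothetical pure model: no underlying monad m and no FreeT.
data Tube a b r
  = Done r
  | Await (a -> Tube a b r)
  | Yield b (Tube a b r)

instance Functor (Tube a b) where
  fmap f (Done r)    = Done (f r)
  fmap f (Await k)   = Await (fmap f . k)
  fmap f (Yield b t) = Yield b (fmap f t)

instance Applicative (Tube a b) where
  pure = Done
  tf <*> tx = tf >>= \f -> fmap f tx

instance Monad (Tube a b) where
  Done r    >>= f = f r
  Await k   >>= f = Await (\a -> k a >>= f)
  Yield b t >>= f = Yield b (t >>= f)

await :: Tube a b a
await = Await Done

yield :: b -> Tube a b ()
yield b = Yield b (Done ())

-- Downstream drives; upstream runs only when downstream awaits.
infixl 3 ><
(><) :: Tube a b r -> Tube b c r -> Tube a c r
_          >< Done r       = Done r                     -- downstream finished
up         >< Yield c down = Yield c (up >< down)       -- pass output along
Done r     >< Await _      = Done r                     -- upstream finished
Yield b up >< Await k      = up >< k b                  -- hand b downstream
Await k    >< down         = Await (\a -> k a >< down)  -- ask for outside input

-- Feed a list of inputs and collect outputs; Nothing means inputs ran out.
runPure :: [a] -> Tube a b r -> ([b], Maybe r)
runPure _      (Done r)    = ([], Just r)
runPure inputs (Yield b t) = let (bs, r) = runPure inputs t in (b : bs, r)
runPure []     (Await _)   = ([], Nothing)
runPure (x:xs) (Await k)   = runPure xs (k x)

-- A source built from a list, and a mapping stage.
each :: [b] -> Tube a b ()
each = mapM_ yield

mapT :: (a -> b) -> Tube a b r
mapT f = do
  a <- await
  yield (f a)
  mapT f
```

For instance, `runPure [] (each [1, 2, 3] >< mapT show)` gives `(["1","2","3"], Just ())`. The source's `()` becomes the pipeline's result once `mapT` asks for a fourth value.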

# Utilities

map :: Monad m => (a -> b) -> Tube a b m r

Transforms all incoming values according to some function.

takeWhile :: Monad m => (a -> Bool) -> Tube a a m ()

Terminates the stream upon receiving a value violating the predicate.

filter :: Monad m => (a -> Bool) -> Tube a a m r

Yields only values satisfying some predicate.
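The three utilities above can be sketched over a pure, list-fed model of a tube. The data type, `feed` and the `T`-suffixed names are hypothetical. This version of `takeWhileT` assumes the value that violates the predicate is consumed and discarded. That matches the description above but is not taken from the library's source.

```haskell
-- Hypothetical pure model of a tube: no effects, no FreeT.
data Tube a b r
  = Done r
  | Await (a -> Tube a b r)
  | Yield b (Tube a b r)

-- Like `map`: transform every incoming value, forever.
mapT :: (a -> b) -> Tube a b r
mapT f = Await (\a -> Yield (f a) (mapT f))

-- Like `filter`: pass along only values satisfying the predicate.
filterT :: (a -> Bool) -> Tube a a r
filterT p = Await (\a -> if p a then Yield a (filterT p) else filterT p)

-- Like `takeWhile`: stop at the first value violating the predicate
-- (assumption: that value is consumed and dropped).
takeWhileT :: (a -> Bool) -> Tube a a ()
takeWhileT p = Await (\a -> if p a then Yield a (takeWhileT p) else Done ())

-- Feed a list of inputs and collect outputs until the tube stops
-- or the inputs run out.
feed :: [a] -> Tube a b r -> [b]
feed _      (Done _)    = []
feed xs     (Yield b t) = b : feed xs t
feed []     (Await _)   = []
feed (x:xs) (Await k)   = feed xs (k x)
```

`mapT` and `filterT` never finish on their own, which is why their result type `r` stays fully polymorphic, as in the signatures above. Only `takeWhileT` can end the stream, so its result is fixed to `()`.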

reduce :: Monad m => (x -> a -> x) -> x -> (x -> b) -> Source a m () -> m b

Argument | Description |
---|---|
`x -> a -> x` | step function |
`x` | initial value |
`x -> b` | final transformation |
`Source a m ()` | stream source |

Strict left-fold of a stream. Note that the actual return type of the source is not relevant, only the intermediate yield type.
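Ignoring effects, `reduce` behaves like a strict left fold followed by the final transformation. The sketch below stands in for the `Source` with a plain list of its yielded values, an assumption made so the example runs with `base` alone. `reduceList` and `mean` are hypothetical names. The final-transformation argument lets a fold carry extra state, such as a running count, and discard it at the end.

```haskell
import Data.List (foldl')

-- Hypothetical stand-in for reduce: the source is modelled as the plain
-- list of values it would yield, so no effects are involved.
reduceList :: (x -> a -> x) -> x -> (x -> b) -> [a] -> b
reduceList step initial final = final . foldl' step initial

-- Example: an arithmetic mean. The accumulator carries a running sum and a
-- count; the final transformation divides them. Both components are forced
-- with seq, since foldl' alone only evaluates the pair to its outer constructor.
mean :: [Double] -> Double
mean = reduceList step (0, 0) (\(s, n) -> s / fromIntegral n)
  where
    step :: (Double, Int) -> Double -> (Double, Int)
    step (s, n) a =
      let s' = s + a
          n' = n + 1
      in s' `seq` n' `seq` (s', n')
```

`reduceList (+) 0 id [1 .. 10]` gives `55`, matching the ghci example at the top of the page.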

every :: (Foldable t, Monad m) => t b -> Tube a (Maybe b) m ()

Similar to `each`, except it explicitly marks the stream as exhausted.
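Here is a list-level sketch of what `every` yields. It assumes the end of the stream is marked by a single trailing `Nothing`, after each element wrapped in `Just`. This reading is consistent with the `sum_snk` example in the Pump section, which treats `Nothing` as exhausted input. `everyList` and `sumUntilEnd` are hypothetical names.

```haskell
import Data.Foldable (toList)

-- Assumption: every wraps each element in Just and then marks the end of
-- the stream with a single Nothing. Modelled here as a plain list.
everyList :: Foldable t => t b -> [Maybe b]
everyList xs = map Just (toList xs) ++ [Nothing]

-- A consumer in the spirit of sum_snk: add values until the end marker.
sumUntilEnd :: [Maybe Int] -> Int
sumUntilEnd = go 0
  where
    go acc (Just n : rest) = go (acc + n) rest
    go acc _               = acc
```

The explicit marker lets a downstream consumer tell "no more input" apart from "input not yet available". That is why a `Sink (Maybe b)` pairs naturally with `every`.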

unyield :: Monad m => FreeT (TubeF x b) m () -> m (Maybe (b, FreeT (TubeF x b) m ()))

Taps the next value from a source, if there is one, returning it together with the remainder of the source.
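In a pure, simplified tube model, tapping a source amounts to running it up to its next `yield`. This sketch assumes any `await` inside the source is answered with `()`, since a source has no meaningful upstream. `Tube`, `unyield` and `drain` here are illustrative stand-ins rather than the library's definitions.

```haskell
-- Hypothetical pure model of a tube: no effects, no FreeT.
data Tube a b r
  = Done r
  | Await (a -> Tube a b r)
  | Yield b (Tube a b r)

-- Like unyield: run a source up to its next yield and return that value
-- with the rest of the source, or Nothing once it has finished. Awaits are
-- answered with () because, in this model, a source has no real upstream.
unyield :: Tube () b () -> Maybe (b, Tube () b ())
unyield (Done ())   = Nothing
unyield (Yield b t) = Just (b, t)
unyield (Await k)   = unyield (k ())

-- Tap a source repeatedly until it is exhausted.
drain :: Tube () b () -> [b]
drain src = case unyield src of
  Nothing        -> []
  Just (b, rest) -> b : drain rest
```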

prompt :: Source String IO ()

Source of `String`s from stdin. This is mostly for debugging / ghci example purposes.

mapM :: Monad m => (a -> m b) -> Tube a b m r

Similar to `map`, except it maps a monadic function instead of a pure one.

sequence :: Monad m => Tube (m a) a m r

Evaluates and extracts a pure value from a monadic one.

display :: Sink String IO ()

Sink for `String`s to stdout. This is mostly for debugging / ghci example purposes.

# Pump

type Pump a b = CofreeT (PumpF a b)

A `Pump` is the dual to a `Tube`. Intuitively, if a `Tube` is a stream-processing computation, then a `Pump` is both a stream generator and reducer.

Examples may help!

One interesting use of a `Pump` is as a data stream, which can be fed into a `Tube` or `Sink`.

```
import Data.Functor.Identity

e :: Pump (Maybe Int) Int Identity Int
e = mkPump (Identity 0)
        (\(Identity x) -> (Just x, Identity (x+1)))
        const

ex1 :: IO ()
ex1 = do
    run $ each e >< take 10 >< map show >< display
    -- displays 0-9 in the console
```
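The counter pump `e` above can be modelled with `base` alone by storing the seed and the two transition functions directly. This is a simplification under stated assumptions. It fixes the comonad to `Identity` and drops `CofreeT`, so the type reads `Pump (Maybe Int) Int Int` instead of `Pump (Maybe Int) Int Identity Int`. `recv`, `send`, `pull`, `counter` and `reseedable` are hypothetical helpers written to match the descriptions in this section.

```haskell
import Data.Functor.Identity (Identity (..))

-- Hypothetical simplification of Pump: the comonad is fixed to Identity and
-- CofreeT is dropped, so a pump is just a seed and two transition functions.
data Pump b c s = Pump
  { seed   :: Identity s
  , recvFn :: Identity s -> (b, Identity s)
  , sendFn :: Identity s -> c -> Identity s
  }

-- Same argument order as the library's mkPump.
mkPump :: Identity s
       -> (Identity s -> (b, Identity s))
       -> (Identity s -> c -> Identity s)
       -> Pump b c s
mkPump = Pump

-- Receive one value and advance the seed.
recv :: Pump b c s -> (b, Pump b c s)
recv p = let (b, s') = recvFn p (seed p) in (b, p { seed = s' })

-- Send one value in, updating the seed.
send :: c -> Pump b c s -> Pump b c s
send c p = p { seed = sendFn p (seed p) c }

-- Pull the first n values out of a pump.
pull :: Int -> Pump b c s -> [b]
pull n p
  | n <= 0    = []
  | otherwise = let (b, p') = recv p in b : pull (n - 1) p'

counter :: Identity Int -> (Maybe Int, Identity Int)
counter (Identity x) = (Just x, Identity (x + 1))

-- The pump `e` from the example: sending is ignored because of `const`.
e :: Pump (Maybe Int) Int Int
e = mkPump (Identity 0) counter const

-- A variant whose send function replaces the seed.
reseedable :: Pump (Maybe Int) Int Int
reseedable = mkPump (Identity 0) counter (\_ c -> Identity c)
```

`e` passes `const` as its send function, so anything sent to it is ignored. Passing a function that replaces the state, as `reseedable` does, is what re-seeding the stream looks like in this model.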

A `Pump` may also be used to fold a `Source`. Indeed, a `Pump` may be thought of as both a non-recursive left fold and a non-recursive unfold paired together. (This is called a "metamorphism," hence the function `meta`.)

```
import Control.Comonad (extract)
import Control.Monad (forM_)

num_src :: Source Int IO ()
num_src = do
    forM_ [1..] $ \n -> do
        lift . putStrLn $ "Yielding " ++ (show n)
        yield n

enum_ex :: IO ()
enum_ex = do
    v <- reduce (flip send) (meta (+) 0 (\x -> (x,x))) extract $ num_src >< take 5
    putStrLn $ "v = " ++ (show v)
    -- v = 15
```
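To see why `enum_ex` prints 15, here is a pure model of `meta` in which the pump's state is the fold accumulator:

- `send` applies the fold step;
- `recv` applies the unfold step;
- `extract` reads the state.

It is a hypothetical simplification with no `Identity` and no `CofreeT`. The source is replaced by the list `[1 .. 5]` that `num_src >< take 5` would yield.

```haskell
-- Hypothetical simplification of the pump built by meta: the state is the
-- fold accumulator itself (no Identity, no CofreeT).
data Pump b a x = Pump
  { current :: x            -- what extract returns
  , onSend  :: x -> a -> x  -- non-recursive fold step
  , onRecv  :: x -> (b, x)  -- non-recursive unfold step
  }

meta :: (x -> a -> x) -> x -> (x -> (b, x)) -> Pump b a x
meta step initial unfold = Pump initial step unfold

send :: a -> Pump b a x -> Pump b a x
send a p = p { current = onSend p (current p) a }

recv :: Pump b a x -> (b, Pump b a x)
recv p = let (b, x') = onRecv p (current p) in (b, p { current = x' })

extract :: Pump b a x -> x
extract = current

-- Mirrors enum_ex: fold the values 1..5 into the pump with `flip send`.
summed :: Pump Int Int Int
summed = foldl (flip send) (meta (+) 0 (\x -> (x, x))) [1 .. 5]
```

Here `extract summed` is `15`. With an unfold such as `\x -> (x, x - 1)`, the same pump would instead count down from whatever has been folded into it.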

The following is an example of a `Pump` both accumulating values from a `Source` and then enumerating them into a `Sink`. This gives back both the result of the computation and the unused input.

```
import Data.Functor.Identity
import Control.Monad (forM)

-- a `Sink` that awaits 5 times, ignoring `Nothing` (exhausted input)
sum_snk :: Sink (Maybe Int) IO Int
sum_snk = do
    ns <- forM [1,2,3,4,5] $ \_ -> do
        mn <- await
        case mn of
            Just n  -> return [n]
            Nothing -> return []
    return $ sum . concat $ ns

source_sink_ex :: IO ([Int], Int)
source_sink_ex = do
    e <- reduce (flip send) (enumerator []) id $ num_src >< take 10
    (unused, total) <- pump (,) e sum_snk
    putStrLn $ "Total: " ++ (show total)
    putStrLn $ "Unused: " ++ (show unused)
    -- Total: 15
    -- Unused: [6,7,8,9,10]
    return (unused, total)
```
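The run of `source_sink_ex` can be reproduced in a pure model where the tube drives control flow:

- each `await` pulls the next value from the pump with its receive function;
- each `yield` pushes a value in with `send`;
- when the tube finishes, `pump` combines the pump's state with the tube's result.

The buffering `enumerator` below assumes `send` appends to the end of the buffer and receiving pops from the front, giving `Nothing` once the buffer is empty. These semantics, and all names in the block, are assumptions for illustration, not the library's code.

```haskell
import Data.Maybe (fromMaybe)

-- Hypothetical pure model of a tube: no effects, no FreeT.
data Tube a b r
  = Done r
  | Await (a -> Tube a b r)
  | Yield b (Tube a b r)

-- Hypothetical simplified pump: receiving produces `a`s for the tube's
-- awaits, sending consumes the `b`s the tube yields.
data Pump a b x = Pump
  { current :: x
  , recvF   :: x -> (a, x)
  , sendF   :: x -> b -> x
  }

send :: b -> Pump a b x -> Pump a b x
send b p = p { current = sendF p (current p) b }

-- Assumed semantics: send appends to the buffer; receiving pops from the
-- front and gives Nothing once the buffer is empty.
enumerator :: [a] -> Pump (Maybe a) a [a]
enumerator xs = Pump xs pop (\buf a -> buf ++ [a])
  where
    pop []       = (Nothing, [])
    pop (y : ys) = (Just y, ys)

-- The tube drives: awaits pull from the pump, yields push into it, and
-- when the tube finishes its result is combined with the pump's state.
pump :: (x -> y -> r) -> Pump a b x -> Tube a b y -> r
pump combine p (Done y)    = combine (current p) y
pump combine p (Yield b t) = pump combine (send b p) t
pump combine p (Await k)   =
  let (a, x') = recvF p (current p)
  in pump combine (p { current = x' }) (k a)

-- In the spirit of sum_snk: await five times, counting Nothing as 0.
sumFive :: Tube (Maybe Int) b Int
sumFive = go (5 :: Int) 0
  where
    go 0 acc = Done acc
    go n acc = Await (\m -> go (n - 1) (acc + fromMaybe 0 m))

-- Mirrors source_sink_ex: buffer 1..10, then let the sink consume from it.
example :: ([Int], Int)
example = pump (,) (foldl (flip send) (enumerator []) [1 .. 10]) sumFive
```

`example` evaluates to `([6,7,8,9,10], 15)`. The sink decides when to stop, and whatever it never asked for is still in the pump's buffer.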

Note that when a `Pump` and a `Tube` are combined with `pump`, the `Tube` determines control flow. `Pump`s are comonads, not monads.

There are doubtless many more interesting examples of combining `Tube`s and `Pump`s. If you think of any, drop the author a line!

send :: Comonad w => b -> Pump a b w r -> Pump a b w r

Send a value into a `Pump`, effectively re-seeding the stream.

pumpM :: (Comonad w, Monad m) => (x -> y -> r) -> Pump a b w (m x) -> Tube a b m y -> m r

A variant of `pump` which allows effects to be executed inside the pump as well.

meta :: (x -> a -> x) -> x -> (x -> (b, x)) -> Pump b a Identity x

Takes a fold function, an initial value, and an unfold to produce a metamorphism. Can be used to change.

enumerator :: [a] -> Pump (Maybe a) a Identity [a]

Constructs an enumerator pump, which can buffer values and then enumerate them to, say, a `Sink` (see the examples above).

# Re-exports

lift :: MonadTrans t => forall m a. Monad m => m a -> t m a

Lift a computation from the argument monad to the constructed monad.