Copyright | (c) 2014, 2016 Gatlin Johnson <gatlin@niltag.net> |
---|---|

License | GPL-3 |

Maintainer | gatlin@niltag.net |

Stability | experimental |

Safe Haskell | Trustworthy |

Language | Haskell2010 |

Write effect-ful stream processing functions and compose them into a series of tubes.

- type Tube a b = FreeT (TubeF a b)
- yield :: Monad m => b -> Tube a b m ()
- await :: Monad m => Tube a b m a
- (><) :: Monad m => Tube a b m r -> Tube b c m r -> Tube a c m r
- runTube :: Monad m => Tube () () m r -> m r
- halt :: Monad m => Tube a b m ()
- newtype Source m a = Source {}
- reduce :: Monad m => (b -> a -> b) -> b -> Tube () a m () -> m b
- merge :: Monad m => Source m a -> Source m a -> Source m a
- newtype Sink m a = Sink {}
- newtype Channel m a b = Channel {}
- tee :: Monad m => Sink m a -> Channel m a a
- type Pump b a = CofreeT (PumpF b a)
- send :: Comonad w => b -> Pump a b w r -> Pump a b w r
- recv :: Comonad w => Pump a b w r -> (a, Pump a b w r)
- pumpT :: Comonad w => w r -> (w r -> b -> w r) -> (w r -> (a, w r)) -> Pump a b w r
- lfold :: (x -> a -> x) -> (x -> (b, x)) -> x -> Pump b a Identity x
- stream :: (Monad m, Comonad w) => (a -> b -> r) -> Pump c d w a -> Tube c d m b -> m r
- streamM :: (Monad m, Comonad w) => (a -> b -> r) -> Pump c d w (m a) -> Tube c d m b -> m r
- cat :: Monad m => Tube a a m r
- for :: Monad m => Tube a b m r -> (b -> Tube a c m s) -> Tube a c m r
- each :: (Monad m, Foldable t) => t b -> Tube () b m ()
- every :: (Foldable t, Monad m) => t b -> Tube () (Maybe b) m ()
- map :: Monad m => (a -> b) -> Tube a b m r
- drop :: Monad m => Int -> Tube a a m r
- take :: Monad m => Int -> Tube a a m ()
- takeWhile :: Monad m => (a -> Bool) -> Tube a a m ()
- filter :: Monad m => (a -> Bool) -> Tube a a m r
- unyield :: Monad m => Tube x b m () -> m (Maybe (b, Tube x b m ()))
- pass :: Monad m => a -> Tube a b m () -> m (Maybe (b, Tube a b m ()))
- mapM :: Monad m => (a -> m b) -> Tube a b m r
- sequence :: Monad m => Tube (m a) a m r
- stop :: Monad m => Tube a () m r
- prompt :: MonadIO m => Source m String
- display :: MonadIO m => Sink m String
- liftT :: (MonadTrans t, Monad m) => FreeT f m a -> t m (FreeF f a (FreeT f m a))
- runFreeT :: FreeT f m a -> m (FreeF f a (FreeT f m a))

# Tube

A `Tube`

is a computation which may `await`

values from an upstream source,
`yield`

values to a downstream receiver, or both. `Tube`

s may be composed in
series to build complex stream processors using the '(><)' operator.

`Tube`

s are also monad transformers so you can add stream processing
capabilities to any base monad.

There are three varieties of `Tube`

s which have different properties:
`Source`

s, `Sink`

s, and `Channel`

s. They each restrict the `Tube`

type in
different ways to guarantee correctness while still allowing them to be
composed. More information is provided with their respective definitions.

The dual to `Tube`

is `Pump`

. It is a comonad which endows another base comonad
with the ability to `send`

and `recv`

values.

Several useful `Tube`

functions - like `runTube`

, `reduce`

, and `stream`

- are
implemented in terms of `Pump`

. Beyond simply evaluating `Tube`

s they have
other uses. The `lfold`

function, for instance, constructs a resumable left fold
structure.

This library is inspired in large part by `pipes`

, `conduit`

, and others. While
it intends to be efficient and useful in its own right it began as an exercise
in implementing the basics of those other libraries with comonads and dualities
in mind.

type Tube a b = FreeT (TubeF a b) Source

The central data type. `Tube`

s stacked on top of the same base monad `m`

may be
composed in series, so long as their type arguments agree.

yield :: Monad m => b -> Tube a b m () Source

Command telling a `Tube`

computation to yield data downstream and pause.

await :: Monad m => Tube a b m a Source

Command telling a `Tube`

computation to pause and await upstream data.

# Sources

An exhaustible source of values parameterized over a base monad. It never
`await`

s, it only `yield`

s.

`Source`

s are monad transformers in their own right, as they are possibly
finite. They may also be synchronously merged:

```
src1 :: Source IO String
src1 = Source $ each ["line A1", "line A2", "line A3"]
src2 :: Source IO String
src2 = Source $ each ["line B1", "line B2", "line B3", "line B4"]
src3 :: Source IO String
src3 = src1
````merge`

src2
main :: IO ()
main = runTube $ sample src3 >< pour display
-- line A1
-- line B1
-- line A2
-- line B2
-- line A3
-- line B3
-- line B4

If one source runs out, the other will continue until completion.

Digression: originally `merge`

was the implementation for `mappend`

and '(<>)'.
However because `Source`

is ultimately a list transformer I thought it better
that these instances preserve the behavior found in lists and instead provide a
separate function for synchronous merging.

MonadTrans Source Source | |

Monad m => Monad (Source m) Source | |

Monad m => Functor (Source m) Source | |

Monad m => Applicative (Source m) Source | |

Monad m => Alternative (Source m) Source | |

Monad m => MonadPlus (Source m) Source | |

MonadIO m => MonadIO (Source m) Source | |

(Monad m, Floating a) => Floating (Source m a) Source | |

(Monad m, Fractional a) => Fractional (Source m a) Source | |

(Monad m, Num a) => Num (Source m a) Source | |

Monad m => Monoid (Source m a) Source | |

Monad m => Semigroup (Source m a) Source |

merge :: Monad m => Source m a -> Source m a -> Source m a Source

Interleave the values of two `Source`

s until both are exhausted.

# Sinks

A potentially full sink of values parameterized over a base monad. It never
`yield`

s.

A `Sink`

is a contravariant functor. Intuitively this means that it is a
consumer of some base type, and you may map transformations over its input
before it is consumed.

Example:

import Data.Functor.Contravariant add5 :: Sink IO Int add5 = Sink $ loop 0 5 where loop acc 0 = do liftIO $ putStrLn $ "Sum of five numbers: " ++ (show acc) halt loop acc count = do n <- await loop (acc + n) (count - 1) times2Add5:: Sink IO Int times2Add5 = (*2) >$< add5 main :: IO () main = do runTube $ each [1..10] >< pour add5 -- "Sum of five numbers: 15" runTube $ each [1..10] >< pour times2Add5 -- "Sum of five numbers: 30"

`Sink`

s may also be merged together, as they form a semigroup:

import Data.Semigroup writeToFile :: Sink IO String writeToFile = Sink $ do line <- await liftIO . putStrLn $ "Totally writing this to a file: " ++ line writeToConsole :: Sink IO String writeToConsole = Sink $ do line <- await liftIO . putStrLn $ "Console out: " ++ line writeOut :: Sink IO String writeOut = writeToFile <> writeToConsole main :: IO () main = do runTube $ each [1..3] >< map show >< forever (pour writeOut) -- Totally writing this to a file: 1 -- Console out: 1 -- Totally writing this to a file: 2 -- Console out: 2 -- Totally writing this to a file: 3 -- Console out: 3

# Channels

A `Channel m a b`

is a stream processor which converts values of type `a`

into
values of type `b`

, while also performing side-effects in some monad `m`

.

If a `Channel`

`yield`

s exactly once after each time it `await`

s then it may be
safely treated as an `Arrow`

. For example:

{-# LANGUAGE Arrows #-} import Tubes import Control.Arrow import Prelude hiding (map) -- A simple channel which accumulates a total total :: (Num a, Monad m) => Channel m a a total = Channel $ loop 0 where loop acc = do n <- await let acc' = n + acc yield acc' loop acc' -- A running average using two totals in parallel avg :: (Fractional a, Monad m) => Channel m a a avg = proc value -> do t <- total -< value n <- total -< 1 returnA -< t / n main :: IO () main = runTube $ each [0,10,7,8] >< tune avg >< map show >< pour display

This program would output

0.0 5.0 5.666666666666667 6.25

This has interesting potential in FRP applications.

tee :: Monad m => Sink m a -> Channel m a a Source

Convert a 'Sink m a' into a 'Channel m a a', re-forwarding values downstream.

Useful example:

import Data.Semigroup writeToFile :: Sink IO String writeToFile = Sink $ do line <- await liftIO . putStrLn $ "Totally writing this to a file: " ++ line writeToConsole :: Sink IO String writeToConsole = Sink $ do line <- await liftIO . putStrLn $ "Console out: " ++ line writeOut :: Channel IO String String writeOut = tee $ writeToFile <> writeToConsole main :: IO () main = runTube $ each ["a","b","c"] >< forever (tune writeOut) >< pour display -- Totally writing this to a file: a -- Console out: a -- a -- Totally writing this to a file: b -- Console out: b -- b -- Totally writing this to a file: c -- Console out: c -- c

This takes advantage of the divisible nature of `Sink`

s to merge effectful
computations and then continue the process.

# Pump

type Pump b a = CofreeT (PumpF b a) Source

Pumps are the dual of `Tube`

s. Where a `Tube`

may either be `await`

ing or
`yield`

ing, a `Pump`

is always in a position to `send`

or `recv`

data. They are
the machines which run `Tube`

s, essentially.

Pumps may be used to formulate infinite streams and folds.

TODO: more examples!

Note the type arguments are "backward" from the `Tube`

point of view: a
`Pump b a w r`

may be sent values of type `a`

and you may receive `b`

values
from it.

pumpT :: Comonad w => w r -> (w r -> b -> w r) -> (w r -> (a, w r)) -> Pump a b w r Source

Construct a `Pump`

based on an arbitrary comonad.

lfold :: (x -> a -> x) -> (x -> (b, x)) -> x -> Pump b a Identity x Source

Constructs a resumable left fold. Example usage:

summer :: Pump () Int Identity Int summer = lfold (+) (x -> ((),x)) 0 main :: IO () main = do result <- stream const (duplicate summer) $ each [1..10] putStrLn . show . extract $ result -- "55" result2 <- stream const (duplicate result) $ each [11..20] putStrLn . show . extract $ result2 -- "210"

# Utilities

map :: Monad m => (a -> b) -> Tube a b m r Source

Transforms all incoming values according to some function.

takeWhile :: Monad m => (a -> Bool) -> Tube a a m () Source

Terminates the stream upon receiving a value violating the predicate

filter :: Monad m => (a -> Bool) -> Tube a a m r Source

Yields only values satisfying some predicate.

unyield :: Monad m => Tube x b m () -> m (Maybe (b, Tube x b m ())) Source

Taps the next value from a source, maybe.

pass :: Monad m => a -> Tube a b m () -> m (Maybe (b, Tube a b m ())) Source

Similar to `unyield`

but it first sends a value through the tube.

mapM :: Monad m => (a -> m b) -> Tube a b m r Source

Similar to `map`

except it maps a monadic function instead of a pure one.

sequence :: Monad m => Tube (m a) a m r Source

Evaluates and extracts a pure value from a monadic one.

stop :: Monad m => Tube a () m r Source

A default tube to end a series when no further processing is required.

# Miscellaneous

prompt :: MonadIO m => Source m String Source

Source of `String`

s from stdin. This is mostly for debugging / ghci example purposes.

display :: MonadIO m => Sink m String Source

Sink for `String`

s to stdout. This is mostly for debugging / ghci example
purposes.

# Re-exports

runFreeT :: FreeT f m a -> m (FreeF f a (FreeT f m a))

# Example

Code is worth a thousand words. This program ...

```
import Prelude hiding (map)
import qualified Prelude as P
import Data.Semigroup
import Control.Monad (forever)
import Tubes
srcA :: MonadIO m => Source m String
srcA = Source $ each ["line A1", "line A2", "line A3"]
srcB :: MonadIO m => Source m String
srcB = Source $ each ["line B1", "line B2", "line B3", "line B4"]
-- Synchronously merge input
srcAB :: MonadIO m => Source m String
srcAB = srcA
````merge`

srcB
writeToFile :: MonadIO m => Sink m String
writeToFile = Sink $ do
line <- await
liftIO . putStrLn $ "Totally writing this to a file: " ++ line
writeToConsole :: MonadIO m => Sink m String
writeToConsole = Sink $ do
line <- await
liftIO . putStrLn $ "Console out: " ++ line
-- Merge outputs together
writeOut :: MonadIO m => Sink m String
writeOut = writeToFile <> writeToConsole
-- And make outputs re-forward their input data
writeOut' :: MonadIO m => Channel m String String
writeOut' = tee writeOut
main :: IO ()
main = runTube $ sample srcAB
>< tune writeOut'
>< pour display

... gives this output:

Totally writing this to a file: line A1 Console out: line A1 line A1 Totally writing this to a file: line B1 Console out: line B1 line B1 Totally writing this to a file: line A2 Console out: line A2 line A2 Totally writing this to a file: line B2 Console out: line B2 line B2 Totally writing this to a file: line A3 Console out: line A3 line A3 Totally writing this to a file: line B3 Console out: line B3 line B3 Totally writing this to a file: line B4 Console out: line B4 line B4