Copyright | (c) 2014, 2016 Gatlin Johnson <gatlin@niltag.net> |
---|---|

License | GPL-3 |

Maintainer | gatlin@niltag.net |

Stability | experimental |

Safe Haskell | Safe |

Language | Haskell2010 |

Write effect-ful stream processing functions and compose them into a series of tubes.

- type Tube a b = FreeT (TubeF a b)
- yield :: Monad m => b -> Tube a b m ()
- await :: Monad m => Tube a b m a
- (><) :: Monad m => Tube a b m r -> Tube b c m r -> Tube a c m r
- runTube :: Monad m => Tube () () m r -> m r
- halt :: Monad m => Tube a b m ()
- newtype Source m a = Source {}
- reduce :: Monad m => (b -> a -> b) -> b -> Tube () a m () -> m b
- newtype Sink m a = Sink {}
- newtype Channel m a b = Channel {}
- tee :: Monad m => Sink m a -> Channel m a a
- type Pump b a = CofreeT (PumpF b a)
- send :: Comonad w => b -> Pump a b w r -> Pump a b w r
- recv :: Comonad w => Pump a b w r -> (a, Pump a b w r)
- pumpT :: Comonad w => w r -> (w r -> b -> w r) -> (w r -> (a, w r)) -> Pump a b w r
- lfold :: (x -> a -> x) -> (x -> (b, x)) -> x -> Pump b a Identity x
- stream :: (Monad m, Comonad w) => (a -> b -> r) -> Pump c d w a -> Tube c d m b -> m r
- streamM :: (Monad m, Comonad w) => (a -> b -> r) -> Pump c d w (m a) -> Tube c d m b -> m r
- cat :: Monad m => Tube a a m r
- for :: Monad m => Tube a b m r -> (b -> Tube a c m s) -> Tube a c m r
- each :: (Monad m, Foldable t) => t b -> Tube () b m ()
- every :: (Foldable t, Monad m) => t b -> Tube () (Maybe b) m ()
- map :: Monad m => (a -> b) -> Tube a b m r
- drop :: Monad m => Int -> Tube a a m r
- take :: Monad m => Int -> Tube a a m ()
- takeWhile :: Monad m => (a -> Bool) -> Tube a a m ()
- filter :: Monad m => (a -> Bool) -> Tube a a m r
- unyield :: Monad m => Tube x b m () -> m (Maybe (b, Tube x b m ()))
- pass :: Monad m => a -> Tube a b m () -> m (Maybe (b, Tube a b m ()))
- mapM :: Monad m => (a -> m b) -> Tube a b m r
- sequence :: Monad m => Tube (m a) a m r
- stop :: Monad m => Tube a () m r
- prompt :: MonadIO m => Source m String
- display :: MonadIO m => Sink m String
- liftT :: (MonadTrans t, Monad m) => FreeT f m a -> t m (FreeF f a (FreeT f m a))
- runFreeT :: FreeT f m a -> m (FreeF f a (FreeT f m a))

# Tube

type Tube a b = FreeT (TubeF a b) Source

The central data type. `Tube`

s stacked on top of the same base monad `m`

may be
composed in series, so long as their type arguments agree.

yield :: Monad m => b -> Tube a b m () Source

Command telling a `Tube`

computation to yield data downstream and pause.

await :: Monad m => Tube a b m a Source

Command telling a `Tube`

computation to pause and await upstream data.

# Sources

An exhaustible source of values parameterized over a base monad. It never
`await`

s, it only `yield`

s.

`Source`

s are monad transformers in their own right, as they are possibly
finite. They may also be synchronously merged as monoids:

import Data.Monoid src1 :: Source IO String src1 = Source $ each ["line A1", "line A2", "line A3"] src2 :: Source IO String src2 = Source $ each ["line B1", "line B2", "line B3", "line B4"] src3 :: Source IO String src3 = src1 <> src2 main :: IO () main = runTube $ sample (src1 <> src2) >< pour display -- line A1 -- line B1 -- line A2 -- line B2 -- line A3 -- line B3 -- line B4

If one source runs out, the other will continue until completion.

MonadTrans Source Source | |

Monad m => Monad (Source m) Source | |

Monad m => Functor (Source m) Source | |

Monad m => Applicative (Source m) Source | |

Monad m => Alternative (Source m) Source | |

Monad m => MonadPlus (Source m) Source | |

MonadIO m => MonadIO (Source m) Source | |

Monad m => Monoid (Source m a) Source | |

Monad m => Semigroup (Source m a) Source |

# Sinks

A potentially full sink of values parameterized over a base monad. It never
`yield`

s.

A `Sink`

is a contravariant functor. Intuitively this means that it is a
consumer of some base type, and you may map transformations over its input
before it is consumed.

Example:

import Data.Functor.Contravariant add5 :: Sink IO Int add5 = Sink $ loop 0 5 where loop acc 0 = do liftIO $ putStrLn $ "Sum of five numbers: " ++ (show acc) halt loop acc count = do n <- await loop (acc + n) (count - 1) times2Add5:: Sink IO Int times2Add5 = (*2) >$< add5 main :: IO () main = do runTube $ each [1..10] >< pour add5 -- "Sum of five numbers: 15" runTube $ each [1..10] >< pour times2Add5 -- "Sum of five numbers: 30"

`Sink`

s may also be merged together, as they form a semigroup:

import Data.Semigroup writeToFile :: Sink IO String writeToFile = Sink $ do line <- await liftIO . putStrLn $ "Totally writing this to a file: " ++ line writeToConsole :: Sink IO String writeToConsole = Sink $ do line <- await liftIO . putStrLn $ "Console out: " ++ line writeOut :: Sink IO String writeOut = writeToFile <> writeToConsole main :: IO () main = do runTube $ each [1..3] >< map show >< forever (pour writeOut) -- Totally writing this to a file: 1 -- Console out: 1 -- Totally writing this to a file: 2 -- Console out: 2 -- Totally writing this to a file: 3 -- Console out: 3

# Channels

A `Channel m a b`

is a stream processor which converts values of type `a`

into
values of type `b`

, while also performing side-effects in some monad `m`

.

If a `Channel`

`yield`

s exactly once after each time it `await`

s then it may be
safely treated as an `Arrow`

. For example:

{-# LANGUAGE Arrows #-} import Tubes import Control.Arrow import Prelude hiding (map) -- A simple channel which accumulates a total total :: (Num a, Monad m) => Channel m a a total = Channel $ loop 0 where loop acc = do n <- await let acc' = n + acc yield acc' loop acc' -- A running average using two totals in parallel avg :: (Fractional a, Monad m) => Channel m a a avg = proc value -> do t <- total -< value n <- total -< 1 returnA -< t / n main :: IO () main = runTube $ each [0,10,7,8] >< tune avg >< map show >< pour display

This program would output

0.0 5.0 5.666666666666667 6.25

This has interesting potential in FRP applications.

tee :: Monad m => Sink m a -> Channel m a a Source

Convert a 'Sink m a' into a 'Channel m a a', re-forwarding values downstream.

Useful example:

import Data.Semigroup writeToFile :: Sink IO String writeToFile = Sink $ do line <- await liftIO . putStrLn $ "Totally writing this to a file: " ++ line writeToConsole :: Sink IO String writeToConsole = Sink $ do line <- await liftIO . putStrLn $ "Console out: " ++ line writeOut :: Channel IO String String writeOut = tee $ writeToFile <> writeToConsole main :: IO () main = runTube $ each ["a","b","c"] >< forever (tune writeOut) >< pour display -- Totally writing this to a file: a -- Console out: a -- a -- Totally writing this to a file: b -- Console out: b -- b -- Totally writing this to a file: c -- Console out: c -- c

This takes advantage of the divisible nature of `Sink`

s to merge effectful
computations and then continue the process.

# Pump

type Pump b a = CofreeT (PumpF b a) Source

Pumps are the dual of `Tube`

s. Where a `Tube`

may either be `await`

ing or
`yield`

ing, a `Pump`

is always in a position to `send`

or `recv`

data. They are
the machines which run `Tube`

s, essentially.

Pumps may be used to formulate infinite streams and folds.

TODO: more examples!

Note the type arguments are "backward" from the `Tube`

point of view: a
`Pump b a w r`

may be sent values of type `a`

and you may receive `b`

values
from it.

pumpT :: Comonad w => w r -> (w r -> b -> w r) -> (w r -> (a, w r)) -> Pump a b w r Source

Construct a `Pump`

based on an arbitrary comonad.

lfold :: (x -> a -> x) -> (x -> (b, x)) -> x -> Pump b a Identity x Source

Constructs a resumable left fold. Example usage:

summer :: Pump () Int Identity Int summer = lfold (+) (x -> ((),x)) 0 main :: IO () main = do result <- stream const (duplicate summer) $ each [1..10] putStrLn . show . extract $ result -- "55" result2 <- stream const (duplicate result) $ each [11..20] putStrLn . show . extract $ result2 -- "210"

# Utilities

map :: Monad m => (a -> b) -> Tube a b m r Source

Transforms all incoming values according to some function.

takeWhile :: Monad m => (a -> Bool) -> Tube a a m () Source

Terminates the stream upon receiving a value violating the predicate

filter :: Monad m => (a -> Bool) -> Tube a a m r Source

Yields only values satisfying some predicate.

unyield :: Monad m => Tube x b m () -> m (Maybe (b, Tube x b m ())) Source

Taps the next value from a source, maybe.

pass :: Monad m => a -> Tube a b m () -> m (Maybe (b, Tube a b m ())) Source

Similar to `unyield`

but it first sends a value through the tube.

mapM :: Monad m => (a -> m b) -> Tube a b m r Source

Similar to `map`

except it maps a monadic function instead of a pure one.

sequence :: Monad m => Tube (m a) a m r Source

Evaluates and extracts a pure value from a monadic one.

stop :: Monad m => Tube a () m r Source

A default tube to end a series when no further processing is required.

# Miscellaneous

prompt :: MonadIO m => Source m String Source

Source of `String`

s from stdin. This is mostly for debugging / ghci example purposes.

display :: MonadIO m => Sink m String Source

Sink for `String`

s to stdout. This is mostly for debugging / ghci example
purposes.

# Re-exports

liftT :: (MonadTrans t, Monad m) => FreeT f m a -> t m (FreeF f a (FreeT f m a)) Source

runFreeT :: FreeT f m a -> m (FreeF f a (FreeT f m a))

# Example

Code is worth a thousand words. This program ...

import Prelude hiding (map) import qualified Prelude as P import Data.Semigroup import Control.Monad (forevGer) import Tubes srcA :: MonadIO m => Source m String srcA = Source $ each ["line A1", "line A2", "line A3"] srcB :: MonadIO m => Source m String srcB = Source $ each ["line B1", "line B2", "line B3", "line B4"] -- Synchronously merge input srcAB :: MonadIO m => Source m String srcAB = srcA <> srcB writeToFile :: MonadIO m => Sink m String writeToFile = Sink $ do line <- await liftIO . putStrLn $ "Totally writing this to a file: " ++ line writeToConsole :: MonadIO m => Sink m String writeToConsole = Sink $ do line <- await liftIO . putStrLn $ "Console out: " ++ line -- Merge outputs together writeOut :: MonadIO m => Sink m String writeOut = writeToFile <> writeToConsole -- And make outputs re-forward their input data writeOut' :: MonadIO m => Channel m String String writeOut' = tee writeOut main :: IO () main = runTube $ sample srcAB >< tune writeOut' >< pour display

... gives this output:

Totally writing this to a file: line A1 Console out: line A1 line A1 Totally writing this to a file: line B1 Console out: line B1 line B1 Totally writing this to a file: line A2 Console out: line A2 line A2 Totally writing this to a file: line B2 Console out: line B2 line B2 Totally writing this to a file: line A3 Console out: line A3 line A3 Totally writing this to a file: line B3 Console out: line B3 line B3 Totally writing this to a file: line B4 Console out: line B4 line B4