| Copyright | (c) 2014, 2015 Gatlin Johnson <gatlin@niltag.net> |
|---|---|
| License | GPL-3 |
| Maintainer | gatlin@niltag.net |
| Stability | experimental |
| Safe Haskell | Safe-Inferred |
| Language | Haskell2010 |
Tubes
Description
Write effectful stream processing functions and compose them into a series of tubes.
This exists primarily for my own education. It is updated often as I try things and is probably, at this moment, wrong.
If you want to know more about efficient stream processing:
http://okmij.org/ftp/Streams.html
My goals were to
- learn more about iteratees and
- explore the relationships between functions, pairs, sum types, and products.
Synopsis
- type Tube a b = FreeT (TubeF a b)
- newtype TubeF a b k = TubeF { runT :: forall r. ((a -> k) -> r) -> ((b, k) -> r) -> r }
- type Source b m r = forall x. Tube x b m r
- type Sink a m r = forall x. Tube a x m r
- type Action m r = forall x. Tube x x m r
- run :: FreeT f m a -> m (FreeF f a (FreeT f m a))
- await :: Monad m => Tube a b m a
- yield :: Monad m => b -> Tube a b m ()
- each :: (Monad m, Foldable t) => t b -> Tube a b m ()
- for :: Monad m => Tube a b m r -> (b -> Tube a c m s) -> Tube a c m r
- (~>) :: Monad m => Tube a b m r -> (b -> Tube a c m s) -> Tube a c m r
- (>-) :: Monad m => Tube a b m r -> (b -> Tube b c m r) -> Tube a c m r
- (><) :: Monad m => Tube a b m r -> Tube b c m r -> Tube a c m r
- liftT :: (MonadTrans t, Monad m) => FreeT f m a -> t m (FreeF f a (FreeT f m a))
- cat :: Monad m => Tube a a m r
- map :: Monad m => (a -> b) -> Tube a b m r
- drop :: Monad m => Int -> Tube a a m r
- take :: Monad m => Int -> Tube a a m ()
- takeWhile :: Monad m => (a -> Bool) -> Tube a a m ()
- filter :: Monad m => (a -> Bool) -> Tube a a m r
- reduce :: Monad m => (x -> a -> x) -> x -> (x -> b) -> Source a m () -> m b
- every :: (Foldable t, Monad m) => t b -> Tube a (Maybe b) m ()
- unyield :: Monad m => FreeT (TubeF x b) m () -> m (Maybe (b, FreeT (TubeF x b) m ()))
- prompt :: Source String IO ()
- display :: Sink String IO ()
- type Pump a b = CofreeT (PumpF a b)
- data PumpF a b k = PumpF {}
- pump :: Comonad w => w a -> (w a -> (b, w a)) -> (w a -> c -> w a) -> Pump b c w a
- send :: Comonad w => Pump a b w r -> b -> Pump a b w r
- recv :: Comonad w => Pump a b w r -> (a, Pump a b w r)
- runPump :: (Comonad w, Monad m) => (x -> y -> r) -> Pump a b w x -> Tube a b m y -> m r
- lift :: MonadTrans t => forall m a. Monad m => m a -> t m a
- runFreeT :: FreeT f m a -> m (FreeF f a (FreeT f m a))
Tubes
type Tube a b = FreeT (TubeF a b)
A Tube is a computation which can
- yield an intermediate value downstream and suspend execution; and
- await a value from upstream, deferring execution until it is received.
Moreover, individual Tubes may be freely composed into larger ones, so long
as their types match. Thus, one may write small, reusable building blocks and
construct efficient stream processing pipelines.
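To make the yield/await vocabulary concrete, here is a minimal pure sketch. It is not the library's definition: the real Tube is FreeT (TubeF a b) over a base monad, whereas Step, awaitS, yieldS, doubleTwo and feed below are hypothetical names invented for this illustration, and there is no base monad at all.

```haskell
-- A deliberately simplified, pure stand-in for a Tube (assumption: no base monad).
data Step a b r
  = Await (a -> Step a b r)   -- suspend until an upstream value arrives
  | Yield b (Step a b r)      -- emit a value downstream, then continue
  | Done r                    -- finished with a result

instance Functor (Step a b) where
  fmap f (Await k)   = Await (fmap f . k)
  fmap f (Yield b s) = Yield b (fmap f s)
  fmap f (Done r)    = Done (f r)

instance Applicative (Step a b) where
  pure = Done
  sf <*> sx = sf >>= \f -> fmap f sx

instance Monad (Step a b) where
  Await k   >>= f = Await (\a -> k a >>= f)
  Yield b s >>= f = Yield b (s >>= f)
  Done r    >>= f = f r

-- Hypothetical counterparts of await and yield for the simplified type.
awaitS :: Step a b a
awaitS = Await Done

yieldS :: b -> Step a b ()
yieldS b = Yield b (Done ())

-- A small building block: receive two values and emit each one doubled.
doubleTwo :: Step Int Int ()
doubleTwo = do
  x <- awaitS
  yieldS (2 * x)
  y <- awaitS
  yieldS (2 * y)

-- Drive a Step with a list of inputs and collect everything it yields.
feed :: [a] -> Step a b r -> [b]
feed _        (Done _)    = []
feed xs       (Yield b s) = b : feed xs s
feed (x : xs) (Await k)   = feed xs (k x)
feed []       (Await _)   = []
```

In this model, feed [1, 2, 3] doubleTwo consumes the first two inputs and ignores the third, because doubleTwo finishes after its second yield.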
Since a much better engineered, more popular, and decidedly more mature library already uses the term "pipes", I have opted instead to think of my work as a series of tubes.
TubeF is the union of unary functions and binary products into a single
type, here defined with a Boehm-Berarducci encoding.
This type is equivalent to the following:
data TubeF a b k
= Await (a -> k) -- :: (a -> k) -> TubeF a b k
| Yield (b , k) -- :: (b , k) -> TubeF a b k
The type signatures for the two value constructors should bear a strong
resemblance to the actual type signature of runT. Instead of encoding
tubes as structures which build up when composed, a TubeF is a control
flow mechanism which picks one of two provided continuations.
People using this library should never have to contend with these details, but they are worth mentioning.
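The claimed equivalence between the two representations can be checked with a pair of conversion functions. The sketch below restates the newtype from the synopsis and the data type from the text (renamed TubeF' here only so both can live in one module); encode, decode and describe are hypothetical helpers, not part of the library.

```haskell
{-# LANGUAGE RankNTypes #-}

-- The ordinary algebraic data type from the text, renamed to avoid a clash.
data TubeF' a b k
  = Await (a -> k)
  | Yield (b, k)

-- The encoded form: a value is a function that picks one of two continuations.
newtype TubeF a b k = TubeF
  { runT :: forall r. ((a -> k) -> r) -> ((b, k) -> r) -> r }

-- Each constructor becomes "call the matching continuation".
encode :: TubeF' a b k -> TubeF a b k
encode (Await f) = TubeF (\onAwait _ -> onAwait f)
encode (Yield p) = TubeF (\_ onYield -> onYield p)

-- Passing the constructors themselves as continuations rebuilds the data type.
decode :: TubeF a b k -> TubeF' a b k
decode t = runT t Await Yield

-- The functor instance maps over the continuation slot k in both cases.
instance Functor (TubeF a b) where
  fmap f t = TubeF (\onAwait onYield ->
    runT t (\g -> onAwait (f . g)) (\(b, k) -> onYield (b, f k)))

-- "Pattern matching" on the encoded form is just supplying two handlers.
describe :: Show b => TubeF a b k -> String
describe t = runT t (const "await") (\(b, _) -> "yield " ++ show b)
```

Since decode applies runT to the two constructors, decode . encode returns the value it was given, which is the sense in which the encoded type and the data type carry the same information.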
Core infrastructure
(>-) :: Monad m => Tube a b m r -> (b -> Tube b c m r) -> Tube a c m r
Connect a task to a continuation yielding another task; see (><).
(><) :: Monad m => Tube a b m r -> Tube b c m r -> Tube a c m r infixl 3
Compose two tubes into a new tube.
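One way to picture composition is the following pure sketch, in which the downstream stage runs until it needs input and only then is the upstream stage advanced. Whether the library's (><) is driven in exactly this order is an assumption; Step, connect, fromList, mapS and collect are hypothetical names for a simplified model with no base monad.

```haskell
-- Simplified, pure stand-in for a Tube (assumption: no base monad).
data Step a b r
  = Await (a -> Step a b r)
  | Yield b (Step a b r)
  | Done r

-- Downstream-driven composition of two stages.
connect :: Step a b r -> Step b c r -> Step a c r
connect _  (Done r)     = Done r                       -- downstream finished
connect up (Yield c dn) = Yield c (connect up dn)      -- pass output along
connect up (Await k)    = case up of                   -- downstream needs input
  Done r      -> Done r                                -- upstream is exhausted
  Yield b up' -> connect up' (k b)                     -- hand the value over
  Await h     -> Await (\a -> connect (h a) (Await k)) -- ask further upstream

-- A source that yields every element of a list.
fromList :: [b] -> Step a b ()
fromList = foldr Yield (Done ())

-- A stage that transforms every value passing through it.
mapS :: (a -> b) -> Step a b r
mapS f = Await (\a -> Yield (f a) (mapS f))

-- Collect the outputs of a pipeline until it finishes or asks for input.
collect :: Step a b r -> [b]
collect (Yield b s) = b : collect s
collect _           = []
```

For example, collect (fromList [1, 2, 3] `connect` mapS (+ 1)) evaluates to [2, 3, 4] in this model; the types only line up because the output type of the first stage matches the input type of the second.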
Utilities
map :: Monad m => (a -> b) -> Tube a b m r
Transforms all incoming values according to some function.
takeWhile :: Monad m => (a -> Bool) -> Tube a a m ()
Terminates the stream upon receiving a value violating the predicate.
filter :: Monad m => (a -> Bool) -> Tube a a m r
Yields only values satisfying some predicate.
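The behaviour of these three utilities can be sketched in a simplified pure model. This is an illustration under stated assumptions, not the library's source: Step, mapS, filterS, takeWhileS and feed are hypothetical names, and the base monad is omitted.

```haskell
-- Simplified, pure stand-in for a Tube (assumption: no base monad).
data Step a b r
  = Await (a -> Step a b r)
  | Yield b (Step a b r)
  | Done r

-- Transform every incoming value; never finishes on its own.
mapS :: (a -> b) -> Step a b r
mapS f = Await (\a -> Yield (f a) (mapS f))

-- Forward only values satisfying the predicate; never finishes on its own.
filterS :: (a -> Bool) -> Step a a r
filterS p = Await (\a -> if p a then Yield a (filterS p) else filterS p)

-- Forward values until one violates the predicate, then finish with ().
takeWhileS :: (a -> Bool) -> Step a a ()
takeWhileS p = Await (\a -> if p a then Yield a (takeWhileS p) else Done ())

-- Drive a Step with a list of inputs and collect everything it yields.
feed :: [a] -> Step a b r -> [b]
feed _        (Done _)    = []
feed xs       (Yield b s) = b : feed xs s
feed (x : xs) (Await k)   = feed xs (k x)
feed []       (Await _)   = []
```

The result types mirror the signatures above: the map and filter sketches are polymorphic in r because they never produce a result themselves, while the takeWhile sketch returns () because it is the stage that decides when to stop.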
reduce
Arguments
| :: Monad m | |
|---|---|
| => (x -> a -> x) | step function |
| -> x | initial value |
| -> (x -> b) | final transformation |
| -> Source a m () | stream source |
| -> m b | |
Strict left-fold of a stream. Note that the actual return type of the source is not relevant, only the intermediate yield type.
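The three-argument shape (step function, initial value, final transformation) is easiest to see on a plain list. The sketch below uses a list in place of a Source and returns a pure value rather than m b; reduceList and mean are hypothetical names, not library functions.

```haskell
import Data.List (foldl')

-- List-based analogue of reduce: fold strictly, then post-process the result.
reduceList :: (x -> a -> x) -> x -> (x -> b) -> [a] -> b
reduceList step begin done = done . foldl' step begin

-- The accumulator (running sum, count) differs from the final result type,
-- which is why a separate final transformation is useful.
mean :: [Double] -> Double
mean = reduceList
  (\(s, n) a -> (s + a, n + 1))      -- step: extend the sum and the count
  (0, 0 :: Int)                      -- initial value
  (\(s, n) -> s / fromIntegral n)    -- final transformation
```

Keeping the final transformation separate lets the accumulator carry whatever bookkeeping the fold needs without exposing it to the caller.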
every :: (Foldable t, Monad m) => t b -> Tube a (Maybe b) m ()
Similar to each except it explicitly marks the stream as exhausted.
unyield :: Monad m => FreeT (TubeF x b) m () -> m (Maybe (b, FreeT (TubeF x b) m ()))
Taps the next value from a source, maybe.
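The result type, a Maybe of the next value paired with the rest of the source, has the same shape as the step function expected by unfoldr. A pure sketch of that idea follows; Step, unyieldS, fromList and toList are hypothetical names, the base monad is omitted, and treating a source that awaits as exhausted is an assumption of this model.

```haskell
import Data.List (unfoldr)

-- Simplified, pure stand-in for a Tube (assumption: no base monad).
data Step a b r
  = Await (a -> Step a b r)
  | Yield b (Step a b r)
  | Done r

-- Peel off the next yielded value together with the remaining stream.
unyieldS :: Step a b () -> Maybe (b, Step a b ())
unyieldS (Yield b rest) = Just (b, rest)
unyieldS _              = Nothing   -- finished (or awaiting, in this model)

-- A source that yields every element of a list.
fromList :: [b] -> Step a b ()
fromList = foldr Yield (Done ())

-- Repeatedly tapping the source drains it into a list.
toList :: Step a b () -> [b]
toList = unfoldr unyieldS
```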
prompt :: Source String IO ()
Source of Strings from stdin. This is mostly for debugging / ghci example purposes.
display :: Sink String IO ()
Sink for Strings to stdout. This is mostly for debugging / ghci example
purposes.
Pump
send :: Comonad w => Pump a b w r -> b -> Pump a b w r
Send a value into a Pump, effectively re-seeding the stream.
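The idea of re-seeding can be sketched without any comonad machinery. The record below loosely follows the argument order of pump in the synopsis (a seed, a receive function, a send function), but it is only an assumption-laden model: SimplePump, recv', send' and counter are hypothetical names, and the real Pump is CofreeT (PumpF a b) over a comonad.

```haskell
-- A plain-record model of a pump: a state plus ways to read from and write to it.
data SimplePump a b s = SimplePump
  { pumpState :: s             -- the current seed
  , recvWith  :: s -> (a, s)   -- produce a value and the next state
  , sendWith  :: s -> b -> s   -- absorb a value into the state
  }

-- Receive the next value along with the advanced pump.
recv' :: SimplePump a b s -> (a, SimplePump a b s)
recv' p =
  let (a, s') = recvWith p (pumpState p)
  in  (a, p { pumpState = s' })

-- Send a value in; what happens to it is decided by sendWith.
send' :: SimplePump a b s -> b -> SimplePump a b s
send' p b = p { pumpState = sendWith p (pumpState p) b }

-- A counter whose send function replaces the state, i.e. re-seeds the stream.
counter :: SimplePump Int Int Int
counter = SimplePump
  { pumpState = 0
  , recvWith  = \n -> (n, n + 1)
  , sendWith  = \_ n -> n
  }
```

In this model, receiving from counter gives 0; sending 10 and receiving again gives 10, and the stream then continues counting up from the new seed.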
Re-exports
lift :: MonadTrans t => forall m a. Monad m => m a -> t m a
Lift a computation from the argument monad to the constructed monad.