| Copyright | (c) 2014, 2015 Gatlin Johnson <gatlin@niltag.net> |
|---|---|
| License | GPL-3 |
| Maintainer | gatlin@niltag.net |
| Stability | experimental |
| Safe Haskell | Safe |
| Language | Haskell2010 |
Tubes
Description
Write effectful stream processing functions and compose them into a series of tubes.
This exists primarily for my own education. It is updated often as I try things and is probably, at this moment, wrong.
My goals were to
- learn more about iteratees and stream processing; and
- explore the relationships between functions, pairs, sum types, and products.
- type Tube a b = FreeT (TubeF a b)
- newtype TubeF a b k = TubeF { runT :: forall r. ((a -> k) -> r) -> ((b, k) -> r) -> r }
- type Source b m r = forall x. Tube x b m r
- type Sink a m r = forall x. Tube a x m r
- run :: FreeT f m a -> m (FreeF f a (FreeT f m a))
- await :: Monad m => Tube a b m a
- yield :: Monad m => b -> Tube a b m ()
- each :: (Monad m, Foldable t) => t b -> Tube a b m ()
- for :: Monad m => Tube a b m r -> (b -> Tube a c m s) -> Tube a c m r
- (~>) :: Monad m => Tube a b m r -> (b -> Tube a c m s) -> Tube a c m r
- (>-) :: Monad m => Tube a b m r -> (b -> Tube b c m r) -> Tube a c m r
- (><) :: Monad m => Tube a b m r -> Tube b c m r -> Tube a c m r
- (|>) :: Monad m => Source b m () -> Sink (Maybe b) m s -> Sink (Maybe b) m s
- (-<) :: Monad m => a -> Sink a m b -> Sink a m b
- liftT :: (MonadTrans t, Monad m) => FreeT f m a -> t m (FreeF f a (FreeT f m a))
- cat :: Monad m => Tube a a m r
- map :: Monad m => (a -> b) -> Tube a b m r
- drop :: Monad m => Int -> Tube a a m r
- take :: Monad m => Int -> Tube a a m ()
- takeWhile :: Monad m => (a -> Bool) -> Tube a a m ()
- filter :: Monad m => (a -> Bool) -> Tube a a m r
- reduce :: Monad m => (x -> a -> x) -> x -> (x -> b) -> Source a m () -> m b
- every :: (Foldable t, Monad m) => t b -> Tube a (Maybe b) m ()
- unyield :: Monad m => FreeT (TubeF x b) m () -> m (Maybe (b, FreeT (TubeF x b) m ()))
- prompt :: Source String IO ()
- display :: Sink String IO ()
- type Pump a b = CofreeT (PumpF a b)
- data PumpF a b k = PumpF {}
- pump :: Comonad w => w a -> (w a -> (b, w a)) -> (w a -> c -> w a) -> Pump b c w a
- send :: Comonad w => Pump a b w r -> b -> Pump a b w r
- recv :: Comonad w => Pump a b w r -> (a, Pump a b w r)
- runPump :: (Comonad w, Monad m) => (x -> y -> r) -> Pump a b w x -> Tube a b m y -> m r
- lift :: MonadTrans t => forall m a. Monad m => m a -> t m a
- runFreeT :: FreeT f m a -> m (FreeF f a (FreeT f m a))
Documentation
A Tube is a computation that can yield multiple intermediate values or await
intermediate inputs before computing a final result. Any monadic function may
be turned into a Tube.
Tubes may be composed in different ways. For instance, in ghci:
>>> run $ for (each [1..4] >< map show) $ lift . putStrLn
1
2
3
4
Here, each converts a Foldable into a Source of values; for performs
a computation with each value. Another example, using two built-in Tubes for
convenience:
>>> run $ prompt >< filter (/= "Die Antwoord") >< map (++ " is bad") >< display
> dubstep
dubstep is bad
> the sun
the sun is bad
> Die Antwoord
> this example
this example is bad
A few stream processing combinators are provided for mapping, filtering, taking, and other basic operations.
For those times when you want to reduce a stream, you can like so:
>>> reduce (+) 0 id (each [1..10])
55
>< is useful for combining Tubes which all have the same return value. Most
often that value is (), simply because every Source returns it.
There is more in the library not covered here, and you are encouraged to take a look around.
type Tube a b = FreeT (TubeF a b)
A Tube is a computation which can
- yield an intermediate value downstream and suspend execution; and
- await a value from upstream, deferring execution until it is received.
Moreover, individual Tubes may be freely composed into larger ones, so long
as their types match. Thus, one may write small, reusable building blocks and
construct efficient stream processing pipelines.
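The yield/await behaviour described above can be sketched without the library at all. The block below is a toy model, assuming a plain recursive data type instead of the library's FreeT (TubeF a b) representation; the constructors Effect and Done and the helpers mapT, collect, and demo are hypothetical names introduced only for illustration, and the pull-based composition shown is one common choice that may differ from what the library actually does.

```haskell
import Data.Functor.Identity (runIdentity)

infixl 3 ><

-- Toy stand-in for a tube; NOT the library's FreeT-based definition.
data Tube a b m r
  = Await (a -> Tube a b m r)   -- wait for a value from upstream
  | Yield b (Tube a b m r)      -- emit a value downstream, then continue
  | Effect (m (Tube a b m r))   -- run an action in the base monad
  | Done r                      -- finish with a result

instance Monad m => Functor (Tube a b m) where
  fmap f t = t >>= (Done . f)

instance Monad m => Applicative (Tube a b m) where
  pure = Done
  tf <*> tx = tf >>= \f -> fmap f tx

instance Monad m => Monad (Tube a b m) where
  Await k   >>= f = Await (\a -> k a >>= f)
  Yield b t >>= f = Yield b (t >>= f)
  Effect m  >>= f = Effect (fmap (>>= f) m)
  Done r    >>= f = f r

await :: Tube a b m a
await = Await Done

yield :: b -> Tube a b m ()
yield b = Yield b (Done ())

-- Yield every element of a list in order.
each :: Monad m => [b] -> Tube a b m ()
each = mapM_ yield

-- Hypothetical analogue of the library's map, renamed to avoid Prelude.map.
mapT :: (a -> b) -> Tube a b m r
mapT f = Await (\a -> Yield (f a) (mapT f))

-- Pull-based composition: downstream runs until it awaits, then upstream
-- runs until it yields. Whichever side finishes first ends the pipeline.
(><) :: Monad m => Tube a b m r -> Tube b c m r -> Tube a c m r
up >< Yield c down = Yield c (up >< down)
up >< Effect m     = Effect (fmap (up ><) m)
_  >< Done r       = Done r
up >< Await k      = feed up
  where
    feed (Yield b up') = up' >< k b
    feed (Await g)     = Await (feed . g)
    feed (Effect m)    = Effect (fmap feed m)
    feed (Done r)      = Done r

-- Run a tube that needs no real input, collecting everything it yields.
collect :: Monad m => Tube () b m r -> m ([b], r)
collect (Yield b t) = do
  (bs, r) <- collect t
  return (b : bs, r)
collect (Await k)  = collect (k ())
collect (Effect m) = m >>= collect
collect (Done r)   = return ([], r)

demo :: ([String], ())
demo = runIdentity (collect (each [1 .. 4 :: Int] >< mapT show))
-- (["1","2","3","4"],())
```

Loading this in ghci and evaluating demo mirrors the each/map example from the introduction, with the yielded values collected into a list rather than printed.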
Since a much better engineered, more popular, and decidedly more mature library already uses the term "pipes", I have opted instead to think of my work as a series of tubes.
TubeF is the union of unary functions and binary products into a single
type, here defined with a Boehm-Berarducci encoding.
This type is equivalent to the following:
data TubeF a b k
= Await (a -> k) -- :: (a -> k) -> TubeF a b k
| Yield (b , k) -- :: (b , k) -> TubeF a b k
The type signatures for the two value constructors should bear a strong
resemblance to the actual type signature of runT. Instead of encoding
tubes as structures which build up when composed, a TubeF is a control
flow mechanism which picks one of two provided continuations.
People using this library should never have to contend with these details, but they are worth mentioning.
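To make the claimed equivalence concrete, here is a self-contained sketch that defines both presentations side by side and converts between them. The primed type TubeF' and the helpers awaitF, yieldF, toEncoded, fromEncoded, and describe are names assumed for this illustration only; they are not taken from the library, and the Functor instance is simply one that fits the types.

```haskell
{-# LANGUAGE RankNTypes #-}

-- The ordinary sum-type presentation from the text.
data TubeF' a b k
  = Await (a -> k)
  | Yield (b, k)

-- The encoded presentation: a value is a function that picks one of the
-- two continuations supplied by the caller.
newtype TubeF a b k = TubeF
  { runT :: forall r. ((a -> k) -> r) -> ((b, k) -> r) -> r }

-- Stand-ins for the two constructors: each selects "its" continuation.
awaitF :: (a -> k) -> TubeF a b k
awaitF f = TubeF (\onAwait _ -> onAwait f)

yieldF :: b -> k -> TubeF a b k
yieldF x k = TubeF (\_ onYield -> onYield (x, k))

-- The two presentations convert back and forth without losing information.
toEncoded :: TubeF' a b k -> TubeF a b k
toEncoded (Await f)      = awaitF f
toEncoded (Yield (x, k)) = yieldF x k

fromEncoded :: TubeF a b k -> TubeF' a b k
fromEncoded t = runT t Await Yield

-- Mapping over the continuation slot, as a base functor needs.
instance Functor (TubeF a b) where
  fmap g t = TubeF (\onAwait onYield ->
    runT t (\f -> onAwait (g . f)) (\(x, k) -> onYield (x, g k)))

-- Consuming a value means supplying one handler per would-be constructor.
describe :: Show b => TubeF a b k -> String
describe t = runT t (const "awaiting input") (\(x, _) -> "yielding " ++ show x)
```

Note that fromEncoded is just runT applied to the two real constructors, which is why the constructor signatures and the type of runT look so alike.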
Core infrastructure
(>-) :: Monad m => Tube a b m r -> (b -> Tube b c m r) -> Tube a c m r
Connect a task to a continuation yielding another task; see ><.
(><) :: Monad m => Tube a b m r -> Tube b c m r -> Tube a c m r infixl 3
Compose two tubes into a new tube.
Utilities
map :: Monad m => (a -> b) -> Tube a b m r
Transforms all incoming values according to some function.
takeWhile :: Monad m => (a -> Bool) -> Tube a a m ()
Terminates the stream upon receiving a value that violates the predicate.
filter :: Monad m => (a -> Bool) -> Tube a a m r
Yields only values satisfying some predicate.
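reduce :: Monad m => (x -> a -> x) -> x -> (x -> b) -> Source a m () -> m b
Arguments
| Argument | Description |
|---|---|
| :: Monad m | |
| => (x -> a -> x) | step function |
| -> x | initial value |
| -> (x -> b) | final transformation |
| -> Source a m () | stream source |
| -> m b | |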
Strict left-fold of a stream. Note that the actual return type of the source is not relevant, only the intermediate yield type.
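The step / initial value / final transformation shape may be easier to see on plain lists. The sketch below assumes a list in place of a Source and uses foldl' from Data.List; reduceList, total, and mean are hypothetical names for this illustration and are not part of the library.

```haskell
import Data.List (foldl')

-- Same argument order as reduce, with a list standing in for the Source.
reduceList :: (x -> a -> x) -> x -> (x -> b) -> [a] -> b
reduceList step begin done = done . foldl' step begin

-- Summing needs no final transformation, so id is passed.
total :: Int
total = reduceList (+) 0 id [1 .. 10]
-- 55

-- The final transformation earns its keep when the accumulator carries
-- more than the answer: here a running (sum, count) pair becomes a mean.
mean :: Double
mean = reduceList step (0, 0 :: Int) done [1 .. 10]
  where
    step (s, n) x = (s + x, n + 1)
    done (s, n)   = s / fromIntegral n
-- 5.5
```

Separating the accumulator type x from the result type b is what lets one fold compute values such as averages in a single pass.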
every :: (Foldable t, Monad m) => t b -> Tube a (Maybe b) m ()
Similar to each, except that it explicitly marks the stream as exhausted.
unyield :: Monad m => FreeT (TubeF x b) m () -> m (Maybe (b, FreeT (TubeF x b) m ()))
Taps the next value from a source, maybe.
prompt :: Source String IO ()
Source of Strings from stdin. This is mostly for debugging / ghci example purposes.
display :: Sink String IO ()
Sink for Strings to stdout. This is mostly for debugging / ghci example
purposes.
Pump
type Pump a b = CofreeT (PumpF a b)
A Pump is the dual to a Tube: where a Tube is a computation manipulating
a stream of values, a Pump can be situated on either end of a tube to both
insert values when requested and handle any yielded results.
One interesting use of a Pump is to feed data to a Tube, collecting the
result as well as unused input:
import Control.Comonad (extract)
import Data.Functor.Identity
import Tubes

-- a Pump that hands out list elements one at a time, then Nothing
p :: [a] -> Pump (Maybe a) x Identity [a]
p inp = pump (return inp)
        (\wa -> case (extract wa) of
            []   -> (Nothing, wa)
            x:xs -> (Just x, return xs))
        const

-- a Sink that stops after 5 loops, or when input is exhausted
add5 :: Sink (Maybe Int) IO Int
add5 = loop 0 5 where
    loop acc ct = if 0 == ct
        then return acc
        else do
            mn <- await
            maybe (return acc)
                  (\n -> loop (acc+n) (ct - 1))
                  mn

-- pair the unused input with the Sink's result
result :: IO ([Int], Int)
result = runPump (curry id) (p [1..10]) add5
-- ([6,7,8,9,10],15)
Pumps are still being investigated by the author so if you come up with
something interesting, please share!
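For readers who want to experiment without the comonad machinery, the pairing between a pump and a tube can be modelled in a few lines. This is a deliberately simplified sketch: it assumes a pure tube with no base monad and a pump that is just a record over a state value, so it is not the library's CofreeT (PumpF a b) definition. The names current and fromList are hypothetical, and pump, recv, send, and runPump here only imitate the shapes of the library functions.

```haskell
-- Toy pure tube: no base monad, unlike the library's Tube.
data Tube a b r
  = Await (a -> Tube a b r)
  | Yield b (Tube a b r)
  | Done r

-- Toy pump over a state s: it can hand out a value (recv) or absorb one (send).
data Pump a b s = Pump
  { current :: s
  , recv    :: (a, Pump a b s)
  , send    :: b -> Pump a b s
  }

-- Build a pump from a seed and two state transitions.
pump :: s -> (s -> (a, s)) -> (s -> b -> s) -> Pump a b s
pump s onRecv onSend = Pump
  { current = s
  , recv    = let (a, s') = onRecv s in (a, pump s' onRecv onSend)
  , send    = \b -> pump (onSend s b) onRecv onSend
  }

-- Each Await is answered by recv, each Yield is absorbed by send, and the
-- final pump state is combined with the tube's result.
runPump :: (s -> r -> z) -> Pump a b s -> Tube a b r -> z
runPump done p (Done r)    = done (current p) r
runPump done p (Await k)   = let (a, p') = recv p in runPump done p' (k a)
runPump done p (Yield b t) = runPump done (send p b) t

-- Feed list elements one at a time; Nothing once the list is empty.
fromList :: [a] -> Pump (Maybe a) x [a]
fromList inp = pump inp step const
  where
    step []       = (Nothing, [])
    step (x : xs) = (Just x, xs)

-- Sum at most five inputs, stopping early if the input runs out.
add5 :: Tube (Maybe Int) x Int
add5 = loop 0 (5 :: Int)
  where
    loop acc 0  = Done acc
    loop acc ct = Await (maybe (Done acc) (\n -> loop (acc + n) (ct - 1)))

result :: ([Int], Int)
result = runPump (,) (fromList [1 .. 10]) add5
-- ([6,7,8,9,10],15)
```

In this model the leftover input is simply the pump's final state, which is why the first component of result is the unconsumed tail of the list.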
send :: Comonad w => Pump a b w r -> b -> Pump a b w r
Send a value into a Pump, effectively re-seeding the stream.
Re-exports
lift :: MonadTrans t => forall m a. Monad m => m a -> t m a
Lift a computation from the argument monad to the constructed monad.