| Copyright | (c) 2014, 2015 Gatlin Johnson <gatlin@niltag.net> |
|---|---|
| License | GPL-3 |
| Maintainer | gatlin@niltag.net |
| Stability | experimental |
| Safe Haskell | Safe |
| Language | Haskell2010 |
Tubes
Description
Write effectful stream processing functions and compose them into a series of tubes.
This exists primarily for my own education. It is updated often as I try things and is probably, at this moment, wrong.
My goals were to
- learn more about iteratees and stream processing; and
- explore the relationships between functions, pairs, sum types, and products.
- type Tube a b = FreeT (TubeF a b)
- newtype TubeF a b k = TubeF { runT :: forall r. ((a -> k) -> r) -> ((b, k) -> r) -> r }
- type Source b m r = forall x. Tube x b m r
- type Sink a m r = forall x. Tube a x m r
- run :: FreeT f m a -> m (FreeF f a (FreeT f m a))
- await :: Monad m => Tube a b m a
- yield :: Monad m => b -> Tube a b m ()
- each :: (Monad m, Foldable t) => t b -> Tube a b m ()
- for :: Monad m => Tube a b m r -> (b -> Tube a c m s) -> Tube a c m r
- (~>) :: Monad m => Tube a b m r -> (b -> Tube a c m s) -> Tube a c m r
- (>-) :: Monad m => Tube a b m r -> (b -> Tube b c m r) -> Tube a c m r
- (><) :: Monad m => Tube a b m r -> Tube b c m r -> Tube a c m r
- (|>) :: Monad m => Tube x b m r -> Sink (Maybe b) m s -> Sink (Maybe b) m s
- (-<) :: Monad m => a -> Sink a m b -> Sink a m b
- liftT :: (MonadTrans t, Monad m) => FreeT f m a -> t m (FreeF f a (FreeT f m a))
- cat :: Monad m => Tube a a m r
- map :: Monad m => (a -> b) -> Tube a b m r
- drop :: Monad m => Int -> Tube a a m r
- take :: Monad m => Int -> Tube a a m ()
- takeWhile :: Monad m => (a -> Bool) -> Tube a a m ()
- filter :: Monad m => (a -> Bool) -> Tube a a m r
- reduce :: Monad m => (x -> a -> x) -> x -> (x -> b) -> Source a m () -> m b
- every :: (Foldable t, Monad m) => t b -> Tube a (Maybe b) m ()
- unyield :: Monad m => FreeT (TubeF x b) m () -> m (Maybe (b, FreeT (TubeF x b) m ()))
- prompt :: MonadIO m => Source String m ()
- mapM :: Monad m => (a -> m b) -> Tube a b m r
- sequence :: Monad m => Tube (m a) a m r
- display :: MonadIO m => Sink String m ()
- type Pump a b = CofreeT (PumpF a b)
- data PumpF a b k = PumpF {}
- mkPump :: Comonad w => w a -> (w a -> (b, w a)) -> (w a -> c -> w a) -> Pump b c w a
- send :: Comonad w => b -> Pump a b w r -> Pump a b w r
- recv :: Comonad w => Pump a b w r -> (a, Pump a b w r)
- pump :: (Comonad w, Monad m) => (x -> y -> r) -> Pump a b w x -> Tube a b m y -> m r
- pumpM :: (Comonad w, Monad m) => (x -> y -> r) -> Pump a b w (m x) -> Tube a b m y -> m r
- meta :: (x -> a -> x) -> x -> (x -> (b, x)) -> Pump b a Identity x
- enumerator :: [a] -> Pump (Maybe a) a Identity [a]
- enumerate :: (Monad m, Comonad w) => Pump (Maybe a) b w r -> Tube c a m ()
- lift :: MonadTrans t => forall m a. Monad m => m a -> t m a
- runFreeT :: FreeT f m a -> m (FreeF f a (FreeT f m a))
Documentation
A Tube is a computation that can yield multiple intermediate values or await
intermediate inputs before computing a final result. Any monadic function may
be turned into a Tube.
Tubes may be composed in different ways. For instance, in ghci:
    >>> run $ for (each [1..4] >< map show) $ lift . putStrLn
    1
    2
    3
    4
Here, each converts a Foldable into a Source of values; for performs a
computation with each value. Another example, using two built-in Tubes for
convenience:
    >>> run $ prompt >< filter (/= "Die Antwoord") >< map (++ " is bad") >< display
    > dubstep
    dubstep is bad
    > the sun
    the sun is bad
    > Die Antwoord
    > this example
    this example is bad
A few stream processing combinators are provided for mapping, filtering, taking, and other basic operations.
To reduce a stream to a single value, use reduce:
    >>> reduce (+) 0 id (each [1..10])
    55
>< is useful for combining Tubes that all have the same return type, most often (),
simply because that is the return type of Sources such as each and prompt.
There is more in the library not covered here, and you are encouraged to take a look around.
type Tube a b = FreeT (TubeF a b)
A Tube is a computation which can
- yield an intermediate value downstream and suspend execution; and
- await a value from upstream, deferring execution until it is received.
Moreover, individual Tubes may be freely composed into larger ones, so long
as their types match. Thus, one may write small, reusable building blocks and
construct efficient stream processing pipelines.
Since a much better engineered, more popular, and decidedly more mature library already uses the term "pipes", I have opted instead to think of my work as a series of tubes.
TubeF combines unary functions and binary products into a single sum
type, defined here with a Boehm-Berarducci encoding.
This type is equivalent to the following:
    data TubeF a b k
        = Await (a -> k)   -- :: (a -> k) -> TubeF a b k
        | Yield (b , k)    -- :: (b , k) -> TubeF a b k
The type signatures for the two value constructors should bear a strong
resemblance to the actual type signature of runT. Instead of encoding
tubes as structures which build up when composed, a TubeF is a control
flow mechanism which picks one of two provided continuations.
People using this library should never have to contend with these details, but they are worth mentioning.
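The equivalence between the two forms can be checked directly. The sketch below is standalone (base plus the RankNTypes extension) and reuses the document's runT field; TubeData, toTubeF, fromTubeF and describe are illustrative names, not part of the library. It converts between the ordinary sum type and the Boehm-Berarducci form, and shows that fmap only has to rewrap the continuations.

```haskell
{-# LANGUAGE RankNTypes #-}
-- Illustrative sketch: TubeData, toTubeF, fromTubeF and describe are
-- hypothetical names used only to demonstrate the encoding.

-- The ordinary sum type: either waiting for input or holding an output.
data TubeData a b k
  = Await (a -> k)
  | Yield (b, k)

-- Boehm-Berarducci form: one continuation per constructor.
newtype TubeF a b k = TubeF
  { runT :: forall r. ((a -> k) -> r) -> ((b, k) -> r) -> r }

-- Encoding picks the continuation that matches the constructor.
toTubeF :: TubeData a b k -> TubeF a b k
toTubeF (Await f) = TubeF (\onAwait _ -> onAwait f)
toTubeF (Yield p) = TubeF (\_ onYield -> onYield p)

-- Decoding passes the constructors themselves as the continuations.
fromTubeF :: TubeF a b k -> TubeData a b k
fromTubeF t = runT t Await Yield

-- Mapping over k only rewraps what each continuation receives.
instance Functor (TubeF a b) where
  fmap f t = TubeF (\onAwait onYield ->
    runT t (\g -> onAwait (f . g)) (\(out, next) -> onYield (out, f next)))

-- Inspect a TubeF without pattern matching on any constructor.
describe :: Show b => TubeF Int b Int -> String
describe t = runT t
  (\g -> "await; feeding 1 gives " ++ show (g 1))
  (\(out, next) -> "yield " ++ show out ++ ", then " ++ show next)
```

Loaded into GHCi, describe (toTubeF (Yield ("out", 41))) evaluates to "yield \"out\", then 41"; the caller chooses the result type r, which is exactly what lets a TubeF act as a control-flow choice rather than a data structure.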
Core infrastructure
(>-) :: Monad m => Tube a b m r -> (b -> Tube b c m r) -> Tube a c m r
Connect a task to a continuation yielding another task; see ><.
(><) :: Monad m => Tube a b m r -> Tube b c m r -> Tube a c m r    (infixl 3)
Compose two tubes into a new tube: values yielded by the first are awaited by the second.
Utilities
map :: Monad m => (a -> b) -> Tube a b m r
Transforms all incoming values according to some function.
takeWhile :: Monad m => (a -> Bool) -> Tube a a m ()
Terminates the stream upon receiving a value that violates the predicate.
filter :: Monad m => (a -> Bool) -> Tube a a m r
Yields only values satisfying some predicate.
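As a rough mental model of how >< and these utilities behave, here is a pure, effect-free sketch. It replaces FreeT with a hand-rolled three-constructor type and drops the underlying monad, so it illustrates the idea under those assumptions rather than reproducing the library's implementation; Done, mapT, filterT, takeWhileT and yields are hypothetical names. In this sketch composition is driven by the downstream tube: the upstream one only runs when its output is awaited.

```haskell
import Control.Monad (when)

-- Hypothetical pure Tube (not the library's FreeT-based type).
data Tube a b r
  = Done r                    -- finished with a result
  | Await (a -> Tube a b r)   -- waiting for an upstream value
  | Yield b (Tube a b r)      -- offering a value downstream

instance Functor (Tube a b) where
  fmap f (Done r)    = Done (f r)
  fmap f (Await k)   = Await (fmap f . k)
  fmap f (Yield b k) = Yield b (fmap f k)

instance Applicative (Tube a b) where
  pure = Done
  tf <*> tx = tf >>= \f -> fmap f tx

instance Monad (Tube a b) where
  Done r    >>= f = f r
  Await k   >>= f = Await (\a -> k a >>= f)
  Yield b k >>= f = Yield b (k >>= f)

await :: Tube a b a
await = Await Done

yield :: b -> Tube a b ()
yield b = Yield b (Done ())

each :: [b] -> Tube a b ()
each = mapM_ yield

-- The right-hand tube runs until it awaits; only then is the left one advanced.
infixl 3 ><
(><) :: Tube a b r -> Tube b c r -> Tube a c r
_  >< Done r    = Done r
up >< Yield c k = Yield c (up >< k)
up >< Await f   = case up of
  Done r    -> Done r
  Yield b k -> k >< f b
  Await g   -> Await (\a -> g a >< Await f)

mapT :: (a -> b) -> Tube a b r
mapT f = loop
  where
    loop = do
      a <- await
      yield (f a)
      loop

filterT :: (a -> Bool) -> Tube a a r
filterT p = loop
  where
    loop = do
      a <- await
      when (p a) (yield a)
      loop

-- Stops (returns ()) at the first value that fails the predicate.
takeWhileT :: (a -> Bool) -> Tube a a ()
takeWhileT p = loop
  where
    loop = do
      a <- await
      when (p a) (yield a >> loop)

-- Collect yielded values until the tube finishes or awaits unmet input.
yields :: Tube a b r -> [b]
yields (Yield b k) = b : yields k
yields _           = []
```

In GHCi, yields (each [1 .. 10] >< filterT even >< mapT (* 10)) evaluates to [20,40,60,80,100], and yields (each [1, 3, 5, 6, 7] >< takeWhileT odd) to [1,3,5].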
reduce
Arguments
| Type | Description |
|---|---|
| :: Monad m | |
| => (x -> a -> x) | step function |
| -> x | initial value |
| -> (x -> b) | final transformation |
| -> Source a m () | stream source |
| -> m b | |
Strict left-fold of a stream. Note that the actual return type of the source is not relevant, only the intermediate yield type.
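The step / initial value / final transformation triple is easiest to see with the stream replaced by a plain list. The sketch below assumes that list model; reduceList and mean are illustrative names, not library functions. mean shows why the final transformation is useful: the accumulator is a running (sum, count) pair, and the division happens once at the end.

```haskell
import Data.List (foldl')

-- List-based model of reduce: the Source is replaced by an ordinary list.
-- reduceList is a hypothetical name, used only for illustration.
reduceList :: (x -> a -> x) -> x -> (x -> b) -> [a] -> b
reduceList step start done = done . foldl' step start

-- Accumulate (sum, count), then divide only in the final transformation.
mean :: [Double] -> Double
mean = reduceList step (0, 0 :: Int) finish
  where
    -- force both components so the pair does not build up thunks
    step (s, n) v = s `seq` n `seq` (s + v, n + 1)
    finish (s, n) = if n == 0 then 0 else s / fromIntegral n
```

reduceList (+) 0 id [1 .. 10] gives 55, mirroring the reduce example above, and mean [1, 2, 3, 4] gives 2.5.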
every :: (Foldable t, Monad m) => t b -> Tube a (Maybe b) m ()
Similar to each, except that it wraps every value in Just and explicitly marks the stream as exhausted.
unyield :: Monad m => FreeT (TubeF x b) m () -> m (Maybe (b, FreeT (TubeF x b) m ()))
Takes the next value from a Source, if there is one, and returns it together with the rest of the Source.
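unyield has the same shape as the step function that Data.List.unfoldr expects: a state goes in, and either nothing or a value plus a new state comes out. The pure sketch below uses a hypothetical effect-free Tube type in place of FreeT and drains a Source by repeatedly tapping it; each, unyield and drain here are illustrative stand-ins. In the real library the step runs in the underlying monad, so unfoldr itself would not apply directly.

```haskell
import Data.List (unfoldr)

-- Hypothetical effect-free Tube, standing in for the FreeT-based one.
data Tube a b r
  = Done r
  | Await (a -> Tube a b r)
  | Yield b (Tube a b r)

-- Build a Source from a list without needing a Monad instance.
each :: [b] -> Tube a b ()
each = foldr Yield (Done ())

-- Tap the next value, if any, together with the rest of the Source.
-- A Source never awaits, so Await is treated like the end of the stream.
unyield :: Tube a b () -> Maybe (b, Tube a b ())
unyield (Yield b rest) = Just (b, rest)
unyield _              = Nothing

-- Drain a Source pull-style by tapping it until it is empty.
drain :: Tube a b () -> [b]
drain = unfoldr unyield
```

drain (each "abc") evaluates to "abc", while a single unyield on each [7, 8] returns 7 and leaves the Source that will still yield 8.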
prompt :: MonadIO m => Source String m ()
Source of Strings from stdin. This is mostly for debugging and ghci examples.
mapM :: Monad m => (a -> m b) -> Tube a b m r
Similar to map, except that it maps a monadic function instead of a pure one.
sequence :: Monad m => Tube (m a) a m r
Runs each incoming monadic action and yields its result.
display :: MonadIO m => Sink String m ()
Sink for Strings to stdout. This is mostly for debugging and ghci examples.
Pump
type Pump a b = CofreeT (PumpF a b)
A Pump is the dual to a Tube. Intuitively, if a Tube is a stream-processing
computation, then a Pump is both a stream generator and reducer.
Examples may help!
One interesting use of a Pump is as a data stream, which can be fed into a
Tube or Sink.
    import Prelude hiding (map, take)
    import Control.Monad (void)
    import Data.Functor.Identity
    import Tubes

    e :: Pump (Maybe Int) Int Identity Int
    e = mkPump (Identity 0)
               (\(Identity x) -> (Just x, Identity (x + 1)))
               const

    ex1 :: IO ()
    ex1 = do
        void . run $ each e >< take 10 >< map show >< display
        -- displays 0-9 in the console
A Pump may also be used to fold a Source. Indeed, a Pump may be thought
of as both a non-recursive left fold and a non-recursive unfold paired
together. (This is called a "metamorphism," hence the function "meta".)
    import Control.Monad (forM_)
    import Control.Comonad (extract)

    num_src :: Source Int IO ()
    num_src = do
        forM_ [1..] $ \n -> do
            lift . putStrLn $ "Yielding " ++ show n
            yield n

    enum_ex :: IO ()
    enum_ex = do
        v <- reduce (flip send) (meta (+) 0 (\x -> (x, x))) extract $ num_src >< take 5
        putStrLn $ "v = " ++ show v
        -- v = 15
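The fold-plus-unfold idea behind meta can be modelled without comonads. In the sketch below a Pump is a plain record holding the current state, what recv returns next, and what send does; Pump, mkPumpP, metaP and recvMany are illustrative stand-ins specialised to Identity, not the library's definitions. Folding sendP over [1 .. 5] mirrors enum_ex, and a second pump unfolds the decimal digits of the total it has received.

```haskell
-- Hypothetical pure Pump specialised to Identity: the current state,
-- what recv returns next, and how send updates the state.
data Pump a b r = Pump
  { extractP :: r
  , recvP    :: (a, Pump a b r)
  , sendP    :: b -> Pump a b r   -- Pump comes first, unlike send
  }

-- In the spirit of mkPump: a seed, an unfold step, and a fold step.
mkPumpP :: s -> (s -> (a, s)) -> (s -> b -> s) -> Pump a b s
mkPumpP s next feed = Pump
  { extractP = s
  , recvP    = let (a, s') = next s in (a, mkPumpP s' next feed)
  , sendP    = \b -> mkPumpP (feed s b) next feed
  }

-- A metamorphism: inputs are folded into x, outputs are unfolded from x.
metaP :: (x -> a -> x) -> x -> (x -> (b, x)) -> Pump b a x
metaP step start unfold = mkPumpP start unfold step

-- Receive n outputs in a row, threading the pump through each recv.
recvMany :: Int -> Pump a b r -> [a]
recvMany n p
  | n <= 0    = []
  | otherwise = let (a, p') = recvP p in a : recvMany (n - 1) p'

-- Like enum_ex: fold 1..5 into a running sum.
summed :: Pump Int Int Int
summed = foldl sendP (metaP (+) 0 (\x -> (x, x))) [1 .. 5]

-- Fold 100 and 23 into a total, then unfold it digit by digit.
digits :: Pump Int Int Int
digits = foldl sendP (metaP (+) 0 (\x -> (x `mod` 10, x `div` 10))) [100, 23]
```

extractP summed is 15, matching v above, and recvMany 3 digits returns [3,2,1]: the inputs were folded into 123 before any output was unfolded.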
The following is an example of a Pump both accumulating values from a
Source and then enumerating them into a Sink. This gives back both the
result of the computation and the unused input.
    import Control.Monad (forM)
    import Data.Functor.Identity

    -- a Sink that awaits 5 times; Nothing (exhausted input) contributes nothing
    sum_snk :: Sink (Maybe Int) IO Int
    sum_snk = do
        ns <- forM [1,2,3,4,5] $ \_ -> do
            mn <- await
            case mn of
                Just n  -> return [n]
                Nothing -> return []
        return $ sum . concat $ ns

    source_sink_ex :: IO ([Int], Int)
    source_sink_ex = do
        e <- reduce (flip send) (enumerator []) id $ num_src >< take 10
        (unused, total) <- pump (,) e sum_snk
        putStrLn $ "Total: " ++ show total      -- Total: 15
        putStrLn $ "Unused: " ++ show unused    -- Unused: [6,7,8,9,10]
        return (unused, total)
Note that when a Pump and a Tube are combined with pump, the Tube
determines control flow. Pumps are comonads, not monads.
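The claim that the Tube determines control flow shows up clearly in a pure pairing of the two types: the pairing function only ever inspects the Tube, and the Pump just answers each await and absorbs each yield. Everything below (the Tube and Pump types, pumpP, enumeratorP, sumSink, sourceSinkEx) is a hypothetical, effect-free reconstruction. In particular, enumeratorP assumes a first-in, first-out buffer, which is consistent with the unused output [6,7,8,9,10] above but is not taken from the library source.

```haskell
import Control.Monad (forM)

-- Hypothetical pure Tube: finished, awaiting input, or yielding output.
data Tube a b r
  = Done r
  | Await (a -> Tube a b r)
  | Yield b (Tube a b r)

instance Functor (Tube a b) where
  fmap f (Done r)    = Done (f r)
  fmap f (Await k)   = Await (fmap f . k)
  fmap f (Yield b k) = Yield b (fmap f k)

instance Applicative (Tube a b) where
  pure = Done
  tf <*> tx = tf >>= \f -> fmap f tx

instance Monad (Tube a b) where
  Done r    >>= f = f r
  Await k   >>= f = Await (\a -> k a >>= f)
  Yield b k >>= f = Yield b (k >>= f)

await :: Tube a b a
await = Await Done

-- Hypothetical pure Pump: current state, next output, and input handler.
data Pump a b r = Pump
  { extractP :: r
  , recvP    :: (a, Pump a b r)
  , sendP    :: b -> Pump a b r
  }

mkPumpP :: s -> (s -> (a, s)) -> (s -> b -> s) -> Pump a b s
mkPumpP s next feed = Pump
  { extractP = s
  , recvP    = let (a, s') = next s in (a, mkPumpP s' next feed)
  , sendP    = \b -> mkPumpP (feed s b) next feed
  }

-- Only the Tube is pattern matched: it decides whether to await, yield or stop.
pumpP :: (x -> y -> r) -> Pump a b x -> Tube a b y -> r
pumpP combine p (Done y)    = combine (extractP p) y
pumpP combine p (Await k)   = let (a, p') = recvP p in pumpP combine p' (k a)
pumpP combine p (Yield b k) = pumpP combine (sendP p b) k

-- Assumed FIFO buffer: send appends, recv pops, Nothing once empty.
enumeratorP :: [a] -> Pump (Maybe a) a [a]
enumeratorP xs = mkPumpP xs pop (\buf x -> buf ++ [x])
  where
    pop []       = (Nothing, [])
    pop (y : ys) = (Just y, ys)

-- Mirrors sum_snk: await exactly five times and add up the Just values.
sumSink :: Tube (Maybe Int) x Int
sumSink = do
  ns <- forM [1 .. 5 :: Int] $ \_ -> do
    mn <- await
    return (maybe [] (: []) mn)
  return (sum (concat ns))

-- Buffer 1..10, then let the sink drain it; mirrors source_sink_ex.
sourceSinkEx :: ([Int], Int)
sourceSinkEx = pumpP (,) (foldl sendP (enumeratorP []) [1 .. 10]) sumSink
```

sourceSinkEx evaluates to ([6,7,8,9,10],15), the same totals source_sink_ex prints. With only [1, 2] buffered, the sink still awaits five times, receives Nothing three times, and the result is ([],3).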
There are doubtless more and more interesting examples of combining Tubes
and Pumps. If you think of any, drop the author a line!
send :: Comonad w => b -> Pump a b w r -> Pump a b w r
Send a value into a Pump, effectively re-seeding the stream.
pumpM :: (Comonad w, Monad m) => (x -> y -> r) -> Pump a b w (m x) -> Tube a b m y -> m r
A variant of pump which allows effects to be executed inside the pump as well.
meta :: (x -> a -> x) -> x -> (x -> (b, x)) -> Pump b a Identity x
Takes a fold function, an initial value, and an unfold to produce a metamorphism. Can be used to change.
enumerator :: [a] -> Pump (Maybe a) a Identity [a]
Constructs an enumerator pump, which can buffer values and then enumerate
them to, say, a Sink (see the examples above).
Re-exports
lift :: MonadTrans t => forall m a. Monad m => m a -> t m a
Lift a computation from the argument monad to the constructed monad.