tubes- Effectful, iteratee-inspired stream processing based on a free monad.

Copyright(c) 2014, 2015 Gatlin Johnson <>
Safe HaskellSafe




Write effect-ful stream processing functions and compose them into a series of tubes.

This exists primarily for my own education. It is updated often as I try things and is probably, at this moment, wrong.

My goals were to

  • learn more about iteratees and stream processing; and
  • explore the relationships between functions, pairs, sum types, and products.



A Tube is a computation that can yield multiple intermediate values or await intermediate inputs before computing a final result. Any monadic function may be turned into a Tube.

Tubes may be composed in different ways. For instance, in ghci:

   >>> run $ for (each [1..4] >< map show) $ lift . putStrLn

Here, each converts a Foldable into a Source of values; for performs a computation with each value. Another example, using two built-in Tubes for convenience:

   >>> run $ prompt >< filter (/= "Die Antwoord") >< map (++ " is bad") >< print
   > dubstep
   dubstep is bad
   > the sun
   the sun is bad
   > Die Antwoord
   > this example
   this example is bad

A few stream processing combinators are provided for mapping, filtering, taking, and other basic operations.

For those times when you want to reduce a stream, you can like so:

   >>> reduce (+) 0 id (each [1..10])

>< is useful for combining Tubes which all have the same return value - most often () simply because every Source will have that value.

There is more in the library not covered here, and you are encouraged to take a look around.

type Tube a b = FreeT (TubeF a b) Source

A Tube is a computation which can

  • yield an intermediate value downstream and suspend execution; and
  • await a value from upstream, deferring execution until it is received.

Moreover, individual Tubes may be freely composed into larger ones, so long as their types match. Thus, one may write small, reusable building blocks and construct efficient stream process pipelines.

Since a much better engineered, more popular, and decidedly more mature library already uses the term "pipes" I have opted instead to think of my work as a series of tubes.

newtype TubeF a b k Source

TubeF is the union of unary functions and binary products into a single type, here defined with a Boehm-Berarducci encoding.

This type is equivalent to the following:

       data TubeF a b k
           = Await (a -> k) -- :: (a -> k) -> TubeF a b k
           | Yield (b  , k) -- :: (b  , k) -> TubeF a b k

The type signatures for the two value constructors should bear a strong resemblance to the actual type signature of runT. Instead of encoding tubes as structures which build up when composed, a TubeF is a control flow mechanism which picks one of two provided continuations.

People using this library should never have to contend with these details but it is worth mentioning.




runT :: forall r. ((a -> k) -> r) -> ((b, k) -> r) -> r


type Source b m r = forall x. Tube x b m r Source

A computation which only yields and never awaits

type Sink a m r = forall x. Tube a x m r Source

A computation which only awaits and never yields.

Core infrastructure

run :: FreeT f m a -> m (FreeF f a (FreeT f m a)) Source

run is shorter than runFreeT and who knows, maybe it'll change some day

await :: Monad m => Tube a b m a Source

Command to wait for a new value upstream

yield :: Monad m => b -> Tube a b m () Source

Command to send a value downstream

each :: (Monad m, Foldable t) => t b -> Tube a b m () Source

Convert a list to a Source

for :: Monad m => Tube a b m r -> (b -> Tube a c m s) -> Tube a c m r Source

Enumerate yielded values into a continuation, creating a new Source

(~>) :: Monad m => Tube a b m r -> (b -> Tube a c m s) -> Tube a c m r Source

Infix version of for

(>-) :: Monad m => Tube a b m r -> (b -> Tube b c m r) -> Tube a c m r Source

Connect a task to a continuation yielding another task; see ><

(><) :: Monad m => Tube a b m r -> Tube b c m r -> Tube a c m r infixl 3 Source

Compose two tubes into a new tube.

(|>) :: Monad m => Tube x b m r -> Sink (Maybe b) m s -> Sink (Maybe b) m s Source

Connects a Source to a Sink, finishing when either the Source is exhausted or the Sink terminates.

(-<) :: Monad m => a -> Sink a m b -> Sink a m b Source

Insert a value into a Sink

liftT :: (MonadTrans t, Monad m) => FreeT f m a -> t m (FreeF f a (FreeT f m a)) Source

This performs a neat trick: a Tube with a return type a will be turned into a new Tube containing the underlying TubeF value.

In this way the >< and >- functions can replace the () return value with a continuation and recursively traverse the computation until a final result is reached.


cat :: Monad m => Tube a a m r Source

Continuously relays any values it receives. Iteratee identity.

map :: Monad m => (a -> b) -> Tube a b m r Source

Transforms all incoming values according to some function.

drop :: Monad m => Int -> Tube a a m r Source

Refuses to yield the first n values it receives.

take :: Monad m => Int -> Tube a a m () Source

Relay only the first n elements of a stream.

takeWhile :: Monad m => (a -> Bool) -> Tube a a m () Source

Terminates the stream upon receiving a value violating the predicate

filter :: Monad m => (a -> Bool) -> Tube a a m r Source

Yields only values satisfying some predicate.

reduce Source


:: Monad m 
=> (x -> a -> x)

step function

-> x

initial value

-> (x -> b)

final transformation

-> Source a m ()

stream source

-> m b 

Strict left-fold of a stream. Note that the actual return type of the source is not relevant, only the intermediate yield type.

every :: (Foldable t, Monad m) => t b -> Tube a (Maybe b) m () Source

Similar to each except it explicitly marks the stream as exhausted

unyield :: Monad m => FreeT (TubeF x b) m () -> m (Maybe (b, FreeT (TubeF x b) m ())) Source

Taps the next value from a source, maybe.

prompt :: MonadIO m => Source String m () Source

Source of Strings from stdin. This is mostly for debugging / ghci example purposes.

mapM :: Monad m => (a -> m b) -> Tube a b m r Source

Similar to map except it maps a monadic function instead of a pure one.

sequence :: Monad m => Tube (m a) a m r Source

Evaluates and extracts a pure value from a monadic one.

display :: MonadIO m => Sink String m () Source

Sink for Strings to stdout. This is mostly for debugging / ghci example purposes.


type Pump a b = CofreeT (PumpF a b) Source

A Pump is the dual to a Tube. Intuitively, if a Tube is a stream- processing computation, then a Pump is both a stream generator and reducer.

Examples may help!

One interesting use of a Pump is as a data stream, which can be fed into a Tube or Sink.

   import Data.Functor.Identity

   e :: Pump (Maybe Int) Int Identity Int
   e = mkPump (Identity 0)
           ((Identity x) -> (Just x, Identity (x+1)))

   ex1 :: IO ()
   ex1 = do
       run $ each e >< take 10 >< map show >< display
       -- displays 0-9 in the console

A Pump may also be used to fold a Source. Indeed, a Pump may be thought of as both a non-recursive left fold and a non-recursive unfold paired together. (This is called a "metamorphism," hence the function "meta".)

   num_src :: Source Int IO ()
   num_src = do
       forM_ [1..] $ n -> do
           lift . putStrLn $ "Yielding " ++ (show n)
           yield n

   enum_ex :: IO ()
   enum_ex = do
       v <- reduce (flip send) (meta (+) 0 (x -> (x,x))) extract $ num_src >< take 5
       putStrLn . show $ "v = " ++ (show v)
       -- v = 15

The following is an example of a Pump both accumulating values from a Source and then enumerating them into a Sink. This gives back both the result of the computation and the unused input.

   import Data.Functor.Identity

   -- a Sink that stops after 5 loops, or when input is exhausted
   sum_snk :: Sink (Maybe Int) IO Int
   sum_snk = do
       ns <- forM [1,2,3,4,5] $ _ -> do
           mn <- await
           case mn of
               Just n -> return [n]
               Nothing -> return []
       return $ sum . concat $ ns

   source_sink_ex :: IO ([Int], Int)
   source_sink_ex = do
       e <- reduce (flip send) (enumerator []) id $ num_src >< take 10
       (unused, total) <- pump (,) e sum_snk
       putStrLn $ "Total: " ++ (show total)
       putStrLn $ "Unused: " ++ (show unused)
       -- "Total: 15"
       -- "Unused: [6,7,8,9,10]"

Note that when a Pump and a Tube are combined with pump, that the Tube determines control flow. Pumps are comonads, not monads.

There are doubtless more and more interesting examples of combining Tubes and Pumps. If you think of any, drop the author a line!

data PumpF a b k Source




recvF :: (a, k)
sendF :: b -> k


mkPump :: Comonad w => w a -> (w a -> (b, w a)) -> (w a -> c -> w a) -> Pump b c w a Source

Creates a Pump for a Tube using a comonadic seed value, a function to give it more data upon request, and a function to handle any yielded results.

Values received from the Tube may be altered and sent back into the tube, hence this mechanism does act like something of a pump.

send :: Comonad w => b -> Pump a b w r -> Pump a b w r Source

Send a value into a Pump, effectively re-seeding the stream.

recv :: Comonad w => Pump a b w r -> (a, Pump a b w r) Source

Pull a value from a Pump, along with the rest of the Pump.

pump :: (Comonad w, Monad m) => (x -> y -> r) -> Pump a b w x -> Tube a b m y -> m r Source

Given a suitably matching Tube and Pump, you can use the latter to execute the former.

pumpM :: (Comonad w, Monad m) => (x -> y -> r) -> Pump a b w (m x) -> Tube a b m y -> m r Source

A variant of pump which allows effects to be executed inside the pump as well.

meta :: (x -> a -> x) -> x -> (x -> (b, x)) -> Pump b a Identity x Source

Takes a fold function, an initial value, and an unfold to produce a metamorphism. Can be used to change.

enumerator :: [a] -> Pump (Maybe a) a Identity [a] Source

Constructs an enumerator pump, which can buffer values and then enumerate them to, say, a Sink (see the examples above).

enumerate :: (Monad m, Comonad w) => Pump (Maybe a) b w r -> Tube c a m () Source

Transforms a Pump into a corresponding Tube.


lift :: MonadTrans t => forall m a. Monad m => m a -> t m a

Lift a computation from the argument monad to the constructed monad.

runFreeT :: FreeT f m a -> m (FreeF f a (FreeT f m a))