tubes- Write stream processing computations with side effects in a series of tubes.

Safe HaskellTrustworthy




Write effect-ful stream processing functions and compose them into a series of tubes.



A Tube is a computation which may await values from an upstream source, yield values to a downstream receiver, or both. Tubes may be composed in series to build complex stream processors using the '(><)' operator.

Tubes are also monad transformers so you can add stream processing capabilities to any base monad.

There are three varieties of Tubes which have different properties: Sources, Sinks, and Channels. They each restrict the Tube type in different ways to guarantee correctness while still allowing them to be composed. More information is provided with their respective definitions.

The dual to Tube is Pump. It is a comonad which endows another base comonad with the ability to send and recv values.

Several useful Tube functions - like runTube, reduce, and stream - are implemented in terms of Pump. Beyond simply evaluating Tubes they have other uses. The lfold function, for instance, constructs a resumable left fold structure.

This library is inspired in large part by pipes, conduit, and others. While it intends to be efficient and useful in its own right it began as an exercise in implementing the basics of those other libraries with comonads and dualities in mind.

type Tube a b = FreeT (TubeF a b) Source

The central data type. Tubes stacked on top of the same base monad m may be composed in series, so long as their type arguments agree.

yield :: Monad m => b -> Tube a b m () Source

Command telling a Tube computation to yield data downstream and pause.

await :: Monad m => Tube a b m a Source

Command telling a Tube computation to pause and await upstream data.

(><) :: Monad m => Tube a b m r -> Tube b c m r -> Tube a c m r infixl 3 Source

Compose compatible tubes in series to produce a new Tube.

    each [1..10] >(*2)< pour display

runTube :: Monad m => Tube () () m r -> m r Source

Run a self-contained Tube computation.

halt :: Monad m => Tube a b m () Source

Command telling a Tube with base type () to simply stop.


newtype Source m a Source

An exhaustible source of values parameterized over a base monad. It never awaits, it only yields.

Sources are monad transformers in their own right, as they are possibly finite. They may also be synchronously merged:

    src1 :: Source IO String
    src1 = Source $ each ["line A1", "line A2", "line A3"]

    src2 :: Source IO String
    src2 = Source $ each ["line B1", "line B2", "line B3", "line B4"]

    src3 :: Source IO String
    src3 = src1 merge src2

    main :: IO ()
    main = runTube $ sample src3 >< pour display
    -- line A1
    -- line B1
    -- line A2
    -- line B2
    -- line A3
    -- line B3
    -- line B4

If one source runs out, the other will continue until completion.

Digression: originally merge was the implementation for mappend and '(<>)'. However because Source is ultimately a list transformer I thought it better that these instances preserve the behavior found in lists and instead provide a separate function for synchronous merging.




sample :: Tube () a m ()


MonadTrans Source Source 
Monad m => Monad (Source m) Source 
Monad m => Functor (Source m) Source 
Monad m => Applicative (Source m) Source 
Monad m => Alternative (Source m) Source 
Monad m => MonadPlus (Source m) Source 
MonadIO m => MonadIO (Source m) Source 
(Monad m, Floating a) => Floating (Source m a) Source 
(Monad m, Fractional a) => Fractional (Source m a) Source 
(Monad m, Num a) => Num (Source m a) Source 
Monad m => Monoid (Source m a) Source 
Monad m => Semigroup (Source m a) Source 

reduce :: Monad m => (b -> a -> b) -> b -> Tube () a m () -> m b Source

Strict left-fold of a Source, using a Pump internally.

merge :: Monad m => Source m a -> Source m a -> Source m a Source

Interleave the values of two Sources until both are exhausted.


newtype Sink m a Source

A potentially full sink of values parameterized over a base monad. It never yields.

A Sink is a contravariant functor. Intuitively this means that it is a consumer of some base type, and you may map transformations over its input before it is consumed.


    import Data.Functor.Contravariant

    add5 :: Sink IO Int
    add5 = Sink $ loop 0 5 where
        loop acc 0 = do
            liftIO $ putStrLn $ "Sum of five numbers: " ++ (show acc)
        loop acc count = do
            n <- await
            loop (acc + n) (count - 1)

    times2Add5:: Sink IO Int
    times2Add5 = (*2) >$< add5

    main :: IO ()
    main = do
        runTube $ each [1..10] >< pour add5
        -- "Sum of five numbers: 15"

        runTube $ each [1..10] >< pour times2Add5
        -- "Sum of five numbers: 30"

Sinks may also be merged together, as they form a semigroup:

    import Data.Semigroup

    writeToFile :: Sink IO String
    writeToFile = Sink $ do
        line <- await
        liftIO . putStrLn $ "Totally writing this to a file: " ++ line

    writeToConsole :: Sink IO String
    writeToConsole = Sink $ do
        line <- await
        liftIO . putStrLn $ "Console out: " ++ line

    writeOut :: Sink IO String
    writeOut = writeToFile <> writeToConsole

    main :: IO ()
    main = do
        runTube $ each [1..3] >< map show >< forever (pour writeOut)
        -- Totally writing this to a file: 1
        -- Console out: 1
        -- Totally writing this to a file: 2
        -- Console out: 2
        -- Totally writing this to a file: 3
        -- Console out: 3




pour :: Tube a () m ()


Monad m => Contravariant (Sink m) Source 
Monad m => Divisible (Sink m) Source 
Monad m => Decidable (Sink m) Source 
Monad m => Semigroup (Sink m a) Source 


newtype Channel m a b Source

A Channel m a b is a stream processor which converts values of type a into values of type b, while also performing side-effects in some monad m.

If a Channel yields exactly once after each time it awaits then it may be safely treated as an Arrow. For example:

    {-# LANGUAGE Arrows #-}

    import Tubes
    import Control.Arrow
    import Prelude hiding (map)

    -- A simple channel which accumulates a total
    total :: (Num a, Monad m) => Channel m a a
    total = Channel $ loop 0 where
        loop acc = do
            n <- await
            let acc' = n + acc
            yield acc'
            loop acc'

    -- A running average using two totals in parallel
    avg :: (Fractional a, Monad m) => Channel m a a
    avg = proc value -> do
        t <- total -< value
        n <- total -< 1
        returnA -< t / n

    main :: IO ()
    main = runTube $ each [0,10,7,8]
                  >< tune avg
                  >< map show
                  >< pour display

This program would output


This has interesting potential in FRP applications.




tune :: Tube a b m ()


Monad m => Category * (Channel m) Source 
Monad m => Arrow (Channel m) Source 
Monad m => Profunctor (Channel m) Source 

tee :: Monad m => Sink m a -> Channel m a a Source

Convert a 'Sink m a' into a 'Channel m a a', re-forwarding values downstream.

Useful example:

    import Data.Semigroup

    writeToFile :: Sink IO String
    writeToFile = Sink $ do
        line <- await
        liftIO . putStrLn $ "Totally writing this to a file: " ++ line

    writeToConsole :: Sink IO String
    writeToConsole = Sink $ do
        line <- await
        liftIO . putStrLn $ "Console out: " ++ line

    writeOut :: Channel IO String String
    writeOut = tee $ writeToFile <> writeToConsole

    main :: IO ()
    main = runTube $ each ["a","b","c"] >< forever (tune writeOut) >< pour display
    --  Totally writing this to a file: a
    --  Console out: a
    --  a
    --  Totally writing this to a file: b
    --  Console out: b
    --  b
    --  Totally writing this to a file: c
    --  Console out: c
    --  c

This takes advantage of the divisible nature of Sinks to merge effectful computations and then continue the process.


type Pump b a = CofreeT (PumpF b a) Source

Pumps are the dual of Tubes. Where a Tube may either be awaiting or yielding, a Pump is always in a position to send or recv data. They are the machines which run Tubes, essentially.

Pumps may be used to formulate infinite streams and folds.

TODO: more examples!

Note the type arguments are "backward" from the Tube point of view: a Pump b a w r may be sent values of type a and you may receive b values from it.

send :: Comonad w => b -> Pump a b w r -> Pump a b w r Source

Send a Pump a value, yielding a new Pump.

recv :: Comonad w => Pump a b w r -> (a, Pump a b w r) Source

Receive a value from a Pump, along with a new Pump for the future.

pumpT :: Comonad w => w r -> (w r -> b -> w r) -> (w r -> (a, w r)) -> Pump a b w r Source

Construct a Pump based on an arbitrary comonad.

lfold :: (x -> a -> x) -> (x -> (b, x)) -> x -> Pump b a Identity x Source

Constructs a resumable left fold. Example usage:

    summer :: Pump () Int Identity Int
    summer = lfold (+) (x -> ((),x)) 0

    main :: IO ()
    main = do
        result <- stream const (duplicate summer) $ each [1..10]
        putStrLn . show . extract $ result -- "55"
        result2 <- stream const (duplicate result) $ each [11..20]
        putStrLn . show . extract $ result2 -- "210"


stream :: (Monad m, Comonad w) => (a -> b -> r) -> Pump c d w a -> Tube c d m b -> m r Source

Process a Tube stream with a given Pump, and merge their results.

streamM :: (Monad m, Comonad w) => (a -> b -> r) -> Pump c d w (m a) -> Tube c d m b -> m r Source

Process a Tube stream with an effectful Pump, and merge their results.

cat :: Monad m => Tube a a m r Source

Continuously relays any values it receives.

for :: Monad m => Tube a b m r -> (b -> Tube a c m s) -> Tube a c m r Source

Loops over a Tube and gives each yielded value to the continuation.

each :: (Monad m, Foldable t) => t b -> Tube () b m () Source

every :: (Foldable t, Monad m) => t b -> Tube () (Maybe b) m () Source

map :: Monad m => (a -> b) -> Tube a b m r Source

Transforms all incoming values according to some function.

drop :: Monad m => Int -> Tube a a m r Source

Refuses to yield the first n values it receives.

take :: Monad m => Int -> Tube a a m () Source

Relay only the first n elements of a stream.

takeWhile :: Monad m => (a -> Bool) -> Tube a a m () Source

Terminates the stream upon receiving a value violating the predicate

filter :: Monad m => (a -> Bool) -> Tube a a m r Source

Yields only values satisfying some predicate.

unyield :: Monad m => Tube x b m () -> m (Maybe (b, Tube x b m ())) Source

Taps the next value from a source, maybe.

pass :: Monad m => a -> Tube a b m () -> m (Maybe (b, Tube a b m ())) Source

Similar to unyield but it first sends a value through the tube.

mapM :: Monad m => (a -> m b) -> Tube a b m r Source

Similar to map except it maps a monadic function instead of a pure one.

sequence :: Monad m => Tube (m a) a m r Source

Evaluates and extracts a pure value from a monadic one.

stop :: Monad m => Tube a () m r Source

A default tube to end a series when no further processing is required.


prompt :: MonadIO m => Source m String Source

Source of Strings from stdin. This is mostly for debugging / ghci example purposes.

display :: MonadIO m => Sink m String Source

Sink for Strings to stdout. This is mostly for debugging / ghci example purposes.


liftT :: (MonadTrans t, Monad m) => FreeT f m a -> t m (FreeF f a (FreeT f m a)) Source

runFreeT :: FreeT f m a -> m (FreeF f a (FreeT f m a))


Code is worth a thousand words. This program ...

    import Prelude hiding (map)
    import qualified Prelude as P

    import Data.Semigroup
    import Control.Monad (forever)

    import Tubes

    srcA :: MonadIO m => Source m String
    srcA = Source $ each ["line A1", "line A2", "line A3"]

    srcB :: MonadIO m => Source m String
    srcB = Source $ each ["line B1", "line B2", "line B3", "line B4"]

    -- Synchronously merge input
    srcAB :: MonadIO m => Source m String
    srcAB = srcA merge srcB

    writeToFile :: MonadIO m => Sink m String
    writeToFile = Sink $ do
        line <- await
        liftIO . putStrLn $ "Totally writing this to a file: " ++ line

    writeToConsole :: MonadIO m => Sink m String
    writeToConsole = Sink $ do
        line <- await
        liftIO . putStrLn $ "Console out: " ++ line

    -- Merge outputs together
    writeOut :: MonadIO m => Sink m String
    writeOut = writeToFile <> writeToConsole

    -- And make outputs re-forward their input data
    writeOut' :: MonadIO m => Channel m String String
    writeOut' = tee writeOut

    main :: IO ()
    main = runTube $ sample srcAB
                  >< tune writeOut'
                  >< pour display

... gives this output:

    Totally writing this to a file: line A1
    Console out: line A1
    line A1
    Totally writing this to a file: line B1
    Console out: line B1
    line B1
    Totally writing this to a file: line A2
    Console out: line A2
    line A2
    Totally writing this to a file: line B2
    Console out: line B2
    line B2
    Totally writing this to a file: line A3
    Console out: line A3
    line A3
    Totally writing this to a file: line B3
    Console out: line B3
    line B3
    Totally writing this to a file: line B4
    Console out: line B4
    line B4