tubes-2.0.0.1: Write stream processing computations with side effects in a series of tubes.

Copyright: (c) 2014, 2016 Gatlin Johnson <gatlin@niltag.net>
License: GPL-3
Maintainer: gatlin@niltag.net
Stability: experimental
Safe Haskell: Safe
Language: Haskell2010

Tubes

Description

Write effect-ful stream processing functions and compose them into a series of tubes.

Tube

 

type Tube a b = FreeT (TubeF a b)

The central data type. Tubes stacked on top of the same base monad m may be composed in series, so long as their type arguments agree.

yield :: Monad m => b -> Tube a b m ()

Command telling a Tube computation to yield data downstream and pause.

await :: Monad m => Tube a b m a

Command telling a Tube computation to pause and await upstream data.

(><) :: Monad m => Tube a b m r -> Tube b c m r -> Tube a c m r infixl 3

Compose compatible tubes in series to produce a new Tube.

    each [1..10] >< map (*2) >< map show >< pour display
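
As a minimal sketch of how await and yield combine with (><), the block below defines a hypothetical tube, doubler, that awaits a number and yields its double. It relies only on the signatures documented on this page; doubler, collect, and doubled are illustrative names rather than library exports, and reduce (documented under Sources) is used only to gather the yielded values into a list.

```haskell
import Control.Monad (forever)
import Data.Functor.Identity (runIdentity)
import Tubes

-- Hypothetical tube: await one number, yield its double, repeat.
doubler :: Monad m => Tube Int Int m r
doubler = forever $ do
    n <- await
    yield (n * 2)

-- Hypothetical helper: gather everything a source-shaped tube yields.
-- The fold conses onto the front, so the list is reversed at the end.
collect :: Monad m => Tube () a m () -> m [a]
collect = fmap reverse . reduce (flip (:)) []

-- Run the composed tube purely, over the Identity monad.
doubled :: [Int]
doubled = runIdentity $ collect (each [1, 2, 3] >< doubler)
```

If composition behaves as documented, doubled should evaluate to [2,4,6]: the composed tube ends as soon as the upstream each runs out of values.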

runTube :: Monad m => Tube () () m r -> m r

Run a self-contained Tube computation.

halt :: Monad m => Tube a b m ()

Command telling a Tube with result type () to simply stop.

Sources

newtype Source m a

An exhaustible source of values parameterized over a base monad. It never awaits; it only yields.

Sources are monad transformers in their own right, as they are possibly finite. They may also be synchronously merged as monoids:

    import Data.Monoid

    src1 :: Source IO String
    src1 = Source $ each ["line A1", "line A2", "line A3"]

    src2 :: Source IO String
    src2 = Source $ each ["line B1", "line B2", "line B3", "line B4"]

    src3 :: Source IO String
    src3 = src1 <> src2

    main :: IO ()
    main = runTube $ sample src3 >< pour display
    -- line A1
    -- line B1
    -- line A2
    -- line B2
    -- line A3
    -- line B3
    -- line B4

If one source runs out, the other will continue until completion.
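
The merge order described above can be pictured with plain lists. The following is only a model of the ordering, using a hypothetical interleave function from base Haskell; it is not how the library implements the Monoid instance for Source.

```haskell
-- Alternate between two lists. When the list whose turn it is has run
-- out, emit whatever remains of the other one.
interleave :: [a] -> [a] -> [a]
interleave (x : xs) ys = x : interleave ys xs
interleave []       ys = ys

-- The same data as src1 and src2 in the example above.
merged :: [String]
merged = interleave
    ["line A1", "line A2", "line A3"]
    ["line B1", "line B2", "line B3", "line B4"]
```

Here merged lists the lines in the same order as the example output: A1, B1, A2, B2, A3, B3, and finally B4 once the first list is exhausted.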

Constructors

Source 

Fields

sample :: Tube () a m ()
 

reduce :: Monad m => (b -> a -> b) -> b -> Tube () a m () -> m b

Strict left fold over the values yielded by a source tube (for example, sample src), using a Pump internally.
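
A short usage sketch, assuming only the signature above: reduce takes the underlying tube, so a Source is first unwrapped with sample. The names numbers, sumTo10, and countTo10 are hypothetical, and Identity is chosen as the base monad only to keep the example pure.

```haskell
import Data.Functor.Identity (runIdentity)
import Tubes

-- A finite source of the numbers 1 through 10.
numbers :: Monad m => Source m Int
numbers = Source $ each [1 .. 10]

-- Sum every value the source yields.
sumTo10 :: Int
sumTo10 = runIdentity $ reduce (+) 0 (sample numbers)

-- Count the values, ignoring their contents.
countTo10 :: Int
countTo10 = runIdentity $ reduce (\n _ -> n + 1) 0 (sample numbers)
```

With the documented behaviour, sumTo10 should be 55 and countTo10 should be 10.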

Sinks

newtype Sink m a

A potentially full sink of values parameterized over a base monad. It never yields.

A Sink is a contravariant functor. Intuitively this means that it is a consumer of some base type, and you may map transformations over its input before it is consumed.

Example:

    import Data.Functor.Contravariant

    add5 :: Sink IO Int
    add5 = Sink $ loop 0 5 where
        loop acc 0 = do
            liftIO $ putStrLn $ "Sum of five numbers: " ++ (show acc)
            halt
        loop acc count = do
            n <- await
            loop (acc + n) (count - 1)

    times2Add5 :: Sink IO Int
    times2Add5 = (*2) >$< add5

    main :: IO ()
    main = do
        runTube $ each [1..10] >< pour add5
        -- "Sum of five numbers: 15"

        runTube $ each [1..10] >< pour times2Add5
        -- "Sum of five numbers: 30"
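
The >$< operator used above comes from Data.Functor.Contravariant and works on any contravariant functor. As a library-independent illustration of the same idea, this sketch uses the Predicate type from base instead of a Sink; bigEnough and doubledBigEnough are hypothetical names.

```haskell
import Data.Functor.Contravariant (Predicate (..), (>$<))

-- A consumer of Ints: does the number exceed 10?
bigEnough :: Predicate Int
bigEnough = Predicate (> 10)

-- Pre-process the input: double it before it reaches the predicate,
-- in the same way times2Add5 doubles values before add5 sees them.
doubledBigEnough :: Predicate Int
doubledBigEnough = (* 2) >$< bigEnough
```

For instance, getPredicate bigEnough 6 is False, while getPredicate doubledBigEnough 6 is True because the predicate receives 12.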

Sinks may also be merged together, as they form a semigroup:

    import Data.Semigroup

    writeToFile :: Sink IO String
    writeToFile = Sink $ do
        line <- await
        liftIO . putStrLn $ "Totally writing this to a file: " ++ line

    writeToConsole :: Sink IO String
    writeToConsole = Sink $ do
        line <- await
        liftIO . putStrLn $ "Console out: " ++ line

    writeOut :: Sink IO String
    writeOut = writeToFile <> writeToConsole

    main :: IO ()
    main = do
        runTube $ each [1..3] >< map show >< forever (pour writeOut)
        -- Totally writing this to a file: 1
        -- Console out: 1
        -- Totally writing this to a file: 2
        -- Console out: 2
        -- Totally writing this to a file: 3
        -- Console out: 3

Constructors

Sink 

Fields

pour :: Tube a () m ()
 

Instances

Monad m => Contravariant (Sink m)
Monad m => Divisible (Sink m)
Monad m => Decidable (Sink m)
Monad m => Semigroup (Sink m a)

Channels

newtype Channel m a b

A Channel m a b is a stream processor which converts values of type a into values of type b, while also performing side-effects in some monad m.

If a Channel yields exactly once after each time it awaits then it may be safely treated as an Arrow. For example:

    {-# LANGUAGE Arrows #-}

    import Tubes
    import Control.Arrow
    import Prelude hiding (map)

    -- A simple channel which accumulates a total
    total :: (Num a, Monad m) => Channel m a a
    total = Channel $ loop 0 where
        loop acc = do
            n <- await
            let acc' = n + acc
            yield acc'
            loop acc'

    -- A running average using two totals in parallel
    avg :: (Fractional a, Monad m) => Channel m a a
    avg = proc value -> do
        t <- total -< value
        n <- total -< 1
        returnA -< t / n

    main :: IO ()
    main = runTube $ each [0,10,7,8]
                  >< tune avg
                  >< map show
                  >< pour display

This program would output

    0.0
    5.0
    5.666666666666667
    6.25

This has interesting potential in FRP applications.

Constructors

Channel 

Fields

tune :: Tube a b m ()
 

Instances

Monad m => Category * (Channel m)
Monad m => Arrow (Channel m)
Monad m => Profunctor (Channel m)

tee :: Monad m => Sink m a -> Channel m a a

Convert a Sink m a into a Channel m a a, re-forwarding values downstream.

Useful example:

    import Data.Semigroup

    writeToFile :: Sink IO String
    writeToFile = Sink $ do
        line <- await
        liftIO . putStrLn $ "Totally writing this to a file: " ++ line

    writeToConsole :: Sink IO String
    writeToConsole = Sink $ do
        line <- await
        liftIO . putStrLn $ "Console out: " ++ line

    writeOut :: Channel IO String String
    writeOut = tee $ writeToFile <> writeToConsole

    main :: IO ()
    main = runTube $ each ["a","b","c"] >< forever (tune writeOut) >< pour display
    --  Totally writing this to a file: a
    --  Console out: a
    --  a
    --  Totally writing this to a file: b
    --  Console out: b
    --  b
    --  Totally writing this to a file: c
    --  Console out: c
    --  c

This takes advantage of the divisible nature of Sinks to merge effectful computations and then continue the process.

Pump

type Pump b a = CofreeT (PumpF b a)

Pumps are the dual of Tubes. Where a Tube may either be awaiting or yielding, a Pump is always in a position to send or recv data. They are the machines which run Tubes, essentially.

Pumps may be used to formulate infinite streams and folds.

TODO: more examples!

Note the type arguments are "backward" from the Tube point of view: a Pump b a w r may be sent values of type a and you may receive b values from it.
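
To make the "always ready to send or recv" idea concrete, here is a simplified, comonad-free model of a pump written with base Haskell only. Every name in it (PumpModel, sendModel, recvModel, foldPump, feed) is hypothetical; the library's Pump is built on CofreeT and differs in its details.

```haskell
-- A simplified model of a pump. At every step it offers both operations.
data PumpModel b a x = PumpModel
    { result    :: x                      -- value the pump currently holds
    , sendModel :: a -> PumpModel b a x   -- accept an a, return the next pump
    , recvModel :: (b, PumpModel b a x)   -- produce a b and the next pump
    }

-- Build a resumable left fold: sending steps the state, receiving views it.
foldPump :: (x -> a -> x) -> (x -> (b, x)) -> x -> PumpModel b a x
foldPump step view s = PumpModel
    { result    = s
    , sendModel = \a -> foldPump step view (step s a)
    , recvModel = let (b, s') = view s in (b, foldPump step view s')
    }

-- Send every element of a list to a pump, left to right.
feed :: PumpModel b a x -> [a] -> PumpModel b a x
feed = foldl sendModel

-- A running sum that can be resumed with more input later.
summerModel :: PumpModel () Int Int
summerModel = foldPump (+) (\x -> ((), x)) 0

firstPass, secondPass :: PumpModel () Int Int
firstPass  = feed summerModel [1 .. 10]
secondPass = feed firstPass [11 .. 20]
```

In this model result firstPass is 55 and result secondPass is 210: the second pass continues from the state the first pass left behind.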

send :: Comonad w => b -> Pump a b w r -> Pump a b w r

Send a Pump a value, yielding a new Pump.

recv :: Comonad w => Pump a b w r -> (a, Pump a b w r)

Receive a value from a Pump, along with a new Pump for the future.

pumpT :: Comonad w => w r -> (w r -> b -> w r) -> (w r -> (a, w r)) -> Pump a b w r

Construct a Pump based on an arbitrary comonad.

lfold :: (x -> a -> x) -> (x -> (b, x)) -> x -> Pump b a Identity x

Constructs a resumable left fold. Example usage:

    summer :: Pump () Int Identity Int
    summer = lfold (+) (\x -> ((),x)) 0

    main :: IO ()
    main = do
        result <- stream const (duplicate summer) $ each [1..10]
        putStrLn . show . extract $ result -- "55"
        result2 <- stream const (duplicate result) $ each [11..20]
        putStrLn . show . extract $ result2 -- "210"

Utilities

stream :: (Monad m, Comonad w) => (a -> b -> r) -> Pump c d w a -> Tube c d m b -> m r

Process a Tube stream with a given Pump, and merge their results.

streamM :: (Monad m, Comonad w) => (a -> b -> r) -> Pump c d w (m a) -> Tube c d m b -> m r

Process a Tube stream with an effectful Pump, and merge their results.

cat :: Monad m => Tube a a m r

Continuously relays any values it receives.

for :: Monad m => Tube a b m r -> (b -> Tube a c m s) -> Tube a c m r

Loops over a Tube and gives each yielded value to the continuation.
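
A small sketch of for, assuming only the signature above: each value yielded by the first tube is handed to a continuation, which here yields it twice. The names twice and twiceList are hypothetical, and reduce is used only to collect the output.

```haskell
import Data.Functor.Identity (runIdentity)
import Tubes

-- Re-emit every element of the source two times.
twice :: Monad m => Tube () Int m ()
twice = for (each [1, 2, 3]) $ \x -> do
    yield x
    yield x

-- Collect the yielded values; the fold builds the list back to front.
twiceList :: [Int]
twiceList = reverse . runIdentity $ reduce (flip (:)) [] twice
```

Under the documented semantics, twiceList should be [1,1,2,2,3,3].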

each :: (Monad m, Foldable t) => t b -> Tube () b m ()

every :: (Foldable t, Monad m) => t b -> Tube () (Maybe b) m ()

map :: Monad m => (a -> b) -> Tube a b m r

Transforms all incoming values according to some function.

drop :: Monad m => Int -> Tube a a m r

Refuses to yield the first n values it receives.

take :: Monad m => Int -> Tube a a m ()

Relay only the first n elements of a stream.

takeWhile :: Monad m => (a -> Bool) -> Tube a a m ()

Terminates the stream upon receiving a value violating the predicate.

filter :: Monad m => (a -> Bool) -> Tube a a m r

Yields only values satisfying some predicate.
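
Several of these utilities share names with Prelude functions, so a module that uses them needs to hide the Prelude versions. The sketch below assumes the signatures listed above; collect and the three result names are hypothetical.

```haskell
import Prelude hiding (drop, filter, take, takeWhile)
import Data.Functor.Identity (Identity, runIdentity)
import Tubes

-- Hypothetical helper: run a source-shaped tube purely and list its output.
collect :: Tube () a Identity () -> [a]
collect = reverse . runIdentity . reduce (flip (:)) []

-- Keep even numbers, then stop after the first two of them.
firstTwoEvens :: [Int]
firstTwoEvens = collect $ each [1 .. 10] >< filter even >< take 2

-- Skip the first two values and relay the rest.
afterTwo :: [Int]
afterTwo = collect $ each [1 .. 5] >< drop 2

-- Relay values until one fails the predicate.
belowFour :: [Int]
belowFour = collect $ each [1 .. 10] >< takeWhile (< 4)
```

Going by the descriptions above, these should evaluate to [2,4], [3,4,5], and [1,2,3] respectively.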

unyield :: Monad m => Tube x b m () -> m (Maybe (b, Tube x b m ()))

Taps the next value from a source, maybe.

pass :: Monad m => a -> Tube a b m () -> m (Maybe (b, Tube a b m ()))

Similar to unyield but it first sends a value through the tube.
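
Because unyield returns both the next value and the remainder of the tube, a source can be stepped by hand. The following sketch assumes only the signature of unyield shown above; drain and drained are hypothetical names.

```haskell
import Data.Functor.Identity (runIdentity)
import Tubes

-- Pull values out of a source one at a time until it is exhausted.
drain :: Monad m => Tube () b m () -> m [b]
drain t = do
    next <- unyield t
    case next of
        Nothing        -> return []
        Just (x, rest) -> (x :) <$> drain rest

-- Step through a three-element source purely.
drained :: String
drained = runIdentity $ drain (each "abc")
```

If unyield behaves as described, drained should be "abc".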

mapM :: Monad m => (a -> m b) -> Tube a b m r

Similar to map except it maps a monadic function instead of a pure one.

sequence :: Monad m => Tube (m a) a m r

Evaluates and extracts a pure value from a monadic one.
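
As a hedged sketch of mapM, the block below runs an effectful function on every value flowing through a tube. It assumes the signatures documented on this page; logAndScale and scaledSum are hypothetical names, and Prelude's mapM is hidden to avoid a name clash.

```haskell
import Prelude hiding (mapM)
import Tubes

-- Hypothetical effectful step: log each value, then scale it by ten.
logAndScale :: Int -> IO Int
logAndScale x = do
    putStrLn ("saw " ++ show x)
    return (x * 10)

-- Apply the effectful step to every element, then sum the results.
scaledSum :: IO Int
scaledSum = reduce (+) 0 $ each [1, 2, 3] >< mapM logAndScale
```

Running scaledSum should log each of the three inputs and return 60.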

stop :: Monad m => Tube a () m r

A default tube to end a series when no further processing is required.

Miscellaneous

prompt :: MonadIO m => Source m String

Source of Strings from stdin. This is mostly for debugging / ghci example purposes.

display :: MonadIO m => Sink m String

Sink for Strings to stdout. This is mostly for debugging / ghci example purposes.

Re-exports

liftT :: (MonadTrans t, Monad m) => FreeT f m a -> t m (FreeF f a (FreeT f m a))

runFreeT :: FreeT f m a -> m (FreeF f a (FreeT f m a))

Example

Code is worth a thousand words. This program ...

    import Prelude hiding (map)
    import qualified Prelude as P

    import Data.Semigroup
    import Control.Monad (forever)

    import Tubes

    srcA :: MonadIO m => Source m String
    srcA = Source $ each ["line A1", "line A2", "line A3"]

    srcB :: MonadIO m => Source m String
    srcB = Source $ each ["line B1", "line B2", "line B3", "line B4"]

    -- Synchronously merge input
    srcAB :: MonadIO m => Source m String
    srcAB = srcA <> srcB

    writeToFile :: MonadIO m => Sink m String
    writeToFile = Sink $ do
        line <- await
        liftIO . putStrLn $ "Totally writing this to a file: " ++ line

    writeToConsole :: MonadIO m => Sink m String
    writeToConsole = Sink $ do
        line <- await
        liftIO . putStrLn $ "Console out: " ++ line

    -- Merge outputs together
    writeOut :: MonadIO m => Sink m String
    writeOut = writeToFile <> writeToConsole

    -- And make outputs re-forward their input data
    writeOut' :: MonadIO m => Channel m String String
    writeOut' = tee writeOut

    main :: IO ()
    main = runTube $ sample srcAB
                  >< tune writeOut'
                  >< pour display

... gives this output:

    Totally writing this to a file: line A1
    Console out: line A1
    line A1
    Totally writing this to a file: line B1
    Console out: line B1
    line B1
    Totally writing this to a file: line A2
    Console out: line A2
    line A2
    Totally writing this to a file: line B2
    Console out: line B2
    line B2
    Totally writing this to a file: line A3
    Console out: line A3
    line A3
    Totally writing this to a file: line B3
    Console out: line B3
    line B3
    Totally writing this to a file: line B4
    Console out: line B4
    line B4