| Copyright | (c) 2014, 2016 Gatlin Johnson <gatlin@niltag.net> |
|---|---|
| License | GPL-3 |
| Maintainer | gatlin@niltag.net |
| Stability | experimental |
| Safe Haskell | Trustworthy |
| Language | Haskell2010 |
Tubes
Description
Write effect-ful stream processing functions and compose them into a series of tubes.
- type Tube a b = FreeT (TubeF a b)
- yield :: Monad m => b -> Tube a b m ()
- await :: Monad m => Tube a b m a
- (><) :: Monad m => Tube a b m r -> Tube b c m r -> Tube a c m r
- runTube :: Monad m => Tube () () m r -> m r
- halt :: Monad m => Tube a b m ()
- newtype Source m a = Source {}
- reduce :: Monad m => (b -> a -> b) -> b -> Tube () a m () -> m b
- merge :: Monad m => Source m a -> Source m a -> Source m a
- newtype Sink m a = Sink {}
- newtype Channel m a b = Channel {}
- tee :: Monad m => Sink m a -> Channel m a a
- type Pump b a = CofreeT (PumpF b a)
- send :: Comonad w => b -> Pump a b w r -> Pump a b w r
- recv :: Comonad w => Pump a b w r -> (a, Pump a b w r)
- pumpT :: Comonad w => w r -> (w r -> b -> w r) -> (w r -> (a, w r)) -> Pump a b w r
- lfold :: (x -> a -> x) -> (x -> (b, x)) -> x -> Pump b a Identity x
- stream :: (Monad m, Comonad w) => (a -> b -> r) -> Pump c d w a -> Tube c d m b -> m r
- streamM :: (Monad m, Comonad w) => (a -> b -> r) -> Pump c d w (m a) -> Tube c d m b -> m r
- cat :: Monad m => Tube a a m r
- for :: Monad m => Tube a b m r -> (b -> Tube a c m s) -> Tube a c m r
- each :: (Monad m, Foldable t) => t b -> Tube () b m ()
- every :: (Foldable t, Monad m) => t b -> Tube () (Maybe b) m ()
- map :: Monad m => (a -> b) -> Tube a b m r
- drop :: Monad m => Int -> Tube a a m r
- take :: Monad m => Int -> Tube a a m ()
- takeWhile :: Monad m => (a -> Bool) -> Tube a a m ()
- filter :: Monad m => (a -> Bool) -> Tube a a m r
- unyield :: Monad m => Tube x b m () -> m (Maybe (b, Tube x b m ()))
- pass :: Monad m => a -> Tube a b m () -> m (Maybe (b, Tube a b m ()))
- mapM :: Monad m => (a -> m b) -> Tube a b m r
- sequence :: Monad m => Tube (m a) a m r
- stop :: Monad m => Tube a () m r
- prompt :: MonadIO m => Source m String
- display :: MonadIO m => Sink m String
- liftT :: (MonadTrans t, Monad m) => FreeT f m a -> t m (FreeF f a (FreeT f m a))
- runFreeT :: FreeT f m a -> m (FreeF f a (FreeT f m a))
Tube
A Tube is a computation which may await values from an upstream source,
yield values to a downstream receiver, or both. Tubes may be composed in
series to build complex stream processors using the '(><)' operator.
Tubes are also monad transformers so you can add stream processing
capabilities to any base monad.
There are three varieties of Tubes which have different properties:
Sources, Sinks, and Channels. They each restrict the Tube type in
different ways to guarantee correctness while still allowing them to be
composed. More information is provided with their respective definitions.
The dual to Tube is Pump. It is a comonad which endows another base comonad
with the ability to send and recv values.
Several useful Tube functions - like runTube, reduce, and stream - are
implemented in terms of Pump. Beyond simply evaluating Tubes they have
other uses. The lfold function, for instance, constructs a resumable left fold
structure.
This library is inspired in large part by pipes, conduit, and others. While
it intends to be efficient and useful in its own right it began as an exercise
in implementing the basics of those other libraries with comonads and dualities
in mind.
type Tube a b = FreeT (TubeF a b) Source
The central data type. Tubes stacked on top of the same base monad m may be
composed in series, so long as their type arguments agree.
yield :: Monad m => b -> Tube a b m () Source
Command telling a Tube computation to yield data downstream and pause.
await :: Monad m => Tube a b m a Source
Command telling a Tube computation to pause and await upstream data.
Sources
An exhaustible source of values parameterized over a base monad. It never
awaits, it only yields.
Sources are monad transformers in their own right, as they are possibly
finite. They may also be synchronously merged:
src1 :: Source IO String
src1 = Source $ each ["line A1", "line A2", "line A3"]
src2 :: Source IO String
src2 = Source $ each ["line B1", "line B2", "line B3", "line B4"]
src3 :: Source IO String
src3 = src1 merge src2
main :: IO ()
main = runTube $ sample src3 >< pour display
-- line A1
-- line B1
-- line A2
-- line B2
-- line A3
-- line B3
-- line B4
If one source runs out, the other will continue until completion.
Digression: originally merge was the implementation for mappend and '(<>)'.
However because Source is ultimately a list transformer I thought it better
that these instances preserve the behavior found in lists and instead provide a
separate function for synchronous merging.
Instances
| MonadTrans Source Source | |
| Monad m => Monad (Source m) Source | |
| Monad m => Functor (Source m) Source | |
| Monad m => Applicative (Source m) Source | |
| Monad m => Alternative (Source m) Source | |
| Monad m => MonadPlus (Source m) Source | |
| MonadIO m => MonadIO (Source m) Source | |
| (Monad m, Floating a) => Floating (Source m a) Source | |
| (Monad m, Fractional a) => Fractional (Source m a) Source | |
| (Monad m, Num a) => Num (Source m a) Source | |
| Monad m => Monoid (Source m a) Source | |
| Monad m => Semigroup (Source m a) Source |
merge :: Monad m => Source m a -> Source m a -> Source m a Source
Interleave the values of two Sources until both are exhausted.
Sinks
A potentially full sink of values parameterized over a base monad. It never
yields.
A Sink is a contravariant functor. Intuitively this means that it is a
consumer of some base type, and you may map transformations over its input
before it is consumed.
Example:
import Data.Functor.Contravariant
add5 :: Sink IO Int
add5 = Sink $ loop 0 5 where
loop acc 0 = do
liftIO $ putStrLn $ "Sum of five numbers: " ++ (show acc)
halt
loop acc count = do
n <- await
loop (acc + n) (count - 1)
times2Add5:: Sink IO Int
times2Add5 = (*2) >$< add5
main :: IO ()
main = do
runTube $ each [1..10] >< pour add5
-- "Sum of five numbers: 15"
runTube $ each [1..10] >< pour times2Add5
-- "Sum of five numbers: 30"
Sinks may also be merged together, as they form a semigroup:
import Data.Semigroup
writeToFile :: Sink IO String
writeToFile = Sink $ do
line <- await
liftIO . putStrLn $ "Totally writing this to a file: " ++ line
writeToConsole :: Sink IO String
writeToConsole = Sink $ do
line <- await
liftIO . putStrLn $ "Console out: " ++ line
writeOut :: Sink IO String
writeOut = writeToFile <> writeToConsole
main :: IO ()
main = do
runTube $ each [1..3] >< map show >< forever (pour writeOut)
-- Totally writing this to a file: 1
-- Console out: 1
-- Totally writing this to a file: 2
-- Console out: 2
-- Totally writing this to a file: 3
-- Console out: 3
Channels
A Channel m a b is a stream processor which converts values of type a into
values of type b, while also performing side-effects in some monad m.
If a Channel yields exactly once after each time it awaits then it may be
safely treated as an Arrow. For example:
{-# LANGUAGE Arrows #-}
import Tubes
import Control.Arrow
import Prelude hiding (map)
-- A simple channel which accumulates a total
total :: (Num a, Monad m) => Channel m a a
total = Channel $ loop 0 where
loop acc = do
n <- await
let acc' = n + acc
yield acc'
loop acc'
-- A running average using two totals in parallel
avg :: (Fractional a, Monad m) => Channel m a a
avg = proc value -> do
t <- total -< value
n <- total -< 1
returnA -< t / n
main :: IO ()
main = runTube $ each [0,10,7,8]
>< tune avg
>< map show
>< pour display
This program would output
0.0
5.0
5.666666666666667
6.25
This has interesting potential in FRP applications.
tee :: Monad m => Sink m a -> Channel m a a Source
Convert a 'Sink m a' into a 'Channel m a a', re-forwarding values downstream.
Useful example:
import Data.Semigroup
writeToFile :: Sink IO String
writeToFile = Sink $ do
line <- await
liftIO . putStrLn $ "Totally writing this to a file: " ++ line
writeToConsole :: Sink IO String
writeToConsole = Sink $ do
line <- await
liftIO . putStrLn $ "Console out: " ++ line
writeOut :: Channel IO String String
writeOut = tee $ writeToFile <> writeToConsole
main :: IO ()
main = runTube $ each ["a","b","c"] >< forever (tune writeOut) >< pour display
-- Totally writing this to a file: a
-- Console out: a
-- a
-- Totally writing this to a file: b
-- Console out: b
-- b
-- Totally writing this to a file: c
-- Console out: c
-- c
This takes advantage of the divisible nature of Sinks to merge effectful
computations and then continue the process.
Pump
type Pump b a = CofreeT (PumpF b a) Source
Pumps are the dual of Tubes. Where a Tube may either be awaiting or
yielding, a Pump is always in a position to send or recv data. They are
the machines which run Tubes, essentially.
Pumps may be used to formulate infinite streams and folds.
TODO: more examples!
Note the type arguments are "backward" from the Tube point of view: a
Pump b a w r may be sent values of type a and you may receive b values
from it.
pumpT :: Comonad w => w r -> (w r -> b -> w r) -> (w r -> (a, w r)) -> Pump a b w r Source
Construct a Pump based on an arbitrary comonad.
lfold :: (x -> a -> x) -> (x -> (b, x)) -> x -> Pump b a Identity x Source
Constructs a resumable left fold. Example usage:
summer :: Pump () Int Identity Int
summer = lfold (+) (x -> ((),x)) 0
main :: IO ()
main = do
result <- stream const (duplicate summer) $ each [1..10]
putStrLn . show . extract $ result -- "55"
result2 <- stream const (duplicate result) $ each [11..20]
putStrLn . show . extract $ result2 -- "210"
Utilities
map :: Monad m => (a -> b) -> Tube a b m r Source
Transforms all incoming values according to some function.
takeWhile :: Monad m => (a -> Bool) -> Tube a a m () Source
Terminates the stream upon receiving a value violating the predicate
filter :: Monad m => (a -> Bool) -> Tube a a m r Source
Yields only values satisfying some predicate.
unyield :: Monad m => Tube x b m () -> m (Maybe (b, Tube x b m ())) Source
Taps the next value from a source, maybe.
pass :: Monad m => a -> Tube a b m () -> m (Maybe (b, Tube a b m ())) Source
Similar to unyield but it first sends a value through the tube.
mapM :: Monad m => (a -> m b) -> Tube a b m r Source
Similar to map except it maps a monadic function instead of a pure one.
sequence :: Monad m => Tube (m a) a m r Source
Evaluates and extracts a pure value from a monadic one.
stop :: Monad m => Tube a () m r Source
A default tube to end a series when no further processing is required.
Miscellaneous
prompt :: MonadIO m => Source m String Source
Source of Strings from stdin. This is mostly for debugging / ghci example purposes.
display :: MonadIO m => Sink m String Source
Sink for Strings to stdout. This is mostly for debugging / ghci example
purposes.
Re-exports
runFreeT :: FreeT f m a -> m (FreeF f a (FreeT f m a))
Example
Code is worth a thousand words. This program ...
import Prelude hiding (map)
import qualified Prelude as P
import Data.Semigroup
import Control.Monad (forever)
import Tubes
srcA :: MonadIO m => Source m String
srcA = Source $ each ["line A1", "line A2", "line A3"]
srcB :: MonadIO m => Source m String
srcB = Source $ each ["line B1", "line B2", "line B3", "line B4"]
-- Synchronously merge input
srcAB :: MonadIO m => Source m String
srcAB = srcA merge srcB
writeToFile :: MonadIO m => Sink m String
writeToFile = Sink $ do
line <- await
liftIO . putStrLn $ "Totally writing this to a file: " ++ line
writeToConsole :: MonadIO m => Sink m String
writeToConsole = Sink $ do
line <- await
liftIO . putStrLn $ "Console out: " ++ line
-- Merge outputs together
writeOut :: MonadIO m => Sink m String
writeOut = writeToFile <> writeToConsole
-- And make outputs re-forward their input data
writeOut' :: MonadIO m => Channel m String String
writeOut' = tee writeOut
main :: IO ()
main = runTube $ sample srcAB
>< tune writeOut'
>< pour display
... gives this output:
Totally writing this to a file: line A1
Console out: line A1
line A1
Totally writing this to a file: line B1
Console out: line B1
line B1
Totally writing this to a file: line A2
Console out: line A2
line A2
Totally writing this to a file: line B2
Console out: line B2
line B2
Totally writing this to a file: line A3
Console out: line A3
line A3
Totally writing this to a file: line B3
Console out: line B3
line B3
Totally writing this to a file: line B4
Console out: line B4
line B4