| Copyright | (c) 2014, 2016 Gatlin Johnson <gatlin@niltag.net> |
|---|---|
| License | GPL-3 |
| Maintainer | gatlin@niltag.net |
| Stability | experimental |
| Safe Haskell | Safe |
| Language | Haskell2010 |
Tubes
Description
Write effectful stream processing functions and compose them into a series of tubes.
- type Tube a b = FreeT (TubeF a b)
- yield :: Monad m => b -> Tube a b m ()
- await :: Monad m => Tube a b m a
- (><) :: Monad m => Tube a b m r -> Tube b c m r -> Tube a c m r
- runTube :: Monad m => Tube () () m r -> m r
- halt :: Monad m => Tube a b m ()
- newtype Source m a = Source {}
- reduce :: Monad m => (b -> a -> b) -> b -> Tube () a m () -> m b
- newtype Sink m a = Sink {}
- newtype Channel m a b = Channel {}
- tee :: Monad m => Sink m a -> Channel m a a
- type Pump b a = CofreeT (PumpF b a)
- send :: Comonad w => b -> Pump a b w r -> Pump a b w r
- recv :: Comonad w => Pump a b w r -> (a, Pump a b w r)
- pumpT :: Comonad w => w r -> (w r -> b -> w r) -> (w r -> (a, w r)) -> Pump a b w r
- lfold :: (x -> a -> x) -> (x -> (b, x)) -> x -> Pump b a Identity x
- stream :: (Monad m, Comonad w) => (a -> b -> r) -> Pump c d w a -> Tube c d m b -> m r
- streamM :: (Monad m, Comonad w) => (a -> b -> r) -> Pump c d w (m a) -> Tube c d m b -> m r
- cat :: Monad m => Tube a a m r
- for :: Monad m => Tube a b m r -> (b -> Tube a c m s) -> Tube a c m r
- each :: (Monad m, Foldable t) => t b -> Tube () b m ()
- every :: (Foldable t, Monad m) => t b -> Tube () (Maybe b) m ()
- map :: Monad m => (a -> b) -> Tube a b m r
- drop :: Monad m => Int -> Tube a a m r
- take :: Monad m => Int -> Tube a a m ()
- takeWhile :: Monad m => (a -> Bool) -> Tube a a m ()
- filter :: Monad m => (a -> Bool) -> Tube a a m r
- unyield :: Monad m => Tube x b m () -> m (Maybe (b, Tube x b m ()))
- pass :: Monad m => a -> Tube a b m () -> m (Maybe (b, Tube a b m ()))
- mapM :: Monad m => (a -> m b) -> Tube a b m r
- sequence :: Monad m => Tube (m a) a m r
- stop :: Monad m => Tube a () m r
- prompt :: MonadIO m => Source m String
- display :: MonadIO m => Sink m String
- liftT :: (MonadTrans t, Monad m) => FreeT f m a -> t m (FreeF f a (FreeT f m a))
- runFreeT :: FreeT f m a -> m (FreeF f a (FreeT f m a))
Tube
type Tube a b = FreeT (TubeF a b)
The central data type. Tubes stacked on top of the same base monad m may be
composed in series, so long as their type arguments agree.
yield :: Monad m => b -> Tube a b m ()
Command telling a Tube computation to yield data downstream and pause.
await :: Monad m => Tube a b m a
Command telling a Tube computation to pause and await upstream data.
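To make the await/yield protocol concrete, here is a minimal, self-contained sketch of the idea. It is a simplified pure model, not the library's definition: the real Tube is a FreeT layered over a base monad, whereas this model omits the base monad entirely. The names Pipe, connect, fromList, mapping and collect are hypothetical and exist only for this illustration.

```haskell
-- A simplified, pure model of a stream processor (NOT the library's Tube).
data Pipe a b r
  = Done r                   -- finished with a result
  | Yield b (Pipe a b r)     -- emit a value downstream, then continue
  | Await (a -> Pipe a b r)  -- pause until upstream supplies a value

-- Compose two stages in series. The downstream stage drives the computation:
-- upstream only runs when downstream awaits.
connect :: Pipe a b r -> Pipe b c r -> Pipe a c r
connect up down = case down of
  Done r       -> Done r
  Yield c rest -> Yield c (connect up rest)
  Await k      -> case up of
    Done r       -> Done r
    Yield b rest -> connect rest (k b)
    Await k'     -> Await (\a -> connect (k' a) down)

-- Hypothetical helper: yield every element of a list, then stop.
fromList :: [b] -> Pipe a b ()
fromList = foldr Yield (Done ())

-- Hypothetical helper: await a value, yield its image under f, repeat.
mapping :: (a -> b) -> Pipe a b r
mapping f = Await (\a -> Yield (f a) (mapping f))

-- Collect everything a closed pipeline yields.
collect :: Pipe () b r -> [b]
collect (Done _)       = []
collect (Yield b rest) = b : collect rest
collect (Await k)      = collect (k ())

main :: IO ()
main = print (collect (fromList [1, 2, 3 :: Int] `connect` mapping (* 2)))
-- prints [2,4,6]
```

In this model, as with the composition rule stated above, two stages can only be connected when the output type of the first matches the input type of the second.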
Sources
An exhaustible source of values parameterized over a base monad. It never
awaits, it only yields.
Sources are monad transformers in their own right, as they are possibly
finite. They may also be synchronously merged as monoids:
import Data.Monoid
src1 :: Source IO String
src1 = Source $ each ["line A1", "line A2", "line A3"]
src2 :: Source IO String
src2 = Source $ each ["line B1", "line B2", "line B3", "line B4"]
src3 :: Source IO String
src3 = src1 <> src2
main :: IO ()
main = runTube $ sample (src1 <> src2) >< pour display
-- line A1
-- line B1
-- line A2
-- line B2
-- line A3
-- line B3
-- line B4
If one source runs out, the other will continue until completion.
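The merge order shown above can be pictured with a pure-list analogy. This is only a sketch of the ordering, under the assumption that the two sources take strict turns; it does not model effects and is not library code. The name mergeSync is hypothetical.

```haskell
-- Pure-list analogy for the synchronous merge order (not library code).
-- Take one element from the first list, then swap the roles of the lists.
mergeSync :: [a] -> [a] -> [a]
mergeSync (x : xs) ys = x : mergeSync ys xs
mergeSync []       ys = ys  -- one side is exhausted: the other runs to completion

main :: IO ()
main =
  mapM_ putStrLn $
    mergeSync
      ["line A1", "line A2", "line A3"]
      ["line B1", "line B2", "line B3", "line B4"]
```

Running this prints the lines in the same order as the Source example: A1, B1, A2, B2, A3, B3, B4.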
Instances
- MonadTrans Source
- Monad m => Monad (Source m)
- Monad m => Functor (Source m)
- Monad m => Applicative (Source m)
- Monad m => Alternative (Source m)
- Monad m => MonadPlus (Source m)
- MonadIO m => MonadIO (Source m)
- Monad m => Monoid (Source m a)
- Monad m => Semigroup (Source m a)
Sinks
A potentially full sink of values parameterized over a base monad. It never
yields.
A Sink is a contravariant functor. Intuitively this means that it is a
consumer of some base type, and you may map transformations over its input
before it is consumed.
Example:
import Data.Functor.Contravariant
add5 :: Sink IO Int
add5 = Sink $ loop 0 5 where
    loop acc 0 = do
        liftIO $ putStrLn $ "Sum of five numbers: " ++ (show acc)
        halt
    loop acc count = do
        n <- await
        loop (acc + n) (count - 1)
times2Add5 :: Sink IO Int
times2Add5 = (*2) >$< add5
main :: IO ()
main = do
    runTube $ each [1..10] >< pour add5
    -- "Sum of five numbers: 15"
    runTube $ each [1..10] >< pour times2Add5
    -- "Sum of five numbers: 30"
Sinks may also be merged together, as they form a semigroup:
import Data.Semigroup
writeToFile :: Sink IO String
writeToFile = Sink $ do
    line <- await
    liftIO . putStrLn $ "Totally writing this to a file: " ++ line
writeToConsole :: Sink IO String
writeToConsole = Sink $ do
    line <- await
    liftIO . putStrLn $ "Console out: " ++ line
writeOut :: Sink IO String
writeOut = writeToFile <> writeToConsole
main :: IO ()
main = do
    runTube $ each [1..3] >< map show >< forever (pour writeOut)
-- Totally writing this to a file: 1
-- Console out: 1
-- Totally writing this to a file: 2
-- Console out: 2
-- Totally writing this to a file: 3
-- Console out: 3
Channels
A Channel m a b is a stream processor which converts values of type a into
values of type b, while also performing side-effects in some monad m.
If a Channel yields exactly once after each time it awaits then it may be
safely treated as an Arrow. For example:
{-# LANGUAGE Arrows #-}
import Tubes
import Control.Arrow
import Prelude hiding (map)
-- A simple channel which accumulates a total
total :: (Num a, Monad m) => Channel m a a
total = Channel $ loop 0 where
    loop acc = do
        n <- await
        let acc' = n + acc
        yield acc'
        loop acc'
-- A running average using two totals in parallel
avg :: (Fractional a, Monad m) => Channel m a a
avg = proc value -> do
    t <- total -< value
    n <- total -< 1
    returnA -< t / n
main :: IO ()
main = runTube $ each [0,10,7,8]
    >< tune avg
    >< map show
    >< pour display
This program would output:
0.0
5.0
5.666666666666667
6.25
This has interesting potential in FRP applications.
tee :: Monad m => Sink m a -> Channel m a a
Convert a 'Sink m a' into a 'Channel m a a', re-forwarding values downstream.
Useful example:
import Data.Semigroup
writeToFile :: Sink IO String
writeToFile = Sink $ do
    line <- await
    liftIO . putStrLn $ "Totally writing this to a file: " ++ line
writeToConsole :: Sink IO String
writeToConsole = Sink $ do
    line <- await
    liftIO . putStrLn $ "Console out: " ++ line
writeOut :: Channel IO String String
writeOut = tee $ writeToFile <> writeToConsole
main :: IO ()
main = runTube $ each ["a","b","c"] >< forever (tune writeOut) >< pour display
-- Totally writing this to a file: a
-- Console out: a
-- a
-- Totally writing this to a file: b
-- Console out: b
-- b
-- Totally writing this to a file: c
-- Console out: c
-- c
This takes advantage of the divisible nature of Sinks to merge effectful
computations and then continue the process.
Pump
type Pump b a = CofreeT (PumpF b a)
Pumps are the dual of Tubes. Where a Tube may either be awaiting or
yielding, a Pump is always in a position to send or recv data. They are
the machines which run Tubes, essentially.
Pumps may be used to formulate infinite streams and folds.
TODO: more examples!
Note the type arguments are "backward" from the Tube point of view: a
Pump b a w r may be sent values of type a and you may receive b values
from it.
pumpT :: Comonad w => w r -> (w r -> b -> w r) -> (w r -> (a, w r)) -> Pump a b w r
Construct a Pump based on an arbitrary comonad.
lfold :: (x -> a -> x) -> (x -> (b, x)) -> x -> Pump b a Identity x
Constructs a resumable left fold. Example usage:
summer :: Pump () Int Identity Int
summer = lfold (+) (\x -> ((),x)) 0
main :: IO ()
main = do
    result <- stream const (duplicate summer) $ each [1..10]
    putStrLn . show . extract $ result -- "55"
    result2 <- stream const (duplicate result) $ each [11..20]
    putStrLn . show . extract $ result2 -- "210"
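What "resumable" means here can be sketched without the library. The toy type below is an assumption for illustration only and is not how Pump is defined: feed stands in for sending a value to the pump, and current stands in for extracting the accumulator. The names Fold, mkFold and feedAll are hypothetical.

```haskell
import Data.List (foldl')

-- Toy stand-in for a fold-shaped pump (illustration only, not the library).
data Fold a x = Fold
  { current :: x              -- the accumulator so far
  , feed    :: a -> Fold a x  -- consume one input, giving the next state
  }

-- Build a resumable fold from a step function and an initial accumulator.
mkFold :: (x -> a -> x) -> x -> Fold a x
mkFold step acc = Fold acc (\a -> mkFold step (step acc a))

-- Feed a batch of inputs and return the fold in its new state.
feedAll :: Fold a x -> [a] -> Fold a x
feedAll = foldl' feed

main :: IO ()
main = do
  let afterFirst  = feedAll (mkFold (+) 0) [1 .. 10 :: Int]
      afterSecond = feedAll afterFirst [11 .. 20]  -- resume where we left off
  print (current afterFirst)   -- 55
  print (current afterSecond)  -- 210
```

Because feeding returns a new fold rather than a bare result, the second batch continues from the first batch's accumulator, which is the behaviour the lfold example relies on.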
Utilities
map :: Monad m => (a -> b) -> Tube a b m r
Transforms all incoming values according to some function.
takeWhile :: Monad m => (a -> Bool) -> Tube a a m ()
Terminates the stream upon receiving a value violating the predicate.
filter :: Monad m => (a -> Bool) -> Tube a a m r
Yields only values satisfying some predicate.
unyield :: Monad m => Tube x b m () -> m (Maybe (b, Tube x b m ()))
Taps the next value from a source, maybe.
pass :: Monad m => a -> Tube a b m () -> m (Maybe (b, Tube a b m ()))
Similar to unyield but it first sends a value through the tube.
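As a sketch of how unyield might be used, the helper below steps a source by hand and collects everything it produces. It relies only on the signatures of unyield and each listed on this page and assumes the tubes package is installed. The name toListT is hypothetical and not part of the library.

```haskell
import Tubes

-- Hypothetical helper (not part of the library): repeatedly tap the next
-- value with unyield until the source reports that it has nothing left.
toListT :: Monad m => Tube () b m () -> m [b]
toListT t = do
  step <- unyield t
  case step of
    Nothing        -> return []                   -- source exhausted
    Just (x, rest) -> fmap (x :) (toListT rest)   -- keep x, continue with rest

main :: IO ()
main = toListT (each "abc") >>= print
```

Each call to unyield returns the remaining tube alongside the value, so the caller decides when, or whether, to continue.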
mapM :: Monad m => (a -> m b) -> Tube a b m r
Similar to map except it maps a monadic function instead of a pure one.
sequence :: Monad m => Tube (m a) a m r
Evaluates and extracts a pure value from a monadic one.
stop :: Monad m => Tube a () m r
A default tube to end a series when no further processing is required.
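The utilities in this section compose with (><) like any other tube. The following is a small sketch, assuming the tubes package is installed and using only signatures listed on this page (each, filter, map, takeWhile, reduce). The name sumSmallEvenSquares is hypothetical. Note that the Prelude functions of the same names must be hidden to avoid ambiguity.

```haskell
import Prelude hiding (filter, map, takeWhile)
import Tubes

-- Keep the even inputs, square them, stop at the first square that is not
-- below 50, and sum whatever made it through.
sumSmallEvenSquares :: IO Int
sumSmallEvenSquares =
  reduce (+) 0 $
    each [1 .. 100 :: Int]
      >< filter even
      >< map (^ (2 :: Int))
      >< takeWhile (< 50)

main :: IO ()
main = sumSmallEvenSquares >>= print
```

Because takeWhile terminates the stream, the pipeline stops pulling from the source as soon as the predicate fails, rather than traversing all one hundred inputs.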
Miscellaneous
prompt :: MonadIO m => Source m String
Source of Strings from stdin. This is mostly for debugging / ghci example purposes.
display :: MonadIO m => Sink m String
Sink for Strings to stdout. This is mostly for debugging / ghci example
purposes.
Re-exports
liftT :: (MonadTrans t, Monad m) => FreeT f m a -> t m (FreeF f a (FreeT f m a))
runFreeT :: FreeT f m a -> m (FreeF f a (FreeT f m a))
Example
Code is worth a thousand words. This program ...
import Prelude hiding (map)
import qualified Prelude as P
import Data.Semigroup
import Control.Monad (forever)
import Tubes
srcA :: MonadIO m => Source m String
srcA = Source $ each ["line A1", "line A2", "line A3"]
srcB :: MonadIO m => Source m String
srcB = Source $ each ["line B1", "line B2", "line B3", "line B4"]
-- Synchronously merge input
srcAB :: MonadIO m => Source m String
srcAB = srcA <> srcB
writeToFile :: MonadIO m => Sink m String
writeToFile = Sink $ do
    line <- await
    liftIO . putStrLn $ "Totally writing this to a file: " ++ line
writeToConsole :: MonadIO m => Sink m String
writeToConsole = Sink $ do
    line <- await
    liftIO . putStrLn $ "Console out: " ++ line
-- Merge outputs together
writeOut :: MonadIO m => Sink m String
writeOut = writeToFile <> writeToConsole
-- And make outputs re-forward their input data
writeOut' :: MonadIO m => Channel m String String
writeOut' = tee writeOut
main :: IO ()
main = runTube $ sample srcAB
    >< tune writeOut'
    >< pour display
... gives this output:
Totally writing this to a file: line A1
Console out: line A1
line A1
Totally writing this to a file: line B1
Console out: line B1
line B1
Totally writing this to a file: line A2
Console out: line A2
line A2
Totally writing this to a file: line B2
Console out: line B2
line B2
Totally writing this to a file: line A3
Console out: line A3
line A3
Totally writing this to a file: line B3
Console out: line B3
line B3
Totally writing this to a file: line B4
Console out: line B4
line B4