Copyright | (c) 2014, 2016 Gatlin Johnson <gatlin@niltag.net> |
---|---|
License | GPL-3 |
Maintainer | gatlin@niltag.net |
Stability | experimental |
Safe Haskell | Safe |
Language | Haskell2010 |
Write effectful stream-processing functions and compose them into a series of tubes.
- type Tube a b = FreeT (TubeF a b)
- yield :: Monad m => b -> Tube a b m ()
- await :: Monad m => Tube a b m a
- (><) :: Monad m => Tube a b m r -> Tube b c m r -> Tube a c m r
- runTube :: Monad m => Tube () () m r -> m r
- halt :: Monad m => Tube a b m ()
- newtype Source m a = Source {}
- reduce :: Monad m => (b -> a -> b) -> b -> Tube () a m () -> m b
- newtype Sink m a = Sink {}
- newtype Channel m a b = Channel {}
- tee :: Monad m => Sink m a -> Channel m a a
- type Pump b a = CofreeT (PumpF b a)
- send :: Comonad w => b -> Pump a b w r -> Pump a b w r
- recv :: Comonad w => Pump a b w r -> (a, Pump a b w r)
- pumpT :: Comonad w => w r -> (w r -> b -> w r) -> (w r -> (a, w r)) -> Pump a b w r
- lfold :: (x -> a -> x) -> (x -> (b, x)) -> x -> Pump b a Identity x
- stream :: (Monad m, Comonad w) => (a -> b -> r) -> Pump c d w a -> Tube c d m b -> m r
- streamM :: (Monad m, Comonad w) => (a -> b -> r) -> Pump c d w (m a) -> Tube c d m b -> m r
- cat :: Monad m => Tube a a m r
- for :: Monad m => Tube a b m r -> (b -> Tube a c m s) -> Tube a c m r
- each :: (Monad m, Foldable t) => t b -> Tube () b m ()
- every :: (Foldable t, Monad m) => t b -> Tube () (Maybe b) m ()
- map :: Monad m => (a -> b) -> Tube a b m r
- drop :: Monad m => Int -> Tube a a m r
- take :: Monad m => Int -> Tube a a m ()
- takeWhile :: Monad m => (a -> Bool) -> Tube a a m ()
- filter :: Monad m => (a -> Bool) -> Tube a a m r
- unyield :: Monad m => Tube x b m () -> m (Maybe (b, Tube x b m ()))
- pass :: Monad m => a -> Tube a b m () -> m (Maybe (b, Tube a b m ()))
- mapM :: Monad m => (a -> m b) -> Tube a b m r
- sequence :: Monad m => Tube (m a) a m r
- stop :: Monad m => Tube a () m r
- prompt :: MonadIO m => Source m String
- display :: MonadIO m => Sink m String
- liftT :: (MonadTrans t, Monad m) => FreeT f m a -> t m (FreeF f a (FreeT f m a))
- runFreeT :: FreeT f m a -> m (FreeF f a (FreeT f m a))
Tube
type Tube a b = FreeT (TubeF a b)
The central data type. Tubes stacked on top of the same base monad m may be composed in series, so long as their type arguments agree.
yield :: Monad m => b -> Tube a b m ()
Command telling a Tube computation to yield data downstream and pause.
await :: Monad m => Tube a b m a
Command telling a Tube computation to pause and await upstream data.
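To make the yield/await vocabulary concrete, here is a minimal sketch of a tube-like type with the base monad removed. Every name in it (Pipe, yieldP, awaitP, >-<, eachP, doubler, collect) is hypothetical and not part of this library; the real Tube is FreeT (TubeF a b) over a base monad and may differ in details such as how termination is propagated.

```haskell
-- Hypothetical, simplified model of a tube: no base monad, no effects.
-- This is an illustration only, not the library's definition.
data Pipe a b r
  = Done r                  -- finished with a result
  | Await (a -> Pipe a b r) -- paused, waiting for an upstream value
  | Yield b (Pipe a b r)    -- paused after handing a value downstream

instance Functor (Pipe a b) where
  fmap f (Done r)    = Done (f r)
  fmap f (Await k)   = Await (fmap f . k)
  fmap f (Yield b p) = Yield b (fmap f p)

instance Applicative (Pipe a b) where
  pure = Done
  pf <*> px = pf >>= \f -> fmap f px

instance Monad (Pipe a b) where
  Done r    >>= f = f r
  Await k   >>= f = Await (\a -> k a >>= f)
  Yield b p >>= f = Yield b (p >>= f)

yieldP :: b -> Pipe a b ()
yieldP b = Yield b (Done ())

awaitP :: Pipe a b a
awaitP = Await Done

-- Series composition (assumed semantics): the downstream side drives,
-- and the composite stops as soon as either side is done.
(>-<) :: Pipe a b r -> Pipe b c r -> Pipe a c r
_  >-< Done r       = Done r
up >-< Yield c down = Yield c (up >-< down)
up >-< Await k      = case up of
  Done r      -> Done r
  Yield b up' -> up' >-< k b
  Await h     -> Await (\a -> h a >-< Await k)

-- Yield every element of a list, then finish.
eachP :: [b] -> Pipe a b ()
eachP = mapM_ yieldP

-- Await a number, yield its double, repeat forever.
doubler :: Pipe Int Int r
doubler = do
  n <- awaitP
  yieldP (2 * n)
  doubler

-- Collect everything a pipe yields, answering any await with ().
collect :: Pipe () b r -> [b]
collect (Done _)    = []
collect (Yield b p) = b : collect p
collect (Await k)   = collect (k ())

main :: IO ()
main = print (collect (eachP [1, 2, 3] >-< doubler))
```

In this model the two stages agree on the intermediate type Int, which mirrors the requirement that type arguments agree when tubes are composed in series.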
Sources
An exhaustible source of values parameterized over a base monad. It never awaits, it only yields.
Sources are monad transformers in their own right, as they are possibly finite. They may also be synchronously merged as monoids:
    import Data.Monoid

    src1 :: Source IO String
    src1 = Source $ each ["line A1", "line A2", "line A3"]

    src2 :: Source IO String
    src2 = Source $ each ["line B1", "line B2", "line B3", "line B4"]

    src3 :: Source IO String
    src3 = src1 <> src2

    main :: IO ()
    main = runTube $ sample (src1 <> src2) >< pour display
    -- line A1
    -- line B1
    -- line A2
    -- line B2
    -- line A3
    -- line B3
    -- line B4
If one source runs out, the other will continue until completion.
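The ordering produced by a synchronous merge can be modelled on plain lists. The sketch below is only an analogy for the output order shown in the example; interleave is a hypothetical helper, not a function of this library, and it ignores the effects a real Source may perform.

```haskell
-- Hypothetical list analogue of merging two sources synchronously:
-- take one element from each side in turn, and when one side is
-- exhausted, continue with whatever remains of the other.
interleave :: [a] -> [a] -> [a]
interleave (x : xs) (y : ys) = x : y : interleave xs ys
interleave xs       []       = xs
interleave []       ys       = ys

main :: IO ()
main = mapM_ putStrLn
  (interleave ["line A1", "line A2", "line A3"]
              ["line B1", "line B2", "line B3", "line B4"])
```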
- MonadTrans Source
- Monad m => Monad (Source m)
- Monad m => Functor (Source m)
- Monad m => Applicative (Source m)
- Monad m => Alternative (Source m)
- Monad m => MonadPlus (Source m)
- MonadIO m => MonadIO (Source m)
- Monad m => Monoid (Source m a)
- Monad m => Semigroup (Source m a)
Sinks
A potentially full sink of values parameterized over a base monad. It never yields.
A Sink is a contravariant functor. Intuitively this means that it is a consumer of some base type, and you may map transformations over its input before it is consumed.
Example:
    import Data.Functor.Contravariant

    add5 :: Sink IO Int
    add5 = Sink $ loop 0 5 where
        loop acc 0 = do
            liftIO $ putStrLn $ "Sum of five numbers: " ++ (show acc)
            halt
        loop acc count = do
            n <- await
            loop (acc + n) (count - 1)

    times2Add5 :: Sink IO Int
    times2Add5 = (*2) >$< add5

    main :: IO ()
    main = do
        runTube $ each [1..10] >< pour add5
        -- "Sum of five numbers: 15"

        runTube $ each [1..10] >< pour times2Add5
        -- "Sum of five numbers: 30"
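The contravariant idea can also be seen without any streaming machinery. The following sketch assumes a hypothetical ListSink type that consumes a whole list at once; it is not this library's Sink, and it relies only on Data.Functor.Contravariant from base (available since base 4.12).

```haskell
import Data.Functor.Contravariant (Contravariant (..), (>$<))

-- Hypothetical stand-in for a sink: it consumes a list of inputs and
-- produces a report. Not the library's Sink type.
newtype ListSink a = ListSink { runListSink :: [a] -> String }

-- Mapping over a consumer transforms its inputs before consumption.
instance Contravariant ListSink where
  contramap f (ListSink g) = ListSink (g . map f)

-- Sums the first five inputs it is given.
add5 :: ListSink Int
add5 = ListSink (\xs -> "Sum of five numbers: " ++ show (sum (take 5 xs)))

-- Doubles every input before it reaches add5.
times2Add5 :: ListSink Int
times2Add5 = (* 2) >$< add5

main :: IO ()
main = do
  putStrLn (runListSink add5 [1 .. 10])
  putStrLn (runListSink times2Add5 [1 .. 10])
```

The function passed to >$< runs on the input side, which is why the doubled sink reports 30 rather than doubling the finished sum of a different set of numbers.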
Sinks may also be merged together, as they form a semigroup:
    import Data.Semigroup

    writeToFile :: Sink IO String
    writeToFile = Sink $ do
        line <- await
        liftIO . putStrLn $ "Totally writing this to a file: " ++ line

    writeToConsole :: Sink IO String
    writeToConsole = Sink $ do
        line <- await
        liftIO . putStrLn $ "Console out: " ++ line

    writeOut :: Sink IO String
    writeOut = writeToFile <> writeToConsole

    main :: IO ()
    main = do
        runTube $ each [1..3] >< map show >< forever (pour writeOut)
        -- Totally writing this to a file: 1
        -- Console out: 1
        -- Totally writing this to a file: 2
        -- Console out: 2
        -- Totally writing this to a file: 3
        -- Console out: 3
Channels
A Channel m a b is a stream processor which converts values of type a into values of type b, while also performing side-effects in some monad m.
If a Channel yields exactly once after each time it awaits then it may be safely treated as an Arrow. For example:
    {-# LANGUAGE Arrows #-}

    import Tubes
    import Control.Arrow
    import Prelude hiding (map)

    -- A simple channel which accumulates a total
    total :: (Num a, Monad m) => Channel m a a
    total = Channel $ loop 0 where
        loop acc = do
            n <- await
            let acc' = n + acc
            yield acc'
            loop acc'

    -- A running average using two totals in parallel
    avg :: (Fractional a, Monad m) => Channel m a a
    avg = proc value -> do
        t <- total -< value
        n <- total -< 1
        returnA -< t / n

    main :: IO ()
    main = runTube $ each [0,10,7,8] >< tune avg >< map show >< pour display
This program would output
    0.0
    5.0
    5.666666666666667
    6.25
This has interesting potential in FRP applications.
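The arithmetic behind the running average can be checked with ordinary lists. This is a pure sketch with hypothetical helper names (runningTotal, runningAvg); it mirrors the two parallel totals in the Arrow example but does not use Channel or any other part of this library.

```haskell
-- Running total of a list: each output is the sum of the inputs so far.
runningTotal :: Num a => [a] -> [a]
runningTotal = scanl1 (+)

-- Running average: one total over the values, one total over a stream
-- of ones (the count), divided pointwise.
runningAvg :: Fractional a => [a] -> [a]
runningAvg xs =
  zipWith (/) (runningTotal xs) (runningTotal (map (const 1) xs))

main :: IO ()
main = mapM_ print (runningAvg [0, 10, 7, 8 :: Double])
```

For the inputs 0, 10, 7, 8 the totals are 0, 10, 17, 25 and the counts are 1, 2, 3, 4, which gives the four averages listed in the output of the Channel example.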
tee :: Monad m => Sink m a -> Channel m a a
Convert a 'Sink m a' into a 'Channel m a a', re-forwarding values downstream.
Useful example:
    import Data.Semigroup

    writeToFile :: Sink IO String
    writeToFile = Sink $ do
        line <- await
        liftIO . putStrLn $ "Totally writing this to a file: " ++ line

    writeToConsole :: Sink IO String
    writeToConsole = Sink $ do
        line <- await
        liftIO . putStrLn $ "Console out: " ++ line

    writeOut :: Channel IO String String
    writeOut = tee $ writeToFile <> writeToConsole

    main :: IO ()
    main = runTube $ each ["a","b","c"] >< forever (tune writeOut) >< pour display
    -- Totally writing this to a file: a
    -- Console out: a
    -- a
    -- Totally writing this to a file: b
    -- Console out: b
    -- b
    -- Totally writing this to a file: c
    -- Console out: c
    -- c
This takes advantage of the divisible nature of Sinks to merge effectful computations and then continue the process.
Pump
type Pump b a = CofreeT (PumpF b a)
Pumps are the dual of Tubes. Where a Tube may either be awaiting or yielding, a Pump is always in a position to send or recv data. They are the machines which run Tubes, essentially.
Pumps may be used to formulate infinite streams and folds.
TODO: more examples!
Note the type arguments are "backward" from the Tube point of view: a Pump b a w r may be sent values of type a and you may receive b values from it.
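One way to picture "always in a position to send or recv" is a record that offers both operations at every step. The sketch below is a hypothetical model (PumpSketch, recvP, sendP and queue are invented names) with no comonad and no result type; the library's Pump is CofreeT (PumpF b a) and is richer than this.

```haskell
-- Hypothetical model of a pump: at every step both operations exist.
-- As in the text, the first type argument is what you receive and the
-- second is what you may send.
data PumpSketch b a = PumpSketch
  { recvP :: (b, PumpSketch b a)   -- receive a b and the next state
  , sendP :: a -> PumpSketch b a   -- send an a and get the next state
  }

-- A first-in, first-out buffer: sent values are stored, and receiving
-- hands back the oldest stored value, or Nothing when empty.
queue :: [a] -> PumpSketch (Maybe a) a
queue xs = PumpSketch
  { recvP = case xs of
      []       -> (Nothing, queue [])
      (y : ys) -> (Just y, queue ys)
  , sendP = \x -> queue (xs ++ [x])
  }

main :: IO ()
main = do
  let p0           = sendP (sendP (queue []) 'x') 'y'
      (first, p1)  = recvP p0
      (second, p2) = recvP p1
      (third, _)   = recvP p2
  print [first, second, third]
```

Contrast this with a tube, which at any moment is in exactly one state (awaiting, yielding, or finished) rather than offering both operations at once.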
pumpT :: Comonad w => w r -> (w r -> b -> w r) -> (w r -> (a, w r)) -> Pump a b w r
Construct a Pump based on an arbitrary comonad.
lfold :: (x -> a -> x) -> (x -> (b, x)) -> x -> Pump b a Identity x
Constructs a resumable left fold. Example usage:
    summer :: Pump () Int Identity Int
    summer = lfold (+) (\x -> ((),x)) 0

    main :: IO ()
    main = do
        result <- stream const (duplicate summer) $ each [1..10]
        putStrLn . show . extract $ result -- "55"
        result2 <- stream const (duplicate result) $ each [11..20]
        putStrLn . show . extract $ result2 -- "210"
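What "resumable" means here can be sketched with a plain record that keeps the step function next to the current accumulator. Fold, feed and current are hypothetical names for illustration; unlike lfold, this sketch has no receive function and involves no Pump or comonad.

```haskell
import Data.List (foldl')

-- Hypothetical resumable fold: the step function travels with the
-- accumulator, so a finished fold can be fed more input later.
data Fold a x = Fold
  { step    :: x -> a -> x
  , current :: x
  }

-- Feed a batch of inputs and return the updated fold.
feed :: Fold a x -> [a] -> Fold a x
feed (Fold f x) inputs = Fold f (foldl' f x inputs)

summer :: Fold Int Int
summer = Fold (+) 0

main :: IO ()
main = do
  let result = feed summer [1 .. 10]
  print (current result)
  -- Resume from the previous result rather than starting from 0.
  let result2 = feed result [11 .. 20]
  print (current result2)
```

The second batch continues from the first total, so the two printed values are the sums of 1..10 and 1..20 respectively, matching the comments in the lfold example.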
Utilities
map :: Monad m => (a -> b) -> Tube a b m r
Transforms all incoming values according to some function.
takeWhile :: Monad m => (a -> Bool) -> Tube a a m ()
Terminates the stream upon receiving a value violating the predicate.
filter :: Monad m => (a -> Bool) -> Tube a a m r
Yields only values satisfying some predicate.
unyield :: Monad m => Tube x b m () -> m (Maybe (b, Tube x b m ()))
Taps the next value from a source, maybe.
pass :: Monad m => a -> Tube a b m () -> m (Maybe (b, Tube a b m ()))
Similar to unyield but it first sends a value through the tube.
mapM :: Monad m => (a -> m b) -> Tube a b m r
Similar to map except it maps a monadic function instead of a pure one.
sequence :: Monad m => Tube (m a) a m r
Evaluates and extracts a pure value from a monadic one.
stop :: Monad m => Tube a () m r
A default tube to end a series when no further processing is required.
Miscellaneous
prompt :: MonadIO m => Source m String
Source of Strings from stdin. This is mostly for debugging / ghci example purposes.
display :: MonadIO m => Sink m String
Sink for Strings to stdout. This is mostly for debugging / ghci example purposes.
Re-exports
liftT :: (MonadTrans t, Monad m) => FreeT f m a -> t m (FreeF f a (FreeT f m a))
runFreeT :: FreeT f m a -> m (FreeF f a (FreeT f m a))
Example
Code is worth a thousand words. This program ...
    import Prelude hiding (map)
    import qualified Prelude as P

    import Data.Semigroup
    import Control.Monad (forever)

    import Tubes

    srcA :: MonadIO m => Source m String
    srcA = Source $ each ["line A1", "line A2", "line A3"]

    srcB :: MonadIO m => Source m String
    srcB = Source $ each ["line B1", "line B2", "line B3", "line B4"]

    -- Synchronously merge input
    srcAB :: MonadIO m => Source m String
    srcAB = srcA <> srcB

    writeToFile :: MonadIO m => Sink m String
    writeToFile = Sink $ do
        line <- await
        liftIO . putStrLn $ "Totally writing this to a file: " ++ line

    writeToConsole :: MonadIO m => Sink m String
    writeToConsole = Sink $ do
        line <- await
        liftIO . putStrLn $ "Console out: " ++ line

    -- Merge outputs together
    writeOut :: MonadIO m => Sink m String
    writeOut = writeToFile <> writeToConsole

    -- And make outputs re-forward their input data
    writeOut' :: MonadIO m => Channel m String String
    writeOut' = tee writeOut

    main :: IO ()
    main = runTube $ sample srcAB >< tune writeOut' >< pour display
... gives this output:
    Totally writing this to a file: line A1
    Console out: line A1
    line A1
    Totally writing this to a file: line B1
    Console out: line B1
    line B1
    Totally writing this to a file: line A2
    Console out: line A2
    line A2
    Totally writing this to a file: line B2
    Console out: line B2
    line B2
    Totally writing this to a file: line A3
    Console out: line A3
    line A3
    Totally writing this to a file: line B3
    Console out: line B3
    line B3
    Totally writing this to a file: line B4
    Console out: line B4
    line B4