Copyright | (c) 2014, 2016 Gatlin Johnson <gatlin@niltag.net> |
---|---|
License | GPL-3 |
Maintainer | gatlin@niltag.net |
Stability | experimental |
Safe Haskell | Trustworthy |
Language | Haskell2010 |
Write effect-ful stream processing functions and compose them into a series of tubes.
- type Tube a b = FreeT (TubeF a b)
- yield :: Monad m => b -> Tube a b m ()
- await :: Monad m => Tube a b m a
- (><) :: Monad m => Tube a b m r -> Tube b c m r -> Tube a c m r
- runTube :: Monad m => Tube () () m r -> m r
- halt :: Monad m => Tube a b m ()
- newtype Source m a = Source {}
- reduce :: Monad m => (b -> a -> b) -> b -> Tube () a m () -> m b
- merge :: Monad m => Source m a -> Source m a -> Source m a
- newtype Sink m a = Sink {}
- newtype Channel m a b = Channel {}
- tee :: Monad m => Sink m a -> Channel m a a
- type Pump b a = CofreeT (PumpF b a)
- send :: Comonad w => b -> Pump a b w r -> Pump a b w r
- recv :: Comonad w => Pump a b w r -> (a, Pump a b w r)
- pumpT :: Comonad w => w r -> (w r -> b -> w r) -> (w r -> (a, w r)) -> Pump a b w r
- lfold :: (x -> a -> x) -> (x -> (b, x)) -> x -> Pump b a Identity x
- stream :: (Monad m, Comonad w) => (a -> b -> r) -> Pump c d w a -> Tube c d m b -> m r
- streamM :: (Monad m, Comonad w) => (a -> b -> r) -> Pump c d w (m a) -> Tube c d m b -> m r
- cat :: Monad m => Tube a a m r
- for :: Monad m => Tube a b m r -> (b -> Tube a c m s) -> Tube a c m r
- each :: (Monad m, Foldable t) => t b -> Tube () b m ()
- every :: (Foldable t, Monad m) => t b -> Tube () (Maybe b) m ()
- map :: Monad m => (a -> b) -> Tube a b m r
- drop :: Monad m => Int -> Tube a a m r
- take :: Monad m => Int -> Tube a a m ()
- takeWhile :: Monad m => (a -> Bool) -> Tube a a m ()
- filter :: Monad m => (a -> Bool) -> Tube a a m r
- unyield :: Monad m => Tube x b m () -> m (Maybe (b, Tube x b m ()))
- pass :: Monad m => a -> Tube a b m () -> m (Maybe (b, Tube a b m ()))
- mapM :: Monad m => (a -> m b) -> Tube a b m r
- sequence :: Monad m => Tube (m a) a m r
- stop :: Monad m => Tube a () m r
- prompt :: MonadIO m => Source m String
- display :: MonadIO m => Sink m String
- liftT :: (MonadTrans t, Monad m) => FreeT f m a -> t m (FreeF f a (FreeT f m a))
- runFreeT :: FreeT f m a -> m (FreeF f a (FreeT f m a))
Tube
A Tube
is a computation which may await
values from an upstream source,
yield
values to a downstream receiver, or both. Tube
s may be composed in
series to build complex stream processors using the '(><)' operator.
Tube
s are also monad transformers so you can add stream processing
capabilities to any base monad.
There are three varieties of Tube
s which have different properties:
Source
s, Sink
s, and Channel
s. They each restrict the Tube
type in
different ways to guarantee correctness while still allowing them to be
composed. More information is provided with their respective definitions.
The dual to Tube
is Pump
. It is a comonad which endows another base comonad
with the ability to send
and recv
values.
Several useful Tube
functions - like runTube
, reduce
, and stream
- are
implemented in terms of Pump
. Beyond simply evaluating Tube
s they have
other uses. The lfold
function, for instance, constructs a resumable left fold
structure.
This library is inspired in large part by pipes
, conduit
, and others. While
it intends to be efficient and useful in its own right it began as an exercise
in implementing the basics of those other libraries with comonads and dualities
in mind.
type Tube a b = FreeT (TubeF a b) Source
The central data type. Tube
s stacked on top of the same base monad m
may be
composed in series, so long as their type arguments agree.
yield :: Monad m => b -> Tube a b m () Source
Command telling a Tube
computation to yield data downstream and pause.
await :: Monad m => Tube a b m a Source
Command telling a Tube
computation to pause and await upstream data.
Sources
An exhaustible source of values parameterized over a base monad. It never
await
s, it only yield
s.
Source
s are monad transformers in their own right, as they are possibly
finite. They may also be synchronously merged:
src1 :: Source IO String
src1 = Source $ each ["line A1", "line A2", "line A3"]
src2 :: Source IO String
src2 = Source $ each ["line B1", "line B2", "line B3", "line B4"]
src3 :: Source IO String
src3 = src1 merge
src2
main :: IO ()
main = runTube $ sample src3 >< pour display
-- line A1
-- line B1
-- line A2
-- line B2
-- line A3
-- line B3
-- line B4
If one source runs out, the other will continue until completion.
Digression: originally merge
was the implementation for mappend
and '(<>)'.
However because Source
is ultimately a list transformer I thought it better
that these instances preserve the behavior found in lists and instead provide a
separate function for synchronous merging.
MonadTrans Source Source | |
Monad m => Monad (Source m) Source | |
Monad m => Functor (Source m) Source | |
Monad m => Applicative (Source m) Source | |
Monad m => Alternative (Source m) Source | |
Monad m => MonadPlus (Source m) Source | |
MonadIO m => MonadIO (Source m) Source | |
(Monad m, Floating a) => Floating (Source m a) Source | |
(Monad m, Fractional a) => Fractional (Source m a) Source | |
(Monad m, Num a) => Num (Source m a) Source | |
Monad m => Monoid (Source m a) Source | |
Monad m => Semigroup (Source m a) Source |
merge :: Monad m => Source m a -> Source m a -> Source m a Source
Interleave the values of two Source
s until both are exhausted.
Sinks
A potentially full sink of values parameterized over a base monad. It never
yield
s.
A Sink
is a contravariant functor. Intuitively this means that it is a
consumer of some base type, and you may map transformations over its input
before it is consumed.
Example:
import Data.Functor.Contravariant add5 :: Sink IO Int add5 = Sink $ loop 0 5 where loop acc 0 = do liftIO $ putStrLn $ "Sum of five numbers: " ++ (show acc) halt loop acc count = do n <- await loop (acc + n) (count - 1) times2Add5:: Sink IO Int times2Add5 = (*2) >$< add5 main :: IO () main = do runTube $ each [1..10] >< pour add5 -- "Sum of five numbers: 15" runTube $ each [1..10] >< pour times2Add5 -- "Sum of five numbers: 30"
Sink
s may also be merged together, as they form a semigroup:
import Data.Semigroup writeToFile :: Sink IO String writeToFile = Sink $ do line <- await liftIO . putStrLn $ "Totally writing this to a file: " ++ line writeToConsole :: Sink IO String writeToConsole = Sink $ do line <- await liftIO . putStrLn $ "Console out: " ++ line writeOut :: Sink IO String writeOut = writeToFile <> writeToConsole main :: IO () main = do runTube $ each [1..3] >< map show >< forever (pour writeOut) -- Totally writing this to a file: 1 -- Console out: 1 -- Totally writing this to a file: 2 -- Console out: 2 -- Totally writing this to a file: 3 -- Console out: 3
Channels
A Channel m a b
is a stream processor which converts values of type a
into
values of type b
, while also performing side-effects in some monad m
.
If a Channel
yield
s exactly once after each time it await
s then it may be
safely treated as an Arrow
. For example:
{-# LANGUAGE Arrows #-} import Tubes import Control.Arrow import Prelude hiding (map) -- A simple channel which accumulates a total total :: (Num a, Monad m) => Channel m a a total = Channel $ loop 0 where loop acc = do n <- await let acc' = n + acc yield acc' loop acc' -- A running average using two totals in parallel avg :: (Fractional a, Monad m) => Channel m a a avg = proc value -> do t <- total -< value n <- total -< 1 returnA -< t / n main :: IO () main = runTube $ each [0,10,7,8] >< tune avg >< map show >< pour display
This program would output
0.0 5.0 5.666666666666667 6.25
This has interesting potential in FRP applications.
tee :: Monad m => Sink m a -> Channel m a a Source
Convert a 'Sink m a' into a 'Channel m a a', re-forwarding values downstream.
Useful example:
import Data.Semigroup writeToFile :: Sink IO String writeToFile = Sink $ do line <- await liftIO . putStrLn $ "Totally writing this to a file: " ++ line writeToConsole :: Sink IO String writeToConsole = Sink $ do line <- await liftIO . putStrLn $ "Console out: " ++ line writeOut :: Channel IO String String writeOut = tee $ writeToFile <> writeToConsole main :: IO () main = runTube $ each ["a","b","c"] >< forever (tune writeOut) >< pour display -- Totally writing this to a file: a -- Console out: a -- a -- Totally writing this to a file: b -- Console out: b -- b -- Totally writing this to a file: c -- Console out: c -- c
This takes advantage of the divisible nature of Sink
s to merge effectful
computations and then continue the process.
Pump
type Pump b a = CofreeT (PumpF b a) Source
Pumps are the dual of Tube
s. Where a Tube
may either be await
ing or
yield
ing, a Pump
is always in a position to send
or recv
data. They are
the machines which run Tube
s, essentially.
Pumps may be used to formulate infinite streams and folds.
TODO: more examples!
Note the type arguments are "backward" from the Tube
point of view: a
Pump b a w r
may be sent values of type a
and you may receive b
values
from it.
pumpT :: Comonad w => w r -> (w r -> b -> w r) -> (w r -> (a, w r)) -> Pump a b w r Source
Construct a Pump
based on an arbitrary comonad.
lfold :: (x -> a -> x) -> (x -> (b, x)) -> x -> Pump b a Identity x Source
Constructs a resumable left fold. Example usage:
summer :: Pump () Int Identity Int summer = lfold (+) (x -> ((),x)) 0 main :: IO () main = do result <- stream const (duplicate summer) $ each [1..10] putStrLn . show . extract $ result -- "55" result2 <- stream const (duplicate result) $ each [11..20] putStrLn . show . extract $ result2 -- "210"
Utilities
map :: Monad m => (a -> b) -> Tube a b m r Source
Transforms all incoming values according to some function.
takeWhile :: Monad m => (a -> Bool) -> Tube a a m () Source
Terminates the stream upon receiving a value violating the predicate
filter :: Monad m => (a -> Bool) -> Tube a a m r Source
Yields only values satisfying some predicate.
unyield :: Monad m => Tube x b m () -> m (Maybe (b, Tube x b m ())) Source
Taps the next value from a source, maybe.
pass :: Monad m => a -> Tube a b m () -> m (Maybe (b, Tube a b m ())) Source
Similar to unyield
but it first sends a value through the tube.
mapM :: Monad m => (a -> m b) -> Tube a b m r Source
Similar to map
except it maps a monadic function instead of a pure one.
sequence :: Monad m => Tube (m a) a m r Source
Evaluates and extracts a pure value from a monadic one.
stop :: Monad m => Tube a () m r Source
A default tube to end a series when no further processing is required.
Miscellaneous
prompt :: MonadIO m => Source m String Source
Source of String
s from stdin. This is mostly for debugging / ghci example purposes.
display :: MonadIO m => Sink m String Source
Sink for String
s to stdout. This is mostly for debugging / ghci example
purposes.
Re-exports
runFreeT :: FreeT f m a -> m (FreeF f a (FreeT f m a))
Example
Code is worth a thousand words. This program ...
import Prelude hiding (map)
import qualified Prelude as P
import Data.Semigroup
import Control.Monad (forever)
import Tubes
srcA :: MonadIO m => Source m String
srcA = Source $ each ["line A1", "line A2", "line A3"]
srcB :: MonadIO m => Source m String
srcB = Source $ each ["line B1", "line B2", "line B3", "line B4"]
-- Synchronously merge input
srcAB :: MonadIO m => Source m String
srcAB = srcA merge
srcB
writeToFile :: MonadIO m => Sink m String
writeToFile = Sink $ do
line <- await
liftIO . putStrLn $ "Totally writing this to a file: " ++ line
writeToConsole :: MonadIO m => Sink m String
writeToConsole = Sink $ do
line <- await
liftIO . putStrLn $ "Console out: " ++ line
-- Merge outputs together
writeOut :: MonadIO m => Sink m String
writeOut = writeToFile <> writeToConsole
-- And make outputs re-forward their input data
writeOut' :: MonadIO m => Channel m String String
writeOut' = tee writeOut
main :: IO ()
main = runTube $ sample srcAB
>< tune writeOut'
>< pour display
... gives this output:
Totally writing this to a file: line A1 Console out: line A1 line A1 Totally writing this to a file: line B1 Console out: line B1 line B1 Totally writing this to a file: line A2 Console out: line A2 line A2 Totally writing this to a file: line B2 Console out: line B2 line B2 Totally writing this to a file: line A3 Console out: line A3 line A3 Totally writing this to a file: line B3 Console out: line B3 line B3 Totally writing this to a file: line B4 Console out: line B4 line B4