tubes- Effectful, iteratee-inspired stream processing based on a free monad.

Safe HaskellSafe




type Pump a b = CofreeT (PumpF a b) Source

A Pump is the dual to a Tube. Intuitively, if a Tube is a stream- processing computation, then a Pump is both a stream generator and reducer.

Examples may help!

One interesting use of a Pump is as a data stream, which can be fed into a Tube or Sink.

   import Data.Functor.Identity

   e :: Pump (Maybe Int) Int Identity Int
   e = mkPump (Identity 0)
           ((Identity x) -> (Just x, Identity (x+1)))

   ex1 :: IO ()
   ex1 = do
       run $ each e >< take 10 >< map show >< display
       -- displays 0-9 in the console

A Pump may also be used to fold a Source. Indeed, a Pump may be thought of as both a non-recursive left fold and a non-recursive unfold paired together. (This is called a "metamorphism," hence the function "meta".)

   num_src :: Source Int IO ()
   num_src = do
       forM_ [1..] $ n -> do
           lift . putStrLn $ "Yielding " ++ (show n)
           yield n

   enum_ex :: IO ()
   enum_ex = do
       v <- reduce (flip send) (meta (+) 0 (x -> (x,x))) extract $ num_src >< take 5
       putStrLn . show $ "v = " ++ (show v)
       -- v = 15

The following is an example of a Pump both accumulating values from a Source and then enumerating them into a Sink. This gives back both the result of the computation and the unused input.

   import Data.Functor.Identity

   -- a Sink that stops after 5 loops, or when input is exhausted
   sum_snk :: Sink (Maybe Int) IO Int
   sum_snk = do
       ns <- forM [1,2,3,4,5] $ _ -> do
           mn <- await
           case mn of
               Just n -> return [n]
               Nothing -> return []
       return $ sum . concat $ ns

   source_sink_ex :: IO ([Int], Int)
   source_sink_ex = do
       e <- reduce (flip send) (enumerator []) id $ num_src >< take 10
       (unused, total) <- pump (,) e sum_snk
       putStrLn $ "Total: " ++ (show total)
       putStrLn $ "Unused: " ++ (show unused)
       -- "Total: 15"
       -- "Unused: [6,7,8,9,10]"

Note that when a Pump and a Tube are combined with pump, that the Tube determines control flow. Pumps are comonads, not monads.

There are doubtless more and more interesting examples of combining Tubes and Pumps. If you think of any, drop the author a line!

data PumpF a b k Source




recvF :: (a, k)
sendF :: b -> k


mkPump :: Comonad w => w a -> (w a -> (b, w a)) -> (w a -> c -> w a) -> Pump b c w a Source

Creates a Pump for a Tube using a comonadic seed value, a function to give it more data upon request, and a function to handle any yielded results.

Values received from the Tube may be altered and sent back into the tube, hence this mechanism does act like something of a pump.

recv :: Comonad w => Pump a b w r -> (a, Pump a b w r) Source

Pull a value from a Pump, along with the rest of the Pump.

send :: Comonad w => b -> Pump a b w r -> Pump a b w r Source

Send a value into a Pump, effectively re-seeding the stream.

pump :: (Comonad w, Monad m) => (x -> y -> r) -> Pump a b w x -> Tube a b m y -> m r Source

Given a suitably matching Tube and Pump, you can use the latter to execute the former.

pumpM :: (Comonad w, Monad m) => (x -> y -> r) -> Pump a b w (m x) -> Tube a b m y -> m r Source

A variant of pump which allows effects to be executed inside the pump as well.

meta :: (x -> a -> x) -> x -> (x -> (b, x)) -> Pump b a Identity x Source

Takes a fold function, an initial value, and an unfold to produce a metamorphism. Can be used to change.

enumerator :: [a] -> Pump (Maybe a) a Identity [a] Source

Constructs an enumerator pump, which can buffer values and then enumerate them to, say, a Sink (see the examples above).

enumerate :: (Monad m, Comonad w) => Pump (Maybe a) b w r -> Tube c a m () Source

Transforms a Pump into a corresponding Tube.