tubes-1.1.0.0: Write stream processing combinators in any monad and then compose them in a series of tubes.

Safe HaskellSafe
LanguageHaskell2010

Tubes.Pump

Synopsis

Documentation

type Pump a b = CofreeT (PumpF a b) Source

A Pump is the dual to a Tube. Intuitively, if a Tube is a stream- processing computation, then a Pump is both a stream generator and reducer.

Examples may help!

One interesting use of a Pump is as a data stream, which can be fed into a Tube or Sink.

   import Data.Functor.Identity

   e :: Pump (Maybe Int) Int Identity Int
   e = mkPump (Identity 0)
           (\(Identity x) -> (Just x, Identity (x+1)))
           const

   ex1 :: IO ()
   ex1 = do
       run $ each e >< take 10 >< map show >< display
       -- displays 0-9 in the console
   

A Pump may also be used to fold a Source. Indeed, a Pump may be thought of as both a non-recursive left fold and a non-recursive unfold paired together. (This is called a "metamorphism," hence the function "meta".)

   num_src :: Source Int IO ()
   num_src = do
       forM_ [1..] $ \n -> do
           lift . putStrLn $ "Yielding " ++ (show n)
           yield n

   enum_ex :: IO ()
   enum_ex = do
       v <- reduce (flip send) (meta (+) 0 (\x -> (x,x))) extract $ num_src >< take 5
       putStrLn . show $ "v = " ++ (show v)
       -- v = 15
   

The following is an example of a Pump both accumulating values from a Source and then enumerating them into a Sink. This gives back both the result of the computation and the unused input.

   import Data.Functor.Identity

   -- a Sink that stops after 5 loops, or when input is exhausted
   sum_snk :: Sink (Maybe Int) IO Int
   sum_snk = do
       ns <- forM [1,2,3,4,5] $ \_ -> do
           mn <- await
           case mn of
               Just n -> return [n]
               Nothing -> return []
       return $ sum . concat $ ns

   source_sink_ex :: IO ([Int], Int)
   source_sink_ex = do
       e <- reduce (flip send) (enumerator []) id $ num_src >< take 10
       (unused, total) <- pump (,) e sum_snk
       putStrLn $ "Total: " ++ (show total)
       putStrLn $ "Unused: " ++ (show unused)
       -- "Total: 15"
       -- "Unused: [6,7,8,9,10]"
   

Note that when a Pump and a Tube are combined with pump, that the Tube determines control flow. Pumps are comonads, not monads.

There are doubtless more and more interesting examples of combining Tubes and Pumps. If you think of any, drop the author a line!

data PumpF a b k Source

Constructors

PumpF 

Fields

recvF :: (a, k)
 
sendF :: b -> k
 

Instances

run :: Monad m => Tube (Maybe a) b m r -> m r Source

Runs a tube computation, producing a result value in the base monad.

Because of higher-rank polymorphism, tubes created using a Source and >< will work with this function as well.

Similarly, any tube created using |> and a Sink will work as well. This is an improvement over the behavior of runFreeT which gives back an unevaluated FreeT tree.

An example (using num_src and src_snk defined previously in this documentation):

       num_src :: Source Int IO ()
       num_src = do
           forM_ [1..] $ \n -> do
               lift . putStrLn $ "Yielding " ++ (show n)
               yield n

       sum_snk :: Sink (Maybe Int) IO Int
       sum_snk = do
           ns <- forM [1,2,3,4,5] $ \_ -> do
               mn <- await
               case mn of
                   Just n -> return [n]
                   Nothing -> return []
           return $ sum . concat $ ns

       >>> run $ num_src |> sum_snk
       15
   

15 is the return value from sum_snk. Both the source and the sink have the ability to terminate the computation by returning, perhaps when the source is exhausted or the sink is full.

mkPump :: Comonad w => w a -> (w a -> (b, w a)) -> (w a -> c -> w a) -> Pump b c w a Source

Creates a Pump for a Tube using a comonadic seed value, a function to give it more data upon request, and a function to handle any yielded results.

Values received from the Tube may be altered and sent back into the tube, hence this mechanism does act like something of a pump.

recv :: Comonad w => Pump a b w r -> (a, Pump a b w r) Source

Pull a value from a Pump, along with the rest of the Pump.

send :: Comonad w => b -> Pump a b w r -> Pump a b w r Source

Send a value into a Pump, effectively re-seeding the stream.

pump :: (Comonad w, Monad m) => (x -> y -> r) -> Pump a b w x -> Tube a b m y -> m r Source

Given a suitably matching Tube and Pump, you can use the latter to execute the former.

pumpM :: (Comonad w, Monad m) => (x -> y -> r) -> Pump a b w (m x) -> Tube a b m y -> m r Source

A variant of pump which allows effects to be executed inside the pump as well.

meta :: (x -> a -> x) -> x -> (x -> (b, x)) -> Pump b a Identity x Source

Using the supplied functions, meta will fold and then unfold a stream, hence its name (which is short for metamorphism).

enumerator :: [a] -> Pump (Maybe a) a Identity [a] Source

Constructs an enumerator pump, which can buffer values and then enumerate them to, say, a Sink (see the examples above).

enumerate :: (Monad m, Comonad w) => Pump (Maybe a) b w r -> Tube c a m () Source

Transforms a Pump into a corresponding Tube.