úΉ9….5      !"#$%&'()*+,-./01234Safe-34579INThe basis for the R comonad transformer. This says that a pump computation can send and receive data.Pumps are the dual of  s. Where a  may either be  ing or  ing, a  is always in a position to  or ' data. They are the machines which run s, essentially.:Pumps may be used to formulate infinite streams and folds.TODO: more examples!0Note the type arguments are "backward" from the  point of view: a  Pump b a w r may be sent values of type a and you may receive b values from it.The central data type. Tube(s stacked on top of the same base monad mC may be composed in series, so long as their type arguments agree. TubeF defines the language of  s - namely,   and  .WThis type is merely the CPS-encoded version of the following much friendlier data type: b data TubeF a b k = Await (a -> k) | Yield (b , k) deriving (Functor) îThis says: a tube computation is either paused awaiting upstream data, or paused yielding data downstream. The free monad transformer fleshes out the other cases, namely having finished with a final result value or wrapping a lower monad.  Value constructor for the first  case. !Value constructor for the second  case. Command telling a . computation to pause and await upstream data. Command telling a 0 computation to yield data downstream and pause.Command telling a  with base type () to simply stop. Compose a  emitting values of type b4 with a continuation producing a suitable successor. Used primarily to define '(><)'.4Compose compatible tubes in series to produce a new .  each [1..10] > map(*2)< pour display Send a  a value, yielding a new .Receive a value from a , along with a new  for the future. Construct a  based on an arbitrary comonad. Process a  stream with a given , and merge their results. Process a  stream with an effectful , and merge their results.Run a self-contained  computation.\Used only in situations where a dummy value is needed. Actively working to get rid of this.!56 789:;<=   56 789:;<=SafeIN  Loops over a  and gives each  ed value to the continuation.FA default tube to end a series when no further processing is required.+Continuously relays any values it receives.:Transforms all incoming values according to some function.Refuses to yield the first n values it receives. -Yields only values satisfying some predicate.!DTerminates the stream upon receiving a value violating the predicate"Relay only the first n elements of a stream.#)Taps the next value from a source, maybe.$ Similar to #- but it first sends a value through the tube.% Similar to 9 except it maps a monadic function instead of a pure one.&7Evaluates and extracts a pure value from a monadic one.'0Constructs a resumable left fold. Example usage: ÿ[ summer :: Pump () Int Identity Int summer = lfold (+) (x -> ((),x)) 0 main :: IO () main = do result <- stream const (duplicate summer) $ each [1..10] putStrLn . show . extract $ result -- "55" result2 <- stream const (duplicate result) $ each [11..20] putStrLn . show . extract $ result2 -- "210"  !"#$%&' !"#$%&'"! #$%&' !"#$%&'Safe7IN(JAn exhaustible source of values parameterized over a base monad. It never   s, it only  s.({s are monad transformers in their own right, as they are possibly finite. They may also be synchronously merged as monoids: ÿÁ import Data.Monoid src1 :: Source IO String src1 = Source $ each ["line A1", "line A2", "line A3"] src2 :: Source IO String src2 = Source $ each ["line B1", "line B2", "line B3", "line B4"] src3 :: Source IO String src3 = src1 <> src2 main :: IO () main = runTube $ sample (src1 <> src2) >< pour display -- line A1 -- line B1 -- line A2 -- line B2 -- line A3 -- line B3 -- line B4 AIf one source runs out, the other will continue until completion.+Strict left-fold of a ( , using a  internally. ()*+>?@ABCDEF()*+()*+ ()*+>?@ABCDEFSafeIN,LA potentially full sink of values parameterized over a base monad. It never  s.A ,¢ is a contravariant functor. Intuitively this means that it is a consumer of some base type, and you may map transformations over its input before it is consumed.Example: ÿ0 import Data.Functor.Contravariant add5 :: Sink IO Int add5 = Sink $ loop 0 5 where loop acc 0 = do liftIO $ putStrLn $ "Sum of five numbers: " ++ (show acc) halt loop acc count = do n <- await loop (acc + n) (count - 1) times2Add5:: Sink IO Int times2Add5 = (*2) >$< add5 main :: IO () main = do runTube $ each [1..10] >< pour add5 -- "Sum of five numbers: 15" runTube $ each [1..10] >< pour times2Add5 -- "Sum of five numbers: 30" ,8s may also be merged together, as they form a semigroup: ÿÍ import Data.Semigroup writeToFile :: Sink IO String writeToFile = Sink $ do line <- await liftIO . putStrLn $ "Totally writing this to a file: " ++ line writeToConsole :: Sink IO String writeToConsole = Sink $ do line <- await liftIO . putStrLn $ "Console out: " ++ line writeOut :: Sink IO String writeOut = writeToFile <> writeToConsole main :: IO () main = do runTube $ each [1..3] >< map show >< forever (pour writeOut) -- Totally writing this to a file: 1 -- Console out: 1 -- Totally writing this to a file: 2 -- Console out: 2 -- Totally writing this to a file: 3 -- Console out: 3 ,-.GHIJ,-.,-.,-.GHIJSafe/A  Channel m a b5 is a stream processor which converts values of type a into values of type b3, while also performing side-effects in some monad m.If a Channel  "s exactly once after each time it  &s then it may be safely treated as an Arrow. For example: ÿÏ {-# LANGUAGE Arrows #-} import Tubes import Control.Arrow import Prelude hiding (map) -- A simple channel which accumulates a total total :: (Num a, Monad m) => Channel m a a total = Channel $ loop 0 where loop acc = do n <- await let acc' = n + acc yield acc' loop acc' -- A running average using two totals in parallel avg :: (Fractional a, Monad m) => Channel m a a avg = proc value -> do t <- total -< value n <- total -< 1 returnA -< t / n main :: IO () main = runTube $ each [0,10,7,8] >< tune avg >< map show >< pour display This program would output / 0.0 5.0 5.666666666666667 6.25 3This has interesting potential in FRP applications.2MConvert a 'Sink m a' into a 'Channel m a a', re-forwarding values downstream.Useful example: ÿé import Data.Semigroup writeToFile :: Sink IO String writeToFile = Sink $ do line <- await liftIO . putStrLn $ "Totally writing this to a file: " ++ line writeToConsole :: Sink IO String writeToConsole = Sink $ do line <- await liftIO . putStrLn $ "Console out: " ++ line writeOut :: Channel IO String String writeOut = tee $ writeToFile <> writeToConsole main :: IO () main = runTube $ each ["a","b","c"] >< forever (tune writeOut) >< pour display -- Totally writing this to a file: a -- Console out: a -- a -- Totally writing this to a file: b -- Console out: b -- b -- Totally writing this to a file: c -- Console out: c -- c 0This takes advantage of the divisible nature of ,@s to merge effectful computations and then continue the process./012KLM/012/012/012KLMAll-encompassing module.1(c) 2014, 2016 Gatlin Johnson <gatlin@niltag.net>GPL-3gatlin@niltag.net experimentalSafeIN3 Source of NCs from stdin. This is mostly for debugging / ghci example purposes.4 Sink for NCs to stdout. This is mostly for debugging / ghci example purposes.34*  !"#$%&'()*+,-./01234* ()*+,-./012'"! #$%&3434O       !"#$%&'()*+,-.//012234456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUtubes_4h1EQGycBle9uGxB5rqx3y Tubes.Core Tubes.Util Tubes.Source Tubes.Sink Tubes.ChannelTubesfree_JNs4sGInPGiKkMdCAfezEnControl.Monad.Trans.FreerunFreeTPumpFsendFrecvFPumpTubeTubeFrunTubeFawaitFyieldFawaityieldhaltliftT>-><sendrecvpumpTstreamstreamMrunTubedivergeforstopcateacheverymapdropfilter takeWhiletakeunyieldpassmapMsequencelfoldSourcesamplereduceSinkpourChanneltuneteepromptdisplayAdjointadj_stream_streamMfix$fAdjointPumpFTubeF$fAdjoint(,)(->)$fAdjoint(->)(,)$fAdjointIdentityIdentity$fSemigroupSource$fMonoidSource$fMonadPlusSource$fMonadIOSource$fMonadTransSource$fAlternativeSource $fMonadSource$fApplicativeSource$fFunctorSource$fSemigroupSink$fDecidableSink$fDivisibleSink$fContravariantSink$fArrowChannel$fCategory*Channel$fProfunctorChannelbaseGHC.BaseString