streaming-utils-0.2.0.0: http, attoparsec, pipes and other utilities for the streaming libraries

Streaming.Pipes

Description

Pipes.Group.Tutorial is the correct introduction to the use of this module, which is mostly just an optimized Pipes.Group, replacing FreeT with Stream. The module also includes optimized functions for interoperation:

fromStream :: Monad m => Stream (Of a) m r -> Producer' a m r
toStream :: Monad m => Producer a m r -> Stream (Of a) m r

It is not a drop-in replacement for Pipes.Group. The only systematic difference is that this simple module omits lenses. It is hoped that this will make elementary usage easier to grasp. The lenses exported by the pipes packages only come into their own with the simple StateT parsing procedure pipes promotes. We are not attempting here to replicate this advanced procedure, but only to make elementary forms of breaking and splitting possible in the simplest possible way.

The pipes-group tutorial is framed as a hunt for a genuinely streaming threeGroups, which would collect the first three groups of matching items while never holding more than the present item in memory. The formulation it opts for in the end would be expressed here thus:

import Pipes
import Streaming.Pipes
import qualified Pipes.Prelude as P

threeGroups :: (Monad m, Eq a) => Producer a m () -> Producer a m ()
threeGroups = concats . takes 3 . groups

The program splits the initial producer into a connected stream of producers containing "equal" values; it takes three of those; and then erases the effects of splitting. So for example

>>> runEffect $ threeGroups (each "aabccoooooo") >-> P.print
'a'
'a'
'b'
'c'
'c'

The new user might look at the examples of splitting, breaking and joining in Streaming.Prelude, keeping in mind that Producer a m r is equivalent to Stream (Of a) m r.

For the rest, the only part of the tutorial that would need revision is the bit at the end about writing explicit FreeT programs. Here one does not proceed by pattern matching, but uses inspect in place of runFreeT

inspect :: (Monad m, Functor f) => Stream f m r -> m (Either r (f (Stream f m r)))

and for construction of a Stream (Producer a m) m r, the usual battery of combinators:

wrap   :: (Monad m, Functor f) => f (Stream f m r) -> Stream f m r
effect :: (Monad m, Functor f) => m (Stream f m r) -> Stream f m r
yields :: (Monad m, Functor f) => f r -> Stream f m r
lift   :: (Monad m, Functor f) => m r -> Stream f m r

and so on.

# Streaming / Pipes interoperation

fromStream :: Monad m => Stream (Of a) m r -> Producer' a m r

Construct an ordinary pipes Producer from a Stream of elements

>>> runEffect $ fromStream (S.each [1..3]) >-> P.print
1
2
3


toStream :: Monad m => Producer a m r -> Stream (Of a) m r

Construct a Stream of elements from a pipes Producer

>>> S.print $ toStream $ P.each [1..3]
1
2
3
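
The two conversions can be mixed freely in one program. The following is a minimal sketch, assuming the pipes, streaming and streaming-utils packages are installed; the names sumOfEvens and firstSquares are invented for illustration and are not part of the library.

```haskell
import Pipes (Producer, each, (>->))
import qualified Pipes.Prelude as P
import qualified Streaming.Prelude as S
import Streaming.Pipes (fromStream, toStream)

-- Filter with an ordinary pipes Pipe, then fold with a streaming fold.
-- (sumOfEvens is a hypothetical helper, not a library function.)
sumOfEvens :: Monad m => Producer Int m () -> m Int
sumOfEvens p = S.sum_ (toStream (p >-> P.filter even))

-- The other direction: build a Stream, then consume it as a Producer.
-- (firstSquares is likewise hypothetical.)
firstSquares :: [Int]
firstSquares = P.toList (fromStream (S.map (\x -> x * x) (S.each [1 .. 4])))

main :: IO ()
main = do
  total <- sumOfEvens (each [1 .. 10])
  print total         -- 30
  print firstSquares  -- [1,4,9,16]
```

Since Producer a m r and Stream (Of a) m r carry the same information, neither conversion needs to buffer elements.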


Link the chunks of a producer of bytestrings into a single byte stream

Successively yield the chunks hidden in a byte stream.

# Splitting a Producer into a connected stream of Producers

chunksOf :: Monad m => Int -> Producer a m r -> Stream (Producer a m) m r

chunksOf splits a Producer into a Stream of Producers of a given length. Its inverse is concats.

>>> let listN n = L.purely P.folds L.list . P.chunksOf n
>>> runEffect $ listN 3 P.stdinLn >-> P.take 2 >-> P.map unwords >-> P.print
1<Enter>
2<Enter>
3<Enter>
"1 2 3"
4<Enter>
5<Enter>
6<Enter>
"4 5 6"
>>> let stylish = P.concats . P.maps (<* P.yield "-*-") . P.chunksOf 2
>>> runEffect $ stylish (P.each $ words "one two three four five six") >-> P.stdoutLn
one
two
-*-
three
four
-*-
five
six
-*-


groups :: (Monad m, Eq a) => Producer a m r -> Stream (Producer a m) m r

groupsBy :: Monad m => (a -> a -> Bool) -> Producer a m r -> Stream (Producer a m) m r

groupsBy splits a Producer into a Stream of Producers of equal items. Its inverse is concats.

groupsBy' :: Monad m => (a -> a -> Bool) -> Producer a m r -> Stream (Producer a m) m r

groupsBy' splits a Producer into a Stream of Producers grouped using the given relation. Its inverse is concats.

This differs from groupsBy by comparing successive elements instead of comparing each element to the first member of the group.

>>> import Pipes (yield, each)
>>> import Pipes.Prelude (toList)
>>> let rel c1 c2 = succ c1 == c2
>>> (toList . intercalates (yield '|') . groupsBy' rel) (each "12233345")
"12|23|3|345"
>>> (toList . intercalates (yield '|') . groupsBy  rel) (each "12233345")
"122|3|3|34|5"


split :: (Eq a, Monad m) => a -> Producer a m r -> Stream (Producer a m) m r

breaks :: (Eq a, Monad m) => (a -> Bool) -> Producer a m r -> Stream (Producer a m) m r

# Rejoining a connected stream of Producers

concats :: Monad m => Stream (Producer a m) m r -> Producer a m r

Join a stream of Producers into a single Producer

intercalates :: (Monad m, Monad (t m), MonadTrans t) => t m x -> Stream (t m) m r -> t m r

Interpolate a layer at each segment. This specializes to e.g.

intercalates :: (Monad m, Functor f) => Stream f m () -> Stream (Stream f m) m r -> Stream f m r

# Folding over the separate layers of a connected stream of Producers

folds

Arguments

 :: Monad m
 => (x -> a -> x)              -- Step function
 -> x                          -- Initial accumulator
 -> (x -> b)                   -- Extraction function
 -> Stream (Producer a m) m r
 -> Producer b m r

Fold each Producer in a producer Stream

purely folds
:: Monad m => Fold a b -> Stream (Producer a m) m r -> Producer b m r

foldsM

Arguments

 :: Monad m
 => (x -> a -> m x)            -- Step function
 -> m x                        -- Initial accumulator
 -> (x -> m b)                 -- Extraction function
 -> Stream (Producer a m) m r
 -> Producer b m r

Fold each Producer in a Producer stream, monadically

impurely foldsM
:: Monad m => FoldM a b -> Stream (Producer a m) m r -> Producer b m r
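
As an illustration of folding per layer, the sketch below sums each chunk of a Producer and yields one total per chunk. It assumes the pipes and streaming-utils packages are available; chunkSums is a name made up for this example.

```haskell
import Pipes (Producer, each)
import qualified Pipes.Prelude as P
import Streaming.Pipes (chunksOf, folds)

-- Sum every chunk of n elements, yielding one total per chunk.
-- The step function is (+), the initial accumulator is 0, and the
-- extraction function is id. (chunkSums is a hypothetical helper.)
chunkSums :: Monad m => Int -> Producer Int m r -> Producer Int m r
chunkSums n = folds (+) 0 id . chunksOf n

main :: IO ()
main = print (P.toList (chunkSums 2 (each [1 .. 6])))  -- [3,7,11]
```

Only the accumulator for the current chunk is held in memory, and the return value r of the original Producer is passed through unchanged.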

# Transforming a connected stream of Producers

takes :: (Monad m, Functor f) => Int -> Stream f m r -> Stream f m ()

takes' :: Monad m => Int -> Stream (Producer a m) m r -> Stream (Producer a m) m r

(takes' n) only keeps the first n Producers of a linked Stream of Producers

Unlike takes, takes' is not functor-general - it is aware that a Producer can be drained, as functors cannot generally be. Here, then, we drain the unused Producers in order to preserve the return value. This makes it a suitable argument for maps.
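
The preserved return value can be observed directly. This is a small sketch, assuming a pipes version whose Pipes.Prelude exports toListM'; the names source and firstTwoChunks are invented for the example.

```haskell
import Data.Functor.Identity (runIdentity)
import Pipes (Producer, each)
import qualified Pipes.Prelude as P
import Streaming.Pipes (chunksOf, concats, takes')

-- A Producer with a meaningful return value (hypothetical example data).
source :: Monad m => Producer Int m String
source = each [1 .. 6] >> return "done"

-- Keep the first two chunks of two elements. The third chunk is drained
-- rather than abandoned, so the final "done" is still delivered.
firstTwoChunks :: ([Int], String)
firstTwoChunks =
  runIdentity (P.toListM' (concats (takes' 2 (chunksOf 2 source))))

main :: IO ()
main = print firstTwoChunks  -- ([1,2,3,4],"done")
```

With takes in place of takes', the result type would be Stream (Producer Int m) m (), and the String return value would no longer be available.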

maps :: (Monad m, Functor f) => (forall x. f x -> g x) -> Stream f m r -> Stream g m r

Map layers of one functor to another with a transformation. Compare hoist, which has a similar effect on the monadic parameter.

maps id = id
maps f . maps g = maps (f . g)

# Streaming division of a Producer into two

span :: Monad m => (a -> Bool) -> Producer a m r -> Producer a m (Producer a m r)

span splits a Producer into two Producers; the outer Producer is the longest consecutive group of elements that satisfy the predicate. Its inverse is join.

break :: Monad m => (a -> Bool) -> Producer a m r -> Producer a m (Producer a m r)

break splits a Producer into two Producers; the outer Producer is the longest consecutive group of elements that fail the predicate. Its inverse is join.

splitAt :: Monad m => Int -> Producer a m r -> Producer a m (Producer a m r)

splitAt divides a Producer into two Producers after a fixed number of elements. Its inverse is join.

group :: (Monad m, Eq a) => Producer a m r -> Producer a m (Producer a m r)

Like groupBy, where the equality predicate is (==)

groupBy :: Monad m => (a -> a -> Bool) -> Producer a m r -> Producer a m (Producer a m r)

groupBy splits a Producer into two Producers; the second producer begins where we meet an element that is different according to the equality predicate. Its inverse is join
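
The "outer Producer, then the rest" shape shared by these functions, and the role of join as inverse, can be sketched as follows. The module is imported qualified here because span, break and splitAt clash with Prelude names; digits, evenPrefix and rejoined are names invented for the example.

```haskell
import Control.Monad (join, void)
import Pipes (Producer, each)
import qualified Pipes.Prelude as P
import qualified Streaming.Pipes as SP

-- Hypothetical example data.
digits :: Monad m => Producer Int m ()
digits = each [2, 4, 5, 6, 7]

-- Keep only the outer Producer: the longest prefix of even numbers.
-- void discards the remainder that the outer Producer returns.
evenPrefix :: [Int]
evenPrefix = P.toList (void (SP.span even digits))

-- join runs the outer Producer and then the remainder it returns,
-- which restores the original sequence.
rejoined :: [Int]
rejoined = P.toList (join (SP.splitAt 2 digits))

main :: IO ()
main = do
  print evenPrefix  -- [2,4]
  print rejoined    -- [2,4,5,6,7]
```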