streamly-core-0.1.0: Streaming, parsers, arrays and more
Copyright(c) 2018 Composewell Technologies
(c) Roman Leshchinskiy 2008-2010
LicenseBSD-3-Clause
Maintainerstreamly@composewell.com
Stabilityexperimental
PortabilityGHC
Safe HaskellSafe-Inferred
LanguageHaskell2010

Streamly.Internal.Data.Stream.StreamD.Nesting

Description

This module contains transformations involving multiple streams, unfolds or folds. There are two types of transformations generational or eliminational. Generational transformations are like the Generate module but they generate a stream by combining streams instead of elements. Eliminational transformations are like the Eliminate module but they transform a stream by eliminating parts of the stream instead of eliminating the whole stream.

These combinators involve transformation, generation, elimination so can be classified under any of those.

Ultimately these operations should be supported by Unfolds, Pipes and Folds, and this module may become redundant.

Synopsis

Generate

Combining streams to generate streams.

Combine Two Streams

Functions ending in the shape:

t m a -> t m a -> t m a.

Appending

Append a stream after another. A special case of concatMap or unfoldMany.

data AppendState s1 s2 Source #

Constructors

AppendFirst s1 
AppendSecond s2 

append :: Monad m => Stream m a -> Stream m a -> Stream m a Source #

Fuses two streams sequentially, yielding all elements from the first stream, and then all elements from the second stream.

>>> s1 = Stream.fromList [1,2]
>>> s2 = Stream.fromList [3,4]
>>> Stream.fold Fold.toList $ s1 `Stream.append` s2
[1,2,3,4]

This function should not be used to dynamically construct a stream. If a stream is constructed by successive use of this function it would take quadratic time complexity to consume the stream.

This function should only be used to statically fuse a stream with another stream. Do not use this recursively or where it cannot be inlined.

See Streamly.Data.StreamK for an append that can be used to construct a stream recursively.

Interleaving

Interleave elements from two streams alternately. A special case of unfoldInterleave.

interleave :: Monad m => Stream m a -> Stream m a -> Stream m a Source #

Interleaves two streams, yielding one element from each stream alternately. When one stream stops the rest of the other stream is used in the output stream.

When joining many streams in a left associative manner earlier streams will get exponential priority than the ones joining later. Because of exponential weighting it can be used with concatMapWith even on a large number of streams.

interleaveMin :: Monad m => Stream m a -> Stream m a -> Stream m a Source #

Like interleave but stops interleaving as soon as any of the two streams stops.

interleaveFst :: Monad m => Stream m a -> Stream m a -> Stream m a Source #

Interleaves the outputs of two streams, yielding elements from each stream alternately, starting from the first stream and ending at the first stream. If the second stream is longer than the first, elements from the second stream are infixed with elements from the first stream. If the first stream is longer then it continues yielding elements even after the second stream has finished.

>>> :set -XOverloadedStrings
>>> import Data.Functor.Identity (Identity)
>>> Stream.interleaveFst "abc" ",,,," :: Stream Identity Char
fromList "a,b,c"
>>> Stream.interleaveFst "abc" "," :: Stream Identity Char
fromList "a,bc"

interleaveFst is a dual of interleaveFstSuffix.

Do not use dynamically.

Pre-release

interleaveFstSuffix :: Monad m => Stream m a -> Stream m a -> Stream m a Source #

Interleaves the outputs of two streams, yielding elements from each stream alternately, starting from the first stream. As soon as the first stream finishes, the output stops, discarding the remaining part of the second stream. In this case, the last element in the resulting stream would be from the second stream. If the second stream finishes early then the first stream still continues to yield elements until it finishes.

>>> :set -XOverloadedStrings
>>> import Data.Functor.Identity (Identity)
>>> Stream.interleaveFstSuffix "abc" ",,,," :: Stream Identity Char
fromList "a,b,c,"
>>> Stream.interleaveFstSuffix "abc" "," :: Stream Identity Char
fromList "a,bc"

interleaveFstSuffix is a dual of interleaveFst.

Do not use dynamically.

Pre-release

Scheduling

Execute streams alternately irrespective of whether they generate elements or not. Note interleave would execute a stream until it yields an element. A special case of unfoldRoundRobin.

roundRobin :: Monad m => Stream m a -> Stream m a -> Stream m a Source #

Schedule the execution of two streams in a fair round-robin manner, executing each stream once, alternately. Execution of a stream may not necessarily result in an output, a stream may choose to Skip producing an element until later giving the other stream a chance to run. Therefore, this combinator fairly interleaves the execution of two streams rather than fairly interleaving the output of the two streams. This can be useful in co-operative multitasking without using explicit threads. This can be used as an alternative to async.

Do not use dynamically.

Pre-release

Zipping

Zip corresponding elements of two streams.

zipWith :: Monad m => (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c Source #

Stream a is evaluated first, followed by stream b, the resulting elements a and b are then zipped using the supplied zip function and the result c is yielded to the consumer.

If stream a or stream b ends, the zipped stream ends. If stream b ends first, the element a from previous evaluation of stream a is discarded.

>>> s1 = Stream.fromList [1,2,3]
>>> s2 = Stream.fromList [4,5,6]
>>> Stream.fold Fold.toList $ Stream.zipWith (+) s1 s2
[5,7,9]

zipWithM :: Monad m => (a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c Source #

Like zipWith but using a monadic zipping function.

Merging

Interleave elements from two streams based on a condition.

mergeBy :: Monad m => (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a Source #

Merge two streams using a comparison function. The head elements of both the streams are compared and the smaller of the two elements is emitted, if both elements are equal then the element from the first stream is used first.

If the streams are sorted in ascending order, the resulting stream would also remain sorted in ascending order.

>>> s1 = Stream.fromList [1,3,5]
>>> s2 = Stream.fromList [2,4,6,8]
>>> Stream.fold Fold.toList $ Stream.mergeBy compare s1 s2
[1,2,3,4,5,6,8]

mergeByM :: Monad m => (a -> a -> m Ordering) -> Stream m a -> Stream m a -> Stream m a Source #

Like mergeBy but with a monadic comparison function.

Merge two streams randomly:

> randomly _ _ = randomIO >>= x -> return $ if x then LT else GT
> Stream.toList $ Stream.mergeByM randomly (Stream.fromList [1,1,1,1]) (Stream.fromList [2,2,2,2])
[2,1,2,2,2,1,1,1]

Merge two streams in a proportion of 2:1:

>>> :{
do
 let s1 = Stream.fromList [1,1,1,1,1,1]
     s2 = Stream.fromList [2,2,2]
 let proportionately m n = do
      ref <- newIORef $ cycle $ Prelude.concat [Prelude.replicate m LT, Prelude.replicate n GT]
      return $ \_ _ -> do
         r <- readIORef ref
         writeIORef ref $ Prelude.tail r
         return $ Prelude.head r
 f <- proportionately 2 1
 xs <- Stream.fold Fold.toList $ Stream.mergeByM f s1 s2
 print xs
:}
[1,1,2,1,1,2,1,1,2]

mergeMinBy :: (a -> a -> m Ordering) -> Stream m a -> Stream m a -> Stream m a Source #

Like mergeByM but stops merging as soon as any of the two streams stops.

Unimplemented

mergeFstBy :: (a -> a -> m Ordering) -> Stream m a -> Stream m a -> Stream m a Source #

Like mergeByM but stops merging as soon as the first stream stops.

Unimplemented

Combine N Streams

Functions generally ending in these shapes:

concat: f (t m a) -> t m a
concatMap: (a -> t m b) -> t m a -> t m b
unfoldMany: Unfold m a b -> t m a -> t m b

ConcatMap

Generate streams by mapping a stream generator on each element of an input stream, append the resulting streams and flatten.

concatMap :: Monad m => (a -> Stream m b) -> Stream m a -> Stream m b Source #

Map a stream producing function on each element of the stream and then flatten the results into a single stream.

>>> concatMap f = Stream.concatMapM (return . f)
>>> concatMap f = Stream.concat . fmap f
>>> concatMap f = Stream.unfoldMany (Unfold.lmap f Unfold.fromStream)

See unfoldMany for a fusible alternative.

concatMapM :: Monad m => (a -> m (Stream m b)) -> Stream m a -> Stream m b Source #

Map a stream producing monadic function on each element of the stream and then flatten the results into a single stream. Since the stream generation function is monadic, unlike concatMap, it can produce an effect at the beginning of each iteration of the inner loop.

See unfoldMany for a fusible alternative.

ConcatUnfold

Generate streams by using an unfold on each element of an input stream, append the resulting streams and flatten. A special case of gintercalate.

unfoldMany :: Monad m => Unfold m a b -> Stream m a -> Stream m b Source #

unfoldMany unfold stream uses unfold to map the input stream elements to streams and then flattens the generated streams into a single output stream.

Like concatMap but uses an Unfold for stream generation. Unlike concatMap this can fuse the Unfold code with the inner loop and therefore provide many times better performance.

unfoldInterleave :: Monad m => Unfold m a b -> Stream m a -> Stream m b Source #

This does not pair streams like mergeMapWith, instead, it goes through each stream one by one and yields one element from each stream. After it goes to the last stream it reverses the traversal to come back to the first stream yielding elements from each stream on its way back to the first stream and so on.

>>> lists = Stream.fromList [[1,1],[2,2],[3,3],[4,4],[5,5]]
>>> interleaved = Stream.unfoldInterleave Unfold.fromList lists
>>> Stream.fold Fold.toList interleaved
[1,2,3,4,5,5,4,3,2,1]

Note that this is order of magnitude more efficient than "mergeMapWith interleave" because of fusion.

unfoldRoundRobin :: Monad m => Unfold m a b -> Stream m a -> Stream m b Source #

unfoldInterleave switches to the next stream whenever a value from a stream is yielded, it does not switch on a Skip. So if a stream keeps skipping for long time other streams won't get a chance to run. unfoldRoundRobin switches on Skip as well. So it basically schedules each stream fairly irrespective of whether it produces a value or not.

Interpose

Like unfoldMany but intersperses an effect between the streams. A special case of gintercalate.

interpose :: Monad m => c -> Unfold m b c -> Stream m b -> Stream m c Source #

Unfold the elements of a stream, intersperse the given element between the unfolded streams and then concat them into a single stream.

>>> unwords = Stream.interpose ' '

Pre-release

interposeM :: Monad m => m c -> Unfold m b c -> Stream m b -> Stream m c Source #

interposeSuffix :: Monad m => c -> Unfold m b c -> Stream m b -> Stream m c Source #

Unfold the elements of a stream, append the given element after each unfolded stream and then concat them into a single stream.

>>> unlines = Stream.interposeSuffix '\n'

Pre-release

interposeSuffixM :: Monad m => m c -> Unfold m b c -> Stream m b -> Stream m c Source #

Intercalate

Like unfoldMany but intersperses streams from another source between the streams from the first source.

gintercalate :: Monad m => Unfold m a c -> Stream m a -> Unfold m b c -> Stream m b -> Stream m c Source #

interleaveFst followed by unfold and concat.

Pre-release

gintercalateSuffix :: Monad m => Unfold m a c -> Stream m a -> Unfold m b c -> Stream m b -> Stream m c Source #

interleaveFstSuffix followed by unfold and concat.

Pre-release

intercalate :: Monad m => Unfold m b c -> b -> Stream m b -> Stream m c Source #

intersperse followed by unfold and concat.

>>> intercalate u a = Stream.unfoldMany u . Stream.intersperse a
>>> intersperse = Stream.intercalate Unfold.identity
>>> unwords = Stream.intercalate Unfold.fromList " "
>>> input = Stream.fromList ["abc", "def", "ghi"]
>>> Stream.fold Fold.toList $ Stream.intercalate Unfold.fromList " " input
"abc def ghi"

intercalateSuffix :: Monad m => Unfold m b c -> b -> Stream m b -> Stream m c Source #

intersperseMSuffix followed by unfold and concat.

>>> intercalateSuffix u a = Stream.unfoldMany u . Stream.intersperseMSuffix a
>>> intersperseMSuffix = Stream.intercalateSuffix Unfold.identity
>>> unlines = Stream.intercalateSuffix Unfold.fromList "\n"
>>> input = Stream.fromList ["abc", "def", "ghi"]
>>> Stream.fold Fold.toList $ Stream.intercalateSuffix Unfold.fromList "\n" input
"abc\ndef\nghi\n"

Eliminate

Folding and Parsing chunks of streams to eliminate nested streams. Functions generally ending in these shapes:

f (Fold m a b) -> t m a -> t m b
f (Parser a m b) -> t m a -> t m b

Folding

Apply folds on a stream.

foldMany :: Monad m => Fold m a b -> Stream m a -> Stream m b Source #

Apply a Fold repeatedly on a stream and emit the results in the output stream.

Definition:

>>> foldMany f = Stream.parseMany (Parser.fromFold f)

Example, empty stream:

>>> f = Fold.take 2 Fold.sum
>>> fmany = Stream.fold Fold.toList . Stream.foldMany f
>>> fmany $ Stream.fromList []
[]

Example, last fold empty:

>>> fmany $ Stream.fromList [1..4]
[3,7]

Example, last fold non-empty:

>>> fmany $ Stream.fromList [1..5]
[3,7,5]

Note that using a closed fold e.g. Fold.take 0, would result in an infinite stream on a non-empty input stream.

refoldMany :: Monad m => Refold m x a b -> m x -> Stream m a -> Stream m b Source #

Like foldMany but for the Refold type. The supplied action is used as the initial value for each refold.

Internal

foldSequence :: Stream m (Fold m a b) -> Stream m a -> Stream m b Source #

Apply a stream of folds to an input stream and emit the results in the output stream.

Unimplemented

foldIterateM :: Monad m => (b -> m (Fold m a b)) -> m b -> Stream m a -> Stream m b Source #

Iterate a fold generator on a stream. The initial value b is used to generate the first fold, the fold is applied on the stream and the result of the fold is used to generate the next fold and so on.

>>> import Data.Monoid (Sum(..))
>>> f x = return (Fold.take 2 (Fold.sconcat x))
>>> s = fmap Sum $ Stream.fromList [1..10]
>>> Stream.fold Fold.toList $ fmap getSum $ Stream.foldIterateM f (pure 0) s
[3,10,21,36,55,55]

This is the streaming equivalent of monad like sequenced application of folds where next fold is dependent on the previous fold.

Pre-release

refoldIterateM :: Monad m => Refold m b a b -> m b -> Stream m a -> Stream m b Source #

Like foldIterateM but using the Refold type instead. This could be much more efficient due to stream fusion.

Internal

Parsing

Parsing is opposite to flattening. parseMany is dual to concatMap or unfoldMany. concatMap generates a stream from single values in a stream and flattens, parseMany does the opposite of flattening by splitting the stream and then folds each such split to single value in the output stream.

parseMany :: Monad m => Parser a m b -> Stream m a -> Stream m (Either ParseError b) Source #

Apply a Parser repeatedly on a stream and emit the parsed values in the output stream.

Example:

>>> s = Stream.fromList [1..10]
>>> parser = Parser.takeBetween 0 2 Fold.sum
>>> Stream.fold Fold.toList $ Stream.parseMany parser s
[Right 3,Right 7,Right 11,Right 15,Right 19]

This is the streaming equivalent of the many parse combinator.

Known Issues: When the parser fails there is no way to get the remaining stream.

parseManyD :: Monad m => Parser a m b -> Stream m a -> Stream m (Either ParseError b) Source #

parseSequence :: Stream m (Parser a m b) -> Stream m a -> Stream m b Source #

Apply a stream of parsers to an input stream and emit the results in the output stream.

Unimplemented

parseManyTill :: Parser a m b -> Parser a m x -> Stream m a -> Stream m b Source #

parseManyTill collect test stream tries the parser test on the input, if test fails it backtracks and tries collect, after collect succeeds test is tried again and so on. The parser stops when test succeeds. The output of test is discarded and the output of collect is emitted in the output stream. The parser fails if collect fails.

Unimplemented

parseIterate :: Monad m => (b -> Parser a m b) -> b -> Stream m a -> Stream m (Either ParseError b) Source #

Iterate a parser generating function on a stream. The initial value b is used to generate the first parser, the parser is applied on the stream and the result is used to generate the next parser and so on.

>>> import Data.Monoid (Sum(..))
>>> s = Stream.fromList [1..10]
>>> Stream.fold Fold.toList $ fmap getSum $ Stream.catRights $ Stream.parseIterate (\b -> Parser.takeBetween 0 2 (Fold.sconcat b)) (Sum 0) $ fmap Sum s
[3,10,21,36,55,55]

This is the streaming equivalent of monad like sequenced application of parsers where next parser is dependent on the previous parser.

Pre-release

parseIterateD :: Monad m => (b -> Parser a m b) -> b -> Stream m a -> Stream m (Either ParseError b) Source #

Grouping

Group segments of a stream and fold. Special case of parsing.

groupsOf :: Monad m => Int -> Fold m a b -> Stream m a -> Stream m b Source #

groupsBy :: Monad m => (a -> a -> Bool) -> Fold m a b -> Stream m a -> Stream m b Source #

groupsRollingBy :: Monad m => (a -> a -> Bool) -> Fold m a b -> Stream m a -> Stream m b Source #

Splitting

A special case of parsing.

wordsBy :: Monad m => (a -> Bool) -> Fold m a b -> Stream m a -> Stream m b Source #

splitOnSeq :: forall m a b. (MonadIO m, Storable a, Unbox a, Enum a, Eq a) => Array a -> Fold m a b -> Stream m a -> Stream m b Source #

splitOnSuffixSeq :: forall m a b. (MonadIO m, Storable a, Unbox a, Enum a, Eq a) => Bool -> Array a -> Fold m a b -> Stream m a -> Stream m b Source #

sliceOnSuffix :: Monad m => (a -> Bool) -> Stream m a -> Stream m (Int, Int) Source #

splitOnSuffixSeqAny :: [Array a] -> Fold m a b -> Stream m a -> Stream m b Source #

Split post any one of the given patterns.

Unimplemented

splitOnPrefix :: (a -> Bool) -> Fold m a b -> Stream m a -> Stream m b Source #

Split on a prefixed separator element, dropping the separator. The supplied Fold is applied on the split segments.

> splitOnPrefix' p xs = Stream.toList $ Stream.splitOnPrefix p (Fold.toList) (Stream.fromList xs)
> splitOnPrefix' (== .) ".a.b"
["a","b"]

An empty stream results in an empty output stream: > splitOnPrefix' (== .) "" []

An empty segment consisting of only a prefix is folded to the default output of the fold:

> splitOnPrefix' (== .) "."
[""]

> splitOnPrefix' (== .) ".a.b."
["a","b",""]

> splitOnPrefix' (== .) ".a..b"
["a","","b"]

A prefix is optional at the beginning of the stream:

> splitOnPrefix' (== .) "a"
["a"]

> splitOnPrefix' (== .) "a.b"
["a","b"]

splitOnPrefix is an inverse of intercalatePrefix with a single element:

Stream.intercalatePrefix (Stream.fromPure '.') Unfold.fromList . Stream.splitOnPrefix (== '.') Fold.toList === id

Assuming the input stream does not contain the separator:

Stream.splitOnPrefix (== '.') Fold.toList . Stream.intercalatePrefix (Stream.fromPure '.') Unfold.fromList === id

Unimplemented

splitOnAny :: [Array a] -> Fold m a b -> Stream m a -> Stream m b Source #

Split on any one of the given patterns.

Unimplemented

Transform (Nested Containers)

Opposite to compact in ArrayStream

splitInnerBy :: Monad m => (f a -> m (f a, Maybe (f a))) -> (f a -> f a -> m (f a)) -> Stream m (f a) -> Stream m (f a) Source #

Performs infix separator style splitting.

splitInnerBySuffix :: (Monad m, Eq (f a), Monoid (f a)) => (f a -> m (f a, Maybe (f a))) -> (f a -> f a -> m (f a)) -> Stream m (f a) -> Stream m (f a) Source #

Performs infix separator style splitting.

intersectBySorted :: Monad m => (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a Source #

Reduce By Streams

dropPrefix :: Stream m a -> Stream m a -> Stream m a Source #

Drop prefix from the input stream if present.

Space: O(1)

Unimplemented

dropInfix :: Stream m a -> Stream m a -> Stream m a Source #

Drop all matching infix from the input stream if present. Infix stream may be consumed multiple times.

Space: O(n) where n is the length of the infix.

Unimplemented

dropSuffix :: Stream m a -> Stream m a -> Stream m a Source #

Drop suffix from the input stream if present. Suffix stream may be consumed multiple times.

Space: O(n) where n is the length of the suffix.

Unimplemented