Copyright | (c) 2017 Composewell Technologies |
---|---|

License | BSD-3-Clause |

Maintainer | streamly@composewell.com |

Stability | experimental |

Portability | GHC |

Safe Haskell | Safe-Inferred |

Language | Haskell2010 |

Deprecated: Please use "Streamly.Data.Stream.*" instead.

Expand a stream by combining two or more streams or by combining streams with unfolds.

## Synopsis

- serial :: IsStream t => t m a -> t m a -> t m a
- ahead :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
- async :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
- wAsync :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
- parallel :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
- parallelFst :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
- parallelMin :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
- append :: (IsStream t, Monad m) => t m b -> t m b -> t m b
- wSerial :: IsStream t => t m a -> t m a -> t m a
- wSerialFst :: WSerialT m a -> WSerialT m a -> WSerialT m a
- wSerialMin :: WSerialT m a -> WSerialT m a -> WSerialT m a
- interleave :: (IsStream t, Monad m) => t m b -> t m b -> t m b
- interleaveMin :: (IsStream t, Monad m) => t m b -> t m b -> t m b
- interleaveSuffix :: (IsStream t, Monad m) => t m b -> t m b -> t m b
- interleaveInfix :: (IsStream t, Monad m) => t m b -> t m b -> t m b
- roundrobin :: (IsStream t, Monad m) => t m b -> t m b -> t m b
- zipWith :: (IsStream t, Monad m) => (a -> b -> c) -> t m a -> t m b -> t m c
- zipWithM :: (IsStream t, Monad m) => (a -> b -> m c) -> t m a -> t m b -> t m c
- zipAsyncWith :: (IsStream t, MonadAsync m) => (a -> b -> c) -> t m a -> t m b -> t m c
- zipAsyncWithM :: (IsStream t, MonadAsync m) => (a -> b -> m c) -> t m a -> t m b -> t m c
- merge :: (IsStream t, Ord a) => t m a -> t m a -> t m a
- mergeBy :: IsStream t => (a -> a -> Ordering) -> t m a -> t m a -> t m a
- mergeByM :: (IsStream t, Monad m) => (a -> a -> m Ordering) -> t m a -> t m a -> t m a
- mergeByMFused :: (IsStream t, Monad m) => (a -> a -> m Ordering) -> t m a -> t m a -> t m a
- mergeAsyncBy :: (IsStream t, MonadAsync m) => (a -> a -> Ordering) -> t m a -> t m a -> t m a
- mergeAsyncByM :: (IsStream t, MonadAsync m) => (a -> a -> m Ordering) -> t m a -> t m a -> t m a
- mergeMinBy :: (a -> a -> m Ordering) -> t m a -> t m a -> t m a
- mergeFstBy :: (a -> a -> m Ordering) -> t m a -> t m a -> t m a
- unfoldMany :: (IsStream t, Monad m) => Unfold m a b -> t m a -> t m b
- unfoldManyInterleave :: (IsStream t, Monad m) => Unfold m a b -> t m a -> t m b
- unfoldManyRoundRobin :: (IsStream t, Monad m) => Unfold m a b -> t m a -> t m b
- interpose :: (IsStream t, Monad m) => c -> Unfold m b c -> t m b -> t m c
- interposeSuffix :: (IsStream t, Monad m) => c -> Unfold m b c -> t m b -> t m c
- intercalate :: (IsStream t, Monad m) => Unfold m b c -> b -> t m b -> t m c
- intercalateSuffix :: (IsStream t, Monad m) => Unfold m b c -> b -> t m b -> t m c
- gintercalate :: (IsStream t, Monad m) => Unfold m a c -> t m a -> Unfold m b c -> t m b -> t m c
- gintercalateSuffix :: (IsStream t, Monad m) => Unfold m a c -> t m a -> Unfold m b c -> t m b -> t m c
- concatMapM :: (IsStream t, Monad m) => (a -> m (t m b)) -> t m a -> t m b
- concatMap :: (IsStream t, Monad m) => (a -> t m b) -> t m a -> t m b
- concatM :: (IsStream t, Monad m) => m (t m a) -> t m a
- concat :: (IsStream t, Monad m) => t m (t m a) -> t m a
- concatFoldableWith :: (IsStream t, Foldable f) => (t m a -> t m a -> t m a) -> f (t m a) -> t m a
- concatMapFoldableWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> (a -> t m b) -> f a -> t m b
- concatForFoldableWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> f a -> (a -> t m b) -> t m b
- concatMapWith :: IsStream t => (t m b -> t m b -> t m b) -> (a -> t m b) -> t m a -> t m b
- bindWith :: IsStream t => (t m b -> t m b -> t m b) -> t m a -> (a -> t m b) -> t m b
- concatSmapMWith :: (IsStream t, Monad m) => (t m b -> t m b -> t m b) -> (s -> a -> m (s, t m b)) -> m s -> t m a -> t m b
- concatPairsWith :: IsStream t => (t m b -> t m b -> t m b) -> (a -> t m b) -> t m a -> t m b
- iterateMapWith :: IsStream t => (t m a -> t m a -> t m a) -> (a -> t m a) -> t m a -> t m a
- iterateSmapMWith :: (IsStream t, Monad m) => (t m a -> t m a -> t m a) -> (b -> a -> m (b, t m a)) -> m b -> t m a -> t m a
- iterateMapLeftsWith :: (IsStream t, b ~ Either a c) => (t m b -> t m b -> t m b) -> (a -> t m b) -> t m b -> t m b
- iterateUnfold :: Unfold m a a -> t m a -> t m a
- concatUnfold :: (IsStream t, Monad m) => Unfold m a b -> t m a -> t m b

# Binary Combinators (Linear)

Functions ending in the shape:

`t m a -> t m a -> t m a`

.

The functions in this section have a linear or flat n-ary combining
characterstics. It means that when combined `n`

times (e.g. `a `

) the resulting expression will have an `serial`

b `serial`

c ...`O(n)`

complexity (instead O(n^2) for pair wise combinators described in the
next section. These functions can be used efficiently with
`concatMapWith`

et. al. combinators that combine streams in a linear
fashion (contrast with `concatPairsWith`

which combines streams as a
binary tree).

serial :: IsStream t => t m a -> t m a -> t m a infixr 6 Source #

Appends two streams sequentially, yielding all elements from the first stream, and then all elements from the second stream.

`>>>`

`import Streamly.Prelude (serial)`

`>>>`

`stream1 = Stream.fromList [1,2]`

`>>>`

`stream2 = Stream.fromList [3,4]`

`>>>`

[1,2,3,4]`Stream.toList $ stream1 `serial` stream2`

This operation can be used to fold an infinite lazy container of streams.

*Since: 0.2.0 (Streamly)*

*Since: 0.8.0*

ahead :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a infixr 6 Source #

Appends two streams, both the streams may be evaluated concurrently but the outputs are used in the same order as the corresponding actions in the original streams, side effects will happen in the order in which the streams are evaluated:

`>>>`

`import Streamly.Prelude (ahead, SerialT)`

`>>>`

`stream1 = Stream.fromEffect (delay 4) :: SerialT IO Int`

`>>>`

`stream2 = Stream.fromEffect (delay 2) :: SerialT IO Int`

`>>>`

2 sec 4 sec [4,2]`Stream.toList $ stream1 `ahead` stream2 :: IO [Int]`

Multiple streams can be combined. With enough threads, all of them can be scheduled simultaneously:

`>>>`

`stream3 = Stream.fromEffect (delay 1)`

`>>>`

1 sec 2 sec 4 sec [4,2,1]`Stream.toList $ stream1 `ahead` stream2 `ahead` stream3`

With 2 threads, only two can be scheduled at a time, when one of those finishes, the third one gets scheduled:

`>>>`

2 sec 1 sec 4 sec [4,2,1]`Stream.toList $ Stream.maxThreads 2 $ stream1 `ahead` stream2 `ahead` stream3`

Only streams are scheduled for ahead evaluation, how actions within a stream
are evaluated depends on the stream type. If it is a concurrent stream they
will be evaluated concurrently. It may not make much sense combining serial
streams using `ahead`

.

`ahead`

can be safely used to fold an infinite lazy container of streams.

*Since: 0.3.0 (Streamly)*

*Since: 0.8.0*

async :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a infixr 6 Source #

Merges two streams, both the streams may be evaluated concurrently, outputs from both are used as they arrive:

`>>>`

`import Streamly.Prelude (async)`

`>>>`

`stream1 = Stream.fromEffect (delay 4)`

`>>>`

`stream2 = Stream.fromEffect (delay 2)`

`>>>`

2 sec 4 sec [2,4]`Stream.toList $ stream1 `async` stream2`

Multiple streams can be combined. With enough threads, all of them can be scheduled simultaneously:

`>>>`

`stream3 = Stream.fromEffect (delay 1)`

`>>>`

... [1,2,4]`Stream.toList $ stream1 `async` stream2 `async` stream3`

With 2 threads, only two can be scheduled at a time, when one of those finishes, the third one gets scheduled:

`>>>`

... [2,1,4]`Stream.toList $ Stream.maxThreads 2 $ stream1 `async` stream2 `async` stream3`

With a single thread, it becomes serial:

`>>>`

... [4,2,1]`Stream.toList $ Stream.maxThreads 1 $ stream1 `async` stream2 `async` stream3`

Only streams are scheduled for async evaluation, how actions within a stream are evaluated depends on the stream type. If it is a concurrent stream they will be evaluated concurrently.

In the following example, both the streams are scheduled for concurrent evaluation but each individual stream is evaluated serially:

`>>>`

`stream1 = Stream.fromListM $ Prelude.map delay [3,3] -- SerialT IO Int`

`>>>`

`stream2 = Stream.fromListM $ Prelude.map delay [1,1] -- SerialT IO Int`

`>>>`

... [1,1,3,3]`Stream.toList $ stream1 `async` stream2 -- IO [Int]`

If total threads are 2, the third stream is scheduled only after one of the first two has finished:

stream3 = Stream.fromListM $ Prelude.map delay [2,2] -- SerialT IO Int Stream.toList $ Stream.maxThreads 2 $ stream1 `async` stream2 `async` stream3 -- IO [Int]

... [1,1,3,2,3,2]

Thus `async`

goes deep in first few streams rather than going wide in all
streams. It prefers to evaluate the leftmost streams as much as possible.
Because of this behavior, `async`

can be safely used to fold an infinite
lazy container of streams.

*Since: 0.2.0 (Streamly)*

*Since: 0.8.0*

wAsync :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a infixr 6 Source #

For singleton streams, `wAsync`

is the same as `async`

. See `async`

for
singleton stream behavior. For multi-element streams, while `async`

is left
biased i.e. it tries to evaluate the left side stream as much as possible,
`wAsync`

tries to schedule them both fairly. In other words, `async`

goes
deep while `wAsync`

goes wide. However, outputs are always used as they
arrive.

With a single thread, `async`

starts behaving like `serial`

while `wAsync`

starts behaving like `wSerial`

.

`>>>`

`import Streamly.Prelude (async, wAsync)`

`>>>`

`stream1 = Stream.fromList [1,2,3]`

`>>>`

`stream2 = Stream.fromList [4,5,6]`

`>>>`

[1,2,3,4,5,6]`Stream.toList $ Stream.fromAsync $ Stream.maxThreads 1 $ stream1 `async` stream2`

`>>>`

[1,4,2,5,3,6]`Stream.toList $ Stream.fromWAsync $ Stream.maxThreads 1 $ stream1 `wAsync` stream2`

With two threads available, and combining three streams:

`>>>`

`stream3 = Stream.fromList [7,8,9]`

`>>>`

[1,2,3,4,5,6,7,8,9]`Stream.toList $ Stream.fromAsync $ Stream.maxThreads 2 $ stream1 `async` stream2 `async` stream3`

`>>>`

[1,4,2,7,5,3,8,6,9]`Stream.toList $ Stream.fromWAsync $ Stream.maxThreads 2 $ stream1 `wAsync` stream2 `wAsync` stream3`

This operation cannot be used to fold an infinite lazy container of streams, because it schedules all the streams in a round robin manner.

Note that `WSerialT`

and single threaded `WAsyncT`

both interleave streams
but the exact scheduling is slightly different in both cases.

*Since: 0.2.0 (Streamly)*

*Since: 0.8.0*

parallel :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a infixr 6 Source #

Like `async`

except that the execution is much more
strict. There is no limit on the number of threads. While
`async`

may not schedule a stream if there is no demand
from the consumer, `parallel`

always evaluates both the streams immediately.
The only limit that applies to `parallel`

is `maxBuffer`

.
Evaluation may block if the output buffer becomes full.

`>>>`

`import Streamly.Prelude (parallel)`

`>>>`

`stream = Stream.fromEffect (delay 2) `parallel` Stream.fromEffect (delay 1)`

`>>>`

1 sec 2 sec [1,2]`Stream.toList stream -- IO [Int]`

`parallel`

guarantees that all the streams are scheduled for execution
immediately, therefore, we could use things like starting timers inside the
streams and relying on the fact that all timers were started at the same
time.

Unlike `async`

this operation cannot be used to fold an infinite lazy
container of streams, because it schedules all the streams strictly
concurrently.

*Since: 0.2.0 (Streamly)*

*Since: 0.8.0*

parallelFst :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a Source #

Like `parallel`

but stops the output as soon as the first stream stops.

*Pre-release*

parallelMin :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a Source #

Like `parallel`

but stops the output as soon as any of the two streams
stops.

*Pre-release*

# Binary Combinators (Pair Wise)

Like the functions in the section above these functions also combine
two streams into a single stream but when used `n`

times linearly they
exhibit O(n^2) complexity. They are best combined in a binary tree
fashion using `concatPairsWith`

giving a `n * log n`

complexity. Avoid
using these with `concatMapWith`

when combining a large or infinite
number of streams.

## Append

append :: (IsStream t, Monad m) => t m b -> t m b -> t m b Source #

Append the outputs of two streams, yielding all the elements from the first stream and then yielding all the elements from the second stream.

IMPORTANT NOTE: This could be 100x faster than `serial/<>`

for appending a
few (say 100) streams because it can fuse via stream fusion. However, it
does not scale for a large number of streams (say 1000s) and becomes
qudartically slow. Therefore use this for custom appending of a few streams
but use `concatMap`

or 'concatMapWith serial' for appending `n`

streams or
infinite containers of streams.

*Pre-release*

## wSerial

`wSerial`

is a CPS based stream interleaving functions. Use
'concatPairsWith wSerial' to interleave `n`

streams uniformly. It can be
used with `concatMapWith`

as well, however, the interleaving behavior of
`n`

streams would be asymmetric giving exponentially more weightage to
streams that come earlier in the composition.

wSerial :: IsStream t => t m a -> t m a -> t m a infixr 6 Source #

Interleaves two streams, yielding one element from each stream alternately. When one stream stops the rest of the other stream is used in the output stream.

`>>>`

`import Streamly.Prelude (wSerial)`

`>>>`

`stream1 = Stream.fromList [1,2]`

`>>>`

`stream2 = Stream.fromList [3,4]`

`>>>`

[1,3,2,4]`Stream.toList $ Stream.fromWSerial $ stream1 `wSerial` stream2`

Note, for singleton streams `wSerial`

and `serial`

are identical.

Note that this operation cannot be used to fold a container of infinite streams but it can be used for very large streams as the state that it needs to maintain is proportional to the logarithm of the number of streams.

*Since: 0.2.0 (Streamly)*

*Since: 0.8.0*

## Interleave

`interleave`

is like `wSerial`

but using a direct style
implementation instead of CPS. It is faster than `wSerial`

due to stream
fusion but has worse efficiency when used with `concatMapWith`

for large
number of streams.

interleave :: (IsStream t, Monad m) => t m b -> t m b -> t m b Source #

Interleaves the outputs of two streams, yielding elements from each stream alternately, starting from the first stream. If any of the streams finishes early the other stream continues alone until it too finishes.

`>>>`

`:set -XOverloadedStrings`

`>>>`

`import Data.Functor.Identity (Identity)`

`>>>`

fromList "a,b,,,"`Stream.interleave "ab" ",,,," :: Stream.SerialT Identity Char`

`>>>`

fromList "a,b,cd"`Stream.interleave "abcd" ",," :: Stream.SerialT Identity Char`

`interleave`

is dual to `interleaveMin`

, it can be called `interleaveMax`

.

Do not use at scale in concatMapWith.

*Pre-release*

interleaveMin :: (IsStream t, Monad m) => t m b -> t m b -> t m b Source #

Interleaves the outputs of two streams, yielding elements from each stream alternately, starting from the first stream. The output stops as soon as any of the two streams finishes, discarding the remaining part of the other stream. The last element of the resulting stream would be from the longer stream.

`>>>`

`:set -XOverloadedStrings`

`>>>`

`import Data.Functor.Identity (Identity)`

`>>>`

fromList "a,b,"`Stream.interleaveMin "ab" ",,,," :: Stream.SerialT Identity Char`

`>>>`

fromList "a,b,c"`Stream.interleaveMin "abcd" ",," :: Stream.SerialT Identity Char`

`interleaveMin`

is dual to `interleave`

.

Do not use at scale in concatMapWith.

*Pre-release*

interleaveSuffix :: (IsStream t, Monad m) => t m b -> t m b -> t m b Source #

Interleaves the outputs of two streams, yielding elements from each stream alternately, starting from the first stream. As soon as the first stream finishes, the output stops, discarding the remaining part of the second stream. In this case, the last element in the resulting stream would be from the second stream. If the second stream finishes early then the first stream still continues to yield elements until it finishes.

`>>>`

`:set -XOverloadedStrings`

`>>>`

`import Data.Functor.Identity (Identity)`

`>>>`

fromList "a,b,c,"`Stream.interleaveSuffix "abc" ",,,," :: Stream.SerialT Identity Char`

`>>>`

fromList "a,bc"`Stream.interleaveSuffix "abc" "," :: Stream.SerialT Identity Char`

`interleaveSuffix`

is a dual of `interleaveInfix`

.

Do not use at scale in concatMapWith.

*Pre-release*

interleaveInfix :: (IsStream t, Monad m) => t m b -> t m b -> t m b Source #

Interleaves the outputs of two streams, yielding elements from each stream alternately, starting from the first stream and ending at the first stream. If the second stream is longer than the first, elements from the second stream are infixed with elements from the first stream. If the first stream is longer then it continues yielding elements even after the second stream has finished.

`>>>`

`:set -XOverloadedStrings`

`>>>`

`import Data.Functor.Identity (Identity)`

`>>>`

fromList "a,b,c"`Stream.interleaveInfix "abc" ",,,," :: Stream.SerialT Identity Char`

`>>>`

fromList "a,bc"`Stream.interleaveInfix "abc" "," :: Stream.SerialT Identity Char`

`interleaveInfix`

is a dual of `interleaveSuffix`

.

Do not use at scale in concatMapWith.

*Pre-release*

## Round Robin

roundrobin :: (IsStream t, Monad m) => t m b -> t m b -> t m b Source #

Schedule the execution of two streams in a fair round-robin manner,
executing each stream once, alternately. Execution of a stream may not
necessarily result in an output, a stream may chose to `Skip`

producing an
element until later giving the other stream a chance to run. Therefore, this
combinator fairly interleaves the execution of two streams rather than
fairly interleaving the output of the two streams. This can be useful in
co-operative multitasking without using explicit threads. This can be used
as an alternative to `async`

.

Do not use at scale in concatMapWith.

*Pre-release*

## Zip

zipWith :: (IsStream t, Monad m) => (a -> b -> c) -> t m a -> t m b -> t m c Source #

Stream `a`

is evaluated first, followed by stream `b`

, the resulting
elements `a`

and `b`

are then zipped using the supplied zip function and the
result `c`

is yielded to the consumer.

If stream `a`

or stream `b`

ends, the zipped stream ends. If stream `b`

ends
first, the element `a`

from previous evaluation of stream `a`

is discarded.

> D.toList $ D.zipWith (+) (D.fromList [1,2,3]) (D.fromList [4,5,6]) [5,7,9]

*Since: 0.1.0*

zipWithM :: (IsStream t, Monad m) => (a -> b -> m c) -> t m a -> t m b -> t m c Source #

Like `zipWith`

but using a monadic zipping function.

*Since: 0.4.0*

zipAsyncWith :: (IsStream t, MonadAsync m) => (a -> b -> c) -> t m a -> t m b -> t m c Source #

Like `zipWith`

but zips concurrently i.e. both the streams being zipped
are evaluated concurrently using the `ParallelT`

concurrent evaluation
style. The maximum number of elements of each stream evaluated in advance
can be controlled by `maxBuffer`

.

The stream ends if stream `a`

or stream `b`

ends. However, if stream `b`

ends while we are still evaluating stream `a`

and waiting for a result then
stream will not end until after the evaluation of stream `a`

finishes. This
behavior can potentially be changed in future to end the stream immediately
as soon as any of the stream end is detected.

*Since: 0.1.0*

zipAsyncWithM :: (IsStream t, MonadAsync m) => (a -> b -> m c) -> t m a -> t m b -> t m c Source #

Like `zipAsyncWith`

but with a monadic zipping function.

*Since: 0.4.0*

## Merge

mergeBy :: IsStream t => (a -> a -> Ordering) -> t m a -> t m a -> t m a Source #

Merge two streams using a comparison function. The head elements of both the streams are compared and the smaller of the two elements is emitted, if both elements are equal then the element from the first stream is used first.

If the streams are sorted in ascending order, the resulting stream would also remain sorted in ascending order.

>>> Stream.toList $ Stream.mergeBy compare (Stream.fromList [1,3,5]) (Stream.fromList [2,4,6,8]) [1,2,3,4,5,6,8]

See also: `mergeByMFused`

*Since: 0.6.0*

mergeByM :: (IsStream t, Monad m) => (a -> a -> m Ordering) -> t m a -> t m a -> t m a Source #

Like `mergeBy`

but with a monadic comparison function.

Merge two streams randomly:

> randomly _ _ = randomIO >>= x -> return $ if x then LT else GT > Stream.toList $ Stream.mergeByM randomly (Stream.fromList [1,1,1,1]) (Stream.fromList [2,2,2,2]) [2,1,2,2,2,1,1,1]

Merge two streams in a proportion of 2:1:

>>> :{ do let proportionately m n = do ref <- newIORef $ cycle $ Prelude.concat [Prelude.replicate m LT, Prelude.replicate n GT] return $ _ _ -> do r <- readIORef ref writeIORef ref $ Prelude.tail r return $ Prelude.head r f <- proportionately 2 1 xs <- Stream.toList $ Stream.mergeByM f (Stream.fromList [1,1,1,1,1,1]) (Stream.fromList [2,2,2]) print xs :} [1,1,2,1,1,2,1,1,2]

See also: `mergeByMFused`

*Since: 0.6.0*

mergeByMFused :: (IsStream t, Monad m) => (a -> a -> m Ordering) -> t m a -> t m a -> t m a Source #

Like `mergeByM`

but much faster, works best when merging statically known
number of streams. When merging more than two streams try to merge pairs and
pair pf pairs in a tree like structure.`mergeByM`

works better with variable
number of streams being merged using `concatPairsWith`

.

*Internal*

mergeAsyncBy :: (IsStream t, MonadAsync m) => (a -> a -> Ordering) -> t m a -> t m a -> t m a Source #

Like `mergeBy`

but merges concurrently (i.e. both the elements being
merged are generated concurrently).

*Since: 0.6.0*

mergeAsyncByM :: (IsStream t, MonadAsync m) => (a -> a -> m Ordering) -> t m a -> t m a -> t m a Source #

Like `mergeByM`

but merges concurrently (i.e. both the elements being
merged are generated concurrently).

*Since: 0.6.0*

mergeMinBy :: (a -> a -> m Ordering) -> t m a -> t m a -> t m a Source #

Like `mergeByM`

but stops merging as soon as any of the two streams stops.

*Unimplemented*

mergeFstBy :: (a -> a -> m Ordering) -> t m a -> t m a -> t m a Source #

Like `mergeByM`

but stops merging as soon as the first stream stops.

*Unimplemented*

# Combine Streams and Unfolds

Expand a stream by repeatedly using an unfold and merging the resulting streams. Functions generally ending in the shape:

Unfold m a b -> t m a -> t m b

## Append Many (Unfold)

Unfold and flatten streams.

unfoldManyInterleave :: (IsStream t, Monad m) => Unfold m a b -> t m a -> t m b Source #

Like `unfoldMany`

but interleaves the streams in the same way as
`interleave`

behaves instead of appending them.

*Pre-release*

unfoldManyRoundRobin :: (IsStream t, Monad m) => Unfold m a b -> t m a -> t m b Source #

Like `unfoldMany`

but executes the streams in the same way as
`roundrobin`

.

*Pre-release*

## Interpose

Insert effects between streams. Like unfoldMany but intersperses an effect between the streams. A special case of gintercalate.

interpose :: (IsStream t, Monad m) => c -> Unfold m b c -> t m b -> t m c Source #

Unfold the elements of a stream, intersperse the given element between the unfolded streams and then concat them into a single stream.

unwords = S.interpose ' '

*Pre-release*

interposeSuffix :: (IsStream t, Monad m) => c -> Unfold m b c -> t m b -> t m c Source #

Unfold the elements of a stream, append the given element after each unfolded stream and then concat them into a single stream.

unlines = S.interposeSuffix '\n'

*Pre-release*

## Intercalate

Insert Streams between Streams. Like unfoldMany but intersperses streams from another source between the streams from the first source.

intercalate :: (IsStream t, Monad m) => Unfold m b c -> b -> t m b -> t m c Source #

`intersperse`

followed by unfold and concat.

intercalate unf a str = unfoldMany unf $ intersperse a str intersperse = intercalate (Unfold.function id) unwords = intercalate Unfold.fromList " "

`>>>`

"abc def ghi"`Stream.toList $ Stream.intercalate Unfold.fromList " " $ Stream.fromList ["abc", "def", "ghi"]`

*Since: 0.8.0*

intercalateSuffix :: (IsStream t, Monad m) => Unfold m b c -> b -> t m b -> t m c Source #

`intersperseMSuffix`

followed by unfold and concat.

intercalateSuffix unf a str = unfoldMany unf $ intersperseMSuffix a str intersperseMSuffix = intercalateSuffix (Unfold.function id) unlines = intercalateSuffix Unfold.fromList "\n"

`>>>`

"abc\ndef\nghi\n"`Stream.toList $ Stream.intercalateSuffix Unfold.fromList "\n" $ Stream.fromList ["abc", "def", "ghi"]`

*Since: 0.8.0*

gintercalate :: (IsStream t, Monad m) => Unfold m a c -> t m a -> Unfold m b c -> t m b -> t m c Source #

`interleaveInfix`

followed by unfold and concat.

*Pre-release*

gintercalateSuffix :: (IsStream t, Monad m) => Unfold m a c -> t m a -> Unfold m b c -> t m b -> t m c Source #

`interleaveSuffix`

followed by unfold and concat.

*Pre-release*

# Append Many (concatMap)

Map and serially append streams. `concatMapM`

is a generalization of
the binary append operation to append many streams.

concatMapM :: (IsStream t, Monad m) => (a -> m (t m b)) -> t m a -> t m b Source #

Map a stream producing monadic function on each element of the stream
and then flatten the results into a single stream. Since the stream
generation function is monadic, unlike `concatMap`

, it can produce an
effect at the beginning of each iteration of the inner loop.

*Since: 0.6.0*

concatMap :: (IsStream t, Monad m) => (a -> t m b) -> t m a -> t m b Source #

Map a stream producing function on each element of the stream and then flatten the results into a single stream.

`>>>`

`concatMap f = Stream.concatMapM (return . f)`

`>>>`

`concatMap f = Stream.concatMapWith Stream.serial f`

`>>>`

`concatMap f = Stream.concat . Stream.map f`

*Since: 0.6.0*

concatM :: (IsStream t, Monad m) => m (t m a) -> t m a Source #

Given a stream value in the underlying monad, lift and join the underlying monad with the stream monad.

`>>>`

`concatM = Stream.concat . Stream.fromEffect`

`>>>`

`concatM = Stream.concat . lift -- requires (MonadTrans t)`

`>>>`

`concatM = join . lift -- requires (MonadTrans t, Monad (t m))`

*Internal*

concat :: (IsStream t, Monad m) => t m (t m a) -> t m a Source #

Flatten a stream of streams to a single stream.

concat = concatMap id

*Pre-release*

# Flatten Containers

Flatten `Foldable`

containers using the binary stream merging
operations.

concatFoldableWith :: (IsStream t, Foldable f) => (t m a -> t m a -> t m a) -> f (t m a) -> t m a Source #

A variant of `fold`

that allows you to fold a `Foldable`

container of streams using the specified stream sum operation.

`concatFoldableWith ``async`

$ map return [1..3]

Equivalent to:

concatFoldableWith f = Prelude.foldr f D.nil concatFoldableWith f = D.concatMapFoldableWith f id

*Since: 0.8.0 (Renamed foldWith to concatFoldableWith)*

*Since: 0.1.0 (Streamly)*

concatMapFoldableWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> (a -> t m b) -> f a -> t m b Source #

A variant of `foldMap`

that allows you to map a monadic streaming action
on a `Foldable`

container and then fold it using the specified stream merge
operation.

`concatMapFoldableWith ``async`

return [1..3]

Equivalent to:

concatMapFoldableWith f g = Prelude.foldr (f . g) S.nil concatMapFoldableWith f g xs = S.concatMapWith f g (S.fromFoldable xs)

*Since: 0.8.0 (Renamed foldMapWith to concatMapFoldableWith)*

*Since: 0.1.0 (Streamly)*

concatForFoldableWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> f a -> (a -> t m b) -> t m b Source #

Like `concatMapFoldableWith`

but with the last two arguments reversed i.e. the
monadic streaming function is the last argument.

Equivalent to:

concatForFoldableWith f xs g = Prelude.foldr (f . g) D.nil xs concatForFoldableWith f = flip (D.concatMapFoldableWith f)

*Since: 0.8.0 (Renamed forEachWith to concatForFoldableWith)*

*Since: 0.1.0 (Streamly)*

# ConcatMapWith

Map and flatten a stream like `concatMap`

but using a custom binary
stream merging combinator instead of just appending the streams. The
merging occurs sequentially, it works efficiently for `serial`

, `async`

,
`ahead`

like merge operations where we consume one stream before the
next or in case of `wAsync`

or `parallel`

where we consume all streams
simultaneously anyway.

However, in cases where the merging consumes streams in a round robin
fashion, a pair wise merging using `concatPairsWith`

would be more
efficient. These cases include operations like `mergeBy`

or `zipWith`

.

concatMapWith :: IsStream t => (t m b -> t m b -> t m b) -> (a -> t m b) -> t m a -> t m b Source #

`concatMapWith mixer generator stream`

is a two dimensional looping
combinator. The `generator`

function is used to generate streams from the
elements in the input `stream`

and the `mixer`

function is used to merge
those streams.

Note we can merge streams concurrently by using a concurrent merge function.

*Since: 0.7.0*

*Since: 0.8.0 (signature change)*

concatSmapMWith :: (IsStream t, Monad m) => (t m b -> t m b -> t m b) -> (s -> a -> m (s, t m b)) -> m s -> t m a -> t m b Source #

Like `concatMapWith`

but carries a state which can be used to share
information across multiple steps of concat.

concatSmapMWith combine f initial = concatMapWith combine id . smapM f initial

*Pre-release*

# ConcatPairsWith

See the notes about suitable merge functions in the `concatMapWith`

section.

concatPairsWith :: IsStream t => (t m b -> t m b -> t m b) -> (a -> t m b) -> t m a -> t m b Source #

Combine streams in pairs using a binary stream combinator, then combine the resulting streams in pairs recursively until we get to a single combined stream.

For example, you can sort a stream using merge sort like this:

`>>>`

[1,2,5,7,9]`Stream.toList $ Stream.concatPairsWith (Stream.mergeBy compare) Stream.fromPure $ Stream.fromList [5,1,7,9,2]`

*Caution: the stream of streams must be finite*

*Pre-release*

# IterateMap

Map and flatten Trees of Streams

iterateMapWith :: IsStream t => (t m a -> t m a -> t m a) -> (a -> t m a) -> t m a -> t m a Source #

Like `iterateM`

but iterates after mapping a stream generator on the
output.

Yield an input element in the output stream, map a stream generator on it and then do the same on the resulting stream. This can be used for a depth first traversal of a tree like structure.

Note that `iterateM`

is a special case of `iterateMapWith`

:

iterateM f = iterateMapWith serial (fromEffect . f) . fromEffect

It can be used to traverse a tree structure. For example, to list a directory tree:

Stream.iterateMapWith Stream.serial (either Dir.toEither (const nil)) (fromPure (Left "tmp"))

*Pre-release*

iterateSmapMWith :: (IsStream t, Monad m) => (t m a -> t m a -> t m a) -> (b -> a -> m (b, t m a)) -> m b -> t m a -> t m a Source #

Like `iterateMap`

but carries a state in the stream generation function.
This can be used to traverse graph like structures, we can remember the
visited nodes in the state to avoid cycles.

Note that a combination of `iterateMap`

and `usingState`

can also be used to
traverse graphs. However, this function provides a more localized state
instead of using a global state.

See also: `mfix`

*Pre-release*

iterateMapLeftsWith :: (IsStream t, b ~ Either a c) => (t m b -> t m b -> t m b) -> (a -> t m b) -> t m b -> t m b Source #

In an `Either`

stream iterate on `Left`

s. This is a special case of
`iterateMapWith`

:

iterateMapLeftsWith combine f = iterateMapWith combine (either f (const nil))

To traverse a directory tree:

iterateMapLeftsWith serial Dir.toEither (fromPure (Left "tmp"))

*Pre-release*

iterateUnfold :: Unfold m a a -> t m a -> t m a Source #

Same as `iterateMapWith Stream.serial`

but more efficient due to stream
fusion.

*Unimplemented*