Copyright | (c) 2017 Composewell Technologies |
---|---|

License | BSD-3-Clause |

Maintainer | streamly@composewell.com |

Stability | experimental |

Portability | GHC |

Safe Haskell | Safe-Inferred |

Language | Haskell2010 |

Deprecated: Please use "Streamly.Data.Stream.*" instead.

Bottom level IsStream module that can be used by all other upper level IsStream modules.

## Synopsis

- fromPure :: IsStream t => a -> t m a
- fromEffect :: (Monad m, IsStream t) => m a -> t m a
- repeatM :: (IsStream t, MonadAsync m) => m a -> t m a
- timesWith :: (IsStream t, MonadAsync m) => Double -> t m (AbsTime, RelTime64)
- absTimesWith :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m AbsTime
- relTimesWith :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m RelTime64
- foldContinue :: Monad m => Fold m a b -> SerialT m a -> Fold m a b
- fold :: Monad m => Fold m a b -> SerialT m a -> m b
- map :: (IsStream t, Monad m) => (a -> b) -> t m a -> t m b
- scanlMAfter' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> (b -> m b) -> t m a -> t m b
- postscanlMAfter' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> (b -> m b) -> t m a -> t m b
- postscanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b
- smapM :: (IsStream t, Monad m) => (s -> a -> m (s, b)) -> m s -> t m a -> t m b
- foldManyPost :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b
- take :: (IsStream t, Monad m) => Int -> t m a -> t m a
- takeWhile :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a
- takeEndBy :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a
- drop :: (IsStream t, Monad m) => Int -> t m a -> t m a
- findIndices :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m Int
- intersperseM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a
- interjectSuffix :: (IsStream t, MonadAsync m) => Double -> m a -> t m a -> t m a
- reverse :: (IsStream t, Monad m) => t m a -> t m a
- reverse' :: (IsStream t, MonadIO m, Unbox a) => t m a -> t m a
- mkAsync :: (IsStream t, MonadAsync m) => t m a -> t m a
- mkParallel :: (IsStream t, MonadAsync m) => t m a -> t m a
- parallelFst :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
- concatM :: (IsStream t, Monad m) => m (t m a) -> t m a
- concatMapM :: (IsStream t, Monad m) => (a -> m (t m b)) -> t m a -> t m b
- concatMap :: (IsStream t, Monad m) => (a -> t m b) -> t m a -> t m b
- splitOnSeq :: (IsStream t, MonadIO m, Storable a, Unbox a, Enum a, Eq a) => Array a -> Fold m a b -> t m a -> t m b
- zipWithM :: (IsStream t, Monad m) => (a -> b -> m c) -> t m a -> t m b -> t m c
- zipWith :: (IsStream t, Monad m) => (a -> b -> c) -> t m a -> t m b -> t m c
- yield :: IsStream t => a -> t m a
- yieldM :: (Monad m, IsStream t) => m a -> t m a

# Generation

fromPure :: IsStream t => a -> t m a Source #

fromPure a = a `cons` nil

Create a singleton stream from a pure value.

The following holds in monadic streams, but not in Zip streams:

fromPure = pure fromPure = fromEffect . pure

In Zip applicative streams `fromPure`

is not the same as `pure`

because in that
case `pure`

is equivalent to `repeat`

instead. `fromPure`

and `pure`

are
equally efficient, in other cases `fromPure`

may be slightly more efficient
than the other equivalent definitions.

*Since: 0.8.0 (Renamed yield to fromPure)*

fromEffect :: (Monad m, IsStream t) => m a -> t m a Source #

fromEffect m = m `consM` nil

Create a singleton stream from a monadic action.

> Stream.toList $ Stream.fromEffect getLine hello ["hello"]

*Since: 0.8.0 (Renamed yieldM to fromEffect)*

repeatM :: (IsStream t, MonadAsync m) => m a -> t m a Source #

`>>>`

`repeatM = fix . consM`

`>>>`

`repeatM = cycle1 . fromEffect`

Generate a stream by repeatedly executing a monadic action forever.

`>>>`

repeatAsync = Stream.repeatM (threadDelay 1000000 >> print 1) & Stream.take 10 & Stream.fromAsync & Stream.drain :}`:{`

*Concurrent, infinite (do not use with fromParallel)*

*Since: 0.2.0*

timesWith :: (IsStream t, MonadAsync m) => Double -> t m (AbsTime, RelTime64) Source #

`timesWith g`

returns a stream of time value tuples. The first component
of the tuple is an absolute time reference (epoch) denoting the start of the
stream and the second component is a time relative to the reference.

The argument `g`

specifies the granularity of the relative time in seconds.
A lower granularity clock gives higher precision but is more expensive in
terms of CPU usage. Any granularity lower than 1 ms is treated as 1 ms.

`>>>`

`import Control.Concurrent (threadDelay)`

`>>>`

`import Streamly.Internal.Data.Stream.IsStream.Common as Stream (timesWith)`

`>>>`

(AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...)) (AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...)) (AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))`Stream.mapM_ (\x -> print x >> threadDelay 1000000) $ Stream.take 3 $ Stream.timesWith 0.01`

Note: This API is not safe on 32-bit machines.

*Pre-release*

absTimesWith :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m AbsTime Source #

`absTimesWith g`

returns a stream of absolute timestamps using a clock of
granularity `g`

specified in seconds. A low granularity clock is more
expensive in terms of CPU usage. Any granularity lower than 1 ms is treated
as 1 ms.

`>>>`

AbsTime (TimeSpec {sec = ..., nsec = ...}) AbsTime (TimeSpec {sec = ..., nsec = ...}) AbsTime (TimeSpec {sec = ..., nsec = ...})`Stream.mapM_ print $ Stream.delayPre 1 $ Stream.take 3 $ absTimesWith 0.01`

Note: This API is not safe on 32-bit machines.

*Pre-release*

relTimesWith :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m RelTime64 Source #

`relTimesWith g`

returns a stream of relative time values starting from 0,
using a clock of granularity `g`

specified in seconds. A low granularity
clock is more expensive in terms of CPU usage. Any granularity lower than 1
ms is treated as 1 ms.

`>>>`

RelTime64 (NanoSecond64 ...) RelTime64 (NanoSecond64 ...) RelTime64 (NanoSecond64 ...)`Stream.mapM_ print $ Stream.delayPre 1 $ Stream.take 3 $ Stream.relTimesWith 0.01`

Note: This API is not safe on 32-bit machines.

*Pre-release*

# Elimination

foldContinue :: Monad m => Fold m a b -> SerialT m a -> Fold m a b Source #

We can create higher order folds using `foldContinue`

. We can fold a
number of streams to a given fold efficiently with full stream fusion. For
example, to fold a list of streams on the same sum fold:

concatFold = Prelude.foldl Stream.foldContinue Fold.sum

fold f = Fold.extractM . Stream.foldContinue f

*Internal*

fold :: Monad m => Fold m a b -> SerialT m a -> m b Source #

Fold a stream using the supplied left `Fold`

and reducing the resulting
expression strictly at each step. The behavior is similar to `foldl'`

. A
`Fold`

can terminate early without consuming the full stream. See the
documentation of individual `Fold`

s for termination behavior.

`>>>`

5050`Stream.fold Fold.sum (Stream.enumerateFromTo 1 100)`

Folds never fail, therefore, they produce a default value even when no input is provided. It means we can always fold an empty stream and get a valid result. For example:

`>>>`

0`Stream.fold Fold.sum Stream.nil`

However, `foldMany`

on an empty stream results in an empty stream.
Therefore, `Stream.fold f`

is not the same as ```
Stream.head . Stream.foldMany
f
```

.

fold f = Stream.parse (Parser.fromFold f)

*Since: 0.7.0*

# Transformation

scanlMAfter' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> (b -> m b) -> t m a -> t m b Source #

`scanlMAfter' accumulate initial done stream`

is like `scanlM'`

except
that it provides an additional `done`

function to be applied on the
accumulator when the stream stops. The result of `done`

is also emitted in
the stream.

This function can be used to allocate a resource in the beginning of the scan and release it when the stream ends or to flush the internal state of the scan at the end.

*Pre-release*

postscanlMAfter' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> (b -> m b) -> t m a -> t m b Source #

postscanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b Source #

Like `postscanl'`

but with a monadic step function and a monadic seed.

`>>>`

`postscanlM' f z xs = Stream.drop 1 $ Stream.scanlM' f z xs`

*Since: 0.7.0*

*Since: 0.8.0 (signature change)*

smapM :: (IsStream t, Monad m) => (s -> a -> m (s, b)) -> m s -> t m a -> t m b Source #

A stateful `mapM`

, equivalent to a left scan, more like mapAccumL.
Hopefully, this is a better alternative to `scan`

. Separation of state from
the output makes it easier to think in terms of a shared state, and also
makes it easier to keep the state fully strict and the output lazy.

See also: `scanlM'`

*Pre-release*

foldManyPost :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b Source #

Like `foldMany`

but appends empty fold output if the fold and stream
termination aligns:

`>>>`

`f = Fold.take 2 Fold.sum`

`>>>`

[0]`Stream.toList $ Stream.foldManyPost f $ Stream.fromList []`

`>>>`

[3,7,11,15,9]`Stream.toList $ Stream.foldManyPost f $ Stream.fromList [1..9]`

`>>>`

[3,7,11,15,19,0]`Stream.toList $ Stream.foldManyPost f $ Stream.fromList [1..10]`

*Pre-release*

The stateful step function can be simplified to `(s -> a -> m b)`

to provide
a read-only environment. However, that would just be `mapM`

.

The initial action could be `m (s, Maybe b)`

, and we can also add a final
action `s -> m (Maybe b)`

. This can be used to get pre/post scan like
functionality and also to flush the state in the end like scanlMAfter'.
We can also use it along with a fusible version of bracket to get
scanlMAfter' like functionality. See issue #677.

This can be further generalized to a type similar to Fold/Parser, giving it filtering and parsing capability as well (this is in fact equivalent to parseMany):

smapM :: (s -> a -> m (Step s b)) -> m s -> t m a -> t m b

take :: (IsStream t, Monad m) => Int -> t m a -> t m a Source #

Take first `n`

elements from the stream and discard the rest.

*Since: 0.1.0*

takeWhile :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a Source #

End the stream as soon as the predicate fails on an element.

*Since: 0.1.0*

drop :: (IsStream t, Monad m) => Int -> t m a -> t m a Source #

Discard first `n`

elements from the stream and take the rest.

*Since: 0.1.0*

findIndices :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m Int Source #

Find all the indices where the element in the stream satisfies the given predicate.

findIndices = fold Fold.findIndices

*Since: 0.5.0*

intersperseM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a Source #

Insert an effect and its output before consuming an element of a stream except the first one.

`>>>`

h.,e.,l.,l.,o"h,e,l,l,o"`Stream.toList $ Stream.trace putChar $ Stream.intersperseM (putChar '.' >> return ',') $ Stream.fromList "hello"`

Be careful about the order of effects. In the above example we used trace after the intersperse, if we use it before the intersperse the output would be he.l.l.o."h,e,l,l,o".

`>>>`

he.l.l.o."h,e,l,l,o"`Stream.toList $ Stream.intersperseM (putChar '.' >> return ',') $ Stream.trace putChar $ Stream.fromList "hello"`

*Since: 0.5.0*

interjectSuffix :: (IsStream t, MonadAsync m) => Double -> m a -> t m a -> t m a Source #

Intersperse a monadic action into the input stream after every `n`

seconds.

> import Control.Concurrent (threadDelay) > Stream.drain $ Stream.interjectSuffix 1 (putChar ',') $ Stream.mapM (x -> threadDelay 1000000 >> putChar x) $ Stream.fromList "hello" h,e,l,l,o

*Pre-release*

reverse :: (IsStream t, Monad m) => t m a -> t m a Source #

Returns the elements of the stream in reverse order. The stream must be finite. Note that this necessarily buffers the entire stream in memory.

`>>>`

`reverse = Stream.foldlT (flip Stream.cons) Stream.nil`

*Since 0.7.0 (Monad m constraint)*

*Since: 0.1.1*

# Concurrent

mkAsync :: (IsStream t, MonadAsync m) => t m a -> t m a Source #

Make the stream producer and consumer run concurrently by introducing a buffer between them. The producer thread evaluates the input stream until the buffer fills, it terminates if the buffer is full and a worker thread is kicked off again to evaluate the remaining stream when there is space in the buffer. The consumer consumes the stream lazily from the buffer.

*Since: 0.2.0 (Streamly)*

*Since: 0.8.0*

mkParallel :: (IsStream t, MonadAsync m) => t m a -> t m a Source #

Make the stream producer and consumer run concurrently by introducing a buffer between them. The producer thread evaluates the input stream until the buffer fills, it blocks if the buffer is full until there is space in the buffer. The consumer consumes the stream lazily from the buffer.

mkParallel = IsStream.fromStreamD . mkParallelD . IsStream.toStreamD

*Pre-release*

parallelFst :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a Source #

Like `parallel`

but stops the output as soon as the first stream stops.

*Pre-release*

# Nesting

concatM :: (IsStream t, Monad m) => m (t m a) -> t m a Source #

Given a stream value in the underlying monad, lift and join the underlying monad with the stream monad.

`>>>`

`concatM = Stream.concat . Stream.fromEffect`

`>>>`

`concatM = Stream.concat . lift -- requires (MonadTrans t)`

`>>>`

`concatM = join . lift -- requires (MonadTrans t, Monad (t m))`

*Internal*

concatMapM :: (IsStream t, Monad m) => (a -> m (t m b)) -> t m a -> t m b Source #

Map a stream producing monadic function on each element of the stream
and then flatten the results into a single stream. Since the stream
generation function is monadic, unlike `concatMap`

, it can produce an
effect at the beginning of each iteration of the inner loop.

*Since: 0.6.0*

concatMap :: (IsStream t, Monad m) => (a -> t m b) -> t m a -> t m b Source #

Map a stream producing function on each element of the stream and then flatten the results into a single stream.

`>>>`

`concatMap f = Stream.concatMapM (return . f)`

`>>>`

`concatMap f = Stream.concatMapWith Stream.serial f`

`>>>`

`concatMap f = Stream.concat . Stream.map f`

*Since: 0.6.0*

splitOnSeq :: (IsStream t, MonadIO m, Storable a, Unbox a, Enum a, Eq a) => Array a -> Fold m a b -> t m a -> t m b Source #

Like `splitOn`

but the separator is a sequence of elements instead of a
single element.

For illustration, let's define a function that operates on pure lists:

`>>>`

`splitOnSeq' pat xs = Stream.toList $ Stream.splitOnSeq (Array.fromList pat) Fold.toList (Stream.fromList xs)`

`>>>`

["h","e","l","l","o"]`splitOnSeq' "" "hello"`

`>>>`

[""]`splitOnSeq' "hello" ""`

`>>>`

["",""]`splitOnSeq' "hello" "hello"`

`>>>`

["hello"]`splitOnSeq' "x" "hello"`

`>>>`

["","ello"]`splitOnSeq' "h" "hello"`

`>>>`

["hell",""]`splitOnSeq' "o" "hello"`

`>>>`

["h","llo"]`splitOnSeq' "e" "hello"`

`>>>`

["he","","o"]`splitOnSeq' "l" "hello"`

`>>>`

["he","o"]`splitOnSeq' "ll" "hello"`

`splitOnSeq`

is an inverse of `intercalate`

. The following law always holds:

intercalate . splitOnSeq == id

The following law holds when the separator is non-empty and contains none of the elements present in the input lists:

splitOnSeq . intercalate == id

`>>>`

`splitOnSeq pat f = Stream.foldManyPost (Fold.takeEndBySeq_ pat f)`

*Pre-release*

# Zipping

zipWithM :: (IsStream t, Monad m) => (a -> b -> m c) -> t m a -> t m b -> t m c Source #

Like `zipWith`

but using a monadic zipping function.

*Since: 0.4.0*

zipWith :: (IsStream t, Monad m) => (a -> b -> c) -> t m a -> t m b -> t m c Source #

Stream `a`

is evaluated first, followed by stream `b`

, the resulting
elements `a`

and `b`

are then zipped using the supplied zip function and the
result `c`

is yielded to the consumer.

If stream `a`

or stream `b`

ends, the zipped stream ends. If stream `b`

ends
first, the element `a`

from previous evaluation of stream `a`

is discarded.

> D.toList $ D.zipWith (+) (D.fromList [1,2,3]) (D.fromList [4,5,6]) [5,7,9]

*Since: 0.1.0*