streamly-0.8.2: Dataflow programming and declarative concurrency
Copyright: (c) 2017 Composewell Technologies
License: BSD-3-Clause
Maintainer: streamly@composewell.com
Stability: experimental
Portability: GHC
Safe Haskell: None
Language: Haskell2010

Streamly.Internal.Data.Stream.IsStream.Common

Description

Bottom level IsStream module that can be used by all other upper level IsStream modules.

Generation

fromPure :: IsStream t => a -> t m a

fromPure a = a `cons` nil

Create a singleton stream from a pure value.

The following holds in monadic streams, but not in Zip streams:

fromPure = pure
fromPure = fromEffect . pure

In Zip applicative streams, fromPure is not the same as pure, because there pure is equivalent to repeat instead. In monadic streams fromPure and pure are equally efficient; in other cases, fromPure may be slightly more efficient than the other equivalent definitions.

Since: 0.8.0 (Renamed yield to fromPure)
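The difference between fromPure and pure in Zip streams can be checked with a short sketch. It assumes the public Streamly.Prelude module of streamly-0.8.x imported as Stream, and it runs in Identity so that the results are plain lists. The names singleton, serialPure and zipPure are illustrative only.

```haskell
import Data.Functor.Identity (runIdentity)
import qualified Streamly.Prelude as Stream

-- fromPure always builds a one-element stream.
singleton :: [Int]
singleton = runIdentity $ Stream.toList $ Stream.fromPure 1

-- In a serial (monadic) stream, pure builds the same one-element stream.
serialPure :: [Int]
serialPure = runIdentity $ Stream.toList $ Stream.fromSerial (pure 1)

-- In a Zip stream, pure behaves like repeat, so we truncate it with take.
zipPure :: [Int]
zipPure =
    runIdentity $ Stream.toList $ Stream.take 3 $ Stream.fromZipSerial (pure 1)

main :: IO ()
main = do
    print singleton   -- [1]
    print serialPure  -- [1]
    print zipPure     -- [1,1,1]
```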

fromEffect :: (Monad m, IsStream t) => m a -> t m a

fromEffect m = m `consM` nil

Create a singleton stream from a monadic action.

> Stream.toList $ Stream.fromEffect getLine
hello
["hello"]

Since: 0.8.0 (Renamed yieldM to fromEffect)

repeatM :: (IsStream t, MonadAsync m) => m a -> t m a

>>> repeatM = fix . consM
>>> repeatM = cycle1 . fromEffect

Generate a stream by repeatedly executing a monadic action forever.

>>> :{
repeatAsync =
       Stream.repeatM (threadDelay 1000000 >> print 1)
     & Stream.take 10
     & Stream.fromAsync
     & Stream.drain
:}

Concurrent, infinite (do not use with fromParallel)

Since: 0.2.0

timesWith :: (IsStream t, MonadAsync m) => Double -> t m (AbsTime, RelTime64)

timesWith g returns a stream of time value tuples. The first component of the tuple is an absolute time reference (epoch) denoting the start of the stream and the second component is a time relative to the reference.

The argument g specifies the granularity of the relative time in seconds. A lower granularity clock gives higher precision but is more expensive in terms of CPU usage. Any granularity lower than 1 ms is treated as 1 ms.

>>> import Control.Concurrent (threadDelay)
>>> import Streamly.Internal.Data.Stream.IsStream.Common as Stream (timesWith)
>>> Stream.mapM_ (\x -> print x >> threadDelay 1000000) $ Stream.take 3 $ Stream.timesWith 0.01
(AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))
(AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))
(AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))

Note: This API is not safe on 32-bit machines.

Pre-release

absTimesWith :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m AbsTime

absTimesWith g returns a stream of absolute timestamps using a clock of granularity g specified in seconds. A low granularity clock is more expensive in terms of CPU usage. Any granularity lower than 1 ms is treated as 1 ms.

>>> Stream.mapM_ print $ Stream.delayPre 1 $ Stream.take 3 $ Stream.absTimesWith 0.01
AbsTime (TimeSpec {sec = ..., nsec = ...})
AbsTime (TimeSpec {sec = ..., nsec = ...})
AbsTime (TimeSpec {sec = ..., nsec = ...})

Note: This API is not safe on 32-bit machines.

Pre-release

relTimesWith :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m RelTime64

relTimesWith g returns a stream of relative time values starting from 0, using a clock of granularity g specified in seconds. A low granularity clock is more expensive in terms of CPU usage. Any granularity lower than 1 ms is treated as 1 ms.

>>> Stream.mapM_ print $ Stream.delayPre 1 $ Stream.take 3 $ Stream.relTimesWith 0.01
RelTime64 (NanoSecond64 ...)
RelTime64 (NanoSecond64 ...)
RelTime64 (NanoSecond64 ...)

Note: This API is not safe on 32-bit machines.

Pre-release

Elimination

foldOn :: Monad m => Fold m a b -> SerialT m a -> Fold m a b

We can create higher order folds using foldOn. We can fold a number of streams to a given fold efficiently with full stream fusion. For example, to fold a list of streams on the same sum fold:

>>> concatFold = Prelude.foldl Stream.foldOn Fold.sum
>>> fold f = Fold.finish . Stream.foldOn f

Internal

fold :: Monad m => Fold m a b -> SerialT m a -> m b

Fold a stream using the supplied left Fold, reducing the resulting expression strictly at each step. The behavior is similar to foldl'. A Fold can terminate early without consuming the full stream. See the documentation of individual Folds for termination behavior.

>>> Stream.fold Fold.sum (Stream.enumerateFromTo 1 100)
5050

Folds never fail; therefore, they produce a default value even when no input is provided. This means we can always fold an empty stream and get a valid result. For example:

>>> Stream.fold Fold.sum Stream.nil
0

However, foldMany on an empty stream results in an empty stream. Therefore, Stream.fold f is not the same as Stream.head . Stream.foldMany f.

fold f = Stream.parse (Parser.fromFold f)

Since: 0.7.0
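The difference between fold and foldMany on an empty stream, as described above, can be observed with a minimal sketch. It assumes Streamly.Prelude and Streamly.Data.Fold from streamly-0.8.x; the names foldEmpty and foldManyEmpty are illustrative.

```haskell
import Data.Functor.Identity (runIdentity)
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Prelude as Stream

-- Folding an empty stream yields the default value of the fold.
foldEmpty :: Int
foldEmpty = runIdentity $ Stream.fold Fold.sum Stream.nil

-- foldMany on an empty stream emits nothing, so head finds no element.
foldManyEmpty :: Maybe Int
foldManyEmpty =
    runIdentity $ Stream.head $ Stream.foldMany Fold.sum Stream.nil

main :: IO ()
main = do
    print foldEmpty      -- 0
    print foldManyEmpty  -- Nothing
```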

fold_ :: Monad m => Fold m a b -> SerialT m a -> m (b, SerialT m a)

Transformation

map :: (IsStream t, Monad m) => (a -> b) -> t m a -> t m b

map = fmap

Same as fmap.

> S.toList $ S.map (+1) $ S.fromList [1,2,3]
[2,3,4]

Since: 0.4.0

scanlMAfter' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> (b -> m b) -> t m a -> t m b

scanlMAfter' accumulate initial done stream is like scanlM' except that it provides an additional done function to be applied on the accumulator when the stream stops. The result of done is also emitted in the stream.

This function can be used to allocate a resource at the beginning of the scan and release it when the stream ends, or to flush the internal state of the scan at the end.

Pre-release
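As a hedged illustration of the done argument, the sketch below keeps a running sum and applies a final transformation to the accumulator when the input ends. It imports scanlMAfter' directly from this internal module and assumes streamly-0.8.x; the "multiply by 10" finalizer is an arbitrary stand-in for a real flush or release action.

```haskell
import Data.Functor.Identity (runIdentity)
import qualified Streamly.Prelude as Stream
import Streamly.Internal.Data.Stream.IsStream.Common (scanlMAfter')

-- Running sum whose final accumulator is passed through the done function.
-- The result of done is emitted as the last element of the output stream.
flushed :: [Int]
flushed =
    runIdentity
        $ Stream.toList
        $ scanlMAfter'
            (\acc x -> return (acc + x))  -- accumulate
            (return 0)                    -- initial
            (\acc -> return (acc * 10))   -- done, applied when the stream stops
        $ Stream.fromList [1, 2, 3]

main :: IO ()
main = print flushed
```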

postscanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b

Like postscanl' but with a monadic step function and a monadic seed.

>>> postscanlM' f z xs = Stream.drop 1 $ Stream.scanlM' f z xs

Since: 0.7.0

Since: 0.8.0 (signature change)
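The equivalence stated above can be checked on a small input. This is a sketch assuming Streamly.Prelude from streamly-0.8.x, where both scans take a monadic seed; addM, viaPostscan and viaScanDrop are illustrative names.

```haskell
import Data.Functor.Identity (runIdentity)
import qualified Streamly.Prelude as Stream

-- A running sum written as a monadic step function.
addM :: Monad m => Int -> Int -> m Int
addM acc x = return (acc + x)

-- postscanlM' does not emit the initial seed value.
viaPostscan :: [Int]
viaPostscan =
    runIdentity
        $ Stream.toList
        $ Stream.postscanlM' addM (return 0)
        $ Stream.fromList [1, 2, 3]

-- scanlM' emits the seed first, so dropping one element gives the same stream.
viaScanDrop :: [Int]
viaScanDrop =
    runIdentity
        $ Stream.toList
        $ Stream.drop 1
        $ Stream.scanlM' addM (return 0)
        $ Stream.fromList [1, 2, 3]

main :: IO ()
main = do
    print viaPostscan  -- [1,3,6]
    print viaScanDrop  -- [1,3,6]
```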

smapM :: (IsStream t, Monad m) => (s -> a -> m (s, b)) -> m s -> t m a -> t m b

A stateful mapM, equivalent to a left scan, more like mapAccumL. Hopefully, this is a better alternative to scan. Separation of state from the output makes it easier to think in terms of a shared state, and also makes it easier to keep the state fully strict and the output lazy.

See also: scanlM'

Pre-release

The stateful step function can be simplified to (s -> a -> m b) to provide a read-only environment. However, that would just be mapM.

The initial action could be m (s, Maybe b), and we can also add a final action s -> m (Maybe b). This can be used to get pre/post scan like functionality and also to flush the state at the end, like scanlMAfter'. We can also use it along with a fusible version of bracket to get scanlMAfter' like functionality. See issue #677.

This can be further generalized to a type similar to Fold/Parser, giving it filtering and parsing capability as well (this is in fact equivalent to parseMany):

smapM :: (s -> a -> m (Step s b)) -> m s -> t m a -> t m b
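For the smapM signature as currently documented (not the generalizations discussed in the notes), a minimal sketch of separating state from output might look as follows. It imports smapM directly from this internal module and assumes streamly-0.8.x; withPrefixSum is an illustrative name.

```haskell
import Data.Functor.Identity (runIdentity)
import qualified Streamly.Prelude as Stream
import Streamly.Internal.Data.Stream.IsStream.Common (smapM)

-- Pair each element with the sum of the elements seen before it.
-- The state s is the running sum; the output b is produced separately.
withPrefixSum :: [(Int, Int)]
withPrefixSum =
    runIdentity
        $ Stream.toList
        $ smapM (\s a -> return (s + a, (a, s))) (return 0)
        $ Stream.fromList [1, 2, 3]

main :: IO ()
main = print withPrefixSum  -- [(1,0),(2,1),(3,3)]
```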

take :: (IsStream t, Monad m) => Int -> t m a -> t m a

Take the first n elements from the stream and discard the rest.

Since: 0.1.0

takeWhile :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a

End the stream as soon as the predicate fails on an element.

Since: 0.1.0

takeEndBy :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a

drop :: (IsStream t, Monad m) => Int -> t m a -> t m a

Discard the first n elements from the stream and take the rest.

Since: 0.1.0
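The behavior of take, takeWhile and drop can be compared side by side. This is a small sketch assuming Streamly.Prelude from streamly-0.8.x; the binding names are illustrative.

```haskell
import Data.Functor.Identity (runIdentity)
import qualified Streamly.Prelude as Stream

-- Keep only the first three elements.
firstThree :: [Int]
firstThree =
    runIdentity $ Stream.toList $ Stream.take 3 $ Stream.fromList [1 .. 10]

-- Stop at the first element that fails the predicate; later elements that
-- would satisfy it again are not emitted.
whileSmall :: [Int]
whileSmall =
    runIdentity
        $ Stream.toList
        $ Stream.takeWhile (< 4)
        $ Stream.fromList [1, 2, 3, 4, 1, 2]

-- Skip the first three elements and keep the rest.
afterThree :: [Int]
afterThree =
    runIdentity $ Stream.toList $ Stream.drop 3 $ Stream.fromList [1 .. 5]

main :: IO ()
main = do
    print firstThree  -- [1,2,3]
    print whileSmall  -- [1,2,3]
    print afterThree  -- [4,5]
```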

findIndices :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m Int

Find all the indices where the element in the stream satisfies the given predicate.

findIndices = fold Fold.findIndices

Since: 0.5.0
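A short usage sketch, assuming Streamly.Prelude from streamly-0.8.x, shows that the result is a stream of zero-based positions rather than of the matching elements. The name evenPositions is illustrative.

```haskell
import Data.Functor.Identity (runIdentity)
import qualified Streamly.Prelude as Stream

-- Positions (counting from 0) of the even elements in the input.
evenPositions :: [Int]
evenPositions =
    runIdentity
        $ Stream.toList
        $ Stream.findIndices even
        $ Stream.fromList [1, 2, 4, 5, 6 :: Int]

main :: IO ()
main = print evenPositions  -- [1,2,4]
```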

intersperseM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a

Insert an effect and its output before consuming an element of a stream except the first one.

>>> Stream.toList $ Stream.trace putChar $ Stream.intersperseM (putChar '.' >> return ',') $ Stream.fromList "hello"
h.,e.,l.,l.,o"h,e,l,l,o"

Be careful about the order of effects. In the above example we used trace after the intersperse; if we use it before the intersperse, the output would be he.l.l.o."h,e,l,l,o".

>>> Stream.toList $ Stream.intersperseM (putChar '.' >> return ',') $ Stream.trace putChar $ Stream.fromList "hello"
he.l.l.o."h,e,l,l,o"

Since: 0.5.0

interjectSuffix :: (IsStream t, MonadAsync m) => Double -> m a -> t m a -> t m a

Intersperse a monadic action into the input stream after every n seconds.

> import Control.Concurrent (threadDelay)
> Stream.drain $ Stream.interjectSuffix 1 (putChar ',') $ Stream.mapM (\x -> threadDelay 1000000 >> putChar x) $ Stream.fromList "hello"
h,e,l,l,o

Pre-release

reverse :: (IsStream t, Monad m) => t m a -> t m a

Returns the elements of the stream in reverse order. The stream must be finite. Note that this necessarily buffers the entire stream in memory.

>>> reverse = Stream.foldlT (flip Stream.cons) Stream.nil

Since: 0.7.0 (Monad m constraint)

Since: 0.1.1
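A minimal usage sketch on a finite stream, assuming Streamly.Prelude from streamly-0.8.x (reversed is an illustrative name):

```haskell
import Data.Functor.Identity (runIdentity)
import qualified Streamly.Prelude as Stream

-- The whole input is buffered before the first output element is produced,
-- so this is only suitable for finite streams.
reversed :: [Int]
reversed =
    runIdentity $ Stream.toList $ Stream.reverse $ Stream.fromList [1, 2, 3]

main :: IO ()
main = print reversed  -- [3,2,1]
```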

reverse' :: (IsStream t, MonadIO m, Storable a) => t m a -> t m a

Like reverse but several times faster, requires a Storable instance.

Pre-release

Concurrent

mkAsync :: (IsStream t, MonadAsync m) => t m a -> t m a

Make the stream producer and consumer run concurrently by introducing a buffer between them. The producer thread evaluates the input stream until the buffer fills. It terminates if the buffer is full, and a worker thread is kicked off again to evaluate the remaining stream when there is space in the buffer. The consumer consumes the stream lazily from the buffer.

Since: 0.2.0 (Streamly)

Since: 0.8.0

mkParallel :: (IsStream t, MonadAsync m) => t m a -> t m a

Make the stream producer and consumer run concurrently by introducing a buffer between them. The producer thread evaluates the input stream until the buffer fills; if the buffer is full, it blocks until there is space in the buffer. The consumer consumes the stream lazily from the buffer.

mkParallel = IsStream.fromStreamD . mkParallelD . IsStream.toStreamD

Pre-release

parallelFst :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a

Like parallel but stops the output as soon as the first stream stops.

Pre-release

Nesting

concatM :: (IsStream t, Monad m) => m (t m a) -> t m a

Given a stream value in the underlying monad, lift and join the underlying monad with the stream monad.

>>> concatM = Stream.concat . Stream.fromEffect
>>> concatM = Stream.concat . lift    -- requires (MonadTrans t)
>>> concatM = join . lift             -- requires (MonadTrans t, Monad (t m))

See also: concat, sequence

Internal
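As a rough illustration, the sketch below wraps a stream in the underlying monad and flattens it back with concatM. It imports concatM directly from this internal module and assumes streamly-0.8.x; joined is an illustrative name.

```haskell
import Data.Functor.Identity (runIdentity)
import qualified Streamly.Prelude as Stream
import Streamly.Internal.Data.Stream.IsStream.Common (concatM)

-- An action in the underlying monad returns a stream; concatM joins the
-- action with the stream so the elements appear directly in the result.
joined :: [Int]
joined =
    runIdentity
        $ Stream.toList
        $ concatM (return (Stream.fromList [1, 2, 3]))

main :: IO ()
main = print joined  -- [1,2,3]
```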

concatMapM :: (IsStream t, Monad m) => (a -> m (t m b)) -> t m a -> t m b

Map a stream-producing monadic function on each element of the stream and then flatten the results into a single stream. Since the stream generation function is monadic, unlike concatMap, it can produce an effect at the beginning of each iteration of the inner loop.

Since: 0.6.0

concatMap :: (IsStream t, Monad m) => (a -> t m b) -> t m a -> t m b

Map a stream-producing function on each element of the stream and then flatten the results into a single stream.

>>> concatMap f = Stream.concatMapM (return . f)
>>> concatMap f = Stream.concatMapWith Stream.serial f
>>> concatMap f = Stream.concat . Stream.map f
>>> concatMap f = Stream.unfoldMany (Unfold.lmap f Unfold.fromStream)

Since: 0.6.0
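The first equivalence listed for concatMap can be checked on a small input. This is a sketch assuming Streamly.Prelude from streamly-0.8.x; viaConcatMap and viaConcatMapM are illustrative names.

```haskell
import Data.Functor.Identity (runIdentity)
import qualified Streamly.Prelude as Stream

-- Expand each element into a two-element stream and flatten serially.
viaConcatMap :: [Int]
viaConcatMap =
    runIdentity
        $ Stream.toList
        $ Stream.concatMap (\x -> Stream.fromList [x, x * 10])
        $ Stream.fromList [1, 2, 3]

-- The same expansion with the inner stream returned from a monadic action.
viaConcatMapM :: [Int]
viaConcatMapM =
    runIdentity
        $ Stream.toList
        $ Stream.concatMapM (\x -> return (Stream.fromList [x, x * 10]))
        $ Stream.fromList [1, 2, 3]

main :: IO ()
main = do
    print viaConcatMap   -- [1,10,2,20,3,30]
    print viaConcatMapM  -- [1,10,2,20,3,30]
```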

splitOnSeq :: (IsStream t, MonadIO m, Storable a, Enum a, Eq a) => Array a -> Fold m a b -> t m a -> t m b

Like splitOn but the separator is a sequence of elements instead of a single element.

For illustration, let's define a function that operates on pure lists:

>>> splitOnSeq' pat xs = Stream.toList $ Stream.splitOnSeq (Array.fromList pat) Fold.toList (Stream.fromList xs)
>>> splitOnSeq' "" "hello"
["h","e","l","l","o"]
>>> splitOnSeq' "hello" ""
[""]
>>> splitOnSeq' "hello" "hello"
["",""]
>>> splitOnSeq' "x" "hello"
["hello"]
>>> splitOnSeq' "h" "hello"
["","ello"]
>>> splitOnSeq' "o" "hello"
["hell",""]
>>> splitOnSeq' "e" "hello"
["h","llo"]
>>> splitOnSeq' "l" "hello"
["he","","o"]
>>> splitOnSeq' "ll" "hello"
["he","o"]

splitOnSeq is an inverse of intercalate. The following law always holds:

intercalate . splitOnSeq == id

The following law holds when the separator is non-empty and contains none of the elements present in the input lists:

splitOnSeq . intercalate == id

Pre-release

Zipping

zipWithM :: (IsStream t, Monad m) => (a -> b -> m c) -> t m a -> t m b -> t m c

Like zipWith but using a monadic zipping function.

Since: 0.4.0
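A minimal sketch of zipWithM, assuming Streamly.Prelude from streamly-0.8.x. The zipping function here is pure and merely wrapped in return; in practice it could perform an effect in the underlying monad. The name sums is illustrative.

```haskell
import Data.Functor.Identity (runIdentity)
import qualified Streamly.Prelude as Stream

-- Add corresponding elements of two streams using a monadic function.
sums :: [Int]
sums =
    runIdentity
        $ Stream.toList
        $ Stream.zipWithM
            (\a b -> return (a + b))
            (Stream.fromList [1, 2, 3])
            (Stream.fromList [4, 5, 6])

main :: IO ()
main = print sums  -- [5,7,9]
```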

zipWith :: (IsStream t, Monad m) => (a -> b -> c) -> t m a -> t m b -> t m c

Stream a is evaluated first, followed by stream b. The resulting elements a and b are then zipped using the supplied zip function, and the result c is yielded to the consumer.

If stream a or stream b ends, the zipped stream ends. If stream b ends first, the element a from the previous evaluation of stream a is discarded.

> S.toList $ S.zipWith (+) (S.fromList [1,2,3]) (S.fromList [4,5,6])
[5,7,9]

Since: 0.1.0

Deprecated

yield :: IsStream t => a -> t m a

Deprecated: Please use fromPure instead.

Same as fromPure

Since: 0.4.0

yieldM :: (Monad m, IsStream t) => m a -> t m a

Deprecated: Please use fromEffect instead.

Same as fromEffect

Since: 0.4.0