streamly-0.8.0: Dataflow programming and declarative concurrency
Copyright: (c) 2017 Composewell Technologies
License: BSD-3-Clause
Maintainer: streamly@composewell.com
Stability: experimental
Portability: GHC
Safe Haskell: None
Language: Haskell2010

Streamly.Internal.Data.Stream.IsStream.Common

Description

Bottom level IsStream module that can be used by all other upper level IsStream modules.

Generation

fromPure :: IsStream t => a -> t m a

fromPure a = a `cons` nil

Create a singleton stream from a pure value.

The following holds in monadic streams, but not in Zip streams:

fromPure = pure
fromPure = fromEffect . pure

In Zip applicative streams fromPure is not the same as pure, because there pure is equivalent to repeat. In monadic streams fromPure and pure are equally efficient; in other cases fromPure may be slightly more efficient than the other equivalent definitions.
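The equivalences above can be checked on a serial stream. This is a minimal sketch, assuming streamly 0.8.0 with Streamly.Prelude imported as Stream; the names viaFromPure, viaPure and viaFromEffect are illustrative, and the Identity monad is used only to keep the results pure.

```haskell
import Data.Functor.Identity (Identity, runIdentity)
import qualified Streamly.Prelude as Stream

-- A singleton stream built with fromPure.
viaFromPure :: String
viaFromPure = runIdentity $ Stream.toList (Stream.fromPure 'a')

-- The same stream built with pure from the serial stream's Applicative.
viaPure :: String
viaPure = runIdentity $ Stream.toList (pure 'a' :: Stream.SerialT Identity Char)

-- The same stream built as fromEffect . pure.
viaFromEffect :: String
viaFromEffect = runIdentity $ Stream.toList (Stream.fromEffect (pure 'a'))

main :: IO ()
main = print (viaFromPure, viaPure, viaFromEffect) -- ("a","a","a")
```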

Since: 0.8.0 (Renamed yield to fromPure)

fromEffect :: (Monad m, IsStream t) => m a -> t m a

fromEffect m = m `consM` nil

Create a singleton stream from a monadic action.

> Stream.toList $ Stream.fromEffect getLine
hello
["hello"]

Since: 0.8.0 (Renamed yieldM to fromEffect)

repeatM :: (IsStream t, MonadAsync m) => m a -> t m a

repeatM = fix . consM
repeatM = cycle1 . fromEffect

Generate a stream by repeatedly executing a monadic action forever.

drain $ fromSerial $ S.take 10 $ S.repeatM $ (threadDelay 1000000 >> print 1)
drain $ fromAsync  $ S.take 10 $ S.repeatM $ (threadDelay 1000000 >> print 1)

Concurrent, infinite (do not use with fromParallel)

Since: 0.2.0

timesWith :: (IsStream t, MonadAsync m) => Double -> t m (AbsTime, RelTime64)

timesWith g returns a stream of time value tuples. The first component of the tuple is an absolute time reference (epoch) denoting the start of the stream and the second component is a time relative to the reference.

The argument g specifies the granularity of the relative time in seconds. A lower granularity clock gives higher precision but is more expensive in terms of CPU usage. Any granularity lower than 1 ms is treated as 1 ms.

>>> import Control.Concurrent (threadDelay)
>>> import Streamly.Internal.Data.Stream.IsStream.Common as Stream (timesWith)
>>> Stream.mapM_ (\x -> print x >> threadDelay 1000000) $ Stream.take 3 $ Stream.timesWith 0.01
(AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))
(AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))
(AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))

Note: This API is not safe on 32-bit machines.

Pre-release

absTimesWith :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m AbsTime

absTimesWith g returns a stream of absolute timestamps using a clock of granularity g specified in seconds. A low granularity clock is more expensive in terms of CPU usage. Any granularity lower than 1 ms is treated as 1 ms.

>>> Stream.mapM_ print $ Stream.delayPre 1 $ Stream.take 3 $ Stream.absTimesWith 0.01
AbsTime (TimeSpec {sec = ..., nsec = ...})
AbsTime (TimeSpec {sec = ..., nsec = ...})
AbsTime (TimeSpec {sec = ..., nsec = ...})

Note: This API is not safe on 32-bit machines.

Pre-release

relTimesWith :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m RelTime64

relTimesWith g returns a stream of relative time values starting from 0, using a clock of granularity g specified in seconds. A low granularity clock is more expensive in terms of CPU usage. Any granularity lower than 1 ms is treated as 1 ms.

>>> Stream.mapM_ print $ Stream.delayPre 1 $ Stream.take 3 $ Stream.relTimesWith 0.01
RelTime64 (NanoSecond64 ...)
RelTime64 (NanoSecond64 ...)
RelTime64 (NanoSecond64 ...)

Note: This API is not safe on 32-bit machines.

Pre-release

Elimination

fold :: Monad m => Fold m a b -> SerialT m a -> m b

Fold a stream using the supplied left Fold, reducing the resulting expression strictly at each step. The behavior is similar to foldl'. A Fold can terminate early without consuming the full stream. See the documentation of individual Folds for termination behavior.

>>> Stream.fold Fold.sum (Stream.enumerateFromTo 1 100)
5050

Folds never fail; they produce a default value even when no input is provided. This means we can always fold an empty stream and get a valid result. For example:

>>> Stream.fold Fold.sum Stream.nil
0

However, foldMany on an empty stream results in an empty stream. Therefore, Stream.fold f is not the same as Stream.head . Stream.foldMany f.

fold f = Stream.parse (Parser.fromFold f)
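The difference between fold and foldMany on empty input can be seen side by side. This is a sketch, assuming streamly 0.8.0 with foldMany and head available from Streamly.Prelude; emptyInts, foldEmpty and foldManyEmpty are illustrative names, and Identity is used only to keep the results pure.

```haskell
import Data.Functor.Identity (Identity, runIdentity)
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Prelude as Stream

-- An empty serial stream of Ints.
emptyInts :: Stream.SerialT Identity Int
emptyInts = Stream.nil

-- Folding the empty stream still yields the fold's default value.
foldEmpty :: Int
foldEmpty = runIdentity $ Stream.fold Fold.sum emptyInts

-- foldMany on the empty stream emits no fold results, so head finds nothing.
foldManyEmpty :: Maybe Int
foldManyEmpty = runIdentity $ Stream.head $ Stream.foldMany Fold.sum emptyInts

main :: IO ()
main = do
    print foldEmpty     -- 0
    print foldManyEmpty -- Nothing
```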

Since: 0.7.0

fold_ :: Monad m => Fold m a b -> SerialT m a -> m (b, SerialT m a)

Transformation

scanlMAfter' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> (b -> m b) -> t m a -> t m b

scanlMAfter' accumulate initial done stream is like scanlM' except that it provides an additional done function to be applied on the accumulator when the stream stops. The result of done is also emitted in the stream.

This function can be used to allocate a resource in the beginning of the scan and release it when the stream ends or to flush the internal state of the scan at the end.
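One reading of this description can be modeled on plain lists. The helper scanlAfterModel below is hypothetical and not part of streamly; it assumes that, as with a left scan, the initial accumulator is emitted first, and it ignores strictness and monadic effects.

```haskell
-- Pure-list model of scanlMAfter' (hypothetical helper, not streamly's code).
-- Emits every accumulator value like a left scan, then the result of done.
scanlAfterModel :: (b -> a -> b) -> b -> (b -> b) -> [a] -> [b]
scanlAfterModel step initial done = go initial
  where
    -- Input exhausted: emit the final accumulator, then done applied to it.
    go acc []       = [acc, done acc]
    -- Otherwise emit the current accumulator and continue with the next one.
    go acc (y : ys) = acc : go (step acc y) ys

main :: IO ()
main = print (scanlAfterModel (+) 0 (* 10) [1, 2, 3 :: Int]) -- [0,1,3,6,60]
```

In the real function the done action is where a resource acquired by the initial action could be released, or where buffered state could be flushed.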

Pre-release

postscanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b

Like postscanl' but with a monadic step function and a monadic seed.
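A short usage sketch, assuming the 0.8.0 signature with a monadic seed and importing this module under the illustrative alias Common. Being a postscan, the output does not include the seed itself.

```haskell
import Data.Functor.Identity (runIdentity)
import qualified Streamly.Internal.Data.Stream.IsStream.Common as Common
import qualified Streamly.Prelude as Stream

-- Running sums of the input; the seed 0 is not emitted.
runningSums :: [Int]
runningSums =
    runIdentity
        $ Stream.toList
        $ Common.postscanlM' (\acc x -> pure (acc + x)) (pure 0)
        $ Stream.fromList [1, 2, 3, 4]

main :: IO ()
main = print runningSums -- [1,3,6,10]
```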

Since: 0.7.0

Since: 0.8.0 (signature change)

smapM :: (IsStream t, Monad m) => (s -> a -> m (s, b)) -> m s -> t m a -> t m b

A stateful mapM, equivalent to a left scan and closest in spirit to mapAccumL. It is intended as a better alternative to scan: separating the state from the output makes it easier to think in terms of a shared state, and also makes it easier to keep the state fully strict and the output lazy.
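The separation of state and output can be illustrated with a running sum that is carried as state while each element is paired with the sum of the elements before it. This is a sketch, assuming smapM is imported from this module under the illustrative alias Common; withPrefixSums is a made-up name.

```haskell
import Data.Functor.Identity (runIdentity)
import qualified Streamly.Internal.Data.Stream.IsStream.Common as Common
import qualified Streamly.Prelude as Stream

-- Pair each element with the sum of all elements that came before it.
withPrefixSums :: [(Int, Int)]
withPrefixSums =
    runIdentity
        $ Stream.toList
        $ Common.smapM step (pure 0)
        $ Stream.fromList [10, 20, 30]
  where
    -- s is the running sum (the state); the emitted value pairs it with a.
    step s a = pure (s + a, (s, a))

main :: IO ()
main = print withPrefixSums -- [(0,10),(10,20),(30,30)]
```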

See also: scanlM'

Pre-release

The stateful step function can be simplified to (s -> a -> m b) to provide a read-only environment. However, that would just be mapM.

The initial action could be m (s, Maybe b), and we can also add a final action s -> m (Maybe b). This can be used to get pre/post scan like functionality and also to flush the state in the end like scanlMAfter'. We can also use it along with a fusible version of bracket to get scanlMAfter' like functionality. See issue #677.

This can be further generalized to a type similar to Fold/Parser, giving it filtering and parsing capability as well (this is in fact equivalent to parseMany):

smapM :: (s -> a -> m (Step s b)) -> m s -> t m a -> t m b

take :: (IsStream t, Monad m) => Int -> t m a -> t m a

Take the first n elements from the stream and discard the rest.

Since: 0.1.0

takeWhile :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a

End the stream as soon as the predicate fails on an element.

Since: 0.1.0

drop :: (IsStream t, Monad m) => Int -> t m a -> t m a

Discard the first n elements from the stream and take the rest.

Since: 0.1.0
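The three trimming operations take, takeWhile and drop can be compared on small inputs. This is a sketch, assuming Streamly.Prelude imported as Stream; the binding names are illustrative.

```haskell
import Data.Functor.Identity (runIdentity)
import qualified Streamly.Prelude as Stream

firstThree, belowFour, afterThree :: [Int]
-- Keep only the first three elements.
firstThree = runIdentity $ Stream.toList $ Stream.take 3 $ Stream.fromList [1 .. 6]
-- Stop at the first element that fails the predicate; the later 3 is never reached.
belowFour = runIdentity $ Stream.toList $ Stream.takeWhile (< 4) $ Stream.fromList [1, 2, 5, 3]
-- Skip the first three elements and keep the rest.
afterThree = runIdentity $ Stream.toList $ Stream.drop 3 $ Stream.fromList [1 .. 6]

main :: IO ()
main = do
    print firstThree -- [1,2,3]
    print belowFour  -- [1,2]
    print afterThree -- [4,5,6]
```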

findIndices :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m Int

Find all the indices where the element in the stream satisfies the given predicate.

findIndices = fold Fold.findIndices
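A small usage sketch, assuming Streamly.Prelude imported as Stream. Indices are zero-based, and the result is itself a stream of Int.

```haskell
import Data.Functor.Identity (runIdentity)
import qualified Streamly.Prelude as Stream

-- Positions of the even elements in the input.
evenPositions :: [Int]
evenPositions =
    runIdentity
        $ Stream.toList
        $ Stream.findIndices even
        $ Stream.fromList [1, 2, 4, 5, 6 :: Int]

main :: IO ()
main = print evenPositions -- [1,2,4]
```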

Since: 0.5.0

intersperseM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a

Insert an effect and its output before consuming an element of a stream except the first one.

>>> Stream.toList $ Stream.trace putChar $ Stream.intersperseM (putChar '.' >> return ',') $ Stream.fromList "hello"
h.,e.,l.,l.,o"h,e,l,l,o"

Since: 0.5.0

interjectSuffix :: (IsStream t, MonadAsync m) => Double -> m a -> t m a -> t m a

Intersperse a monadic action into the input stream after every n seconds.

> import Control.Concurrent (threadDelay)
> Stream.drain $ Stream.interjectSuffix 1 (putChar ',') $ Stream.mapM (\x -> threadDelay 1000000 >> putChar x) $ Stream.fromList "hello"
h,e,l,l,o

Pre-release

reverse :: (IsStream t, Monad m) => t m a -> t m a

Returns the elements of the stream in reverse order. The stream must be finite. Note that this necessarily buffers the entire stream in memory.

Since: 0.7.0 (Monad m constraint)

Since: 0.1.1

reverse' :: (IsStream t, MonadIO m, Storable a) => t m a -> t m a

Like reverse but several times faster; requires a Storable instance.

Pre-release

Nesting

concatM :: (IsStream t, Monad m) => m (t m a) -> t m a

Given a stream value in the underlying monad, lift and join the underlying monad with the stream monad.

concatM = concat . fromEffect
concatM = concat . lift    -- requires (MonadTrans t)
concatM = join . lift      -- requires (MonadTrans t, Monad (t m))
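As an illustration, an action in the underlying monad can perform an effect and then hand back the stream to run. This is a sketch, assuming concatM is imported from this module under the illustrative alias Common; makeStream is a hypothetical action.

```haskell
import qualified Streamly.Internal.Data.Stream.IsStream.Common as Common
import qualified Streamly.Prelude as Stream

-- An action in the underlying monad that builds and returns a stream.
makeStream :: IO (Stream.SerialT IO Int)
makeStream = do
    putStrLn "building the stream"
    pure (Stream.fromList [1, 2, 3])

main :: IO ()
main = do
    -- concatM runs makeStream and then yields the elements of its result.
    xs <- Stream.toList (Common.concatM makeStream)
    print xs -- [1,2,3]
```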

See also: concat, sequence

Internal

concatMapM :: (IsStream t, Monad m) => (a -> m (t m b)) -> t m a -> t m b

Map a stream-producing monadic function on each element of the stream and then flatten the results into a single stream. Since the stream generation function is monadic, unlike concatMap, it can produce an effect at the beginning of each iteration of the inner loop.

Since: 0.6.0

concatMap :: (IsStream t, Monad m) => (a -> t m b) -> t m a -> t m b

Map a stream-producing function on each element of the stream and then flatten the results into a single stream.

concatMap f = concatMapM (return . f)
concatMap = concatMapWith serial
concatMap f = concat . map f
concatMap f = unfoldMany (UF.lmap f UF.fromStream)
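The first equivalence above can be checked on a small input. This is a sketch, assuming Streamly.Prelude imported as Stream; pairs is a hypothetical inner-stream generator.

```haskell
import Data.Functor.Identity (Identity, runIdentity)
import qualified Streamly.Prelude as Stream

-- Inner stream generated for each outer element.
pairs :: Int -> Stream.SerialT Identity Int
pairs x = Stream.fromList [x, x * 10]

viaConcatMap, viaConcatMapM :: [Int]
-- Flatten the inner streams in order.
viaConcatMap =
    runIdentity $ Stream.toList $ Stream.concatMap pairs $ Stream.fromList [1, 2, 3]
-- The same result through concatMapM (return . f).
viaConcatMapM =
    runIdentity $ Stream.toList $ Stream.concatMapM (pure . pairs) $ Stream.fromList [1, 2, 3]

main :: IO ()
main = do
    print viaConcatMap  -- [1,10,2,20,3,30]
    print viaConcatMapM -- [1,10,2,20,3,30]
```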

Since: 0.6.0

splitOnSeq :: (IsStream t, MonadIO m, Storable a, Enum a, Eq a) => Array a -> Fold m a b -> t m a -> t m b

Like splitOn but the separator is a sequence of elements instead of a single element.

For illustration, let's define a function that operates on pure lists:

>>> splitOnSeq' pat xs = Stream.toList $ Stream.splitOnSeq (Array.fromList pat) Fold.toList (Stream.fromList xs)
>>> splitOnSeq' "" "hello"
["h","e","l","l","o"]
>>> splitOnSeq' "hello" ""
[""]
>>> splitOnSeq' "hello" "hello"
["",""]
>>> splitOnSeq' "x" "hello"
["hello"]
>>> splitOnSeq' "h" "hello"
["","ello"]
>>> splitOnSeq' "o" "hello"
["hell",""]
>>> splitOnSeq' "e" "hello"
["h","llo"]
>>> splitOnSeq' "l" "hello"
["he","","o"]
>>> splitOnSeq' "ll" "hello"
["he","o"]

splitOnSeq is an inverse of intercalate. The following law always holds:

intercalate . splitOn == id

The following law holds when the separator is non-empty and contains none of the elements present in the input lists:

splitOn . intercalate == id
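Both laws can be explored with a pure-list stand-in. The helper splitOnList below is hypothetical, uses only base, and is written to agree with the non-empty-separator examples above; it is not streamly's implementation and does not handle an empty separator.

```haskell
import Data.List (intercalate, stripPrefix)

-- Hypothetical pure-list stand-in for splitOnSeq; needs a non-empty pattern.
splitOnList :: Eq a => [a] -> [a] -> [[a]]
splitOnList [] _ = error "splitOnList: this sketch needs a non-empty pattern"
splitOnList pat xs0 = go [] xs0
  where
    -- acc holds the current segment in reverse order.
    go acc [] = [reverse acc]
    go acc s@(x : xs) =
        case stripPrefix pat s of
            Just rest -> reverse acc : go [] rest -- separator found: close segment
            Nothing   -> go (x : acc) xs          -- no separator here: keep going

-- intercalate . splitOn == id
joinAfterSplit :: String -> String -> Bool
joinAfterSplit pat xs = intercalate pat (splitOnList pat xs) == xs

-- splitOn . intercalate == id, which needs the side condition stated above
splitAfterJoin :: String -> [String] -> Bool
splitAfterJoin pat xss = splitOnList pat (intercalate pat xss) == xss

main :: IO ()
main = do
    print (splitOnList "l" "hello")            -- ["he","","o"]
    print (joinAfterSplit "ll" "hello")        -- True
    print (splitAfterJoin "," ["ab", "", "c"]) -- True: no segment contains ','
    print (splitAfterJoin "," ["a,b", "c"])    -- False: a segment contains ','
```

The last line shows why the second law carries a side condition: a segment that already contains the separator is split again after joining.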

Pre-release

Deprecated

yield :: IsStream t => a -> t m a

Deprecated: Please use fromPure instead.

Same as fromPure

Since: 0.4.0

yieldM :: (Monad m, IsStream t) => m a -> t m a

Deprecated: Please use fromEffect instead.

Same as fromEffect

Since: 0.4.0