streamly-0.8.0: Dataflow programming and declarative concurrency
Copyright(c) 2017 Composewell Technologies
LicenseBSD-3-Clause
Maintainerstreamly@composewell.com
Stabilityexperimental
PortabilityGHC
Safe HaskellNone
LanguageHaskell2010

Streamly.Internal.Data.Stream.IsStream.Generate

Description

Most of the combinators in this module can be implemented as unfolds. Some of them however can only be expressed in terms StreamK e.g. cons/consM, fromFoldable, mfix. We can possibly remove those from this module which can be expressed as unfolds. Unless we want to use rewrite rules to rewrite them as StreamK when StreamK is used, avoiding conversion to StreamD. Will that help? Are there any other reasons to keep these and not use unfolds?

Synopsis

Primitives

nil :: IsStream t => t m a Source #

An empty stream.

> toList nil
[]

Since: 0.1.0

nilM :: (IsStream t, Monad m) => m b -> t m a Source #

An empty stream producing a side effect.

> toList (nilM (print "nil"))
"nil"
[]

Pre-release

cons :: IsStream t => a -> t m a -> t m a infixr 5 Source #

Construct a stream by adding a pure value at the head of an existing stream. For serial streams this is the same as (return a) `consM` r but more efficient. For concurrent streams this is not concurrent whereas consM is concurrent. For example:

> toList $ 1 `cons` 2 `cons` 3 `cons` nil
[1,2,3]

Since: 0.1.0

(.:) :: IsStream t => a -> t m a -> t m a infixr 5 Source #

Operator equivalent of cons.

> toList $ 1 .: 2 .: 3 .: nil
[1,2,3]

Since: 0.1.1

consM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a infixr 5 Source #

Constructs a stream by adding a monadic action at the head of an existing stream. For example:

> toList $ getLine `consM` getLine `consM` nil
hello
world
["hello","world"]

Concurrent (do not use fromParallel to construct infinite streams)

Since: 0.2.0

(|:) :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a infixr 5 Source #

Operator equivalent of consM. We can read it as "parallel colon" to remember that | comes before :.

> toList $ getLine |: getLine |: nil
hello
world
["hello","world"]
let delay = threadDelay 1000000 >> print 1
drain $ fromSerial  $ delay |: delay |: delay |: nil
drain $ fromParallel $ delay |: delay |: delay |: nil

Concurrent (do not use fromParallel to construct infinite streams)

Since: 0.2.0

From Unfold

unfold :: (IsStream t, Monad m) => Unfold m a b -> a -> t m b Source #

Convert an Unfold into a stream by supplying it an input seed.

>>> Stream.drain $ Stream.unfold (Unfold.replicateM 3) (putStrLn "hello")
hello
hello
hello

Since: 0.7.0

unfold0 :: (IsStream t, Monad m) => Unfold m Void b -> t m b Source #

Convert an Unfold with a closed input end into a stream.

Pre-release

Unfolding

unfoldr :: (Monad m, IsStream t) => (b -> Maybe (a, b)) -> b -> t m a Source #

unfoldr step s =
    case step s of
        Nothing -> nil
        Just (a, b) -> a `cons` unfoldr step b

Build a stream by unfolding a pure step function step starting from a seed s. The step function returns the next element in the stream and the next seed value. When it is done it returns Nothing and the stream ends. For example,

>>> :{
let f b =
        if b > 3
        then Nothing
        else Just (b, b + 1)
in Stream.toList $ Stream.unfoldr f 0
:}
[0,1,2,3]

Since: 0.1.0

unfoldrM :: (IsStream t, MonadAsync m) => (b -> m (Maybe (a, b))) -> b -> t m a Source #

Build a stream by unfolding a monadic step function starting from a seed. The step function returns the next element in the stream and the next seed value. When it is done it returns Nothing and the stream ends. For example,

>>> :{
let f b =
        if b > 3
        then return Nothing
        else return (Just (b, b + 1))
in Stream.toList $ Stream.unfoldrM f 0
:}
[0,1,2,3]

When run concurrently, the next unfold step can run concurrently with the processing of the output of the previous step. Note that more than one step cannot run concurrently as the next step depends on the output of the previous step.

(fromAsync $ S.unfoldrM (\n -> liftIO (threadDelay 1000000) >> return (Just (n, n + 1))) 0)
    & S.foldlM' (\_ a -> threadDelay 1000000 >> print a) ()

Concurrent

Since: 0.1.0

From Values

fromPure :: IsStream t => a -> t m a Source #

fromPure a = a `cons` nil

Create a singleton stream from a pure value.

The following holds in monadic streams, but not in Zip streams:

fromPure = pure
fromPure = fromEffect . pure

In Zip applicative streams fromPure is not the same as pure because in that case pure is equivalent to repeat instead. fromPure and pure are equally efficient, in other cases fromPure may be slightly more efficient than the other equivalent definitions.

Since: 0.8.0 (Renamed yield to fromPure)

fromEffect :: (Monad m, IsStream t) => m a -> t m a Source #

fromEffect m = m `consM` nil

Create a singleton stream from a monadic action.

> Stream.toList $ Stream.fromEffect getLine
hello
["hello"]

Since: 0.8.0 (Renamed yieldM to fromEffect)

repeat :: (IsStream t, Monad m) => a -> t m a Source #

Generate an infinite stream by repeating a pure value.

Since: 0.4.0

repeatM :: (IsStream t, MonadAsync m) => m a -> t m a Source #

repeatM = fix . consM
repeatM = cycle1 . fromEffect

Generate a stream by repeatedly executing a monadic action forever.

drain $ fromSerial $ S.take 10 $ S.repeatM $ (threadDelay 1000000 >> print 1)
drain $ fromAsync  $ S.take 10 $ S.repeatM $ (threadDelay 1000000 >> print 1)

Concurrent, infinite (do not use with fromParallel)

Since: 0.2.0

replicate :: (IsStream t, Monad m) => Int -> a -> t m a Source #

replicate = take n . repeat

Generate a stream of length n by repeating a value n times.

Since: 0.6.0

replicateM :: (IsStream t, MonadAsync m) => Int -> m a -> t m a Source #

replicateM = take n . repeatM

Generate a stream by performing a monadic action n times. Same as:

drain $ fromSerial $ S.replicateM 10 $ (threadDelay 1000000 >> print 1)
drain $ fromAsync  $ S.replicateM 10 $ (threadDelay 1000000 >> print 1)

Concurrent

Since: 0.1.1

Enumeration

class Enum a => Enumerable a where Source #

Types that can be enumerated as a stream. The operations in this type class are equivalent to those in the Enum type class, except that these generate a stream instead of a list. Use the functions in Streamly.Internal.Data.Stream.Enumeration module to define new instances.

Since: 0.6.0

Methods

enumerateFrom :: (IsStream t, Monad m) => a -> t m a Source #

enumerateFrom from generates a stream starting with the element from, enumerating up to maxBound when the type is Bounded or generating an infinite stream when the type is not Bounded.

>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFrom (0 :: Int)
[0,1,2,3]

For Fractional types, enumeration is numerically stable. However, no overflow or underflow checks are performed.

>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFrom 1.1
[1.1,2.1,3.1,4.1]

Since: 0.6.0

enumerateFromTo :: (IsStream t, Monad m) => a -> a -> t m a Source #

Generate a finite stream starting with the element from, enumerating the type up to the value to. If to is smaller than from then an empty stream is returned.

>>> Stream.toList $ Stream.enumerateFromTo 0 4
[0,1,2,3,4]

For Fractional types, the last element is equal to the specified to value after rounding to the nearest integral value.

>>> Stream.toList $ Stream.enumerateFromTo 1.1 4
[1.1,2.1,3.1,4.1]

>>> Stream.toList $ Stream.enumerateFromTo 1.1 4.6
[1.1,2.1,3.1,4.1,5.1]

Since: 0.6.0

enumerateFromThen :: (IsStream t, Monad m) => a -> a -> t m a Source #

enumerateFromThen from then generates a stream whose first element is from, the second element is then and the successive elements are in increments of then - from. Enumeration can occur downwards or upwards depending on whether then comes before or after from. For Bounded types the stream ends when maxBound is reached, for unbounded types it keeps enumerating infinitely.

>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFromThen 0 2
[0,2,4,6]

>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFromThen 0 (-2)
[0,-2,-4,-6]

Since: 0.6.0

enumerateFromThenTo :: (IsStream t, Monad m) => a -> a -> a -> t m a Source #

enumerateFromThenTo from then to generates a finite stream whose first element is from, the second element is then and the successive elements are in increments of then - from up to to. Enumeration can occur downwards or upwards depending on whether then comes before or after from.

>>> Stream.toList $ Stream.enumerateFromThenTo 0 2 6
[0,2,4,6]

>>> Stream.toList $ Stream.enumerateFromThenTo 0 (-2) (-6)
[0,-2,-4,-6]

Since: 0.6.0

Instances

Instances details
Enumerable Bool Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Bool -> t m Bool Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Bool -> Bool -> t m Bool Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Bool -> Bool -> t m Bool Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Bool -> Bool -> Bool -> t m Bool Source #

Enumerable Char Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Char -> t m Char Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Char -> Char -> t m Char Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Char -> Char -> t m Char Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Char -> Char -> Char -> t m Char Source #

Enumerable Double Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Double -> t m Double Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Double -> Double -> t m Double Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Double -> Double -> t m Double Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Double -> Double -> Double -> t m Double Source #

Enumerable Float Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Float -> t m Float Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Float -> Float -> t m Float Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Float -> Float -> t m Float Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Float -> Float -> Float -> t m Float Source #

Enumerable Int Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int -> t m Int Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int -> Int -> t m Int Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int -> Int -> t m Int Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int -> Int -> Int -> t m Int Source #

Enumerable Int8 Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int8 -> t m Int8 Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int8 -> Int8 -> t m Int8 Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int8 -> Int8 -> t m Int8 Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int8 -> Int8 -> Int8 -> t m Int8 Source #

Enumerable Int16 Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int16 -> t m Int16 Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int16 -> Int16 -> t m Int16 Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int16 -> Int16 -> t m Int16 Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int16 -> Int16 -> Int16 -> t m Int16 Source #

Enumerable Int32 Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int32 -> t m Int32 Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int32 -> Int32 -> t m Int32 Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int32 -> Int32 -> t m Int32 Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int32 -> Int32 -> Int32 -> t m Int32 Source #

Enumerable Int64 Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int64 -> t m Int64 Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int64 -> Int64 -> t m Int64 Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int64 -> Int64 -> t m Int64 Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int64 -> Int64 -> Int64 -> t m Int64 Source #

Enumerable Integer Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Integer -> t m Integer Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Integer -> Integer -> t m Integer Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Integer -> Integer -> t m Integer Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Integer -> Integer -> Integer -> t m Integer Source #

Enumerable Natural Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Natural -> t m Natural Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Natural -> Natural -> t m Natural Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Natural -> Natural -> t m Natural Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Natural -> Natural -> Natural -> t m Natural Source #

Enumerable Ordering Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Ordering -> t m Ordering Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Ordering -> Ordering -> t m Ordering Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Ordering -> Ordering -> t m Ordering Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Ordering -> Ordering -> Ordering -> t m Ordering Source #

Enumerable Word Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word -> t m Word Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word -> Word -> t m Word Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word -> Word -> t m Word Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word -> Word -> Word -> t m Word Source #

Enumerable Word8 Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word8 -> t m Word8 Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word8 -> Word8 -> t m Word8 Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word8 -> Word8 -> t m Word8 Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word8 -> Word8 -> Word8 -> t m Word8 Source #

Enumerable Word16 Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word16 -> t m Word16 Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word16 -> Word16 -> t m Word16 Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word16 -> Word16 -> t m Word16 Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word16 -> Word16 -> Word16 -> t m Word16 Source #

Enumerable Word32 Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word32 -> t m Word32 Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word32 -> Word32 -> t m Word32 Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word32 -> Word32 -> t m Word32 Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word32 -> Word32 -> Word32 -> t m Word32 Source #

Enumerable Word64 Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word64 -> t m Word64 Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word64 -> Word64 -> t m Word64 Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word64 -> Word64 -> t m Word64 Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word64 -> Word64 -> Word64 -> t m Word64 Source #

Enumerable () Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => () -> t m () Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => () -> () -> t m () Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => () -> () -> t m () Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => () -> () -> () -> t m () Source #

Integral a => Enumerable (Ratio a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Ratio a -> t m (Ratio a) Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Ratio a -> Ratio a -> t m (Ratio a) Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Ratio a -> Ratio a -> t m (Ratio a) Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Ratio a -> Ratio a -> Ratio a -> t m (Ratio a) Source #

Enumerable a => Enumerable (Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Identity a -> t m (Identity a) Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Identity a -> Identity a -> t m (Identity a) Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Identity a -> Identity a -> t m (Identity a) Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Identity a -> Identity a -> Identity a -> t m (Identity a) Source #

HasResolution a => Enumerable (Fixed a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Fixed a -> t m (Fixed a) Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Fixed a -> Fixed a -> t m (Fixed a) Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Fixed a -> Fixed a -> t m (Fixed a) Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Fixed a -> Fixed a -> Fixed a -> t m (Fixed a) Source #

enumerate :: (IsStream t, Monad m, Bounded a, Enumerable a) => t m a Source #

enumerate = enumerateFrom minBound

Enumerate a Bounded type from its minBound to maxBound

Since: 0.6.0

enumerateTo :: (IsStream t, Monad m, Bounded a, Enumerable a) => a -> t m a Source #

enumerateTo = enumerateFromTo minBound

Enumerate a Bounded type from its minBound to specified value.

Since: 0.6.0

Time Enumeration

times :: (IsStream t, MonadAsync m) => t m (AbsTime, RelTime64) Source #

times returns a stream of time value tuples with clock of 10 ms granularity. The first component of the tuple is an absolute time reference (epoch) denoting the start of the stream and the second component is a time relative to the reference.

>>> Stream.mapM_ (\x -> print x >> threadDelay 1000000) $ Stream.take 3 $ Stream.times
(AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))
(AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))
(AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))

Note: This API is not safe on 32-bit machines.

Pre-release

absTimes :: (IsStream t, MonadAsync m, Functor (t m)) => t m AbsTime Source #

absTimes returns a stream of absolute timestamps using a clock of 10 ms granularity.

>>> Stream.mapM_ print $ Stream.delayPre 1 $ Stream.take 3 $ Stream.absTimes
AbsTime (TimeSpec {sec = ..., nsec = ...})
AbsTime (TimeSpec {sec = ..., nsec = ...})
AbsTime (TimeSpec {sec = ..., nsec = ...})

Note: This API is not safe on 32-bit machines.

Pre-release

absTimesWith :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m AbsTime Source #

absTimesWith g returns a stream of absolute timestamps using a clock of granularity g specified in seconds. A low granularity clock is more expensive in terms of CPU usage. Any granularity lower than 1 ms is treated as 1 ms.

>>> Stream.mapM_ print $ Stream.delayPre 1 $ Stream.take 3 $ absTimesWith 0.01
AbsTime (TimeSpec {sec = ..., nsec = ...})
AbsTime (TimeSpec {sec = ..., nsec = ...})
AbsTime (TimeSpec {sec = ..., nsec = ...})

Note: This API is not safe on 32-bit machines.

Pre-release

relTimes :: (IsStream t, MonadAsync m, Functor (t m)) => t m RelTime64 Source #

relTimes returns a stream of relative time values starting from 0, using a clock of granularity 10 ms.

>>> Stream.mapM_ print $ Stream.delayPre 1 $ Stream.take 3 $ Stream.relTimes
RelTime64 (NanoSecond64 ...)
RelTime64 (NanoSecond64 ...)
RelTime64 (NanoSecond64 ...)

Note: This API is not safe on 32-bit machines.

Pre-release

relTimesWith :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m RelTime64 Source #

relTimesWith g returns a stream of relative time values starting from 0, using a clock of granularity g specified in seconds. A low granularity clock is more expensive in terms of CPU usage. Any granularity lower than 1 ms is treated as 1 ms.

>>> Stream.mapM_ print $ Stream.delayPre 1 $ Stream.take 3 $ Stream.relTimesWith 0.01
RelTime64 (NanoSecond64 ...)
RelTime64 (NanoSecond64 ...)
RelTime64 (NanoSecond64 ...)

Note: This API is not safe on 32-bit machines.

Pre-release

durations :: Double -> t m RelTime64 Source #

durations g returns a stream of relative time values measuring the time elapsed since the immediate predecessor element of the stream was generated. The first element of the stream is always 0. durations uses a clock of granularity g specified in seconds. A low granularity clock is more expensive in terms of CPU usage. The minimum granularity is 1 millisecond. Durations lower than 1 ms will be 0.

Note: This API is not safe on 32-bit machines.

Unimplemented

ticks :: Rate -> t m () Source #

Generate ticks at the specified rate. The rate is adaptive, the tick generation speed can be increased or decreased at different times to achieve the specified rate. The specific behavior for different styles of Rate specifications is documented under Rate. The effective maximum rate achieved by a stream is governed by the processor speed.

Unimplemented

timeout :: AbsTime -> t m () Source #

Generate a singleton event at or after the specified absolute time. Note that this is different from a threadDelay, a threadDelay starts from the time when the action is evaluated, whereas if we use AbsTime based timeout it will immediately expire if the action is evaluated too late.

Unimplemented

From Generators

fromIndices :: (IsStream t, Monad m) => (Int -> a) -> t m a Source #

fromIndices f = fmap f $ Stream.enumerateFrom 0
fromIndices f = let g i = f i `cons` g (i + 1) in g 0

Generate an infinite stream, whose values are the output of a function f applied on the corresponding index. Index starts at 0.

>>> Stream.toList $ Stream.take 5 $ Stream.fromIndices id
[0,1,2,3,4]

Since: 0.6.0

fromIndicesM :: (IsStream t, MonadAsync m) => (Int -> m a) -> t m a Source #

fromIndicesM f = Stream.mapM f $ Stream.enumerateFrom 0
fromIndicesM f = let g i = f i `consM` g (i + 1) in g 0

Generate an infinite stream, whose values are the output of a monadic function f applied on the corresponding index. Index starts at 0.

Concurrent

Since: 0.6.0

Iteration

iterate :: (IsStream t, Monad m) => (a -> a) -> a -> t m a Source #

iterate f x = x `cons` iterate f x

Generate an infinite stream with x as the first element and each successive element derived by applying the function f on the previous element.

>>> Stream.toList $ Stream.take 5 $ Stream.iterate (+1) 1
[1,2,3,4,5]

Since: 0.1.2

iterateM :: (IsStream t, MonadAsync m) => (a -> m a) -> m a -> t m a Source #

iterateM f m = m >>= a -> return a `consM` iterateM f (f a)

Generate an infinite stream with the first element generated by the action m and each successive element derived by applying the monadic function f on the previous element.

When run concurrently, the next iteration can run concurrently with the processing of the previous iteration. Note that more than one iteration cannot run concurrently as the next iteration depends on the output of the previous iteration.

drain $ fromSerial $ S.take 10 $ S.iterateM
     (\x -> threadDelay 1000000 >> print x >> return (x + 1)) (return 0)

drain $ fromAsync  $ S.take 10 $ S.iterateM
     (\x -> threadDelay 1000000 >> print x >> return (x + 1)) (return 0)

Concurrent

Since: 0.1.2

Since: 0.7.0 (signature change)

Cyclic Elements

mfix :: (IsStream t, Monad m) => (m a -> t m a) -> t m a Source #

We can define cyclic structures using let:

>>> let (a, b) = ([1, b], head a) in (a, b)
([1,1],1)

The function fix defined as:

fix f = let x = f x in x

ensures that the argument of a function and its output refer to the same lazy value x i.e. the same location in memory. Thus x can be defined in terms of itself, creating structures with cyclic references.

>>> import Data.Function (fix)
>>> f ~(a, b) = ([1, b], head a)
>>> fix f
([1,1],1)

mfix is essentially the same as fix but for monadic values.

Using mfix for streams we can construct a stream in which each element of the stream is defined in a cyclic fashion. The argument of the function being fixed represents the current element of the stream which is being returned by the stream monad. Thus, we can use the argument to construct itself.

In the following example, the argument action of the function f represents the tuple (x,y) returned by it in a given iteration. We define the first element of the tuple in terms of the second.

import Streamly.Internal.Data.Stream.IsStream as Stream
import System.IO.Unsafe (unsafeInterleaveIO)

main = do
    Stream.mapM_ print $ Stream.mfix f

    where

    f action = do
        let incr n act = fmap ((+n) . snd) $ unsafeInterleaveIO act
        x <- Stream.fromListM [incr 1 action, incr 2 action]
        y <- Stream.fromList [4,5]
        return (x, y)

Note: you cannot achieve this by just changing the order of the monad statements because that would change the order in which the stream elements are generated.

Note that the function f must be lazy in its argument, that's why we use unsafeInterleaveIO on action because IO monad is strict.

Pre-release

From Containers

fromList :: (Monad m, IsStream t) => [a] -> t m a Source #

fromList = foldr cons nil

Construct a stream from a list of pure values. This is more efficient than fromFoldable for serial streams.

Since: 0.4.0

fromListM :: (MonadAsync m, IsStream t) => [m a] -> t m a Source #

fromListM = foldr consM nil

Construct a stream from a list of monadic actions. This is more efficient than fromFoldableM for serial streams.

Since: 0.4.0

fromFoldable :: (IsStream t, Foldable f) => f a -> t m a Source #

fromFoldable = foldr cons nil

Construct a stream from a Foldable containing pure values:

Since: 0.2.0

fromFoldableM :: (IsStream t, MonadAsync m, Foldable f) => f (m a) -> t m a Source #

fromFoldableM = foldr consM nil

Construct a stream from a Foldable containing monadic actions.

drain $ fromSerial $ S.fromFoldableM $ replicateM 10 (threadDelay 1000000 >> print 1)
drain $ fromAsync  $ S.fromFoldableM $ replicateM 10 (threadDelay 1000000 >> print 1)

Concurrent (do not use with fromParallel on infinite containers)

Since: 0.3.0

fromCallback :: MonadAsync m => ((a -> m ()) -> m ()) -> SerialT m a Source #

Takes a callback setter function and provides it with a callback. The callback when invoked adds a value at the tail of the stream. Returns a stream of values generated by the callback.

Pre-release

Deprecated

once :: (Monad m, IsStream t) => m a -> t m a Source #

Deprecated: Please use fromEffect instead.

Same as fromEffect

Since: 0.2.0

yield :: IsStream t => a -> t m a Source #

Deprecated: Please use fromPure instead.

Same as fromPure

Since: 0.4.0

yieldM :: (Monad m, IsStream t) => m a -> t m a Source #

Deprecated: Please use fromEffect instead.

Same as fromEffect

Since: 0.4.0

each :: (IsStream t, Foldable f) => f a -> t m a Source #

Deprecated: Please use fromFoldable instead.

Same as fromFoldable.

Since: 0.1.0

fromHandle :: (IsStream t, MonadIO m) => Handle -> t m String Source #

Deprecated: Please use Streamly.FileSystem.Handle module (see the changelog)

Read lines from an IO Handle into a stream of Strings.

Since: 0.1.0

currentTime :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m AbsTime Source #

Deprecated: Please use absTimes instead