streamly-0.8.2: Dataflow programming and declarative concurrency
Copyright: (c) 2020 Composewell Technologies
License: BSD-3-Clause
Maintainer: streamly@composewell.com
Stability: experimental
Portability: GHC
Safe Haskell: None
Language: Haskell2010

Streamly.Internal.Data.Stream.IsStream.Top

Description

Top level IsStream module that can use all other lower level IsStream modules.

Transformation

Sampling

Value agnostic filtering.

sampleFromThen :: (IsStream t, Monad m, Functor (t m)) => Int -> Int -> t m a -> t m a

sampleFromThen offset stride samples the element at index offset and then every stride-th element after it.

>>> Stream.toList $ Stream.sampleFromThen 2 3 $ Stream.enumerateFromTo 0 10
[2,5,8]
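
The index arithmetic can be illustrated with a small list-based model. This is only a sketch using plain lists, not streamly's implementation; the name sampleFromThenList is hypothetical, and treating a non-positive stride as 1 is an assumption made here to keep the function total.

```haskell
-- List-based model of sampleFromThen; an illustration, not streamly's code.

-- | Keep the element at index 'offset', then every 'stride'-th element
-- after it. Assumption: a non-positive stride is treated as 1.
sampleFromThenList :: Int -> Int -> [a] -> [a]
sampleFromThenList offset stride xs =
    [ x
    | (i, x) <- zip [0 :: Int ..] xs
    , i >= offset
    , (i - offset) `mod` step == 0
    ]
  where
    step = max 1 stride

main :: IO ()
main = do
    print (sampleFromThenList 2 3 [0 .. 10 :: Int]) -- [2,5,8]
    print (sampleFromThenList 0 2 "abcdef")         -- "ace"
```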

Pre-release

sampleIntervalStart :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m a -> t m a

Like sampleIntervalEnd but samples the first event of each time window instead of the last.

sampleIntervalStart n = Stream.catMaybes . Stream.intervalsOf n Fold.head

Pre-release

sampleIntervalEnd :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m a -> t m a

Continuously evaluate the input stream and sample the last event in each time window of n seconds.

This is also known as throttle in some libraries.

sampleIntervalEnd n = Stream.catMaybes . Stream.intervalsOf n Fold.last
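
The windowing behaviour can be modelled without a clock by attaching a timestamp to every event. The sketch below is a pure list model under stated assumptions: the Timed type and the function names are hypothetical, windows are aligned to time 0, n is positive, and timestamps are non-decreasing. The real combinators work on wall-clock time.

```haskell
import Data.Function (on)
import qualified Data.List.NonEmpty as NE

-- | An event tagged with its arrival time in seconds (hypothetical type).
type Timed a = (Double, a)

-- | Split events into consecutive, non-empty windows of n seconds.
-- Windows that contain no events simply do not appear in the result.
windows :: Double -> [Timed a] -> [NE.NonEmpty (Timed a)]
windows n = NE.groupBy ((==) `on` windowIndex)
  where
    windowIndex :: (Double, b) -> Integer
    windowIndex (t, _) = floor (t / n)

-- | First event of every window (models sampleIntervalStart).
sampleStart :: Double -> [Timed a] -> [a]
sampleStart n = map (snd . NE.head) . windows n

-- | Last event of every window (models sampleIntervalEnd).
sampleEnd :: Double -> [Timed a] -> [a]
sampleEnd n = map (snd . NE.last) . windows n

main :: IO ()
main = do
    let events =
            [ (0.1, 'a'), (0.4, 'b'), (0.9, 'c')
            , (1.2, 'd'), (2.5, 'e'), (2.7, 'f') ]
    putStrLn (sampleStart 1 events) -- ade
    putStrLn (sampleEnd 1 events)   -- cdf
```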

Pre-release

sampleBurstStart :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m a -> t m a

Like sampleBurstEnd but samples the event at the beginning of the burst instead of at the end of it.

Pre-release

sampleBurstEnd :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m a -> t m a

Sample one event at the end of each burst of events. A burst is a group of events close together in time; it ends when an event is separated from the previous event by more than the specified time interval.

This is known as debounce in some libraries.

The clock granularity is 10 ms.
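
The burst definition can be modelled on timestamped lists. This is a sketch only: the Timed type and the function names are hypothetical, timestamps are assumed to be non-decreasing, and the 10 ms clock granularity is ignored.

```haskell
import Data.List.NonEmpty (NonEmpty (..))
import qualified Data.List.NonEmpty as NE

-- | An event tagged with its arrival time in seconds (hypothetical type).
type Timed a = (Double, a)

-- | Group events into bursts. A new burst starts whenever the gap to the
-- previous event is larger than 'gap' seconds.
bursts :: Double -> [Timed a] -> [NonEmpty (Timed a)]
bursts gap = foldr step []
  where
    step e [] = [e :| []]
    step e (b : bs)
        | fst (NE.head b) - fst e > gap = (e :| []) : b : bs
        | otherwise                     = NE.cons e b : bs

-- | Last event of every burst (models sampleBurstEnd, i.e. debounce).
burstEnd :: Double -> [Timed a] -> [a]
burstEnd gap = map (snd . NE.last) . bursts gap

-- | First event of every burst (models sampleBurstStart).
burstStart :: Double -> [Timed a] -> [a]
burstStart gap = map (snd . NE.head) . bursts gap

main :: IO ()
main = do
    let events =
            [ (0.0, 'a'), (0.2, 'b'), (0.3, 'c')
            , (1.5, 'd'), (1.6, 'e'), (3.0, 'f') ]
    putStrLn (burstEnd 0.5 events)   -- cef
    putStrLn (burstStart 0.5 events) -- adf
```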

Pre-release

Reordering

sortBy :: MonadCatch m => (a -> a -> Ordering) -> SerialT m a -> SerialT m a

Sort the input stream using a supplied comparison function.

O(n) space

Note: this is not the fastest possible implementation as of now.

Pre-release

Nesting

Set like operations

These are not exactly set operations because streams are not necessarily sets; they may contain duplicate elements.

intersectBy :: (IsStream t, Monad m) => (a -> a -> Bool) -> t m a -> t m a -> t m a

intersectBy is essentially a filtering operation that retains only those elements in the first stream that are present in the second stream.

>>> Stream.toList $ Stream.intersectBy (==) (Stream.fromList [1,2,2,4]) (Stream.fromList [2,1,1,3])
[1,2,2]
>>> Stream.toList $ Stream.intersectBy (==) (Stream.fromList [2,1,1,3]) (Stream.fromList [1,2,2,4])
[2,1,1]

intersectBy is similar to but not the same as joinInner:

>>> Stream.toList $ fmap fst $ Stream.joinInner (==) (Stream.fromList [1,2,2,4]) (Stream.fromList [2,1,1,3])
[1,1,2,2]

Space: O(n) where n is the number of elements in the second stream.

Time: O(m x n) where m is the number of elements in the first stream and n is the number of elements in the second stream.
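
The filtering behaviour and the O(m x n) cost can be seen in a list-based model. This is a sketch on plain lists, not streamly's implementation; intersectByList is a hypothetical name (it behaves like Data.List.intersectBy).

```haskell
-- | Keep each element of the first list that has a match in the second.
-- Every element of xs triggers a scan of ys, hence O(m x n) comparisons.
intersectByList :: (a -> a -> Bool) -> [a] -> [a] -> [a]
intersectByList eq xs ys = filter (\x -> any (eq x) ys) xs

main :: IO ()
main = do
    print (intersectByList (==) [1, 2, 2, 4] [2, 1, 1, 3 :: Int]) -- [1,2,2]
    print (intersectByList (==) [2, 1, 1, 3] [1, 2, 2, 4 :: Int]) -- [2,1,1]
```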

Pre-release

intersectBySorted :: (IsStream t, Monad m) => (a -> a -> Ordering) -> t m a -> t m a -> t m a

Like intersectBy but works only on streams sorted in ascending order.

Space: O(1)

Time: O(m+n)
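
On sorted input the intersection can be computed in a single merge-style pass, which is where the O(m+n) time and O(1) space come from. The list sketch below is an illustration, not the library's code; intersectSortedList is a hypothetical name, and the handling of duplicates (chosen here to agree with intersectBy) is an assumption.

```haskell
-- | Intersect two ascending lists in one pass.
-- On a match the element of the first list is emitted and only the first
-- list advances, so repeated elements of the first list are all retained.
intersectSortedList :: (a -> a -> Ordering) -> [a] -> [a] -> [a]
intersectSortedList cmp = go
  where
    go [] _ = []
    go _ [] = []
    go xs@(x : xs') ys@(y : ys') =
        case cmp x y of
            LT -> go xs' ys     -- x cannot occur later in ys
            GT -> go xs ys'     -- y is too small, skip it
            EQ -> x : go xs' ys

main :: IO ()
main = do
    print (intersectSortedList compare [1, 2, 2, 4] [1, 1, 2, 3 :: Int]) -- [1,2,2]
    print (intersectSortedList compare [1, 3, 5, 7] [2, 3, 4, 7, 8 :: Int]) -- [3,7]
```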

Pre-release

differenceBy :: (IsStream t, Monad m) => (a -> a -> Bool) -> t m a -> t m a -> t m a

Delete from the first stream the first occurrences of those elements that are present in the second stream. If an element occurs multiple times in the second stream, that many occurrences of it are deleted from the first stream.

>>> Stream.toList $ Stream.differenceBy (==) (Stream.fromList [1,2,2]) (Stream.fromList [1,2,3])
[2]

The following laws hold:

(s1 `serial` s2) `differenceBy eq` s1 === s2
(s1 `wSerial` s2) `differenceBy eq` s1 === s2

Same as the list \\ operation.

Space: O(m) where m is the number of elements in the first stream.

Time: O(m x n) where m is the number of elements in the first stream and n is the number of elements in the second stream.
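
The deletion semantics match the way Data.List defines list difference, which gives a compact list model. This is a sketch on plain lists, not streamly's implementation; differenceByList is a hypothetical name.

```haskell
import Data.List (deleteBy, (\\))

-- | For every element of the second list, delete its first occurrence
-- (if any) from the first list.
differenceByList :: (a -> a -> Bool) -> [a] -> [a] -> [a]
differenceByList eq = foldl (flip (deleteBy eq))

main :: IO ()
main = do
    print (differenceByList (==) [1, 2, 2] [1, 2, 3 :: Int]) -- [2]
    print ([1, 2, 2] \\ [1, 2, 3 :: Int])                    -- [2]
    -- List analogue of the first law above, with (++) in place of serial.
    let s1 = [1, 2, 2 :: Int]
        s2 = [2, 3, 1]
    print (differenceByList (==) (s1 ++ s2) s1 == s2)        -- True
```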

Pre-release

mergeDifferenceBy :: (a -> a -> Ordering) -> t m a -> t m a -> t m a

Like differenceBy but works only on sorted streams.

Space: O(1)

Unimplemented

unionBy :: (IsStream t, MonadAsync m, Semigroup (t m a)) => (a -> a -> Bool) -> t m a -> t m a -> t m a

This is essentially an append operation that appends all the extra occurrences of elements from the second stream that are not already present in the first stream.

>>> Stream.toList $ Stream.unionBy (==) (Stream.fromList [1,2,2,4]) (Stream.fromList [1,1,2,3])
[1,2,2,4,3]

Roughly equivalent to the following, except that s1 is evaluated only once and, as the example above shows, duplicates within s2 are not repeated in the output:

unionBy eq s1 s2 = s1 `serial` (s2 `differenceBy eq` s1)

Similar to joinOuter but not the same.

Space: O(n)

Time: O(m x n)
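
The example output above coincides with what Data.List.unionBy produces on lists. The sketch below compares that with a literal list translation of the append-plus-difference formula. Treating Data.List.unionBy as a model of the stream operation is an assumption based only on the documented example.

```haskell
import Data.List (deleteBy, unionBy)

-- | Literal list translation of: s1 `serial` (s2 `differenceBy eq` s1).
appendDifference :: (a -> a -> Bool) -> [a] -> [a] -> [a]
appendDifference eq s1 s2 = s1 ++ foldl (flip (deleteBy eq)) s2 s1

main :: IO ()
main = do
    let s1 = [1, 2, 2, 4 :: Int]
        s2 = [1, 1, 2, 3]
    -- Data.List.unionBy removes duplicates inside s2 before appending.
    print (unionBy (==) s1 s2)          -- [1,2,2,4,3]
    -- The plain formula keeps the second 1 of s2.
    print (appendDifference (==) s1 s2) -- [1,2,2,4,1,3]
```

The two results agree whenever s2 has no duplicates of its own.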

Pre-release

mergeUnionBy :: (a -> a -> Ordering) -> t m a -> t m a -> t m a

Like unionBy but works only on sorted streams.

Space: O(1)

Unimplemented

Join operations

crossJoin :: Monad (t m) => t m a -> t m b -> t m (a, b)

This is the same as outerProduct but less efficient.

The second stream is evaluated multiple times. If the second stream is a consume-once stream then it can be cached in an Array before calling this function. Caching may also improve performance if the stream is expensive to evaluate.

Time: O(m x n)
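
Written with the list monad, the cross join makes it visible why the second input is traversed once per element of the first. This is a list sketch, not streamly's implementation; crossJoinList is a hypothetical name.

```haskell
-- | Pair every element of xs with every element of ys.
crossJoinList :: [a] -> [b] -> [(a, b)]
crossJoinList xs ys = do
    x <- xs
    y <- ys -- ys is traversed again for every x
    pure (x, y)

main :: IO ()
main =
    print (crossJoinList [1, 2 :: Int] "ab")
    -- [(1,'a'),(1,'b'),(2,'a'),(2,'b')]
```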

Pre-release

joinInner :: forall (t :: (Type -> Type) -> Type -> Type) m a b. (IsStream t, Monad m) => (a -> b -> Bool) -> t m a -> t m b -> t m (a, b)

For all elements in t m a, for all elements in t m b, if a and b are equal by the given equality predicate then return the tuple (a, b).

The second stream is evaluated multiple times. If the stream is a consume-once stream then the caller should cache it (e.g. in an Array) before calling this function. Caching may also improve performance if the stream is expensive to evaluate.

For space efficiency use the smaller stream as the second stream.

You should almost always use joinInnerMap instead of joinInner; joinInnerMap is an order of magnitude faster. joinInner may still be useful when memory is a concern and the second stream is generated from a seed, so that it need not be stored in memory.

Space: O(n) assuming the second stream is cached in memory.

Time: O(m x n)
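
The nested-loop nature of this join can be shown with a list comprehension. This is a sketch on plain lists, not streamly's implementation; joinInnerList is a hypothetical name.

```haskell
-- | Nested-loop inner join: for every x, scan all of ys for matches.
joinInnerList :: (a -> b -> Bool) -> [a] -> [b] -> [(a, b)]
joinInnerList eq xs ys = [(x, y) | x <- xs, y <- ys, eq x y]

main :: IO ()
main = do
    let pairs = joinInnerList (==) [1, 2, 2, 4] [2, 1, 1, 3 :: Int]
    print pairs            -- [(1,1),(1,1),(2,2),(2,2)]
    print (map fst pairs)  -- [1,1,2,2]
```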

Pre-release

joinInnerMap :: (IsStream t, Monad m, Ord k) => t m (k, a) -> t m (k, b) -> t m (k, a, b)

Like joinInner but uses a Map for efficiency.

If the input streams have duplicate keys, the behavior is undefined.

For space efficiency use the smaller stream as the second stream.

Space: O(n)

Time: O(m + n)
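
A keyed join of this kind can be sketched on lists with Data.Map from the containers package: index the second input once, then look up each key of the first. This is an illustration, not streamly's implementation; joinInnerMapList is a hypothetical name. Lookups in Data.Map cost O(log n), so this particular sketch runs in O((m + n) log n).

```haskell
import qualified Data.Map.Strict as Map

-- | Inner join on keys. The second list is loaded into a Map, so it is
-- the one held in memory. With duplicate keys in ys, Map.fromList keeps
-- the last value; the documented behaviour for duplicates is undefined.
joinInnerMapList :: Ord k => [(k, a)] -> [(k, b)] -> [(k, a, b)]
joinInnerMapList xs ys =
    [ (k, a, b)
    | (k, a) <- xs
    , Just b <- [Map.lookup k index] -- keys without a match are skipped
    ]
  where
    index = Map.fromList ys

main :: IO ()
main = do
    let xs = [(1, "a"), (2, "b"), (3, "c")] :: [(Int, String)]
        ys = [(2, 'x'), (3, 'y'), (4, 'z')] :: [(Int, Char)]
    print (joinInnerMapList xs ys) -- [(2,"b",'x'),(3,"c",'y')]
```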

Pre-release

joinInnerMerge :: (a -> b -> Ordering) -> t m a -> t m b -> t m (a, b)

Like joinInner but works only on sorted streams.

Space: O(1)

Time: O(m + n)

Unimplemented

joinLeft :: Monad m => (a -> b -> Bool) -> SerialT m a -> SerialT m b -> SerialT m (a, Maybe b)

For all elements in t m a, for all elements in t m b, if a and b are equal then return the tuple (a, Just b). If a is not present in t m b then return (a, Nothing).

The second stream is evaluated multiple times. If the stream is a consume-once stream then the caller should cache it in an Array before calling this function. Caching may also improve performance if the stream is expensive to evaluate.

rightJoin = flip joinLeft

Space: O(n) assuming the second stream is cached in memory.

Time: O(m x n)
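
The semantics described above can be sketched on plain lists. The function name joinLeftList is hypothetical, and the output order (grouped by elements of the first list) is an assumption, since the stream version is marked unimplemented.

```haskell
-- | Nested-loop left join: every x appears at least once in the output,
-- paired with Nothing when ys contains no match.
joinLeftList :: (a -> b -> Bool) -> [a] -> [b] -> [(a, Maybe b)]
joinLeftList eq xs ys = concatMap match xs
  where
    match x =
        case filter (eq x) ys of
            []      -> [(x, Nothing)]
            matches -> [(x, Just y) | y <- matches]

main :: IO ()
main =
    print (joinLeftList (==) [1, 2, 5] [2, 1, 1, 3 :: Int])
    -- [(1,Just 1),(1,Just 1),(2,Just 2),(5,Nothing)]
```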

Unimplemented

mergeLeftJoin :: (a -> b -> Ordering) -> t m a -> t m b -> t m (a, Maybe b)

Like joinLeft but works only on sorted streams.

Space: O(1)

Time: O(m + n)

Unimplemented

joinLeftMap :: (IsStream t, Ord k, Monad m) => t m (k, a) -> t m (k, b) -> t m (k, a, Maybe b)

Like joinLeft but uses a Map for efficiency.

Space: O(n)

Time: O(m + n)

Pre-release

joinOuter :: MonadIO m => (a -> b -> Bool) -> SerialT m a -> SerialT m b -> SerialT m (Maybe a, Maybe b)

For all elements in t m a, for all elements in t m b, if a and b are equal by the given equality predicate then return the tuple (Just a, Just b). If a is not found in t m b then return (Just a, Nothing); conversely, if b is not found in t m a then return (Nothing, Just b).

For space efficiency use the smaller stream as the second stream.

Space: O(n)

Time: O(m x n)
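
A list sketch of these semantics is a left join followed by the unmatched elements of the second input. The function name joinOuterList is hypothetical, and the output order is an assumption, since the stream version is marked unimplemented.

```haskell
-- | Nested-loop outer join on lists.
joinOuterList :: (a -> b -> Bool) -> [a] -> [b] -> [(Maybe a, Maybe b)]
joinOuterList eq xs ys = concatMap left xs ++ rightOnly
  where
    -- Every x, paired with its matches or with Nothing.
    left x =
        case filter (eq x) ys of
            []      -> [(Just x, Nothing)]
            matches -> [(Just x, Just y) | y <- matches]
    -- Elements of ys that match nothing in xs.
    rightOnly = [(Nothing, Just y) | y <- ys, not (any (`eq` y) xs)]

main :: IO ()
main =
    print (joinOuterList (==) [1, 2, 5] [2, 3 :: Int])
    -- [(Just 1,Nothing),(Just 2,Just 2),(Just 5,Nothing),(Nothing,Just 3)]
```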

Unimplemented

mergeOuterJoin :: (a -> b -> Ordering) -> t m a -> t m b -> t m (Maybe a, Maybe b)

Like joinOuter but works only on sorted streams.

Space: O(1)

Time: O(m + n)

Unimplemented

joinOuterMap :: (IsStream t, Ord k, MonadIO m) => t m (k, a) -> t m (k, b) -> t m (k, Maybe a, Maybe b)

Like joinOuter but uses a Map for efficiency.

Space: O(m + n)

Time: O(m + n)
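
The O(m + n) space bound reflects that both inputs have to be indexed. A list sketch with Data.Map and Data.Set from the containers package is shown below. It is an illustration, not streamly's implementation: joinOuterMapList is a hypothetical name, emitting results in ascending key order is an assumption, and duplicate keys are resolved by Map.fromList keeping the last value.

```haskell
import qualified Data.Map.Strict as Map
import qualified Data.Set as Set

-- | Outer join on keys: one output row per key seen in either input.
joinOuterMapList :: Ord k => [(k, a)] -> [(k, b)] -> [(k, Maybe a, Maybe b)]
joinOuterMapList xs ys =
    [ (k, Map.lookup k left, Map.lookup k right)
    | k <- Set.toAscList allKeys
    ]
  where
    left    = Map.fromList xs
    right   = Map.fromList ys
    allKeys = Map.keysSet left `Set.union` Map.keysSet right

main :: IO ()
main = do
    let xs = [(1, "a"), (2, "b")] :: [(Int, String)]
        ys = [(2, 'x'), (3, 'y')] :: [(Int, Char)]
    print (joinOuterMapList xs ys)
    -- [(1,Just "a",Nothing),(2,Just "b",Just 'x'),(3,Nothing,Just 'y')]
```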

Pre-release