Copyright | (c) 2020 Composewell Technologies |
---|---|
License | BSD-3-Clause |
Maintainer | streamly@composewell.com |
Stability | experimental |
Portability | GHC |
Safe Haskell | Safe-Inferred |
Language | Haskell2010 |
Synopsis
- periodic :: MonadIO m => m a -> Double -> Stream m a
- ticks :: MonadIO m => Double -> Stream m ()
- ticksRate :: MonadAsync m => Rate -> Stream m ()
- interject :: MonadAsync m => m a -> Double -> Stream m a -> Stream m a
- takeInterval :: MonadAsync m => Double -> Stream m a -> Stream m a
- takeLastInterval :: Double -> Stream m a -> Stream m a
- dropInterval :: MonadAsync m => Double -> Stream m a -> Stream m a
- dropLastInterval :: Int -> Stream m a -> Stream m a
- intervalsOf :: MonadAsync m => Double -> Fold m a b -> Stream m a -> Stream m b
- chunksOfTimeout :: MonadAsync m => Int -> Double -> Fold m a b -> Stream m a -> Stream m b
- sampleIntervalEnd :: MonadAsync m => Double -> Stream m a -> Stream m a
- sampleIntervalStart :: MonadAsync m => Double -> Stream m a -> Stream m a
- sampleBurst :: MonadAsync m => Bool -> Double -> Stream m a -> Stream m a
- sampleBurstEnd :: MonadAsync m => Double -> Stream m a -> Stream m a
- sampleBurstStart :: MonadAsync m => Double -> Stream m a -> Stream m a
- classifySessionsByGeneric :: forall m f a b. (MonadAsync m, IsMap f) => Proxy (f :: Type -> Type) -> Double -> Bool -> (Int -> m Bool) -> Double -> Fold m a b -> Stream m (AbsTime, (Key f, a)) -> Stream m (Key f, b)
- classifySessionsBy :: (MonadAsync m, Ord k) => Double -> Bool -> (Int -> m Bool) -> Double -> Fold m a b -> Stream m (AbsTime, (k, a)) -> Stream m (k, b)
- classifySessionsOf :: (MonadAsync m, Ord k) => (Int -> m Bool) -> Double -> Fold m a b -> Stream m (AbsTime, (k, a)) -> Stream m (k, b)
- classifyKeepAliveSessions :: (MonadAsync m, Ord k) => (Int -> m Bool) -> Double -> Fold m a b -> Stream m (AbsTime, (k, a)) -> Stream m (k, b)
- bufferLatest :: Stream m a -> Stream m (Maybe a)
- bufferLatestN :: Int -> Stream m a -> Stream m a
- bufferOldestN :: Int -> Stream m a -> Stream m a
Imports for Examples
Imports for example snippets in this module.
>>>
:m
>>>
import Control.Concurrent (threadDelay)
>>>
import qualified Streamly.Data.Array as Array
>>>
import qualified Streamly.Data.Fold as Fold
>>>
import qualified Streamly.Data.Parser as Parser
>>>
import qualified Streamly.Data.Stream as Stream
>>>
import qualified Streamly.Data.Stream.Prelude as Stream
>>>
import qualified Streamly.Internal.Data.Stream as Stream (delayPost, timestamped)
>>>
import qualified Streamly.Internal.Data.Stream.Concurrent as Stream (parListEagerFst)
>>>
import qualified Streamly.Internal.Data.Stream.Time as Stream
>>>
import Prelude hiding (concatMap, concat)
>>>
:{
delay n = do
    threadDelay (n * 1000000)   -- sleep for n seconds
    putStrLn (show n ++ " sec") -- print "n sec"
    return n                    -- IO Int
:}
Timers
periodic :: MonadIO m => m a -> Double -> Stream m a Source #
Generate a stream by running an action periodically at the specified time interval.
ticks :: MonadIO m => Double -> Stream m () Source #
Generate a tick stream consisting of ()
elements; each tick is generated
after the specified time delay given in seconds.
>>>
ticks = Stream.periodic (return ())
ticksRate :: MonadAsync m => Rate -> Stream m () Source #
Generate a tick stream, ticks are generated at the specified Rate
. The
rate is adaptive: the tick generation speed can be increased or decreased at
different times to achieve the specified rate. The specific behavior for
different styles of Rate
specifications is documented under Rate
. The
effective maximum rate achieved by a stream is governed by the processor
speed.
>>>
tickStream = Stream.repeatM (return ())
>>>
ticksRate r = Stream.parEval (Stream.rate (Just r)) tickStream
interject :: MonadAsync m => m a -> Double -> Stream m a -> Stream m a Source #
Intersperse a monadic action into the input stream after every n
seconds.
Definition:
>>>
interject f n xs = Stream.parListEagerFst [xs, Stream.periodic f n]
Example:
>>>
s = Stream.fromList "hello"
>>>
input = Stream.mapM (\x -> threadDelay 1000000 >> putChar x) s
>>>
Stream.fold Fold.drain $ Stream.interject (putChar ',') 1.05 input
h,e,l,l,o
Trimming
takeInterval :: MonadAsync m => Double -> Stream m a -> Stream m a Source #
takeInterval interval
runs the stream only up to the specified time
interval
in seconds.
The interval starts when the stream is evaluated for the first time.
takeLastInterval :: Double -> Stream m a -> Stream m a Source #
Take time interval i
seconds at the end of the stream.
O(n) space, where n is the number of elements taken.
Unimplemented
dropInterval :: MonadAsync m => Double -> Stream m a -> Stream m a Source #
dropInterval interval
drops all the stream elements that are generated
before the specified interval
in seconds has passed.
The interval begins when the stream is evaluated for the first time.
dropLastInterval :: Int -> Stream m a -> Stream m a Source #
Drop time interval i
seconds at the end of the stream.
O(n) space, where n is the number of elements dropped.
Unimplemented
Chunking
intervalsOf :: MonadAsync m => Double -> Fold m a b -> Stream m a -> Stream m b Source #
Group the input stream into windows of n
seconds each and then fold each
group using the provided fold function.
>>>
twoPerSec = Stream.parEval (Stream.constRate 2) $ Stream.enumerateFrom 1
>>>
intervals = Stream.intervalsOf 1 Fold.toList twoPerSec
>>>
Stream.fold Fold.toList $ Stream.take 2 intervals
[...,...]
chunksOfTimeout :: MonadAsync m => Int -> Double -> Fold m a b -> Stream m a -> Stream m b Source #
Like chunksOf
but if the chunk is not completed within the specified
time interval then emit whatever we have collected till now. The chunk
timeout is reset whenever a chunk is emitted. The granularity of the clock
is 100 ms.
>>>
s = Stream.delayPost 0.3 $ Stream.fromList [1..1000]
>>>
f = Stream.fold (Fold.drainMapM print) $ Stream.chunksOfTimeout 5 1 Fold.toList s
Pre-release
Sampling
sampleIntervalEnd :: MonadAsync m => Double -> Stream m a -> Stream m a Source #
Continuously evaluate the input stream and sample the last event in each
time window of n
seconds.
This is also known as throttle
in some libraries.
>>>
sampleIntervalEnd n = Stream.catMaybes . Stream.intervalsOf n Fold.latest
sampleIntervalStart :: MonadAsync m => Double -> Stream m a -> Stream m a Source #
Like sampleIntervalEnd
but samples at the beginning of the time window.
>>>
sampleIntervalStart n = Stream.catMaybes . Stream.intervalsOf n Fold.one
sampleBurst :: MonadAsync m => Bool -> Double -> Stream m a -> Stream m a Source #
sampleBurstEnd :: MonadAsync m => Double -> Stream m a -> Stream m a Source #
Sample one event at the end of each burst of events. A burst is a group of events close together in time; it ends when an event is spaced by more than the specified time interval (in seconds) from the previous event.
This is known as debounce
in some libraries.
The clock granularity is 10 ms.
sampleBurstStart :: MonadAsync m => Double -> Stream m a -> Stream m a Source #
Like sampleBurstEnd
but samples the event at the beginning of the burst
instead of at the end of it.
Windowed Sessions
classifySessionsByGeneric Source #
:: forall m f a b. (MonadAsync m, IsMap f) | |
=> Proxy (f :: Type -> Type) | |
-> Double | timer tick in seconds |
-> Bool | reset the timer when an event is received |
-> (Int -> m Bool) | predicate to eject sessions based on session count |
-> Double | session timeout in seconds |
-> Fold m a b | Fold to be applied to session data |
-> Stream m (AbsTime, (Key f, a)) | timestamp, (session key, session data) |
-> Stream m (Key f, b) | session key, fold result |
classifySessionsBy Source #
:: (MonadAsync m, Ord k) | |
=> Double | timer tick in seconds |
-> Bool | reset the timer when an event is received |
-> (Int -> m Bool) | predicate to eject sessions based on session count |
-> Double | session timeout in seconds |
-> Fold m a b | Fold to be applied to session data |
-> Stream m (AbsTime, (k, a)) | timestamp, (session key, session data) |
-> Stream m (k, b) | session key, fold result |
classifySessionsBy tick keepalive predicate timeout fold stream
classifies an input event stream
consisting of (timestamp, (key,
value))
into sessions based on the key
, folding all the values
corresponding to the same key into a session using the supplied fold
.
When the fold terminates or a timeout
occurs, a tuple consisting of the
session key and the folded value is emitted in the output stream. The
timeout is measured from the first event in the session. If the keepalive
option is set to True
the timeout is reset to 0 whenever an event is
received.
The timestamp
in the input stream is an absolute time from some epoch,
characterizing the time when the input event was generated. The notion of
current time is maintained by a monotonic event time clock using the
timestamps seen in the input stream. The latest timestamp seen till now is
used as the base for the current time. When no new events are seen, a timer
is started with a clock resolution of tick
seconds. This timer is used to
detect session timeouts in the absence of new events.
To ensure an upper bound on the memory used, the number of sessions can be
limited to an upper bound. If the ejection predicate
returns True
, the
oldest session is ejected before inserting a new session.
When the stream ends any buffered sessions are ejected immediately.
If a session key is received even after a session has finished, another session is created for that key.
>>>
:{
Stream.fold (Fold.drainMapM print)
    $ Stream.classifySessionsBy 1 False (const (return False)) 3 (Fold.take 3 Fold.toList)
    $ Stream.timestamped
    $ Stream.delay 0.1
    $ Stream.fromList ((,) <$> [1,2,3] <*> ['a','b','c'])
:}
(1,"abc")
(2,"abc")
(3,"abc")
Pre-release
classifySessionsOf Source #
:: (MonadAsync m, Ord k) | |
=> (Int -> m Bool) | predicate to eject sessions on session count |
-> Double | time window size |
-> Fold m a b | Fold to be applied to session data |
-> Stream m (AbsTime, (k, a)) | timestamp, (session key, session data) |
-> Stream m (k, b) |
Same as classifySessionsBy
with a timer tick of 1 second and keepalive
option set to False
.
>>>
classifySessionsOf = Stream.classifySessionsBy 1 False
Pre-release
classifyKeepAliveSessions Source #
:: (MonadAsync m, Ord k) | |
=> (Int -> m Bool) | predicate to eject sessions on session count |
-> Double | session inactive timeout |
-> Fold m a b | Fold to be applied to session payload data |
-> Stream m (AbsTime, (k, a)) | timestamp, (session key, session data) |
-> Stream m (k, b) |
Same as classifySessionsBy
with a timer tick of 1 second and keepalive
option set to True
.
classifyKeepAliveSessions = classifySessionsBy 1 True
Pre-release
Buffering
Evaluate strictly using a buffer of results. When the buffer becomes full we can block, drop the new elements, or drop the oldest element and insert the new one at the end.
bufferLatest :: Stream m a -> Stream m (Maybe a) Source #
Always produce the latest available element from the stream without any delay. The stream is continuously evaluated at the highest possible rate and only the latest element is retained for sampling.
Unimplemented
bufferLatestN :: Int -> Stream m a -> Stream m a Source #
Evaluate the input stream continuously and keep only the latest n
elements in a ring buffer, keep discarding the older ones to make space for
the new ones. When the output stream is evaluated the buffer collected till
now is streamed and it starts filling again.
Unimplemented
bufferOldestN :: Int -> Stream m a -> Stream m a Source #
Evaluate the input stream continuously and keep only the oldest n
elements in the buffer, discard the new ones when the buffer is full. When
the output stream is evaluated the collected buffer is streamed and the
buffer starts filling again.
Unimplemented