Copyright | (c) 2017 Harendra Kumar |
---|---|
License | BSD3 |
Maintainer | harendra.kumar@gmail.com |
Stability | experimental |
Portability | GHC |
Safe Haskell | None |
Language | Haskell2010 |
The way a list represents a sequence of pure values, a stream represents a sequence of monadic actions. The monadic stream API offered by Streamly is very close to the Haskell Prelude pure lists' API, it can be considered as a natural extension of lists to monadic actions. Streamly streams provide concurrent composition and merging of streams. It can be considered as a concurrent list transformer. In contrast to the Prelude lists, merging or appending streams of arbitrary length is scalable and inexpensive.
The basic stream type is Serial
, it represents a sequence of IO actions,
and is a Monad
. The type SerialT
is a monad transformer that can
represent a sequence of actions in an arbitrary monad. The type Serial
is
in fact a synonym for SerialT IO
. There are a few more types similar to
SerialT
, all of them represent a stream and differ only in the
Semigroup
, Applicative
and Monad
compositions of the stream. Serial
and WSerial
types compose serially whereas Async
and WAsync
types compose concurrently. All these types can be freely inter-converted
using type combinators without any cost. You can freely switch to any type
of composition at any point in the program. When no type annotation or
explicit stream type combinators are used, the default stream type is
inferred as Serial
.
Here is a simple console echo program example:
> runStream $ S.repeatM getLine & S.mapM putStrLn
For more details please see the Streamly.Tutorial module and the examples directory in this package.
This module exports stream types, instances and some basic operations. Functionality exported by this module include:
- Semigroup append (
<>
) instances as well as explicit operations for merging streams - Monad and Applicative instances for looping over streams
- Zip Applicatives for zipping streams
- Stream type combinators to convert between different composition styles
- Some basic utilities to run and fold streams
See the Streamly.Prelude module for comprehensive APIs for construction, generation, elimination and transformation of streams.
This module is designed to be imported unqualified:
import Streamly
Synopsis
- type MonadAsync m = (MonadIO m, MonadBaseControl IO m, MonadThrow m)
- data SerialT m a
- data WSerialT m a
- data AheadT m a
- data AsyncT m a
- data WAsyncT m a
- data ParallelT m a
- data ZipSerialM m a
- data ZipAsyncM m a
- runStream :: Monad m => SerialT m a -> m ()
- (|$) :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> t m a -> t m b
- (|&) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> t m b) -> t m b
- (|$.) :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b
- (|&.) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> m b) -> m b
- mkAsync :: (IsStream t, MonadAsync m) => t m a -> m (t m a)
- serial :: IsStream t => t m a -> t m a -> t m a
- wSerial :: IsStream t => t m a -> t m a -> t m a
- ahead :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
- async :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
- wAsync :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
- parallel :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
- maxThreads :: IsStream t => Int -> t m a -> t m a
- maxBuffer :: IsStream t => Int -> t m a -> t m a
- data Rate = Rate {}
- rate :: IsStream t => Maybe Rate -> t m a -> t m a
- avgRate :: IsStream t => Double -> t m a -> t m a
- minRate :: IsStream t => Double -> t m a -> t m a
- maxRate :: IsStream t => Double -> t m a -> t m a
- constRate :: IsStream t => Double -> t m a -> t m a
- foldWith :: (IsStream t, Foldable f) => (t m a -> t m a -> t m a) -> f (t m a) -> t m a
- foldMapWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> (a -> t m b) -> f a -> t m b
- forEachWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> f a -> (a -> t m b) -> t m b
- class IsStream t
- serially :: IsStream t => SerialT m a -> t m a
- wSerially :: IsStream t => WSerialT m a -> t m a
- asyncly :: IsStream t => AsyncT m a -> t m a
- aheadly :: IsStream t => AheadT m a -> t m a
- wAsyncly :: IsStream t => WAsyncT m a -> t m a
- parallely :: IsStream t => ParallelT m a -> t m a
- zipSerially :: IsStream t => ZipSerialM m a -> t m a
- zipAsyncly :: IsStream t => ZipAsyncM m a -> t m a
- adapt :: (IsStream t1, IsStream t2) => t1 m a -> t2 m a
- type Serial = SerialT IO
- type WSerial = WSerialT IO
- type Ahead = AheadT IO
- type Async = AsyncT IO
- type WAsync = WAsyncT IO
- type Parallel = ParallelT IO
- type ZipSerial = ZipSerialM IO
- type ZipAsync = ZipAsyncM IO
- class Semigroup a where
- type Streaming = IsStream
- runStreaming :: (Monad m, IsStream t) => t m a -> m ()
- runStreamT :: Monad m => SerialT m a -> m ()
- runInterleavedT :: Monad m => WSerialT m a -> m ()
- runAsyncT :: Monad m => AsyncT m a -> m ()
- runParallelT :: Monad m => ParallelT m a -> m ()
- runZipStream :: Monad m => ZipSerialM m a -> m ()
- runZipAsync :: Monad m => ZipAsyncM m a -> m ()
- type StreamT = SerialT
- type InterleavedT = WSerialT
- type ZipStream = ZipSerialM
- interleaving :: IsStream t => WSerialT m a -> t m a
- zipping :: IsStream t => ZipSerialM m a -> t m a
- zippingAsync :: IsStream t => ZipAsyncM m a -> t m a
- (<=>) :: IsStream t => t m a -> t m a -> t m a
- (<|) :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
Documentation
type MonadAsync m = (MonadIO m, MonadBaseControl IO m, MonadThrow m) Source #
A monad that can perform concurrent or parallel IO operations. Streams
that can be composed concurrently require the underlying monad to be
MonadAsync
.
Since: 0.1.0
Stream transformers
Serial Streams
Serial streams compose serially or non-concurrently. In a composed stream,
each action is executed only after the previous action has finished. The two
serial stream types SerialT
and WSerialT
differ in how they traverse the
streams in a Semigroup
or Monad
composition.
Deep serial composition or serial composition with depth first traversal.
The Semigroup
instance of SerialT
appends two streams serially in a
depth first manner, yielding all elements from the first stream, and then
all elements from the second stream.
import Streamly import qualified Streamly.Prelude as S main = (toList
.serially
$ (fromFoldable [1,2]) <> (fromFoldable [3,4])) >>= print
[1,2,3,4]
The Monad
instance runs the monadic continuation for each
element of the stream, serially.
main =runStream
.serially
$ do x <- return 1 <> return 2 S.yieldM $ print x
1 2
SerialT
nests streams serially in a depth first manner.
main =runStream
.serially
$ do x <- return 1 <> return 2 y <- return 3 <> return 4 S.yieldM $ print (x, y)
(1,3) (1,4) (2,3) (2,4)
This behavior of SerialT
is exactly like a list transformer. We call the
monadic code being run for each element of the stream a monadic
continuation. In imperative paradigm we can think of this composition as
nested for
loops and the monadic continuation is the body of the loop. The
loop iterates for all elements of the stream.
The serially
combinator can be omitted as the default stream type is
SerialT
.
Note that serial composition with depth first traversal can be used to
combine an infinite number of streams as it explores only one stream at a
time.
Since: 0.2.0
Instances
Wide serial composition or serial composition with a breadth first
traversal. The Semigroup
instance of WSerialT
traverses
the two streams in a breadth first manner. In other words, it interleaves
two streams, yielding one element from each stream alternately.
import Streamly import qualified Streamly.Prelude as S main = (toList
.wSerially
$ (fromFoldable [1,2]) <> (fromFoldable [3,4])) >>= print
[1,3,2,4]
Similarly, the Monad
instance interleaves the iterations of the
inner and the outer loop, nesting loops in a breadth first manner.
main =runStream
.wSerially
$ do x <- return 1 <> return 2 y <- return 3 <> return 4 S.yieldM $ print (x, y)
(1,3) (2,3) (1,4) (2,4)
Note that a serial composition with breadth first traversal can only combine a finite number of streams as it needs to retain state for each unfinished stream.
Since: 0.2.0
Instances
Concurrent Lookahead Streams
Deep ahead composition or ahead composition with depth first traversal.
The semigroup composition of AheadT
appends streams in a depth first
manner just like SerialT
except that it can produce elements concurrently
ahead of time. It is like AsyncT
except that AsyncT
produces the output
as it arrives whereas AheadT
orders the output in the traversal order.
main = (toList
.aheadly
$ (fromFoldable [1,2]) <> (fromFoldable [3,4])) >>= print
[1,2,3,4]
Any exceptions generated by a constituent stream are propagated to the output stream.
Similarly, the monad instance of AheadT
may run each iteration
concurrently ahead of time but presents the results in the same order as
SerialT
.
import Streamly import qualified Streamly.Prelude as S import Control.Concurrent main =runStream
.aheadly
$ do n <- return 3 <> return 2 <> return 1 S.yieldM $ do threadDelay (n * 1000000) myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)
ThreadId 40: Delay 1 ThreadId 39: Delay 2 ThreadId 38: Delay 3
All iterations may run in the same thread if they do not block.
Note that ahead composition with depth first traversal can be used to combine infinite number of streams as it explores only a bounded number of streams at a time.
Since: 0.3.0
Instances
MonadTrans AheadT Source # | |
Defined in Streamly.Streams.Ahead | |
IsStream AheadT Source # | |
Defined in Streamly.Streams.Ahead | |
(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AheadT m) Source # | |
Defined in Streamly.Streams.Ahead | |
(MonadState s m, MonadAsync m) => MonadState s (AheadT m) Source # | |
(MonadReader r m, MonadAsync m) => MonadReader r (AheadT m) Source # | |
MonadAsync m => Monad (AheadT m) Source # | |
Monad m => Functor (AheadT m) Source # | |
(Monad m, MonadAsync m) => Applicative (AheadT m) Source # | |
(MonadIO m, MonadAsync m) => MonadIO (AheadT m) Source # | |
Defined in Streamly.Streams.Ahead | |
(MonadThrow m, MonadAsync m) => MonadThrow (AheadT m) Source # | |
Defined in Streamly.Streams.Ahead | |
MonadAsync m => Semigroup (AheadT m a) Source # | |
MonadAsync m => Monoid (AheadT m a) Source # | |
Concurrent Asynchronous Streams
The async style streams execute actions asynchronously and consume the
outputs as well asynchronously. In a composed stream, at any point of time
more than one stream can run concurrently and yield elements. The elements
are yielded by the composed stream as they are generated by the constituent
streams on a first come first serve basis. Therefore, on each run the
stream may yield elements in a different sequence depending on the delays
introduced by scheduling. The two async types AsyncT
and WAsyncT
differ
in how they traverse streams in Semigroup
or Monad
compositions.
Deep async composition or async composition with depth first traversal. In
a left to right Semigroup
composition it tries to yield elements from the
left stream as long as it can, but it can run the right stream in parallel
if it needs to, based on demand. The right stream can be run if the left
stream blocks on IO or cannot produce elements fast enough for the consumer.
main = (toList
.asyncly
$ (fromFoldable [1,2]) <> (fromFoldable [3,4])) >>= print
[1,2,3,4]
Any exceptions generated by a constituent stream are propagated to the output stream. The output and exceptions from a single stream are guaranteed to arrive in the same order in the resulting stream as they were generated in the input stream. However, the relative ordering of elements from different streams in the resulting stream can vary depending on scheduling and generation delays.
Similarly, the monad instance of AsyncT
may run each iteration
concurrently based on demand. More concurrent iterations are started only
if the previous iterations are not able to produce enough output for the
consumer.
import Streamly import qualified Streamly.Prelude as S import Control.Concurrent main =runStream
.asyncly
$ do n <- return 3 <> return 2 <> return 1 S.yieldM $ do threadDelay (n * 1000000) myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)
ThreadId 40: Delay 1 ThreadId 39: Delay 2 ThreadId 38: Delay 3
All iterations may run in the same thread if they do not block.
Note that async composition with depth first traversal can be used to combine infinite number of streams as it explores only a bounded number of streams at a time.
Since: 0.1.0
Instances
MonadTrans AsyncT Source # | |
Defined in Streamly.Streams.Async | |
IsStream AsyncT Source # | |
Defined in Streamly.Streams.Async | |
(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AsyncT m) Source # | |
Defined in Streamly.Streams.Async | |
(MonadState s m, MonadAsync m) => MonadState s (AsyncT m) Source # | |
(MonadReader r m, MonadAsync m) => MonadReader r (AsyncT m) Source # | |
MonadAsync m => Monad (AsyncT m) Source # | |
Monad m => Functor (AsyncT m) Source # | |
(Monad m, MonadAsync m) => Applicative (AsyncT m) Source # | |
(MonadIO m, MonadAsync m) => MonadIO (AsyncT m) Source # | |
Defined in Streamly.Streams.Async | |
(MonadThrow m, MonadAsync m) => MonadThrow (AsyncT m) Source # | |
Defined in Streamly.Streams.Async | |
MonadAsync m => Semigroup (AsyncT m a) Source # | |
MonadAsync m => Monoid (AsyncT m a) Source # | |
Wide async composition or async composition with breadth first traversal.
The Semigroup instance of WAsyncT
concurrently traverses the composed
streams using a depth first travesal or in a round robin fashion, yielding
elements from both streams alternately.
main = (toList
.wAsyncly
$ (fromFoldable [1,2]) <> (fromFoldable [3,4])) >>= print
[1,3,2,4]
Any exceptions generated by a constituent stream are propagated to the output stream. The output and exceptions from a single stream are guaranteed to arrive in the same order in the resulting stream as they were generated in the input stream. However, the relative ordering of elements from different streams in the resulting stream can vary depending on scheduling and generation delays.
Similarly, the Monad
instance of WAsyncT
runs all iterations fairly
concurrently using a round robin scheduling.
import Streamly import qualified Streamly.Prelude as S import Control.Concurrent main =runStream
.wAsyncly
$ do n <- return 3 <> return 2 <> return 1 S.yieldM $ do threadDelay (n * 1000000) myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)
ThreadId 40: Delay 1 ThreadId 39: Delay 2 ThreadId 38: Delay 3
Unlike AsyncT
all iterations are guaranteed to run fairly
concurrently, unconditionally.
Note that async composition with breadth first traversal can only combine a finite number of streams as it needs to retain state for each unfinished stream.
Since: 0.2.0
Instances
MonadTrans WAsyncT Source # | |
Defined in Streamly.Streams.Async | |
IsStream WAsyncT Source # | |
Defined in Streamly.Streams.Async | |
(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (WAsyncT m) Source # | |
Defined in Streamly.Streams.Async | |
(MonadState s m, MonadAsync m) => MonadState s (WAsyncT m) Source # | |
(MonadReader r m, MonadAsync m) => MonadReader r (WAsyncT m) Source # | |
MonadAsync m => Monad (WAsyncT m) Source # | |
Monad m => Functor (WAsyncT m) Source # | |
(Monad m, MonadAsync m) => Applicative (WAsyncT m) Source # | |
(MonadIO m, MonadAsync m) => MonadIO (WAsyncT m) Source # | |
Defined in Streamly.Streams.Async | |
(MonadThrow m, MonadAsync m) => MonadThrow (WAsyncT m) Source # | |
Defined in Streamly.Streams.Async | |
MonadAsync m => Semigroup (WAsyncT m a) Source # | |
MonadAsync m => Monoid (WAsyncT m a) Source # | |
Async composition with simultaneous traversal of all streams.
The Semigroup instance of ParallelT
concurrently merges two streams,
running both strictly concurrently and yielding elements from both streams
as they arrive. When multiple streams are combined using ParallelT
each
one is evaluated in its own thread and the results produced are presented in
the combined stream on a first come first serve basis.
AsyncT
and WAsyncT
are concurrent lookahead streams each with a
specific type of consumption pattern (depth first or breadth first). Since
they are lookahead, they may introduce certain default latency in starting
more concurrent tasks for efficiency reasons or may put a default limitation
on the resource consumption (e.g. number of concurrent threads for
lookahead). If we look at the implementation detail, they both can share a
pool of worker threads to evaluate the streams in the desired pattern and at
the desired rate. However, ParallelT
uses a separate runtime thread to
evaluate each stream.
WAsyncT
is similar to ParallelT
, as both of them evaluate the
constituent streams fairly in a round robin fashion.
However, the key difference is that WAsyncT
is lazy or pull driven
whereas ParallelT
is strict or push driven. ParallelT
immediately
starts concurrent evaluation of both the streams (in separate threads) and
later picks the results whereas WAsyncT
may wait for a certain latency
threshold before initiating concurrent evaluation of the next stream. The
concurrent scheduling of the next stream or the degree of concurrency is
driven by the feedback from the consumer. In case of ParallelT
each stream
is evaluated in a separate thread and results are pushed to a shared
output buffer, the evaluation rate is controlled by blocking when the buffer
is full.
Concurrent lookahead streams are generally more efficient than
ParallelT
and can work pretty efficiently even for smaller tasks because
they do not necessarily use a separate thread for each task. So they should
be preferred over ParallelT
especially when efficiency is a concern and
simultaneous strict evaluation is not a requirement. ParallelT
is useful
for cases when the streams are required to be evaluated simultaneously
irrespective of how the consumer consumes them e.g. when we want to race
two tasks and want to start both strictly at the same time or if we have
timers in the parallel tasks and our results depend on the timers being
started at the same time. We can say that ParallelT
is almost the same
(modulo some implementation differences) as WAsyncT
when the latter is
used with unlimited lookahead and zero latency in initiating lookahead.
main = (toList
.parallely
$ (fromFoldable [1,2]) <> (fromFoldable [3,4])) >>= print
[1,3,2,4]
When streams with more than one element are merged, it yields whichever
stream yields first without any bias, unlike the Async
style streams.
Any exceptions generated by a constituent stream are propagated to the output stream. The output and exceptions from a single stream are guaranteed to arrive in the same order in the resulting stream as they were generated in the input stream. However, the relative ordering of elements from different streams in the resulting stream can vary depending on scheduling and generation delays.
Similarly, the Monad
instance of ParallelT
runs all iterations
of the loop concurrently.
import Streamly import qualified Streamly.Prelude as S import Control.Concurrent main =runStream
.parallely
$ do n <- return 3 <> return 2 <> return 1 S.yieldM $ do threadDelay (n * 1000000) myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)
ThreadId 40: Delay 1 ThreadId 39: Delay 2 ThreadId 38: Delay 3
Note that parallel composition can only combine a finite number of streams as it needs to retain state for each unfinished stream.
Since: 0.1.0
Instances
MonadTrans ParallelT Source # | |
Defined in Streamly.Streams.Parallel | |
IsStream ParallelT Source # | |
Defined in Streamly.Streams.Parallel | |
(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (ParallelT m) Source # | |
Defined in Streamly.Streams.Parallel | |
(MonadState s m, MonadAsync m) => MonadState s (ParallelT m) Source # | |
(MonadReader r m, MonadAsync m) => MonadReader r (ParallelT m) Source # | |
MonadAsync m => Monad (ParallelT m) Source # | |
Monad m => Functor (ParallelT m) Source # | |
(Monad m, MonadAsync m) => Applicative (ParallelT m) Source # | |
Defined in Streamly.Streams.Parallel | |
(MonadIO m, MonadAsync m) => MonadIO (ParallelT m) Source # | |
Defined in Streamly.Streams.Parallel | |
(MonadThrow m, MonadAsync m) => MonadThrow (ParallelT m) Source # | |
Defined in Streamly.Streams.Parallel | |
MonadAsync m => Semigroup (ParallelT m a) Source # | |
MonadAsync m => Monoid (ParallelT m a) Source # | |
Zipping Streams
ZipSerialM
and ZipAsyncM
, provide Applicative
instances for zipping the
corresponding elements of two streams together. Note that these types are
not monads.
data ZipSerialM m a Source #
The applicative instance of ZipSerialM
zips a number of streams serially
i.e. it produces one element from each stream serially and then zips all
those elements.
main = (toList . zipSerially
$ (,,) <$> s1 <*> s2 <*> s3) >>= print
where s1 = fromFoldable [1, 2]
s2 = fromFoldable [3, 4]
s3 = fromFoldable [5, 6]
[(1,3,5),(2,4,6)]
The Semigroup
instance of this type works the same way as that of
SerialT
.
Since: 0.2.0
Instances
Like ZipSerialM
but zips in parallel, it generates all the elements to
be zipped concurrently.
main = (toList . zipAsyncly
$ (,,) <$> s1 <*> s2 <*> s3) >>= print
where s1 = fromFoldable [1, 2]
s2 = fromFoldable [3, 4]
s3 = fromFoldable [5, 6]
[(1,3,5),(2,4,6)]
The Semigroup
instance of this type works the same way as that of
SerialT
.
Since: 0.2.0
Running Streams
runStream :: Monad m => SerialT m a -> m () Source #
Run a stream, discarding the results. By default it interprets the stream
as SerialT
, to run other types of streams use the type adapting
combinators for example runStream .
.asyncly
Since: 0.2.0
Parallel Function Application
Stream processing functions can be composed in a chain using function
application with or without the $
operator, or with reverse function
application operator &
. Streamly provides concurrent versions of these
operators applying stream processing functions such that each stage of the
stream can run in parallel. The operators start with a |
; we can read |$
as "parallel dollar
" to remember that |
comes before $
.
Imports for the code snippets below:
import Streamly import qualified Streamly.Prelude as S import Control.Concurrent
(|$) :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> t m a -> t m b infixr 0 Source #
Parallel function application operator for streams; just like the regular
function application operator $
except that it is concurrent. The
following code prints a value every second even though each stage adds a 1
second delay.
runStream $ S.mapM (\x -> threadDelay 1000000 >> print x) |$ S.repeatM (threadDelay 1000000 >> return 1)
Concurrent
Since: 0.3.0
(|&) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> t m b) -> t m b infixl 1 Source #
Parallel reverse function application operator for streams; just like the
regular reverse function application operator &
except that it is
concurrent.
runStream $ S.repeatM (threadDelay 1000000 >> return 1) |& S.mapM (\x -> threadDelay 1000000 >> print x)
Concurrent
Since: 0.3.0
(|$.) :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b infixr 0 Source #
Parallel function application operator; applies a run
or fold
function
to a stream such that the fold consumer and the stream producer run in
parallel. A run
or fold
function reduces the stream to a value in the
underlying monad. The .
at the end of the operator is a mnemonic for
termination of the stream.
S.foldlM' (\_ a -> threadDelay 1000000 >> print a) () |$. S.repeatM (threadDelay 1000000 >> return 1)
Concurrent
Since: 0.3.0
(|&.) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> m b) -> m b infixl 1 Source #
Parallel reverse function application operator for applying a run or fold
functions to a stream. Just like |$.
except that the operands are reversed.
S.repeatM (threadDelay 1000000 >> return 1) |&. S.foldlM' (\_ a -> threadDelay 1000000 >> print a) ()
Concurrent
Since: 0.3.0
mkAsync :: (IsStream t, MonadAsync m) => t m a -> m (t m a) Source #
Make a stream asynchronous, triggers the computation and returns a stream
in the underlying monad representing the output generated by the original
computation. The returned action is exhaustible and must be drained once. If
not drained fully we may have a thread blocked forever and once exhausted it
will always return empty
.
Since: 0.2.0
Merging Streams
The Semigroup
operation <>
of each stream type combines two streams in a
type specific manner. This section provides polymorphic versions of <>
which can be used to combine two streams in a predetermined way irrespective
of the type.
ahead :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a Source #
async :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a Source #
wAsync :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a Source #
parallel :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a Source #
Concurrency Control
These combinators can be used at any point in a stream composition to set
parameters to control the concurrency of the enclosed stream. A parameter
set at any point remains effective for any concurrent combinators used
downstream until it is reset. These control parameters have no effect on
non-concurrent combinators in the stream, or on non-concurrent streams. They
also do not affect Parallel
streams, as concurrency for Parallel
streams
is always unbounded.
maxThreads :: IsStream t => Int -> t m a -> t m a Source #
Specify the maximum number of threads that can be spawned concurrently for any concurrent combinator in a stream. A value of 0 resets the thread limit to default, a negative value means there is no limit. The default value is 1500.
When the actions in a stream are IO bound, having blocking IO calls, this option can be used to control the maximum number of in-flight IO requests. When the actions are CPU bound this option can be used to control the amount of CPU used by the stream.
Since: 0.4.0
maxBuffer :: IsStream t => Int -> t m a -> t m a Source #
Specify the maximum size of the buffer for storing the results from concurrent computations. If the buffer becomes full we stop spawning more concurrent tasks until there is space in the buffer. A value of 0 resets the buffer size to default, a negative value means there is no limit. The default value is 1500.
CAUTION! using an unbounded maxBuffer
value (i.e. a negative value)
coupled with an unbounded maxThreads
value is a recipe for disaster in
presence of infinite streams, or very large streams. Especially, it must
not be used when pure
is used in ZipAsyncM
streams as pure
in
applicative zip streams generates an infinite stream causing unbounded
concurrent generation with no limit on the buffer or threads.
Since: 0.4.0
Rate Limiting
Specifies the stream yield rate in yields per second (Hertz
).
We keep accumulating yield credits at rateGoal
. At any point of time we
allow only as many yields as we have accumulated as per rateGoal
since the
start of time. If the consumer or the producer is slower or faster, the
actual rate may fall behind or exceed rateGoal
. We try to recover the gap
between the two by increasing or decreasing the pull rate from the producer.
However, if the gap becomes more than rateBuffer
we try to recover only as
much as rateBuffer
.
rateLow
puts a bound on how low the instantaneous rate can go when
recovering the rate gap. In other words, it determines the maximum yield
latency. Similarly, rateHigh
puts a bound on how high the instantaneous
rate can go when recovering the rate gap. In other words, it determines the
minimum yield latency. We reduce the latency by increasing concurrency,
therefore we can say that it puts an upper bound on concurrency.
If the rateGoal
is 0 or negative the stream never yields a value.
If the rateBuffer
is 0 or negative we do not attempt to recover.
Since: 0.5.0
rate :: IsStream t => Maybe Rate -> t m a -> t m a Source #
Specify the pull rate of a stream.
A Nothing
value resets the rate to default which is unlimited. When the
rate is specified, concurrent production may be ramped up or down
automatically to achieve the specified yield rate. The specific behavior for
different styles of Rate
specifications is documented under Rate
. The
effective maximum production rate achieved by a stream is governed by:
- The
maxThreads
limit - The
maxBuffer
limit - The maximum rate that the stream producer can achieve
- The maximum rate that the stream consumer can achieve
Since: 0.5.0
avgRate :: IsStream t => Double -> t m a -> t m a Source #
Same as rate (Just $ Rate (r/2) r (2*r) maxBound)
Specifies the average production rate of a stream in number of yields
per second (i.e. Hertz
). Concurrent production is ramped up or down
automatically to achieve the specified average yield rate. The rate can
go down to half of the specified rate on the lower side and double of
the specified rate on the higher side.
Since: 0.5.0
minRate :: IsStream t => Double -> t m a -> t m a Source #
Same as rate (Just $ Rate r r (2*r) maxBound)
Specifies the minimum rate at which the stream should yield values. As far as possible the yield rate would never be allowed to go below the specified rate, even though it may possibly go above it at times, the upper limit is double of the specified rate.
Since: 0.5.0
maxRate :: IsStream t => Double -> t m a -> t m a Source #
Same as rate (Just $ Rate (r/2) r r maxBound)
Specifies the maximum rate at which the stream should yield values. As far as possible the yield rate would never be allowed to go above the specified rate, even though it may possibly go below it at times, the lower limit is half of the specified rate. This can be useful in applications where certain resource usage must not be allowed to go beyond certain limits.
Since: 0.5.0
constRate :: IsStream t => Double -> t m a -> t m a Source #
Same as rate (Just $ Rate r r r 0)
Specifies a constant yield rate. If for some reason the actual rate goes above or below the specified rate we do not try to recover it by increasing or decreasing the rate in future. This can be useful in applications like graphics frame refresh where we need to maintain a constant refresh rate.
Since: 0.5.0
Folding Containers of Streams
These are variants of standard Foldable
fold functions that use a
polymorphic stream sum operation (e.g. async
or wSerial
) to fold a
container of streams.
foldMapWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> (a -> t m b) -> f a -> t m b Source #
forEachWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> f a -> (a -> t m b) -> t m b Source #
Like foldMapWith
but with the last two arguments reversed i.e. the
monadic streaming function is the last argument.
Since: 0.1.0
Stream Type Adapters
You may want to use different stream composition styles at different points
in your program. Stream types can be freely converted or adapted from one
type to another. The IsStream
type class facilitates type conversion of
one stream type to another. It is not used directly, instead the type
combinators provided below are used for conversions.
To adapt from one monomorphic type (e.g. AsyncT
) to another monomorphic
type (e.g. SerialT
) use the adapt
combinator. To give a polymorphic code
a specific interpretation or to adapt a specific type to a polymorphic type
use the type specific combinators e.g. asyncly
or wSerially
. You
cannot adapt polymorphic code to polymorphic code, as the compiler would not know
which specific type you are converting from or to. If you see a an
ambiguous type variable
error then most likely you are using adapt
unnecessarily on polymorphic code.
Class of types that can represent a stream of elements of some type a
in
some monad m
.
Since: 0.2.0
Instances
IsStream WSerialT Source # | |
Defined in Streamly.Streams.Serial | |
IsStream SerialT Source # | |
Defined in Streamly.Streams.Serial | |
IsStream ParallelT Source # | |
Defined in Streamly.Streams.Parallel | |
IsStream WAsyncT Source # | |
Defined in Streamly.Streams.Async | |
IsStream AsyncT Source # | |
Defined in Streamly.Streams.Async | |
IsStream AheadT Source # | |
Defined in Streamly.Streams.Ahead | |
IsStream ZipAsyncM Source # | |
Defined in Streamly.Streams.Zip | |
IsStream ZipSerialM Source # | |
Defined in Streamly.Streams.Zip toStream :: ZipSerialM m a -> Stream m a fromStream :: Stream m a -> ZipSerialM m a consM :: MonadAsync m => m a -> ZipSerialM m a -> ZipSerialM m a Source # (|:) :: MonadAsync m => m a -> ZipSerialM m a -> ZipSerialM m a Source # |
serially :: IsStream t => SerialT m a -> t m a Source #
Fix the type of a polymorphic stream as SerialT
.
Since: 0.1.0
wSerially :: IsStream t => WSerialT m a -> t m a Source #
Fix the type of a polymorphic stream as WSerialT
.
Since: 0.2.0
asyncly :: IsStream t => AsyncT m a -> t m a Source #
Fix the type of a polymorphic stream as AsyncT
.
Since: 0.1.0
aheadly :: IsStream t => AheadT m a -> t m a Source #
Fix the type of a polymorphic stream as AheadT
.
Since: 0.3.0
wAsyncly :: IsStream t => WAsyncT m a -> t m a Source #
Fix the type of a polymorphic stream as WAsyncT
.
Since: 0.2.0
parallely :: IsStream t => ParallelT m a -> t m a Source #
Fix the type of a polymorphic stream as ParallelT
.
Since: 0.1.0
zipSerially :: IsStream t => ZipSerialM m a -> t m a Source #
Fix the type of a polymorphic stream as ZipSerialM
.
Since: 0.2.0
zipAsyncly :: IsStream t => ZipAsyncM m a -> t m a Source #
Fix the type of a polymorphic stream as ZipAsyncM
.
Since: 0.2.0
adapt :: (IsStream t1, IsStream t2) => t1 m a -> t2 m a Source #
Adapt any specific stream type to any other specific stream type.
Since: 0.1.0
IO Streams
type Serial = SerialT IO Source #
A serial IO stream of elements of type a
. See SerialT
documentation
for more details.
Since: 0.2.0
type WSerial = WSerialT IO Source #
An interleaving serial IO stream of elements of type a
. See WSerialT
documentation for more details.
Since: 0.2.0
type Ahead = AheadT IO Source #
A serial IO stream of elements of type a
with concurrent lookahead. See
AheadT
documentation for more details.
Since: 0.3.0
type Async = AsyncT IO Source #
A demand driven left biased parallely composing IO stream of elements of
type a
. See AsyncT
documentation for more details.
Since: 0.2.0
type WAsync = WAsyncT IO Source #
A round robin parallely composing IO stream of elements of type a
.
See WAsyncT
documentation for more details.
Since: 0.2.0
type Parallel = ParallelT IO Source #
A parallely composing IO stream of elements of type a
.
See ParallelT
documentation for more details.
Since: 0.2.0
type ZipSerial = ZipSerialM IO Source #
An IO stream whose applicative instance zips streams serially.
Since: 0.2.0
type ZipAsync = ZipAsyncM IO Source #
An IO stream whose applicative instance zips streams wAsyncly.
Since: 0.2.0
Re-exports
The class of semigroups (types with an associative binary operation).
Instances should satisfy the associativity law:
Since: base-4.9.0.0
(<>) :: a -> a -> a infixr 6 #
An associative operation.
Reduce a non-empty list with <>
The default definition should be sufficient, but this can be overridden for efficiency.
stimes :: Integral b => b -> a -> a #
Repeat a value n
times.
Given that this works on a Semigroup
it is allowed to fail if
you request 0 or fewer repetitions, and the default definition
will do so.
By making this a member of the class, idempotent semigroups
and monoids can upgrade this to execute in O(1) by
picking stimes =
or stimesIdempotent
stimes =
respectively.stimesIdempotentMonoid
Instances
Semigroup Ordering | Since: base-4.9.0.0 |
Semigroup () | Since: base-4.9.0.0 |
Semigroup Void | Since: base-4.9.0.0 |
Semigroup All | Since: base-4.9.0.0 |
Semigroup Any | Since: base-4.9.0.0 |
Semigroup ByteArray | |
Semigroup [a] | Since: base-4.9.0.0 |
Semigroup a => Semigroup (Maybe a) | Since: base-4.9.0.0 |
Semigroup a => Semigroup (IO a) | Since: base-4.10.0.0 |
Semigroup p => Semigroup (Par1 p) | Since: base-4.12.0.0 |
Ord a => Semigroup (Min a) | Since: base-4.9.0.0 |
Ord a => Semigroup (Max a) | Since: base-4.9.0.0 |
Semigroup (First a) | Since: base-4.9.0.0 |
Semigroup (Last a) | Since: base-4.9.0.0 |
Monoid m => Semigroup (WrappedMonoid m) | Since: base-4.9.0.0 |
Defined in Data.Semigroup (<>) :: WrappedMonoid m -> WrappedMonoid m -> WrappedMonoid m # sconcat :: NonEmpty (WrappedMonoid m) -> WrappedMonoid m # stimes :: Integral b => b -> WrappedMonoid m -> WrappedMonoid m # | |
Semigroup a => Semigroup (Option a) | Since: base-4.9.0.0 |
Semigroup a => Semigroup (Identity a) | Since: base-4.9.0.0 |
Semigroup (First a) | Since: base-4.9.0.0 |
Semigroup (Last a) | Since: base-4.9.0.0 |
Semigroup a => Semigroup (Dual a) | Since: base-4.9.0.0 |
Semigroup (Endo a) | Since: base-4.9.0.0 |
Num a => Semigroup (Sum a) | Since: base-4.9.0.0 |
Num a => Semigroup (Product a) | Since: base-4.9.0.0 |
Semigroup a => Semigroup (Down a) | Since: base-4.11.0.0 |
Semigroup (NonEmpty a) | Since: base-4.9.0.0 |
Ord a => Semigroup (Set a) | Since: containers-0.5.7 |
Semigroup (Heap a) | |
Semigroup (Array a) | Since: primitive-0.6.3.0 |
Semigroup (MergeSet a) | |
Semigroup b => Semigroup (a -> b) | Since: base-4.9.0.0 |
Semigroup (Either a b) | Since: base-4.9.0.0 |
Semigroup (V1 p) | Since: base-4.12.0.0 |
Semigroup (U1 p) | Since: base-4.12.0.0 |
(Semigroup a, Semigroup b) => Semigroup (a, b) | Since: base-4.9.0.0 |
Semigroup a => Semigroup (ST s a) | Since: base-4.11.0.0 |
Semigroup (Proxy s) | Since: base-4.9.0.0 |
Semigroup (WSerialT m a) Source # | |
Semigroup (SerialT m a) Source # | |
MonadAsync m => Semigroup (ParallelT m a) Source # | |
MonadAsync m => Semigroup (WAsyncT m a) Source # | |
MonadAsync m => Semigroup (AsyncT m a) Source # | |
MonadAsync m => Semigroup (AheadT m a) Source # | |
Semigroup (ZipAsyncM m a) Source # | |
Semigroup (ZipSerialM m a) Source # | |
Defined in Streamly.Streams.Zip (<>) :: ZipSerialM m a -> ZipSerialM m a -> ZipSerialM m a # sconcat :: NonEmpty (ZipSerialM m a) -> ZipSerialM m a # stimes :: Integral b => b -> ZipSerialM m a -> ZipSerialM m a # | |
Semigroup (f p) => Semigroup (Rec1 f p) | Since: base-4.12.0.0 |
(Semigroup a, Semigroup b, Semigroup c) => Semigroup (a, b, c) | Since: base-4.9.0.0 |
Semigroup a => Semigroup (Const a b) | Since: base-4.9.0.0 |
(Applicative f, Semigroup a) => Semigroup (Ap f a) | Since: base-4.12.0.0 |
Alternative f => Semigroup (Alt f a) | Since: base-4.9.0.0 |
Semigroup c => Semigroup (K1 i c p) | Since: base-4.12.0.0 |
(Semigroup (f p), Semigroup (g p)) => Semigroup ((f :*: g) p) | Since: base-4.12.0.0 |
(Semigroup a, Semigroup b, Semigroup c, Semigroup d) => Semigroup (a, b, c, d) | Since: base-4.9.0.0 |
Semigroup (f p) => Semigroup (M1 i c f p) | Since: base-4.12.0.0 |
Semigroup (f (g p)) => Semigroup ((f :.: g) p) | Since: base-4.12.0.0 |
(Semigroup a, Semigroup b, Semigroup c, Semigroup d, Semigroup e) => Semigroup (a, b, c, d, e) | Since: base-4.9.0.0 |
Deprecated
runStreaming :: (Monad m, IsStream t) => t m a -> m () Source #
Deprecated: Please use runStream instead.
Same as runStream
Since: 0.1.0
runStreamT :: Monad m => SerialT m a -> m () Source #
Deprecated: Please use runStream instead.
Same as runStream
.
Since: 0.1.0
runInterleavedT :: Monad m => WSerialT m a -> m () Source #
Deprecated: Please use 'runStream . interleaving' instead.
Same as runStream . wSerially
.
Since: 0.1.0
runAsyncT :: Monad m => AsyncT m a -> m () Source #
Deprecated: Please use 'runStream . asyncly' instead.
Same as runStream . asyncly
.
Since: 0.1.0
runParallelT :: Monad m => ParallelT m a -> m () Source #
Deprecated: Please use 'runStream . parallely' instead.
Same as runStream . parallely
.
Since: 0.1.0
runZipStream :: Monad m => ZipSerialM m a -> m () Source #
Deprecated: Please use 'runStream . zipSerially instead.
Same as runStream . zipping
.
Since: 0.1.0
runZipAsync :: Monad m => ZipAsyncM m a -> m () Source #
Deprecated: Please use 'runStream . zipAsyncly instead.
Same as runStream . zippingAsync
.
Since: 0.1.0
type InterleavedT = WSerialT Source #
Deprecated: Please use WSerialT
instead.
Since: 0.1.0
type ZipStream = ZipSerialM Source #
Deprecated: Please use ZipSerialM
instead.
Since: 0.1.0
interleaving :: IsStream t => WSerialT m a -> t m a Source #
zipping :: IsStream t => ZipSerialM m a -> t m a Source #
zippingAsync :: IsStream t => ZipAsyncM m a -> t m a Source #