Copyright | (c) 2017 Harendra Kumar |
---|---|
License | BSD3 |
Maintainer | streamly@composewell.com |
Stability | experimental |
Portability | GHC |
Safe Haskell | None |
Language | Haskell2010 |
Streamly is a general purpose programming framework using cocnurrent data
flow programming paradigm. It can be considered as a generalization of
Haskell lists to monadic streaming with concurrent composition capability.
The serial stream type in streamly SerialT m a
is like the list type [a]
parameterized by the monad m
. For example, SerialT IO a
is a moral
equivalent of [a]
in the IO monad. Streams are constructed very much like
lists, except that they use nil
and cons
instead of '[]' and :
.
> import Streamly > import Streamly.Prelude (cons, consM) > import qualified Streamly.Prelude as S > > S.toList $ 1 `cons` 2 `cons` 3 `cons` nil [1,2,3]
Unlike lists, streams can be constructed from monadic effects:
> S.toList
$getLine
`consM`getLine
`consM` S.nil
hello world ["hello","world"]
Streams are processed just like lists, with list like combinators, except that they are monadic and work in a streaming fashion. Here is a simple console echo program example:
> S.drain $ S.repeatM getLine & S.mapM putStrLn
SerialT Identity a
is a moral equivalent of pure lists. Streamly utilizes
fusion for high performance, therefore, we can represent and process strings
as streams of Char
, encode and decode the streams to/from UTF8 and
serialize them to Array Word8
obviating the need for special purpose
libraries like bytestring
and text
.
For more details please see the Streamly.Tutorial module and the examples directory in this package.
Synopsis
- type MonadAsync m = (MonadIO m, MonadBaseControl IO m, MonadThrow m)
- data SerialT m a
- data WSerialT m a
- data AheadT m a
- data AsyncT m a
- data WAsyncT m a
- data ParallelT m a
- data ZipSerialM m a
- data ZipAsyncM m a
- (|$) :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> t m a -> t m b
- (|&) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> t m b) -> t m b
- (|$.) :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b
- (|&.) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> m b) -> m b
- mkAsync :: (IsStream t, MonadAsync m) => t m a -> m (t m a)
- serial :: IsStream t => t m a -> t m a -> t m a
- wSerial :: IsStream t => t m a -> t m a -> t m a
- ahead :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
- async :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
- wAsync :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
- parallel :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
- maxThreads :: IsStream t => Int -> t m a -> t m a
- maxBuffer :: IsStream t => Int -> t m a -> t m a
- data Rate = Rate {}
- rate :: IsStream t => Maybe Rate -> t m a -> t m a
- avgRate :: IsStream t => Double -> t m a -> t m a
- minRate :: IsStream t => Double -> t m a -> t m a
- maxRate :: IsStream t => Double -> t m a -> t m a
- constRate :: IsStream t => Double -> t m a -> t m a
- class (forall m a. MonadAsync m => Semigroup (t m a), forall m a. MonadAsync m => Monoid (t m a), forall m. Monad m => Functor (t m), forall m. MonadAsync m => Applicative (t m)) => IsStream t
- serially :: IsStream t => SerialT m a -> t m a
- wSerially :: IsStream t => WSerialT m a -> t m a
- asyncly :: IsStream t => AsyncT m a -> t m a
- aheadly :: IsStream t => AheadT m a -> t m a
- wAsyncly :: IsStream t => WAsyncT m a -> t m a
- parallely :: IsStream t => ParallelT m a -> t m a
- zipSerially :: IsStream t => ZipSerialM m a -> t m a
- zipAsyncly :: IsStream t => ZipAsyncM m a -> t m a
- adapt :: (IsStream t1, IsStream t2) => t1 m a -> t2 m a
- type Serial = SerialT IO
- type WSerial = WSerialT IO
- type Ahead = AheadT IO
- type Async = AsyncT IO
- type WAsync = WAsyncT IO
- type Parallel = ParallelT IO
- type ZipSerial = ZipSerialM IO
- type ZipAsync = ZipAsyncM IO
- foldWith :: (IsStream t, Foldable f) => (t m a -> t m a -> t m a) -> f (t m a) -> t m a
- foldMapWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> (a -> t m b) -> f a -> t m b
- forEachWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> f a -> (a -> t m b) -> t m b
- class Semigroup a where
- type Streaming = IsStream
- runStream :: Monad m => SerialT m a -> m ()
- runStreaming :: (Monad m, IsStream t) => t m a -> m ()
- runStreamT :: Monad m => SerialT m a -> m ()
- runInterleavedT :: Monad m => WSerialT m a -> m ()
- runAsyncT :: Monad m => AsyncT m a -> m ()
- runParallelT :: Monad m => ParallelT m a -> m ()
- runZipStream :: Monad m => ZipSerialM m a -> m ()
- runZipAsync :: Monad m => ZipAsyncM m a -> m ()
- type StreamT = SerialT
- type InterleavedT = WSerialT
- type ZipStream = ZipSerialM
- interleaving :: IsStream t => WSerialT m a -> t m a
- zipping :: IsStream t => ZipSerialM m a -> t m a
- zippingAsync :: IsStream t => ZipAsyncM m a -> t m a
- (<=>) :: IsStream t => t m a -> t m a -> t m a
- (<|) :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
Module Overview
The basic stream type is Serial
, it represents a sequence of IO actions,
and is a Monad
. The type SerialT
is a monad transformer that can
represent a sequence of actions in an arbitrary monad. The type Serial
is
in fact a synonym for SerialT IO
. There are a few more types similar to
SerialT
, all of them represent a stream and differ only in the
Semigroup
, Applicative
and Monad
compositions of the stream. Serial
and WSerial
types compose serially whereas Async
and WAsync
types compose concurrently. All these types can be freely inter-converted
using type combinators without any cost. You can freely switch to any type
of composition at any point in the program. When no type annotation or
explicit stream type combinators are used, the default stream type is
inferred as Serial
.
This module exports stream types, instances and combinators for:
- converting between different stream types
- appending and concurrently merging streams
- Concurrency control
- Concurrent function application
- Stream rate control
This module is designed to be imported unqualified:
import Streamly
See the Streamly.Prelude module for APIs for construction, generation, elimination and transformation of streams.
Type Synonyms
type MonadAsync m = (MonadIO m, MonadBaseControl IO m, MonadThrow m) Source #
A monad that can perform concurrent or parallel IO operations. Streams
that can be composed concurrently require the underlying monad to be
MonadAsync
.
Since: 0.1.0
Stream transformers
A stream represents a sequence of pure or effectful actions. The
cons
and consM
operations and the corresponding operators .:
and
|:
can be used to join pure values or effectful actions in a sequence.
The effects in the stream can be executed in many different ways
depending on the type of stream. In other words, the behavior of consM
depends on the type of the stream.
There are three high level categories of streams, spatially ordered
streams, speculative streams and time ordered streams. Spatially
ordered streams, SerialT
and WSerialT
, execute the effects in serial
order i.e. one at a time and present the outputs of those effects to the
consumer in the same order. Speculative streams, AheadT
, may execute
many effects concurrently but present the outputs to the consumer in the
specified spatial order. Time ordered streams, AsyncT
, WAsyncT
and
ParallelT
, may execute many effects concurrently and present the
outputs of those effects to the consumer in time order i.e. as soon as
the output is generated.
We described above how the effects in a sequence are executed for
different types of streams. The behvavior of the Semigroup
and Monad
instances follow the behavior of consM
. Stream generation operations
like repeatM
also execute the effects differently for different
streams, providing a concurrent generation capability when used with
stream types that execute effects concurrently. Similarly, effectful
transformation operations like mapM
also execute the transforming
effects differently for different types of streams.
Serial Streams
When a stream consumer demands an element from a serial stream constructed
as a `consM` b `consM` ... nil
, the action a
at the head of the stream
sequence is executed and the result is supplied to the consumer. When the
next element is demanded, the action b
is executed and its result is
supplied. Thus, the effects are performed and results are consumed strictly
in a serial order. Serial streams can be considered as spatially ordered
streams as the order of execution and consumption is the same as the spatial
order in which the actions are composed by the programmer.
Serial streams enforce the side effects as well as the results of the actions to be in the same order in which the actions are added to the stream. Therefore, the semigroup operation for serial streams is not commutative:
a <> b is not the same as b <> a
There are two serial stream types SerialT
and WSerialT
. The stream
evaluation of both the variants works in the same way as described above,
they differ only in the Semigroup
and Monad
implementaitons.
The Semigroup
operation for SerialT
behaves like a regular append
operation. Therefore, when a <> b
is evaluated, stream a
is evaluated
first until it exhausts and then stream b
is evaluated. In other words,
the elements of stream b
are appended to the elements of stream a
. This
operation can be used to fold an infinite lazy container of streams.
import Streamly
import qualified Streamly.Prelude as S
main = (S.toList . serially
$ (S.fromList [1,2]) <> (S.fromList [3,4])) >>= print
[1,2,3,4]
The Monad
instance runs the monadic continuation for each
element of the stream, serially.
main = S.drain . serially
$ do
x <- return 1 <> return 2
S.yieldM $ print x
1 2
SerialT
nests streams serially in a depth first manner.
main = S.drain . serially
$ do
x <- return 1 <> return 2
y <- return 3 <> return 4
S.yieldM $ print (x, y)
(1,3) (1,4) (2,3) (2,4)
We call the monadic code being run for each element of the stream a monadic
continuation. In imperative paradigm we can think of this composition as
nested for
loops and the monadic continuation is the body of the loop. The
loop iterates for all elements of the stream.
Note that the behavior and semantics of SerialT
, including Semigroup
and Monad
instances are exactly like Haskell lists except that SerialT
can contain effectful actions while lists are pure.
In the code above, the serially
combinator can be omitted as the default
stream type is SerialT
.
Since: 0.2.0
Instances
The Semigroup
operation for WSerialT
interleaves the elements from the
two streams. Therefore, when a <> b
is evaluated, stream a
is evaluated
first to produce the first element of the combined stream and then stream
b
is evaluated to produce the next element of the combined stream, and
then we go back to evaluating stream a
and so on. In other words, the
elements of stream a
are interleaved with the elements of stream b
.
Note that when multiple actions are combined like a <> b <> c ... <> z
we
interleave them in a binary fashion i.e. a
and b
are interleaved with
each other and the result is interleaved with c
and so on. This will not
act as a true round-robin scheduling across all the streams. Note that this
operation cannot be used to fold a container of infinite streams as the
state that it needs to maintain is proportional to the number of streams.
import Streamly
import qualified Streamly.Prelude as S
main = (S.toList . wSerially
$ (S.fromList [1,2]) <> (S.fromList [3,4])) >>= print
[1,3,2,4]
Similarly, the Monad
instance interleaves the iterations of the
inner and the outer loop, nesting loops in a breadth first manner.
main = S.drain . wSerially
$ do
x <- return 1 <> return 2
y <- return 3 <> return 4
S.yieldM $ print (x, y)
(1,3) (2,3) (1,4) (2,4)
Since: 0.2.0
Instances
Speculative Streams
When a stream consumer demands an element from a speculative stream
constructed as a `consM` b `consM` ... nil
, the action a
at the head
of the stream is executed and the output of the action is supplied to the
consumer. However, in addition to the action at the head multiple actions
following it may also be executed concurrently and the results buffered.
When the next element is demanded it may be served from the buffer and we
may execute the next action in the sequence to keep the buffer adequately
filled. Thus, the actions are executed concurrently but results consumed in
serial order just like serial streams. consM
can be used to fold an
infinite lazy container of effects, as the number of concurrent executions
is limited.
Similar to consM
, the monadic stream generation (e.g. replicateM) and
transformation operations (e.g. mapM) on speculative streams can execute
multiple effects concurrently in a speculative manner.
How many effects can be executed concurrently and how many results can be
buffered are controlled by maxThreads
and maxBuffer
combinators
respectively. The actual number of concurrent threads is adjusted according
to the rate at which the consumer is consuming the stream. It may even
execute actions serially in a single thread if that is enough to match the
consumer's speed.
Speculative streams enforce ordering of the results of actions in the stream but the side effects are only partially ordered. Therefore, the semigroup operation for speculative streams is not commutative from the pure outputs perspective but commutative from side effects perspective.
The Semigroup
operation for AheadT
appends two streams. The combined
stream behaves like a single stream with the actions from the second stream
appended to the first stream. The combined stream is evaluated in the
speculative style. This operation can be used to fold an infinite lazy
container of streams.
import Streamly import qualified Streamly.Prelude as S import Control.Concurrent main = do xs <- S.toList
.aheadly
$ (p 1 |: p 2 |: nil) <> (p 3 |: p 4 |: nil) print xs where p n = threadDelay 1000000 >> return n
[1,2,3,4]
Any exceptions generated by a constituent stream are propagated to the output stream.
The monad instance of AheadT
may run each monadic continuation (bind)
concurrently in a speculative manner, performing side effects in a partially
ordered manner but producing the outputs in an ordered manner like
SerialT
.
main = S.drain . aheadly
$ do
n <- return 3 <> return 2 <> return 1
S.yieldM $ do
threadDelay (n * 1000000)
myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)
ThreadId 40: Delay 1 ThreadId 39: Delay 2 ThreadId 38: Delay 3
Since: 0.3.0
Instances
MonadTrans AheadT Source # | |
Defined in Streamly.Streams.Ahead | |
IsStream AheadT Source # | |
Defined in Streamly.Streams.Ahead | |
(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AheadT m) Source # | |
Defined in Streamly.Streams.Ahead | |
(MonadState s m, MonadAsync m) => MonadState s (AheadT m) Source # | |
(MonadReader r m, MonadAsync m) => MonadReader r (AheadT m) Source # | |
MonadAsync m => Monad (AheadT m) Source # | |
Monad m => Functor (AheadT m) Source # | |
(Monad m, MonadAsync m) => Applicative (AheadT m) Source # | |
(MonadIO m, MonadAsync m) => MonadIO (AheadT m) Source # | |
Defined in Streamly.Streams.Ahead | |
(MonadThrow m, MonadAsync m) => MonadThrow (AheadT m) Source # | |
Defined in Streamly.Streams.Ahead | |
MonadAsync m => Semigroup (AheadT m a) Source # | |
MonadAsync m => Monoid (AheadT m a) Source # | |
Asynchronous Streams
When a stream consumer demands an element from an asynchronous stream,
constructed as a `consM` b `consM` ... nil
, the action a
along with
multiple following at the head of the stream sequence are executed
concurrently and the output of the one that completes first is supplied to
the consumer. As more actions complete, their results are buffered in the
order of completion. When the next element is demanded it may be served
from the buffer and we may initiate execution of more actions in the
sequence to keep the buffer adequately filled. Thus, the actions are
executed concurrently and their results are consumed in the order of
completion. consM
can be used to fold an infinite lazy container of
effects, as the number of concurrent executions is limited.
Similar to consM
, the monadic stream generation (e.g. replicateM) and
transformation operations (e.g. mapM) on asynchronous streams can execute
multiple effects concurrently in an asynchronous manner.
How many effects can be executed concurrently and how many results can be
buffered are controlled by maxThreads
and maxBuffer
combinators
respectively. The actual number of concurrent threads is adjusted according
to the rate at which the consumer is consuming the stream. It may even
execute actions serially in a single thread if that is enough to match the
consumer's speed.
Asynchronous streams do not enforce any spatial order on the side effects or on the results of the actions. However there is a partial ordering as the actions to be executed are picked from the head of stream. The results are presented to the consumer in the completion time order. Therefore, the semigroup operation for asynchronous streams is commutative i.e. the stream is considered unordered.
There are two asynchronous stream types AsyncT
and WAsyncT
. The stream
evaluation of both the variants works in the same way as described above,
they differ only in the Semigroup
and Monad
implementaitons.
The Semigroup
operation for AsyncT
appends two streams. The combined
stream behaves like a single stream with the actions from the second stream
appended to the first stream. The combined stream is evaluated in the
asynchronous style. This operation can be used to fold an infinite lazy
container of streams.
import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent
main = (S.toList . asyncly
$ (S.fromList [1,2]) <> (S.fromList [3,4])) >>= print
[1,2,3,4]
Any exceptions generated by a constituent stream are propagated to the output stream. The output and exceptions from a single stream are guaranteed to arrive in the same order in the resulting stream as they were generated in the input stream. However, the relative ordering of elements from different streams in the resulting stream can vary depending on scheduling and generation delays.
Similarly, the monad instance of AsyncT
may run each iteration
concurrently based on demand. More concurrent iterations are started only
if the previous iterations are not able to produce enough output for the
consumer.
main =drain
.asyncly
$ do n <- return 3 <> return 2 <> return 1 S.yieldM $ do threadDelay (n * 1000000) myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)
ThreadId 40: Delay 1 ThreadId 39: Delay 2 ThreadId 38: Delay 3
Since: 0.1.0
Instances
MonadTrans AsyncT Source # | |
Defined in Streamly.Streams.Async | |
IsStream AsyncT Source # | |
Defined in Streamly.Streams.Async | |
(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AsyncT m) Source # | |
Defined in Streamly.Streams.Async | |
(MonadState s m, MonadAsync m) => MonadState s (AsyncT m) Source # | |
(MonadReader r m, MonadAsync m) => MonadReader r (AsyncT m) Source # | |
MonadAsync m => Monad (AsyncT m) Source # | |
Monad m => Functor (AsyncT m) Source # | |
(Monad m, MonadAsync m) => Applicative (AsyncT m) Source # | |
(MonadIO m, MonadAsync m) => MonadIO (AsyncT m) Source # | |
Defined in Streamly.Streams.Async | |
(MonadThrow m, MonadAsync m) => MonadThrow (AsyncT m) Source # | |
Defined in Streamly.Streams.Async | |
MonadAsync m => Semigroup (AsyncT m a) Source # | |
MonadAsync m => Monoid (AsyncT m a) Source # | |
The Semigroup
operation for WAsyncT
interleaves the elements from the
two streams. Therefore, when a <> b
is evaluated, one action is picked
from stream a
for evaluation and then the next action is picked from
stream b
and then the next action is again picked from stream a
, going
around in a round-robin fashion. Many such actions are executed concurrently
depending on maxThreads
and maxBuffer
limits. Results are served to the
consumer in the order completion of the actions.
Note that when multiple actions are combined like a <> b <> c ... <> z
we
go in a round-robin fasion across all of them picking one action from each
up to z
and then come back to a
. Note that this operation cannot be
used to fold a container of infinite streams as the state that it needs to
maintain is proportional to the number of streams.
import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent
main = (S.toList . wAsyncly
$ (S.fromList [1,2]) <> (S.fromList [3,4])) >>= print
[1,3,2,4]
Any exceptions generated by a constituent stream are propagated to the output stream. The output and exceptions from a single stream are guaranteed to arrive in the same order in the resulting stream as they were generated in the input stream. However, the relative ordering of elements from different streams in the resulting stream can vary depending on scheduling and generation delays.
Similarly, the Monad
instance of WAsyncT
runs all iterations fairly
concurrently using a round robin scheduling.
main =drain
.wAsyncly
$ do n <- return 3 <> return 2 <> return 1 S.yieldM $ do threadDelay (n * 1000000) myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)
ThreadId 40: Delay 1 ThreadId 39: Delay 2 ThreadId 38: Delay 3
Since: 0.2.0
Instances
MonadTrans WAsyncT Source # | |
Defined in Streamly.Streams.Async | |
IsStream WAsyncT Source # | |
Defined in Streamly.Streams.Async | |
(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (WAsyncT m) Source # | |
Defined in Streamly.Streams.Async | |
(MonadState s m, MonadAsync m) => MonadState s (WAsyncT m) Source # | |
(MonadReader r m, MonadAsync m) => MonadReader r (WAsyncT m) Source # | |
MonadAsync m => Monad (WAsyncT m) Source # | |
Monad m => Functor (WAsyncT m) Source # | |
(Monad m, MonadAsync m) => Applicative (WAsyncT m) Source # | |
(MonadIO m, MonadAsync m) => MonadIO (WAsyncT m) Source # | |
Defined in Streamly.Streams.Async | |
(MonadThrow m, MonadAsync m) => MonadThrow (WAsyncT m) Source # | |
Defined in Streamly.Streams.Async | |
MonadAsync m => Semigroup (WAsyncT m a) Source # | |
MonadAsync m => Monoid (WAsyncT m a) Source # | |
Async composition with strict concurrent execution of all streams.
The Semigroup
instance of ParallelT
executes both the streams
concurrently without any delay or without waiting for the consumer demand
and merges the results as they arrive. If the consumer does not consume
the results, they are buffered upto a configured maximum, controlled by the
maxBuffer
primitive. If the buffer becomes full the concurrent tasks will
block until there is space in the buffer.
Both WAsyncT
and ParallelT
, evaluate the constituent streams fairly in a
round robin fashion. The key difference is that WAsyncT
might wait for the
consumer demand before it executes the tasks whereas ParallelT
starts
executing all the tasks immediately without waiting for the consumer demand.
For WAsyncT
the maxThreads
limit applies whereas for ParallelT
it does
not apply. In other words, WAsyncT
can be lazy whereas ParallelT
is
strict.
ParallelT
is useful for cases when the streams are required to be
evaluated simultaneously irrespective of how the consumer consumes them e.g.
when we want to race two tasks and want to start both strictly at the same
time or if we have timers in the parallel tasks and our results depend on
the timers being started at the same time. If we do not have such
requirements then AsyncT
or AheadT
are recommended as they can be more
efficient than ParallelT
.
main = (toList
.parallely
$ (fromFoldable [1,2]) <> (fromFoldable [3,4])) >>= print
[1,3,2,4]
When streams with more than one element are merged, it yields whichever
stream yields first without any bias, unlike the Async
style streams.
Any exceptions generated by a constituent stream are propagated to the output stream. The output and exceptions from a single stream are guaranteed to arrive in the same order in the resulting stream as they were generated in the input stream. However, the relative ordering of elements from different streams in the resulting stream can vary depending on scheduling and generation delays.
Similarly, the Monad
instance of ParallelT
runs all iterations
of the loop concurrently.
import Streamly import qualified Streamly.Prelude as S import Control.Concurrent main =drain
.parallely
$ do n <- return 3 <> return 2 <> return 1 S.yieldM $ do threadDelay (n * 1000000) myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)
ThreadId 40: Delay 1 ThreadId 39: Delay 2 ThreadId 38: Delay 3
Note that parallel composition can only combine a finite number of streams as it needs to retain state for each unfinished stream.
Since: 0.7.0 (maxBuffer applies to ParallelT streams)
Since: 0.1.0
Instances
MonadTrans ParallelT Source # | |
Defined in Streamly.Streams.Parallel | |
IsStream ParallelT Source # | |
Defined in Streamly.Streams.Parallel | |
(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (ParallelT m) Source # | |
Defined in Streamly.Streams.Parallel | |
(MonadState s m, MonadAsync m) => MonadState s (ParallelT m) Source # | |
(MonadReader r m, MonadAsync m) => MonadReader r (ParallelT m) Source # | |
MonadAsync m => Monad (ParallelT m) Source # | |
Monad m => Functor (ParallelT m) Source # | |
(Monad m, MonadAsync m) => Applicative (ParallelT m) Source # | |
Defined in Streamly.Streams.Parallel | |
(MonadIO m, MonadAsync m) => MonadIO (ParallelT m) Source # | |
Defined in Streamly.Streams.Parallel | |
(MonadThrow m, MonadAsync m) => MonadThrow (ParallelT m) Source # | |
Defined in Streamly.Streams.Parallel | |
MonadAsync m => Semigroup (ParallelT m a) Source # | |
MonadAsync m => Monoid (ParallelT m a) Source # | |
Zipping Streams
ZipSerialM
and ZipAsyncM
, provide Applicative
instances for zipping the
corresponding elements of two streams together. Note that these types are
not monads.
data ZipSerialM m a Source #
The applicative instance of ZipSerialM
zips a number of streams serially
i.e. it produces one element from each stream serially and then zips all
those elements.
main = (toList . zipSerially
$ (,,) <$> s1 <*> s2 <*> s3) >>= print
where s1 = fromFoldable [1, 2]
s2 = fromFoldable [3, 4]
s3 = fromFoldable [5, 6]
[(1,3,5),(2,4,6)]
The Semigroup
instance of this type works the same way as that of
SerialT
.
Since: 0.2.0
Instances
Like ZipSerialM
but zips in parallel, it generates all the elements to
be zipped concurrently.
main = (toList . zipAsyncly
$ (,,) <$> s1 <*> s2 <*> s3) >>= print
where s1 = fromFoldable [1, 2]
s2 = fromFoldable [3, 4]
s3 = fromFoldable [5, 6]
[(1,3,5),(2,4,6)]
The Semigroup
instance of this type works the same way as that of
SerialT
.
Since: 0.2.0
Parallel Function Application
Stream processing functions can be composed in a chain using function
application with or without the $
operator, or with reverse function
application operator &
. Streamly provides concurrent versions of these
operators applying stream processing functions such that each stage of the
stream can run in parallel. The operators start with a |
; we can read |$
as "parallel dollar
" to remember that |
comes before $
.
Imports for the code snippets below:
import Streamly import qualified Streamly.Prelude as S import Control.Concurrent
(|$) :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> t m a -> t m b infixr 0 Source #
Parallel function application operator for streams; just like the regular
function application operator $
except that it is concurrent. The
following code prints a value every second even though each stage adds a 1
second delay.
drain $ S.mapM (\x -> threadDelay 1000000 >> print x) |$ S.repeatM (threadDelay 1000000 >> return 1)
Concurrent
Since: 0.3.0
(|&) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> t m b) -> t m b infixl 1 Source #
Parallel reverse function application operator for streams; just like the
regular reverse function application operator &
except that it is
concurrent.
drain $ S.repeatM (threadDelay 1000000 >> return 1) |& S.mapM (\x -> threadDelay 1000000 >> print x)
Concurrent
Since: 0.3.0
(|$.) :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b infixr 0 Source #
Parallel function application operator; applies a run
or fold
function
to a stream such that the fold consumer and the stream producer run in
parallel. A run
or fold
function reduces the stream to a value in the
underlying monad. The .
at the end of the operator is a mnemonic for
termination of the stream.
S.foldlM' (\_ a -> threadDelay 1000000 >> print a) () |$. S.repeatM (threadDelay 1000000 >> return 1)
Concurrent
Since: 0.3.0
(|&.) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> m b) -> m b infixl 1 Source #
Parallel reverse function application operator for applying a run or fold
functions to a stream. Just like |$.
except that the operands are reversed.
S.repeatM (threadDelay 1000000 >> return 1) |&. S.foldlM' (\_ a -> threadDelay 1000000 >> print a) ()
Concurrent
Since: 0.3.0
mkAsync :: (IsStream t, MonadAsync m) => t m a -> m (t m a) Source #
Make a stream asynchronous, triggers the computation and returns a stream
in the underlying monad representing the output generated by the original
computation. The returned action is exhaustible and must be drained once. If
not drained fully we may have a thread blocked forever and once exhausted it
will always return empty
.
Since: 0.2.0
Merging Streams
The Semigroup
operation <>
of each stream type combines two streams in a
type specific manner. This section provides polymorphic versions of <>
which can be used to combine two streams in a predetermined way irrespective
of the type.
ahead :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a Source #
async :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a Source #
wAsync :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a Source #
parallel :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a Source #
Concurrency Control
These combinators can be used at any point in a stream composition to set parameters to control the concurrency of the argument stream. A control parameter set at any point remains effective for any concurrent combinators used in the argument stream until it is reset by using the combinator again. These control parameters have no effect on non-concurrent combinators in the stream, or on non-concurrent streams.
Pitfall: Remember that maxBuffer
in the following example applies to
mapM
and any other combinators that may follow it, and it does not apply
to the combinators before it:
... $ maxBuffer 10 $ S.mapM ... ...
If we use &
instead of $
the situation will reverse, in the following
example, maxBuffer
does not apply to mapM
, it applies to combinators
that come before it, because those are the arguments to maxBuffer
:
... & maxBuffer 10 & S.mapM ... ...
maxThreads :: IsStream t => Int -> t m a -> t m a Source #
Specify the maximum number of threads that can be spawned concurrently for
any concurrent combinator in a stream.
A value of 0 resets the thread limit to default, a negative value means
there is no limit. The default value is 1500. maxThreads
does not affect
ParallelT
streams as they can use unbounded number of threads.
When the actions in a stream are IO bound, having blocking IO calls, this option can be used to control the maximum number of in-flight IO requests. When the actions are CPU bound this option can be used to control the amount of CPU used by the stream.
Since: 0.4.0
maxBuffer :: IsStream t => Int -> t m a -> t m a Source #
Specify the maximum size of the buffer for storing the results from concurrent computations. If the buffer becomes full we stop spawning more concurrent tasks until there is space in the buffer. A value of 0 resets the buffer size to default, a negative value means there is no limit. The default value is 1500.
CAUTION! using an unbounded maxBuffer
value (i.e. a negative value)
coupled with an unbounded maxThreads
value is a recipe for disaster in
presence of infinite streams, or very large streams. Especially, it must
not be used when pure
is used in ZipAsyncM
streams as pure
in
applicative zip streams generates an infinite stream causing unbounded
concurrent generation with no limit on the buffer or threads.
Since: 0.4.0
Rate Limiting
Specifies the stream yield rate in yields per second (Hertz
).
We keep accumulating yield credits at rateGoal
. At any point of time we
allow only as many yields as we have accumulated as per rateGoal
since the
start of time. If the consumer or the producer is slower or faster, the
actual rate may fall behind or exceed rateGoal
. We try to recover the gap
between the two by increasing or decreasing the pull rate from the producer.
However, if the gap becomes more than rateBuffer
we try to recover only as
much as rateBuffer
.
rateLow
puts a bound on how low the instantaneous rate can go when
recovering the rate gap. In other words, it determines the maximum yield
latency. Similarly, rateHigh
puts a bound on how high the instantaneous
rate can go when recovering the rate gap. In other words, it determines the
minimum yield latency. We reduce the latency by increasing concurrency,
therefore we can say that it puts an upper bound on concurrency.
If the rateGoal
is 0 or negative the stream never yields a value.
If the rateBuffer
is 0 or negative we do not attempt to recover.
Since: 0.5.0
rate :: IsStream t => Maybe Rate -> t m a -> t m a Source #
Specify the pull rate of a stream.
A Nothing
value resets the rate to default which is unlimited. When the
rate is specified, concurrent production may be ramped up or down
automatically to achieve the specified yield rate. The specific behavior for
different styles of Rate
specifications is documented under Rate
. The
effective maximum production rate achieved by a stream is governed by:
- The
maxThreads
limit - The
maxBuffer
limit - The maximum rate that the stream producer can achieve
- The maximum rate that the stream consumer can achieve
Since: 0.5.0
avgRate :: IsStream t => Double -> t m a -> t m a Source #
Same as rate (Just $ Rate (r/2) r (2*r) maxBound)
Specifies the average production rate of a stream in number of yields
per second (i.e. Hertz
). Concurrent production is ramped up or down
automatically to achieve the specified average yield rate. The rate can
go down to half of the specified rate on the lower side and double of
the specified rate on the higher side.
Since: 0.5.0
minRate :: IsStream t => Double -> t m a -> t m a Source #
Same as rate (Just $ Rate r r (2*r) maxBound)
Specifies the minimum rate at which the stream should yield values. As far as possible the yield rate would never be allowed to go below the specified rate, even though it may possibly go above it at times, the upper limit is double of the specified rate.
Since: 0.5.0
maxRate :: IsStream t => Double -> t m a -> t m a Source #
Same as rate (Just $ Rate (r/2) r r maxBound)
Specifies the maximum rate at which the stream should yield values. As far as possible the yield rate would never be allowed to go above the specified rate, even though it may possibly go below it at times, the lower limit is half of the specified rate. This can be useful in applications where certain resource usage must not be allowed to go beyond certain limits.
Since: 0.5.0
constRate :: IsStream t => Double -> t m a -> t m a Source #
Same as rate (Just $ Rate r r r 0)
Specifies a constant yield rate. If for some reason the actual rate goes above or below the specified rate we do not try to recover it by increasing or decreasing the rate in future. This can be useful in applications like graphics frame refresh where we need to maintain a constant refresh rate.
Since: 0.5.0
Stream Type Adapters
You may want to use different stream composition styles at different points
in your program. Stream types can be freely converted or adapted from one
type to another. The IsStream
type class facilitates type conversion of
one stream type to another. It is not used directly, instead the type
combinators provided below are used for conversions.
To adapt from one monomorphic type (e.g. AsyncT
) to another monomorphic
type (e.g. SerialT
) use the adapt
combinator. To give a polymorphic code
a specific interpretation or to adapt a specific type to a polymorphic type
use the type specific combinators e.g. asyncly
or wSerially
. You
cannot adapt polymorphic code to polymorphic code, as the compiler would not know
which specific type you are converting from or to. If you see a an
ambiguous type variable
error then most likely you are using adapt
unnecessarily on polymorphic code.
class (forall m a. MonadAsync m => Semigroup (t m a), forall m a. MonadAsync m => Monoid (t m a), forall m. Monad m => Functor (t m), forall m. MonadAsync m => Applicative (t m)) => IsStream t Source #
Class of types that can represent a stream of elements of some type a
in
some monad m
.
Since: 0.2.0
Instances
IsStream WSerialT Source # | |
Defined in Streamly.Streams.Serial | |
IsStream SerialT Source # | |
Defined in Streamly.Streams.Serial | |
IsStream ParallelT Source # | |
Defined in Streamly.Streams.Parallel | |
IsStream WAsyncT Source # | |
Defined in Streamly.Streams.Async | |
IsStream AsyncT Source # | |
Defined in Streamly.Streams.Async | |
IsStream AheadT Source # | |
Defined in Streamly.Streams.Ahead | |
IsStream ZipAsyncM Source # | |
Defined in Streamly.Streams.Zip | |
IsStream ZipSerialM Source # | |
Defined in Streamly.Streams.Zip toStream :: ZipSerialM m a -> Stream m a fromStream :: Stream m a -> ZipSerialM m a consM :: MonadAsync m => m a -> ZipSerialM m a -> ZipSerialM m a Source # (|:) :: MonadAsync m => m a -> ZipSerialM m a -> ZipSerialM m a Source # |
serially :: IsStream t => SerialT m a -> t m a Source #
Fix the type of a polymorphic stream as SerialT
.
Since: 0.1.0
wSerially :: IsStream t => WSerialT m a -> t m a Source #
Fix the type of a polymorphic stream as WSerialT
.
Since: 0.2.0
asyncly :: IsStream t => AsyncT m a -> t m a Source #
Fix the type of a polymorphic stream as AsyncT
.
Since: 0.1.0
aheadly :: IsStream t => AheadT m a -> t m a Source #
Fix the type of a polymorphic stream as AheadT
.
Since: 0.3.0
wAsyncly :: IsStream t => WAsyncT m a -> t m a Source #
Fix the type of a polymorphic stream as WAsyncT
.
Since: 0.2.0
parallely :: IsStream t => ParallelT m a -> t m a Source #
Fix the type of a polymorphic stream as ParallelT
.
Since: 0.1.0
zipSerially :: IsStream t => ZipSerialM m a -> t m a Source #
Fix the type of a polymorphic stream as ZipSerialM
.
Since: 0.2.0
zipAsyncly :: IsStream t => ZipAsyncM m a -> t m a Source #
Fix the type of a polymorphic stream as ZipAsyncM
.
Since: 0.2.0
adapt :: (IsStream t1, IsStream t2) => t1 m a -> t2 m a Source #
Adapt any specific stream type to any other specific stream type.
Since: 0.1.0
IO Streams
type Serial = SerialT IO Source #
A serial IO stream of elements of type a
. See SerialT
documentation
for more details.
Since: 0.2.0
type WSerial = WSerialT IO Source #
An interleaving serial IO stream of elements of type a
. See WSerialT
documentation for more details.
Since: 0.2.0
type Ahead = AheadT IO Source #
A serial IO stream of elements of type a
with concurrent lookahead. See
AheadT
documentation for more details.
Since: 0.3.0
type Async = AsyncT IO Source #
A demand driven left biased parallely composing IO stream of elements of
type a
. See AsyncT
documentation for more details.
Since: 0.2.0
type WAsync = WAsyncT IO Source #
A round robin parallely composing IO stream of elements of type a
.
See WAsyncT
documentation for more details.
Since: 0.2.0
type Parallel = ParallelT IO Source #
A parallely composing IO stream of elements of type a
.
See ParallelT
documentation for more details.
Since: 0.2.0
type ZipSerial = ZipSerialM IO Source #
An IO stream whose applicative instance zips streams serially.
Since: 0.2.0
type ZipAsync = ZipAsyncM IO Source #
An IO stream whose applicative instance zips streams wAsyncly.
Since: 0.2.0
Folding Containers of Streams
These are variants of standard Foldable
fold functions that use a
polymorphic stream sum operation (e.g. async
or wSerial
) to fold a
finite container of streams. Note that these are just special cases of
the more general concatMapWith
operation.
foldMapWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> (a -> t m b) -> f a -> t m b Source #
forEachWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> f a -> (a -> t m b) -> t m b Source #
Like foldMapWith
but with the last two arguments reversed i.e. the
monadic streaming function is the last argument.
Equivalent to:
forEachWith = flip S.foldMapWith
Since: 0.1.0 (Streamly)
Re-exports
The class of semigroups (types with an associative binary operation).
Instances should satisfy the associativity law:
Since: base-4.9.0.0
(<>) :: a -> a -> a infixr 6 #
An associative operation.
Reduce a non-empty list with <>
The default definition should be sufficient, but this can be overridden for efficiency.
stimes :: Integral b => b -> a -> a #
Repeat a value n
times.
Given that this works on a Semigroup
it is allowed to fail if
you request 0 or fewer repetitions, and the default definition
will do so.
By making this a member of the class, idempotent semigroups
and monoids can upgrade this to execute in O(1) by
picking stimes =
or stimesIdempotent
stimes =
respectively.stimesIdempotentMonoid
Instances
Semigroup Ordering | Since: base-4.9.0.0 |
Semigroup () | Since: base-4.9.0.0 |
Semigroup Void | Since: base-4.9.0.0 |
Semigroup All | Since: base-4.9.0.0 |
Semigroup Any | Since: base-4.9.0.0 |
Semigroup ByteArray | |
Semigroup [a] | Since: base-4.9.0.0 |
Semigroup a => Semigroup (Maybe a) | Since: base-4.9.0.0 |
Semigroup a => Semigroup (IO a) | Since: base-4.10.0.0 |
Semigroup p => Semigroup (Par1 p) | Since: base-4.12.0.0 |
Ord a => Semigroup (Min a) | Since: base-4.9.0.0 |
Ord a => Semigroup (Max a) | Since: base-4.9.0.0 |
Semigroup (First a) | Since: base-4.9.0.0 |
Semigroup (Last a) | Since: base-4.9.0.0 |
Monoid m => Semigroup (WrappedMonoid m) | Since: base-4.9.0.0 |
Defined in Data.Semigroup (<>) :: WrappedMonoid m -> WrappedMonoid m -> WrappedMonoid m # sconcat :: NonEmpty (WrappedMonoid m) -> WrappedMonoid m # stimes :: Integral b => b -> WrappedMonoid m -> WrappedMonoid m # | |
Semigroup a => Semigroup (Option a) | Since: base-4.9.0.0 |
Semigroup a => Semigroup (Identity a) | Since: base-4.9.0.0 |
Semigroup (First a) | Since: base-4.9.0.0 |
Semigroup (Last a) | Since: base-4.9.0.0 |
Semigroup a => Semigroup (Dual a) | Since: base-4.9.0.0 |
Semigroup (Endo a) | Since: base-4.9.0.0 |
Num a => Semigroup (Sum a) | Since: base-4.9.0.0 |
Num a => Semigroup (Product a) | Since: base-4.9.0.0 |
Semigroup a => Semigroup (Down a) | Since: base-4.11.0.0 |
Semigroup (NonEmpty a) | Since: base-4.9.0.0 |
Ord a => Semigroup (Set a) | Since: containers-0.5.7 |
Semigroup (Heap a) | |
Semigroup (Array a) | Since: primitive-0.6.3.0 |
Semigroup (MergeSet a) | |
Storable a => Semigroup (Array a) Source # | |
Semigroup b => Semigroup (a -> b) | Since: base-4.9.0.0 |
Semigroup (Either a b) | Since: base-4.9.0.0 |
Semigroup (V1 p) | Since: base-4.12.0.0 |
Semigroup (U1 p) | Since: base-4.12.0.0 |
(Semigroup a, Semigroup b) => Semigroup (a, b) | Since: base-4.9.0.0 |
Semigroup a => Semigroup (ST s a) | Since: base-4.11.0.0 |
Semigroup (Proxy s) | Since: base-4.9.0.0 |
Ord k => Semigroup (Map k v) | |
Semigroup (WSerialT m a) Source # | |
Semigroup (SerialT m a) Source # | |
MonadAsync m => Semigroup (ParallelT m a) Source # | |
MonadAsync m => Semigroup (WAsyncT m a) Source # | |
MonadAsync m => Semigroup (AsyncT m a) Source # | |
MonadAsync m => Semigroup (AheadT m a) Source # | |
Semigroup (ZipAsyncM m a) Source # | |
Semigroup (ZipSerialM m a) Source # | |
Defined in Streamly.Streams.Zip (<>) :: ZipSerialM m a -> ZipSerialM m a -> ZipSerialM m a # sconcat :: NonEmpty (ZipSerialM m a) -> ZipSerialM m a # stimes :: Integral b => b -> ZipSerialM m a -> ZipSerialM m a # | |
Semigroup (f p) => Semigroup (Rec1 f p) | Since: base-4.12.0.0 |
(Semigroup a, Semigroup b, Semigroup c) => Semigroup (a, b, c) | Since: base-4.9.0.0 |
Semigroup a => Semigroup (Const a b) | Since: base-4.9.0.0 |
(Applicative f, Semigroup a) => Semigroup (Ap f a) | Since: base-4.12.0.0 |
Alternative f => Semigroup (Alt f a) | Since: base-4.9.0.0 |
(Semigroup b, Monad m) => Semigroup (Fold m a b) Source # | Combines the outputs of the folds (the type |
Semigroup c => Semigroup (K1 i c p) | Since: base-4.12.0.0 |
(Semigroup (f p), Semigroup (g p)) => Semigroup ((f :*: g) p) | Since: base-4.12.0.0 |
(Semigroup a, Semigroup b, Semigroup c, Semigroup d) => Semigroup (a, b, c, d) | Since: base-4.9.0.0 |
Semigroup (f p) => Semigroup (M1 i c f p) | Since: base-4.12.0.0 |
Semigroup (f (g p)) => Semigroup ((f :.: g) p) | Since: base-4.12.0.0 |
(Semigroup a, Semigroup b, Semigroup c, Semigroup d, Semigroup e) => Semigroup (a, b, c, d, e) | Since: base-4.9.0.0 |
Deprecated
runStream :: Monad m => SerialT m a -> m () Source #
Deprecated: Please use Streamly.Prelude.drain instead.
Same as "Streamly.Prelude.runStream".
runStreaming :: (Monad m, IsStream t) => t m a -> m () Source #
runStreamT :: Monad m => SerialT m a -> m () Source #
Deprecated: Please use runStream instead.
Same as runStream
.
Since: 0.1.0
runInterleavedT :: Monad m => WSerialT m a -> m () Source #
Deprecated: Please use 'runStream . interleaving' instead.
Same as runStream . wSerially
.
Since: 0.1.0
runAsyncT :: Monad m => AsyncT m a -> m () Source #
Deprecated: Please use 'runStream . asyncly' instead.
Same as runStream . asyncly
.
Since: 0.1.0
runParallelT :: Monad m => ParallelT m a -> m () Source #
Deprecated: Please use 'runStream . parallely' instead.
Same as runStream . parallely
.
Since: 0.1.0
runZipStream :: Monad m => ZipSerialM m a -> m () Source #
Deprecated: Please use 'runStream . zipSerially instead.
Same as runStream . zipping
.
Since: 0.1.0
runZipAsync :: Monad m => ZipAsyncM m a -> m () Source #
Deprecated: Please use 'runStream . zipAsyncly instead.
Same as runStream . zippingAsync
.
Since: 0.1.0
type InterleavedT = WSerialT Source #
Deprecated: Please use WSerialT
instead.
Since: 0.1.0
type ZipStream = ZipSerialM Source #
Deprecated: Please use ZipSerialM
instead.
Since: 0.1.0
interleaving :: IsStream t => WSerialT m a -> t m a Source #
zipping :: IsStream t => ZipSerialM m a -> t m a Source #
zippingAsync :: IsStream t => ZipAsyncM m a -> t m a Source #