streamly-0.2.0: Beautiful Streaming, Concurrent and Reactive Composition

Copyright: (c) 2017 Harendra Kumar
License: BSD3
Maintainer: harendra.kumar@gmail.com
Stability: experimental
Portability: GHC
Safe Haskell: None
Language: Haskell2010

Streamly

Description

Just as a list represents a sequence of pure values, a stream represents a sequence of monadic actions. The monadic stream API offered by Streamly is very close to the Haskell Prelude's pure list API; it can be considered a natural extension of lists to monadic actions. Streamly streams also provide concurrent composition and merging of streams, so they can be considered a concurrent list transformer. In contrast to Prelude lists, merging or appending streams of arbitrary length is scalable and inexpensive.

The basic stream type is Serial. It represents a sequence of IO actions and is a Monad. The type SerialT is a monad transformer that can represent a sequence of actions in an arbitrary monad; Serial is in fact a synonym for SerialT IO. There are a few more types similar to SerialT. All of them represent a stream and differ only in the Semigroup, Applicative and Monad compositions of the stream. The Serial and WSerial types compose serially, whereas the Async and WAsync types compose concurrently. All these types can be freely inter-converted using type combinators without any cost, so you can switch to any type of composition at any point in the program. When no type annotation or explicit stream type combinator is used, the default stream type is inferred as Serial.

Here is a simple console echo program example:

> runStream $ S.repeatM getLine & S.mapM putStrLn

For more details please see the Streamly.Tutorial module and the examples directory in this package.

This module exports stream types, instances and some basic operations. Functionality exported by this module includes:

  • Semigroup append (<>) instances as well as explicit operations for merging streams
  • Monad and Applicative instances for looping over streams
  • Zip Applicatives for zipping streams
  • Stream type combinators to convert between different composition styles
  • Some basic utilities to run and fold streams

See the Streamly.Prelude module for comprehensive APIs for construction, generation, elimination and transformation of streams.

This module is designed to be imported unqualified:

import Streamly

Documentation

type MonadAsync m = (MonadIO m, MonadBaseControl IO m, MonadThrow m)

A monad that can perform concurrent or parallel IO operations. Streams that can be composed concurrently require the underlying monad to be MonadAsync.

Since: 0.1.0

Stream transformers

Serial Streams

Serial streams compose serially, that is, non-concurrently. In a composed stream, each action is executed only after the previous action has finished. The two serial stream types SerialT and WSerialT differ in how they traverse the streams in a Semigroup or Monad composition.

data SerialT m a

Deep serial composition or serial composition with depth first traversal. The Semigroup instance of SerialT appends two streams serially in a depth first manner, yielding all elements from the first stream, and then all elements from the second stream.

main = (toList . serially $ (fromFoldable [1,2]) <> (fromFoldable [3,4])) >>= print
[1,2,3,4]

The Monad instance runs the monadic continuation for each element of the stream, serially.

main = runStream . serially $ do
    x <- return 1 <> return 2
    liftIO $ print x
1
2

SerialT nests streams serially in a depth first manner.

main = runStream . serially $ do
    x <- return 1 <> return 2
    y <- return 3 <> return 4
    liftIO $ print (x, y)
(1,3)
(1,4)
(2,3)
(2,4)

This behavior of SerialT is exactly like that of a list transformer. We call the monadic code being run for each element of the stream a monadic continuation. In the imperative paradigm, we can think of this composition as nested for loops, with the monadic continuation as the body of the loop. The loop iterates over all elements of the stream.

The serially combinator can be omitted as the default stream type is SerialT. Note that serial composition with depth first traversal can be used to combine an infinite number of streams as it explores only one stream at a time.
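
The nested-loop reading can be checked with ordinary Prelude lists. The sketch below is not Streamly code: it uses only the list monad, and the name nested is chosen here for illustration. It prints the same four pairs, in the same depth first order, as the SerialT example above.

```haskell
-- Illustration only: the list monad, not SerialT.
-- The first generator plays the outer loop, the second the inner loop.
nested :: [(Int, Int)]
nested = do
    x <- [1, 2]        -- outer loop
    y <- [3, 4]        -- inner loop, run to completion for each x
    return (x, y)      -- loop body (the "monadic continuation")

main :: IO ()
main = mapM_ print nested
```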

Since: 0.2.0

Instances

MonadTrans SerialT

Methods

lift :: Monad m => m a -> SerialT m a

IsStream SerialT

Methods

toStream :: SerialT m a -> Stream m a

fromStream :: Stream m a -> SerialT m a

(MonadBase b m, Monad m) => MonadBase b (SerialT m)

Methods

liftBase :: b α -> SerialT m α

MonadState s m => MonadState s (SerialT m)

Methods

get :: SerialT m s

put :: s -> SerialT m ()

state :: (s -> (a, s)) -> SerialT m a

MonadReader r m => MonadReader r (SerialT m)

Methods

ask :: SerialT m r

local :: (r -> r) -> SerialT m a -> SerialT m a

reader :: (r -> a) -> SerialT m a

Monad m => Monad (SerialT m)

Methods

(>>=) :: SerialT m a -> (a -> SerialT m b) -> SerialT m b

(>>) :: SerialT m a -> SerialT m b -> SerialT m b

return :: a -> SerialT m a

fail :: String -> SerialT m a

Monad m => Functor (SerialT m)

Methods

fmap :: (a -> b) -> SerialT m a -> SerialT m b

(<$) :: a -> SerialT m b -> SerialT m a

Monad m => Applicative (SerialT m)

Methods

pure :: a -> SerialT m a

(<*>) :: SerialT m (a -> b) -> SerialT m a -> SerialT m b

liftA2 :: (a -> b -> c) -> SerialT m a -> SerialT m b -> SerialT m c

(*>) :: SerialT m a -> SerialT m b -> SerialT m b

(<*) :: SerialT m a -> SerialT m b -> SerialT m a

MonadIO m => MonadIO (SerialT m)

Methods

liftIO :: IO a -> SerialT m a

MonadThrow m => MonadThrow (SerialT m)

Methods

throwM :: Exception e => e -> SerialT m a

Semigroup (SerialT m a)

Methods

(<>) :: SerialT m a -> SerialT m a -> SerialT m a

sconcat :: NonEmpty (SerialT m a) -> SerialT m a

stimes :: Integral b => b -> SerialT m a -> SerialT m a

Monoid (SerialT m a)

Methods

mempty :: SerialT m a

mappend :: SerialT m a -> SerialT m a -> SerialT m a

mconcat :: [SerialT m a] -> SerialT m a

data WSerialT m a

Wide serial composition or serial composition with a breadth first traversal. The Semigroup instance of WSerialT traverses the two streams in a breadth first manner. In other words, it interleaves two streams, yielding one element from each stream alternately.

main = (toList . wSerially $ (fromFoldable [1,2]) <> (fromFoldable [3,4])) >>= print
[1,3,2,4]

Similarly, the Monad instance interleaves the iterations of the inner and the outer loop, nesting loops in a breadth first manner.

main = runStream . wSerially $ do
    x <- return 1 <> return 2
    y <- return 3 <> return 4
    liftIO $ print (x, y)
(1,3)
(2,3)
(1,4)
(2,4)

Note that a serial composition with breadth first traversal can only combine a finite number of streams as it needs to retain state for each unfinished stream.
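
The breadth first ordering can be modelled with plain lists. The following is a minimal sketch, not Streamly's implementation: interleave and bindW are hypothetical helpers written for this illustration, with lists standing in for WSerialT streams. For the two-element inputs used above, it reproduces both orderings shown for WSerialT. With more than two streams this right-nested fold favours the earlier streams, so it is a simplification of a fair round robin.

```haskell
-- Illustration only: plain lists standing in for WSerialT streams.

-- Alternate between two lists, one element at a time.
interleave :: [a] -> [a] -> [a]
interleave []       ys = ys
interleave (x : xs) ys = x : interleave ys xs

-- A bind that combines the inner results with 'interleave' instead of (++).
bindW :: [a] -> (a -> [b]) -> [b]
bindW m f = foldr (interleave . f) [] m

main :: IO ()
main = do
    -- Semigroup-style composition: prints [1,3,2,4]
    print (interleave [1, 2] [3, 4 :: Int])
    -- Monad-style composition: prints (1,3) (2,3) (1,4) (2,4), one per line
    mapM_ print $
        [1, 2 :: Int] `bindW` \x ->
        [3, 4 :: Int] `bindW` \y ->
        [(x, y)]
```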

Since: 0.2.0

Instances

MonadTrans WSerialT

Methods

lift :: Monad m => m a -> WSerialT m a

IsStream WSerialT

Methods

toStream :: WSerialT m a -> Stream m a

fromStream :: Stream m a -> WSerialT m a

(MonadBase b m, Monad m) => MonadBase b (WSerialT m)

Methods

liftBase :: b α -> WSerialT m α

MonadState s m => MonadState s (WSerialT m)

Methods

get :: WSerialT m s

put :: s -> WSerialT m ()

state :: (s -> (a, s)) -> WSerialT m a

MonadReader r m => MonadReader r (WSerialT m)

Methods

ask :: WSerialT m r

local :: (r -> r) -> WSerialT m a -> WSerialT m a

reader :: (r -> a) -> WSerialT m a

Monad m => Monad (WSerialT m)

Methods

(>>=) :: WSerialT m a -> (a -> WSerialT m b) -> WSerialT m b

(>>) :: WSerialT m a -> WSerialT m b -> WSerialT m b

return :: a -> WSerialT m a

fail :: String -> WSerialT m a

Monad m => Functor (WSerialT m)

Methods

fmap :: (a -> b) -> WSerialT m a -> WSerialT m b

(<$) :: a -> WSerialT m b -> WSerialT m a

Monad m => Applicative (WSerialT m)

Methods

pure :: a -> WSerialT m a

(<*>) :: WSerialT m (a -> b) -> WSerialT m a -> WSerialT m b

liftA2 :: (a -> b -> c) -> WSerialT m a -> WSerialT m b -> WSerialT m c

(*>) :: WSerialT m a -> WSerialT m b -> WSerialT m b

(<*) :: WSerialT m a -> WSerialT m b -> WSerialT m a

MonadIO m => MonadIO (WSerialT m)

Methods

liftIO :: IO a -> WSerialT m a

MonadThrow m => MonadThrow (WSerialT m)

Methods

throwM :: Exception e => e -> WSerialT m a

Semigroup (WSerialT m a)

Methods

(<>) :: WSerialT m a -> WSerialT m a -> WSerialT m a

sconcat :: NonEmpty (WSerialT m a) -> WSerialT m a

stimes :: Integral b => b -> WSerialT m a -> WSerialT m a

Monoid (WSerialT m a)

Methods

mempty :: WSerialT m a

mappend :: WSerialT m a -> WSerialT m a -> WSerialT m a

mconcat :: [WSerialT m a] -> WSerialT m a

Parallel Streams

The async style streams execute actions asynchronously and also consume the outputs asynchronously. In a composed stream, more than one stream can run concurrently and yield elements at any point in time. The composed stream yields elements as they are generated by the constituent streams, on a first-come, first-served basis. Therefore, on each run the stream may yield elements in a different sequence depending on the delays introduced by scheduling. The two async types AsyncT and WAsyncT differ in how they traverse streams in Semigroup or Monad compositions.

data AsyncT m a

Deep async composition or async composition with depth first traversal. In a left to right Semigroup composition it tries to yield elements from the left stream as long as it can, but it can run the right stream in parallel if it needs to, based on demand. The right stream can be run if the left stream blocks on IO or cannot produce elements fast enough for the consumer.

main = (toList . asyncly $ (fromFoldable [1,2]) <> (fromFoldable [3,4])) >>= print
[1,2,3,4]

Any exceptions generated by a constituent stream are propagated to the output stream. The output and exceptions from a single stream are guaranteed to arrive in the same order in the resulting stream as they were generated in the input stream. However, the relative ordering of elements from different streams in the resulting stream can vary depending on scheduling and generation delays.

Similarly, the monad instance of AsyncT may run each iteration concurrently based on demand. More concurrent iterations are started only if the previous iterations are not able to produce enough output for the consumer.

import Streamly
import Control.Concurrent

main = runStream . asyncly $ do
    n <- return 3 <> return 2 <> return 1
    liftIO $ do
         threadDelay (n * 1000000)
         myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)
ThreadId 40: Delay 1
ThreadId 39: Delay 2
ThreadId 38: Delay 3

All iterations may run in the same thread if they do not block.

Note that async composition with depth first traversal can be used to combine an infinite number of streams as it explores only a bounded number of streams at a time.

Since: 0.1.0

Instances

MonadTrans AsyncT

Methods

lift :: Monad m => m a -> AsyncT m a

IsStream AsyncT

Methods

toStream :: AsyncT m a -> Stream m a

fromStream :: Stream m a -> AsyncT m a

(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AsyncT m)

Methods

liftBase :: b α -> AsyncT m α

(MonadState s m, MonadAsync m) => MonadState s (AsyncT m)

Methods

get :: AsyncT m s

put :: s -> AsyncT m ()

state :: (s -> (a, s)) -> AsyncT m a

(MonadReader r m, MonadAsync m) => MonadReader r (AsyncT m)

Methods

ask :: AsyncT m r

local :: (r -> r) -> AsyncT m a -> AsyncT m a

reader :: (r -> a) -> AsyncT m a

MonadAsync m => Monad (AsyncT m)

Methods

(>>=) :: AsyncT m a -> (a -> AsyncT m b) -> AsyncT m b

(>>) :: AsyncT m a -> AsyncT m b -> AsyncT m b

return :: a -> AsyncT m a

fail :: String -> AsyncT m a

Monad m => Functor (AsyncT m)

Methods

fmap :: (a -> b) -> AsyncT m a -> AsyncT m b

(<$) :: a -> AsyncT m b -> AsyncT m a

(Monad m, MonadAsync m) => Applicative (AsyncT m)

Methods

pure :: a -> AsyncT m a

(<*>) :: AsyncT m (a -> b) -> AsyncT m a -> AsyncT m b

liftA2 :: (a -> b -> c) -> AsyncT m a -> AsyncT m b -> AsyncT m c

(*>) :: AsyncT m a -> AsyncT m b -> AsyncT m b

(<*) :: AsyncT m a -> AsyncT m b -> AsyncT m a

(MonadIO m, MonadAsync m) => MonadIO (AsyncT m)

Methods

liftIO :: IO a -> AsyncT m a

(MonadThrow m, MonadAsync m) => MonadThrow (AsyncT m)

Methods

throwM :: Exception e => e -> AsyncT m a

MonadAsync m => Semigroup (AsyncT m a)

Methods

(<>) :: AsyncT m a -> AsyncT m a -> AsyncT m a

sconcat :: NonEmpty (AsyncT m a) -> AsyncT m a

stimes :: Integral b => b -> AsyncT m a -> AsyncT m a

MonadAsync m => Monoid (AsyncT m a)

Methods

mempty :: AsyncT m a

mappend :: AsyncT m a -> AsyncT m a -> AsyncT m a

mconcat :: [AsyncT m a] -> AsyncT m a

data WAsyncT m a

Wide async composition or async composition with breadth first traversal. The Semigroup instance of WAsyncT concurrently traverses the composed streams using a breadth first traversal, that is, in a round robin fashion, yielding elements from both streams alternately.

main = (toList . wAsyncly $ (fromFoldable [1,2]) <> (fromFoldable [3,4])) >>= print
[1,3,2,4]

Any exceptions generated by a constituent stream are propagated to the output stream. The output and exceptions from a single stream are guaranteed to arrive in the same order in the resulting stream as they were generated in the input stream. However, the relative ordering of elements from different streams in the resulting stream can vary depending on scheduling and generation delays.

Similarly, the Monad instance of WAsyncT runs all iterations fairly concurrently using a round robin scheduling.

import Streamly
import Control.Concurrent

main = runStream . wAsyncly $ do
    n <- return 3 <> return 2 <> return 1
    liftIO $ do
         threadDelay (n * 1000000)
         myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)
ThreadId 40: Delay 1
ThreadId 39: Delay 2
ThreadId 38: Delay 3

Unlike AsyncT, all iterations are guaranteed to run fairly concurrently, unconditionally.

Note that async composition with breadth first traversal can only combine a finite number of streams as it needs to retain state for each unfinished stream.
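
The effect of running every iteration concurrently can be approximated with base-library threads. This is a hedged sketch rather than Streamly code: completionOrder is a hypothetical helper that forks one thread per element and reports the elements in the order in which their loop bodies finish. With delays of n tenths of a second, the shortest delay typically finishes first, which mirrors the output above, although the exact order depends on scheduling.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Monad (forM_, replicateM)

-- Illustration only: base-library threads, not WAsyncT.
-- Run one loop body per element, all at the same time, and collect the
-- elements in the order in which their bodies finish.
completionOrder :: [Int] -> IO [Int]
completionOrder ns = do
    done <- newChan
    forM_ ns $ \n -> forkIO $ do
        threadDelay (n * 100000)   -- n tenths of a second
        writeChan done n
    -- Wait for exactly one result per forked body.
    replicateM (length ns) (readChan done)

main :: IO ()
main = completionOrder [3, 2, 1] >>= print
```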

Since: 0.2.0

Instances

MonadTrans WAsyncT

Methods

lift :: Monad m => m a -> WAsyncT m a

IsStream WAsyncT

Methods

toStream :: WAsyncT m a -> Stream m a

fromStream :: Stream m a -> WAsyncT m a

(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (WAsyncT m)

Methods

liftBase :: b α -> WAsyncT m α

(MonadState s m, MonadAsync m) => MonadState s (WAsyncT m)

Methods

get :: WAsyncT m s

put :: s -> WAsyncT m ()

state :: (s -> (a, s)) -> WAsyncT m a

(MonadReader r m, MonadAsync m) => MonadReader r (WAsyncT m)

Methods

ask :: WAsyncT m r

local :: (r -> r) -> WAsyncT m a -> WAsyncT m a

reader :: (r -> a) -> WAsyncT m a

MonadAsync m => Monad (WAsyncT m)

Methods

(>>=) :: WAsyncT m a -> (a -> WAsyncT m b) -> WAsyncT m b

(>>) :: WAsyncT m a -> WAsyncT m b -> WAsyncT m b

return :: a -> WAsyncT m a

fail :: String -> WAsyncT m a

Monad m => Functor (WAsyncT m)

Methods

fmap :: (a -> b) -> WAsyncT m a -> WAsyncT m b

(<$) :: a -> WAsyncT m b -> WAsyncT m a

(Monad m, MonadAsync m) => Applicative (WAsyncT m)

Methods

pure :: a -> WAsyncT m a

(<*>) :: WAsyncT m (a -> b) -> WAsyncT m a -> WAsyncT m b

liftA2 :: (a -> b -> c) -> WAsyncT m a -> WAsyncT m b -> WAsyncT m c

(*>) :: WAsyncT m a -> WAsyncT m b -> WAsyncT m b

(<*) :: WAsyncT m a -> WAsyncT m b -> WAsyncT m a

(MonadIO m, MonadAsync m) => MonadIO (WAsyncT m)

Methods

liftIO :: IO a -> WAsyncT m a

(MonadThrow m, MonadAsync m) => MonadThrow (WAsyncT m)

Methods

throwM :: Exception e => e -> WAsyncT m a

MonadAsync m => Semigroup (WAsyncT m a)

Methods

(<>) :: WAsyncT m a -> WAsyncT m a -> WAsyncT m a

sconcat :: NonEmpty (WAsyncT m a) -> WAsyncT m a

stimes :: Integral b => b -> WAsyncT m a -> WAsyncT m a

MonadAsync m => Monoid (WAsyncT m a)

Methods

mempty :: WAsyncT m a

mappend :: WAsyncT m a -> WAsyncT m a -> WAsyncT m a

mconcat :: [WAsyncT m a] -> WAsyncT m a

data ParallelT m a

Async composition with simultaneous traversal of all streams.

The Semigroup instance of ParallelT concurrently merges two streams, running both strictly concurrently and yielding elements from both streams as they arrive. When multiple streams are combined using ParallelT, each one is evaluated in its own thread and the results produced are presented in the combined stream on a first-come, first-served basis.

AsyncT and WAsyncT are concurrent lookahead streams each with a specific type of consumption pattern (depth first or breadth first). Since they are lookahead, they may introduce certain default latency in starting more concurrent tasks for efficiency reasons or may put a default limitation on the resource consumption (e.g. number of concurrent threads for lookahead). If we look at the implementation detail, they both can share a pool of worker threads to evaluate the streams in the desired pattern and at the desired rate. However, ParallelT uses a separate runtime thread to evaluate each stream.

WAsyncT is similar to ParallelT, as both of them evaluate the constituent streams fairly in a round robin fashion. However, the key difference is that WAsyncT is lazy or pull driven whereas ParallelT is strict or push driven. ParallelT immediately starts concurrent evaluation of both the streams (in separate threads) and later picks the results, whereas WAsyncT may wait for a certain latency threshold before initiating concurrent evaluation of the next stream. For WAsyncT, the concurrent scheduling of the next stream, or the degree of concurrency, is driven by feedback from the consumer. In the case of ParallelT, each stream is evaluated in a separate thread and results are pushed to a shared output buffer; the evaluation rate is controlled by blocking when the buffer is full.

Concurrent lookahead streams are generally more efficient than ParallelT and can work pretty efficiently even for smaller tasks because they do not necessarily use a separate thread for each task. So they should be preferred over ParallelT especially when efficiency is a concern and simultaneous strict evaluation is not a requirement. ParallelT is useful for cases when the streams are required to be evaluated simultaneously irrespective of how the consumer consumes them e.g. when we want to race two tasks and want to start both strictly at the same time or if we have timers in the parallel tasks and our results depend on the timers being started at the same time. We can say that ParallelT is almost the same (modulo some implementation differences) as WAsyncT when the latter is used with unlimited lookahead and zero latency in initiating lookahead.

main = (toList . parallely $ (fromFoldable [1,2]) <> (fromFoldable [3,4])) >>= print
[1,3,2,4]

When streams with more than one element are merged, the result yields elements from whichever stream produces one first, without any bias, unlike the Async style streams.

Any exceptions generated by a constituent stream are propagated to the output stream. The output and exceptions from a single stream are guaranteed to arrive in the same order in the resulting stream as they were generated in the input stream. However, the relative ordering of elements from different streams in the resulting stream can vary depending on scheduling and generation delays.

Similarly, the Monad instance of ParallelT runs all iterations of the loop concurrently.

import Streamly
import Control.Concurrent

main = runStream . parallely $ do
    n <- return 3 <> return 2 <> return 1
    liftIO $ do
         threadDelay (n * 1000000)
         myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)
ThreadId 40: Delay 1
ThreadId 39: Delay 2
ThreadId 38: Delay 3

Note that parallel composition can only combine a finite number of streams as it needs to retain state for each unfinished stream.
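
The push driven behaviour described above (one thread per stream, results pushed to a shared buffer, producers blocking when the buffer is full) can be sketched with base-library primitives. This is an illustration under stated assumptions, not ParallelT's implementation: mergeParallel is a hypothetical helper, a one-slot MVar stands in for the shared output buffer, the inputs are finite lists, and exception propagation is not modelled. Elements from each input keep their relative order, while the interleaving between the two inputs depends on scheduling.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Monad (replicateM)

-- Illustration only: a one-slot MVar stands in for the shared output buffer.
mergeParallel :: [a] -> [a] -> IO [a]
mergeParallel xs ys = do
    slot <- newEmptyMVar
    -- One producer thread per input; putMVar blocks while the slot is full.
    _ <- forkIO $ mapM_ (putMVar slot) xs
    _ <- forkIO $ mapM_ (putMVar slot) ys
    -- The consumer drains exactly as many elements as were supplied.
    replicateM (length xs + length ys) (takeMVar slot)

main :: IO ()
main = mergeParallel [1, 2] [3, 4 :: Int] >>= print
```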

Since: 0.1.0

Instances

MonadTrans ParallelT

Methods

lift :: Monad m => m a -> ParallelT m a

IsStream ParallelT

Methods

toStream :: ParallelT m a -> Stream m a

fromStream :: Stream m a -> ParallelT m a

(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (ParallelT m)

Methods

liftBase :: b α -> ParallelT m α

(MonadState s m, MonadAsync m) => MonadState s (ParallelT m)

Methods

get :: ParallelT m s

put :: s -> ParallelT m ()

state :: (s -> (a, s)) -> ParallelT m a

(MonadReader r m, MonadAsync m) => MonadReader r (ParallelT m)

Methods

ask :: ParallelT m r

local :: (r -> r) -> ParallelT m a -> ParallelT m a

reader :: (r -> a) -> ParallelT m a

MonadAsync m => Monad (ParallelT m)

Methods

(>>=) :: ParallelT m a -> (a -> ParallelT m b) -> ParallelT m b

(>>) :: ParallelT m a -> ParallelT m b -> ParallelT m b

return :: a -> ParallelT m a

fail :: String -> ParallelT m a

Monad m => Functor (ParallelT m)

Methods

fmap :: (a -> b) -> ParallelT m a -> ParallelT m b

(<$) :: a -> ParallelT m b -> ParallelT m a

(Monad m, MonadAsync m) => Applicative (ParallelT m)

Methods

pure :: a -> ParallelT m a

(<*>) :: ParallelT m (a -> b) -> ParallelT m a -> ParallelT m b

liftA2 :: (a -> b -> c) -> ParallelT m a -> ParallelT m b -> ParallelT m c

(*>) :: ParallelT m a -> ParallelT m b -> ParallelT m b

(<*) :: ParallelT m a -> ParallelT m b -> ParallelT m a

(MonadIO m, MonadAsync m) => MonadIO (ParallelT m)

Methods

liftIO :: IO a -> ParallelT m a

(MonadThrow m, MonadAsync m) => MonadThrow (ParallelT m)

Methods

throwM :: Exception e => e -> ParallelT m a

MonadAsync m => Semigroup (ParallelT m a)

Methods

(<>) :: ParallelT m a -> ParallelT m a -> ParallelT m a

sconcat :: NonEmpty (ParallelT m a) -> ParallelT m a

stimes :: Integral b => b -> ParallelT m a -> ParallelT m a

MonadAsync m => Monoid (ParallelT m a)

Methods

mempty :: ParallelT m a

mappend :: ParallelT m a -> ParallelT m a -> ParallelT m a

mconcat :: [ParallelT m a] -> ParallelT m a

Zipping Streams

ZipSerialM and ZipAsyncM provide Applicative instances for zipping the corresponding elements of two streams together. Note that these types are not monads.

data ZipSerialM m a

The applicative instance of ZipSerialM zips a number of streams serially i.e. it produces one element from each stream serially and then zips all those elements.

main = (toList . zipSerially $ (,,) <$> s1 <*> s2 <*> s3) >>= print
    where s1 = fromFoldable [1, 2]
          s2 = fromFoldable [3, 4]
          s3 = fromFoldable [5, 6]
[(1,3,5),(2,4,6)]

The Semigroup instance of this type works the same way as that of SerialT.
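
The difference between a zipping Applicative and the ordinary looping Applicative can be seen with lists alone. The sketch below is an analogy, not Streamly code: it uses ZipList from base as a stand-in for ZipSerialM, and the names zipped and crossed are chosen for illustration.

```haskell
import Control.Applicative (ZipList (..))

-- Illustration only: ZipList from base, not ZipSerialM.

-- Zipping applicative: combines elements position by position.
zipped :: [(Int, Int, Int)]
zipped = getZipList $ (,,) <$> s1 <*> s2 <*> s3
  where
    s1 = ZipList [1, 2]
    s2 = ZipList [3, 4]
    s3 = ZipList [5, 6]

-- Ordinary list applicative, for contrast: combines every x with every y.
crossed :: [(Int, Int)]
crossed = (,) <$> [1, 2] <*> [3, 4]

main :: IO ()
main = do
    print zipped    -- [(1,3,5),(2,4,6)]
    print crossed   -- [(1,3),(1,4),(2,3),(2,4)]
```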

Since: 0.2.0

Instances

IsStream ZipSerialM

Methods

toStream :: ZipSerialM m a -> Stream m a

fromStream :: Stream m a -> ZipSerialM m a

Monad m => Functor (ZipSerialM m)

Methods

fmap :: (a -> b) -> ZipSerialM m a -> ZipSerialM m b

(<$) :: a -> ZipSerialM m b -> ZipSerialM m a

Monad m => Applicative (ZipSerialM m)

Methods

pure :: a -> ZipSerialM m a

(<*>) :: ZipSerialM m (a -> b) -> ZipSerialM m a -> ZipSerialM m b

liftA2 :: (a -> b -> c) -> ZipSerialM m a -> ZipSerialM m b -> ZipSerialM m c

(*>) :: ZipSerialM m a -> ZipSerialM m b -> ZipSerialM m b

(<*) :: ZipSerialM m a -> ZipSerialM m b -> ZipSerialM m a

Semigroup (ZipSerialM m a)

Methods

(<>) :: ZipSerialM m a -> ZipSerialM m a -> ZipSerialM m a

sconcat :: NonEmpty (ZipSerialM m a) -> ZipSerialM m a

stimes :: Integral b => b -> ZipSerialM m a -> ZipSerialM m a

Monoid (ZipSerialM m a)

Methods

mempty :: ZipSerialM m a

mappend :: ZipSerialM m a -> ZipSerialM m a -> ZipSerialM m a

mconcat :: [ZipSerialM m a] -> ZipSerialM m a

data ZipAsyncM m a

Like ZipSerialM but zips in parallel: it generates all the elements to be zipped concurrently.

main = (toList . zipAsyncly $ (,,) <$> s1 <*> s2 <*> s3) >>= print
    where s1 = fromFoldable [1, 2]
          s2 = fromFoldable [3, 4]
          s3 = fromFoldable [5, 6]
[(1,3,5),(2,4,6)]

The Semigroup instance of this type works the same way as that of SerialT.

Since: 0.2.0

Instances

IsStream ZipAsyncM

Methods

toStream :: ZipAsyncM m a -> Stream m a

fromStream :: Stream m a -> ZipAsyncM m a

Monad m => Functor (ZipAsyncM m)

Methods

fmap :: (a -> b) -> ZipAsyncM m a -> ZipAsyncM m b

(<$) :: a -> ZipAsyncM m b -> ZipAsyncM m a

MonadAsync m => Applicative (ZipAsyncM m)

Methods

pure :: a -> ZipAsyncM m a

(<*>) :: ZipAsyncM m (a -> b) -> ZipAsyncM m a -> ZipAsyncM m b

liftA2 :: (a -> b -> c) -> ZipAsyncM m a -> ZipAsyncM m b -> ZipAsyncM m c

(*>) :: ZipAsyncM m a -> ZipAsyncM m b -> ZipAsyncM m b

(<*) :: ZipAsyncM m a -> ZipAsyncM m b -> ZipAsyncM m a

Semigroup (ZipAsyncM m a)

Methods

(<>) :: ZipAsyncM m a -> ZipAsyncM m a -> ZipAsyncM m a

sconcat :: NonEmpty (ZipAsyncM m a) -> ZipAsyncM m a

stimes :: Integral b => b -> ZipAsyncM m a -> ZipAsyncM m a

Monoid (ZipAsyncM m a)

Methods

mempty :: ZipAsyncM m a

mappend :: ZipAsyncM m a -> ZipAsyncM m a -> ZipAsyncM m a

mconcat :: [ZipAsyncM m a] -> ZipAsyncM m a

Polymorphic Sum Operations

The Semigroup operation <> of each stream type combines two streams in a type specific manner. This section provides polymorphic versions of <> which can be used to combine two streams in a predetermined way irrespective of the type.

serial :: IsStream t => t m a -> t m a -> t m a

Polymorphic version of the Semigroup operation <> of SerialT. Appends two streams sequentially, yielding all elements from the first stream, and then all elements from the second stream.

Since: 0.2.0

wSerial :: IsStream t => t m a -> t m a -> t m a

Polymorphic version of the Semigroup operation <> of WSerialT. Interleaves two streams, yielding one element from each stream alternately.

Since: 0.2.0

async :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a

Polymorphic version of the Semigroup operation <> of AsyncT. Merges two streams possibly concurrently, preferring the elements from the left one when available.

Since: 0.2.0

wAsync :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a

Polymorphic version of the Semigroup operation <> of WAsyncT. Merges two streams concurrently choosing elements from both fairly.

Since: 0.2.0

parallel :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a

Polymorphic version of the Semigroup operation <> of ParallelT. Merges two streams concurrently.

Since: 0.2.0

Stream Type Adapters

You may want to use different stream composition styles at different points in your program. Stream types can be freely converted or adapted from one type to another. The IsStream type class facilitates type conversion of one stream type to another. It is not used directly; instead, the type combinators provided below are used for conversions.

To adapt from one monomorphic type (e.g. AsyncT) to another monomorphic type (e.g. SerialT), use the adapt combinator. To give polymorphic code a specific interpretation, or to adapt a specific type to a polymorphic type, use the type specific combinators, e.g. asyncly or wSerially. You cannot adapt polymorphic code to polymorphic code, as the compiler would not know which specific type you are converting from or to. If you see an ambiguous type variable error, then most likely you are using adapt unnecessarily on polymorphic code.
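
The idea of stream types that share a representation and differ only in how they compose can be sketched without Streamly. The model below is an assumption-laden illustration: SerialL, WSerialL, IsStreamL, adaptL and interleave are hypothetical names, and plain lists stand in for the library's internal Stream type. It mirrors the toStream/fromStream pair of IsStream and shows the same data combined in two ways depending on the type it is adapted to.

```haskell
import Data.Semigroup (Semigroup (..))

-- Illustration only: hypothetical list-backed stand-ins for two stream types.
newtype SerialL a  = SerialL [a]  deriving (Eq, Show)
newtype WSerialL a = WSerialL [a] deriving (Eq, Show)

-- Stand-in for IsStream: each type converts to and from one common
-- representation (here a plain list).
class IsStreamL t where
    toStreamL   :: t a -> [a]
    fromStreamL :: [a] -> t a

instance IsStreamL SerialL where
    toStreamL (SerialL xs) = xs
    fromStreamL = SerialL

instance IsStreamL WSerialL where
    toStreamL (WSerialL xs) = xs
    fromStreamL = WSerialL

-- Stand-in for adapt: go through the common representation.
adaptL :: (IsStreamL t1, IsStreamL t2) => t1 a -> t2 a
adaptL = fromStreamL . toStreamL

interleave :: [a] -> [a] -> [a]
interleave []       ys = ys
interleave (x : xs) ys = x : interleave ys xs

-- The two types hold the same data and differ only in how (<>) combines it.
instance Semigroup (SerialL a) where
    SerialL xs <> SerialL ys = SerialL (xs ++ ys)

instance Semigroup (WSerialL a) where
    WSerialL xs <> WSerialL ys = WSerialL (interleave xs ys)

main :: IO ()
main = do
    let a = SerialL [1, 2 :: Int]
        b = SerialL [3, 4 :: Int]
    print (a <> b)                                 -- SerialL [1,2,3,4]
    print (adaptL a <> adaptL b :: WSerialL Int)   -- WSerialL [1,3,2,4]
```

Note that the type annotation on the second print is what selects the target type; without it the result type of adaptL would be ambiguous, which is the same kind of error the paragraph above warns about.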

class IsStream t

Class of types that can represent a stream of elements of some type a in some monad m.

Since: 0.2.0

Minimal complete definition

toStream, fromStream

Instances

IsStream ZipAsyncM

Methods

toStream :: ZipAsyncM m a -> Stream m a

fromStream :: Stream m a -> ZipAsyncM m a

IsStream ZipSerialM

Methods

toStream :: ZipSerialM m a -> Stream m a

fromStream :: Stream m a -> ZipSerialM m a

IsStream ParallelT

Methods

toStream :: ParallelT m a -> Stream m a

fromStream :: Stream m a -> ParallelT m a

IsStream WAsyncT

Methods

toStream :: WAsyncT m a -> Stream m a

fromStream :: Stream m a -> WAsyncT m a

IsStream AsyncT

Methods

toStream :: AsyncT m a -> Stream m a

fromStream :: Stream m a -> AsyncT m a

IsStream WSerialT

Methods

toStream :: WSerialT m a -> Stream m a

fromStream :: Stream m a -> WSerialT m a

IsStream SerialT

Methods

toStream :: SerialT m a -> Stream m a

fromStream :: Stream m a -> SerialT m a

serially :: IsStream t => SerialT m a -> t m a

Fix the type of a polymorphic stream as SerialT.

Since: 0.1.0

wSerially :: IsStream t => WSerialT m a -> t m a

Fix the type of a polymorphic stream as WSerialT.

Since: 0.2.0

asyncly :: IsStream t => AsyncT m a -> t m a

Fix the type of a polymorphic stream as AsyncT.

Since: 0.1.0

wAsyncly :: IsStream t => WAsyncT m a -> t m a

Fix the type of a polymorphic stream as WAsyncT.

Since: 0.2.0

parallely :: IsStream t => ParallelT m a -> t m a

Fix the type of a polymorphic stream as ParallelT.

Since: 0.1.0

zipSerially :: IsStream t => ZipSerialM m a -> t m a

Fix the type of a polymorphic stream as ZipSerialM.

Since: 0.2.0

zipAsyncly :: IsStream t => ZipAsyncM m a -> t m a

Fix the type of a polymorphic stream as ZipAsyncM.

Since: 0.2.0

adapt :: (IsStream t1, IsStream t2) => t1 m a -> t2 m a

Adapt any specific stream type to any other specific stream type.

Since: 0.1.0

IO Streams

type Serial a = SerialT IO a

A serial IO stream of elements of type a. See SerialT documentation for more details.

Since: 0.2.0

type WSerial a = WSerialT IO a

An interleaving serial IO stream of elements of type a. See WSerialT documentation for more details.

Since: 0.2.0

type Async a = AsyncT IO a

A demand driven, left biased, concurrently composing IO stream of elements of type a. See AsyncT documentation for more details.

Since: 0.2.0

type WAsync a = WAsyncT IO a

A round robin, concurrently composing IO stream of elements of type a. See WAsyncT documentation for more details.

Since: 0.2.0

type Parallel a = ParallelT IO a

An IO stream of elements of type a that composes in parallel. See ParallelT documentation for more details.

Since: 0.2.0

type ZipSerial a = ZipSerialM IO a

An IO stream whose applicative instance zips streams serially.

Since: 0.2.0

type ZipAsync a = ZipAsyncM IO a

An IO stream whose applicative instance zips streams concurrently.

Since: 0.2.0

Running Streams

runStream :: Monad m => SerialT m a -> m ()

Run a streaming composition and discard the results. By default it interprets the stream as SerialT; to run other types of streams, use the type adapting combinators, for example runStream . asyncly.

Since: 0.2.0

Transformation

mkAsync :: (IsStream t, MonadAsync m) => t m a -> m (t m a)

Make a stream asynchronous. This triggers the computation and returns, in the underlying monad, a stream representing the output generated by the original computation. The returned stream is exhaustible and must be drained exactly once: if it is not drained fully, a thread may remain blocked forever, and once it is exhausted it always yields an empty stream.

Since: 0.2.0

Polymorphic Fold Utilities

These are variants of standard Foldable fold functions that use a polymorphic stream sum operation (e.g. async or wSerial) to fold a container of streams.

foldWith :: (IsStream t, Foldable f) => (t m a -> t m a -> t m a) -> f (t m a) -> t m a Source #

A variant of fold that allows you to fold a Foldable container of streams using the specified stream sum operation.

foldWith async $ map return [1..3]

Since: 0.1.0

foldMapWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> (a -> t m b) -> f a -> t m b Source #

A variant of foldMap that allows you to map a monadic streaming action on a Foldable container and then fold it using the specified stream sum operation.

foldMapWith async return [1..3]

Since: 0.1.0
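
To show how foldWith and foldMapWith relate, the sketch below builds the same stream both ways. It uses (<>) at type SerialT as the sum operation, a choice made here only so that the result is a plain serial append; viaFoldWith and viaFoldMapWith are hypothetical names, and S.toList is assumed to accept a SerialT stream.

```haskell
import Streamly
import qualified Streamly.Prelude as S

-- Fold a list of singleton streams with the given sum operation.
viaFoldWith :: SerialT IO Int
viaFoldWith = foldWith (<>) (map return [1, 2, 3])

-- Map each element to a stream and fold in one step.
viaFoldMapWith :: SerialT IO Int
viaFoldMapWith = foldMapWith (<>) return [1, 2, 3]

main :: IO ()
main = do
    S.toList viaFoldWith >>= print
    S.toList viaFoldMapWith >>= print
```

Replacing (<>) with async or wSerial changes how the individual streams are merged, not which elements are produced.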

forEachWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> f a -> (a -> t m b) -> t m b Source #

Like foldMapWith but with the last two arguments reversed, i.e. the monadic streaming function is the last argument.

Since: 0.1.0
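
The argument order of forEachWith lets the streaming function be written as a trailing lambda, much like a loop body. A minimal sketch, with a hypothetical stream named scaled and the assumption that S.toList accepts a SerialT stream:

```haskell
import Streamly
import qualified Streamly.Prelude as S

-- Hypothetical per-element action: emit the element scaled by ten.
-- The per-element streams are merged with async.
scaled :: SerialT IO Int
scaled = forEachWith async [1, 2, 3] $ \x ->
    return (x * 10)

main :: IO ()
main = S.toList scaled >>= print  -- element order may vary between runs
```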

Re-exports

class Semigroup a where #

The class of semigroups (types with an associative binary operation).

Since: 4.9.0.0

Methods

(<>) :: a -> a -> a infixr 6 #

An associative operation.

(a <> b) <> c = a <> (b <> c)

If a is also a Monoid we further require

(<>) = mappend

sconcat :: NonEmpty a -> a #

Reduce a non-empty list with <>.

The default definition should be sufficient, but this can be overridden for efficiency.

stimes :: Integral b => b -> a -> a #

Repeat a value n times.

Given that this works on a Semigroup it is allowed to fail if you request 0 or fewer repetitions, and the default definition will do so.

By making this a member of the class, idempotent semigroups and monoids can upgrade this to execute in O(1) by picking stimes = stimesIdempotent or stimes = stimesIdempotentMonoid respectively.
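
The three methods can be tried on ordinary base types. The sketch below uses lists, Max and String only as convenient instances; nothing in it is specific to streams.

```haskell
import Data.List.NonEmpty (NonEmpty (..))
import Data.Semigroup (Max (..), Semigroup (..))

main :: IO ()
main = do
    let xs = [1, 2] :: [Int]
        ys = [3]
        zs = [4]
    -- Associativity: both groupings give the same list.
    print ((xs <> ys) <> zs == xs <> (ys <> zs))          -- True
    -- sconcat reduces a non-empty list with <>.
    print (sconcat (Max (3 :: Int) :| [Max 7, Max 5]))   -- Max {getMax = 7}
    -- stimes repeats a value n times.
    print (stimes (3 :: Int) "ab")                       -- "ababab"
```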

Instances

Semigroup Ordering

Since: 4.9.0.0

Semigroup ()

Since: 4.9.0.0

Methods

(<>) :: () -> () -> () #

sconcat :: NonEmpty () -> () #

stimes :: Integral b => b -> () -> () #

Semigroup Void

Since: 4.9.0.0

Methods

(<>) :: Void -> Void -> Void #

sconcat :: NonEmpty Void -> Void #

stimes :: Integral b => b -> Void -> Void #

Semigroup Event

Since: 4.10.0.0

Methods

(<>) :: Event -> Event -> Event #

sconcat :: NonEmpty Event -> Event #

stimes :: Integral b => b -> Event -> Event #

Semigroup Lifetime

Since: 4.10.0.0

Semigroup All

Since: 4.9.0.0

Methods

(<>) :: All -> All -> All #

sconcat :: NonEmpty All -> All #

stimes :: Integral b => b -> All -> All #

Semigroup Any

Since: 4.9.0.0

Methods

(<>) :: Any -> Any -> Any #

sconcat :: NonEmpty Any -> Any #

stimes :: Integral b => b -> Any -> Any #

Semigroup [a]

Since: 4.9.0.0

Methods

(<>) :: [a] -> [a] -> [a] #

sconcat :: NonEmpty [a] -> [a] #

stimes :: Integral b => b -> [a] -> [a] #

Semigroup a => Semigroup (Maybe a)

Since: 4.9.0.0

Methods

(<>) :: Maybe a -> Maybe a -> Maybe a #

sconcat :: NonEmpty (Maybe a) -> Maybe a #

stimes :: Integral b => b -> Maybe a -> Maybe a #

Semigroup a => Semigroup (IO a)

Since: 4.10.0.0

Methods

(<>) :: IO a -> IO a -> IO a #

sconcat :: NonEmpty (IO a) -> IO a #

stimes :: Integral b => b -> IO a -> IO a #

Ord a => Semigroup (Min a)

Since: 4.9.0.0

Methods

(<>) :: Min a -> Min a -> Min a #

sconcat :: NonEmpty (Min a) -> Min a #

stimes :: Integral b => b -> Min a -> Min a #

Ord a => Semigroup (Max a)

Since: 4.9.0.0

Methods

(<>) :: Max a -> Max a -> Max a #

sconcat :: NonEmpty (Max a) -> Max a #

stimes :: Integral b => b -> Max a -> Max a #

Semigroup (First a)

Since: 4.9.0.0

Methods

(<>) :: First a -> First a -> First a #

sconcat :: NonEmpty (First a) -> First a #

stimes :: Integral b => b -> First a -> First a #

Semigroup (Last a)

Since: 4.9.0.0

Methods

(<>) :: Last a -> Last a -> Last a #

sconcat :: NonEmpty (Last a) -> Last a #

stimes :: Integral b => b -> Last a -> Last a #

Monoid m => Semigroup (WrappedMonoid m)

Since: 4.9.0.0

Semigroup a => Semigroup (Option a)

Since: 4.9.0.0

Methods

(<>) :: Option a -> Option a -> Option a #

sconcat :: NonEmpty (Option a) -> Option a #

stimes :: Integral b => b -> Option a -> Option a #

Semigroup (NonEmpty a)

Since: 4.9.0.0

Methods

(<>) :: NonEmpty a -> NonEmpty a -> NonEmpty a #

sconcat :: NonEmpty (NonEmpty a) -> NonEmpty a #

stimes :: Integral b => b -> NonEmpty a -> NonEmpty a #

Semigroup a => Semigroup (Identity a)

Since: 4.9.0.0

Methods

(<>) :: Identity a -> Identity a -> Identity a #

sconcat :: NonEmpty (Identity a) -> Identity a #

stimes :: Integral b => b -> Identity a -> Identity a #

Semigroup a => Semigroup (Dual a)

Since: 4.9.0.0

Methods

(<>) :: Dual a -> Dual a -> Dual a #

sconcat :: NonEmpty (Dual a) -> Dual a #

stimes :: Integral b => b -> Dual a -> Dual a #

Semigroup (Endo a)

Since: 4.9.0.0

Methods

(<>) :: Endo a -> Endo a -> Endo a #

sconcat :: NonEmpty (Endo a) -> Endo a #

stimes :: Integral b => b -> Endo a -> Endo a #

Num a => Semigroup (Sum a)

Since: 4.9.0.0

Methods

(<>) :: Sum a -> Sum a -> Sum a #

sconcat :: NonEmpty (Sum a) -> Sum a #

stimes :: Integral b => b -> Sum a -> Sum a #

Num a => Semigroup (Product a)

Since: 4.9.0.0

Methods

(<>) :: Product a -> Product a -> Product a #

sconcat :: NonEmpty (Product a) -> Product a #

stimes :: Integral b => b -> Product a -> Product a #

Semigroup (First a)

Since: 4.9.0.0

Methods

(<>) :: First a -> First a -> First a #

sconcat :: NonEmpty (First a) -> First a #

stimes :: Integral b => b -> First a -> First a #

Semigroup (Last a)

Since: 4.9.0.0

Methods

(<>) :: Last a -> Last a -> Last a #

sconcat :: NonEmpty (Last a) -> Last a #

stimes :: Integral b => b -> Last a -> Last a #

Ord a => Semigroup (Set a) 

Methods

(<>) :: Set a -> Set a -> Set a #

sconcat :: NonEmpty (Set a) -> Set a #

stimes :: Integral b => b -> Set a -> Set a #

Semigroup b => Semigroup (a -> b)

Since: 4.9.0.0

Methods

(<>) :: (a -> b) -> (a -> b) -> a -> b #

sconcat :: NonEmpty (a -> b) -> a -> b #

stimes :: Integral b => b -> (a -> b) -> a -> b #

Semigroup (Either a b)

Since: 4.9.0.0

Methods

(<>) :: Either a b -> Either a b -> Either a b #

sconcat :: NonEmpty (Either a b) -> Either a b #

stimes :: Integral b => b -> Either a b -> Either a b #

(Semigroup a, Semigroup b) => Semigroup (a, b)

Since: 4.9.0.0

Methods

(<>) :: (a, b) -> (a, b) -> (a, b) #

sconcat :: NonEmpty (a, b) -> (a, b) #

stimes :: Integral b => b -> (a, b) -> (a, b) #

Semigroup (Proxy k s)

Since: 4.9.0.0

Methods

(<>) :: Proxy k s -> Proxy k s -> Proxy k s #

sconcat :: NonEmpty (Proxy k s) -> Proxy k s #

stimes :: Integral b => b -> Proxy k s -> Proxy k s #

Semigroup (ZipAsyncM m a) # 

Methods

(<>) :: ZipAsyncM m a -> ZipAsyncM m a -> ZipAsyncM m a #

sconcat :: NonEmpty (ZipAsyncM m a) -> ZipAsyncM m a #

stimes :: Integral b => b -> ZipAsyncM m a -> ZipAsyncM m a #

Semigroup (ZipSerialM m a) # 

Methods

(<>) :: ZipSerialM m a -> ZipSerialM m a -> ZipSerialM m a #

sconcat :: NonEmpty (ZipSerialM m a) -> ZipSerialM m a #

stimes :: Integral b => b -> ZipSerialM m a -> ZipSerialM m a #

MonadAsync m => Semigroup (ParallelT m a) # 

Methods

(<>) :: ParallelT m a -> ParallelT m a -> ParallelT m a #

sconcat :: NonEmpty (ParallelT m a) -> ParallelT m a #

stimes :: Integral b => b -> ParallelT m a -> ParallelT m a #

MonadAsync m => Semigroup (WAsyncT m a) # 

Methods

(<>) :: WAsyncT m a -> WAsyncT m a -> WAsyncT m a #

sconcat :: NonEmpty (WAsyncT m a) -> WAsyncT m a #

stimes :: Integral b => b -> WAsyncT m a -> WAsyncT m a #

MonadAsync m => Semigroup (AsyncT m a) # 

Methods

(<>) :: AsyncT m a -> AsyncT m a -> AsyncT m a #

sconcat :: NonEmpty (AsyncT m a) -> AsyncT m a #

stimes :: Integral b => b -> AsyncT m a -> AsyncT m a #

Semigroup (WSerialT m a) # 

Methods

(<>) :: WSerialT m a -> WSerialT m a -> WSerialT m a #

sconcat :: NonEmpty (WSerialT m a) -> WSerialT m a #

stimes :: Integral b => b -> WSerialT m a -> WSerialT m a #

Semigroup (SerialT m a) # 

Methods

(<>) :: SerialT m a -> SerialT m a -> SerialT m a #

sconcat :: NonEmpty (SerialT m a) -> SerialT m a #

stimes :: Integral b => b -> SerialT m a -> SerialT m a #

(Semigroup a, Semigroup b, Semigroup c) => Semigroup (a, b, c)

Since: 4.9.0.0

Methods

(<>) :: (a, b, c) -> (a, b, c) -> (a, b, c) #

sconcat :: NonEmpty (a, b, c) -> (a, b, c) #

stimes :: Integral b => b -> (a, b, c) -> (a, b, c) #

Semigroup a => Semigroup (Const k a b)

Since: 4.9.0.0

Methods

(<>) :: Const k a b -> Const k a b -> Const k a b #

sconcat :: NonEmpty (Const k a b) -> Const k a b #

stimes :: Integral b => b -> Const k a b -> Const k a b #

Alternative f => Semigroup (Alt * f a)

Since: 4.9.0.0

Methods

(<>) :: Alt * f a -> Alt * f a -> Alt * f a #

sconcat :: NonEmpty (Alt * f a) -> Alt * f a #

stimes :: Integral b => b -> Alt * f a -> Alt * f a #

(Semigroup a, Semigroup b, Semigroup c, Semigroup d) => Semigroup (a, b, c, d)

Since: 4.9.0.0

Methods

(<>) :: (a, b, c, d) -> (a, b, c, d) -> (a, b, c, d) #

sconcat :: NonEmpty (a, b, c, d) -> (a, b, c, d) #

stimes :: Integral b => b -> (a, b, c, d) -> (a, b, c, d) #

(Semigroup a, Semigroup b, Semigroup c, Semigroup d, Semigroup e) => Semigroup (a, b, c, d, e)

Since: 4.9.0.0

Methods

(<>) :: (a, b, c, d, e) -> (a, b, c, d, e) -> (a, b, c, d, e) #

sconcat :: NonEmpty (a, b, c, d, e) -> (a, b, c, d, e) #

stimes :: Integral b => b -> (a, b, c, d, e) -> (a, b, c, d, e) #

Deprecated

type Streaming = IsStream Source #

Deprecated: Please use IsStream instead.

Same as IsStream.

Since: 0.1.0

runStreaming :: (Monad m, IsStream t) => t m a -> m () Source #

Deprecated: Please use runStream instead.

Same as runStream.

Since: 0.1.0

runStreamT :: Monad m => SerialT m a -> m () Source #

Deprecated: Please use runStream instead.

Same as runStream.

Since: 0.1.0

runInterleavedT :: Monad m => InterleavedT m a -> m () Source #

Deprecated: Please use 'runStream . wSerially' instead.

Same as runStream . wSerially.

Since: 0.1.0

runAsyncT :: Monad m => AsyncT m a -> m () Source #

Deprecated: Please use 'runStream . asyncly' instead.

Same as runStream . asyncly.

Since: 0.1.0

runParallelT :: Monad m => ParallelT m a -> m () Source #

Deprecated: Please use 'runStream . parallely' instead.

Same as runStream . parallely.

Since: 0.1.0

runZipStream :: Monad m => ZipSerialM m a -> m () Source #

Deprecated: Please use 'runStream . zipSerially' instead.

Same as runStream . zipSerially.

Since: 0.1.0

runZipAsync :: Monad m => ZipAsyncM m a -> m () Source #

Deprecated: Please use 'runStream . zipAsyncly' instead.

Same as runStream . zipAsyncly.

Since: 0.1.0

type StreamT = SerialT Source #

Deprecated: Please use SerialT instead.

Since: 0.1.0

type InterleavedT = WSerialT Source #

Deprecated: Please use WSerialT instead.

Since: 0.1.0

type ZipStream = ZipSerialM Source #

Deprecated: Please use ZipSerialM instead.

Since: 0.1.0

interleaving :: IsStream t => WSerialT m a -> t m a Source #

Deprecated: Please use wSerially instead.

Same as wSerially.

Since: 0.1.0

zipping :: IsStream t => ZipSerialM m a -> t m a Source #

Deprecated: Please use zipSerially instead.

Same as zipSerially.

Since: 0.1.0

zippingAsync :: IsStream t => ZipAsyncM m a -> t m a Source #

Deprecated: Please use zipAsyncly instead.

Same as zipAsyncly.

Since: 0.1.0

(<=>) :: IsStream t => t m a -> t m a -> t m a infixr 5 Source #

Deprecated: Please use wSerial instead.

Same as wSerial.

Since: 0.1.0

(<|) :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a Source #

Deprecated: Please use async instead.

Same as async.

Since: 0.1.0