streamly-0.8.0: Dataflow programming and declarative concurrency
Copyright(c) 2017 Composewell Technologies
LicenseBSD3
Maintainerstreamly@composewell.com
Stabilityexperimental
PortabilityGHC
Safe HaskellNone
LanguageHaskell2010

Streamly.Internal.Data.Stream.Async

Description

To run examples in this module:

>>> import qualified Streamly.Prelude as Stream
>>> import Control.Concurrent (threadDelay)
>>> :{
 delay n = do
     threadDelay (n * 1000000)   -- sleep for n seconds
     putStrLn (show n ++ " sec") -- print "n sec"
     return n                    -- IO Int
:}
Synopsis

Documentation

data AsyncT m a Source #

For AsyncT streams:

(<>) = async
(>>=) = flip . concatMapWith async

A single Monad bind behaves like a for loop with iterations of the loop executed concurrently a la the async combinator, producing results and side effects of iterations out of order:

>>> :{
Stream.toList $ Stream.fromAsync $ do
     x <- Stream.fromList [2,1] -- foreach x in stream
     Stream.fromEffect $ delay x
:}
1 sec
2 sec
[1,2]

Nested monad binds behave like nested for loops with nested iterations executed concurrently, a la the async combinator:

>>> :{
Stream.toList $ Stream.fromAsync $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [2,4] -- foreach y in stream
    Stream.fromEffect $ delay (x + y)
:}
3 sec
4 sec
5 sec
6 sec
[3,4,5,6]

The behavior can be explained as follows. All the iterations corresponding to the element 1 in the first stream constitute one output stream and all the iterations corresponding to 2 constitute another output stream and these two output streams are merged using async.

Since: 0.1.0 (Streamly)

Since: 0.8.0

Instances

Instances details
MonadTrans AsyncT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

lift :: Monad m => m a -> AsyncT m a #

IsStream AsyncT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

toStream :: forall (m :: Type -> Type) a. AsyncT m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> AsyncT m a Source #

consM :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a Source #

(|:) :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a Source #

(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

liftBase :: b α -> AsyncT m α #

(MonadState s m, MonadAsync m) => MonadState s (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

get :: AsyncT m s #

put :: s -> AsyncT m () #

state :: (s -> (a, s)) -> AsyncT m a #

(MonadReader r m, MonadAsync m) => MonadReader r (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

ask :: AsyncT m r #

local :: (r -> r) -> AsyncT m a -> AsyncT m a #

reader :: (r -> a) -> AsyncT m a #

MonadAsync m => Monad (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

(>>=) :: AsyncT m a -> (a -> AsyncT m b) -> AsyncT m b #

(>>) :: AsyncT m a -> AsyncT m b -> AsyncT m b #

return :: a -> AsyncT m a #

Monad m => Functor (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

fmap :: (a -> b) -> AsyncT m a -> AsyncT m b #

(<$) :: a -> AsyncT m b -> AsyncT m a #

(Monad m, MonadAsync m) => Applicative (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

pure :: a -> AsyncT m a #

(<*>) :: AsyncT m (a -> b) -> AsyncT m a -> AsyncT m b #

liftA2 :: (a -> b -> c) -> AsyncT m a -> AsyncT m b -> AsyncT m c #

(*>) :: AsyncT m a -> AsyncT m b -> AsyncT m b #

(<*) :: AsyncT m a -> AsyncT m b -> AsyncT m a #

(MonadIO m, MonadAsync m) => MonadIO (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

liftIO :: IO a -> AsyncT m a #

(MonadThrow m, MonadAsync m) => MonadThrow (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

throwM :: Exception e => e -> AsyncT m a #

MonadAsync m => Semigroup (AsyncT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

(<>) :: AsyncT m a -> AsyncT m a -> AsyncT m a #

sconcat :: NonEmpty (AsyncT m a) -> AsyncT m a #

stimes :: Integral b => b -> AsyncT m a -> AsyncT m a #

MonadAsync m => Monoid (AsyncT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

mempty :: AsyncT m a #

mappend :: AsyncT m a -> AsyncT m a -> AsyncT m a #

mconcat :: [AsyncT m a] -> AsyncT m a #

type Async = AsyncT IO Source #

A demand driven left biased parallely composing IO stream of elements of type a. See AsyncT documentation for more details.

Since: 0.2.0 (Streamly)

Since: 0.8.0

fromAsync :: IsStream t => AsyncT m a -> t m a Source #

Fix the type of a polymorphic stream as AsyncT.

Since: 0.1.0 (Streamly)

Since: 0.8.0

async :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a infixr 6 Source #

Merges two streams, both the streams may be evaluated concurrently, outputs from both are used as they arrive:

>>> import Streamly.Prelude (async)
>>> stream1 = Stream.fromEffect (delay 4)
>>> stream2 = Stream.fromEffect (delay 2)
>>> Stream.toList $ stream1 `async` stream2
2 sec
4 sec
[2,4]

Multiple streams can be combined. With enough threads, all of them can be scheduled simultaneously:

>>> stream3 = Stream.fromEffect (delay 1)
>>> Stream.toList $ stream1 `async` stream2 `async` stream3
...
[1,2,4]

With 2 threads, only two can be scheduled at a time, when one of those finishes, the third one gets scheduled:

>>> Stream.toList $ Stream.maxThreads 2 $ stream1 `async` stream2 `async` stream3
...
[2,1,4]

With a single thread, it becomes serial:

>>> Stream.toList $ Stream.maxThreads 1 $ stream1 `async` stream2 `async` stream3
...
[4,2,1]

Only streams are scheduled for async evaluation, how actions within a stream are evaluated depends on the stream type. If it is a concurrent stream they will be evaluated concurrently.

In the following example, both the streams are scheduled for concurrent evaluation but each individual stream is evaluated serially:

>>> stream1 = Stream.fromListM $ Prelude.map delay [3,3] -- SerialT IO Int
>>> stream2 = Stream.fromListM $ Prelude.map delay [1,1] -- SerialT IO Int
>>> Stream.toList $ stream1 `async` stream2 -- IO [Int]
...
[1,1,3,3]

If total threads are 2, the third stream is scheduled only after one of the first two has finished:

>>> stream3 = Stream.fromListM $ Prelude.map delay [2,2] -- SerialT IO Int
>>> Stream.toList $ Stream.maxThreads 2 $ stream1 `async` stream2 `async` stream3 -- IO [Int]
...
[1,1,3,2,3,2]

Thus async goes deep in first few streams rather than going wide in all streams. It prefers to evaluate the leftmost streams as much as possible. Because of this behavior, async can be safely used to fold an infinite lazy container of streams.

Since: 0.2.0 (Streamly)

Since: 0.8.0

(<|) :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a Source #

Deprecated: Please use async instead.

Same as async.

Since: 0.1.0

mkAsync :: (IsStream t, MonadAsync m) => t m a -> t m a Source #

Make the stream producer and consumer run concurrently by introducing a buffer between them. The producer thread evaluates the input stream until the buffer fills, it terminates if the buffer is full and a worker thread is kicked off again to evaluate the remaining stream when there is space in the buffer. The consumer consumes the stream lazily from the buffer.

Since: 0.2.0 (Streamly)

Since: 0.8.0

mkAsyncK :: (IsStream t, MonadAsync m) => t m a -> t m a Source #

Generate a stream asynchronously to keep it buffered, lazily consume from the buffer.

Pre-release

data WAsyncT m a Source #

For WAsyncT streams:

(<>) = wAsync
(>>=) = flip . concatMapWith wAsync

A single Monad bind behaves like a for loop with iterations of the loop executed concurrently a la the wAsync combinator, producing results and side effects of iterations out of order:

>>> :{
Stream.toList $ Stream.fromWAsync $ do
     x <- Stream.fromList [2,1] -- foreach x in stream
     Stream.fromEffect $ delay x
:}
1 sec
2 sec
[1,2]

Nested monad binds behave like nested for loops with nested iterations executed concurrently, a la the wAsync combinator:

>>> :{
Stream.toList $ Stream.fromWAsync $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [2,4] -- foreach y in stream
    Stream.fromEffect $ delay (x + y)
:}
3 sec
4 sec
5 sec
6 sec
[3,4,5,6]

The behavior can be explained as follows. All the iterations corresponding to the element 1 in the first stream constitute one WAsyncT output stream and all the iterations corresponding to 2 constitute another WAsyncT output stream and these two output streams are merged using wAsync.

The W in the name stands for wide or breadth wise scheduling in contrast to the depth wise scheduling behavior of AsyncT.

Since: 0.2.0 (Streamly)

Since: 0.8.0

Instances

Instances details
MonadTrans WAsyncT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

lift :: Monad m => m a -> WAsyncT m a #

IsStream WAsyncT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

toStream :: forall (m :: Type -> Type) a. WAsyncT m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> WAsyncT m a Source #

consM :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a Source #

(|:) :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a Source #

(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

liftBase :: b α -> WAsyncT m α #

(MonadState s m, MonadAsync m) => MonadState s (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

get :: WAsyncT m s #

put :: s -> WAsyncT m () #

state :: (s -> (a, s)) -> WAsyncT m a #

(MonadReader r m, MonadAsync m) => MonadReader r (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

ask :: WAsyncT m r #

local :: (r -> r) -> WAsyncT m a -> WAsyncT m a #

reader :: (r -> a) -> WAsyncT m a #

MonadAsync m => Monad (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

(>>=) :: WAsyncT m a -> (a -> WAsyncT m b) -> WAsyncT m b #

(>>) :: WAsyncT m a -> WAsyncT m b -> WAsyncT m b #

return :: a -> WAsyncT m a #

Monad m => Functor (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

fmap :: (a -> b) -> WAsyncT m a -> WAsyncT m b #

(<$) :: a -> WAsyncT m b -> WAsyncT m a #

(Monad m, MonadAsync m) => Applicative (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

pure :: a -> WAsyncT m a #

(<*>) :: WAsyncT m (a -> b) -> WAsyncT m a -> WAsyncT m b #

liftA2 :: (a -> b -> c) -> WAsyncT m a -> WAsyncT m b -> WAsyncT m c #

(*>) :: WAsyncT m a -> WAsyncT m b -> WAsyncT m b #

(<*) :: WAsyncT m a -> WAsyncT m b -> WAsyncT m a #

(MonadIO m, MonadAsync m) => MonadIO (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

liftIO :: IO a -> WAsyncT m a #

(MonadThrow m, MonadAsync m) => MonadThrow (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

throwM :: Exception e => e -> WAsyncT m a #

MonadAsync m => Semigroup (WAsyncT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

(<>) :: WAsyncT m a -> WAsyncT m a -> WAsyncT m a #

sconcat :: NonEmpty (WAsyncT m a) -> WAsyncT m a #

stimes :: Integral b => b -> WAsyncT m a -> WAsyncT m a #

MonadAsync m => Monoid (WAsyncT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

mempty :: WAsyncT m a #

mappend :: WAsyncT m a -> WAsyncT m a -> WAsyncT m a #

mconcat :: [WAsyncT m a] -> WAsyncT m a #

type WAsync = WAsyncT IO Source #

A round robin parallely composing IO stream of elements of type a. See WAsyncT documentation for more details.

Since: 0.2.0 (Streamly)

Since: 0.8.0

fromWAsync :: IsStream t => WAsyncT m a -> t m a Source #

Fix the type of a polymorphic stream as WAsyncT.

Since: 0.2.0 (Streamly)

Since: 0.8.0

wAsync :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a infixr 6 Source #

For singleton streams, wAsync is the same as async. See async for singleton stream behavior. For multi-element streams, while async is left biased i.e. it tries to evaluate the left side stream as much as possible, wAsync tries to schedule them both fairly. In other words, async goes deep while wAsync goes wide. However, outputs are always used as they arrive.

With a single thread, async starts behaving like serial while wAsync starts behaving like wSerial.

>>> import Streamly.Prelude (wAsync)
>>> stream1 = Stream.fromList [1,2,3]
>>> stream2 = Stream.fromList [4,5,6]
>>> Stream.toList $ Stream.fromAsync $ Stream.maxThreads 1 $ stream1 `async` stream2
[1,2,3,4,5,6]
>>> Stream.toList $ Stream.fromWAsync $ Stream.maxThreads 1 $ stream1 `wAsync` stream2
[1,4,2,5,3,6]

With two threads available, and combining three streams:

>>> stream3 = Stream.fromList [7,8,9]
>>> Stream.toList $ Stream.fromAsync $ Stream.maxThreads 2 $ stream1 `async` stream2 `async` stream3
[1,2,3,4,5,6,7,8,9]
>>> Stream.toList $ Stream.fromWAsync $ Stream.maxThreads 2 $ stream1 `wAsync` stream2 `wAsync` stream3
[1,4,2,7,5,3,8,6,9]

This operation cannot be used to fold an infinite lazy container of streams, because it schedules all the streams in a round robin manner.

Note that WSerialT and single threaded WAsyncT both interleave streams but the exact scheduling is slightly different in both cases.

Since: 0.2.0 (Streamly)

Since: 0.8.0