streamly-0.8.3: Dataflow programming and declarative concurrency
Copyright(c) 2017 Composewell Technologies
LicenseBSD3
Maintainerstreamly@composewell.com
Stabilityexperimental
PortabilityGHC
Safe HaskellSafe-Inferred
LanguageHaskell2010

Streamly.Internal.Data.Stream.Async

Description

To run examples in this module:

>>> import qualified Streamly.Prelude as Stream
>>> import Control.Concurrent (threadDelay)
>>> :{
 delay n = do
     threadDelay (n * 1000000)   -- sleep for n seconds
     putStrLn (show n ++ " sec") -- print "n sec"
     return n                    -- IO Int
:}
Synopsis

Documentation

newtype AsyncT m a Source #

For AsyncT streams:

(<>) = async
(>>=) = flip . concatMapWith async

A single Monad bind behaves like a for loop with iterations of the loop executed concurrently a la the async combinator, producing results and side effects of iterations out of order:

>>> :{
Stream.toList $ Stream.fromAsync $ do
     x <- Stream.fromList [2,1] -- foreach x in stream
     Stream.fromEffect $ delay x
:}
1 sec
2 sec
[1,2]

Nested monad binds behave like nested for loops with nested iterations executed concurrently, a la the async combinator:

>>> :{
Stream.toList $ Stream.fromAsync $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [2,4] -- foreach y in stream
    Stream.fromEffect $ delay (x + y)
:}
3 sec
4 sec
5 sec
6 sec
[3,4,5,6]

The behavior can be explained as follows. All the iterations corresponding to the element 1 in the first stream constitute one output stream and all the iterations corresponding to 2 constitute another output stream and these two output streams are merged using async.

Since: 0.1.0 (Streamly)

Since: 0.8.0

Constructors

AsyncT 

Fields

Instances

Instances details
IsStream AsyncT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> TYPE LiftedRep) a. AsyncT m a -> Stream m a Source #

fromStream :: forall (m :: Type -> TYPE LiftedRep) a. Stream m a -> AsyncT m a Source #

consM :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a Source #

(|:) :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a Source #

MonadTrans AsyncT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

lift :: Monad m => m a -> AsyncT m a #

(MonadReader r m, MonadAsync m) => MonadReader r (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

ask :: AsyncT m r #

local :: (r -> r) -> AsyncT m a -> AsyncT m a #

reader :: (r -> a) -> AsyncT m a #

(MonadState s m, MonadAsync m) => MonadState s (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

get :: AsyncT m s #

put :: s -> AsyncT m () #

state :: (s -> (a, s)) -> AsyncT m a #

(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

liftBase :: b α -> AsyncT m α #

(MonadIO m, MonadAsync m) => MonadIO (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

liftIO :: IO a -> AsyncT m a #

(Monad m, MonadAsync m) => Applicative (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

pure :: a -> AsyncT m a #

(<*>) :: AsyncT m (a -> b) -> AsyncT m a -> AsyncT m b #

liftA2 :: (a -> b -> c) -> AsyncT m a -> AsyncT m b -> AsyncT m c #

(*>) :: AsyncT m a -> AsyncT m b -> AsyncT m b #

(<*) :: AsyncT m a -> AsyncT m b -> AsyncT m a #

Monad m => Functor (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

fmap :: (a -> b) -> AsyncT m a -> AsyncT m b #

(<$) :: a -> AsyncT m b -> AsyncT m a #

MonadAsync m => Monad (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

(>>=) :: AsyncT m a -> (a -> AsyncT m b) -> AsyncT m b #

(>>) :: AsyncT m a -> AsyncT m b -> AsyncT m b #

return :: a -> AsyncT m a #

(MonadThrow m, MonadAsync m) => MonadThrow (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

throwM :: Exception e => e -> AsyncT m a #

MonadAsync m => Monoid (AsyncT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

mempty :: AsyncT m a #

mappend :: AsyncT m a -> AsyncT m a -> AsyncT m a #

mconcat :: [AsyncT m a] -> AsyncT m a #

MonadAsync m => Semigroup (AsyncT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

(<>) :: AsyncT m a -> AsyncT m a -> AsyncT m a #

sconcat :: NonEmpty (AsyncT m a) -> AsyncT m a #

stimes :: Integral b => b -> AsyncT m a -> AsyncT m a #

type Async = AsyncT IO Source #

A demand driven left biased parallely composing IO stream of elements of type a. See AsyncT documentation for more details.

Since: 0.2.0 (Streamly)

Since: 0.8.0

consMAsync :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a Source #

XXX we can implement it more efficienty by directly implementing instead of combining streams using async.

asyncK :: MonadAsync m => Stream m a -> Stream m a -> Stream m a Source #

mkAsyncK :: MonadAsync m => Stream m a -> Stream m a Source #

Generate a stream asynchronously to keep it buffered, lazily consume from the buffer.

Pre-release

newtype WAsyncT m a Source #

For WAsyncT streams:

(<>) = wAsync
(>>=) = flip . concatMapWith wAsync

A single Monad bind behaves like a for loop with iterations of the loop executed concurrently a la the wAsync combinator, producing results and side effects of iterations out of order:

>>> :{
Stream.toList $ Stream.fromWAsync $ do
     x <- Stream.fromList [2,1] -- foreach x in stream
     Stream.fromEffect $ delay x
:}
1 sec
2 sec
[1,2]

Nested monad binds behave like nested for loops with nested iterations executed concurrently, a la the wAsync combinator:

>>> :{
Stream.toList $ Stream.fromWAsync $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [2,4] -- foreach y in stream
    Stream.fromEffect $ delay (x + y)
:}
3 sec
4 sec
5 sec
6 sec
[3,4,5,6]

The behavior can be explained as follows. All the iterations corresponding to the element 1 in the first stream constitute one WAsyncT output stream and all the iterations corresponding to 2 constitute another WAsyncT output stream and these two output streams are merged using wAsync.

The W in the name stands for wide or breadth wise scheduling in contrast to the depth wise scheduling behavior of AsyncT.

Since: 0.2.0 (Streamly)

Since: 0.8.0

Constructors

WAsyncT 

Fields

Instances

Instances details
IsStream WAsyncT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> TYPE LiftedRep) a. WAsyncT m a -> Stream m a Source #

fromStream :: forall (m :: Type -> TYPE LiftedRep) a. Stream m a -> WAsyncT m a Source #

consM :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a Source #

(|:) :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a Source #

MonadTrans WAsyncT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

lift :: Monad m => m a -> WAsyncT m a #

(MonadReader r m, MonadAsync m) => MonadReader r (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

ask :: WAsyncT m r #

local :: (r -> r) -> WAsyncT m a -> WAsyncT m a #

reader :: (r -> a) -> WAsyncT m a #

(MonadState s m, MonadAsync m) => MonadState s (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

get :: WAsyncT m s #

put :: s -> WAsyncT m () #

state :: (s -> (a, s)) -> WAsyncT m a #

(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

liftBase :: b α -> WAsyncT m α #

(MonadIO m, MonadAsync m) => MonadIO (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

liftIO :: IO a -> WAsyncT m a #

(Monad m, MonadAsync m) => Applicative (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

pure :: a -> WAsyncT m a #

(<*>) :: WAsyncT m (a -> b) -> WAsyncT m a -> WAsyncT m b #

liftA2 :: (a -> b -> c) -> WAsyncT m a -> WAsyncT m b -> WAsyncT m c #

(*>) :: WAsyncT m a -> WAsyncT m b -> WAsyncT m b #

(<*) :: WAsyncT m a -> WAsyncT m b -> WAsyncT m a #

Monad m => Functor (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

fmap :: (a -> b) -> WAsyncT m a -> WAsyncT m b #

(<$) :: a -> WAsyncT m b -> WAsyncT m a #

MonadAsync m => Monad (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

(>>=) :: WAsyncT m a -> (a -> WAsyncT m b) -> WAsyncT m b #

(>>) :: WAsyncT m a -> WAsyncT m b -> WAsyncT m b #

return :: a -> WAsyncT m a #

(MonadThrow m, MonadAsync m) => MonadThrow (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

throwM :: Exception e => e -> WAsyncT m a #

MonadAsync m => Monoid (WAsyncT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

mempty :: WAsyncT m a #

mappend :: WAsyncT m a -> WAsyncT m a -> WAsyncT m a #

mconcat :: [WAsyncT m a] -> WAsyncT m a #

MonadAsync m => Semigroup (WAsyncT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

(<>) :: WAsyncT m a -> WAsyncT m a -> WAsyncT m a #

sconcat :: NonEmpty (WAsyncT m a) -> WAsyncT m a #

stimes :: Integral b => b -> WAsyncT m a -> WAsyncT m a #

type WAsync = WAsyncT IO Source #

A round robin parallely composing IO stream of elements of type a. See WAsyncT documentation for more details.

Since: 0.2.0 (Streamly)

Since: 0.8.0

consMWAsync :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a Source #

XXX we can implement it more efficienty by directly implementing instead of combining streams using wAsync.

wAsyncK :: MonadAsync m => Stream m a -> Stream m a -> Stream m a Source #