streamly-0.9.0: Streaming, dataflow programming and declarative concurrency
Copyright(c) 2017 Composewell Technologies
LicenseBSD-3-Clause
Maintainerstreamly@composewell.com
Stabilitypre-release
PortabilityGHC
Safe HaskellSafe-Inferred
LanguageHaskell2010

Streamly.Internal.Data.Stream.IsStream

Description

Deprecated: Please use "Streamly.Internal.Data.Stream from streamly-core package", Streamly.Internal.Data.Stream.Concurrent, Streamly.Internal.Data.Stream.Exception.Lifted, & Streamly.Internal.Data.Stream.Time from streamly package instead.

This is an internal module which is a superset of the corresponding released module Streamly.Prelude. It contains some additional unreleased or experimental APIs.

Synopsis

Documentation

newtype StreamK (m :: Type -> TYPE LiftedRep) a #

Constructors

MkStream (forall r. State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r) 

Instances

Instances details
(Foldable m, Monad m) => Foldable (StreamK m) 
Instance details

Defined in Streamly.Internal.Data.Stream.StreamK.Type

Methods

fold :: Monoid m0 => StreamK m m0 -> m0 #

foldMap :: Monoid m0 => (a -> m0) -> StreamK m a -> m0 #

foldMap' :: Monoid m0 => (a -> m0) -> StreamK m a -> m0 #

foldr :: (a -> b -> b) -> b -> StreamK m a -> b #

foldr' :: (a -> b -> b) -> b -> StreamK m a -> b #

foldl :: (b -> a -> b) -> b -> StreamK m a -> b #

foldl' :: (b -> a -> b) -> b -> StreamK m a -> b #

foldr1 :: (a -> a -> a) -> StreamK m a -> a #

foldl1 :: (a -> a -> a) -> StreamK m a -> a #

toList :: StreamK m a -> [a] #

null :: StreamK m a -> Bool #

length :: StreamK m a -> Int #

elem :: Eq a => a -> StreamK m a -> Bool #

maximum :: Ord a => StreamK m a -> a #

minimum :: Ord a => StreamK m a -> a #

sum :: Num a => StreamK m a -> a #

product :: Num a => StreamK m a -> a #

Traversable (StreamK Identity) 
Instance details

Defined in Streamly.Internal.Data.Stream.StreamK.Type

Methods

traverse :: Applicative f => (a -> f b) -> StreamK Identity a -> f (StreamK Identity b) #

sequenceA :: Applicative f => StreamK Identity (f a) -> f (StreamK Identity a) #

mapM :: Monad m => (a -> m b) -> StreamK Identity a -> m (StreamK Identity b) #

sequence :: Monad m => StreamK Identity (m a) -> m (StreamK Identity a) #

Monad m => Functor (StreamK m) 
Instance details

Defined in Streamly.Internal.Data.Stream.StreamK.Type

Methods

fmap :: (a -> b) -> StreamK m a -> StreamK m b #

(<$) :: a -> StreamK m b -> StreamK m a #

a ~ Char => IsString (StreamK Identity a) 
Instance details

Defined in Streamly.Internal.Data.Stream.StreamK.Type

Monoid (StreamK m a) 
Instance details

Defined in Streamly.Internal.Data.Stream.StreamK.Type

Methods

mempty :: StreamK m a #

mappend :: StreamK m a -> StreamK m a -> StreamK m a #

mconcat :: [StreamK m a] -> StreamK m a #

Semigroup (StreamK m a) 
Instance details

Defined in Streamly.Internal.Data.Stream.StreamK.Type

Methods

(<>) :: StreamK m a -> StreamK m a -> StreamK m a #

sconcat :: NonEmpty (StreamK m a) -> StreamK m a #

stimes :: Integral b => b -> StreamK m a -> StreamK m a #

IsList (StreamK Identity a) 
Instance details

Defined in Streamly.Internal.Data.Stream.StreamK.Type

Associated Types

type Item (StreamK Identity a) #

Read a => Read (StreamK Identity a) 
Instance details

Defined in Streamly.Internal.Data.Stream.StreamK.Type

Show a => Show (StreamK Identity a) 
Instance details

Defined in Streamly.Internal.Data.Stream.StreamK.Type

type Item (StreamK Identity a) 
Instance details

Defined in Streamly.Internal.Data.Stream.StreamK.Type

type Item (StreamK Identity a) = a

type WSerial = WSerialT IO Source #

An interleaving serial IO stream of elements of type a. See WSerialT documentation for more details.

Since: 0.2.0 (Streamly)

Since: 0.8.0

data WSerialT m a Source #

For WSerialT streams:

(<>) = wSerial                       -- Semigroup
(>>=) = flip . concatMapWith wSerial -- Monad

Note that <> is associative only if we disregard the ordering of elements in the resulting stream.

A single Monad bind behaves like a for loop:

>>> :{
IsStream.toList $ IsStream.fromWSerial $ do
     x <- IsStream.fromList [1,2] -- foreach x in stream
     return x
:}
[1,2]

Nested monad binds behave like interleaved nested for loops:

>>> :{
IsStream.toList $ IsStream.fromWSerial $ do
    x <- IsStream.fromList [1,2] -- foreach x in stream
    y <- IsStream.fromList [3,4] -- foreach y in stream
    return (x, y)
:}
[(1,3),(2,3),(1,4),(2,4)]

It is a result of interleaving all the nested iterations corresponding to element 1 in the first stream with all the nested iterations of element 2:

>>> import Streamly.Prelude (wSerial)
>>> IsStream.toList $ IsStream.fromList [(1,3),(1,4)] `IsStream.wSerial` IsStream.fromList [(2,3),(2,4)]
[(1,3),(2,3),(1,4),(2,4)]

The W in the name stands for wide or breadth wise scheduling in contrast to the depth wise scheduling behavior of SerialT.

Since: 0.2.0 (Streamly)

Since: 0.8.0

Instances

Instances details
IsStream WSerialT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> TYPE LiftedRep) a. WSerialT m a -> StreamK m a Source #

fromStream :: forall (m :: Type -> TYPE LiftedRep) a. StreamK m a -> WSerialT m a Source #

consM :: MonadAsync m => m a -> WSerialT m a -> WSerialT m a Source #

(|:) :: MonadAsync m => m a -> WSerialT m a -> WSerialT m a Source #

MonadTrans WSerialT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

lift :: Monad m => m a -> WSerialT m a #

MonadReader r m => MonadReader r (WSerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

ask :: WSerialT m r #

local :: (r -> r) -> WSerialT m a -> WSerialT m a #

reader :: (r -> a) -> WSerialT m a #

MonadState s m => MonadState s (WSerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

get :: WSerialT m s #

put :: s -> WSerialT m () #

state :: (s -> (a, s)) -> WSerialT m a #

(MonadBase b m, Monad m) => MonadBase b (WSerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

liftBase :: b α -> WSerialT m α #

MonadIO m => MonadIO (WSerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

liftIO :: IO a -> WSerialT m a #

(Foldable m, Monad m) => Foldable (WSerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

fold :: Monoid m0 => WSerialT m m0 -> m0 #

foldMap :: Monoid m0 => (a -> m0) -> WSerialT m a -> m0 #

foldMap' :: Monoid m0 => (a -> m0) -> WSerialT m a -> m0 #

foldr :: (a -> b -> b) -> b -> WSerialT m a -> b #

foldr' :: (a -> b -> b) -> b -> WSerialT m a -> b #

foldl :: (b -> a -> b) -> b -> WSerialT m a -> b #

foldl' :: (b -> a -> b) -> b -> WSerialT m a -> b #

foldr1 :: (a -> a -> a) -> WSerialT m a -> a #

foldl1 :: (a -> a -> a) -> WSerialT m a -> a #

toList :: WSerialT m a -> [a] #

null :: WSerialT m a -> Bool #

length :: WSerialT m a -> Int #

elem :: Eq a => a -> WSerialT m a -> Bool #

maximum :: Ord a => WSerialT m a -> a #

minimum :: Ord a => WSerialT m a -> a #

sum :: Num a => WSerialT m a -> a #

product :: Num a => WSerialT m a -> a #

Traversable (WSerialT Identity) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

traverse :: Applicative f => (a -> f b) -> WSerialT Identity a -> f (WSerialT Identity b) #

sequenceA :: Applicative f => WSerialT Identity (f a) -> f (WSerialT Identity a) #

mapM :: Monad m => (a -> m b) -> WSerialT Identity a -> m (WSerialT Identity b) #

sequence :: Monad m => WSerialT Identity (m a) -> m (WSerialT Identity a) #

Monad m => Applicative (WSerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

pure :: a -> WSerialT m a #

(<*>) :: WSerialT m (a -> b) -> WSerialT m a -> WSerialT m b #

liftA2 :: (a -> b -> c) -> WSerialT m a -> WSerialT m b -> WSerialT m c #

(*>) :: WSerialT m a -> WSerialT m b -> WSerialT m b #

(<*) :: WSerialT m a -> WSerialT m b -> WSerialT m a #

Monad m => Functor (WSerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

fmap :: (a -> b) -> WSerialT m a -> WSerialT m b #

(<$) :: a -> WSerialT m b -> WSerialT m a #

Monad m => Monad (WSerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

(>>=) :: WSerialT m a -> (a -> WSerialT m b) -> WSerialT m b #

(>>) :: WSerialT m a -> WSerialT m b -> WSerialT m b #

return :: a -> WSerialT m a #

NFData1 (WSerialT Identity) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

liftRnf :: (a -> ()) -> WSerialT Identity a -> () #

MonadThrow m => MonadThrow (WSerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

throwM :: Exception e => e -> WSerialT m a #

a ~ Char => IsString (WSerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Monoid (WSerialT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

mempty :: WSerialT m a #

mappend :: WSerialT m a -> WSerialT m a -> WSerialT m a #

mconcat :: [WSerialT m a] -> WSerialT m a #

Semigroup (WSerialT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

(<>) :: WSerialT m a -> WSerialT m a -> WSerialT m a #

sconcat :: NonEmpty (WSerialT m a) -> WSerialT m a #

stimes :: Integral b => b -> WSerialT m a -> WSerialT m a #

IsList (WSerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Associated Types

type Item (WSerialT Identity a) #

Read a => Read (WSerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Show a => Show (WSerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

NFData a => NFData (WSerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

rnf :: WSerialT Identity a -> () #

Eq a => Eq (WSerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Ord a => Ord (WSerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

type Item (WSerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

type Item (WSerialT Identity a) = a

type Serial = SerialT IO Source #

A serial IO stream of elements of type a. See SerialT documentation for more details.

Since: 0.2.0 (Streamly)

Since: 0.8.0

data SerialT m a Source #

For SerialT streams:

(<>) = serial                       -- Semigroup
(>>=) = flip . concatMapWith serial -- Monad

A single Monad bind behaves like a for loop:

>>> :{
IsStream.toList $ do
     x <- IsStream.fromList [1,2] -- foreach x in stream
     return x
:}
[1,2]

Nested monad binds behave like nested for loops:

>>> :{
IsStream.toList $ do
    x <- IsStream.fromList [1,2] -- foreach x in stream
    y <- IsStream.fromList [3,4] -- foreach y in stream
    return (x, y)
:}
[(1,3),(1,4),(2,3),(2,4)]

Since: 0.2.0 (Streamly)

Since: 0.8.0

Instances

Instances details
IsStream SerialT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> TYPE LiftedRep) a. SerialT m a -> StreamK m a Source #

fromStream :: forall (m :: Type -> TYPE LiftedRep) a. StreamK m a -> SerialT m a Source #

consM :: MonadAsync m => m a -> SerialT m a -> SerialT m a Source #

(|:) :: MonadAsync m => m a -> SerialT m a -> SerialT m a Source #

MonadTrans SerialT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

lift :: Monad m => m a -> SerialT m a #

MonadReader r m => MonadReader r (SerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

ask :: SerialT m r #

local :: (r -> r) -> SerialT m a -> SerialT m a #

reader :: (r -> a) -> SerialT m a #

MonadState s m => MonadState s (SerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

get :: SerialT m s #

put :: s -> SerialT m () #

state :: (s -> (a, s)) -> SerialT m a #

(MonadBase b m, Monad m) => MonadBase b (SerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

liftBase :: b α -> SerialT m α #

MonadIO m => MonadIO (SerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

liftIO :: IO a -> SerialT m a #

(Foldable m, Monad m) => Foldable (SerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

fold :: Monoid m0 => SerialT m m0 -> m0 #

foldMap :: Monoid m0 => (a -> m0) -> SerialT m a -> m0 #

foldMap' :: Monoid m0 => (a -> m0) -> SerialT m a -> m0 #

foldr :: (a -> b -> b) -> b -> SerialT m a -> b #

foldr' :: (a -> b -> b) -> b -> SerialT m a -> b #

foldl :: (b -> a -> b) -> b -> SerialT m a -> b #

foldl' :: (b -> a -> b) -> b -> SerialT m a -> b #

foldr1 :: (a -> a -> a) -> SerialT m a -> a #

foldl1 :: (a -> a -> a) -> SerialT m a -> a #

toList :: SerialT m a -> [a] #

null :: SerialT m a -> Bool #

length :: SerialT m a -> Int #

elem :: Eq a => a -> SerialT m a -> Bool #

maximum :: Ord a => SerialT m a -> a #

minimum :: Ord a => SerialT m a -> a #

sum :: Num a => SerialT m a -> a #

product :: Num a => SerialT m a -> a #

Traversable (SerialT Identity) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

traverse :: Applicative f => (a -> f b) -> SerialT Identity a -> f (SerialT Identity b) #

sequenceA :: Applicative f => SerialT Identity (f a) -> f (SerialT Identity a) #

mapM :: Monad m => (a -> m b) -> SerialT Identity a -> m (SerialT Identity b) #

sequence :: Monad m => SerialT Identity (m a) -> m (SerialT Identity a) #

Monad m => Applicative (SerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

pure :: a -> SerialT m a #

(<*>) :: SerialT m (a -> b) -> SerialT m a -> SerialT m b #

liftA2 :: (a -> b -> c) -> SerialT m a -> SerialT m b -> SerialT m c #

(*>) :: SerialT m a -> SerialT m b -> SerialT m b #

(<*) :: SerialT m a -> SerialT m b -> SerialT m a #

Monad m => Functor (SerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

fmap :: (a -> b) -> SerialT m a -> SerialT m b #

(<$) :: a -> SerialT m b -> SerialT m a #

Monad m => Monad (SerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

(>>=) :: SerialT m a -> (a -> SerialT m b) -> SerialT m b #

(>>) :: SerialT m a -> SerialT m b -> SerialT m b #

return :: a -> SerialT m a #

NFData1 (SerialT Identity) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

liftRnf :: (a -> ()) -> SerialT Identity a -> () #

MonadThrow m => MonadThrow (SerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

throwM :: Exception e => e -> SerialT m a #

a ~ Char => IsString (SerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Monoid (SerialT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

mempty :: SerialT m a #

mappend :: SerialT m a -> SerialT m a -> SerialT m a #

mconcat :: [SerialT m a] -> SerialT m a #

Semigroup (SerialT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

(<>) :: SerialT m a -> SerialT m a -> SerialT m a #

sconcat :: NonEmpty (SerialT m a) -> SerialT m a #

stimes :: Integral b => b -> SerialT m a -> SerialT m a #

IsList (SerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Associated Types

type Item (SerialT Identity a) #

Read a => Read (SerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Show a => Show (SerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

NFData a => NFData (SerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

rnf :: SerialT Identity a -> () #

Eq a => Eq (SerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Ord a => Ord (SerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

type Item (SerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

type Item (SerialT Identity a) = a

type Parallel = ParallelT IO Source #

A parallely composing IO stream of elements of type a. See ParallelT documentation for more details.

Since: 0.2.0 (Streamly)

Since: 0.8.0

data ParallelT m a Source #

For ParallelT streams:

(<>) = parallel
(>>=) = flip . concatMapWith parallel

See AsyncT, ParallelT is similar except that all iterations are strictly concurrent while in AsyncT it depends on the consumer demand and available threads. See parallel for more details.

Since: 0.1.0 (Streamly)

Since: 0.7.0 (maxBuffer applies to ParallelT streams)

Since: 0.8.0

Instances

Instances details
IsStream ParallelT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> TYPE LiftedRep) a. ParallelT m a -> StreamK m a Source #

fromStream :: forall (m :: Type -> TYPE LiftedRep) a. StreamK m a -> ParallelT m a Source #

consM :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a Source #

(|:) :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a Source #

MonadTrans ParallelT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

lift :: Monad m => m a -> ParallelT m a #

(MonadReader r m, MonadAsync m) => MonadReader r (ParallelT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

ask :: ParallelT m r #

local :: (r -> r) -> ParallelT m a -> ParallelT m a #

reader :: (r -> a) -> ParallelT m a #

(MonadState s m, MonadAsync m) => MonadState s (ParallelT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

get :: ParallelT m s #

put :: s -> ParallelT m () #

state :: (s -> (a, s)) -> ParallelT m a #

(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (ParallelT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

liftBase :: b α -> ParallelT m α #

(MonadIO m, MonadAsync m) => MonadIO (ParallelT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

liftIO :: IO a -> ParallelT m a #

(Monad m, MonadAsync m) => Applicative (ParallelT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

pure :: a -> ParallelT m a #

(<*>) :: ParallelT m (a -> b) -> ParallelT m a -> ParallelT m b #

liftA2 :: (a -> b -> c) -> ParallelT m a -> ParallelT m b -> ParallelT m c #

(*>) :: ParallelT m a -> ParallelT m b -> ParallelT m b #

(<*) :: ParallelT m a -> ParallelT m b -> ParallelT m a #

Monad m => Functor (ParallelT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

fmap :: (a -> b) -> ParallelT m a -> ParallelT m b #

(<$) :: a -> ParallelT m b -> ParallelT m a #

MonadAsync m => Monad (ParallelT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

(>>=) :: ParallelT m a -> (a -> ParallelT m b) -> ParallelT m b #

(>>) :: ParallelT m a -> ParallelT m b -> ParallelT m b #

return :: a -> ParallelT m a #

(MonadThrow m, MonadAsync m) => MonadThrow (ParallelT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

throwM :: Exception e => e -> ParallelT m a #

MonadAsync m => Monoid (ParallelT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

mempty :: ParallelT m a #

mappend :: ParallelT m a -> ParallelT m a -> ParallelT m a #

mconcat :: [ParallelT m a] -> ParallelT m a #

MonadAsync m => Semigroup (ParallelT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

(<>) :: ParallelT m a -> ParallelT m a -> ParallelT m a #

sconcat :: NonEmpty (ParallelT m a) -> ParallelT m a #

stimes :: Integral b => b -> ParallelT m a -> ParallelT m a #

type WAsync = WAsyncT IO Source #

A round robin parallely composing IO stream of elements of type a. See WAsyncT documentation for more details.

Since: 0.2.0 (Streamly)

Since: 0.8.0

data WAsyncT m a Source #

For WAsyncT streams:

(<>) = wAsync
(>>=) = flip . concatMapWith wAsync

A single Monad bind behaves like a for loop with iterations of the loop executed concurrently a la the wAsync combinator, producing results and side effects of iterations out of order:

>>> :{
Stream.toList $ Stream.fromWAsync $ do
     x <- Stream.fromList [2,1] -- foreach x in stream
     Stream.fromEffect $ delay x
:}
1 sec
2 sec
[1,2]

Nested monad binds behave like nested for loops with nested iterations executed concurrently, a la the wAsync combinator:

>>> :{
Stream.toList $ Stream.fromWAsync $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [2,4] -- foreach y in stream
    Stream.fromEffect $ delay (x + y)
:}
3 sec
4 sec
5 sec
6 sec
[3,4,5,6]

The behavior can be explained as follows. All the iterations corresponding to the element 1 in the first stream constitute one WAsyncT output stream and all the iterations corresponding to 2 constitute another WAsyncT output stream and these two output streams are merged using wAsync.

The W in the name stands for wide or breadth wise scheduling in contrast to the depth wise scheduling behavior of AsyncT.

Since: 0.2.0 (Streamly)

Since: 0.8.0

Instances

Instances details
IsStream WAsyncT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> TYPE LiftedRep) a. WAsyncT m a -> StreamK m a Source #

fromStream :: forall (m :: Type -> TYPE LiftedRep) a. StreamK m a -> WAsyncT m a Source #

consM :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a Source #

(|:) :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a Source #

MonadTrans WAsyncT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

lift :: Monad m => m a -> WAsyncT m a #

(MonadReader r m, MonadAsync m) => MonadReader r (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

ask :: WAsyncT m r #

local :: (r -> r) -> WAsyncT m a -> WAsyncT m a #

reader :: (r -> a) -> WAsyncT m a #

(MonadState s m, MonadAsync m) => MonadState s (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

get :: WAsyncT m s #

put :: s -> WAsyncT m () #

state :: (s -> (a, s)) -> WAsyncT m a #

(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

liftBase :: b α -> WAsyncT m α #

(MonadIO m, MonadAsync m) => MonadIO (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

liftIO :: IO a -> WAsyncT m a #

(Monad m, MonadAsync m) => Applicative (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

pure :: a -> WAsyncT m a #

(<*>) :: WAsyncT m (a -> b) -> WAsyncT m a -> WAsyncT m b #

liftA2 :: (a -> b -> c) -> WAsyncT m a -> WAsyncT m b -> WAsyncT m c #

(*>) :: WAsyncT m a -> WAsyncT m b -> WAsyncT m b #

(<*) :: WAsyncT m a -> WAsyncT m b -> WAsyncT m a #

Monad m => Functor (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

fmap :: (a -> b) -> WAsyncT m a -> WAsyncT m b #

(<$) :: a -> WAsyncT m b -> WAsyncT m a #

MonadAsync m => Monad (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

(>>=) :: WAsyncT m a -> (a -> WAsyncT m b) -> WAsyncT m b #

(>>) :: WAsyncT m a -> WAsyncT m b -> WAsyncT m b #

return :: a -> WAsyncT m a #

(MonadThrow m, MonadAsync m) => MonadThrow (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

throwM :: Exception e => e -> WAsyncT m a #

MonadAsync m => Monoid (WAsyncT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

mempty :: WAsyncT m a #

mappend :: WAsyncT m a -> WAsyncT m a -> WAsyncT m a #

mconcat :: [WAsyncT m a] -> WAsyncT m a #

MonadAsync m => Semigroup (WAsyncT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

(<>) :: WAsyncT m a -> WAsyncT m a -> WAsyncT m a #

sconcat :: NonEmpty (WAsyncT m a) -> WAsyncT m a #

stimes :: Integral b => b -> WAsyncT m a -> WAsyncT m a #

type Async = AsyncT IO Source #

A demand driven left biased parallely composing IO stream of elements of type a. See AsyncT documentation for more details.

Since: 0.2.0 (Streamly)

Since: 0.8.0

data AsyncT m a Source #

For AsyncT streams:

(<>) = async
(>>=) = flip . concatMapWith async

A single Monad bind behaves like a for loop with iterations of the loop executed concurrently a la the async combinator, producing results and side effects of iterations out of order:

>>> :{
Stream.toList $ Stream.fromAsync $ do
     x <- Stream.fromList [2,1] -- foreach x in stream
     Stream.fromEffect $ delay x
:}
1 sec
2 sec
[1,2]

Nested monad binds behave like nested for loops with nested iterations executed concurrently, a la the async combinator:

>>> :{
Stream.toList $ Stream.fromAsync $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [2,4] -- foreach y in stream
    Stream.fromEffect $ delay (x + y)
:}
3 sec
4 sec
5 sec
6 sec
[3,4,5,6]

The behavior can be explained as follows. All the iterations corresponding to the element 1 in the first stream constitute one output stream and all the iterations corresponding to 2 constitute another output stream and these two output streams are merged using async.

Since: 0.1.0 (Streamly)

Since: 0.8.0

Instances

Instances details
IsStream AsyncT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> TYPE LiftedRep) a. AsyncT m a -> StreamK m a Source #

fromStream :: forall (m :: Type -> TYPE LiftedRep) a. StreamK m a -> AsyncT m a Source #

consM :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a Source #

(|:) :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a Source #

MonadTrans AsyncT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

lift :: Monad m => m a -> AsyncT m a #

(MonadReader r m, MonadAsync m) => MonadReader r (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

ask :: AsyncT m r #

local :: (r -> r) -> AsyncT m a -> AsyncT m a #

reader :: (r -> a) -> AsyncT m a #

(MonadState s m, MonadAsync m) => MonadState s (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

get :: AsyncT m s #

put :: s -> AsyncT m () #

state :: (s -> (a, s)) -> AsyncT m a #

(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

liftBase :: b α -> AsyncT m α #

(MonadIO m, MonadAsync m) => MonadIO (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

liftIO :: IO a -> AsyncT m a #

(Monad m, MonadAsync m) => Applicative (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

pure :: a -> AsyncT m a #

(<*>) :: AsyncT m (a -> b) -> AsyncT m a -> AsyncT m b #

liftA2 :: (a -> b -> c) -> AsyncT m a -> AsyncT m b -> AsyncT m c #

(*>) :: AsyncT m a -> AsyncT m b -> AsyncT m b #

(<*) :: AsyncT m a -> AsyncT m b -> AsyncT m a #

Monad m => Functor (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

fmap :: (a -> b) -> AsyncT m a -> AsyncT m b #

(<$) :: a -> AsyncT m b -> AsyncT m a #

MonadAsync m => Monad (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

(>>=) :: AsyncT m a -> (a -> AsyncT m b) -> AsyncT m b #

(>>) :: AsyncT m a -> AsyncT m b -> AsyncT m b #

return :: a -> AsyncT m a #

(MonadThrow m, MonadAsync m) => MonadThrow (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

throwM :: Exception e => e -> AsyncT m a #

MonadAsync m => Monoid (AsyncT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

mempty :: AsyncT m a #

mappend :: AsyncT m a -> AsyncT m a -> AsyncT m a #

mconcat :: [AsyncT m a] -> AsyncT m a #

MonadAsync m => Semigroup (AsyncT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

(<>) :: AsyncT m a -> AsyncT m a -> AsyncT m a #

sconcat :: NonEmpty (AsyncT m a) -> AsyncT m a #

stimes :: Integral b => b -> AsyncT m a -> AsyncT m a #

type Ahead = AheadT IO Source #

A serial IO stream of elements of type a with concurrent lookahead. See AheadT documentation for more details.

Since: 0.3.0 (Streamly)

Since: 0.8.0

data AheadT m a Source #

For AheadT streams:

(<>) = ahead
(>>=) = flip . concatMapWith ahead

A single Monad bind behaves like a for loop with iterations executed concurrently, ahead of time, producing side effects of iterations out of order, but results in order:

>>> :{
Stream.toList $ Stream.fromAhead $ do
     x <- Stream.fromList [2,1] -- foreach x in stream
     Stream.fromEffect $ delay x
:}
1 sec
2 sec
[2,1]

Nested monad binds behave like nested for loops with nested iterations executed concurrently, ahead of time:

>>> :{
Stream.toList $ Stream.fromAhead $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [2,4] -- foreach y in stream
    Stream.fromEffect $ delay (x + y)
:}
3 sec
4 sec
5 sec
6 sec
[3,5,4,6]

The behavior can be explained as follows. All the iterations corresponding to the element 1 in the first stream constitute one output stream and all the iterations corresponding to 2 constitute another output stream and these two output streams are merged using ahead.

Since: 0.3.0 (Streamly)

Since: 0.8.0

Instances

Instances details
IsStream AheadT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> TYPE LiftedRep) a. AheadT m a -> StreamK m a Source #

fromStream :: forall (m :: Type -> TYPE LiftedRep) a. StreamK m a -> AheadT m a Source #

consM :: MonadAsync m => m a -> AheadT m a -> AheadT m a Source #

(|:) :: MonadAsync m => m a -> AheadT m a -> AheadT m a Source #

MonadTrans AheadT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

lift :: Monad m => m a -> AheadT m a #

(MonadReader r m, MonadAsync m) => MonadReader r (AheadT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

ask :: AheadT m r #

local :: (r -> r) -> AheadT m a -> AheadT m a #

reader :: (r -> a) -> AheadT m a #

(MonadState s m, MonadAsync m) => MonadState s (AheadT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

get :: AheadT m s #

put :: s -> AheadT m () #

state :: (s -> (a, s)) -> AheadT m a #

(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AheadT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

liftBase :: b α -> AheadT m α #

(MonadIO m, MonadAsync m) => MonadIO (AheadT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

liftIO :: IO a -> AheadT m a #

(Monad m, MonadAsync m) => Applicative (AheadT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

pure :: a -> AheadT m a #

(<*>) :: AheadT m (a -> b) -> AheadT m a -> AheadT m b #

liftA2 :: (a -> b -> c) -> AheadT m a -> AheadT m b -> AheadT m c #

(*>) :: AheadT m a -> AheadT m b -> AheadT m b #

(<*) :: AheadT m a -> AheadT m b -> AheadT m a #

Monad m => Functor (AheadT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

fmap :: (a -> b) -> AheadT m a -> AheadT m b #

(<$) :: a -> AheadT m b -> AheadT m a #

MonadAsync m => Monad (AheadT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

(>>=) :: AheadT m a -> (a -> AheadT m b) -> AheadT m b #

(>>) :: AheadT m a -> AheadT m b -> AheadT m b #

return :: a -> AheadT m a #

(MonadThrow m, MonadAsync m) => MonadThrow (AheadT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

throwM :: Exception e => e -> AheadT m a #

MonadAsync m => Monoid (AheadT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

mempty :: AheadT m a #

mappend :: AheadT m a -> AheadT m a -> AheadT m a #

mconcat :: [AheadT m a] -> AheadT m a #

MonadAsync m => Semigroup (AheadT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

(<>) :: AheadT m a -> AheadT m a -> AheadT m a #

sconcat :: NonEmpty (AheadT m a) -> AheadT m a #

stimes :: Integral b => b -> AheadT m a -> AheadT m a #

type ZipSerial = ZipSerialM IO Source #

An IO stream whose applicative instance zips streams serially.

Since: 0.2.0 (Streamly)

Since: 0.8.0

data ZipSerialM m a Source #

For ZipSerialM streams:

(<>) = serial
(*) = 'Streamly.Prelude.serial.zipWith' id

Applicative evaluates the streams being zipped serially:

>>> s1 = Stream.fromFoldable [1, 2]
>>> s2 = Stream.fromFoldable [3, 4]
>>> s3 = Stream.fromFoldable [5, 6]
>>> Stream.toList $ Stream.fromZipSerial $ (,,) <$> s1 <*> s2 <*> s3
[(1,3,5),(2,4,6)]

Since: 0.2.0 (Streamly)

Since: 0.8.0

Instances

Instances details
IsStream ZipSerialM Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> TYPE LiftedRep) a. ZipSerialM m a -> StreamK m a Source #

fromStream :: forall (m :: Type -> TYPE LiftedRep) a. StreamK m a -> ZipSerialM m a Source #

consM :: MonadAsync m => m a -> ZipSerialM m a -> ZipSerialM m a Source #

(|:) :: MonadAsync m => m a -> ZipSerialM m a -> ZipSerialM m a Source #

(Foldable m, Monad m) => Foldable (ZipSerialM m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

fold :: Monoid m0 => ZipSerialM m m0 -> m0 #

foldMap :: Monoid m0 => (a -> m0) -> ZipSerialM m a -> m0 #

foldMap' :: Monoid m0 => (a -> m0) -> ZipSerialM m a -> m0 #

foldr :: (a -> b -> b) -> b -> ZipSerialM m a -> b #

foldr' :: (a -> b -> b) -> b -> ZipSerialM m a -> b #

foldl :: (b -> a -> b) -> b -> ZipSerialM m a -> b #

foldl' :: (b -> a -> b) -> b -> ZipSerialM m a -> b #

foldr1 :: (a -> a -> a) -> ZipSerialM m a -> a #

foldl1 :: (a -> a -> a) -> ZipSerialM m a -> a #

toList :: ZipSerialM m a -> [a] #

null :: ZipSerialM m a -> Bool #

length :: ZipSerialM m a -> Int #

elem :: Eq a => a -> ZipSerialM m a -> Bool #

maximum :: Ord a => ZipSerialM m a -> a #

minimum :: Ord a => ZipSerialM m a -> a #

sum :: Num a => ZipSerialM m a -> a #

product :: Num a => ZipSerialM m a -> a #

Traversable (ZipSerialM Identity) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

traverse :: Applicative f => (a -> f b) -> ZipSerialM Identity a -> f (ZipSerialM Identity b) #

sequenceA :: Applicative f => ZipSerialM Identity (f a) -> f (ZipSerialM Identity a) #

mapM :: Monad m => (a -> m b) -> ZipSerialM Identity a -> m (ZipSerialM Identity b) #

sequence :: Monad m => ZipSerialM Identity (m a) -> m (ZipSerialM Identity a) #

Monad m => Applicative (ZipSerialM m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

pure :: a -> ZipSerialM m a #

(<*>) :: ZipSerialM m (a -> b) -> ZipSerialM m a -> ZipSerialM m b #

liftA2 :: (a -> b -> c) -> ZipSerialM m a -> ZipSerialM m b -> ZipSerialM m c #

(*>) :: ZipSerialM m a -> ZipSerialM m b -> ZipSerialM m b #

(<*) :: ZipSerialM m a -> ZipSerialM m b -> ZipSerialM m a #

Monad m => Functor (ZipSerialM m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

fmap :: (a -> b) -> ZipSerialM m a -> ZipSerialM m b #

(<$) :: a -> ZipSerialM m b -> ZipSerialM m a #

NFData1 (ZipSerialM Identity) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

liftRnf :: (a -> ()) -> ZipSerialM Identity a -> () #

a ~ Char => IsString (ZipSerialM Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Monoid (ZipSerialM m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

mempty :: ZipSerialM m a #

mappend :: ZipSerialM m a -> ZipSerialM m a -> ZipSerialM m a #

mconcat :: [ZipSerialM m a] -> ZipSerialM m a #

Semigroup (ZipSerialM m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

(<>) :: ZipSerialM m a -> ZipSerialM m a -> ZipSerialM m a #

sconcat :: NonEmpty (ZipSerialM m a) -> ZipSerialM m a #

stimes :: Integral b => b -> ZipSerialM m a -> ZipSerialM m a #

IsList (ZipSerialM Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Associated Types

type Item (ZipSerialM Identity a) #

Read a => Read (ZipSerialM Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Show a => Show (ZipSerialM Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

NFData a => NFData (ZipSerialM Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

rnf :: ZipSerialM Identity a -> () #

Eq a => Eq (ZipSerialM Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Ord a => Ord (ZipSerialM Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

type Item (ZipSerialM Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

type Item (ZipSerialM Identity a) = a

type ZipAsync = ZipAsyncM IO Source #

An IO stream whose applicative instance zips streams wAsyncly.

Since: 0.2.0 (Streamly)

Since: 0.8.0

data ZipAsyncM m a Source #

For ZipAsyncM streams:

(<>) = serial
(*) = 'Streamly.Prelude.serial.zipAsyncWith' id

Applicative evaluates the streams being zipped concurrently, the following would take half the time that it would take in serial zipping:

>>> s = Stream.fromFoldableM $ Prelude.map delay [1, 1, 1]
>>> Stream.toList $ Stream.fromZipAsync $ (,) <$> s <*> s
...
[(1,1),(1,1),(1,1)]

Since: 0.2.0 (Streamly)

Since: 0.8.0

Instances

Instances details
IsStream ZipAsyncM Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> TYPE LiftedRep) a. ZipAsyncM m a -> StreamK m a Source #

fromStream :: forall (m :: Type -> TYPE LiftedRep) a. StreamK m a -> ZipAsyncM m a Source #

consM :: MonadAsync m => m a -> ZipAsyncM m a -> ZipAsyncM m a Source #

(|:) :: MonadAsync m => m a -> ZipAsyncM m a -> ZipAsyncM m a Source #

MonadAsync m => Applicative (ZipAsyncM m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.ZipAsync

Methods

pure :: a -> ZipAsyncM m a #

(<*>) :: ZipAsyncM m (a -> b) -> ZipAsyncM m a -> ZipAsyncM m b #

liftA2 :: (a -> b -> c) -> ZipAsyncM m a -> ZipAsyncM m b -> ZipAsyncM m c #

(*>) :: ZipAsyncM m a -> ZipAsyncM m b -> ZipAsyncM m b #

(<*) :: ZipAsyncM m a -> ZipAsyncM m b -> ZipAsyncM m a #

Monad m => Functor (ZipAsyncM m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.ZipAsync

Methods

fmap :: (a -> b) -> ZipAsyncM m a -> ZipAsyncM m b #

(<$) :: a -> ZipAsyncM m b -> ZipAsyncM m a #

Monoid (ZipAsyncM m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.ZipAsync

Methods

mempty :: ZipAsyncM m a #

mappend :: ZipAsyncM m a -> ZipAsyncM m a -> ZipAsyncM m a #

mconcat :: [ZipAsyncM m a] -> ZipAsyncM m a #

Semigroup (ZipAsyncM m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.ZipAsync

Methods

(<>) :: ZipAsyncM m a -> ZipAsyncM m a -> ZipAsyncM m a #

sconcat :: NonEmpty (ZipAsyncM m a) -> ZipAsyncM m a #

stimes :: Integral b => b -> ZipAsyncM m a -> ZipAsyncM m a #

class (forall m a. MonadAsync m => Semigroup (t m a), forall m a. MonadAsync m => Monoid (t m a), forall m. Monad m => Functor (t m), forall m. MonadAsync m => Applicative (t m)) => IsStream t where Source #

Class of types that can represent a stream of elements of some type a in some monad m.

Since: 0.2.0 (Streamly)

Since: 0.8.0

Minimal complete definition

toStream, fromStream, consM, (|:)

Methods

consM :: MonadAsync m => m a -> t m a -> t m a infixr 5 Source #

Constructs a stream by adding a monadic action at the head of an existing stream. For example:

> toList $ getLine `consM` getLine `consM` nil
hello
world
["hello","world"]

Concurrent (do not use fromParallel to construct infinite streams)

Since: 0.2.0

(|:) :: MonadAsync m => m a -> t m a -> t m a infixr 5 Source #

Operator equivalent of consM. We can read it as "parallel colon" to remember that | comes before :.

> toList $ getLine |: getLine |: nil
hello
world
["hello","world"]
let delay = threadDelay 1000000 >> print 1
drain $ fromSerial  $ delay |: delay |: delay |: nil
drain $ fromParallel $ delay |: delay |: delay |: nil

Concurrent (do not use fromParallel to construct infinite streams)

Since: 0.2.0

Instances

Instances details
IsStream AheadT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> TYPE LiftedRep) a. AheadT m a -> StreamK m a Source #

fromStream :: forall (m :: Type -> TYPE LiftedRep) a. StreamK m a -> AheadT m a Source #

consM :: MonadAsync m => m a -> AheadT m a -> AheadT m a Source #

(|:) :: MonadAsync m => m a -> AheadT m a -> AheadT m a Source #

IsStream AsyncT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> TYPE LiftedRep) a. AsyncT m a -> StreamK m a Source #

fromStream :: forall (m :: Type -> TYPE LiftedRep) a. StreamK m a -> AsyncT m a Source #

consM :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a Source #

(|:) :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a Source #

IsStream WAsyncT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> TYPE LiftedRep) a. WAsyncT m a -> StreamK m a Source #

fromStream :: forall (m :: Type -> TYPE LiftedRep) a. StreamK m a -> WAsyncT m a Source #

consM :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a Source #

(|:) :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a Source #

IsStream ParallelT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> TYPE LiftedRep) a. ParallelT m a -> StreamK m a Source #

fromStream :: forall (m :: Type -> TYPE LiftedRep) a. StreamK m a -> ParallelT m a Source #

consM :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a Source #

(|:) :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a Source #

IsStream SerialT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> TYPE LiftedRep) a. SerialT m a -> StreamK m a Source #

fromStream :: forall (m :: Type -> TYPE LiftedRep) a. StreamK m a -> SerialT m a Source #

consM :: MonadAsync m => m a -> SerialT m a -> SerialT m a Source #

(|:) :: MonadAsync m => m a -> SerialT m a -> SerialT m a Source #

IsStream WSerialT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> TYPE LiftedRep) a. WSerialT m a -> StreamK m a Source #

fromStream :: forall (m :: Type -> TYPE LiftedRep) a. StreamK m a -> WSerialT m a Source #

consM :: MonadAsync m => m a -> WSerialT m a -> WSerialT m a Source #

(|:) :: MonadAsync m => m a -> WSerialT m a -> WSerialT m a Source #

IsStream ZipSerialM Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> TYPE LiftedRep) a. ZipSerialM m a -> StreamK m a Source #

fromStream :: forall (m :: Type -> TYPE LiftedRep) a. StreamK m a -> ZipSerialM m a Source #

consM :: MonadAsync m => m a -> ZipSerialM m a -> ZipSerialM m a Source #

(|:) :: MonadAsync m => m a -> ZipSerialM m a -> ZipSerialM m a Source #

IsStream ZipAsyncM Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> TYPE LiftedRep) a. ZipAsyncM m a -> StreamK m a Source #

fromStream :: forall (m :: Type -> TYPE LiftedRep) a. StreamK m a -> ZipAsyncM m a Source #

consM :: MonadAsync m => m a -> ZipAsyncM m a -> ZipAsyncM m a Source #

(|:) :: MonadAsync m => m a -> ZipAsyncM m a -> ZipAsyncM m a Source #

toStreamK :: IsStream t => t m a -> StreamK m a Source #

fromStreamK :: IsStream t => StreamK m a -> t m a Source #

adapt :: (IsStream t1, IsStream t2) => t1 m a -> t2 m a Source #

Adapt any specific stream type to any other specific stream type.

Since: 0.1.0 (Streamly)

Since: 0.8.0

fromStreamD :: (IsStream t, Monad m) => Stream m a -> t m a Source #

toConsK :: IsStream t => (m a -> t m a -> t m a) -> m a -> StreamK m a -> StreamK m a Source #

Adapt a polymorphic consM operation to a StreamK cons operation

toStreamD :: (IsStream t, Monad m) => t m a -> Stream m a Source #

fromList :: (Monad m, IsStream t) => [a] -> t m a Source #

fromList = foldr cons nil

Construct a stream from a list of pure values. This is more efficient than fromFoldable for serial streams.

Since: 0.4.0

mkStream :: IsStream t => (forall r. State StreamK m a -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r) -> t m a Source #

Build a stream from an SVar, a stop continuation, a singleton stream continuation and a yield continuation.

foldrMx :: (IsStream t, Monad m) => (a -> m x -> m x) -> m x -> (m x -> m b) -> t m a -> m b Source #

foldlMx' :: (IsStream t, Monad m) => (x -> a -> m x) -> m x -> (x -> m b) -> t m a -> m b Source #

Like foldlx', but with a monadic step function.

Since: 0.7.0

foldlx' :: (IsStream t, Monad m) => (x -> a -> x) -> x -> (x -> b) -> t m a -> m b Source #

Strict left fold with an extraction function. Like the standard strict left fold, but applies a user supplied extraction function (the third argument) to the folded value at the end. This is designed to work with the foldl library. The suffix x is a mnemonic for extraction.

Since: 0.7.0

foldStreamShared :: IsStream t => State StreamK m a -> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r Source #

Fold a stream by providing an SVar, a stop continuation, a singleton continuation and a yield continuation. The stream would share the current SVar passed via the State.

foldStream :: IsStream t => State StreamK m a -> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r Source #

Fold a stream by providing a State, stop continuation, a singleton continuation and a yield continuation. The stream will not use the SVar passed via State.

fromSerial :: IsStream t => SerialT m a -> t m a Source #

Fix the type of a polymorphic stream as SerialT.

Since: 0.1.0 (Streamly)

Since: 0.8.0

fromWSerial :: IsStream t => WSerialT m a -> t m a Source #

Fix the type of a polymorphic stream as WSerialT.

Since: 0.2.0 (Streamly)

Since: 0.8.0

fromAsync :: IsStream t => AsyncT m a -> t m a Source #

Fix the type of a polymorphic stream as AsyncT.

Since: 0.1.0 (Streamly)

Since: 0.8.0

fromWAsync :: IsStream t => WAsyncT m a -> t m a Source #

Fix the type of a polymorphic stream as WAsyncT.

Since: 0.2.0 (Streamly)

Since: 0.8.0

fromAhead :: IsStream t => AheadT m a -> t m a Source #

Fix the type of a polymorphic stream as AheadT.

Since: 0.3.0 (Streamly)

Since: 0.8.0

fromParallel :: IsStream t => ParallelT m a -> t m a Source #

Fix the type of a polymorphic stream as ParallelT.

Since: 0.1.0 (Streamly)

Since: 0.8.0

fromZipSerial :: IsStream t => ZipSerialM m a -> t m a Source #

Fix the type of a polymorphic stream as ZipSerialM.

Since: 0.2.0 (Streamly)

Since: 0.8.0

fromZipAsync :: IsStream t => ZipAsyncM m a -> t m a Source #

Fix the type of a polymorphic stream as ZipAsyncM.

Since: 0.2.0 (Streamly)

Since: 0.8.0

cons :: IsStream t => a -> t m a -> t m a infixr 5 Source #

Construct a stream by adding a pure value at the head of an existing stream. For serial streams this is the same as (return a) `consM` r but more efficient. For concurrent streams this is not concurrent whereas consM is concurrent. For example:

> toList $ 1 `cons` 2 `cons` 3 `cons` nil
[1,2,3]

Since: 0.1.0

(.:) :: IsStream t => a -> t m a -> t m a infixr 5 Source #

Operator equivalent of cons.

> toList $ 1 .: 2 .: 3 .: nil
[1,2,3]

Since: 0.1.1

nil :: IsStream t => t m a Source #

nilM :: (IsStream t, Monad m) => m b -> t m a Source #

bindWith :: IsStream t => (t m b -> t m b -> t m b) -> t m a -> (a -> t m b) -> t m b Source #

concatMapWith :: IsStream t => (t m b -> t m b -> t m b) -> (a -> t m b) -> t m a -> t m b Source #

concatMapWith mixer generator stream is a two dimensional looping combinator. The generator function is used to generate streams from the elements in the input stream and the mixer function is used to merge those streams.

Note we can merge streams concurrently by using a concurrent merge function.

Since: 0.7.0

Since: 0.8.0 (signature change)

concatMapFoldableWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> (a -> t m b) -> f a -> t m b Source #

A variant of foldMap that allows you to map a monadic streaming action on a Foldable container and then fold it using the specified stream merge operation.

concatMapFoldableWith async return [1..3]

Equivalent to:

concatMapFoldableWith f g = Prelude.foldr (f . g) S.nil
concatMapFoldableWith f g xs = S.concatMapWith f g (S.fromFoldable xs)

Since: 0.8.0 (Renamed foldMapWith to concatMapFoldableWith)

Since: 0.1.0 (Streamly)

concatForFoldableWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> f a -> (a -> t m b) -> t m b Source #

Like concatMapFoldableWith but with the last two arguments reversed i.e. the monadic streaming function is the last argument.

Equivalent to:

concatForFoldableWith f xs g = Prelude.foldr (f . g) D.nil xs
concatForFoldableWith f = flip (D.concatMapFoldableWith f)

Since: 0.8.0 (Renamed forEachWith to concatForFoldableWith)

Since: 0.1.0 (Streamly)

concatFoldableWith :: (IsStream t, Foldable f) => (t m a -> t m a -> t m a) -> f (t m a) -> t m a Source #

A variant of fold that allows you to fold a Foldable container of streams using the specified stream sum operation.

concatFoldableWith async $ map return [1..3]

Equivalent to:

concatFoldableWith f = Prelude.foldr f D.nil
concatFoldableWith f = D.concatMapFoldableWith f id

Since: 0.8.0 (Renamed foldWith to concatFoldableWith)

Since: 0.1.0 (Streamly)

fold :: Monad m => Fold m a b -> SerialT m a -> m b Source #

Fold a stream using the supplied left Fold and reducing the resulting expression strictly at each step. The behavior is similar to foldl'. A Fold can terminate early without consuming the full stream. See the documentation of individual Folds for termination behavior.

>>> Stream.fold Fold.sum (Stream.enumerateFromTo 1 100)
5050

Folds never fail, therefore, they produce a default value even when no input is provided. It means we can always fold an empty stream and get a valid result. For example:

>>> Stream.fold Fold.sum Stream.nil
0

However, foldMany on an empty stream results in an empty stream. Therefore, Stream.fold f is not the same as Stream.head . Stream.foldMany f.

fold f = Stream.parse (Parser.fromFold f)

Since: 0.7.0

uncons :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (a, t m a)) Source #

Decompose a stream into its head and tail. If the stream is empty, returns Nothing. If the stream is non-empty, returns Just (a, ma), where a is the head of the stream and ma its tail.

This can be used to do pretty much anything in an imperative manner, as it just breaks down the stream into individual elements and we can loop over them as we deem fit. For example, this can be used to convert a streamly stream into other stream types.

All the folds in this module can be expressed in terms of uncons, however, this is generally less efficient than specific folds because it takes apart the stream one element at a time, therefore, does not take adavantage of stream fusion.

Since: 0.1.0

foldrM :: Monad m => (a -> m b -> m b) -> m b -> SerialT m a -> m b Source #

Right associative/lazy pull fold. foldrM build final stream constructs an output structure using the step function build. build is invoked with the next input element and the remaining (lazy) tail of the output structure. It builds a lazy output expression using the two. When the "tail structure" in the output expression is evaluated it calls build again thus lazily consuming the input stream until either the output expression built by build is free of the "tail" or the input is exhausted in which case final is used as the terminating case for the output structure. For more details see the description in the previous section.

Example, determine if any element is odd in a stream:

>>> Stream.foldrM (\x xs -> if odd x then return True else xs) (return False) $ Stream.fromList (2:4:5:undefined)
True

Since: 0.7.0 (signature changed)

Since: 0.2.0 (signature changed)

Since: 0.1.0

foldr :: Monad m => (a -> b -> b) -> b -> SerialT m a -> m b Source #

Right fold, lazy for lazy monads and pure streams, and strict for strict monads.

Please avoid using this routine in strict monads like IO unless you need a strict right fold. This is provided only for use in lazy monads (e.g. Identity) or pure streams. Note that with this signature it is not possible to implement a lazy foldr when the monad m is strict. In that case it would be strict in its accumulator and therefore would necessarily consume all its input.

Since: 0.1.0

foldr1 :: Monad m => (a -> a -> a) -> SerialT m a -> m (Maybe a) Source #

Lazy right fold for non-empty streams, using first element as the starting value. Returns Nothing if the stream is empty.

Since: 0.5.0

foldlS :: IsStream t => (t m b -> a -> t m b) -> t m b -> t m a -> t m b Source #

Lazy left fold to a stream.

foldlT :: (Monad m, IsStream t, Monad (s m), MonadTrans s) => (s m b -> a -> s m b) -> s m b -> t m a -> s m b Source #

Lazy left fold to a transformer monad.

For example, to reverse a stream:

D.toList $ D.foldlT (flip D.cons) D.nil $ (D.fromList [1..5] :: SerialT IO Int)

foldx :: Monad m => (x -> a -> x) -> x -> (x -> b) -> SerialT m a -> m b Source #

Strict left fold with an extraction function. Like the standard strict left fold, but applies a user supplied extraction function (the third argument) to the folded value at the end. This is designed to work with the foldl library. The suffix x is a mnemonic for extraction.

Since: 0.2.0

foldl' :: Monad m => (b -> a -> b) -> b -> SerialT m a -> m b Source #

Left associative/strict push fold. foldl' reduce initial stream invokes reduce with the accumulator and the next input in the input stream, using initial as the initial value of the current value of the accumulator. When the input is exhausted the current value of the accumulator is returned. Make sure to use a strict data structure for accumulator to not build unnecessary lazy expressions unless that's what you want. See the previous section for more details.

Since: 0.2.0

foldl1' :: Monad m => (a -> a -> a) -> SerialT m a -> m (Maybe a) Source #

Strict left fold, for non-empty streams, using first element as the starting value. Returns Nothing if the stream is empty.

Since: 0.5.0

foldxM :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> SerialT m a -> m b Source #

Like foldx, but with a monadic step function.

Since: 0.2.0

foldlM' :: Monad m => (b -> a -> m b) -> m b -> SerialT m a -> m b Source #

Like foldl' but with a monadic step function.

Since: 0.2.0

Since: 0.8.0 (signature change)

parseD :: Monad m => Parser a m b -> SerialT m a -> m (Either ParseError b) Source #

Parse a stream using the supplied ParserD Parser.

Internal

parse :: Monad m => Parser a m b -> SerialT m a -> m (Either ParseError b) Source #

Parse a stream using the supplied Parser.

Unlike folds, parsers may not always result in a valid output, they may result in an error. For example:

>>> Stream.parse (Parser.takeEQ 1 Fold.drain) Stream.nil
Left (ParseError "takeEQ: Expecting exactly 1 elements, input terminated on 0")

Note:

fold f = Stream.parse (Parser.fromFold f)

parse p is not the same as head . parseMany p on an empty stream.

Pre-release

mapM_ :: Monad m => (a -> m b) -> SerialT m a -> m () Source #

mapM_ = Stream.drain . Stream.mapM

Apply a monadic action to each element of the stream and discard the output of the action. This is not really a pure transformation operation but a transformation followed by fold.

Since: 0.1.0

drain :: Monad m => SerialT m a -> m () Source #

drain = mapM_ (\_ -> return ())
drain = Stream.fold Fold.drain

Run a stream, discarding the results. By default it interprets the stream as SerialT, to run other types of streams use the type adapting combinators for example Stream.drain . fromAsync.

Since: 0.7.0

drainN :: Monad m => Int -> SerialT m a -> m () Source #

drainN n = Stream.drain . Stream.take n
drainN n = Stream.fold (Fold.take n Fold.drain)

Run maximum up to n iterations of a stream.

Since: 0.7.0

runN :: Monad m => Int -> SerialT m a -> m () Source #

runN n = runStream . take n

Run maximum up to n iterations of a stream.

Since: 0.6.0

drainWhile :: Monad m => (a -> Bool) -> SerialT m a -> m () Source #

drainWhile p = Stream.drain . Stream.takeWhile p

Run a stream as long as the predicate holds true.

Since: 0.7.0

runWhile :: Monad m => (a -> Bool) -> SerialT m a -> m () Source #

runWhile p = runStream . takeWhile p

Run a stream as long as the predicate holds true.

Since: 0.6.0

runStream :: Monad m => SerialT m a -> m () Source #

Run a stream, discarding the results. By default it interprets the stream as SerialT, to run other types of streams use the type adapting combinators for example runStream . fromAsync.

Since: 0.2.0

null :: Monad m => SerialT m a -> m Bool Source #

Determine whether the stream is empty.

null = Stream.fold Fold.null

Since: 0.1.1

head :: Monad m => SerialT m a -> m (Maybe a) Source #

Extract the first element of the stream, if any.

head = (!! 0)
head = Stream.fold Fold.one

Since: 0.1.0

headElse :: Monad m => a -> SerialT m a -> m a Source #

Extract the first element of the stream, if any, otherwise use the supplied default value. It can help avoid one branch in high performance code.

Pre-release

tail :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (t m a)) Source #

tail = fmap (fmap snd) . Stream.uncons

Extract all but the first element of the stream, if any.

Since: 0.1.1

init :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (t m a)) Source #

Extract all but the last element of the stream, if any.

Since: 0.5.0

last :: Monad m => SerialT m a -> m (Maybe a) Source #

Extract the last element of the stream, if any.

last xs = xs !! (Stream.length xs - 1)
last = Stream.fold Fold.last

Since: 0.1.1

elem :: (Monad m, Eq a) => a -> SerialT m a -> m Bool Source #

Determine whether an element is present in the stream.

elem = Stream.fold Fold.elem

Since: 0.1.0

notElem :: (Monad m, Eq a) => a -> SerialT m a -> m Bool Source #

Determine whether an element is not present in the stream.

notElem = Stream.fold Fold.length

Since: 0.1.0

length :: Monad m => SerialT m a -> m Int Source #

Determine the length of the stream.

Since: 0.1.0

all :: Monad m => (a -> Bool) -> SerialT m a -> m Bool Source #

Determine whether all elements of a stream satisfy a predicate.

all = Stream.fold Fold.all

Since: 0.1.0

any :: Monad m => (a -> Bool) -> SerialT m a -> m Bool Source #

Determine whether any of the elements of a stream satisfy a predicate.

any = Stream.fold Fold.any

Since: 0.1.0

and :: Monad m => SerialT m Bool -> m Bool Source #

Determines if all elements of a boolean stream are True.

and = Stream.fold Fold.and

Since: 0.5.0

or :: Monad m => SerialT m Bool -> m Bool Source #

Determines whether at least one element of a boolean stream is True.

or = Stream.fold Fold.or

Since: 0.5.0

sum :: (Monad m, Num a) => SerialT m a -> m a Source #

Determine the sum of all elements of a stream of numbers. Returns 0 when the stream is empty. Note that this is not numerically stable for floating point numbers.

sum = Stream.fold Fold.sum

Since: 0.1.0

product :: (Monad m, Num a) => SerialT m a -> m a Source #

Determine the product of all elements of a stream of numbers. Returns 1 when the stream is empty.

product = Stream.fold Fold.product

Since: 0.1.1

mconcat :: (Monad m, Monoid a) => SerialT m a -> m a Source #

Fold a stream of monoid elements by appending them.

mconcat = Stream.fold Fold.mconcat

Pre-release

minimum :: (Monad m, Ord a) => SerialT m a -> m (Maybe a) Source #

minimum = minimumBy compare
minimum = Stream.fold Fold.minimum

Determine the minimum element in a stream.

Since: 0.1.0

minimumBy :: Monad m => (a -> a -> Ordering) -> SerialT m a -> m (Maybe a) Source #

Determine the minimum element in a stream using the supplied comparison function.

minimumBy = Stream.fold Fold.minimumBy

Since: 0.6.0

maximum :: (Monad m, Ord a) => SerialT m a -> m (Maybe a) Source #

maximum = maximumBy compare
maximum = Stream.fold Fold.maximum

Determine the maximum element in a stream.

Since: 0.1.0

maximumBy :: Monad m => (a -> a -> Ordering) -> SerialT m a -> m (Maybe a) Source #

Determine the maximum element in a stream using the supplied comparison function.

maximumBy = Stream.fold Fold.maximumBy

Since: 0.6.0

the :: (Eq a, Monad m) => SerialT m a -> m (Maybe a) Source #

Ensures that all the elements of the stream are identical and then returns that unique element.

Since: 0.6.0

(!!) :: Monad m => SerialT m a -> Int -> m (Maybe a) Source #

Lookup the element at the given index.

Since: 0.6.0

lookup :: (Monad m, Eq a) => a -> SerialT m (a, b) -> m (Maybe b) Source #

In a stream of (key-value) pairs (a, b), return the value b of the first pair where the key equals the given value a.

lookup = snd <$> Stream.find ((==) . fst)
lookup = Stream.fold Fold.lookup

Since: 0.5.0

find :: Monad m => (a -> Bool) -> SerialT m a -> m (Maybe a) Source #

Like findM but with a non-monadic predicate.

find p = findM (return . p)
find = Stream.fold Fold.find

Since: 0.5.0

findM :: Monad m => (a -> m Bool) -> SerialT m a -> m (Maybe a) Source #

Returns the first element that satisfies the given predicate.

findM = Stream.fold Fold.findM

Since: 0.6.0

findIndex :: Monad m => (a -> Bool) -> SerialT m a -> m (Maybe Int) Source #

Returns the first index that satisfies the given predicate.

findIndex = Stream.fold Fold.findIndex

Since: 0.5.0

elemIndex :: (Monad m, Eq a) => a -> SerialT m a -> m (Maybe Int) Source #

Returns the first index where a given value is found in the stream.

elemIndex a = Stream.findIndex (== a)

Since: 0.5.0

toList :: Monad m => SerialT m a -> m [a] Source #

toList = Stream.foldr (:) []

Convert a stream into a list in the underlying monad. The list can be consumed lazily in a lazy monad (e.g. Identity). In a strict monad (e.g. IO) the whole list is generated and buffered before it can be consumed.

Warning! working on large lists accumulated as buffers in memory could be very inefficient, consider using Streamly.Array instead.

Since: 0.1.0

toListRev :: Monad m => SerialT m a -> m [a] Source #

toListRev = Stream.foldl' (flip (:)) []

Convert a stream into a list in reverse order in the underlying monad.

Warning! working on large lists accumulated as buffers in memory could be very inefficient, consider using Streamly.Array instead.

Pre-release

toHandle :: MonadIO m => Handle -> SerialT m String -> m () Source #

toHandle h = D.mapM_ $ hPutStrLn h

Write a stream of Strings to an IO Handle.

Since: 0.1.0

toStreamRev :: Monad m => SerialT m a -> m (SerialT n a) Source #

Convert a stream to a pure stream in reverse order.

toStreamRev = Stream.foldl' (flip Stream.cons) Stream.nil

Pre-release

(|$.) :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b infixr 0 Source #

Parallel fold application operator; applies a fold function t m a -> m b to a stream t m a concurrently; The the input stream is evaluated asynchronously in an independent thread yielding elements to a buffer and the folding action runs in another thread consuming the input from the buffer.

If you read the signature as (t m a -> m b) -> (t m a -> m b) you can look at it as a transformation that converts a fold function to a buffered concurrent fold function.

The . at the end of the operator is a mnemonic for termination of the stream.

In the example below, each stage introduces a delay of 1 sec but output is printed every second because both stages are concurrent.

>>> import Control.Concurrent (threadDelay)
>>> import Streamly.Prelude ((|$.))
>>> :{
 Stream.foldlM' (\_ a -> threadDelay 1000000 >> print a) (return ())
     |$. Stream.replicateM 3 (threadDelay 1000000 >> return 1)
:}
1
1
1

Concurrent

Since: 0.3.0 (Streamly)

Since: 0.8.0

foldAsync :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b Source #

Same as |$..

Internal

(|&.) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> m b) -> m b infixl 1 Source #

Same as |$. but with arguments reversed.

(|&.) = flip (|$.)

Concurrent

Since: 0.3.0 (Streamly)

Since: 0.8.0

isPrefixOf :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m Bool Source #

Returns True if the first stream is the same as or a prefix of the second. A stream is a prefix of itself.

>>> Stream.isPrefixOf (Stream.fromList "hello") (Stream.fromList "hello" :: SerialT IO Char)
True

Since: 0.6.0

isInfixOf :: (MonadIO m, Eq a, Enum a, Storable a, Unbox a) => SerialT m a -> SerialT m a -> m Bool Source #

Returns True if the first stream is an infix of the second. A stream is considered an infix of itself.

Stream.isInfixOf (Stream.fromList "hello") (Stream.fromList "hello" :: SerialT IO Char)

True

Space: O(n) worst case where n is the length of the infix.

Pre-release

Requires Storable constraint

isSuffixOf :: (Monad m, Eq a) => SerialT m a -> SerialT m a -> m Bool Source #

Returns True if the first stream is a suffix of the second. A stream is considered a suffix of itself.

>>> Stream.isSuffixOf (Stream.fromList "hello") (Stream.fromList "hello" :: SerialT IO Char)
True

Space: O(n), buffers entire input stream and the suffix.

Pre-release

Suboptimal - Help wanted.

isSubsequenceOf :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m Bool Source #

Returns True if all the elements of the first stream occur, in order, in the second stream. The elements do not have to occur consecutively. A stream is a subsequence of itself.

>>> Stream.isSubsequenceOf (Stream.fromList "hlo") (Stream.fromList "hello" :: SerialT IO Char)
True

Since: 0.6.0

stripPrefix :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m (Maybe (t m a)) Source #

stripPrefix prefix stream strips prefix from stream if it is a prefix of stream. Returns Nothing if the stream does not start with the given prefix, stripped stream otherwise. Returns Just nil when the prefix is the same as the stream.

See also "Streamly.Internal.Data.Stream.IsStream.Nesting.dropPrefix".

Space: O(1)

Since: 0.6.0

stripSuffix :: (Monad m, Eq a) => SerialT m a -> SerialT m a -> m (Maybe (SerialT m a)) Source #

Drops the given suffix from a stream. Returns Nothing if the stream does not end with the given suffix. Returns Just nil when the suffix is the same as the stream.

It may be more efficient to convert the stream to an Array and use stripSuffix on that especially if the elements have a Storable or Prim instance.

See also "Streamly.Internal.Data.Stream.IsStream.Nesting.dropSuffix".

Space: O(n), buffers the entire input stream as well as the suffix

Pre-release

eqBy :: (IsStream t, Monad m) => (a -> b -> Bool) -> t m a -> t m b -> m Bool Source #

Compare two streams for equality using an equality function.

Since: 0.6.0

cmpBy :: (IsStream t, Monad m) => (a -> b -> Ordering) -> t m a -> t m b -> m Ordering Source #

Compare two streams lexicographically using a comparison function.

Since: 0.6.0

fromStream :: (IsStream t, Monad m) => Stream m a -> t m a Source #

toStream :: (IsStream t, Monad m) => t m a -> Stream m a Source #