streamly-0.8.3: Dataflow programming and declarative concurrency
Copyright(c) 2017 Composewell Technologies
LicenseBSD3
Maintainerstreamly@composewell.com
Stabilityexperimental
PortabilityGHC
Safe HaskellSafe-Inferred
LanguageHaskell2010

Streamly.Internal.Data.Stream.StreamK

Description

Continuation passing style (CPS) stream implementation. The symbol K below denotes a function as well as a Kontinuation.

import qualified Streamly.Internal.Data.Stream.StreamK as K
Synopsis

The stream type

newtype Stream m a Source #

The type Stream m a represents a monadic stream of values of type a constructed using actions in monad m. It uses stop, singleton and yield continuations equivalent to the following direct style type:

data Stream m a = Stop | Singleton a | Yield a (Stream m a)

To facilitate parallel composition we maintain a local state in an SVar that is shared across and is used for synchronization of the streams being composed.

The singleton case can be expressed in terms of stop and yield but we have it as a separate case to optimize composition operations for streams with single element. We build singleton streams in the implementation of pure for Applicative and Monad, and in lift for MonadTrans.

Constructors

MkStream (forall r. State Stream m a -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r) 

Instances

Instances details
MonadTrans Stream Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.StreamK.Type

Methods

lift :: Monad m => m a -> Stream m a #

Monad m => Applicative (Stream m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.StreamK.Type

Methods

pure :: a -> Stream m a #

(<*>) :: Stream m (a -> b) -> Stream m a -> Stream m b #

liftA2 :: (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c #

(*>) :: Stream m a -> Stream m b -> Stream m b #

(<*) :: Stream m a -> Stream m b -> Stream m a #

Monad m => Functor (Stream m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.StreamK.Type

Methods

fmap :: (a -> b) -> Stream m a -> Stream m b #

(<$) :: a -> Stream m b -> Stream m a #

Monad m => Monad (Stream m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.StreamK.Type

Methods

(>>=) :: Stream m a -> (a -> Stream m b) -> Stream m b #

(>>) :: Stream m a -> Stream m b -> Stream m b #

return :: a -> Stream m a #

Monoid (Stream m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.StreamK.Type

Methods

mempty :: Stream m a #

mappend :: Stream m a -> Stream m a -> Stream m a #

mconcat :: [Stream m a] -> Stream m a #

Semigroup (Stream m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.StreamK.Type

Methods

(<>) :: Stream m a -> Stream m a -> Stream m a #

sconcat :: NonEmpty (Stream m a) -> Stream m a #

stimes :: Integral b => b -> Stream m a -> Stream m a #

Construction Primitives

mkStream :: (forall r. State Stream m a -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r) -> Stream m a Source #

nil :: Stream m a Source #

An empty stream.

> toList nil
[]

Since: 0.1.0

nilM :: Applicative m => m b -> Stream m a Source #

An empty stream producing a side effect.

> toList (nilM (print "nil"))
"nil"
[]

Pre-release

cons :: a -> Stream m a -> Stream m a infixr 5 Source #

Construct a stream by adding a pure value at the head of an existing stream. For serial streams this is the same as (return a) `consM` r but more efficient. For concurrent streams this is not concurrent whereas consM is concurrent. For example:

> toList $ 1 `cons` 2 `cons` 3 `cons` nil
[1,2,3]

Since: 0.1.0

(.:) :: a -> Stream m a -> Stream m a infixr 5 Source #

Operator equivalent of cons.

> toList $ 1 .: 2 .: 3 .: nil
[1,2,3]

Since: 0.1.1

Elimination Primitives

foldStream :: State Stream m a -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> Stream m a -> m r Source #

Fold a stream by providing a State, stop continuation, a singleton continuation and a yield continuation. The stream will not use the SVar passed via State.

foldStreamShared :: State Stream m a -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> Stream m a -> m r Source #

Fold a stream by providing an SVar, a stop continuation, a singleton continuation and a yield continuation. The stream would share the current SVar passed via the State.

Transformation Primitives

unShare :: Stream m a -> Stream m a Source #

Detach a stream from an SVar

Deconstruction

uncons :: Applicative m => Stream m a -> m (Maybe (a, Stream m a)) Source #

Generation

Unfolds

unfoldr :: (b -> Maybe (a, b)) -> b -> Stream m a Source #

unfoldrM :: Monad m => (b -> m (Maybe (a, b))) -> b -> Stream m a Source #

Specialized Generation

repeat :: a -> Stream m a Source #

Generate an infinite stream by repeating a pure value.

Pre-release

repeatM :: Monad m => m a -> Stream m a Source #

replicate :: Int -> a -> Stream m a Source #

replicateM :: Monad m => Int -> m a -> Stream m a Source #

fromIndices :: (Int -> a) -> Stream m a Source #

fromIndicesM :: Monad m => (Int -> m a) -> Stream m a Source #

iterate :: (a -> a) -> a -> Stream m a Source #

iterateM :: Monad m => (a -> m a) -> m a -> Stream m a Source #

Conversions

fromPure :: a -> Stream m a Source #

fromEffect :: Monad m => m a -> Stream m a Source #

fromFoldable :: Foldable f => f a -> Stream m a Source #

fromFoldable = foldr cons nil

Construct a stream from a Foldable containing pure values:

Since: 0.2.0

fromList :: [a] -> Stream m a Source #

foldr/build

foldrS :: (a -> Stream m b -> Stream m b) -> Stream m b -> Stream m a -> Stream m b Source #

Lazy right associative fold to a stream.

foldrSM :: Monad m => (m a -> Stream m b -> Stream m b) -> Stream m b -> Stream m a -> Stream m b Source #

buildS :: ((a -> Stream m a -> Stream m a) -> Stream m a -> Stream m a) -> Stream m a Source #

augmentS :: ((a -> Stream m a -> Stream m a) -> Stream m a -> Stream m a) -> Stream m a -> Stream m a Source #

Elimination

General Folds

foldr :: Monad m => (a -> b -> b) -> b -> Stream m a -> m b Source #

Lazy right associative fold.

foldr1 :: Monad m => (a -> a -> a) -> Stream m a -> m (Maybe a) Source #

foldrM :: (a -> m b -> m b) -> m b -> Stream m a -> m b Source #

Lazy right fold with a monadic step function.

foldrT :: (Monad m, Monad (s m), MonadTrans s) => (a -> s m b -> s m b) -> s m b -> Stream m a -> s m b Source #

Right associative fold to an arbitrary transformer monad.

foldl' :: Monad m => (b -> a -> b) -> b -> Stream m a -> m b Source #

Strict left associative fold.

foldlM' :: Monad m => (b -> a -> m b) -> m b -> Stream m a -> m b Source #

Like foldl' but with a monadic step function.

foldlS :: (Stream m b -> a -> Stream m b) -> Stream m b -> Stream m a -> Stream m b Source #

Lazy left fold to a stream.

foldlT :: (Monad m, Monad (s m), MonadTrans s) => (s m b -> a -> s m b) -> s m b -> Stream m a -> s m b Source #

Lazy left fold to an arbitrary transformer monad.

foldlx' :: forall m a b x. Monad m => (x -> a -> x) -> x -> (x -> b) -> Stream m a -> m b Source #

Strict left fold with an extraction function. Like the standard strict left fold, but applies a user supplied extraction function (the third argument) to the folded value at the end. This is designed to work with the foldl library. The suffix x is a mnemonic for extraction.

Note that the accumulator is always evaluated including the initial value.

foldlMx' :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> Stream m a -> m b Source #

Like foldx, but with a monadic step function.

fold :: Monad m => Fold m a b -> Stream m a -> m b Source #

Specialized Folds

drain :: Monad m => Stream m a -> m () Source #

drain = foldl' (\_ _ -> ()) ()
drain = mapM_ (\_ -> return ())

null :: Monad m => Stream m a -> m Bool Source #

head :: Monad m => Stream m a -> m (Maybe a) Source #

tail :: Applicative m => Stream m a -> m (Maybe (Stream m a)) Source #

init :: Applicative m => Stream m a -> m (Maybe (Stream m a)) Source #

elem :: (Monad m, Eq a) => a -> Stream m a -> m Bool Source #

notElem :: (Monad m, Eq a) => a -> Stream m a -> m Bool Source #

all :: Monad m => (a -> Bool) -> Stream m a -> m Bool Source #

any :: Monad m => (a -> Bool) -> Stream m a -> m Bool Source #

last :: Monad m => Stream m a -> m (Maybe a) Source #

Extract the last element of the stream, if any.

minimum :: (Monad m, Ord a) => Stream m a -> m (Maybe a) Source #

minimumBy :: Monad m => (a -> a -> Ordering) -> Stream m a -> m (Maybe a) Source #

maximum :: (Monad m, Ord a) => Stream m a -> m (Maybe a) Source #

maximumBy :: Monad m => (a -> a -> Ordering) -> Stream m a -> m (Maybe a) Source #

findIndices :: (a -> Bool) -> Stream m a -> Stream m Int Source #

lookup :: (Monad m, Eq a) => a -> Stream m (a, b) -> m (Maybe b) Source #

findM :: Monad m => (a -> m Bool) -> Stream m a -> m (Maybe a) Source #

find :: Monad m => (a -> Bool) -> Stream m a -> m (Maybe a) Source #

(!!) :: Monad m => Stream m a -> Int -> m (Maybe a) Source #

Map and Fold

mapM_ :: Monad m => (a -> m b) -> Stream m a -> m () Source #

Apply a monadic action to each element of the stream and discard the output of the action.

Conversions

toList :: Monad m => Stream m a -> m [a] Source #

hoist :: (Monad m, Monad n) => (forall x. m x -> n x) -> Stream m a -> Stream n a Source #

Transformation

By folding (scans)

scanl' :: (b -> a -> b) -> b -> Stream m a -> Stream m b Source #

scanlx' :: (x -> a -> x) -> x -> (x -> b) -> Stream m a -> Stream m b Source #

Filtering

filter :: (a -> Bool) -> Stream m a -> Stream m a Source #

take :: Int -> Stream m a -> Stream m a Source #

takeWhile :: (a -> Bool) -> Stream m a -> Stream m a Source #

drop :: Int -> Stream m a -> Stream m a Source #

dropWhile :: (a -> Bool) -> Stream m a -> Stream m a Source #

Mapping

map :: (a -> b) -> Stream m a -> Stream m b Source #

mapM :: Monad m => (a -> m b) -> Stream m a -> Stream m b Source #

sequence :: Monad m => Stream m (m a) -> Stream m a Source #

Inserting

intersperseM :: Monad m => m a -> Stream m a -> Stream m a Source #

intersperse :: Monad m => a -> Stream m a -> Stream m a Source #

insertBy :: (a -> a -> Ordering) -> a -> Stream m a -> Stream m a Source #

Deleting

deleteBy :: (a -> a -> Bool) -> a -> Stream m a -> Stream m a Source #

Reordering

reverse :: Stream m a -> Stream m a Source #

Map and Filter

mapMaybe :: (a -> Maybe b) -> Stream m a -> Stream m b Source #

Zipping

zipWith :: Monad m => (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c Source #

Zip two streams serially using a pure zipping function.

Since: 0.1.0

zipWithM :: Monad m => (a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c Source #

Zip two streams serially using a monadic zipping function.

Since: 0.1.0

Merging

mergeBy :: (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a Source #

mergeByM :: Monad m => (a -> a -> m Ordering) -> Stream m a -> Stream m a -> Stream m a Source #

Nesting

concatMapWith :: (Stream m b -> Stream m b -> Stream m b) -> (a -> Stream m b) -> Stream m a -> Stream m b Source #

Perform a concatMap using a specified concat strategy. The first argument specifies a merge or concat function that is used to merge the streams generated by the map function. For example, the concat function could be serial, parallel, async, ahead or any other zip or merge function.

Since: 0.7.0

concatMap :: (a -> Stream m b) -> Stream m a -> Stream m b Source #

bindWith :: (Stream m b -> Stream m b -> Stream m b) -> Stream m a -> (a -> Stream m b) -> Stream m b Source #

concatPairsWith :: (Stream m b -> Stream m b -> Stream m b) -> (a -> Stream m b) -> Stream m a -> Stream m b Source #

See concatPairsWith for documentation.

apWith :: (Stream m b -> Stream m b -> Stream m b) -> Stream m (a -> b) -> Stream m a -> Stream m b Source #

apSerial :: Stream m (a -> b) -> Stream m a -> Stream m b Source #

Transformation comprehensions

the :: (Eq a, Monad m) => Stream m a -> m (Maybe a) Source #

Semigroup Style Composition

serial :: Stream m a -> Stream m a -> Stream m a infixr 6 Source #

Appends two streams sequentially, yielding all elements from the first stream, and then all elements from the second stream.

Utilities

consM :: Monad m => m a -> Stream m a -> Stream m a infixr 5 Source #

withLocal :: MonadReader r m => (r -> r) -> Stream m a -> Stream m a Source #

mfix :: Monad m => (m a -> Stream m a) -> Stream m a Source #