streamly-0.8.0: Dataflow programming and declarative concurrency
Copyright(c) 2017 Composewell Technologies
LicenseBSD3
Maintainerstreamly@composewell.com
Stabilityexperimental
PortabilityGHC
Safe HaskellNone
LanguageHaskell2010

Streamly.Internal.Data.Stream.StreamK

Description

Continuation passing style (CPS) stream implementation. The symbol K below denotes a function as well as a Kontinuation.

import qualified Streamly.Internal.Data.Stream.StreamK as K
Synopsis

A class for streams

class (forall m a. MonadAsync m => Semigroup (t m a), forall m a. MonadAsync m => Monoid (t m a), forall m. Monad m => Functor (t m), forall m. MonadAsync m => Applicative (t m)) => IsStream t where Source #

Class of types that can represent a stream of elements of some type a in some monad m.

Since: 0.2.0 (Streamly)

Since: 0.8.0

Methods

toStream :: t m a -> Stream m a Source #

fromStream :: Stream m a -> t m a Source #

consM :: MonadAsync m => m a -> t m a -> t m a infixr 5 Source #

Constructs a stream by adding a monadic action at the head of an existing stream. For example:

> toList $ getLine `consM` getLine `consM` nil
hello
world
["hello","world"]

Concurrent (do not use fromParallel to construct infinite streams)

Since: 0.2.0

(|:) :: MonadAsync m => m a -> t m a -> t m a infixr 5 Source #

Operator equivalent of consM. We can read it as "parallel colon" to remember that | comes before :.

> toList $ getLine |: getLine |: nil
hello
world
["hello","world"]
let delay = threadDelay 1000000 >> print 1
drain $ fromSerial  $ delay |: delay |: delay |: nil
drain $ fromParallel $ delay |: delay |: delay |: nil

Concurrent (do not use fromParallel to construct infinite streams)

Since: 0.2.0

Instances

Instances details
IsStream Stream Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.StreamK.Type

Methods

toStream :: forall (m :: Type -> Type) a. Stream m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> Stream m a Source #

consM :: MonadAsync m => m a -> Stream m a -> Stream m a Source #

(|:) :: MonadAsync m => m a -> Stream m a -> Stream m a Source #

IsStream ParallelT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

toStream :: forall (m :: Type -> Type) a. ParallelT m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> ParallelT m a Source #

consM :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a Source #

(|:) :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a Source #

IsStream WAsyncT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

toStream :: forall (m :: Type -> Type) a. WAsyncT m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> WAsyncT m a Source #

consM :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a Source #

(|:) :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a Source #

IsStream AsyncT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

toStream :: forall (m :: Type -> Type) a. AsyncT m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> AsyncT m a Source #

consM :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a Source #

(|:) :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a Source #

IsStream AheadT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

toStream :: forall (m :: Type -> Type) a. AheadT m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> AheadT m a Source #

consM :: MonadAsync m => m a -> AheadT m a -> AheadT m a Source #

(|:) :: MonadAsync m => m a -> AheadT m a -> AheadT m a Source #

IsStream ZipAsyncM Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

toStream :: forall (m :: Type -> Type) a. ZipAsyncM m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> ZipAsyncM m a Source #

consM :: MonadAsync m => m a -> ZipAsyncM m a -> ZipAsyncM m a Source #

(|:) :: MonadAsync m => m a -> ZipAsyncM m a -> ZipAsyncM m a Source #

IsStream ZipSerialM Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

toStream :: forall (m :: Type -> Type) a. ZipSerialM m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> ZipSerialM m a Source #

consM :: MonadAsync m => m a -> ZipSerialM m a -> ZipSerialM m a Source #

(|:) :: MonadAsync m => m a -> ZipSerialM m a -> ZipSerialM m a Source #

IsStream WSerialT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

toStream :: forall (m :: Type -> Type) a. WSerialT m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> WSerialT m a Source #

consM :: MonadAsync m => m a -> WSerialT m a -> WSerialT m a Source #

(|:) :: MonadAsync m => m a -> WSerialT m a -> WSerialT m a Source #

IsStream SerialT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

toStream :: forall (m :: Type -> Type) a. SerialT m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> SerialT m a Source #

consM :: MonadAsync m => m a -> SerialT m a -> SerialT m a Source #

(|:) :: MonadAsync m => m a -> SerialT m a -> SerialT m a Source #

adapt :: (IsStream t1, IsStream t2) => t1 m a -> t2 m a Source #

Adapt any specific stream type to any other specific stream type.

Since: 0.1.0 (Streamly)

Since: 0.8.0

The stream type

newtype Stream m a Source #

The type Stream m a represents a monadic stream of values of type a constructed using actions in monad m. It uses stop, singleton and yield continuations equivalent to the following direct style type:

data Stream m a = Stop | Singleton a | Yield a (Stream m a)

To facilitate parallel composition we maintain a local state in an SVar that is shared across and is used for synchronization of the streams being composed.

The singleton case can be expressed in terms of stop and yield but we have it as a separate case to optimize composition operations for streams with single element. We build singleton streams in the implementation of pure for Applicative and Monad, and in lift for MonadTrans.

Constructors

MkStream (forall r. State Stream m a -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r) 

Instances

Instances details
MonadTrans Stream Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.StreamK.Type

Methods

lift :: Monad m => m a -> Stream m a #

IsStream Stream Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.StreamK.Type

Methods

toStream :: forall (m :: Type -> Type) a. Stream m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> Stream m a Source #

consM :: MonadAsync m => m a -> Stream m a -> Stream m a Source #

(|:) :: MonadAsync m => m a -> Stream m a -> Stream m a Source #

Monad m => Monad (Stream m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.StreamK.Type

Methods

(>>=) :: Stream m a -> (a -> Stream m b) -> Stream m b #

(>>) :: Stream m a -> Stream m b -> Stream m b #

return :: a -> Stream m a #

Monad m => Functor (Stream m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.StreamK.Type

Methods

fmap :: (a -> b) -> Stream m a -> Stream m b #

(<$) :: a -> Stream m b -> Stream m a #

Monad m => Applicative (Stream m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.StreamK.Type

Methods

pure :: a -> Stream m a #

(<*>) :: Stream m (a -> b) -> Stream m a -> Stream m b #

liftA2 :: (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c #

(*>) :: Stream m a -> Stream m b -> Stream m b #

(<*) :: Stream m a -> Stream m b -> Stream m a #

Semigroup (Stream m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.StreamK.Type

Methods

(<>) :: Stream m a -> Stream m a -> Stream m a #

sconcat :: NonEmpty (Stream m a) -> Stream m a #

stimes :: Integral b => b -> Stream m a -> Stream m a #

Monoid (Stream m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.StreamK.Type

Methods

mempty :: Stream m a #

mappend :: Stream m a -> Stream m a -> Stream m a #

mconcat :: [Stream m a] -> Stream m a #

Construction Primitives

mkStream :: IsStream t => (forall r. State Stream m a -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r) -> t m a Source #

Build a stream from an SVar, a stop continuation, a singleton stream continuation and a yield continuation.

nil :: IsStream t => t m a Source #

An empty stream.

> toList nil
[]

Since: 0.1.0

nilM :: (IsStream t, Monad m) => m b -> t m a Source #

An empty stream producing a side effect.

> toList (nilM (print "nil"))
"nil"
[]

Pre-release

cons :: IsStream t => a -> t m a -> t m a infixr 5 Source #

Construct a stream by adding a pure value at the head of an existing stream. For serial streams this is the same as (return a) `consM` r but more efficient. For concurrent streams this is not concurrent whereas consM is concurrent. For example:

> toList $ 1 `cons` 2 `cons` 3 `cons` nil
[1,2,3]

Since: 0.1.0

(.:) :: IsStream t => a -> t m a -> t m a infixr 5 Source #

Operator equivalent of cons.

> toList $ 1 .: 2 .: 3 .: nil
[1,2,3]

Since: 0.1.1

Elimination Primitives

foldStream :: IsStream t => State Stream m a -> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r Source #

Fold a stream by providing a State, stop continuation, a singleton continuation and a yield continuation. The stream will not use the SVar passed via State.

foldStreamShared :: IsStream t => State Stream m a -> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r Source #

Fold a stream by providing an SVar, a stop continuation, a singleton continuation and a yield continuation. The stream would share the current SVar passed via the State.

Transformation Primitives

unShare :: IsStream t => t m a -> t m a Source #

Detach a stream from an SVar

Deconstruction

uncons :: (IsStream t, Monad m) => t m a -> m (Maybe (a, t m a)) Source #

Generation

Unfolds

unfoldr :: IsStream t => (b -> Maybe (a, b)) -> b -> t m a Source #

unfoldrM :: (IsStream t, MonadAsync m) => (b -> m (Maybe (a, b))) -> b -> t m a Source #

Specialized Generation

repeat :: IsStream t => a -> t m a Source #

repeatM :: (IsStream t, MonadAsync m) => m a -> t m a Source #

repeatM = fix . cons
repeatM = cycle1 . fromPure

Generate an infinite stream by repeating a monadic value.

Pre-release

replicate :: IsStream t => Int -> a -> t m a Source #

replicateM :: (IsStream t, MonadAsync m) => Int -> m a -> t m a Source #

fromIndices :: IsStream t => (Int -> a) -> t m a Source #

fromIndicesM :: (IsStream t, MonadAsync m) => (Int -> m a) -> t m a Source #

iterate :: IsStream t => (a -> a) -> a -> t m a Source #

iterateM :: (IsStream t, MonadAsync m) => (a -> m a) -> m a -> t m a Source #

Conversions

fromPure :: IsStream t => a -> t m a Source #

fromEffect :: (Monad m, IsStream t) => m a -> t m a Source #

fromFoldable :: (IsStream t, Foldable f) => f a -> t m a Source #

fromFoldable = foldr cons nil

Construct a stream from a Foldable containing pure values:

Since: 0.2.0

fromList :: IsStream t => [a] -> t m a Source #

fromStreamK :: IsStream t => Stream m a -> t m a Source #

foldr/build

foldrS :: IsStream t => (a -> t m b -> t m b) -> t m b -> t m a -> t m b Source #

Lazy right associative fold to a stream.

foldrSM :: (IsStream t, Monad m) => (m a -> t m b -> t m b) -> t m b -> t m a -> t m b Source #

buildS :: IsStream t => ((a -> t m a -> t m a) -> t m a -> t m a) -> t m a Source #

buildM :: (IsStream t, MonadAsync m) => (forall r. (a -> t m a -> m r) -> (a -> m r) -> m r -> m r) -> t m a Source #

augmentS :: IsStream t => ((a -> t m a -> t m a) -> t m a -> t m a) -> t m a -> t m a Source #

augmentSM :: (IsStream t, MonadAsync m) => ((m a -> t m a -> t m a) -> t m a -> t m a) -> t m a -> t m a Source #

Elimination

General Folds

foldr :: (IsStream t, Monad m) => (a -> b -> b) -> b -> t m a -> m b Source #

Lazy right associative fold.

foldr1 :: (IsStream t, Monad m) => (a -> a -> a) -> t m a -> m (Maybe a) Source #

foldrM :: IsStream t => (a -> m b -> m b) -> m b -> t m a -> m b Source #

Lazy right fold with a monadic step function.

foldrT :: (IsStream t, Monad m, Monad (s m), MonadTrans s) => (a -> s m b -> s m b) -> s m b -> t m a -> s m b Source #

Right associative fold to an arbitrary transformer monad.

foldl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> m b Source #

Strict left associative fold.

foldlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> m b Source #

Like foldl' but with a monadic step function.

foldlS :: IsStream t => (t m b -> a -> t m b) -> t m b -> t m a -> t m b Source #

Lazy left fold to a stream.

foldlT :: (IsStream t, Monad m, Monad (s m), MonadTrans s) => (s m b -> a -> s m b) -> s m b -> t m a -> s m b Source #

Lazy left fold to an arbitrary transformer monad.

foldlx' :: forall t m a b x. (IsStream t, Monad m) => (x -> a -> x) -> x -> (x -> b) -> t m a -> m b Source #

Strict left fold with an extraction function. Like the standard strict left fold, but applies a user supplied extraction function (the third argument) to the folded value at the end. This is designed to work with the foldl library. The suffix x is a mnemonic for extraction.

Note that the accumulator is always evaluated including the initial value.

foldlMx' :: (IsStream t, Monad m) => (x -> a -> m x) -> m x -> (x -> m b) -> t m a -> m b Source #

Like foldx, but with a monadic step function.

fold :: (IsStream t, Monad m) => Fold m a b -> t m a -> m b Source #

Specialized Folds

drain :: (Monad m, IsStream t) => t m a -> m () Source #

drain = foldl' (\_ _ -> ()) ()
drain = mapM_ (\_ -> return ())

null :: (IsStream t, Monad m) => t m a -> m Bool Source #

head :: (IsStream t, Monad m) => t m a -> m (Maybe a) Source #

tail :: (IsStream t, Monad m) => t m a -> m (Maybe (t m a)) Source #

init :: (IsStream t, Monad m) => t m a -> m (Maybe (t m a)) Source #

elem :: (IsStream t, Monad m, Eq a) => a -> t m a -> m Bool Source #

notElem :: (IsStream t, Monad m, Eq a) => a -> t m a -> m Bool Source #

all :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> m Bool Source #

any :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> m Bool Source #

last :: (IsStream t, Monad m) => t m a -> m (Maybe a) Source #

Extract the last element of the stream, if any.

minimum :: (IsStream t, Monad m, Ord a) => t m a -> m (Maybe a) Source #

minimumBy :: (IsStream t, Monad m) => (a -> a -> Ordering) -> t m a -> m (Maybe a) Source #

maximum :: (IsStream t, Monad m, Ord a) => t m a -> m (Maybe a) Source #

maximumBy :: (IsStream t, Monad m) => (a -> a -> Ordering) -> t m a -> m (Maybe a) Source #

findIndices :: IsStream t => (a -> Bool) -> t m a -> t m Int Source #

lookup :: (IsStream t, Monad m, Eq a) => a -> t m (a, b) -> m (Maybe b) Source #

findM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> m (Maybe a) Source #

find :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> m (Maybe a) Source #

(!!) :: (IsStream t, Monad m) => t m a -> Int -> m (Maybe a) Source #

Map and Fold

mapM_ :: (IsStream t, Monad m) => (a -> m b) -> t m a -> m () Source #

Apply a monadic action to each element of the stream and discard the output of the action.

Conversions

toList :: (IsStream t, Monad m) => t m a -> m [a] Source #

hoist :: (IsStream t, Monad m, Monad n) => (forall x. m x -> n x) -> t m a -> t n a Source #

Transformation

By folding (scans)

scanl' :: IsStream t => (b -> a -> b) -> b -> t m a -> t m b Source #

scanlx' :: IsStream t => (x -> a -> x) -> x -> (x -> b) -> t m a -> t m b Source #

Filtering

filter :: IsStream t => (a -> Bool) -> t m a -> t m a Source #

take :: IsStream t => Int -> t m a -> t m a Source #

takeWhile :: IsStream t => (a -> Bool) -> t m a -> t m a Source #

drop :: IsStream t => Int -> t m a -> t m a Source #

dropWhile :: IsStream t => (a -> Bool) -> t m a -> t m a Source #

Mapping

map :: IsStream t => (a -> b) -> t m a -> t m b Source #

mapM :: (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m b Source #

mapMSerial :: MonadAsync m => (a -> m b) -> Stream m a -> Stream m b Source #

sequence :: (IsStream t, MonadAsync m) => t m (m a) -> t m a Source #

Inserting

intersperseM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a Source #

intersperse :: (IsStream t, MonadAsync m) => a -> t m a -> t m a Source #

insertBy :: IsStream t => (a -> a -> Ordering) -> a -> t m a -> t m a Source #

Deleting

deleteBy :: IsStream t => (a -> a -> Bool) -> a -> t m a -> t m a Source #

Reordering

reverse :: IsStream t => t m a -> t m a Source #

Map and Filter

mapMaybe :: IsStream t => (a -> Maybe b) -> t m a -> t m b Source #

Zipping

zipWith :: IsStream t => (a -> b -> c) -> t m a -> t m b -> t m c Source #

Zip two streams serially using a pure zipping function.

Since: 0.1.0

zipWithM :: (IsStream t, Monad m) => (a -> b -> m c) -> t m a -> t m b -> t m c Source #

Zip two streams serially using a monadic zipping function.

Since: 0.1.0

Merging

mergeBy :: (IsStream t, Monad m) => (a -> a -> Ordering) -> t m a -> t m a -> t m a Source #

mergeByM :: (IsStream t, Monad m) => (a -> a -> m Ordering) -> t m a -> t m a -> t m a Source #

Nesting

concatMapBy :: IsStream t => (t m b -> t m b -> t m b) -> (a -> t m b) -> t m a -> t m b Source #

Perform a concatMap using a specified concat strategy. The first argument specifies a merge or concat function that is used to merge the streams generated by the map function. For example, the concat function could be serial, parallel, async, ahead or any other zip or merge function.

Since: 0.7.0

concatMap :: IsStream t => (a -> t m b) -> t m a -> t m b Source #

bindWith :: IsStream t => (t m b -> t m b -> t m b) -> t m a -> (a -> t m b) -> t m b Source #

concatPairsWith :: IsStream t => (t m b -> t m b -> t m b) -> (a -> t m b) -> t m a -> t m b Source #

See concatPairsWith for documentation.

apWith :: IsStream t => (t m b -> t m b -> t m b) -> t m (a -> b) -> t m a -> t m b Source #

apSerial :: IsStream t => t m (a -> b) -> t m a -> t m b Source #

apSerialDiscardFst :: IsStream t => t m a -> t m b -> t m b Source #

apSerialDiscardSnd :: IsStream t => t m a -> t m b -> t m a Source #

Transformation comprehensions

the :: (Eq a, IsStream t, Monad m) => t m a -> m (Maybe a) Source #

Semigroup Style Composition

serial :: IsStream t => t m a -> t m a -> t m a infixr 6 Source #

Appends two streams sequentially, yielding all elements from the first stream, and then all elements from the second stream.

>>> import Streamly.Prelude (serial)
>>> stream1 = Stream.fromList [1,2]
>>> stream2 = Stream.fromList [3,4]
>>> Stream.toList $ stream1 `serial` stream2
[1,2,3,4]

This operation can be used to fold an infinite lazy container of streams.

Since: 0.2.0 (Streamly)

Since: 0.8.0

Utilities

consMStream :: Monad m => m a -> Stream m a -> Stream m a Source #

withLocal :: MonadReader r m => (r -> r) -> Stream m a -> Stream m a Source #

mfix :: (IsStream t, Monad m) => (m a -> t m a) -> t m a Source #

We can define cyclic structures using let:

>>> let (a, b) = ([1, b], head a) in (a, b)
([1,1],1)

The function fix defined as:

fix f = let x = f x in x

ensures that the argument of a function and its output refer to the same lazy value x i.e. the same location in memory. Thus x can be defined in terms of itself, creating structures with cyclic references.

>>> import Data.Function (fix)
>>> f ~(a, b) = ([1, b], head a)
>>> fix f
([1,1],1)

mfix is essentially the same as fix but for monadic values.

Using mfix for streams we can construct a stream in which each element of the stream is defined in a cyclic fashion. The argument of the function being fixed represents the current element of the stream which is being returned by the stream monad. Thus, we can use the argument to construct itself.

In the following example, the argument action of the function f represents the tuple (x,y) returned by it in a given iteration. We define the first element of the tuple in terms of the second.

import Streamly.Internal.Data.Stream.IsStream as Stream
import System.IO.Unsafe (unsafeInterleaveIO)

main = do
    Stream.mapM_ print $ Stream.mfix f

    where

    f action = do
        let incr n act = fmap ((+n) . snd) $ unsafeInterleaveIO act
        x <- Stream.fromListM [incr 1 action, incr 2 action]
        y <- Stream.fromList [4,5]
        return (x, y)

Note: you cannot achieve this by just changing the order of the monad statements because that would change the order in which the stream elements are generated.

Note that the function f must be lazy in its argument, that's why we use unsafeInterleaveIO on action because IO monad is strict.

Pre-release

Deprecated

type Streaming = IsStream Source #

Deprecated: Please use IsStream instead.

Same as IsStream.

Since: 0.1.0

once :: (Monad m, IsStream t) => m a -> t m a Source #

Deprecated: Please use fromEffect instead.

Same as fromEffect

Since: 0.2.0