streamly-0.7.1: Beautiful Streaming, Concurrent and Reactive Composition

Copyright: (c) 2017 Harendra Kumar
License: BSD3
Maintainer: streamly@composewell.com
Stability: experimental
Portability: GHC
Safe Haskell: None
Language: Haskell2010

Streamly.Internal.Data.Stream.StreamK

Description

Continuation passing style (CPS) stream implementation. The symbol K below denotes a function as well as a Kontinuation.

import qualified Streamly.Internal.Data.Stream.StreamK as K

A class for streams

class (forall m a. MonadAsync m => Semigroup (t m a), forall m a. MonadAsync m => Monoid (t m a), forall m. Monad m => Functor (t m), forall m. MonadAsync m => Applicative (t m)) => IsStream t where Source #

Class of types that can represent a stream of elements of some type a in some monad m.

Since: 0.2.0

Methods

toStream :: t m a -> Stream m a Source #

fromStream :: Stream m a -> t m a Source #

consM :: MonadAsync m => m a -> t m a -> t m a infixr 5 Source #

Constructs a stream by adding a monadic action at the head of an existing stream. For example:

> toList $ getLine `consM` getLine `consM` nil
hello
world
["hello","world"]

Concurrent (do not use parallely to construct infinite streams)

Since: 0.2.0
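To illustrate how a monadic action at the head of a stream is deferred until the stream is consumed, here is a minimal sketch on a simplified stand-in type. The names StreamK, nilK, consMK and toListK are hypothetical and are not part of the library; the stand-in omits the State argument and covers only the serial case, so it says nothing about concurrent evaluation.

```haskell
{-# LANGUAGE RankNTypes #-}
module Main (main) where

import Data.IORef (modifyIORef, newIORef, readIORef)

-- Hypothetical, simplified stand-in for a CPS stream (no State or SVar).
-- Continuations are taken in the order: yield, singleton, stop.
newtype StreamK m a = StreamK
    (forall r. (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)

infixr 5 `consMK`

-- The empty stream calls the stop continuation.
nilK :: StreamK m a
nilK = StreamK (\_ _ stp -> stp)

-- Serial analogue of consM: the action runs only when the stream is consumed.
consMK :: Monad m => m a -> StreamK m a -> StreamK m a
consMK m r = StreamK (\yld _ _ -> m >>= \a -> yld a r)

-- Consume the stream, collecting the elements in order.
toListK :: Monad m => StreamK m a -> m [a]
toListK (StreamK k) =
    k (\a r -> (a :) <$> toListK r) (\a -> return [a]) (return [])

main :: IO ()
main = do
    ref <- newIORef (0 :: Int)
    -- Each run of tick increments the counter and returns its new value.
    let tick = modifyIORef ref (+ 1) >> readIORef ref
        s = tick `consMK` tick `consMK` nilK
    before <- readIORef ref      -- building the stream ran no action
    xs <- toListK s
    after <- readIORef ref
    print (before, xs, after)    -- prints (0,[1,2],2)
```

The counter is still 0 after the stream is built, which shows that construction alone performs no effects in this sketch.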

(|:) :: MonadAsync m => m a -> t m a -> t m a infixr 5 Source #

Operator equivalent of consM. We can read it as "parallel colon" to remember that | comes before :.

> toList $ getLine |: getLine |: nil
hello
world
["hello","world"]

let delay = threadDelay 1000000 >> print 1
drain $ serially  $ delay |: delay |: delay |: nil
drain $ parallely $ delay |: delay |: delay |: nil

Concurrent (do not use parallely to construct infinite streams)

Since: 0.2.0

Instances
IsStream Stream Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.StreamK.Type

Methods

toStream :: Stream m a -> Stream m a Source #

fromStream :: Stream m a -> Stream m a Source #

consM :: MonadAsync m => m a -> Stream m a -> Stream m a Source #

(|:) :: MonadAsync m => m a -> Stream m a -> Stream m a Source #

IsStream ZipAsyncM Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

toStream :: ZipAsyncM m a -> Stream m a Source #

fromStream :: Stream m a -> ZipAsyncM m a Source #

consM :: MonadAsync m => m a -> ZipAsyncM m a -> ZipAsyncM m a Source #

(|:) :: MonadAsync m => m a -> ZipAsyncM m a -> ZipAsyncM m a Source #

IsStream ZipSerialM Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

toStream :: ZipSerialM m a -> Stream m a Source #

fromStream :: Stream m a -> ZipSerialM m a Source #

consM :: MonadAsync m => m a -> ZipSerialM m a -> ZipSerialM m a Source #

(|:) :: MonadAsync m => m a -> ZipSerialM m a -> ZipSerialM m a Source #

IsStream WSerialT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

toStream :: WSerialT m a -> Stream m a Source #

fromStream :: Stream m a -> WSerialT m a Source #

consM :: MonadAsync m => m a -> WSerialT m a -> WSerialT m a Source #

(|:) :: MonadAsync m => m a -> WSerialT m a -> WSerialT m a Source #

IsStream SerialT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

toStream :: SerialT m a -> Stream m a Source #

fromStream :: Stream m a -> SerialT m a Source #

consM :: MonadAsync m => m a -> SerialT m a -> SerialT m a Source #

(|:) :: MonadAsync m => m a -> SerialT m a -> SerialT m a Source #

IsStream ParallelT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

toStream :: ParallelT m a -> Stream m a Source #

fromStream :: Stream m a -> ParallelT m a Source #

consM :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a Source #

(|:) :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a Source #

IsStream WAsyncT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

toStream :: WAsyncT m a -> Stream m a Source #

fromStream :: Stream m a -> WAsyncT m a Source #

consM :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a Source #

(|:) :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a Source #

IsStream AsyncT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

toStream :: AsyncT m a -> Stream m a Source #

fromStream :: Stream m a -> AsyncT m a Source #

consM :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a Source #

(|:) :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a Source #

IsStream AheadT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

toStream :: AheadT m a -> Stream m a Source #

fromStream :: Stream m a -> AheadT m a Source #

consM :: MonadAsync m => m a -> AheadT m a -> AheadT m a Source #

(|:) :: MonadAsync m => m a -> AheadT m a -> AheadT m a Source #

adapt :: (IsStream t1, IsStream t2) => t1 m a -> t2 m a Source #

Adapt any specific stream type to any other specific stream type.

Since: 0.1.0

The stream type

newtype Stream m a Source #

The type Stream m a represents a monadic stream of values of type a constructed using actions in monad m. It uses stop, singleton and yield continuations equivalent to the following direct style type:

data Stream m a = Stop | Singleton a | Yield a (Stream m a)

To facilitate parallel composition we maintain a local state in an SVar that is shared across the streams being composed and is used to synchronize them.

The singleton case can be expressed in terms of stop and yield, but we keep it as a separate case to optimize composition operations for streams with a single element. We build singleton streams in the implementation of pure for Applicative and Monad, and in lift for MonadTrans.
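The correspondence between the direct style type and its continuation encoding can be sketched as follows. This is a simplified illustration, not the library's definition: the newtype StreamK and the helpers nilK, singletonK, consK and toListK are hypothetical names, and the State argument is left out.

```haskell
{-# LANGUAGE RankNTypes #-}
module Main (main) where

-- Hypothetical, simplified CPS stream: one continuation per constructor of
--   data Stream m a = Stop | Singleton a | Yield a (Stream m a)
-- taken in the order: yield, singleton, stop.
newtype StreamK m a = StreamK
    (forall r. (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)

infixr 5 `consK`

-- Corresponds to Stop.
nilK :: StreamK m a
nilK = StreamK (\_ _ stp -> stp)

-- Corresponds to Singleton a.
singletonK :: a -> StreamK m a
singletonK a = StreamK (\_ sng _ -> sng a)

-- Corresponds to Yield a r.
consK :: a -> StreamK m a -> StreamK m a
consK a r = StreamK (\yld _ _ -> yld a r)

-- Eliminate a stream by supplying all three continuations.
toListK :: Monad m => StreamK m a -> m [a]
toListK (StreamK k) =
    k (\a r -> (a :) <$> toListK r)  -- yield: keep a, continue with r
      (\a -> return [a])             -- singleton: last element
      (return [])                    -- stop: nothing left

main :: IO ()
main = do
    xs <- toListK (1 `consK` 2 `consK` singletonK (3 :: Int))
    print xs                         -- prints [1,2,3]
    ys <- toListK (nilK :: StreamK IO Int)
    print ys                         -- prints []
```

In this encoding a singleton stream answers with a single continuation call, whereas expressing it as a yield followed by a stop would take two steps.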

XXX remove the Stream type parameter from State as it is always constant. We can remove it from SVar as well

Constructors

MkStream (forall r. State Stream m a -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r) 
Instances
MonadTrans Stream Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.StreamK.Type

Methods

lift :: Monad m => m a -> Stream m a #

IsStream Stream Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.StreamK.Type

Methods

toStream :: Stream m a -> Stream m a Source #

fromStream :: Stream m a -> Stream m a Source #

consM :: MonadAsync m => m a -> Stream m a -> Stream m a Source #

(|:) :: MonadAsync m => m a -> Stream m a -> Stream m a Source #

Monad m => Monad (Stream m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.StreamK.Type

Methods

(>>=) :: Stream m a -> (a -> Stream m b) -> Stream m b #

(>>) :: Stream m a -> Stream m b -> Stream m b #

return :: a -> Stream m a #

fail :: String -> Stream m a #

Monad m => Functor (Stream m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.StreamK.Type

Methods

fmap :: (a -> b) -> Stream m a -> Stream m b #

(<$) :: a -> Stream m b -> Stream m a #

Monad m => Applicative (Stream m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.StreamK.Type

Methods

pure :: a -> Stream m a #

(<*>) :: Stream m (a -> b) -> Stream m a -> Stream m b #

liftA2 :: (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c #

(*>) :: Stream m a -> Stream m b -> Stream m b #

(<*) :: Stream m a -> Stream m b -> Stream m a #

Semigroup (Stream m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.StreamK.Type

Methods

(<>) :: Stream m a -> Stream m a -> Stream m a #

sconcat :: NonEmpty (Stream m a) -> Stream m a #

stimes :: Integral b => b -> Stream m a -> Stream m a #

Monoid (Stream m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.StreamK.Type

Methods

mempty :: Stream m a #

mappend :: Stream m a -> Stream m a -> Stream m a #

mconcat :: [Stream m a] -> Stream m a #

Construction Primitives

mkStream :: IsStream t => (forall r. State Stream m a -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r) -> t m a Source #

Build a stream from a function that takes a State (which carries the SVar), a yield continuation, a singleton continuation and a stop continuation, in that order.

nil :: IsStream t => t m a Source #

An empty stream.

> toList nil
[]

Since: 0.1.0

nilM :: (IsStream t, Monad m) => m b -> t m a Source #

An empty stream producing a side effect.

> toList (nilM (print "nil"))
"nil"
[]

Internal

cons :: IsStream t => a -> t m a -> t m a infixr 5 Source #

Construct a stream by adding a pure value at the head of an existing stream. For serial streams this is the same as (return a) `consM` r but more efficient. For concurrent streams this is not concurrent whereas consM is concurrent. For example:

> toList $ 1 `cons` 2 `cons` 3 `cons` nil
[1,2,3]

Since: 0.1.0

(.:) :: IsStream t => a -> t m a -> t m a infixr 5 Source #

Operator equivalent of cons.

> toList $ 1 .: 2 .: 3 .: nil
[1,2,3]

Since: 0.1.1

Elimination Primitives

foldStream :: IsStream t => State Stream m a -> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r Source #

Fold a stream by providing a State, a yield continuation, a singleton continuation and a stop continuation, in that order. The stream will not use the SVar passed via State.

foldStreamShared :: IsStream t => State Stream m a -> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r Source #

Fold a stream by providing a State (which carries the SVar), a yield continuation, a singleton continuation and a stop continuation, in that order. The stream shares the current SVar passed via the State.
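As a rough illustration of eliminating a stream through its continuations, the sketch below defines a hypothetical foldStreamK on a simplified stream type and derives head-like and uncons-like operations from it. The State argument is omitted, so the difference between the shared and non-shared variants is not modelled; all names here are assumptions made for the example.

```haskell
{-# LANGUAGE RankNTypes #-}
module Main (main) where

-- Hypothetical, simplified CPS stream (no State or SVar).
newtype StreamK m a = StreamK
    (forall r. (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)

infixr 5 `consK`

nilK :: StreamK m a
nilK = StreamK (\_ _ stp -> stp)

consK :: a -> StreamK m a -> StreamK m a
consK a r = StreamK (\yld _ _ -> yld a r)

-- Analogue of foldStream: pass yield, singleton and stop continuations.
foldStreamK
    :: (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> StreamK m a -> m r
foldStreamK yld sng stp (StreamK k) = k yld sng stp

-- First element, if any: the rest of the stream is never evaluated.
headK :: Monad m => StreamK m a -> m (Maybe a)
headK = foldStreamK (\a _ -> return (Just a)) (return . Just) (return Nothing)

-- Split into head and tail; a singleton has an empty tail.
unconsK :: Monad m => StreamK m a -> m (Maybe (a, StreamK m a))
unconsK = foldStreamK
    (\a r -> return (Just (a, r)))
    (\a -> return (Just (a, nilK)))
    (return Nothing)

main :: IO ()
main = do
    h <- headK (10 `consK` 20 `consK` nilK :: StreamK IO Int)
    print h                          -- prints Just 10
    e <- headK (nilK :: StreamK IO Int)
    print e                          -- prints Nothing
    u <- unconsK (10 `consK` 20 `consK` nilK :: StreamK IO Int)
    print (fmap fst u)               -- prints Just 10
```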

Transformation Primitives

unShare :: IsStream t => t m a -> t m a Source #

Detach a stream from an SVar

Deconstruction

uncons :: (IsStream t, Monad m) => t m a -> m (Maybe (a, t m a)) Source #

Generation

Unfolds

unfoldr :: IsStream t => (b -> Maybe (a, b)) -> b -> t m a Source #

unfoldrM :: (IsStream t, MonadAsync m) => (b -> m (Maybe (a, b))) -> b -> t m a Source #

Specialized Generation

repeat :: IsStream t => a -> t m a Source #

repeatM :: (IsStream t, MonadAsync m) => m a -> t m a Source #

repeatM = fix . consM
repeatM = cycle1 . yieldM

Generate an infinite stream by repeating a monadic value.

Internal
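The fixed-point style of definition used above can be tried out on ordinary lists, where the list constructor plays the role of the cons operation. This is only an analogy using base; repeatList is a hypothetical name and no stream type is involved.

```haskell
module Main (main) where

import Data.Function (fix)

-- fix f is the value x satisfying x = f x, so fix (a :) is a : a : a : ...
repeatList :: a -> [a]
repeatList = fix . (:)

main :: IO ()
main = print (take 3 (repeatList (7 :: Int)))  -- prints [7,7,7]
```

Laziness is what makes this work: only the three demanded cells of the infinite list are ever built.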

replicate :: IsStream t => Int -> a -> t m a Source #

replicateM :: (IsStream t, MonadAsync m) => Int -> m a -> t m a Source #

fromIndices :: IsStream t => (Int -> a) -> t m a Source #

fromIndicesM :: (IsStream t, MonadAsync m) => (Int -> m a) -> t m a Source #

iterate :: IsStream t => (a -> a) -> a -> t m a Source #

iterateM :: (IsStream t, MonadAsync m) => (a -> m a) -> m a -> t m a Source #

Conversions

yield :: IsStream t => a -> t m a Source #

yieldM :: (Monad m, IsStream t) => m a -> t m a Source #

fromFoldable :: (IsStream t, Foldable f) => f a -> t m a Source #

fromFoldable = foldr cons nil

Construct a stream from a Foldable containing pure values.

Since: 0.2.0
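The same right-fold pattern can be sketched with plain lists standing in for streams, which shows that any Foldable container works as the source. The name fromFoldableList is hypothetical, and the list constructors (:) and [] take the place of cons and nil.

```haskell
module Main (main) where

import Data.List.NonEmpty (NonEmpty (..))

-- List analogue of: fromFoldable = foldr cons nil
fromFoldableList :: Foldable f => f a -> [a]
fromFoldableList = foldr (:) []

main :: IO ()
main = do
    print (fromFoldableList (Just (7 :: Int)))      -- prints [7]
    print (fromFoldableList (1 :| [2, 3 :: Int]))   -- prints [1,2,3]
```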

fromList :: IsStream t => [a] -> t m a Source #

fromStreamK :: IsStream t => Stream m a -> t m a Source #

foldr/build

foldrS :: IsStream t => (a -> t m b -> t m b) -> t m b -> t m a -> t m b Source #

Lazy right associative fold to a stream.
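What laziness buys in a right fold that builds a sequence can be seen with lists as a stand-in for streams. In this sketch, mapViaFoldr is a hypothetical helper; the point is that the fold produces output incrementally and therefore also works on an infinite input.

```haskell
module Main (main) where

-- A map written as a lazy right fold: each step conses one output element
-- in front of the (not yet evaluated) fold of the rest.
mapViaFoldr :: (a -> b) -> [a] -> [b]
mapViaFoldr f = foldr (\x acc -> f x : acc) []

main :: IO ()
main = print (take 3 (mapViaFoldr (* 2) [1 :: Int ..]))  -- prints [2,4,6]
```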

foldrSM :: (IsStream t, Monad m) => (m a -> t m b -> t m b) -> t m b -> t m a -> t m b Source #

buildS :: IsStream t => ((a -> t m a -> t m a) -> t m a -> t m a) -> t m a Source #

buildM :: (IsStream t, MonadAsync m) => (forall r. (a -> t m a -> m r) -> (a -> m r) -> m r -> m r) -> t m a Source #

augmentS :: IsStream t => ((a -> t m a -> t m a) -> t m a -> t m a) -> t m a -> t m a Source #

augmentSM :: (IsStream t, MonadAsync m) => ((m a -> t m a -> t m a) -> t m a -> t m a) -> t m a -> t m a Source #

Elimination

General Folds

foldr :: (IsStream t, Monad m) => (a -> b -> b) -> b -> t m a -> m b Source #

Lazy right associative fold.

foldr1 :: (IsStream t, Monad m) => (a -> a -> a) -> t m a -> m (Maybe a) Source #

foldrM :: IsStream t => (a -> m b -> m b) -> m b -> t m a -> m b Source #

Lazy right fold with a monadic step function.

foldrT :: (IsStream t, Monad m, Monad (s m), MonadTrans s) => (a -> s m b -> s m b) -> s m b -> t m a -> s m b Source #

Right associative fold to an arbitrary transformer monad.

foldl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> m b Source #

Strict left associative fold.

foldlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> b -> t m a -> m b Source #

Like foldl' but with a monadic step function.

foldlS :: IsStream t => (t m b -> a -> t m b) -> t m b -> t m a -> t m b Source #

Lazy left fold to a stream.

foldlT :: (IsStream t, Monad m, Monad (s m), MonadTrans s) => (s m b -> a -> s m b) -> s m b -> t m a -> s m b Source #

Lazy left fold to an arbitrary transformer monad.

foldlx' :: forall t m a b x. (IsStream t, Monad m) => (x -> a -> x) -> x -> (x -> b) -> t m a -> m b Source #

Strict left fold with an extraction function. Like the standard strict left fold, but applies a user-supplied extraction function (the third argument) to the folded value at the end. This is designed to work with the foldl library. The suffix x is a mnemonic for extraction.

Note that the accumulator is always evaluated including the initial value.
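The step, initial value and extraction arguments can be illustrated on lists. The sketch below is an assumption-laden analogue, not the library function: foldlxList and SumCount are hypothetical names, and the accumulator uses strict fields so that each step is fully evaluated.

```haskell
module Main (main) where

import Data.List (foldl')

-- Strict accumulator: running sum and element count.
data SumCount = SumCount !Double !Int

-- List analogue of a strict left fold with an extraction function.
foldlxList :: (x -> a -> x) -> x -> (x -> b) -> [a] -> b
foldlxList step begin done = done . foldl' step begin

-- The accumulator type differs from the result type; the extraction
-- function converts one into the other at the end.
mean :: [Double] -> Double
mean = foldlxList step (SumCount 0 0) done
  where
    step (SumCount s n) a = SumCount (s + a) (n + 1)
    done (SumCount s n) = s / fromIntegral n

main :: IO ()
main = print (mean [1, 2, 3, 4])  -- prints 2.5
```

Keeping the extraction separate lets the internal accumulator stay hidden from the caller, which is the same step, initial value and extraction shape that the foldl library's Fold constructor uses.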

foldlMx' :: (IsStream t, Monad m) => (x -> a -> m x) -> m x -> (x -> m b) -> t m a -> m b Source #

Like foldlx', but with a monadic step function.

Specialized Folds

drain :: (Monad m, IsStream t) => t m a -> m () Source #

drain = foldl' (\_ _ -> ()) ()
drain = mapM_ (\_ -> return ())

null :: (IsStream t, Monad m) => t m a -> m Bool Source #

head :: (IsStream t, Monad m) => t m a -> m (Maybe a) Source #

tail :: (IsStream t, Monad m) => t m a -> m (Maybe (t m a)) Source #

init :: (IsStream t, Monad m) => t m a -> m (Maybe (t m a)) Source #

elem :: (IsStream t, Monad m, Eq a) => a -> t m a -> m Bool Source #

notElem :: (IsStream t, Monad m, Eq a) => a -> t m a -> m Bool Source #

all :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> m Bool Source #

any :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> m Bool Source #

last :: (IsStream t, Monad m) => t m a -> m (Maybe a) Source #

Extract the last element of the stream, if any.

minimum :: (IsStream t, Monad m, Ord a) => t m a -> m (Maybe a) Source #

minimumBy :: (IsStream t, Monad m) => (a -> a -> Ordering) -> t m a -> m (Maybe a) Source #

maximum :: (IsStream t, Monad m, Ord a) => t m a -> m (Maybe a) Source #

maximumBy :: (IsStream t, Monad m) => (a -> a -> Ordering) -> t m a -> m (Maybe a) Source #

findIndices :: IsStream t => (a -> Bool) -> t m a -> t m Int Source #

lookup :: (IsStream t, Monad m, Eq a) => a -> t m (a, b) -> m (Maybe b) Source #

findM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> m (Maybe a) Source #

find :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> m (Maybe a) Source #

(!!) :: (IsStream t, Monad m) => t m a -> Int -> m (Maybe a) Source #

Map and Fold

mapM_ :: (IsStream t, Monad m) => (a -> m b) -> t m a -> m () Source #

Apply a monadic action to each element of the stream and discard the output of the action.

Conversions

toList :: (IsStream t, Monad m) => t m a -> m [a] Source #

hoist :: (IsStream t, Monad m, Monad n) => (forall x. m x -> n x) -> t m a -> t n a Source #

Transformation

By folding (scans)

scanl' :: IsStream t => (b -> a -> b) -> b -> t m a -> t m b Source #

scanlx' :: IsStream t => (x -> a -> x) -> x -> (x -> b) -> t m a -> t m b Source #

Filtering

filter :: IsStream t => (a -> Bool) -> t m a -> t m a Source #

take :: IsStream t => Int -> t m a -> t m a Source #

takeWhile :: IsStream t => (a -> Bool) -> t m a -> t m a Source #

drop :: IsStream t => Int -> t m a -> t m a Source #

dropWhile :: IsStream t => (a -> Bool) -> t m a -> t m a Source #

Mapping

map :: IsStream t => (a -> b) -> t m a -> t m b Source #

mapM :: (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m b Source #

mapMSerial :: MonadAsync m => (a -> m b) -> Stream m a -> Stream m b Source #

sequence :: (IsStream t, MonadAsync m) => t m (m a) -> t m a Source #

Inserting

intersperseM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a Source #

intersperse :: (IsStream t, MonadAsync m) => a -> t m a -> t m a Source #

insertBy :: IsStream t => (a -> a -> Ordering) -> a -> t m a -> t m a Source #

Deleting

deleteBy :: IsStream t => (a -> a -> Bool) -> a -> t m a -> t m a Source #

Reordering

reverse :: IsStream t => t m a -> t m a Source #

Map and Filter

mapMaybe :: IsStream t => (a -> Maybe b) -> t m a -> t m b Source #

Zipping

zipWith :: IsStream t => (a -> b -> c) -> t m a -> t m b -> t m c Source #

Zip two streams serially using a pure zipping function.

Since: 0.1.0

zipWithM :: (IsStream t, Monad m) => (a -> b -> m c) -> t m a -> t m b -> t m c Source #

Zip two streams serially using a monadic zipping function.

Since: 0.1.0
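For intuition, pure and monadic zipping can be compared using the list functions from base. This is only an analogy with lists standing in for streams; addLogged is a hypothetical helper, and the truncation to the shorter input shown here is the behaviour of the list functions.

```haskell
module Main (main) where

import Control.Monad (zipWithM)

-- A monadic zipping function: logs the pair, then returns the sum.
addLogged :: Int -> Int -> IO Int
addLogged a b = do
    putStrLn ("zipping " ++ show (a, b))
    return (a + b)

main :: IO ()
main = do
    -- Pure zipping function.
    print (zipWith (+) [1, 2, 3] [10, 20 :: Int])  -- prints [11,22]
    -- Monadic zipping function: effects run pair by pair, in order.
    sums <- zipWithM addLogged [1, 2, 3] [10, 20]
    print sums                                     -- prints [11,22]
```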

Merging

mergeBy :: (IsStream t, Monad m) => (a -> a -> Ordering) -> t m a -> t m a -> t m a Source #

mergeByM :: (IsStream t, Monad m) => (a -> a -> m Ordering) -> t m a -> t m a -> t m a Source #

Nesting

concatMapBy :: IsStream t => (forall c. t m c -> t m c -> t m c) -> (a -> t m b) -> t m a -> t m b Source #

Perform a concatMap using a specified concat strategy. The first argument specifies a merge or concat function that is used to merge the streams generated by the map function. For example, the concat function could be serial, parallel, async, ahead or any other zip or merge function.

Since: 0.7.0
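The role of the combining function can be sketched on lists, where the strategy is either plain appending or a simple interleaving. The names concatMapByList and interleave are hypothetical, the combining argument is monomorphic here rather than rank-2, and no concurrency is modelled.

```haskell
module Main (main) where

-- List analogue of concatMapBy: the first argument decides how the
-- sequences generated by the map function are combined.
concatMapByList :: ([b] -> [b] -> [b]) -> (a -> [b]) -> [a] -> [b]
concatMapByList combine f = foldr (\a acc -> f a `combine` acc) []

-- A non-serial strategy: alternate elements from the two inputs.
interleave :: [a] -> [a] -> [a]
interleave [] ys = ys
interleave (x : xs) ys = x : interleave ys xs

main :: IO ()
main = do
    let gen x = [x, x * 10 :: Int]
    print (concatMapByList (++) gen [1, 2, 3])        -- prints [1,10,2,20,3,30]
    print (concatMapByList interleave gen [1, 2, 3])  -- prints [1,2,10,3,20,30]
```

Swapping the combining function changes only the order in which the generated elements appear, not which elements are produced.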

concatMap :: IsStream t => (a -> t m b) -> t m a -> t m b Source #

bindWith :: IsStream t => (forall c. t m c -> t m c -> t m c) -> t m a -> (a -> t m b) -> t m b Source #

Transformation comprehensions

the :: (Eq a, IsStream t, Monad m) => t m a -> m (Maybe a) Source #

Semigroup Style Composition

serial :: IsStream t => t m a -> t m a -> t m a Source #

Polymorphic version of the Semigroup operation <> of SerialT. Appends two streams sequentially, yielding all elements from the first stream, and then all elements from the second stream.

Since: 0.2.0
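Sequential appending can be expressed directly with continuations. The following is a minimal sketch on a simplified stand-in type, not the library's code: StreamK, serialK and the other helpers are hypothetical names, and the State argument is omitted.

```haskell
{-# LANGUAGE RankNTypes #-}
module Main (main) where

-- Hypothetical, simplified CPS stream (no State or SVar).
-- Continuations are taken in the order: yield, singleton, stop.
newtype StreamK m a = StreamK
    (forall r. (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)

infixr 5 `consK`

nilK :: StreamK m a
nilK = StreamK (\_ _ stp -> stp)

consK :: a -> StreamK m a -> StreamK m a
consK a r = StreamK (\yld _ _ -> yld a r)

runK :: StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r
runK (StreamK k) yld sng stp = k yld sng stp

-- Append: run the first stream, and when it ends continue with the second.
serialK :: StreamK m a -> StreamK m a -> StreamK m a
serialK (StreamK k1) s2 = StreamK (\yld sng stp ->
    k1 (\a r -> yld a (serialK r s2))  -- more to come from the first stream
       (\a -> yld a s2)                -- last element of the first stream
       (runK s2 yld sng stp))          -- first stream is empty

toListK :: Monad m => StreamK m a -> m [a]
toListK s = runK s (\a r -> (a :) <$> toListK r) (\a -> return [a]) (return [])

main :: IO ()
main = do
    xs <- toListK ((1 `consK` 2 `consK` nilK) `serialK` (3 `consK` nilK))
    print (xs :: [Int])                -- prints [1,2,3]
```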

Utilities

consMStream :: Monad m => m a -> Stream m a -> Stream m a Source #

withLocal :: MonadReader r m => (r -> r) -> Stream m a -> Stream m a Source #

mfix :: (IsStream t, Monad m) => (m a -> t m a) -> t m a Source #

Iterate a lazy function f of the shape `m a -> t m a` until it becomes fully defined, i.e. independent of its argument action, then return the resulting value of the function (`t m a`).

It can be used to construct a stream that uses a cyclic definition. For example:

import Streamly.Internal.Prelude as S
import System.IO.Unsafe (unsafeInterleaveIO)

main = do
    -- x is the action yielding the pair that is being defined
    S.mapM_ print $ S.mfix $ \x -> do
      a <- S.fromList [1,2]
      b <- S.fromListM [return 3, unsafeInterleaveIO (fmap fst x)]
      return (a, b)

Note that the function f must be lazy in its argument. Because the IO monad is strict, the example uses unsafeInterleaveIO to defer the action.

Deprecated

type Streaming = IsStream Source #

Deprecated: Please use IsStream instead.

Same as IsStream.

Since: 0.1.0

once :: (Monad m, IsStream t) => m a -> t m a Source #

Deprecated: Please use yieldM instead.

Same as yieldM.

Since: 0.2.0