streamly-0.5.1: Beautiful Streaming, Concurrent and Reactive Composition

Copyright: (c) 2017 Harendra Kumar
License: BSD3
Maintainer: harendra.kumar@gmail.com
Stability: experimental
Portability: GHC
Safe Haskell: None
Language: Haskell2010

Streamly.Prelude

Description

This module is designed to be imported qualified:

import qualified Streamly.Prelude as S

Functions with the suffix M are general functions that work on monadic arguments. The corresponding functions without the suffix M work on pure arguments and can in general be derived from their monadic versions but are provided for convenience and for consistency with other pure APIs in the base package.

Functions having a MonadAsync constraint work concurrently when used with an appropriate stream type combinator. Be careful not to use parallely with infinite streams.

Deconstruction and folds accept a SerialT type instead of a polymorphic type to ensure that streams always have a concrete monomorphic type by default, reducing type errors. In case you want to use any other type of stream you can use one of the type combinators provided in the Streamly module to convert the stream type.
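As a minimal sketch of the paragraph above, assuming the asyncly combinator from the Streamly module and the IO instance of MonadAsync, the following builds a stream polymorphically, selects the asynchronous stream type for it, and then folds it with toList, which consumes the result as a SerialT:

```haskell
import Streamly (asyncly)
import qualified Streamly.Prelude as S

main :: IO ()
main = do
    -- replicateM is polymorphic in the stream type; asyncly selects the
    -- asynchronous type for it, and toList consumes the result as SerialT.
    xs <- S.toList $ asyncly $ S.replicateM 3 (return 'x')
    putStrLn xs  -- prints xxx
```

All three elements are equal here, so the result does not depend on the order in which the concurrent actions finish.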

Construction

Primitives to construct a stream.

nil :: IsStream t => t m a

An empty stream.

> toList nil
[]

Since: streamly-0.1.0

cons :: IsStream t => a -> t m a -> t m a infixr 5

Construct a stream by adding a pure value at the head of an existing stream. For serial streams this is the same as (return a) `consM` r but more efficient. For concurrent streams this is not concurrent whereas consM is concurrent. For example:

> toList $ 1 `cons` 2 `cons` 3 `cons` nil
[1,2,3]

Since: streamly-0.1.0

(.:) :: IsStream t => a -> t m a -> t m a infixr 5

Operator equivalent of cons.

> toList $ 1 .: 2 .: 3 .: nil
[1,2,3]

Since: streamly-0.1.1

consM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a infixr 5

Constructs a stream by adding a monadic action at the head of an existing stream. For example:

> toList $ getLine `consM` getLine `consM` nil
hello
world
["hello","world"]

Concurrent (do not use parallely to construct infinite streams)

Since: streamly-0.2.0

(|:) :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a infixr 5

Operator equivalent of consM. We can read it as "parallel colon" to remember that | comes before :.

> toList $ getLine |: getLine |: nil
hello
world
["hello","world"]

let delay = threadDelay 1000000 >> print 1
runStream $ serially  $ delay |: delay |: delay |: nil
runStream $ parallely $ delay |: delay |: delay |: nil

Concurrent (do not use parallely to construct infinite streams)

Since: streamly-0.2.0

Deconstruction

uncons :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (a, t m a))

Decompose a stream into its head and tail. If the stream is empty, returns Nothing. If the stream is non-empty, returns Just (a, ma), where a is the head of the stream and ma its tail.
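For example, a minimal sketch in which the helper name headAndRest is hypothetical and SerialT is assumed to be imported from the Streamly module. Because the tail is polymorphic in its stream type, the sketch consumes it with toList, which fixes it to SerialT:

```haskell
import Streamly (SerialT)
import qualified Streamly.Prelude as S

-- Hypothetical helper: split a stream into its head and the list of the
-- remaining elements.
headAndRest :: SerialT IO Int -> IO (Maybe (Int, [Int]))
headAndRest s = do
    r <- S.uncons s
    case r of
        Nothing -> return Nothing
        Just (x, rest) -> do
            xs <- S.toList rest  -- fixes the tail's stream type to SerialT
            return (Just (x, xs))

main :: IO ()
main = do
    headAndRest (S.fromList [1, 2, 3]) >>= print  -- Just (1,[2,3])
    headAndRest S.nil >>= print                   -- Nothing
```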

Since: streamly-0.1.0

Generation

Unfolds

unfoldr :: (Monad m, IsStream t) => (b -> Maybe (a, b)) -> b -> t m a

Build a stream by unfolding a pure step function starting from a seed. The step function returns the next element in the stream and the next seed value. When it is done it returns Nothing and the stream ends. For example,

let f b =
        if b > 3
        then Nothing
        else Just (b, b + 1)
in toList $ unfoldr f 0
[0,1,2,3]

Since: streamly-0.1.0

unfoldrM :: (IsStream t, MonadAsync m) => (b -> m (Maybe (a, b))) -> b -> t m a

Build a stream by unfolding a monadic step function starting from a seed. The step function returns the next element in the stream and the next seed value. When it is done it returns Nothing and the stream ends. For example,

let f b =
        if b > 3
        then return Nothing
        else print b >> return (Just (b, b + 1))
in runStream $ unfoldrM f 0
 0
 1
 2
 3

When run concurrently, the next unfold step can run concurrently with the processing of the output of the previous step. Note that two unfold steps can never run concurrently with each other, because each step depends on the seed produced by the previous step.

(asyncly $ S.unfoldrM (\n -> liftIO (threadDelay 1000000) >> return (Just (n, n + 1))) 0)
    & S.foldlM' (\_ a -> threadDelay 1000000 >> print a) ()

Concurrent

Since: streamly-0.1.0

Specialized Generation

Generate a monadic stream from a seed.

replicateM :: (IsStream t, MonadAsync m) => Int -> m a -> t m a

Generate a stream by performing a monadic action n times.

runStream $ serially $ S.replicateM 10 $ (threadDelay 1000000 >> print 1)
runStream $ asyncly  $ S.replicateM 10 $ (threadDelay 1000000 >> print 1)

Concurrent

Since: streamly-0.1.1

repeat :: IsStream t => a -> t m a

Generate an infinite stream by repeating a pure value.
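Because the stream never ends, it has to be bounded before it is folded. A minimal sketch using take from this module:

```haskell
import qualified Streamly.Prelude as S

main :: IO ()
main = do
    -- take bounds the infinite stream, so toList terminates.
    xs <- S.toList $ S.take 3 $ S.repeat 'a'
    putStrLn xs  -- prints aaa
```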

Since: streamly-0.4.0

repeatM :: (IsStream t, MonadAsync m) => m a -> t m a

Generate a stream by repeatedly executing a monadic action forever.

runStream $ serially $ S.take 10 $ S.repeatM $ (threadDelay 1000000 >> print 1)
runStream $ asyncly  $ S.take 10 $ S.repeatM $ (threadDelay 1000000 >> print 1)

Concurrent, infinite (do not use with parallely)

Since: streamly-0.2.0

iterate :: IsStream t => (a -> a) -> a -> t m a

Iterate a pure function from a seed value, streaming the results forever.

Since: streamly-0.1.2

iterateM :: (IsStream t, MonadAsync m) => (a -> m a) -> a -> t m a

Iterate a monadic function from a seed value, streaming the results forever.

When run concurrently, the next iteration can run concurrently with the processing of the previous iteration. Note that two iterations can never run concurrently with each other, because each iteration depends on the output of the previous one.

runStream $ serially $ S.take 10 $ S.iterateM
     (\x -> threadDelay 1000000 >> print x >> return (x + 1)) 0

runStream $ asyncly  $ S.take 10 $ S.iterateM
     (\x -> threadDelay 1000000 >> print x >> return (x + 1)) 0

Concurrent

Since: streamly-0.1.2

Conversions

Transform an input structure into a stream.

yield :: IsStream t => a -> t m a

Create a singleton stream from a pure value. In monadic streams, pure or return can be used in place of yield; in Zip applicative streams, however, pure is equivalent to repeat.

Since: streamly-0.4.0

yieldM :: (Monad m, IsStream t) => m a -> t m a

Create a singleton stream from a monadic action. Same as m `consM` nil but more efficient.

> toList $ yieldM getLine
hello
["hello"]

Since: streamly-0.4.0

fromList :: (Monad m, IsStream t) => [a] -> t m a

Construct a stream from a list containing pure values. This can be more efficient than fromFoldable for lists as it can fuse the list.

Since: streamly-0.4.0

fromListM :: (MonadAsync m, IsStream t) => [m a] -> t m a

Construct a stream from a list containing monadic actions. This can be more efficient than fromFoldableM especially for serial streams as it can fuse the list.

Since: streamly-0.4.0

fromFoldable :: (IsStream t, Foldable f) => f a -> t m a

Construct a stream from a Foldable containing pure values.
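A short sketch of how fromFoldable relates to fromList: on a list both produce the same elements, while fromFoldable also accepts other Foldable containers such as Maybe.

```haskell
import qualified Streamly.Prelude as S

main :: IO ()
main = do
    fromL <- S.toList $ S.fromList [1, 2, 3 :: Int]
    fromF <- S.toList $ S.fromFoldable [1, 2, 3 :: Int]
    print (fromL == fromF)  -- True
    -- Maybe is Foldable too: Just yields one element, Nothing yields none.
    one <- S.toList $ S.fromFoldable (Just 'x')
    none <- S.toList $ S.fromFoldable (Nothing :: Maybe Char)
    print (one, none)  -- ("x","")
```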

Since: streamly-0.2.0

fromFoldableM :: (IsStream t, MonadAsync m, Foldable f) => f (m a) -> t m a

Construct a stream from a Foldable containing monadic actions.

runStream $ serially $ S.fromFoldableM $ replicate 10 (threadDelay 1000000 >> print 1)
runStream $ asyncly  $ S.fromFoldableM $ replicate 10 (threadDelay 1000000 >> print 1)

Concurrent (do not use with parallely on infinite containers)

Since: streamly-0.3.0

fromHandle :: (IsStream t, MonadIO m) => Handle -> t m String

Read lines from an IO Handle into a stream of Strings.

Since: streamly-0.1.0

Elimination

General Folds

foldr :: Monad m => (a -> b -> b) -> b -> SerialT m a -> m b

Lazy right associative fold. For example, to fold a stream into a list:

> runIdentity $ foldr (:) [] (serially $ fromFoldable [1,2,3])
[1,2,3]

Since: streamly-0.1.0

foldr1 :: Monad m => (a -> a -> a) -> SerialT m a -> m (Maybe a)

Right fold, for non-empty streams, using first element as the starting value. Returns Nothing if the stream is empty.

Since: streamly-0.5.0

foldrM :: Monad m => (a -> b -> m b) -> b -> SerialT m a -> m b

Lazy right fold with a monadic step function. For example, to fold a stream into a list:

> runIdentity $ foldrM (\x xs -> return (x : xs)) [] (serially $ fromFoldable [1,2,3])
[1,2,3]

Since: streamly-0.2.0

foldl' :: Monad m => (b -> a -> b) -> b -> SerialT m a -> m b

Strict left associative fold.
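For example, a small sketch that sums a stream and then uses a non-commutative step function to make the left associativity visible:

```haskell
import qualified Streamly.Prelude as S

main :: IO ()
main = do
    total <- S.foldl' (+) 0 $ S.fromList [1 .. 10 :: Int]
    print total  -- 55
    -- The accumulator is combined from the left: ((0 - 1) - 2) - 3.
    diff <- S.foldl' (-) 0 $ S.fromList [1, 2, 3 :: Int]
    print diff   -- -6
```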

Since: streamly-0.2.0

foldl1' :: Monad m => (a -> a -> a) -> SerialT m a -> m (Maybe a)

Strict left fold, for non-empty streams, using first element as the starting value. Returns Nothing if the stream is empty.

Since: streamly-0.5.0

foldlM' :: Monad m => (b -> a -> m b) -> b -> SerialT m a -> m b

Like foldl' but with a monadic step function.

Since: streamly-0.2.0

foldx :: Monad m => (x -> a -> x) -> x -> (x -> b) -> SerialT m a -> m b

Strict left fold with an extraction function. Like the standard strict left fold, but applies a user supplied extraction function (the third argument) to the folded value at the end. This is designed to work with the foldl library. The suffix x is a mnemonic for extraction.
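The extraction function is useful when the accumulator carries more state than the final result. As a sketch, the hypothetical helper mean below accumulates a running sum and count and divides them only at the end; SerialT is assumed to be imported from the Streamly module:

```haskell
import Streamly (SerialT)
import qualified Streamly.Prelude as S

-- Hypothetical helper: arithmetic mean of a stream, 0 for an empty stream.
mean :: Monad m => SerialT m Double -> m Double
mean = S.foldx step (0, 0 :: Int) extract
  where
    -- Accumulate the running sum and the element count.
    step (s, n) x = (s + x, n + 1)
    -- Turn the accumulator into the final result.
    extract (s, n) = if n == 0 then 0 else s / fromIntegral n

main :: IO ()
main = mean (S.fromList [1, 2, 3, 4]) >>= print  -- 2.5
```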

Since: streamly-0.2.0

foldxM :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> SerialT m a -> m b

Like foldx, but with a monadic step function.

Since: streamly-0.2.0

Specialized Folds

head :: Monad m => SerialT m a -> m (Maybe a)

Extract the first element of the stream, if any.

Since: streamly-0.1.0

tail :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (t m a))

Extract all but the first element of the stream, if any.

Since: streamly-0.1.1

last :: Monad m => SerialT m a -> m (Maybe a)

Extract the last element of the stream, if any.

Since: streamly-0.1.1

init :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (t m a))

Extract all but the last element of the stream, if any.

Since: streamly-0.5.0

null :: Monad m => SerialT m a -> m Bool

Determine whether the stream is empty.

Since: streamly-0.1.1

elem :: (Monad m, Eq a) => a -> SerialT m a -> m Bool

Determine whether an element is present in the stream.

Since: streamly-0.1.0

elemIndex :: (Monad m, Eq a) => a -> SerialT m a -> m (Maybe Int)

Returns the index of the first element in the stream that is equal to the given value, if any.

Since: streamly-0.5.0

notElem :: (Monad m, Eq a) => a -> SerialT m a -> m Bool

Determine whether an element is not present in the stream.

Since: streamly-0.1.0

lookup :: (Monad m, Eq a) => a -> SerialT m (a, b) -> m (Maybe b)

Looks the given key up, treating the given stream as an association list.
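For example, a minimal sketch in which the association table is made up for illustration:

```haskell
import qualified Streamly.Prelude as S

main :: IO ()
main = do
    let table = [(1, "one"), (2, "two")] :: [(Int, String)]
    S.lookup 2 (S.fromList table) >>= print  -- Just "two"
    S.lookup 5 (S.fromList table) >>= print  -- Nothing
```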

Since: streamly-0.5.0

find :: Monad m => (a -> Bool) -> SerialT m a -> m (Maybe a)

Returns the first element of the stream satisfying the given predicate, if any.

Since: streamly-0.5.0

findIndex :: Monad m => (a -> Bool) -> SerialT m a -> m (Maybe Int)

Returns the index of the first stream element satisfying the given predicate, if any.

Since: streamly-0.5.0

all :: Monad m => (a -> Bool) -> SerialT m a -> m Bool

Determine whether all elements of a stream satisfy a predicate.

Since: streamly-0.1.0

any :: Monad m => (a -> Bool) -> SerialT m a -> m Bool

Determine whether any of the elements of a stream satisfy a predicate.

Since: streamly-0.1.0

and :: Monad m => SerialT m Bool -> m Bool

Determines if all elements of a boolean stream are True.

Since: streamly-0.5.0

or :: Monad m => SerialT m Bool -> m Bool

Determines whether at least one element of a boolean stream is True.

Since: streamly-0.5.0

length :: Monad m => SerialT m a -> m Int

Determine the length of the stream.

Since: streamly-0.1.0

maximum :: (Monad m, Ord a) => SerialT m a -> m (Maybe a)

Determine the maximum element in a stream. Returns Nothing if the stream is empty.

Since: streamly-0.1.0

minimum :: (Monad m, Ord a) => SerialT m a -> m (Maybe a)

Determine the minimum element in a stream. Returns Nothing if the stream is empty.
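A brief sketch of maximum and minimum, including the empty case; SerialT is assumed to be imported from the Streamly module and is needed only to annotate the empty stream:

```haskell
import Streamly (SerialT)
import qualified Streamly.Prelude as S

main :: IO ()
main = do
    S.maximum (S.fromList [3, 1, 4 :: Int]) >>= print  -- Just 4
    S.minimum (S.fromList [3, 1, 4 :: Int]) >>= print  -- Just 1
    -- An empty stream has no extremum.
    S.maximum (S.nil :: SerialT IO Int) >>= print      -- Nothing
```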

Since: streamly-0.1.0

sum :: (Monad m, Num a) => SerialT m a -> m a

Determine the sum of all elements of a stream of numbers.

Since: streamly-0.1.0

product :: (Monad m, Num a) => SerialT m a -> m a

Determine the product of all elements of a stream of numbers.

Since: streamly-0.1.1

Map and Fold

mapM_ :: Monad m => (a -> m b) -> SerialT m a -> m ()

Apply a monadic action to each element of the stream and discard the output of the action.

Since: streamly-0.1.0

Conversions

Transform a stream into an output structure of another type.

toList :: Monad m => SerialT m a -> m [a]

Convert a stream into a list in the underlying monad.

Since: streamly-0.1.0

toHandle :: MonadIO m => Handle -> SerialT m String -> m ()

Write a stream of Strings to an IO Handle.

Since: streamly-0.1.0

Transformation

Mapping

map :: (IsStream t, Monad m) => (a -> b) -> t m a -> t m b

Same as fmap.

Since: streamly-0.4.0

mapM :: (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m b

Replace each element of the stream with the result of a monadic action applied to the element.

runStream $ S.replicateM 10 (return 1)
          & (serially . S.mapM (\x -> threadDelay 1000000 >> print x))

runStream $ S.replicateM 10 (return 1)
          & (asyncly . S.mapM (\x -> threadDelay 1000000 >> print x))

Concurrent (do not use with parallely on infinite streams)

Since: streamly-0.1.0

sequence :: (IsStream t, MonadAsync m) => t m (m a) -> t m a

Reduce a stream of monadic actions to a stream of the output of those actions.

runStream $ S.replicateM 10 (return $ threadDelay 1000000 >> print 1)
          & (serially . S.sequence)

runStream $ S.replicateM 10 (return $ threadDelay 1000000 >> print 1)
          & (asyncly . S.sequence)

Concurrent (do not use with parallely on infinite streams)

Since: streamly-0.1.0

Scanning

A scan transforms a stream by repeatedly folding the accumulated result with the next element of the stream, emitting each intermediate result.

scanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b

Strict left scan. Like foldl', but returns the folded value at each step, generating a stream of all intermediate fold results. The first element of the stream is the user supplied initial value, and the last element of the stream is the same as the result of foldl'.
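The relationship to foldl' described above can be checked with a running sum. This is a minimal sketch that uses only functions from this module:

```haskell
import qualified Streamly.Prelude as S

main :: IO ()
main = do
    let input = [1, 2, 3, 4] :: [Int]
    -- The scan starts with the initial value 0 and emits every partial sum.
    sums <- S.toList $ S.scanl' (+) 0 $ S.fromList input
    print sums  -- [0,1,3,6,10]
    -- The last scanned value equals the result of the corresponding fold.
    total <- S.foldl' (+) 0 $ S.fromList input
    print (last sums == total)  -- True
```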

Since: streamly-0.2.0

scanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> b -> t m a -> t m b

Like scanl' but with a monadic step function.

Since: streamly-0.4.0

scanx :: IsStream t => (x -> a -> x) -> x -> (x -> b) -> t m a -> t m b

Strict left scan with an extraction function. Like scanl', but applies a user supplied extraction function (the third argument) at each step. This is designed to work with the foldl library. The suffix x is a mnemonic for extraction.

Since: streamly-0.2.0

Filtering

filter :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a

Include only those elements that pass a predicate.

Since: streamly-0.1.0

filterM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a

Same as filter but with a monadic predicate.

Since: streamly-0.4.0

take :: (IsStream t, Monad m) => Int -> t m a -> t m a

Take the first n elements from the stream and discard the rest.

Since: streamly-0.1.0

takeWhile :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a

End the stream as soon as the predicate fails on an element.

Since: streamly-0.1.0

takeWhileM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a

Same as takeWhile but with a monadic predicate.

Since: streamly-0.4.0

drop :: (IsStream t, Monad m) => Int -> t m a -> t m a

Discard the first n elements from the stream and take the rest.

Since: streamly-0.1.0

dropWhile :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a

Drop elements in the stream as long as the predicate succeeds and then take the rest of the stream.
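The following sketch contrasts dropWhile with filter, takeWhile, take and drop on the same input. The input list is made up for illustration:

```haskell
import qualified Streamly.Prelude as S

main :: IO ()
main = do
    let input = [1, 3, 4, 5, 6, 7] :: [Int]
    -- filter inspects every element.
    S.toList (S.filter odd (S.fromList input)) >>= print     -- [1,3,5,7]
    -- takeWhile stops at the first element that fails the predicate.
    S.toList (S.takeWhile odd (S.fromList input)) >>= print  -- [1,3]
    -- dropWhile skips the leading run and keeps everything after it.
    S.toList (S.dropWhile odd (S.fromList input)) >>= print  -- [4,5,6,7]
    -- drop and take select by position instead of by predicate.
    S.toList (S.take 3 (S.drop 2 (S.fromList input))) >>= print  -- [4,5,6]
```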

Since: streamly-0.1.0

dropWhileM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a

Same as dropWhile but with a monadic predicate.

Since: streamly-0.4.0

Inserting

intersperseM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a

Generate a stream by performing the monadic action in between all elements of the given stream.

Since: streamly-0.5.0

Reordering

reverse :: IsStream t => t m a -> t m a

Returns the elements of the stream in reverse order. The stream must be finite.

Since: streamly-0.1.1

Indices

findIndices :: IsStream t => (a -> Bool) -> t m a -> t m Int

Finds all the indices of elements satisfying the given predicate.

Since: streamly-0.5.0

elemIndices :: (IsStream t, Eq a) => a -> t m a -> t m Int

Finds the indices of all elements in the stream that are equal to the given value.
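A minimal sketch of findIndices and elemIndices. Indices are zero-based, and the input list is made up for illustration:

```haskell
import qualified Streamly.Prelude as S

main :: IO ()
main = do
    let input = [10, 15, 20, 25] :: [Int]
    -- Positions of all even elements.
    S.toList (S.findIndices even (S.fromList input)) >>= print  -- [0,2]
    -- Positions of all elements equal to 15.
    S.toList (S.elemIndices 15 (S.fromList input)) >>= print    -- [1]
```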

Since: streamly-0.5.0

Map and Filter

mapMaybe :: (IsStream t, Monad m) => (a -> Maybe b) -> t m a -> t m b

Map a Maybe returning function to a stream, filter out the Nothing elements, and return a stream of values extracted from Just.
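For example, parsing a stream of strings and keeping only the values that parse. This sketch uses readMaybe from Text.Read in base as the Maybe returning function:

```haskell
import qualified Streamly.Prelude as S
import Text.Read (readMaybe)

main :: IO ()
main = do
    let parse = readMaybe :: String -> Maybe Int
    -- "x" does not parse as an Int, so it is dropped from the output.
    xs <- S.toList $ S.mapMaybe parse $ S.fromList ["1", "x", "3"]
    print xs  -- [1,3]
```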

Since: streamly-0.3.0

mapMaybeM :: (IsStream t, MonadAsync m, Functor (t m)) => (a -> m (Maybe b)) -> t m a -> t m b

Like mapMaybe but maps a monadic function.

Concurrent (do not use with parallely on infinite streams)

Since: streamly-0.3.0

Zipping

zipWith :: (IsStream t, Monad m) => (a -> b -> c) -> t m a -> t m b -> t m c

Zip two streams serially using a pure zipping function.
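For example, a minimal sketch that pairs up the elements of two streams of equal length:

```haskell
import qualified Streamly.Prelude as S

main :: IO ()
main = do
    pairs <- S.toList $
        S.zipWith (,) (S.fromList [1, 2, 3 :: Int]) (S.fromList "abc")
    print pairs  -- [(1,'a'),(2,'b'),(3,'c')]
```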

Since: streamly-0.1.0

zipWithM :: (IsStream t, Monad m) => (a -> b -> m c) -> t m a -> t m b -> t m c

Zip two streams serially using a monadic zipping function.

Since: streamly-0.4.0

zipAsyncWith :: (IsStream t, MonadAsync m) => (a -> b -> c) -> t m a -> t m b -> t m c

Zip two streams concurrently (i.e. both the elements being zipped are generated concurrently) using a pure zipping function.

Since: streamly-0.1.0

zipAsyncWithM :: (IsStream t, MonadAsync m) => (a -> b -> m c) -> t m a -> t m b -> t m c

Zip two streams concurrently (i.e. both the elements being zipped are generated concurrently) using a monadic zipping function.

Since: streamly-0.4.0

Deprecated

once :: (Monad m, IsStream t) => m a -> t m a

Deprecated: Please use yieldM instead.

Same as yieldM.

Since: streamly-0.2.0

each :: (IsStream t, Foldable f) => f a -> t m a

Deprecated: Please use fromFoldable instead.

Same as fromFoldable.

Since: streamly-0.1.0

scan :: IsStream t => (x -> a -> x) -> x -> (x -> b) -> t m a -> t m b

Deprecated: Please use scanx instead.

Since: streamly-0.1.1

foldl :: Monad m => (x -> a -> x) -> x -> (x -> b) -> SerialT m a -> m b

Deprecated: Please use foldx instead.

Since: streamly-0.1.0

foldlM :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> SerialT m a -> m b

Deprecated: Please use foldxM instead.

Since: streamly-0.1.0