streamly-0.8.0: Dataflow programming and declarative concurrency
Copyright: (c) 2019 Composewell Technologies
License: BSD3
Maintainer: streamly@composewell.com
Stability: experimental
Portability: GHC
Safe Haskell: None
Language: Haskell2010

Streamly.Internal.Data.Unfold

Description

To run the examples in this module:

>>> import qualified Streamly.Data.Fold as Fold
>>> import qualified Streamly.Internal.Data.Unfold as Unfold

Unfolds and Streams

An Unfold type is the same as the direct style Stream type except that it uses an inject function to determine the initial state of the stream based on an input. A stream is a special case of Unfold when the static input is unit or Void.

This allows an important optimization in several cases, making Unfold a more efficient abstraction. Consider the concatMap and unfoldMany operations; the latter is more efficient. concatMap generates a new stream object from each element in the stream by applying the supplied function to the element. The stream object includes the "step" function as well as the initial "state" of the stream. Since the stream is generated dynamically, the compiler does not know the step function or the state type statically at compile time and therefore cannot inline it. With unfoldMany, on the other hand, the compiler has visibility into the unfold's state generation function, so it knows all the types statically and can inline the inject as well as the step functions, generating efficient code. Essentially, the stream is not opaque to the consumer in the case of unfolds: the consumer knows how to generate the stream from a seed using known "inject" and "step" functions.

A Stream is like a data object whereas an Unfold is like a function. Being function-like, an Unfold is an instance of the Category and Arrow type classes.
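The relationship between the two representations can be sketched with a small standalone model. The Step constructors and the step/inject argument order follow what this module documents for Step and mkUnfoldM; everything else (Stream, fromListU, unfoldStream, closeInput, toListS) is a hypothetical name chosen for illustration and is not claimed to match the library source.

```haskell
{-# LANGUAGE ExistentialQuantification #-}

-- Standalone model for illustration only; not streamly's actual source.
data Step s a = Yield a s | Skip s | Stop

-- A stream packages a step function together with an initial state.
data Stream m b = forall s. Stream (s -> m (Step s b)) s

-- An unfold packages a step function with an inject function that
-- computes the initial state from an input seed.
data Unfold m a b = forall s. Unfold (s -> m (Step s b)) (a -> m s)

-- Hypothetical list unfold: the seed list itself is the state.
fromListU :: Monad m => Unfold m [b] b
fromListU = Unfold step return
  where
    step []       = return Stop
    step (x : xs) = return (Yield x xs)

-- Supplying a seed turns an unfold into a stream.
unfoldStream :: Monad m => Unfold m a b -> a -> m (Stream m b)
unfoldStream (Unfold step inject) a = Stream step <$> inject a

-- Fixing the seed leaves an unfold whose input carries no information,
-- which is the sense in which a stream is a special case of an unfold.
closeInput :: a -> Unfold m a b -> Unfold m () b
closeInput a (Unfold step inject) = Unfold step (\() -> inject a)

-- Drive a stream to completion, collecting its elements.
toListS :: Monad m => Stream m b -> m [b]
toListS (Stream step s0) = go s0
  where
    go s = do
      r <- step s
      case r of
        Yield x s' -> (x :) <$> go s'
        Skip s'    -> go s'
        Stop       -> return []
```

In this model, unfoldStream fromListU [1,2,3] >>= toListS evaluates to [1,2,3]. Because the step function is a fixed part of the Unfold value rather than something produced per element, a consumer that is handed the unfold sees both functions up front.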

Unfolds and Folds

Streams forcing a closed control flow loop can be categorized into two types, unfolds and folds, which are duals of each other.

Unfold streams are really generators of a sequence of elements; we can also call them pull style streams. These are lazy producers of streams. On each evaluation the producer generates the next element, so a consumer can pull elements from the stream whenever it wants to. A stream consumer can multiplex pull streams by pulling elements from the chosen streams; therefore, pull streams allow merging or multiplexing. On the other hand, with this representation we cannot split or demultiplex a stream. So really these are stream sources that can be generated from a seed and can be merged or zipped into a single stream.

The dual of Unfolds are Folds. Folds can also be called push style streams or reducers. These are strict consumers of streams. We keep pushing elements to a fold and we can extract the result at any point. A driver can choose which fold to push to and can also push the same element to multiple folds. Therefore, folds allow splitting or demultiplexing a stream. On the other hand, we cannot merge streams using this representation. So really these are stream consumers that reduce the stream to a single value; these consumers can be composed such that a stream can be split over multiple consumers.
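The push side can be illustrated with a minimal pure model of a fold. This is only a sketch: the Fold type below is a simplified stand-in (no monad, no early termination), and sumF, lengthF, teeF and drive are hypothetical names rather than the library's API.

```haskell
{-# LANGUAGE ExistentialQuantification #-}

import Data.List (foldl')

-- Standalone model for illustration only; not streamly's Fold type.
-- A fold is a step function, an initial accumulator, and an extractor.
data Fold b c = forall s. Fold (s -> b -> s) s (s -> c)

sumF :: Num b => Fold b b
sumF = Fold (+) 0 id

lengthF :: Fold b Int
lengthF = Fold (\n _ -> n + 1) 0 id

-- Split a stream over two consumers: every input element is pushed
-- to both folds, and both results are extracted at the end.
teeF :: Fold b c -> Fold b d -> Fold b (c, d)
teeF (Fold stepL initL extractL) (Fold stepR initR extractR) =
  Fold step (initL, initR) extract
  where
    step (l, r) x  = (stepL l x, stepR r x)
    extract (l, r) = (extractL l, extractR r)

-- The driver pushes elements into the fold one at a time.
drive :: Fold b c -> [b] -> c
drive (Fold step initial extract) = extract . foldl' step initial
```

Here drive (teeF sumF lengthF) [1,2,3,4] evaluates to (10,4): one pass over the input feeds both consumers. Nothing in this representation lets a fold pull from two sources, which is the merging limitation described above.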

Performance:

Composing a tree or graph of computations with unfolds can be much more efficient than composing with the Monad instance. The reason is that unfolds allow the compiler to know the state statically and optimize it using stream fusion, whereas this is not possible with the monad bind because the state is determined dynamically.

Unfold Type

data Step s a Source #

A stream is a succession of Steps. A Yield produces a single value and the next state of the stream. A Skip produces only the next state, without a value. Stop indicates there are no more values in the stream.

Constructors

Yield a s 
Skip s 
Stop 

Instances

Functor (Step s) Source # 

Defined in Streamly.Internal.Data.Stream.StreamD.Step

Methods

fmap :: (a -> b) -> Step s a -> Step s b #

(<$) :: a -> Step s b -> Step s a #

data Unfold m a b Source #

An Unfold m a b is a generator of a stream of values of type b from a seed of type a in Monad m.

Since: 0.7.0

Instances

Functor m => Functor (Unfold m a) Source #

Maps a function on the output of the unfold (the type b).

Defined in Streamly.Internal.Data.Unfold.Type

Methods

fmap :: (a0 -> b) -> Unfold m a a0 -> Unfold m a b #

(<$) :: a0 -> Unfold m a b -> Unfold m a a0 #

Folding

fold :: Monad m => Fold m b c -> Unfold m a b -> a -> m c Source #

Compose an Unfold and a Fold. Given an Unfold m a b and a Fold m b c, returns a monadic action a -> m c representing the application of the fold on the unfolded stream.

>>> Unfold.fold Fold.sum Unfold.fromList [1..100]
5050

Pre-release

Unfolds

Basic Constructors

mkUnfoldM :: (s -> m (Step s b)) -> (a -> m s) -> Unfold m a b Source #

Make an unfold from step and inject functions.

Pre-release

mkUnfoldrM :: Applicative m => (a -> m (Step a b)) -> Unfold m a b Source #

Make an unfold from a step function.

See also: unfoldrM

Pre-release

unfoldrM :: Applicative m => (a -> m (Maybe (b, a))) -> Unfold m a b Source #

Build a stream by unfolding a monadic step function starting from a seed. The step function returns the next element in the stream and the next seed value. When it is done it returns Nothing and the stream ends.

Since: 0.8.0
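The correspondence between the Maybe-returning step function and the Step type can be sketched as follows. This is a standalone model, not the library's implementation: unfoldrMSketch, toListU and countdown are hypothetical names, and the Unfold type is a simplified stand-in built from the documented mkUnfoldM argument order.

```haskell
{-# LANGUAGE ExistentialQuantification #-}

-- Standalone model for illustration only; not streamly's actual source.
data Step s a = Yield a s | Skip s | Stop

data Unfold m a b = forall s. Unfold (s -> m (Step s b)) (a -> m s)

-- Hypothetical reimplementation: the seed doubles as the state,
-- Just becomes Yield, and Nothing becomes Stop.
unfoldrMSketch :: Applicative m => (a -> m (Maybe (b, a))) -> Unfold m a b
unfoldrMSketch next = Unfold step pure
  where
    step s = toStep <$> next s
    toStep (Just (b, s')) = Yield b s'
    toStep Nothing        = Stop

-- Collect everything an unfold generates from a seed.
toListU :: Monad m => Unfold m a b -> a -> m [b]
toListU (Unfold step inject) a = inject a >>= go
  where
    go s = do
      r <- step s
      case r of
        Yield x s' -> (x :) <$> go s'
        Skip s'    -> go s'
        Stop       -> return []

-- Example monadic step function: count down from the seed to 1.
countdown :: Monad m => Int -> m (Maybe (Int, Int))
countdown n
  | n <= 0    = return Nothing
  | otherwise = return (Just (n, n - 1))
```

In this model, toListU (unfoldrMSketch countdown) 3 yields [3,2,1], and a non-positive seed yields the empty list.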

unfoldr :: Applicative m => (a -> Maybe (b, a)) -> Unfold m a b Source #

Like unfoldrM but uses a pure step function.

>>> :{
 f [] = Nothing
 f (x:xs) = Just (x, xs)
:}
>>> Unfold.fold Fold.toList (Unfold.unfoldr f) [1,2,3]
[1,2,3]

Since: 0.8.0

functionM :: Applicative m => (a -> m b) -> Unfold m a b Source #

Lift a monadic function into an unfold. The unfold generates a singleton stream.

Since: 0.8.0

function :: Applicative m => (a -> b) -> Unfold m a b Source #

Lift a pure function into an unfold. The unfold generates a singleton stream.

function f = functionM $ return . f

Since: 0.8.0

identity :: Applicative m => Unfold m a a Source #

Identity unfold. The unfold generates a singleton stream having the input as the only element.

identity = function Prelude.id

Pre-release

nilM :: Monad m => (a -> m c) -> Unfold m a b Source #

Lift a monadic function into an unfold generating a nil stream with a side effect.

consM :: Monad m => (a -> m b) -> Unfold m a b -> Unfold m a b Source #

Prepend a monadic single element generator function to an Unfold. The same seed is used in the action as well as the unfold.

Pre-release

From Values

fromEffect :: Applicative m => m b -> Unfold m a b Source #

The unfold discards its input and generates a singleton stream using the supplied monadic action.

Pre-release

fromPure :: Applicative m => b -> Unfold m a b Source #

Discards the unfold input and always returns the argument of fromPure.

fromPure = fromEffect . pure

Pre-release

Generators

Generate a monadic stream from a seed.

repeatM :: Monad m => Unfold m (m a) a Source #

Generates an infinite stream repeating the seed.

Since: 0.8.0

replicateM :: Monad m => Int -> Unfold m (m a) a Source #

Generates a stream replicating the seed n times.

Since: 0.8.0

fromIndicesM :: Monad m => (Int -> m a) -> Unfold m Int a Source #

fromIndicesM gen generates an infinite stream of values using gen starting from the seed.

fromIndicesM f = Unfold.mapM f $ Unfold.enumerateFrom 0

Pre-release

iterateM :: Monad m => (a -> m a) -> Unfold m (m a) a Source #

Generates an infinite stream starting with the given seed and applying the given function repeatedly.

Since: 0.8.0

Enumerations

Enumerate Num

enumerateFromStepNum :: (Monad m, Num a) => a -> Unfold m a a Source #

Generate an infinite stream starting from a starting value with increments of the given stride. The implementation is numerically stable for floating point values.

Note enumerateFromStepIntegral is faster for integrals.

Pre-release

numFrom :: (Monad m, Num a) => Unfold m a a Source #

numFrom = enumerateFromStepNum 1

Pre-release

Enumerate Integral

enumerateFromStepIntegral :: (Integral a, Monad m) => Unfold m (a, a) a Source #

Can be used to enumerate unbounded integrals. This does not check for overflow or underflow for bounded integrals.

Enumerate Fractional

Use Num enumerations for fractional or floating point number enumerations.

enumerateFromToFractional :: (Monad m, Fractional a, Ord a) => a -> Unfold m a a Source #

Internal

enumerateFromToFractional to = takeWhile (<= to + 1 / 2) $ enumerateFromStepNum 1
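The effect of the to + 1 / 2 bound in the definition above can be seen in a pure list model. This is only a sketch: enumFromToFractionalL is a hypothetical name, it takes the starting value as an explicit first argument, and it uses repeated addition in place of enumerateFromStepNum 1.

```haskell
-- Pure list model of the definition above, for illustration only.
-- `from` plays the role of the seed; `iterate (+ 1)` stands in for
-- enumerateFromStepNum 1 and does not model its numerical stability.
enumFromToFractionalL :: (Fractional a, Ord a) => a -> a -> [a]
enumFromToFractionalL from to =
  takeWhile (<= to + 1 / 2) (iterate (+ 1) from)
```

With this bound, enumFromToFractionalL 1 2.5 :: [Double] gives [1.0,2.0,3.0], the same result as the Prelude enumeration [1 .. 2.5], while an upper limit of 2.4 gives [1.0,2.0]. The last element may therefore exceed the limit by up to one half.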

From Containers

fromList :: Monad m => Unfold m [a] a Source #

Convert a list of pure values to a stream.

Since: 0.8.0

fromListM :: Monad m => Unfold m [m a] a Source #

Convert a list of monadic values to a stream.

Since: 0.8.0

fromStream :: (IsStream t, Monad m) => Unfold m (t m a) a Source #

Convert a stream into an Unfold. Note that a stream converted to an Unfold may not be as efficient as an Unfold in some situations.

Since: 0.8.0

fromSVar :: MonadAsync m => Unfold m (SVar t m a) a Source #

Internal

fromProducer :: MonadAsync m => Unfold m (SVar t m a) a Source #

Internal

Combinators

Mapping on Input

lmap :: (a -> c) -> Unfold m c b -> Unfold m a b Source #

Map a function on the input argument of the Unfold.

>>> u = Unfold.lmap (fmap (+1)) Unfold.fromList
>>> Unfold.fold Fold.toList u [1..5]
[2,3,4,5,6]

lmap f = Unfold.many (Unfold.function f)

Since: 0.8.0
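One way to read lmap is as pre-composition with the inject function, leaving the step function untouched. The sketch below models that reading on a simplified standalone Unfold type; lmapU, fromListU and toListU are hypothetical names, and the model is not claimed to be the library's definition.

```haskell
{-# LANGUAGE ExistentialQuantification #-}

-- Standalone model for illustration only; not streamly's actual source.
data Step s a = Yield a s | Skip s | Stop

data Unfold m a b = forall s. Unfold (s -> m (Step s b)) (a -> m s)

-- Hypothetical list unfold: the seed list itself is the state.
fromListU :: Monad m => Unfold m [b] b
fromListU = Unfold step return
  where
    step []       = return Stop
    step (x : xs) = return (Yield x xs)

-- Transform the seed before it reaches inject; the step function and
-- the state type are unchanged.
lmapU :: (a -> c) -> Unfold m c b -> Unfold m a b
lmapU f (Unfold step inject) = Unfold step (inject . f)

-- Collect everything an unfold generates from a seed.
toListU :: Monad m => Unfold m a b -> a -> m [b]
toListU (Unfold step inject) a = inject a >>= go
  where
    go s = do
      r <- step s
      case r of
        Yield x s' -> (x :) <$> go s'
        Skip s'    -> go s'
        Stop       -> return []
```

In this model, toListU (lmapU (fmap (+1)) fromListU) [1..5] yields [2,3,4,5,6], and toListU (lmapU snd fromListU) ("ignored", [1,2,3]) yields [1,2,3], which mirrors how the tuple helpers in this section are defined in terms of lmap.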

lmapM :: Monad m => (a -> m c) -> Unfold m c b -> Unfold m a b Source #

Map an action on the input argument of the Unfold.

lmapM f = Unfold.many (Unfold.functionM f)

Since: 0.8.0

supply :: a -> Unfold m a b -> Unfold m Void b Source #

Supply the seed to an unfold closing the input end of the unfold.

supply a = Unfold.lmap (Prelude.const a)

Pre-release

supplyFirst :: a -> Unfold m (a, b) c -> Unfold m b c Source #

Supply the first component of the tuple to an unfold that accepts a tuple as a seed, resulting in an unfold that accepts the second component of the tuple as a seed.

supplyFirst a = Unfold.lmap (a, )

Pre-release

supplySecond :: b -> Unfold m (a, b) c -> Unfold m a c Source #

Supply the second component of the tuple to an unfold that accepts a tuple as a seed, resulting in an unfold that accepts the first component of the tuple as a seed.

supplySecond b = Unfold.lmap (, b)

Pre-release

discardFirst :: Unfold m a b -> Unfold m (c, a) b Source #

Convert an Unfold into an unfold accepting a tuple as an argument, using the argument of the original unfold as the second element of the tuple and discarding the first element of the tuple.

discardFirst = Unfold.lmap snd

Pre-release

discardSecond :: Unfold m a b -> Unfold m (a, c) b Source #

Convert an Unfold into an unfold accepting a tuple as an argument, using the argument of the original unfold as the first element of the tuple and discarding the second element of the tuple.

discardSecond = Unfold.lmap fst

Pre-release

swap :: Unfold m (a, c) b -> Unfold m (c, a) b Source #

Convert an Unfold that accepts a tuple as an argument into an unfold that accepts a tuple with elements swapped.

swap = Unfold.lmap Tuple.swap

Pre-release

Mapping on Output

map :: Functor m => (b -> c) -> Unfold m a b -> Unfold m a c Source #

Map a function on the output of the unfold (the type b).

Pre-release

mapM :: Monad m => (b -> m c) -> Unfold m a b -> Unfold m a c Source #

Apply a monadic function to each element of the stream and replace it with the output of the resulting action.

Since: 0.8.0

mapMWithInput :: Monad m => (a -> b -> m c) -> Unfold m a b -> Unfold m a c Source #

Filtering

takeWhileM :: Monad m => (b -> m Bool) -> Unfold m a b -> Unfold m a b Source #

Same as takeWhile but with a monadic predicate.

Since: 0.8.0

takeWhile :: Monad m => (b -> Bool) -> Unfold m a b -> Unfold m a b Source #

End the stream generated by the Unfold as soon as the predicate fails on an element.

Since: 0.8.0

take :: Monad m => Int -> Unfold m a b -> Unfold m a b Source #

>>> u = Unfold.take 2 Unfold.fromList
>>> Unfold.fold Fold.toList u [1..100]
[1,2]

Since: 0.8.0

filter :: Monad m => (b -> Bool) -> Unfold m a b -> Unfold m a b Source #

Include only those elements that pass a predicate.

Since: 0.8.0

filterM :: Monad m => (b -> m Bool) -> Unfold m a b -> Unfold m a b Source #

Same as filter but with a monadic predicate.

Since: 0.8.0

drop :: Monad m => Int -> Unfold m a b -> Unfold m a b Source #

drop n unf drops n elements from the stream generated by unf.

Since: 0.8.0

dropWhile :: Monad m => (b -> Bool) -> Unfold m a b -> Unfold m a b Source #

Similar to dropWhileM but with a pure condition function.

Since: 0.8.0

dropWhileM :: Monad m => (b -> m Bool) -> Unfold m a b -> Unfold m a b Source #

dropWhileM f unf drops elements from the stream generated by unf while the condition holds true. The condition function f is monadic in nature.

Since: 0.8.0

Zipping

zipWithM :: Monad m => (b -> c -> m d) -> Unfold m a b -> Unfold m a c -> Unfold m a d Source #

Distribute the input to two unfolds and then zip the outputs to a single stream using a monadic zip function.

Stops as soon as any of the unfolds stops.

Pre-release
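The "distribute the input, then zip" behaviour can be sketched on a simplified standalone model. The state holds both inner states plus an optional pending element from the first unfold. All names here (zipWithMU, listStep, fromListU, tailU, toListU) are hypothetical and the code is an illustration under those assumptions, not the library's implementation.

```haskell
{-# LANGUAGE ExistentialQuantification #-}

-- Standalone model for illustration only; not streamly's actual source.
data Step s a = Yield a s | Skip s | Stop

data Unfold m a b = forall s. Unfold (s -> m (Step s b)) (a -> m s)

listStep :: Monad m => [b] -> m (Step [b] b)
listStep []       = return Stop
listStep (x : xs) = return (Yield x xs)

-- Generates the elements of the seed list.
fromListU :: Monad m => Unfold m [b] b
fromListU = Unfold listStep return

-- Generates the elements of the seed list after the first one.
tailU :: Monad m => Unfold m [b] b
tailU = Unfold listStep (return . drop 1)

-- The same seed is injected into both unfolds. Each output needs one
-- element from each side; the result stops when either side stops.
zipWithMU
  :: Monad m
  => (b -> c -> m d) -> Unfold m a b -> Unfold m a c -> Unfold m a d
zipWithMU f (Unfold step1 inject1) (Unfold step2 inject2) =
  Unfold step inject
  where
    inject a = do
      s1 <- inject1 a
      s2 <- inject2 a
      return (s1, s2, Nothing)

    -- No pending element: advance the first unfold.
    step (s1, s2, Nothing) = do
      r <- step1 s1
      return $ case r of
        Yield x s1' -> Skip (s1', s2, Just x)
        Skip s1'    -> Skip (s1', s2, Nothing)
        Stop        -> Stop
    -- Pending element x: advance the second unfold and combine.
    step (s1, s2, Just x) = do
      r <- step2 s2
      case r of
        Yield y s2' -> do
          z <- f x y
          return (Yield z (s1, s2', Nothing))
        Skip s2'    -> return (Skip (s1, s2', Just x))
        Stop        -> return Stop

-- Collect everything an unfold generates from a seed.
toListU :: Monad m => Unfold m a b -> a -> m [b]
toListU (Unfold step inject) a = inject a >>= go
  where
    go s = do
      r <- step s
      case r of
        Yield x s' -> (x :) <$> go s'
        Skip s'    -> go s'
        Stop       -> return []
```

In this model, toListU (zipWithMU (\x y -> return (x, y)) fromListU tailU) [1,2,3,4] yields [(1,2),(2,3),(3,4)]: both unfolds receive the same list, and the output ends when the shorter tailU side stops.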

zipWith :: Monad m => (b -> c -> d) -> Unfold m a b -> Unfold m a c -> Unfold m a d Source #

Like zipWithM but with a pure zip function.

>>> square = fmap (\x -> x * x) Unfold.fromList
>>> cube = fmap (\x -> x * x * x) Unfold.fromList
>>> u = Unfold.zipWith (,) square cube
>>> Unfold.fold Fold.toList u [1..5]
[(1,1),(4,8),(9,27),(16,64),(25,125)]

zipWith f = zipWithM (\a b -> return $ f a b)

Since: 0.8.0

Cross product

crossWithM :: Monad m => (b -> c -> m d) -> Unfold m a b -> Unfold m a c -> Unfold m a d Source #

Create a cross product (vector product or cartesian product) of the output streams of two unfolds using a monadic combining function.

Pre-release

crossWith :: Monad m => (b -> c -> d) -> Unfold m a b -> Unfold m a c -> Unfold m a d Source #

Like crossWithM but uses a pure combining function.

crossWith f = crossWithM (\b c -> return $ f b c)

>>> u1 = Unfold.lmap fst Unfold.fromList
>>> u2 = Unfold.lmap snd Unfold.fromList
>>> u = Unfold.crossWith (,) u1 u2
>>> Unfold.fold Fold.toList u ([1,2,3], [4,5,6])
[(1,4),(1,5),(1,6),(2,4),(2,5),(2,6),(3,4),(3,5),(3,6)]

Since: 0.8.0

cross :: Monad m => Unfold m a b -> Unfold m a c -> Unfold m a (b, c) Source #

See crossWith.

cross = crossWith (,)

To cross the streams from a tuple we can write:

crossProduct :: Monad m => Unfold m a b -> Unfold m c d -> Unfold m (a, c) (b, d)
crossProduct u1 u2 = cross (lmap fst u1) (lmap snd u2)

Pre-release

apply :: Monad m => Unfold m a (b -> c) -> Unfold m a b -> Unfold m a c Source #

Nesting

data ConcatState s1 s2 Source #

Constructors

ConcatOuter s1 
ConcatInner s1 s2 

many :: Monad m => Unfold m a b -> Unfold m b c -> Unfold m a c Source #

Apply the second unfold to each output element of the first unfold and flatten the output in a single stream.

Since: 0.8.0
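The flattening can be sketched as a two-phase state machine over the ConcatState constructors shown above. This is a standalone model under stated assumptions: manyU, fromListU and toListU are hypothetical names, and the code illustrates the technique rather than reproducing the library source.

```haskell
{-# LANGUAGE ExistentialQuantification #-}

-- Standalone model for illustration only; not streamly's actual source.
data Step s a = Yield a s | Skip s | Stop

data Unfold m a b = forall s. Unfold (s -> m (Step s b)) (a -> m s)

-- Either we are stepping the outer unfold, or we are stepping an
-- inner unfold while remembering where the outer one left off.
data ConcatState s1 s2 = ConcatOuter s1 | ConcatInner s1 s2

manyU :: Monad m => Unfold m a b -> Unfold m b c -> Unfold m a c
manyU (Unfold step1 inject1) (Unfold step2 inject2) = Unfold step inject
  where
    inject a = ConcatOuter <$> inject1 a

    -- Outer phase: each outer element seeds a fresh inner state.
    step (ConcatOuter s1) = do
      r <- step1 s1
      case r of
        Yield b s1' -> do
          s2 <- inject2 b
          return (Skip (ConcatInner s1' s2))
        Skip s1'    -> return (Skip (ConcatOuter s1'))
        Stop        -> return Stop
    -- Inner phase: drain the inner unfold, then resume the outer one.
    step (ConcatInner s1 s2) = do
      r <- step2 s2
      return $ case r of
        Yield c s2' -> Yield c (ConcatInner s1 s2')
        Skip s2'    -> Skip (ConcatInner s1 s2')
        Stop        -> Skip (ConcatOuter s1)

-- Hypothetical list unfold: the seed list itself is the state.
fromListU :: Monad m => Unfold m [b] b
fromListU = Unfold listStep return
  where
    listStep []       = return Stop
    listStep (x : xs) = return (Yield x xs)

-- Collect everything an unfold generates from a seed.
toListU :: Monad m => Unfold m a b -> a -> m [b]
toListU (Unfold step inject) a = inject a >>= go
  where
    go s = do
      r <- step s
      case r of
        Yield x s' -> (x :) <$> go s'
        Skip s'    -> go s'
        Stop       -> return []
```

In this model, toListU (manyU fromListU fromListU) [[1,2],[],[3]] yields [1,2,3]. Both step functions and both inject functions are fixed when manyU is applied, so the combined state type is known before any element is produced.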

concatMapM :: Monad m => (b -> m (Unfold m a c)) -> Unfold m a b -> Unfold m a c Source #

Map an unfold generating action to each element of an unfold and flatten the results into a single stream.

bind :: Monad m => Unfold m a b -> (b -> Unfold m a c) -> Unfold m a c infixl 1 Source #

Resource Management

gbracket_ Source #

Arguments

:: Monad m
=> (a -> m c)                          -- before
-> (forall s. m s -> m (Either e s))   -- try (exception handling)
-> (c -> m d)                          -- after, on normal stop
-> Unfold m (c, e) b                   -- on exception
-> Unfold m c b                        -- unfold to run
-> Unfold m a b

Like gbracket but with the following differences:

  • alloc action a -> m c runs with async exceptions enabled
  • cleanup action c -> m d won't run if the stream is garbage collected after partial evaluation.
  • does not require a MonadAsync constraint.

Inhibits stream fusion

Pre-release

gbracket Source #

Arguments

:: (MonadIO m, MonadBaseControl IO m)
=> (a -> m c)                          -- before
-> (forall s. m s -> m (Either e s))   -- try (exception handling)
-> (c -> m d)                          -- after, on normal stop, or GC
-> Unfold m (c, e) b                   -- on exception
-> Unfold m c b                        -- unfold to run
-> Unfold m a b

Run the alloc action a -> m c with async exceptions disabled but keeping blocking operations interruptible (see mask). Use the output c as input to Unfold m c b to generate an output stream. When unfolding use the supplied try operation forall s. m s -> m (Either e s) to catch synchronous exceptions. If an exception occurs run the exception handling unfold Unfold m (c, e) b.

The cleanup action c -> m d runs whenever the stream ends normally, due to a sync or async exception, or if it gets garbage collected after a partial lazy evaluation. See bracket for the semantics of the cleanup action.

gbracket can express all other exception handling combinators.

Inhibits stream fusion

Pre-release

before :: (a -> m c) -> Unfold m a b -> Unfold m a b Source #

Run a side effect a -> m c on the input a before unfolding it using Unfold m a b.

before f = lmapM (\a -> f a >> return a)

Pre-release

after :: (MonadIO m, MonadBaseControl IO m) => (a -> m c) -> Unfold m a b -> Unfold m a b Source #

Unfold the input a using Unfold m a b, run an action on a whenever the unfold stops normally, or if it is garbage collected after a partial lazy evaluation.

The semantics of the action a -> m c are similar to the cleanup action semantics in bracket.

See also after_

Pre-release

after_ :: Monad m => (a -> m c) -> Unfold m a b -> Unfold m a b Source #

Like after with the following differences:

  • action a -> m c won't run if the stream is garbage collected after partial evaluation.
  • Monad m does not require any other constraints.

Pre-release

finally :: (MonadAsync m, MonadCatch m) => (a -> m c) -> Unfold m a b -> Unfold m a b Source #

Unfold the input a using Unfold m a b, run an action on a whenever the unfold stops normally, aborts due to an exception, or is garbage collected after a partial lazy evaluation.

The semantics of the action a -> m c are similar to the cleanup action semantics in bracket.

finally release = bracket return release

See also finally_

Inhibits stream fusion

Pre-release

finally_ :: MonadCatch m => (a -> m c) -> Unfold m a b -> Unfold m a b Source #

Like finally with the following differences:

  • action a -> m c won't run if the stream is garbage collected after partial evaluation.
  • does not require a MonadAsync constraint.

Inhibits stream fusion

Pre-release

bracket :: (MonadAsync m, MonadCatch m) => (a -> m c) -> (c -> m d) -> Unfold m c b -> Unfold m a b Source #

Run the alloc action a -> m c with async exceptions disabled but keeping blocking operations interruptible (see mask). Use the output c as input to Unfold m c b to generate an output stream.

c is usually a resource under the state of monad m, e.g. a file handle, that requires a cleanup after use. The cleanup action c -> m d runs whenever the stream ends normally, due to a sync or async exception, or if it gets garbage collected after a partial lazy evaluation.

bracket only guarantees that the cleanup action runs, and it runs with async exceptions enabled. The action must ensure that it can successfully clean up the resource in the face of sync or async exceptions.

When the stream ends normally or on a sync exception, the cleanup action runs immediately in the current thread context, whereas in other cases it runs in the GC context; therefore, cleanup may be delayed until the GC gets to run.

See also: bracket_, gbracket

Inhibits stream fusion

Pre-release

bracket_ :: MonadCatch m => (a -> m c) -> (c -> m d) -> Unfold m c b -> Unfold m a b Source #

Like bracket but with the following differences:

  • alloc action a -> m c runs with async exceptions enabled
  • cleanup action c -> m d won't run if the stream is garbage collected after partial evaluation.
  • does not require a MonadAsync constraint.

Inhibits stream fusion

Pre-release

Exceptions

onException :: MonadCatch m => (a -> m c) -> Unfold m a b -> Unfold m a b Source #

Unfold the input a using Unfold m a b, run the action a -> m c on a if the unfold aborts due to an exception.

Inhibits stream fusion

Pre-release

handle :: (MonadCatch m, Exception e) => Unfold m e b -> Unfold m a b -> Unfold m a b Source #

When unfolding Unfold m a b if an exception e occurs, unfold e using Unfold m e b.

Inhibits stream fusion

Pre-release