streamly-core-0.2.2: Streaming, parsers, arrays, serialization and more
Copyright(c) 2019 Composewell Technologies
(c) 2013 Gabriel Gonzalez
LicenseBSD3
Maintainerstreamly@composewell.com
Stabilityexperimental
PortabilityGHC
Safe HaskellSafe-Inferred
LanguageHaskell2010

Streamly.Internal.Data.Fold

Description

See Streamly.Data.Fold for an overview and Streamly.Internal.Data.Fold.Type for design notes.

Synopsis

Imports

>>> :m
>>> :set -XFlexibleContexts
>>> import Control.Monad (void)
>>> import qualified Data.Foldable as Foldable
>>> import Data.Function ((&))
>>> import Data.Functor.Identity (Identity, runIdentity)
>>> import Data.IORef (newIORef, readIORef, writeIORef)
>>> import Data.Maybe (fromJust, isJust)
>>> import Data.Monoid (Endo(..), Last(..), Sum(..))
>>> import Streamly.Data.Array (Array)
>>> import Streamly.Data.Fold (Fold, Tee(..))
>>> import Streamly.Data.Stream (Stream)
>>> import qualified Streamly.Data.Array as Array
>>> import qualified Streamly.Data.Fold as Fold
>>> import qualified Streamly.Data.MutArray as MutArray
>>> import qualified Streamly.Data.Parser as Parser
>>> import qualified Streamly.Data.Stream as Stream
>>> import qualified Streamly.Data.StreamK as StreamK
>>> import qualified Streamly.Data.Unfold as Unfold

For APIs that have not been released yet.

>>> import qualified Streamly.Internal.Data.Fold as Fold

Step Type

data Step s b Source #

Represents the result of the step of a Fold. Partial returns an intermediate state of the fold, the fold step can be called again with the state or the driver can use extract on the state to get the result out. Done returns the final result and the fold cannot be driven further.

Pre-release

Constructors

Partial !s 
Done !b 

Instances

Instances details
Bifunctor Step Source #

first maps over the fold state and second maps over the fold result.

Instance details

Defined in Streamly.Internal.Data.Fold.Step

Methods

bimap :: (a -> b) -> (c -> d) -> Step a c -> Step b d #

first :: (a -> b) -> Step a c -> Step b c #

second :: (b -> c) -> Step a b -> Step a c #

Functor (Step s) Source #

fmap maps over Done.

fmap = second
Instance details

Defined in Streamly.Internal.Data.Fold.Step

Methods

fmap :: (a -> b) -> Step s a -> Step s b #

(<$) :: a -> Step s b -> Step s a #

mapMStep :: Applicative m => (a -> m b) -> Step s a -> m (Step s b) Source #

Map a monadic function over the result b in Step s b.

Internal

chainStepM :: Applicative m => (s1 -> m s2) -> (a -> m (Step s2 b)) -> Step s1 a -> m (Step s2 b) Source #

If Partial then map the state, if Done then call the next step.

Fold Type

data Fold m a b Source #

The type Fold m a b represents a consumer of an input stream of values of type a and returning a final value of type b in Monad m. The constructor of a fold is Fold step initial extract final.

The fold uses an internal state of type s. The initial value of the state s is created by initial. This function is called once and only once before the fold starts consuming input. Any resource allocation can be done in this function.

The step function is called on each input, it consumes an input and returns the next intermediate state (see Step) or the final result b if the fold terminates.

If the fold is used as a scan, the extract function is used by the scan driver to map the current state s of the fold to the fold result. Thus extract can be called multiple times. In some folds, where scanning does not make sense, this function is left unimplemented; such folds cannot be used as scans.

Before a fold terminates, final is called once and only once (unless the fold terminated in initial itself). Any resources allocated by initial can be released in final. In folds that do not require any cleanup extract and final are typically the same.

When implementing fold combinators, care should be taken to cleanup any state of the argument folds held by the fold by calling the respective final at all exit points of the fold. Also, final should not be called more than once. Note that if a fold terminates by Done constructor, there is no state to cleanup.

NOTE: The constructor is not yet released, smart constructors are provided to create folds.

Constructors

forall s. Fold (s -> a -> m (Step s b)) (m (Step s b)) (s -> m b) (s -> m b)

Fold step initial extract final

Instances

Instances details
Monad m => Applicative (Fold m a) Source #

Applicative form of splitWith. Split the input serially over two folds. Note that this fuses but performance degrades quadratically with respect to the number of compositions. It should be good to use for less than 8 compositions.

Instance details

Defined in Streamly.Internal.Data.Fold.Type

Methods

pure :: a0 -> Fold m a a0 #

(<*>) :: Fold m a (a0 -> b) -> Fold m a a0 -> Fold m a b #

liftA2 :: (a0 -> b -> c) -> Fold m a a0 -> Fold m a b -> Fold m a c #

(*>) :: Fold m a a0 -> Fold m a b -> Fold m a b #

(<*) :: Fold m a a0 -> Fold m a b -> Fold m a a0 #

Functor m => Functor (Fold m a) Source #

Maps a function on the output of the fold (the type b).

Instance details

Defined in Streamly.Internal.Data.Fold.Type

Methods

fmap :: (a0 -> b) -> Fold m a a0 -> Fold m a b #

(<$) :: a0 -> Fold m a b -> Fold m a a0 #

Constructors

foldl' :: Monad m => (b -> a -> b) -> b -> Fold m a b Source #

Make a fold from a left fold style pure step function and initial value of the accumulator.

If your Fold returns only Partial (i.e. never returns a Done) then you can use foldl'* constructors.

A fold with an extract function can be expressed using fmap:

mkfoldlx :: Monad m => (s -> a -> s) -> s -> (s -> b) -> Fold m a b
mkfoldlx step initial extract = fmap extract (foldl' step initial)

foldlM' :: Monad m => (b -> a -> m b) -> m b -> Fold m a b Source #

Make a fold from a left fold style monadic step function and initial value of the accumulator.

A fold with an extract function can be expressed using rmapM:

mkFoldlxM :: Functor m => (s -> a -> m s) -> m s -> (s -> m b) -> Fold m a b
mkFoldlxM step initial extract = rmapM extract (foldlM' step initial)

foldl1' :: Monad m => (a -> a -> a) -> Fold m a (Maybe a) Source #

Make a strict left fold, for non-empty streams, using first element as the starting value. Returns Nothing if the stream is empty.

Pre-release

foldlM1' :: Monad m => (a -> a -> m a) -> Fold m a (Maybe a) Source #

Like 'foldl1'' but with a monadic step function.

Pre-release

foldt' :: Monad m => (s -> a -> Step s b) -> Step s b -> (s -> b) -> Fold m a b Source #

Make a terminating fold using a pure step function, a pure initial state and a pure state extraction function.

Pre-release

foldtM' :: (s -> a -> m (Step s b)) -> m (Step s b) -> (s -> m b) -> Fold m a b Source #

Make a terminating fold with an effectful step function and initial state, and a state extraction function.

>>> foldtM' = Fold.Fold

We can just use Fold but it is provided for completeness.

Pre-release

foldr' :: Monad m => (a -> b -> b) -> b -> Fold m a b Source #

Make a fold using a right fold style step function and a terminal value. It performs a strict right fold via a left fold using function composition. Note that a strict right fold can only be useful for constructing strict structures in memory. For reductions this will be very inefficient.

Definitions:

>>> foldr' f z = fmap (flip appEndo z) $ Fold.foldMap (Endo . f)
>>> foldr' f z = fmap ($ z) $ Fold.foldl' (\g x -> g . f x) id

Example:

>>> Stream.fold (Fold.foldr' (:) []) $ Stream.enumerateFromTo 1 5
[1,2,3,4,5]

foldrM' :: Monad m => (a -> b -> m b) -> m b -> Fold m a b Source #

Like foldr' but with a monadic step function.

Example:

>>> toList = Fold.foldrM' (\a xs -> return $ a : xs) (return [])

See also: foldrM

Pre-release

Folds

fromPure :: Applicative m => b -> Fold m a b Source #

Make a fold that yields the supplied value without consuming any further input.

Pre-release

fromEffect :: Applicative m => m b -> Fold m a b Source #

Make a fold that yields the result of the supplied effectful action without consuming any further input.

Pre-release

fromRefold :: Refold m c a b -> c -> Fold m a b Source #

Make a fold from a consumer.

Internal

drain :: Monad m => Fold m a () Source #

A fold that drains all its input, running the effects and discarding the results.

>>> drain = Fold.drainMapM (const (return ()))
>>> drain = Fold.foldl' (\_ _ -> ()) ()

toList :: Monad m => Fold m a [a] Source #

Folds the input stream to a list.

Warning! working on large lists accumulated as buffers in memory could be very inefficient, consider using Streamly.Data.Array instead.

>>> toList = Fold.foldr' (:) []

toStreamK :: Monad m => Fold m a (StreamK n a) Source #

A fold that buffers its input to a pure stream.

>>> toStreamK = foldr StreamK.cons StreamK.nil
>>> toStreamK = fmap StreamK.reverse Fold.toStreamKRev

Internal

toStreamKRev :: Monad m => Fold m a (StreamK n a) Source #

Buffers the input stream to a pure stream in the reverse order of the input.

>>> toStreamKRev = Foldable.foldl' (flip StreamK.cons) StreamK.nil

This is more efficient than toStreamK. toStreamK has exactly the same performance as reversing the stream after toStreamKRev.

Pre-release

lengthGeneric :: (Monad m, Num b) => Fold m a b Source #

Like length, except with a more general Num return value

Definition:

>>> lengthGeneric = fmap getSum $ Fold.foldMap (Sum . const  1)
>>> lengthGeneric = Fold.foldl' (\n _ -> n + 1) 0

Pre-release

length :: Monad m => Fold m a Int Source #

Determine the length of the input stream.

Definition:

>>> length = Fold.lengthGeneric
>>> length = fmap getSum $ Fold.foldMap (Sum . const  1)

Combinators

Mapping output

rmapM :: Monad m => (b -> m c) -> Fold m a b -> Fold m a c Source #

Map a monadic function on the output of a fold.

Mapping Input

lmap :: (a -> b) -> Fold m b r -> Fold m a r Source #

lmap f fold maps the function f on the input of the fold.

Definition:

>>> lmap = Fold.lmapM return

Example:

>>> sumSquared = Fold.lmap (\x -> x * x) Fold.sum
>>> Stream.fold sumSquared (Stream.enumerateFromTo 1 100)
338350

lmapM :: Monad m => (a -> m b) -> Fold m b r -> Fold m a r Source #

lmapM f fold maps the monadic function f on the input of the fold.

postscan :: Monad m => Fold m a b -> Fold m b c -> Fold m a c Source #

Postscan the input of a Fold to change it in a stateful manner using another Fold.

postscan scanner collector

Pre-release

Filtering

catMaybes :: Monad m => Fold m a b -> Fold m (Maybe a) b Source #

Modify a fold to receive a Maybe input, the Just values are unwrapped and sent to the original fold, Nothing values are discarded.

>>> catMaybes = Fold.mapMaybe id
>>> catMaybes = Fold.filter isJust . Fold.lmap fromJust

scanMaybe :: Monad m => Fold m a (Maybe b) -> Fold m b c -> Fold m a c Source #

Use a Maybe returning fold as a filtering scan.

>>> scanMaybe p f = Fold.postscan p (Fold.catMaybes f)

Pre-release

filter :: Monad m => (a -> Bool) -> Fold m a r -> Fold m a r Source #

Include only those elements that pass a predicate.

>>> Stream.fold (Fold.filter (> 5) Fold.sum) $ Stream.fromList [1..10]
40
>>> filter p = Fold.scanMaybe (Fold.filtering p)
>>> filter p = Fold.filterM (return . p)
>>> filter p = Fold.mapMaybe (\x -> if p x then Just x else Nothing)

filtering :: Monad m => (a -> Bool) -> Fold m a (Maybe a) Source #

A scanning fold for filtering elements based on a predicate.

filterM :: Monad m => (a -> m Bool) -> Fold m a r -> Fold m a r Source #

Like filter but with a monadic predicate.

>>> f p x = p x >>= \r -> return $ if r then Just x else Nothing
>>> filterM p = Fold.mapMaybeM (f p)

catLefts :: Monad m => Fold m a c -> Fold m (Either a b) c Source #

Discard Rights and unwrap Lefts in an Either stream.

Pre-release

catRights :: Monad m => Fold m b c -> Fold m (Either a b) c Source #

Discard Lefts and unwrap Rights in an Either stream.

Pre-release

catEithers :: Fold m a b -> Fold m (Either a a) b Source #

Remove the either wrapper and flatten both lefts and as well as rights in the output stream.

Definition:

>>> catEithers = Fold.lmap (either id id)

Pre-release

Trimming

take :: Monad m => Int -> Fold m a b -> Fold m a b Source #

Take at most n input elements and fold them using the supplied fold. A negative count is treated as 0.

>>> Stream.fold (Fold.take 2 Fold.toList) $ Stream.fromList [1..10]
[1,2]

taking :: Monad m => Int -> Fold m a (Maybe a) Source #

takeEndBy_ :: Monad m => (a -> Bool) -> Fold m a b -> Fold m a b Source #

Like takeEndBy but drops the element on which the predicate succeeds.

Example:

>>> input = Stream.fromList "hello\nthere\n"
>>> line = Fold.takeEndBy_ (== '\n') Fold.toList
>>> Stream.fold line input
"hello"
>>> Stream.fold Fold.toList $ Stream.foldMany line input
["hello","there"]

takeEndBy :: Monad m => (a -> Bool) -> Fold m a b -> Fold m a b Source #

Take the input, stop when the predicate succeeds taking the succeeding element as well.

Example:

>>> input = Stream.fromList "hello\nthere\n"
>>> line = Fold.takeEndBy (== '\n') Fold.toList
>>> Stream.fold line input
"hello\n"
>>> Stream.fold Fold.toList $ Stream.foldMany line input
["hello\n","there\n"]

dropping :: Monad m => Int -> Fold m a (Maybe a) Source #

Sequential application

splitWith :: Monad m => (a -> b -> c) -> Fold m x a -> Fold m x b -> Fold m x c Source #

Sequential fold application. Apply two folds sequentially to an input stream. The input is provided to the first fold, when it is done - the remaining input is provided to the second fold. When the second fold is done or if the input stream is over, the outputs of the two folds are combined using the supplied function.

Example:

>>> header = Fold.take 8 Fold.toList
>>> line = Fold.takeEndBy (== '\n') Fold.toList
>>> f = Fold.splitWith (,) header line
>>> Stream.fold f $ Stream.fromList "header: hello\n"
("header: ","hello\n")

Note: This is dual to appending streams using append.

Note: this implementation allows for stream fusion but has quadratic time complexity, because each composition adds a new branch that each subsequent fold's input element has to traverse, therefore, it cannot scale to a large number of compositions. After around 100 compositions the performance starts dipping rapidly compared to a CPS style implementation.

For larger number of compositions you can convert the fold to a parser and use ParserK.

Time: O(n^2) where n is the number of compositions.

split_ :: Monad m => Fold m x a -> Fold m x b -> Fold m x b Source #

Same as applicative *>. Run two folds serially one after the other discarding the result of the first.

This was written in the hope that it might be faster than implementing it using splitWith, but the current benchmarks show that it has the same performance. So do not expose it unless some benchmark shows benefit.

Repeated Application (Splitting)

data ManyState s1 s2 Source #

many :: Monad m => Fold m a b -> Fold m b c -> Fold m a c Source #

Collect zero or more applications of a fold. many first second applies the first fold repeatedly on the input stream and accumulates it's results using the second fold.

>>> two = Fold.take 2 Fold.toList
>>> twos = Fold.many two Fold.toList
>>> Stream.fold twos $ Stream.fromList [1..10]
[[1,2],[3,4],[5,6],[7,8],[9,10]]

Stops when second fold stops.

See also: concatMap, foldMany

manyPost :: Monad m => Fold m a b -> Fold m b c -> Fold m a c Source #

Like many, but the "first" fold emits an output at the end even if no input is received.

Internal

See also: concatMap, foldMany

groupsOf :: Monad m => Int -> Fold m a b -> Fold m b c -> Fold m a c Source #

groupsOf n split collect repeatedly applies the split fold to chunks of n items in the input stream and supplies the result to the collect fold.

Definition:

>>> groupsOf n split = Fold.many (Fold.take n split)

Example:

>>> twos = Fold.groupsOf 2 Fold.toList Fold.toList
>>> Stream.fold twos $ Stream.fromList [1..10]
[[1,2],[3,4],[5,6],[7,8],[9,10]]

Stops when collect stops.

refoldMany :: Monad m => Fold m a b -> Refold m x b c -> Refold m x a c Source #

Like many but uses a Refold for collecting.

refoldMany1 :: Monad m => Refold m x a b -> Fold m b c -> Refold m x a c Source #

Like many but uses a Refold for splitting.

Internal

Nested Application

concatMap :: Monad m => (b -> Fold m a c) -> Fold m a b -> Fold m a c Source #

Map a Fold returning function on the result of a Fold and run the returned fold. This operation can be used to express data dependencies between fold operations.

Let's say the first element in the stream is a count of the following elements that we have to add, then:

>>> import Data.Maybe (fromJust)
>>> count = fmap fromJust Fold.one
>>> total n = Fold.take n Fold.sum
>>> Stream.fold (Fold.concatMap total count) $ Stream.fromList [10,9..1]
45

This does not fuse completely, see refold for a fusible alternative.

Time: O(n^2) where n is the number of compositions.

See also: foldIterateM, refold

duplicate :: Monad m => Fold m a b -> Fold m a (Fold m a b) Source #

duplicate provides the ability to run a fold in parts. The duplicated fold consumes the input and returns the same fold as output instead of returning the final result, the returned fold can be run later to consume more input.

duplicate essentially appends a stream to the fold without finishing the fold. Compare with snoc which appends a singleton value to the fold.

Pre-release

refold :: Monad m => Refold m b a c -> Fold m a b -> Fold m a c Source #

Extract the output of a fold and refold it using a Refold.

A fusible alternative to concatMap.

Internal

Parallel Distribution

teeWith :: Monad m => (a -> b -> c) -> Fold m x a -> Fold m x b -> Fold m x c Source #

teeWith k f1 f2 distributes its input to both f1 and f2 until both of them terminate and combines their output using k.

Definition:

>>> teeWith k f1 f2 = fmap (uncurry k) (Fold.tee f1 f2)

Example:

>>> avg = Fold.teeWith (/) Fold.sum (fmap fromIntegral Fold.length)
>>> Stream.fold avg $ Stream.fromList [1.0..100.0]
50.5

For applicative composition using this combinator see Streamly.Data.Fold.Tee.

See also: Streamly.Data.Fold.Tee

Note that nested applications of teeWith do not fuse.

teeWithFst :: Monad m => (b -> c -> d) -> Fold m a b -> Fold m a c -> Fold m a d Source #

Like teeWith but terminates as soon as the first fold terminates.

Pre-release

teeWithMin :: Monad m => (b -> c -> d) -> Fold m a b -> Fold m a c -> Fold m a d Source #

Like teeWith but terminates as soon as any one of the two folds terminates.

Pre-release

Parallel Alternative

shortest :: Monad m => Fold m x a -> Fold m x b -> Fold m x (Either a b) Source #

Shortest alternative. Apply both folds in parallel but choose the result from the one which consumed least input i.e. take the shortest succeeding fold.

If both the folds finish at the same time or if the result is extracted before any of the folds could finish then the left one is taken.

Pre-release

longest :: Monad m => Fold m x a -> Fold m x b -> Fold m x (Either a b) Source #

Longest alternative. Apply both folds in parallel but choose the result from the one which consumed more input i.e. take the longest succeeding fold.

If both the folds finish at the same time or if the result is extracted before any of the folds could finish then the left one is taken.

Pre-release

Running A Fold

extractM :: Monad m => Fold m a b -> m b Source #

Extract the accumulated result of the fold.

Definition:

>>> extractM = Fold.drive Stream.nil

Example:

>>> Fold.extractM Fold.toList
[]

Pre-release

reduce :: Monad m => Fold m a b -> m (Fold m a b) Source #

Evaluate the initialization effect of a fold. If we are building the fold by chaining lazy actions in fold init this would reduce the actions to a strict accumulator value.

Pre-release

snoc :: Monad m => Fold m a b -> a -> m (Fold m a b) Source #

Append a singleton value to the fold, in other words run a single step of the fold.

Example:

>>> import qualified Data.Foldable as Foldable
>>> Foldable.foldlM Fold.snoc Fold.toList [1..3] >>= Fold.drive Stream.nil
[1,2,3]

Pre-release

addOne :: Monad m => a -> Fold m a b -> m (Fold m a b) Source #

Append a singleton value to the fold.

See examples under addStream.

Pre-release

snocM :: Monad m => Fold m a b -> m a -> m (Fold m a b) Source #

Append a singleton value to the fold in other words run a single step of the fold.

Definition:

>>> snocM f = Fold.reduce . Fold.snoclM f

Pre-release

snocl :: Monad m => Fold m a b -> a -> Fold m a b Source #

Append a singleton value to the fold lazily, in other words run a single step of the fold.

Definition:

>>> snocl f = Fold.snoclM f . return

Example:

>>> import qualified Data.Foldable as Foldable
>>> Fold.extractM $ Foldable.foldl Fold.snocl Fold.toList [1..3]
[1,2,3]

Pre-release

snoclM :: Monad m => Fold m a b -> m a -> Fold m a b Source #

Append an effect to the fold lazily, in other words run a single step of the fold.

Pre-release

close :: Monad m => Fold m a b -> Fold m a b Source #

Close a fold so that it does not accept any more input.

isClosed :: Monad m => Fold m a b -> m Bool Source #

Check if the fold has terminated and can take no more input.

Pre-release

Transforming inner monad

morphInner :: (forall x. m x -> n x) -> Fold m a b -> Fold n a b Source #

Change the underlying monad of a fold. Also known as hoist.

Pre-release

generalizeInner :: Monad m => Fold Identity a b -> Fold m a b Source #

Adapt a pure fold to any monad.

>>> generalizeInner = Fold.morphInner (return . runIdentity)

Pre-release

Deprecated

foldr :: Monad m => (a -> b -> b) -> b -> Fold m a b Source #

Deprecated: Please use foldr' instead.

serialWith :: Monad m => (a -> b -> c) -> Fold m x a -> Fold m x b -> Fold m x c Source #

Deprecated: Please use "splitWith" instead

newtype Tee m a b Source #

Tee is a newtype wrapper over the Fold type providing distributing Applicative, Semigroup, Monoid, Num, Floating and Fractional instances.

The input received by the composed Tee is replicated and distributed to the constituent folds of the Tee.

For example, to compute the average of numbers in a stream without going through the stream twice:

>>> avg = (/) <$> (Tee Fold.sum) <*> (Tee $ fmap fromIntegral Fold.length)
>>> Stream.fold (unTee avg) $ Stream.fromList [1.0..100.0]
50.5

Similarly, the Semigroup and Monoid instances of Tee distribute the input to both the folds and combine the outputs using Monoid or Semigroup instances of the output types:

>>> import Data.Monoid (Sum(..))
>>> t = Tee Fold.one <> Tee Fold.latest
>>> Stream.fold (unTee t) (fmap Sum $ Stream.enumerateFromTo 1.0 100.0)
Just (Sum {getSum = 101.0})

The Num, Floating, and Fractional instances work in the same way.

Constructors

Tee 

Fields

Instances

Instances details
Monad m => Applicative (Tee m a) Source #

<*> distributes the input to both the argument Tees and combines their outputs using function application.

Instance details

Defined in Streamly.Internal.Data.Fold.Tee

Methods

pure :: a0 -> Tee m a a0 #

(<*>) :: Tee m a (a0 -> b) -> Tee m a a0 -> Tee m a b #

liftA2 :: (a0 -> b -> c) -> Tee m a a0 -> Tee m a b -> Tee m a c #

(*>) :: Tee m a a0 -> Tee m a b -> Tee m a b #

(<*) :: Tee m a a0 -> Tee m a b -> Tee m a a0 #

Functor m => Functor (Tee m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Fold.Tee

Methods

fmap :: (a0 -> b) -> Tee m a a0 -> Tee m a b #

(<$) :: a0 -> Tee m a b -> Tee m a a0 #

(Monoid b, Monad m) => Monoid (Tee m a b) Source #

<> distributes the input to both the argument Tees and combines their outputs using the Monoid instance of the output type.

Instance details

Defined in Streamly.Internal.Data.Fold.Tee

Methods

mempty :: Tee m a b #

mappend :: Tee m a b -> Tee m a b -> Tee m a b #

mconcat :: [Tee m a b] -> Tee m a b #

(Semigroup b, Monad m) => Semigroup (Tee m a b) Source #

<> distributes the input to both the argument Tees and combines their outputs using the Semigroup instance of the output type.

Instance details

Defined in Streamly.Internal.Data.Fold.Tee

Methods

(<>) :: Tee m a b -> Tee m a b -> Tee m a b #

sconcat :: NonEmpty (Tee m a b) -> Tee m a b #

stimes :: Integral b0 => b0 -> Tee m a b -> Tee m a b #

(Monad m, Floating b) => Floating (Tee m a b) Source #

Binary Floating operations distribute the input to both the argument Tees and combine their outputs using the Floating instance of the output type.

Instance details

Defined in Streamly.Internal.Data.Fold.Tee

Methods

pi :: Tee m a b #

exp :: Tee m a b -> Tee m a b #

log :: Tee m a b -> Tee m a b #

sqrt :: Tee m a b -> Tee m a b #

(**) :: Tee m a b -> Tee m a b -> Tee m a b #

logBase :: Tee m a b -> Tee m a b -> Tee m a b #

sin :: Tee m a b -> Tee m a b #

cos :: Tee m a b -> Tee m a b #

tan :: Tee m a b -> Tee m a b #

asin :: Tee m a b -> Tee m a b #

acos :: Tee m a b -> Tee m a b #

atan :: Tee m a b -> Tee m a b #

sinh :: Tee m a b -> Tee m a b #

cosh :: Tee m a b -> Tee m a b #

tanh :: Tee m a b -> Tee m a b #

asinh :: Tee m a b -> Tee m a b #

acosh :: Tee m a b -> Tee m a b #

atanh :: Tee m a b -> Tee m a b #

log1p :: Tee m a b -> Tee m a b #

expm1 :: Tee m a b -> Tee m a b #

log1pexp :: Tee m a b -> Tee m a b #

log1mexp :: Tee m a b -> Tee m a b #

(Monad m, Num b) => Num (Tee m a b) Source #

Binary Num operations distribute the input to both the argument Tees and combine their outputs using the Num instance of the output type.

Instance details

Defined in Streamly.Internal.Data.Fold.Tee

Methods

(+) :: Tee m a b -> Tee m a b -> Tee m a b #

(-) :: Tee m a b -> Tee m a b -> Tee m a b #

(*) :: Tee m a b -> Tee m a b -> Tee m a b #

negate :: Tee m a b -> Tee m a b #

abs :: Tee m a b -> Tee m a b #

signum :: Tee m a b -> Tee m a b #

fromInteger :: Integer -> Tee m a b #

(Monad m, Fractional b) => Fractional (Tee m a b) Source #

Binary Fractional operations distribute the input to both the argument Tees and combine their outputs using the Fractional instance of the output type.

Instance details

Defined in Streamly.Internal.Data.Fold.Tee

Methods

(/) :: Tee m a b -> Tee m a b -> Tee m a b #

recip :: Tee m a b -> Tee m a b #

fromRational :: Rational -> Tee m a b #

toFold :: Tee m a b -> Fold m a b Source #

Deprecated: Please use unTee instead.

Mappers

Monadic functions useful with mapM/lmapM on folds or streams.

tracing :: Monad m => (a -> m b) -> a -> m a Source #

Apply a monadic function on the input and return the input.

>>> Stream.fold (Fold.lmapM (Fold.tracing print) Fold.drain) $ (Stream.enumerateFromTo (1 :: Int) 2)
1
2

Pre-release

trace :: Monad m => (a -> m b) -> Fold m a r -> Fold m a r Source #

Apply a monadic function to each element flowing through and discard the results.

>>> Stream.fold (Fold.trace print Fold.drain) $ (Stream.enumerateFromTo (1 :: Int) 2)
1
2
>>> trace f = Fold.lmapM (Fold.tracing f)

Pre-release

Folds

Accumulators

Semigroups and Monoids

sconcat :: (Monad m, Semigroup a) => a -> Fold m a a Source #

Semigroup concat. Append the elements of an input stream to a provided starting value.

Definition:

>>> sconcat = Fold.foldl' (<>)
>>> semigroups = fmap Data.Monoid.Sum $ Stream.enumerateFromTo 1 10
>>> Stream.fold (Fold.sconcat 10) semigroups
Sum {getSum = 65}

mconcat :: (Monad m, Monoid a) => Fold m a a Source #

Monoid concat. Fold an input stream consisting of monoidal elements using mappend and mempty.

Definition:

>>> mconcat = Fold.sconcat mempty
>>> monoids = fmap Data.Monoid.Sum $ Stream.enumerateFromTo 1 10
>>> Stream.fold Fold.mconcat monoids
Sum {getSum = 55}

foldMap :: (Monad m, Monoid b) => (a -> b) -> Fold m a b Source #

Definition:

>>> foldMap f = Fold.lmap f Fold.mconcat

Make a fold from a pure function that folds the output of the function using mappend and mempty.

>>> sum = Fold.foldMap Data.Monoid.Sum
>>> Stream.fold sum $ Stream.enumerateFromTo 1 10
Sum {getSum = 55}

foldMapM :: (Monad m, Monoid b) => (a -> m b) -> Fold m a b Source #

Definition:

>>> foldMapM f = Fold.lmapM f Fold.mconcat

Make a fold from a monadic function that folds the output of the function using mappend and mempty.

>>> sum = Fold.foldMapM (return . Data.Monoid.Sum)
>>> Stream.fold sum $ Stream.enumerateFromTo 1 10
Sum {getSum = 55}

Reducers

drainMapM :: Monad m => (a -> m b) -> Fold m a () Source #

Definitions:

>>> drainMapM f = Fold.lmapM f Fold.drain
>>> drainMapM f = Fold.foldMapM (void . f)

Drain all input after passing it through a monadic function. This is the dual of mapM_ on stream producers.

the :: (Monad m, Eq a) => Fold m a (Maybe a) Source #

Terminates with Nothing as soon as it finds an element different than the previous one, returns the element if the entire input consists of the same element.

mean :: (Monad m, Fractional a) => Fold m a a Source #

Compute a numerically stable arithmetic mean of all elements in the input stream.

rollingHash :: (Monad m, Enum a) => Fold m a Int64 Source #

Compute an Int sized polynomial rolling hash of a stream.

>>> rollingHash = Fold.rollingHashWithSalt Fold.defaultSalt

defaultSalt :: Int64 Source #

A default salt used in the implementation of rollingHash.

rollingHashWithSalt :: (Monad m, Enum a) => Int64 -> Fold m a Int64 Source #

Compute an Int sized polynomial rolling hash

H = salt * k ^ n + c1 * k ^ (n - 1) + c2 * k ^ (n - 2) + ... + cn * k ^ 0

Where c1, c2, cn are the elements in the input stream and k is a constant.

This hash is often used in Rabin-Karp string search algorithm.

See https://en.wikipedia.org/wiki/Rolling_hash

rollingHashFirstN :: (Monad m, Enum a) => Int -> Fold m a Int64 Source #

Compute an Int sized polynomial rolling hash of the first n elements of a stream.

>>> rollingHashFirstN n = Fold.take n Fold.rollingHash

Pre-release

Saturating Reducers

product terminates if it becomes 0. Other folds can theoretically saturate on bounded types, and therefore terminate, however, they will run forever on unbounded types like Integer/Double.

sum :: (Monad m, Num a) => Fold m a a Source #

Determine the sum of all elements of a stream of numbers. Returns additive identity (0) when the stream is empty. Note that this is not numerically stable for floating point numbers.

>>> sum = Fold.cumulative Fold.windowSum

Same as following but numerically stable:

>>> sum = Fold.foldl' (+) 0
>>> sum = fmap Data.Monoid.getSum $ Fold.foldMap Data.Monoid.Sum

product :: (Monad m, Num a, Eq a) => Fold m a a Source #

Determine the product of all elements of a stream of numbers. Returns multiplicative identity (1) when the stream is empty. The fold terminates when it encounters (0) in its input.

Same as the following but terminates on multiplication by 0:

>>> product = fmap Data.Monoid.getProduct $ Fold.foldMap Data.Monoid.Product

maximumBy :: Monad m => (a -> a -> Ordering) -> Fold m a (Maybe a) Source #

Determine the maximum element in a stream using the supplied comparison function.

maximum :: (Monad m, Ord a) => Fold m a (Maybe a) Source #

Determine the maximum element in a stream.

Definitions:

>>> maximum = Fold.maximumBy compare
>>> maximum = Fold.foldl1' max

Same as the following but without a default maximum. The Max Monoid uses the minBound as the default maximum:

>>> maximum = fmap Data.Semigroup.getMax $ Fold.foldMap Data.Semigroup.Max

minimumBy :: Monad m => (a -> a -> Ordering) -> Fold m a (Maybe a) Source #

Computes the minimum element with respect to the given comparison function

minimum :: (Monad m, Ord a) => Fold m a (Maybe a) Source #

Determine the minimum element in a stream using the supplied comparison function.

Definitions:

>>> minimum = Fold.minimumBy compare
>>> minimum = Fold.foldl1' min

Same as the following but without a default minimum. The Min Monoid uses the maxBound as the default maximum:

>>> maximum = fmap Data.Semigroup.getMin $ Fold.foldMap Data.Semigroup.Min

Collectors

Avoid using these folds in scalable or performance critical applications, they buffer all the input in GC memory which can be detrimental to performance if the input is large.

toListRev :: Monad m => Fold m a [a] Source #

Buffers the input stream to a list in the reverse order of the input.

Definition:

>>> toListRev = Fold.foldl' (flip (:)) []

Warning! working on large lists accumulated as buffers in memory could be very inefficient, consider using Streamly.Array instead.

This is more efficient than toList. toList is exactly the same as reversing the list after toListRev.

toStream :: (Monad m, Monad n) => Fold m a (Stream n a) Source #

A fold that buffers its input to a pure stream.

Warning! working on large streams accumulated as buffers in memory could be very inefficient, consider using Streamly.Data.Array instead.

>>> toStream = fmap Stream.fromList Fold.toList

Pre-release

toStreamRev :: (Monad m, Monad n) => Fold m a (Stream n a) Source #

Buffers the input stream to a pure stream in the reverse order of the input.

>>> toStreamRev = fmap Stream.fromList Fold.toListRev

Warning! working on large streams accumulated as buffers in memory could be very inefficient, consider using Streamly.Data.Array instead.

Pre-release

topBy :: (MonadIO m, Unbox a) => (a -> a -> Ordering) -> Int -> Fold m a (MutArray a) Source #

Get the top n elements using the supplied comparison function.

To get bottom n elements instead:

>>> bottomBy cmp = Fold.topBy (flip cmp)

Example:

>>> stream = Stream.fromList [2::Int,7,9,3,1,5,6,11,17]
>>> Stream.fold (Fold.topBy compare 3) stream >>= MutArray.toList
[17,11,9]

Pre-release

top :: (MonadIO m, Unbox a, Ord a) => Int -> Fold m a (MutArray a) Source #

Fold the input stream to top n elements.

Definition:

>>> top = Fold.topBy compare
>>> stream = Stream.fromList [2::Int,7,9,3,1,5,6,11,17]
>>> Stream.fold (Fold.top 3) stream >>= MutArray.toList
[17,11,9]

Pre-release

bottomBy :: (MonadIO m, Unbox a) => (a -> a -> Ordering) -> Int -> Fold m a (MutArray a) Source #

Get the bottom most n elements using the supplied comparison function.

bottom :: (MonadIO m, Unbox a, Ord a) => Int -> Fold m a (MutArray a) Source #

Fold the input stream to bottom n elements.

Definition:

>>> bottom = Fold.bottomBy compare
>>> stream = Stream.fromList [2::Int,7,9,3,1,5,6,11,17]
>>> Stream.fold (Fold.bottom 3) stream >>= MutArray.toList
[1,2,3]

Pre-release

Scanners

Stateful transformation of the elements. Useful in combination with the scanMaybe combinator. For scanners the result of the fold is usually a transformation of the current element rather than an aggregation of all elements till now.

latest :: Monad m => Fold m a (Maybe a) Source #

Returns the latest element of the input stream, if any.

>>> latest = Fold.foldl1' (\_ x -> x)
>>> latest = fmap getLast $ Fold.foldMap (Last . Just)

indexingWith :: Monad m => Int -> (Int -> Int) -> Fold m a (Maybe (Int, a)) Source #

Pair each element of a fold input with its index, starting from index 0.

indexing :: Monad m => Fold m a (Maybe (Int, a)) Source #

>>> indexing = Fold.indexingWith 0 (+ 1)

indexingRev :: Monad m => Int -> Fold m a (Maybe (Int, a)) Source #

>>> indexingRev n = Fold.indexingWith n (subtract 1)

rollingMapM :: Monad m => (Maybe a -> a -> m b) -> Fold m a b Source #

Apply a function on every two successive elements of a stream. The first argument of the map function is the previous element and the second argument is the current element. When processing the very first element in the stream, the previous element is Nothing.

Pre-release

Filters

Useful in combination with the scanMaybe combinator.

deleteBy :: Monad m => (a -> a -> Bool) -> a -> Fold m a (Maybe a) Source #

Returns the latest element omitting the first occurrence that satisfies the given equality predicate.

Example:

>>> input = Stream.fromList [1,3,3,5]
>>> Stream.fold Fold.toList $ Stream.scanMaybe (Fold.deleteBy (==) 3) input
[1,3,5]

uniqBy :: Monad m => (a -> a -> Bool) -> Fold m a (Maybe a) Source #

Return the latest unique element using the supplied comparison function. Returns Nothing if the current element is same as the last element otherwise returns Just.

Example, strip duplicate path separators:

>>> input = Stream.fromList "//a//b"
>>> f x y = x == '/' && y == '/'
>>> Stream.fold Fold.toList $ Stream.scanMaybe (Fold.uniqBy f) input
"/a/b"

Space: O(1)

Pre-release

uniq :: (Monad m, Eq a) => Fold m a (Maybe a) Source #

See uniqBy.

Definition:

>>> uniq = Fold.uniqBy (==)

repeated :: Fold m a (Maybe a) Source #

Emit only repeated elements, once.

Unimplemented

findIndices :: Monad m => (a -> Bool) -> Fold m a (Maybe Int) Source #

Returns the index of the latest element if the element satisfies the given predicate.

elemIndices :: (Monad m, Eq a) => a -> Fold m a (Maybe Int) Source #

Returns the index of the latest element if the element matches the given value.

Definition:

>>> elemIndices a = Fold.findIndices (== a)

Singleton folds

Folds that terminate after consuming exactly one input element. All these can be implemented in terms of the maybe fold.

one :: Monad m => Fold m a (Maybe a) Source #

Take one element from the stream and stop.

Definition:

>>> one = Fold.maybe Just

This is similar to the stream uncons operation.

null :: Monad m => Fold m a Bool Source #

Consume one element, return True if successful else return False. In other words, test if the input is empty or not.

WARNING! It consumes one element if the stream is not empty. If that is not what you want please use the eof parser instead.

Definition:

>>> null = fmap isJust Fold.one

satisfy :: Monad m => (a -> Bool) -> Fold m a (Maybe a) Source #

Consume a single element and return it if it passes the predicate else return Nothing.

Definition:

>>> satisfy f = Fold.maybe (\a -> if f a then Just a else Nothing)

Pre-release

maybe :: Monad m => (a -> Maybe b) -> Fold m a (Maybe b) Source #

Consume a single input and transform it using the supplied Maybe returning function.

Pre-release

Multi folds

Terminate after consuming one or more elements.

drainN :: Monad m => Int -> Fold m a () Source #

A fold that drains the first n elements of its input, running the effects and discarding the results.

Definition:

>>> drainN n = Fold.take n Fold.drain

Pre-release

indexGeneric :: (Integral i, Monad m) => i -> Fold m a (Maybe a) Source #

Like index, except with a more general Integral argument

Pre-release

index :: Monad m => Int -> Fold m a (Maybe a) Source #

Return the element at the given index.

Definition:

>>> index = Fold.indexGeneric

findM :: Monad m => (a -> m Bool) -> Fold m a (Maybe a) Source #

Returns the first element that satisfies the given predicate.

Pre-release

find :: Monad m => (a -> Bool) -> Fold m a (Maybe a) Source #

Returns the first element that satisfies the given predicate.

lookup :: (Eq a, Monad m) => a -> Fold m (a, b) (Maybe b) Source #

In a stream of (key-value) pairs (a, b), return the value b of the first pair where the key equals the given value a.

Definition:

>>> lookup x = fmap snd <$> Fold.find ((== x) . fst)

findIndex :: Monad m => (a -> Bool) -> Fold m a (Maybe Int) Source #

Returns the first index that satisfies the given predicate.

elemIndex :: (Eq a, Monad m) => a -> Fold m a (Maybe Int) Source #

Returns the first index where a given value is found in the stream.

Definition:

>>> elemIndex a = Fold.findIndex (== a)

elem :: (Eq a, Monad m) => a -> Fold m a Bool Source #

Return True if the given element is present in the stream.

Definition:

>>> elem a = Fold.any (== a)

notElem :: (Eq a, Monad m) => a -> Fold m a Bool Source #

Returns True if the given element is not present in the stream.

Definition:

>>> notElem a = Fold.all (/= a)

all :: Monad m => (a -> Bool) -> Fold m a Bool Source #

Returns True if all elements of the input satisfy the predicate.

Definition:

>>> all p = Fold.lmap p Fold.and

Example:

>>> Stream.fold (Fold.all (== 0)) $ Stream.fromList [1,0,1]
False

any :: Monad m => (a -> Bool) -> Fold m a Bool Source #

Returns True if any element of the input satisfies the predicate.

Definition:

>>> any p = Fold.lmap p Fold.or

Example:

>>> Stream.fold (Fold.any (== 0)) $ Stream.fromList [1,0,1]
True

and :: Monad m => Fold m Bool Bool Source #

Returns True if all elements are True, False otherwise

Definition:

>>> and = Fold.all (== True)

or :: Monad m => Fold m Bool Bool Source #

Returns True if any element is True, False otherwise

Definition:

>>> or = Fold.any (== True)

Trimmers

Useful in combination with the scanMaybe combinator.

takingEndByM :: Monad m => (a -> m Bool) -> Fold m a (Maybe a) Source #

takingEndBy :: Monad m => (a -> Bool) -> Fold m a (Maybe a) Source #

>>> takingEndBy p = Fold.takingEndByM (return . p)

takingEndByM_ :: Monad m => (a -> m Bool) -> Fold m a (Maybe a) Source #

takingEndBy_ :: Monad m => (a -> Bool) -> Fold m a (Maybe a) Source #

>>> takingEndBy_ p = Fold.takingEndByM_ (return . p)

droppingWhileM :: Monad m => (a -> m Bool) -> Fold m a (Maybe a) Source #

droppingWhile :: Monad m => (a -> Bool) -> Fold m a (Maybe a) Source #

>>> droppingWhile p = Fold.droppingWhileM (return . p)

prune :: (a -> Bool) -> Fold m a (Maybe a) Source #

Strip all leading and trailing occurrences of an element passing a predicate and make all other consecutive occurrences uniq.

> prune p = Stream.dropWhileAround p $ Stream.uniqBy (x y -> p x && p y)
> Stream.prune isSpace (Stream.fromList "  hello      world!   ")
"hello world!"

Space: O(1)

Unimplemented

Running A Fold

drive :: Monad m => Stream m a -> Fold m a b -> m b Source #

Drive a fold using the supplied Stream, reducing the resulting expression strictly at each step.

Definition:

>>> drive = flip Stream.fold

Example:

>>> Fold.drive (Stream.enumerateFromTo 1 100) Fold.sum
5050

Building Incrementally

addStream :: Monad m => Stream m a -> Fold m a b -> m (Fold m a b) Source #

Append a stream to a fold to build the fold accumulator incrementally. We can repeatedly call addStream on the same fold to continue building the fold and finally use drive to finish the fold and extract the result. Also see the addOne operation which is a singleton version of addStream.

Definitions:

>>> addStream stream = Fold.drive stream . Fold.duplicate

Example, build a list incrementally:

>>> :{
pure (Fold.toList :: Fold IO Int [Int])
    >>= Fold.addOne 1
    >>= Fold.addStream (Stream.enumerateFromTo 2 4)
    >>= Fold.drive Stream.nil
    >>= print
:}
[1,2,3,4]

This can be used as an O(n) list append compared to the O(n^2) ++ when used for incrementally building a list.

Example, build a stream incrementally:

>>> :{
pure (Fold.toStream :: Fold IO Int (Stream Identity Int))
    >>= Fold.addOne 1
    >>= Fold.addStream (Stream.enumerateFromTo 2 4)
    >>= Fold.drive Stream.nil
    >>= print
:}
fromList [1,2,3,4]

This can be used as an O(n) stream append compared to the O(n^2) <> when used for incrementally building a stream.

Example, build an array incrementally:

>>> :{
pure (Array.write :: Fold IO Int (Array Int))
    >>= Fold.addOne 1
    >>= Fold.addStream (Stream.enumerateFromTo 2 4)
    >>= Fold.drive Stream.nil
    >>= print
:}
fromList [1,2,3,4]

Example, build an array stream incrementally:

>>> :{
let f :: Fold IO Int (Stream Identity (Array Int))
    f = Fold.groupsOf 2 (Array.writeN 3) Fold.toStream
in pure f
    >>= Fold.addOne 1
    >>= Fold.addStream (Stream.enumerateFromTo 2 4)
    >>= Fold.drive Stream.nil
    >>= print
:}
fromList [fromList [1,2],fromList [3,4]]

Combinators

Utilities

with :: (Fold m (s, a) b -> Fold m a b) -> (((s, a) -> c) -> Fold m (s, a) b -> Fold m (s, a) b) -> ((s, a) -> c) -> Fold m a b -> Fold m a b Source #

Change the predicate function of a Fold from a -> b to accept an additional state input (s, a) -> b. Convenient to filter with an addiitonal index or time input.

>>> filterWithIndex = Fold.with Fold.indexed Fold.filter
filterWithAbsTime = with timestamped filter
filterWithRelTime = with timeIndexed filter

Pre-release

Mapping on Input

transform :: Monad m => Pipe m a b -> Fold m b c -> Fold m a c Source #

Apply a transformation on a Fold using a Pipe.

Pre-release

Sliding Window

slide2 :: Monad m => Fold m (a, Maybe a) b -> Fold m a b Source #

Provide a sliding window of length 2 elements.

See Streamly.Internal.Data.Fold.Window.

Scanning Input

scan :: Monad m => Fold m a b -> Fold m b c -> Fold m a c Source #

Scan the input of a Fold to change it in a stateful manner using another Fold. The scan stops as soon as the fold terminates.

Pre-release

scanMany :: Monad m => Fold m a b -> Fold m b c -> Fold m a c Source #

Scan the input of a Fold to change it in a stateful manner using another Fold. The scan restarts with a fresh state if the fold terminates.

Pre-release

indexed :: Monad m => Fold m (Int, a) b -> Fold m a b Source #

Pair each element of a fold input with its index, starting from index 0.

>>> indexed = Fold.scanMaybe Fold.indexing

Zipping Input

zipStreamWithM :: (a -> b -> m c) -> Stream m a -> Fold m c x -> Fold m b x Source #

Zip a stream with the input of a fold using the supplied function.

Unimplemented

zipStream :: Monad m => Stream m a -> Fold m (a, b) x -> Fold m b x Source #

Zip a stream with the input of a fold.

>>> zip = Fold.zipStreamWithM (curry return)

Unimplemented

Filtering Input

mapMaybeM :: Monad m => (a -> m (Maybe b)) -> Fold m b r -> Fold m a r Source #

>>> mapMaybeM f = Fold.lmapM f . Fold.catMaybes

mapMaybe :: Monad m => (a -> Maybe b) -> Fold m b r -> Fold m a r Source #

mapMaybe f fold maps a Maybe returning function f on the input of the fold, filters out Nothing elements, and return the values extracted from Just.

>>> mapMaybe f = Fold.lmap f . Fold.catMaybes
>>> mapMaybe f = Fold.mapMaybeM (return . f)
>>> f x = if even x then Just x else Nothing
>>> fld = Fold.mapMaybe f Fold.toList
>>> Stream.fold fld (Stream.enumerateFromTo 1 10)
[2,4,6,8,10]

sampleFromthen :: Monad m => Int -> Int -> Fold m a b -> Fold m a b Source #

sampleFromthen offset stride samples the element at offset index and then every element at strides of stride.

Trimming

takeEndBySeq :: forall m a b. (MonadIO m, Storable a, Unbox a, Enum a, Eq a) => Array a -> Fold m a b -> Fold m a b Source #

Continue taking the input until the input sequence matches the supplied sequence, taking the supplied sequence as well. If the pattern is empty this acts as an identity fold.

>>> s = Stream.fromList "hello there. How are you?"
>>> f = Fold.takeEndBySeq (Array.fromList "re") Fold.toList
>>> Stream.fold f s
"hello there"
>>> Stream.fold Fold.toList $ Stream.foldMany f s
["hello there",". How are"," you?"]

Pre-release

takeEndBySeq_ :: forall m a b. (MonadIO m, Storable a, Unbox a, Enum a, Eq a) => Array a -> Fold m a b -> Fold m a b Source #

Like takeEndBySeq but discards the matched sequence.

Pre-release

Serial Append

splitAt :: Monad m => Int -> Fold m a b -> Fold m a c -> Fold m a (b, c) Source #

splitAt n f1 f2 composes folds f1 and f2 such that first n elements of its input are consumed by fold f1 and the rest of the stream is consumed by fold f2.

>>> let splitAt_ n xs = Stream.fold (Fold.splitAt n Fold.toList Fold.toList) $ Stream.fromList xs
>>> splitAt_ 6 "Hello World!"
("Hello ","World!")
>>> splitAt_ (-1) [1,2,3]
([],[1,2,3])
>>> splitAt_ 0 [1,2,3]
([],[1,2,3])
>>> splitAt_ 1 [1,2,3]
([1],[2,3])
>>> splitAt_ 3 [1,2,3]
([1,2,3],[])
>>> splitAt_ 4 [1,2,3]
([1,2,3],[])
splitAt n f1 f2 = Fold.splitWith (,) (Fold.take n f1) f2

Internal

Parallel Distribution

tee :: Monad m => Fold m a b -> Fold m a c -> Fold m a (b, c) Source #

Distribute one copy of the stream to each fold and zip the results.

                |-------Fold m a b--------|
---stream m a---|                         |---m (b,c)
                |-------Fold m a c--------|

Definition:

>>> tee = Fold.teeWith (,)

Example:

>>> t = Fold.tee Fold.sum Fold.length
>>> Stream.fold t (Stream.enumerateFromTo 1.0 100.0)
(5050.0,100)

distribute :: Monad m => [Fold m a b] -> Fold m a [b] Source #

Distribute one copy of the stream to each fold and collect the results in a container.

                |-------Fold m a b--------|
---stream m a---|                         |---m [b]
                |-------Fold m a b--------|
                |                         |
                           ...
>>> Stream.fold (Fold.distribute [Fold.sum, Fold.length]) (Stream.enumerateFromTo 1 5)
[15,5]
>>> distribute = Prelude.foldr (Fold.teeWith (:)) (Fold.fromPure [])

This is the consumer side dual of the producer side sequence operation.

Stops when all the folds stop.

Unzipping

unzip :: Monad m => Fold m a x -> Fold m b y -> Fold m (a, b) (x, y) Source #

Send the elements of tuples in a stream of tuples through two different folds.

                          |-------Fold m a x--------|
---------stream of (a,b)--|                         |----m (x,y)
                          |-------Fold m b y--------|

Definition:

>>> unzip = Fold.unzipWith id

This is the consumer side dual of the producer side zip operation.

unzipWith :: Monad m => (a -> (b, c)) -> Fold m b x -> Fold m c y -> Fold m a (x, y) Source #

Split elements in the input stream into two parts using a pure splitter function, direct each part to a different fold and zip the results.

Definitions:

>>> unzipWith f = Fold.unzipWithM (return . f)
>>> unzipWith f fld1 fld2 = Fold.lmap f (Fold.unzip fld1 fld2)

This fold terminates when both the input folds terminate.

Pre-release

unzipWithM :: Monad m => (a -> m (b, c)) -> Fold m b x -> Fold m c y -> Fold m a (x, y) Source #

Like unzipWith but with a monadic splitter function.

Definition:

>>> unzipWithM k f1 f2 = Fold.lmapM k (Fold.unzip f1 f2)

Pre-release

unzipWithFstM :: Monad m => (a -> m (b, c)) -> Fold m b x -> Fold m c y -> Fold m a (x, y) Source #

Similar to unzipWithM but terminates when the first fold terminates.

unzipWithMinM :: Monad m => (a -> m (b, c)) -> Fold m b x -> Fold m c y -> Fold m a (x, y) Source #

Similar to unzipWithM but terminates when any fold terminates.

Partitioning

partitionByM :: Monad m => (a -> m (Either b c)) -> Fold m b x -> Fold m c y -> Fold m a (x, y) Source #

Partition the input over two folds using an Either partitioning predicate.

                                    |-------Fold b x--------|
-----stream m a --> (Either b c)----|                       |----(x,y)
                                    |-------Fold c y--------|

Example, send input to either fold randomly:

>>> :set -package random
>>> import System.Random (randomIO)
>>> randomly a = randomIO >>= \x -> return $ if x then Left a else Right a
>>> f = Fold.partitionByM randomly Fold.length Fold.length
>>> Stream.fold f (Stream.enumerateFromTo 1 100)
...

Example, send input to the two folds in a proportion of 2:1:

>>> :{
proportionately m n = do
 ref <- newIORef $ cycle $ concat [replicate m Left, replicate n Right]
 return $ \a -> do
     r <- readIORef ref
     writeIORef ref $ tail r
     return $ Prelude.head r a
:}
>>> :{
main = do
 g <- proportionately 2 1
 let f = Fold.partitionByM g Fold.length Fold.length
 r <- Stream.fold f (Stream.enumerateFromTo (1 :: Int) 100)
 print r
:}
>>> main
(67,33)

This is the consumer side dual of the producer side mergeBy operation.

When one fold is done, any input meant for it is ignored until the other fold is also done.

Stops when both the folds stop.

See also: partitionByFstM and partitionByMinM.

Pre-release

partitionByFstM :: Monad m => (a -> m (Either b c)) -> Fold m b x -> Fold m c y -> Fold m a (x, y) Source #

Similar to partitionByM but terminates when the first fold terminates.

partitionByMinM :: Monad m => (a -> m (Either b c)) -> Fold m b x -> Fold m c y -> Fold m a (x, y) Source #

Similar to partitionByM but terminates when any fold terminates.

partitionBy :: Monad m => (a -> Either b c) -> Fold m b x -> Fold m c y -> Fold m a (x, y) Source #

Same as partitionByM but with a pure partition function.

Example, count even and odd numbers in a stream:

>>> :{
 let f = Fold.partitionBy (\n -> if even n then Left n else Right n)
                     (fmap (("Even " ++) . show) Fold.length)
                     (fmap (("Odd "  ++) . show) Fold.length)
  in Stream.fold f (Stream.enumerateFromTo 1 100)
:}
("Even 50","Odd 50")

Pre-release

partition :: Monad m => Fold m b x -> Fold m c y -> Fold m (Either b c) (x, y) Source #

Compose two folds such that the combined fold accepts a stream of Either and routes the Left values to the first fold and Right values to the second fold.

Definition:

>>> partition = Fold.partitionBy id

Splitting

chunksBetween :: Int -> Int -> Fold m a b -> Fold m b c -> Fold m a c Source #

Group the input stream into groups of elements between low and high. Collection starts in chunks of low and then keeps doubling until we reach high. Each chunk is folded using the provided fold function.

This could be useful, for example, when we are folding a stream of unknown size to a stream of arrays and we want to minimize the number of allocations.

NOTE: this would be an application of "many" using a terminating fold.

Unimplemented

intersperseWithQuotes :: (Monad m, Eq a) => a -> a -> a -> Fold m a b -> Fold m b c -> Fold m a c Source #

Nesting

unfoldMany :: Monad m => Unfold m a b -> Fold m b c -> Fold m a c Source #

Unfold and flatten the input stream of a fold.

Stream.fold (unfoldMany u f) = Stream.fold f . Stream.unfoldMany u

Pre-release

concatSequence :: Fold m b c -> t (Fold m a b) -> Fold m a c Source #

concatSequence f t applies folds from stream t sequentially and collects the results using the fold f.

Unimplemented

Deprecated

drainBy :: Monad m => (a -> m b) -> Fold m a () Source #

Deprecated: Please use drainMapM instead.

last :: Monad m => Fold m a (Maybe a) Source #

Deprecated: Please use latest instead.

head :: Monad m => Fold m a (Maybe a) Source #

Deprecated: Please use "one" instead

Extract the first element of the stream, if any.

>>> head = Fold.one

sequence :: Monad m => Fold m a (m b) -> Fold m a b Source #

Deprecated: Use "rmapM id" instead

Flatten the monadic output of a fold to pure output.

mapM :: Monad m => (b -> m c) -> Fold m a b -> Fold m a c Source #

Deprecated: Use rmapM instead

Map a monadic function on the output of a fold.

variance :: (Monad m, Fractional a) => Fold m a a Source #

Deprecated: Use the streamly-statistics package instead

Compute a numerically stable (population) variance over all elements in the input stream.

stdDev :: (Monad m, Floating a) => Fold m a a Source #

Deprecated: Use the streamly-statistics package instead

Compute a numerically stable (population) standard deviation over all elements in the input stream.

Set operations

toSet :: (Monad m, Ord a) => Fold m a (Set a) Source #

Fold the input to a set.

Definition:

>>> toSet = Fold.foldl' (flip Set.insert) Set.empty

toIntSet :: Monad m => Fold m Int IntSet Source #

Fold the input to an int set. For integer inputs this performs better than toSet.

Definition:

>>> toIntSet = Fold.foldl' (flip IntSet.insert) IntSet.empty

countDistinct :: (Monad m, Ord a) => Fold m a Int Source #

Count non-duplicate elements in the stream.

Definition:

>>> countDistinct = fmap Set.size Fold.toSet
>>> countDistinct = Fold.postscan Fold.nub $ Fold.catMaybes $ Fold.length

The memory used is proportional to the number of distinct elements in the stream, to guard against using too much memory use it as a scan and terminate if the count reaches more than a threshold.

Space: \(\mathcal{O}(n)\)

Pre-release

countDistinctInt :: Monad m => Fold m Int Int Source #

Like countDistinct but specialized to a stream of Int, for better performance.

Definition:

>>> countDistinctInt = fmap IntSet.size Fold.toIntSet
>>> countDistinctInt = Fold.postscan Fold.nubInt $ Fold.catMaybes $ Fold.length

Pre-release

nub :: (Monad m, Ord a) => Fold m a (Maybe a) Source #

Used as a scan. Returns Just for the first occurrence of an element, returns Nothing for any other occurrences.

Example:

>>> stream = Stream.fromList [1::Int,1,2,3,4,4,5,1,5,7]
>>> Stream.fold Fold.toList $ Stream.scanMaybe Fold.nub stream
[1,2,3,4,5,7]

Pre-release

nubInt :: Monad m => Fold m Int (Maybe Int) Source #

Like nub but specialized to a stream of Int, for better performance.

Pre-release

Map operations

frequency :: (Monad m, Ord a) => Fold m a (Map a Int) Source #

Determine the frequency of each element in the stream.

You can just collect the keys of the resulting map to get the unique elements in the stream.

Definition:

>>> frequency = Fold.toMap id Fold.length

Demultiplexing

Direct values in the input stream to different folds using an n-ary fold selector. demux is a generalization of classify (and partition) where each key of the classifier can use a different fold.

You need to see only demux if you are looking to find the capabilities of these combinators, all others are variants of that.

Output is a container

The fold state snapshot returns the key-value container of in-progress folds.

demuxToContainer :: (Monad m, IsMap f, Traversable f) => (a -> Key f) -> (a -> m (Fold m a b)) -> Fold m a (f b) Source #

demuxToContainerIO :: (MonadIO m, IsMap f, Traversable f) => (a -> Key f) -> (a -> m (Fold m a b)) -> Fold m a (f b) Source #

demuxToMap :: (Monad m, Ord k) => (a -> k) -> (a -> m (Fold m a b)) -> Fold m a (Map k b) Source #

This collects all the results of demux in a Map.

demuxToMapIO :: (MonadIO m, Ord k) => (a -> k) -> (a -> m (Fold m a b)) -> Fold m a (Map k b) Source #

Same as demuxToMap but uses demuxIO for better performance.

Input is explicit key-value tuple

Like above but inputs are in explicit key-value pair form.

demuxKvToContainer :: (Monad m, IsMap f, Traversable f) => (Key f -> m (Fold m a b)) -> Fold m (Key f, a) (f b) Source #

demuxKvToMap :: (Monad m, Ord k) => (k -> m (Fold m a b)) -> Fold m (k, a) (Map k b) Source #

Fold a stream of key value pairs using a function that maps keys to folds.

Definition:

>>> demuxKvToMap f = Fold.demuxToContainer fst (Fold.lmap snd . f)

Example:

>>> import Data.Map (Map)
>>> :{
 let f "SUM" = return Fold.sum
     f _ = return Fold.product
     input = Stream.fromList [("SUM",1),("PRODUCT",2),("SUM",3),("PRODUCT",4)]
  in Stream.fold (Fold.demuxKvToMap f) input :: IO (Map String Int)
:}
fromList [("PRODUCT",8),("SUM",4)]

Pre-release

Scan of finished fold results

Like above, but the resulting fold state snapshot contains the key value container as well as the finished key result if a fold in the container finished.

demuxGeneric :: (Monad m, IsMap f, Traversable f) => (a -> Key f) -> (a -> m (Fold m a b)) -> Fold m a (m (f b), Maybe (Key f, b)) Source #

This is the most general of all demux, classify operations.

See demux for documentation.

demux :: (Monad m, Ord k) => (a -> k) -> (a -> m (Fold m a b)) -> Fold m a (m (Map k b), Maybe (k, b)) Source #

demux getKey getFold: In a key value stream, fold values corresponding to each key using a key specific fold. getFold is invoked to generate a key specific fold when a key is encountered for the first time in the stream.

The first component of the output tuple is a key-value Map of in-progress folds. The fold returns the fold result as the second component of the output tuple whenever a fold terminates.

If a fold terminates, another instance of the fold is started upon receiving an input with that key, getFold is invoked again whenever the key is encountered again.

This can be used to scan a stream and collect the results from the scan output.

Since the fold generator function is monadic we can add folds dynamically. For example, we can maintain a Map of keys to folds in an IORef and lookup the fold from that corresponding to a key. This Map can be changed dynamically, folds for new keys can be added or folds for old keys can be deleted or modified.

Compare with classify, the fold in classify is a static fold.

Pre-release

demuxGenericIO :: (MonadIO m, IsMap f, Traversable f) => (a -> Key f) -> (a -> m (Fold m a b)) -> Fold m a (m (f b), Maybe (Key f, b)) Source #

This is specialized version of demuxGeneric that uses mutable IO cells as fold accumulators for better performance.

demuxIO :: (MonadIO m, Ord k) => (a -> k) -> (a -> m (Fold m a b)) -> Fold m a (m (Map k b), Maybe (k, b)) Source #

This is specialized version of demux that uses mutable IO cells as fold accumulators for better performance.

Keep in mind that the values in the returned Map may be changed by the ongoing fold if you are using those concurrently in another thread.

Classifying

In an input stream of key value pairs fold values for different keys in individual output buckets using the given fold. classify is a special case of demux where all the branches of the demultiplexer use the same fold.

Different types of maps can be used with these combinators via the IsMap type class. Hashmap performs better when there are more collisions, trie Map performs better otherwise. Trie has an advantage of sorting the keys at the same time. For example if we want to store a dictionary of words and their meanings then trie Map would be better if we also want to display them in sorted order.

kvToMap :: (Monad m, Ord k) => Fold m a b -> Fold m (k, a) (Map k b) Source #

Given an input stream of key value pairs and a fold for values, fold all the values belonging to each key. Useful for map/reduce, bucketizing the input in different bins or for generating histograms.

Definition:

>>> kvToMap = Fold.toMap fst . Fold.lmap snd

Example:

>>> :{
 let input = Stream.fromList [("ONE",1),("ONE",1.1),("TWO",2), ("TWO",2.2)]
  in Stream.fold (Fold.kvToMap Fold.toList) input
:}
fromList [("ONE",[1.0,1.1]),("TWO",[2.0,2.2])]

Pre-release

toContainer :: (Monad m, IsMap f, Traversable f, Ord (Key f)) => (a -> Key f) -> Fold m a b -> Fold m a (f b) Source #

toContainerIO :: (MonadIO m, IsMap f, Traversable f, Ord (Key f)) => (a -> Key f) -> Fold m a b -> Fold m a (f b) Source #

toMap :: (Monad m, Ord k) => (a -> k) -> Fold m a b -> Fold m a (Map k b) Source #

Split the input stream based on a key field and fold each split using the given fold. Useful for map/reduce, bucketizing the input in different bins or for generating histograms.

Example:

>>> import Data.Map.Strict (Map)
>>> :{
 let input = Stream.fromList [("ONE",1),("ONE",1.1),("TWO",2), ("TWO",2.2)]
     classify = Fold.toMap fst (Fold.lmap snd Fold.toList)
  in Stream.fold classify input :: IO (Map String [Double])
:}
fromList [("ONE",[1.0,1.1]),("TWO",[2.0,2.2])]

Once the classifier fold terminates for a particular key any further inputs in that bucket are ignored.

Space used is proportional to the number of keys seen till now and monotonically increases because it stores whether a key has been seen or not.

See demuxToMap for a more powerful version where you can use a different fold for each key. A simpler version of toMap retaining only the last value for a key can be written as:

>>> toMap = Fold.foldl' (\kv (k, v) -> Map.insert k v kv) Map.empty

Stops: never

Pre-release

toMapIO :: (MonadIO m, Ord k) => (a -> k) -> Fold m a b -> Fold m a (Map k b) Source #

Same as toMap but maybe faster because it uses mutable cells as fold accumulators in the Map.

classifyGeneric :: (Monad m, IsMap f, Traversable f, Ord (Key f)) => (a -> Key f) -> Fold m a b -> Fold m a (m (f b), Maybe (Key f, b)) Source #

classify :: (Monad m, Ord k) => (a -> k) -> Fold m a b -> Fold m a (m (Map k b), Maybe (k, b)) Source #

Folds the values for each key using the supplied fold. When scanning, as soon as the fold is complete, its result is available in the second component of the tuple. The first component of the tuple is a snapshot of the in-progress folds.

Once the fold for a key is done, any future values of the key are ignored.

Definition:

>>> classify f fld = Fold.demux f (const fld)

classifyGenericIO :: (MonadIO m, IsMap f, Traversable f, Ord (Key f)) => (a -> Key f) -> Fold m a b -> Fold m a (m (f b), Maybe (Key f, b)) Source #

classifyIO :: (MonadIO m, Ord k) => (a -> k) -> Fold m a b -> Fold m a (m (Map k b), Maybe (k, b)) Source #

Same as classify except that it uses mutable IORef cells in the Map providing better performance. Be aware that if this is used as a scan, the values in the intermediate Maps would be mutable.

Definitions:

>>> classifyIO f fld = Fold.demuxIO f (const fld)

Incremental Folds

Folds of type Fold m (a, Maybe a) b are incremental sliding window folds. An input of type (a, Nothing) indicates that the input element a is being inserted in the window without ejecting an old value increasing the window size by 1. An input of type (a, Just a) indicates that the first element is being inserted in the window and the second element is being removed from the window, the window size remains the same. The window size can only increase and never decrease.

You can compute the statistics over the entire stream using sliding window folds by keeping the second element of the input tuple as Nothing.

windowLmap :: (c -> a) -> Fold m (a, Maybe a) b -> Fold m (c, Maybe c) b Source #

Map a function on the incoming as well as outgoing element of a rolling window fold.

>>> lmap f = Fold.lmap (bimap f (f <$>))

cumulative :: Fold m (a, Maybe a) b -> Fold m a b Source #

Convert an incremental fold to a cumulative fold using the entire input stream as a single window.

>>> cumulative f = Fold.lmap (\x -> (x, Nothing)) f

windowRollingMap :: Monad m => (Maybe a -> a -> Maybe b) -> Fold m (a, Maybe a) (Maybe b) Source #

Apply a pure function on the latest and the oldest element of the window.

>>> windowRollingMap f = Fold.windowRollingMapM (\x y -> return $ f x y)

windowRollingMapM :: Monad m => (Maybe a -> a -> m (Maybe b)) -> Fold m (a, Maybe a) (Maybe b) Source #

Apply an effectful function on the latest and the oldest element of the window.

Sums

windowLength :: (Monad m, Num b) => Fold m (a, Maybe a) b Source #

The number of elements in the rolling window.

This is the \(0\)th power sum.

>>> length = powerSum 0

windowSum :: forall m a. (Monad m, Num a) => Fold m (a, Maybe a) a Source #

Sum of all the elements in a rolling window:

\(S = \sum_{i=1}^n x_{i}\)

This is the first power sum.

>>> sum = powerSum 1

Uses Kahan-Babuska-Neumaier style summation for numerical stability of floating precision arithmetic.

Space: \(\mathcal{O}(1)\)

Time: \(\mathcal{O}(n)\)

windowSumInt :: forall m a. (Monad m, Integral a) => Fold m (a, Maybe a) a Source #

The sum of all the elements in a rolling window. The input elements are required to be intergal numbers.

This was written in the hope that it would be a tiny bit faster than sum for Integral values. But turns out that sum is 2% faster than this even for intergal values!

Internal

windowPowerSum :: (Monad m, Num a) => Int -> Fold m (a, Maybe a) a Source #

Sum of the \(k\)th power of all the elements in a rolling window:

\(S_k = \sum_{i=1}^n x_{i}^k\)

>>> powerSum k = lmap (^ k) sum

Space: \(\mathcal{O}(1)\)

Time: \(\mathcal{O}(n)\)

windowPowerSumFrac :: (Monad m, Floating a) => a -> Fold m (a, Maybe a) a Source #

Like powerSum but powers can be negative or fractional. This is slower than powerSum for positive intergal powers.

>>> powerSumFrac p = lmap (** p) sum

Location

windowMinimum :: (MonadIO m, Storable a, Ord a) => Int -> Fold m a (Maybe a) Source #

Find the minimum element in a rolling window.

This implementation traverses the entire window buffer to compute the minimum whenever we demand it. It performs better than the dequeue based implementation in streamly-statistics package when the window size is small (< 30).

If you want to compute the minimum of the entire stream minimum is much faster.

Time: \(\mathcal{O}(n*w)\) where \(w\) is the window size.

windowMaximum :: (MonadIO m, Storable a, Ord a) => Int -> Fold m a (Maybe a) Source #

The maximum element in a rolling window.

See the performance related comments in minimum.

If you want to compute the maximum of the entire stream maximum would be much faster.

Time: \(\mathcal{O}(n*w)\) where \(w\) is the window size.

windowRange :: (MonadIO m, Storable a, Ord a) => Int -> Fold m a (Maybe (a, a)) Source #

Determine the maximum and minimum in a rolling window.

If you want to compute the range of the entire stream Fold.teeWith (,) Fold.maximum Fold.minimum would be much faster.

Space: \(\mathcal{O}(n)\) where n is the window size.

Time: \(\mathcal{O}(n*w)\) where \(w\) is the window size.

windowMean :: forall m a. (Monad m, Fractional a) => Fold m (a, Maybe a) a Source #

Arithmetic mean of elements in a sliding window:

\(\mu = \frac{\sum_{i=1}^n x_{i}}{n}\)

This is also known as the Simple Moving Average (SMA) when used in the sliding window and Cumulative Moving Avergae (CMA) when used on the entire stream.

>>> mean = Fold.teeWith (/) sum length

Space: \(\mathcal{O}(1)\)

Time: \(\mathcal{O}(n)\)