-- Hoogle documentation, generated by Haddock -- See Hoogle, http://www.haskell.org/hoogle/ -- | Beautiful Streaming, Concurrent and Reactive Composition -- -- Streamly is a framework for writing programs in a high level, -- declarative data flow programming paradigm. It provides a simple API, -- very close to standard Haskell lists. A program is expressed as a -- composition of data processing pipes, generally known as streams. -- Streams can be generated, merged, chained, mapped, zipped, and -- consumed concurrently – enabling a high level, declarative yet -- concurrent composition of programs. Programs can be concurrent or -- non-concurrent without any significant change. Concurrency is auto -- scaled based on consumption rate. Programmers do not have to be aware -- of threads, locking or synchronization to write scalable concurrent -- programs. Streamly provides C like performance, orders of magnitude -- better compared to existing streaming libraries. -- -- Streamly is designed to express the full spectrum of programs with -- highest performance. Do not think that if you are writing a small and -- simple program it may not be for you. It expresses a small "hello -- world" program with the same efficiency, simplicity and elegance as a -- large scale concurrent application. It unifies many different aspects -- of special purpose libraries into a single yet simple framework. -- -- Streamly covers the functionality provided by Haskell lists as well as -- the functionality provided by streaming libraries like -- streaming, pipes, and conduit with a simpler API -- and better performance. Streamly provides advanced stream composition -- including various ways of appending, merging, zipping, splitting, -- grouping, distributing, partitioning and unzipping of streams with -- true streaming and with concurrency. Streamly subsumes the -- functionality of list transformer libraries like pipes or -- list-t and also the logic programming library logict. 
-- The grouping, splitting and windowing combinators in streamly can be -- compared to the window operators in Apache Flink. However, -- compared to Flink streamly has a pure functional, succinct and -- expressive API. -- -- The concurrency capabilities of streamly are much more advanced and -- powerful compared to the basic concurrency functionality provided by -- the async package. Streamly is a first class reactive -- programming library. If you are familiar with Reactive -- Extensions you will find that it is very similar. For most RxJs -- combinators you can find or write corresponding ones in streamly. -- Streamly can be used as an alternative to Yampa or -- reflex as well. -- -- Streamly focuses on practical engineering with high performance. From -- well written streamly programs one can expect performance competitive -- to C. High performance streaming eliminates the need for string and -- text libraries like bytestring, text and their lazy and -- strict flavors. The confusion and cognitive overhead arising from -- different string types is eliminated. The two fundamental types in -- streamly are arrays for storage and streams for processing. Strings -- and text are simply streams or arrays of Char as they should -- be. Arrays in streamly have performance at par with the vector -- library. -- -- Where to find more information: -- --
-- $ cabal haddock --haddock-option="--show-all" -- $ stack haddock --haddock-arguments "--show-all" --no-haddock-deps --@package streamly @version 0.7.0 -- | Fold type represents an effectful action that consumes a value -- from an input stream and combines it with a single final value often -- called an accumulator, returning the resulting output accumulator. -- Values from a stream can be pushed to the fold and consumed one -- at a time. It can also be called a consumer of stream or a sink. It is -- a data representation of the standard foldl' function. A -- Fold can be turned into an effect (m b) using -- fold by supplying it the input stream. -- -- Using this representation multiple folds can be combined efficiently -- using combinators; a stream can then be supplied to the combined fold -- and it would distribute the input to constituent folds according to -- the composition. For example, an applicative composition distributes -- the same input to the constituent folds and then combines the -- resulting fold outputs. Similarly, a partitioning combinator divides -- the input among constituent folds. -- --
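-- The applicative and partitioning compositions described above can be
-- sketched with a small pure model. This is only an illustration that uses
-- plain lists and the base package. The names PureFold, runFold, sumF,
-- lengthF, teeF and partitionF are hypothetical and are not part of the
-- streamly API, and the library's actual implementation may differ.

```haskell
{-# LANGUAGE ExistentialQuantification #-}
module Main (main) where

import Data.List (foldl')

-- Hypothetical pure model of a fold: a step function, an initial
-- state and an extract function, with the state type hidden.
data PureFold a b = forall s. PureFold (s -> a -> s) s (s -> b)

-- Supply a list (standing in for a stream) to the fold.
runFold :: PureFold a b -> [a] -> b
runFold (PureFold step initial extract) = extract . foldl' step initial

sumF :: Num a => PureFold a a
sumF = PureFold (+) 0 id

lengthF :: PureFold a Int
lengthF = PureFold (\n _ -> n + 1) 0 id

-- Distribute the same input to both folds and pair the outputs.
teeF :: PureFold a b -> PureFold a c -> PureFold a (b, c)
teeF (PureFold stepL initL extractL) (PureFold stepR initR extractR) =
    PureFold step (initL, initR) extract
  where
    step (l, r) a =
        let l' = stepL l a
            r' = stepR r a
        in l' `seq` r' `seq` (l', r')
    extract (l, r) = (extractL l, extractR r)

-- Route Left inputs to the first fold and Right inputs to the second.
partitionF :: PureFold b x -> PureFold c y -> PureFold (Either b c) (x, y)
partitionF (PureFold stepL initL extractL) (PureFold stepR initR extractR) =
    PureFold step (initL, initR) extract
  where
    step (l, r) (Left b) = let l' = stepL l b in l' `seq` (l', r)
    step (l, r) (Right c) = let r' = stepR r c in r' `seq` (l, r')
    extract (l, r) = (extractL l, extractR r)

main :: IO ()
main = do
    -- prints (5050,100)
    print (runFold (teeF sumF lengthF) [1 .. 100 :: Int])
    -- prints (6,2)
    print (runFold (partitionF sumF lengthF)
        [Left (1 :: Int), Right 'a', Left 2, Right 'b', Left 3])
```

-- In this model both compositions traverse the input once, which is the
-- property that makes combining folds cheaper than running them separately.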
-- import qualified Streamly.Data.Fold as FL ---- -- More, not yet exposed, fold combinators can be found in -- Streamly.Internal.Data.Fold. module Streamly.Data.Fold -- | Represents a left fold over an input stream of values of type -- a to a single value of type b in Monad -- m. -- -- The fold uses an intermediate state s as accumulator. The -- step function updates the state and returns the new updated -- state. When the fold is done the final result of the fold is extracted -- from the intermediate state representation using the extract -- function. data Fold m a b -- | A fold that drains all its input, running the effects and discarding -- the results. drain :: Monad m => Fold m a () -- |
-- drainBy f = lmapM f drain ---- -- Drain all input after passing it through a monadic function. This is -- the dual of mapM_ on stream producers. drainBy :: Monad m => (a -> m b) -> Fold m a () -- | Extract the last element of the input stream, if any. last :: Monad m => Fold m a (Maybe a) -- | Determine the length of the input stream. length :: Monad m => Fold m a Int -- | Determine the sum of all elements of a stream of numbers. Returns -- additive identity (0) when the stream is empty. Note that -- this is not numerically stable for floating point numbers. sum :: (Monad m, Num a) => Fold m a a -- | Determine the product of all elements of a stream of numbers. Returns -- multiplicative identity (1) when the stream is empty. product :: (Monad m, Num a) => Fold m a a -- | Determine the maximum element in a stream using the supplied -- comparison function. maximumBy :: Monad m => (a -> a -> Ordering) -> Fold m a (Maybe a) -- |
-- maximum = maximumBy compare ---- -- Determine the maximum element in a stream. maximum :: (Monad m, Ord a) => Fold m a (Maybe a) -- | Determine the minimum element in a stream using the supplied -- comparison function. minimumBy :: Monad m => (a -> a -> Ordering) -> Fold m a (Maybe a) -- | Determine the minimum element in a stream. minimum :: (Monad m, Ord a) => Fold m a (Maybe a) -- | Compute a numerically stable arithmetic mean of all elements in the -- input stream. mean :: (Monad m, Fractional a) => Fold m a a -- | Compute a numerically stable (population) variance over all elements -- in the input stream. variance :: (Monad m, Fractional a) => Fold m a a -- | Compute a numerically stable (population) standard deviation over all -- elements in the input stream. stdDev :: (Monad m, Floating a) => Fold m a a -- | Fold an input stream consisting of monoidal elements using -- mappend and mempty. -- --
-- S.fold FL.mconcat (S.map Sum $ S.enumerateFromTo 1 10) --mconcat :: (Monad m, Monoid a) => Fold m a a -- |
-- foldMap f = lmap f mconcat ---- -- Make a fold from a pure function that folds the output of the function -- using mappend and mempty. -- --
-- S.fold (FL.foldMap Sum) $ S.enumerateFromTo 1 10 --foldMap :: (Monad m, Monoid b) => (a -> b) -> Fold m a b -- |
-- foldMapM f = lmapM f mconcat ---- -- Make a fold from a monadic function that folds the output of the -- function using mappend and mempty. -- --
-- S.fold (FL.foldMapM (return . Sum)) $ S.enumerateFromTo 1 10 --foldMapM :: (Monad m, Monoid b) => (a -> m b) -> Fold m a b -- | Folds the input stream to a list. -- -- Warning! Working on large lists accumulated as buffers in -- memory can be very inefficient; consider using Streamly.Memory.Array -- instead. toList :: Monad m => Fold m a [a] -- | Lookup the element at the given index. index :: Monad m => Int -> Fold m a (Maybe a) -- | Extract the first element of the stream, if any. head :: Monad m => Fold m a (Maybe a) -- | Returns the first element that satisfies the given predicate. find :: Monad m => (a -> Bool) -> Fold m a (Maybe a) -- | In a stream of (key-value) pairs (a, b), return the value -- b of the first pair where the key equals the given value -- a. lookup :: (Eq a, Monad m) => a -> Fold m (a, b) (Maybe b) -- | Returns the index of the first element that satisfies the given predicate. findIndex :: Monad m => (a -> Bool) -> Fold m a (Maybe Int) -- | Returns the first index where a given value is found in the stream. elemIndex :: (Eq a, Monad m) => a -> Fold m a (Maybe Int) -- | Return True if the input stream is empty. null :: Monad m => Fold m a Bool -- | Return True if the given element is present in the stream. elem :: (Eq a, Monad m) => a -> Fold m a Bool -- | Returns True if the given element is not present in the stream. notElem :: (Eq a, Monad m) => a -> Fold m a Bool -- |
-- all p = lmap p and ---- -- Returns True if all elements of a stream satisfy a predicate. all :: Monad m => (a -> Bool) -> Fold m a Bool -- |
-- any p = lmap p or ---- -- Returns True if any of the elements of a stream satisfies a -- predicate. any :: Monad m => (a -> Bool) -> Fold m a Bool -- | Returns True if all elements are True, False -- otherwise. and :: Monad m => Fold m Bool Bool -- | Returns True if any element is True, False -- otherwise. or :: Monad m => Fold m Bool Bool -- | Flatten the monadic output of a fold to pure output. sequence :: Monad m => Fold m a (m b) -> Fold m a b -- | Map a monadic function on the output of a fold. mapM :: Monad m => (b -> m c) -> Fold m a b -> Fold m a c -- | Distribute one copy of the stream to each fold and zip the results. -- --
-- |-------Fold m a b--------| -- ---stream m a---| |---m (b,c) -- |-------Fold m a c--------| ---- --
-- >>> S.fold (FL.tee FL.sum FL.length) (S.enumerateFromTo 1.0 100.0) -- (5050.0,100) --tee :: Monad m => Fold m a b -> Fold m a c -> Fold m a (b, c) -- | Distribute one copy of the stream to each fold and collect the results -- in a container. -- --
-- |-------Fold m a b--------| -- ---stream m a---| |---m [b] -- |-------Fold m a b--------| -- | | -- ... ---- --
-- >>> S.fold (FL.distribute [FL.sum, FL.length]) (S.enumerateFromTo 1 5) -- [15,5] ---- -- This is the consumer side dual of the producer side sequence -- operation. distribute :: Monad m => [Fold m a b] -> Fold m a [b] -- | Compose two folds such that the combined fold accepts a stream of -- Either and routes the Left values to the first fold and -- Right values to the second fold. -- --
-- partition = partitionBy id --partition :: Monad m => Fold m b x -> Fold m c y -> Fold m (Either b c) (x, y) -- | Send the elements of tuples in a stream of tuples through two -- different folds. -- --
-- |-------Fold m a x--------| -- ---------stream of (a,b)--| |----m (x,y) -- |-------Fold m b y--------| ---- -- This is the consumer side dual of the producer side zip -- operation. unzip :: Monad m => Fold m a x -> Fold m b y -> Fold m (a, b) (x, y) -- | Unfold type represents an effectful action that generates a -- stream of values from a single starting value often called a seed -- value. Values can be generated and pulled from the -- Unfold one at a time. It can also be called a producer or a -- source of stream. It is a data representation of the standard -- unfoldr function. An Unfold can be converted into a -- stream type using unfold by supplying the seed. -- --
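-- The idea of an Unfold as a data representation of unfoldr can be sketched
-- with a small pure model. This is only an illustration that uses plain
-- lists and the base package. The names PureUnfold, runUnfold, countTo and
-- replicateU are hypothetical and are not part of the streamly API, and the
-- library's actual representation may differ.

```haskell
{-# LANGUAGE ExistentialQuantification #-}
module Main (main) where

import Data.List (unfoldr)

-- Hypothetical pure model of an unfold: an inject function turns the
-- seed into an internal state, and a step function either yields the
-- next element with a new state or stops.
data PureUnfold a b = forall s. PureUnfold (s -> Maybe (b, s)) (a -> s)

-- Supply a seed and pull all values into a list (standing in for a stream).
runUnfold :: PureUnfold a b -> a -> [b]
runUnfold (PureUnfold step inject) seed = unfoldr step (inject seed)

-- Count upwards from the seed until the given limit is passed.
countTo :: Int -> PureUnfold Int Int
countTo limit = PureUnfold step id
  where
    step n
        | n > limit = Nothing
        | otherwise = Just (n, n + 1)

-- Yield the seed itself n times.
replicateU :: Int -> PureUnfold a a
replicateU n = PureUnfold step (\x -> (x, n))
  where
    step (x, i)
        | i <= 0 = Nothing
        | otherwise = Just (x, (x, i - 1))

main :: IO ()
main = do
    print (runUnfold (countTo 3) 0)       -- prints [0,1,2,3]
    print (runUnfold (replicateU 3) 'x')  -- prints "xxx"
```

-- Because the generator is a plain value, it can be defined once and run
-- later with different seeds, which is what the model's runUnfold shows.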
-- import qualified Streamly.Data.Unfold as UF ---- -- More, not yet exposed, unfold combinators can be found in -- Streamly.Internal.Data.Unfold. module Streamly.Data.Unfold -- | An Unfold m a b is a generator of a stream of values of type -- b from a seed of type a in Monad m. data Unfold m a b -- | This module provides immutable arrays in pinned memory (non GC memory) -- suitable for long lived data storage, random access and for -- interfacing with the operating system. -- -- Arrays in this module are chunks of pinned memory that hold a sequence -- of Storable values of a given type; they cannot store -- non-serializable data like functions. Once created, an array cannot be -- modified. Pinned memory allows efficient buffering of long lived data -- without adding any load on the GC. One array is just one pointer visible -- to GC and it does not have to be copied across generations. Moreover, -- pinned memory allows communication with foreign consumers and -- producers (e.g. file or network IO) without copying the data. -- --
-- import qualified Streamly.Memory.Array as A ---- -- For experimental APIs see Streamly.Internal.Memory.Array. module Streamly.Memory.Array data Array a -- | Create an Array from the first N elements of a list. The array -- is allocated to size N; if the list terminates before N elements then -- the array may hold fewer than N elements. fromListN :: Storable a => Int -> [a] -> Array a -- | Create an Array from a list. The list must be of finite size. fromList :: Storable a => [a] -> Array a -- | writeN n folds a maximum of n elements from the -- input stream to an Array. writeN :: forall m a. (MonadIO m, Storable a) => Int -> Fold m a (Array a) -- | Fold the whole input to a single array. -- -- Caution! Do not use this on infinite streams. write :: forall m a. (MonadIO m, Storable a) => Fold m a (Array a) -- | Convert an Array into a list. toList :: Storable a => Array a -> [a] -- | Unfold an array into a stream. read :: forall m a. (Monad m, Storable a) => Unfold m (Array a) a -- | O(1) Get the length of the array i.e. the number of elements in -- the array. length :: forall a. Storable a => Array a -> Int -- | This module is designed to be imported qualified: -- --
-- import qualified Streamly.Prelude as S ---- -- Functions with the suffix M are general functions that work -- on monadic arguments. The corresponding functions without the suffix -- M work on pure arguments and can in general be derived from -- their monadic versions but are provided for convenience and for -- consistency with other pure APIs in the base package. -- -- In many cases, short definitions of the combinators are provided in -- the documentation for illustration. The actual implementation may -- differ for performance reasons. -- -- Functions having a MonadAsync constraint work concurrently -- when used with appropriate stream type combinator. Please be careful -- to not use parallely with infinite streams. -- -- Deconstruction and folds accept a SerialT type instead of a -- polymorphic type to ensure that streams always have a concrete -- monomorphic type by default, reducing type errors. In case you want to -- use any other type of stream you can use one of the type combinators -- provided in the Streamly module to convert the stream type. module Streamly.Prelude -- | An empty stream. -- --
-- > toList nil -- [] --nil :: IsStream t => t m a -- | Construct a stream by adding a pure value at the head of an existing -- stream. For serial streams this is the same as (return a) `consM` -- r but more efficient. For concurrent streams this is not -- concurrent whereas consM is concurrent. For example: -- --
-- > toList $ 1 `cons` 2 `cons` 3 `cons` nil -- [1,2,3] --cons :: IsStream t => a -> t m a -> t m a infixr 5 `cons` -- | Operator equivalent of cons. -- --
-- > toList $ 1 .: 2 .: 3 .: nil -- [1,2,3] --(.:) :: IsStream t => a -> t m a -> t m a infixr 5 .: -- | Constructs a stream by adding a monadic action at the head of an -- existing stream. For example: -- --
-- > toList $ getLine `consM` getLine `consM` nil -- hello -- world -- ["hello","world"] ---- -- Concurrent (do not use parallely to construct infinite -- streams) consM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a infixr 5 `consM` -- | Operator equivalent of consM. We can read it as "parallel -- colon" to remember that | comes before :. -- --
-- > toList $ getLine |: getLine |: nil -- hello -- world -- ["hello","world"] ---- --
-- let delay = threadDelay 1000000 >> print 1 -- drain $ serially $ delay |: delay |: delay |: nil -- drain $ parallely $ delay |: delay |: delay |: nil ---- -- Concurrent (do not use parallely to construct infinite -- streams) (|:) :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a infixr 5 |: -- |
-- yield a = a `cons` nil ---- -- Create a singleton stream from a pure value. -- -- The following holds in monadic streams, but not in Zip streams: -- --
-- yield = pure -- yield = yieldM . pure ---- -- In Zip applicative streams yield is not the same as pure -- because in that case pure is equivalent to repeat -- instead. yield and pure are equally efficient, in other -- cases yield may be slightly more efficient than the other -- equivalent definitions. yield :: IsStream t => a -> t m a -- |
-- yieldM m = m `consM` nil ---- -- Create a singleton stream from a monadic action. -- --
-- > toList $ yieldM getLine -- hello -- ["hello"] --yieldM :: (Monad m, IsStream t) => m a -> t m a -- | Generate an infinite stream by repeating a pure value. repeat :: (IsStream t, Monad m) => a -> t m a -- |
-- repeatM = fix . consM -- repeatM = cycle1 . yieldM ---- -- Generate a stream by repeatedly executing a monadic action forever. -- --
-- drain $ serially $ S.take 10 $ S.repeatM $ (threadDelay 1000000 >> print 1) -- drain $ asyncly $ S.take 10 $ S.repeatM $ (threadDelay 1000000 >> print 1) ---- -- Concurrent, infinite (do not use with parallely) repeatM :: (IsStream t, MonadAsync m) => m a -> t m a -- |
-- replicate n = take n . repeat ---- -- Generate a stream of length n by repeating a value n -- times. replicate :: (IsStream t, Monad m) => Int -> a -> t m a -- |
-- replicateM n = take n . repeatM ---- -- Generate a stream by performing a monadic action n times. -- For example: -- --
-- drain $ serially $ S.replicateM 10 $ (threadDelay 1000000 >> print 1) -- drain $ asyncly $ S.replicateM 10 $ (threadDelay 1000000 >> print 1) ---- -- Concurrent replicateM :: (IsStream t, MonadAsync m) => Int -> m a -> t m a -- | Types that can be enumerated as a stream. The operations in this type -- class are equivalent to those in the Enum type class, except -- that these generate a stream instead of a list. Use the functions in -- Streamly.Streams.Enumeration module to define new instances. class Enum a => Enumerable a -- | enumerateFrom from generates a stream starting with the -- element from, enumerating up to maxBound when the type -- is Bounded or generating an infinite stream when the type is -- not Bounded. -- --
-- > S.toList $ S.take 4 $ S.enumerateFrom (0 :: Int) -- [0,1,2,3] ---- -- For Fractional types, enumeration is numerically stable. -- However, no overflow or underflow checks are performed. -- --
-- > S.toList $ S.take 4 $ S.enumerateFrom 1.1 -- [1.1,2.1,3.1,4.1] --enumerateFrom :: (Enumerable a, IsStream t, Monad m) => a -> t m a -- | Generate a finite stream starting with the element from, -- enumerating the type up to the value to. If to is -- smaller than from then an empty stream is returned. -- --
-- > S.toList $ S.enumerateFromTo 0 4 -- [0,1,2,3,4] ---- -- For Fractional types, the last element is equal to the -- specified to value after rounding to the nearest integral -- value. -- --
-- > S.toList $ S.enumerateFromTo 1.1 4 -- [1.1,2.1,3.1,4.1] -- > S.toList $ S.enumerateFromTo 1.1 4.6 -- [1.1,2.1,3.1,4.1,5.1] --enumerateFromTo :: (Enumerable a, IsStream t, Monad m) => a -> a -> t m a -- | enumerateFromThen from then generates a stream whose first -- element is from, the second element is then and the -- successive elements are in increments of then - from. -- Enumeration can occur downwards or upwards depending on whether -- then comes before or after from. For Bounded -- types the stream ends when maxBound is reached, for unbounded -- types it keeps enumerating infinitely. -- --
-- > S.toList $ S.take 4 $ S.enumerateFromThen 0 2 -- [0,2,4,6] -- > S.toList $ S.take 4 $ S.enumerateFromThen 0 (-2) -- [0,-2,-4,-6] --enumerateFromThen :: (Enumerable a, IsStream t, Monad m) => a -> a -> t m a -- | enumerateFromThenTo from then to generates a finite stream -- whose first element is from, the second element is -- then and the successive elements are in increments of -- then - from up to to. Enumeration can occur -- downwards or upwards depending on whether then comes before -- or after from. -- --
-- > S.toList $ S.enumerateFromThenTo 0 2 6 -- [0,2,4,6] -- > S.toList $ S.enumerateFromThenTo 0 (-2) (-6) -- [0,-2,-4,-6] --enumerateFromThenTo :: (Enumerable a, IsStream t, Monad m) => a -> a -> a -> t m a -- |
-- enumerate = enumerateFrom minBound ---- -- Enumerate a Bounded type from its minBound to -- maxBound enumerate :: (IsStream t, Monad m, Bounded a, Enumerable a) => t m a -- |
-- enumerateTo = enumerateFromTo minBound ---- -- Enumerate a Bounded type from its minBound to specified -- value. enumerateTo :: (IsStream t, Monad m, Bounded a, Enumerable a) => a -> t m a -- |
-- unfoldr step s = -- case step s of -- Nothing -> nil -- Just (a, b) -> a `cons` unfoldr step b ---- -- Build a stream by unfolding a pure step function step -- starting from a seed s. The step function returns the next -- element in the stream and the next seed value. When it is done it -- returns Nothing and the stream ends. For example, -- --
-- let f b = -- if b > 3 -- then Nothing -- else Just (b, b + 1) -- in toList $ unfoldr f 0 ---- --
-- [0,1,2,3] --unfoldr :: (Monad m, IsStream t) => (b -> Maybe (a, b)) -> b -> t m a -- | Build a stream by unfolding a monadic step function starting -- from a seed. The step function returns the next element in the stream -- and the next seed value. When it is done it returns Nothing and -- the stream ends. For example, -- --
-- let f b = -- if b > 3 -- then return Nothing -- else print b >> return (Just (b, b + 1)) -- in drain $ unfoldrM f 0 ---- --
-- 0 -- 1 -- 2 -- 3 ---- -- When run concurrently, the next unfold step can run concurrently with -- the processing of the output of the previous step. Note that more than -- one step cannot run concurrently as the next step depends on the -- output of the previous step. -- --
-- (asyncly $ S.unfoldrM (\n -> liftIO (threadDelay 1000000) >> return (Just (n, n + 1))) 0) -- & S.foldlM' (\_ a -> threadDelay 1000000 >> print a) () ---- -- Concurrent -- -- Since: 0.1.0 unfoldrM :: (IsStream t, MonadAsync m) => (b -> m (Maybe (a, b))) -> b -> t m a -- | Convert an Unfold into a stream by supplying it an input seed. -- --
-- >>> unfold (UF.replicateM 10) (putStrLn "hello") ---- -- Since: 0.7.0 unfold :: (IsStream t, Monad m) => Unfold m a b -> a -> t m b -- |
-- iterate f x = x `cons` iterate f (f x) ---- -- Generate an infinite stream with x as the first element and -- each successive element derived by applying the function f on -- the previous element. -- --
-- > S.toList $ S.take 5 $ S.iterate (+1) 1 -- [1,2,3,4,5] --iterate :: IsStream t => (a -> a) -> a -> t m a -- |
-- iterateM f m = m >>= \a -> return a `consM` iterateM f (f a) ---- -- Generate an infinite stream with the first element generated by the -- action m and each successive element derived by applying the -- monadic function f on the previous element. -- -- When run concurrently, the next iteration can run concurrently with -- the processing of the previous iteration. Note that more than one -- iteration cannot run concurrently as the next iteration depends on the -- output of the previous iteration. -- --
-- drain $ serially $ S.take 10 $ S.iterateM -- (\x -> threadDelay 1000000 >> print x >> return (x + 1)) (return 0) -- -- drain $ asyncly $ S.take 10 $ S.iterateM -- (\x -> threadDelay 1000000 >> print x >> return (x + 1)) (return 0) ---- -- Concurrent -- -- Since: 0.7.0 (signature change) -- -- Since: 0.1.2 iterateM :: (IsStream t, MonadAsync m) => (a -> m a) -> m a -> t m a -- |
-- fromIndices f = let g i = f i `cons` g (i + 1) in g 0 ---- -- Generate an infinite stream, whose values are the output of a function -- f applied on the corresponding index. Index starts at 0. -- --
-- > S.toList $ S.take 5 $ S.fromIndices id -- [0,1,2,3,4] --fromIndices :: (IsStream t, Monad m) => (Int -> a) -> t m a -- |
-- fromIndicesM f = let g i = f i `consM` g (i + 1) in g 0 ---- -- Generate an infinite stream, whose values are the output of a monadic -- function f applied on the corresponding index. Index starts -- at 0. -- -- Concurrent fromIndicesM :: (IsStream t, MonadAsync m) => (Int -> m a) -> t m a -- |
-- fromList = foldr cons nil ---- -- Construct a stream from a list of pure values. This is more efficient -- than fromFoldable for serial streams. fromList :: (Monad m, IsStream t) => [a] -> t m a -- |
-- fromListM = foldr consM nil ---- -- Construct a stream from a list of monadic actions. This is more -- efficient than fromFoldableM for serial streams. fromListM :: (MonadAsync m, IsStream t) => [m a] -> t m a -- |
-- fromFoldable = foldr cons nil ---- -- Construct a stream from a Foldable containing pure values: fromFoldable :: (IsStream t, Foldable f) => f a -> t m a -- |
-- fromFoldableM = foldr consM nil ---- -- Construct a stream from a Foldable containing monadic actions. -- --
-- drain $ serially $ S.fromFoldableM $ replicate 10 (threadDelay 1000000 >> print 1) -- drain $ asyncly $ S.fromFoldableM $ replicate 10 (threadDelay 1000000 >> print 1) ---- -- Concurrent (do not use with parallely on infinite -- containers) fromFoldableM :: (IsStream t, MonadAsync m, Foldable f) => f (m a) -> t m a -- | Decompose a stream into its head and tail. If the stream is empty, -- returns Nothing. If the stream is non-empty, returns Just -- (a, ma), where a is the head of the stream and -- ma its tail. -- -- This is a brute force primitive. Avoid using it whenever possible; -- use it only when no other combinator can do the job. This can be used to do -- pretty much anything in an imperative manner, as it just breaks down -- the stream into individual elements and we can loop over them as we -- deem fit. For example, this can be used to convert a streamly stream -- into other stream types. uncons :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (a, t m a)) -- |
-- tail = fmap (fmap snd) . uncons ---- -- Extract all but the first element of the stream, if any. tail :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (t m a)) -- | Extract all but the last element of the stream, if any. init :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (t m a)) -- | Right associative/lazy pull fold. foldrM build final stream -- constructs an output structure using the step function build. -- build is invoked with the next input element and the -- remaining (lazy) tail of the output structure. It builds a lazy output -- expression using the two. When the "tail structure" in the output -- expression is evaluated it calls build again thus lazily -- consuming the input stream until either the output expression -- built by build is free of the "tail" or the input is -- exhausted in which case final is used as the terminating case -- for the output structure. For more details see the description in the -- previous section. -- -- Example, determine if any element is odd in a stream: -- --
-- >>> S.foldrM (\x xs -> if odd x then return True else xs) (return False) $ S.fromList (2:4:5:undefined) -- > True ---- -- Since: 0.7.0 (signature changed) -- -- Since: 0.2.0 (signature changed) -- -- Since: 0.1.0 foldrM :: Monad m => (a -> m b -> m b) -> m b -> SerialT m a -> m b -- | Right fold, lazy for lazy monads and pure streams, and strict for -- strict monads. -- -- Please avoid using this routine in strict monads like IO unless you -- need a strict right fold. This is provided only for use in lazy monads -- (e.g. Identity) or pure streams. Note that with this signature it is -- not possible to implement a lazy foldr when the monad m is -- strict. In that case it would be strict in its accumulator and -- therefore would necessarily consume all its input. foldr :: Monad m => (a -> b -> b) -> b -> SerialT m a -> m b -- | Left associative/strict push fold. foldl' reduce initial -- stream invokes reduce with the accumulator and the next -- input in the input stream, using initial as the initial value -- of the current value of the accumulator. When the input is exhausted -- the current value of the accumulator is returned. Make sure to use a -- strict data structure for accumulator to not build unnecessary lazy -- expressions unless that's what you want. See the previous section for -- more details. foldl' :: Monad m => (b -> a -> b) -> b -> SerialT m a -> m b -- | Strict left fold, for non-empty streams, using first element as the -- starting value. Returns Nothing if the stream is empty. foldl1' :: Monad m => (a -> a -> a) -> SerialT m a -> m (Maybe a) -- | Like foldl' but with a monadic step function. foldlM' :: Monad m => (b -> a -> m b) -> b -> SerialT m a -> m b -- | Fold a stream using the supplied left fold. -- --
-- >>> S.fold FL.sum (S.enumerateFromTo 1 100) -- 5050 --fold :: Monad m => Fold m a b -> SerialT m a -> m b -- |
-- drain = mapM_ (\_ -> return ()) ---- -- Run a stream, discarding the results. By default it interprets the -- stream as SerialT, to run other types of streams use the type -- adapting combinators for example drain . asyncly. drain :: Monad m => SerialT m a -> m () -- | Extract the last element of the stream, if any. -- --
-- last xs = xs !! (length xs - 1) --last :: Monad m => SerialT m a -> m (Maybe a) -- | Determine the length of the stream. length :: Monad m => SerialT m a -> m Int -- | Determine the sum of all elements of a stream of numbers. Returns -- 0 when the stream is empty. Note that this is not numerically -- stable for floating point numbers. sum :: (Monad m, Num a) => SerialT m a -> m a -- | Determine the product of all elements of a stream of numbers. Returns -- 1 when the stream is empty. product :: (Monad m, Num a) => SerialT m a -> m a -- | Determine the maximum element in a stream using the supplied -- comparison function. maximumBy :: Monad m => (a -> a -> Ordering) -> SerialT m a -> m (Maybe a) -- |
-- maximum = maximumBy compare ---- -- Determine the maximum element in a stream. maximum :: (Monad m, Ord a) => SerialT m a -> m (Maybe a) -- | Determine the minimum element in a stream using the supplied -- comparison function. minimumBy :: Monad m => (a -> a -> Ordering) -> SerialT m a -> m (Maybe a) -- |
-- minimum = minimumBy compare ---- -- Determine the minimum element in a stream. minimum :: (Monad m, Ord a) => SerialT m a -> m (Maybe a) -- | Ensures that all the elements of the stream are identical and then -- returns that unique element. the :: (Eq a, Monad m) => SerialT m a -> m (Maybe a) -- |
-- toList = S.foldr (:) [] ---- -- Convert a stream into a list in the underlying monad. The list can be -- consumed lazily in a lazy monad (e.g. Identity). In a strict -- monad (e.g. IO) the whole list is generated and buffered before it can -- be consumed. -- -- Warning! Working on large lists accumulated as buffers in -- memory can be very inefficient; consider using Streamly.Memory.Array -- instead. toList :: Monad m => SerialT m a -> m [a] -- |
-- drainN n = drain . take n ---- -- Run at most n iterations of a stream. drainN :: Monad m => Int -> SerialT m a -> m () -- |
-- drainWhile p = drain . takeWhile p ---- -- Run a stream as long as the predicate holds true. drainWhile :: Monad m => (a -> Bool) -> SerialT m a -> m () -- | Lookup the element at the given index. (!!) :: Monad m => SerialT m a -> Int -> m (Maybe a) -- | Extract the first element of the stream, if any. -- --
-- head = (!! 0) --head :: Monad m => SerialT m a -> m (Maybe a) -- | Returns the first element that satisfies the given predicate. findM :: Monad m => (a -> m Bool) -> SerialT m a -> m (Maybe a) -- | Like findM but with a non-monadic predicate. -- --
-- find p = findM (return . p) --find :: Monad m => (a -> Bool) -> SerialT m a -> m (Maybe a) -- | In a stream of (key-value) pairs (a, b), return the value -- b of the first pair where the key equals the given value -- a. -- --
-- lookup = snd <$> find ((==) . fst) --lookup :: (Monad m, Eq a) => a -> SerialT m (a, b) -> m (Maybe b) -- | Returns the first index that satisfies the given predicate. findIndex :: Monad m => (a -> Bool) -> SerialT m a -> m (Maybe Int) -- | Returns the first index where a given value is found in the stream. -- --
-- elemIndex a = findIndex (== a) --elemIndex :: (Monad m, Eq a) => a -> SerialT m a -> m (Maybe Int) -- | Determine whether the stream is empty. null :: Monad m => SerialT m a -> m Bool -- | Determine whether an element is present in the stream. elem :: (Monad m, Eq a) => a -> SerialT m a -> m Bool -- | Determine whether an element is not present in the stream. notElem :: (Monad m, Eq a) => a -> SerialT m a -> m Bool -- | Determine whether all elements of a stream satisfy a predicate. all :: Monad m => (a -> Bool) -> SerialT m a -> m Bool -- | Determine whether any of the elements of a stream satisfy a predicate. any :: Monad m => (a -> Bool) -> SerialT m a -> m Bool -- | Determines if all elements of a boolean stream are True. and :: Monad m => SerialT m Bool -> m Bool -- | Determines whether at least one element of a boolean stream is True. or :: Monad m => SerialT m Bool -> m Bool -- | Compare two streams for equality using an equality function. eqBy :: (IsStream t, Monad m) => (a -> b -> Bool) -> t m a -> t m b -> m Bool -- | Compare two streams lexicographically using a comparison function. cmpBy :: (IsStream t, Monad m) => (a -> b -> Ordering) -> t m a -> t m b -> m Ordering -- | Returns True if the first stream is the same as or a prefix of -- the second. A stream is a prefix of itself. -- --
-- > S.isPrefixOf (S.fromList "hello") (S.fromList "hello" :: SerialT IO Char) -- True --isPrefixOf :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m Bool -- | Returns True if all the elements of the first stream occur, in -- order, in the second stream. The elements do not have to occur -- consecutively. A stream is a subsequence of itself. -- --
-- > S.isSubsequenceOf (S.fromList "hlo") (S.fromList "hello" :: SerialT IO Char) -- True --isSubsequenceOf :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m Bool -- | Drops the given prefix from a stream. Returns Nothing if the -- stream does not start with the given prefix. Returns Just nil -- when the prefix is the same as the stream. stripPrefix :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m (Maybe (t m a)) -- |
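-- The prefix and subsequence relations above have direct list analogues in
-- Data.List (base). The sketch below uses those list functions, not the
-- streamly combinators, to illustrate the documented edge cases: a sequence
-- is a prefix and a subsequence of itself, and stripping a prefix equal to
-- the whole input leaves an empty remainder.

```haskell
import Data.List (isPrefixOf, isSubsequenceOf, stripPrefix)

main :: IO ()
main = do
  -- A list is a prefix of itself.
  print ("hello" `isPrefixOf` "hello")
  -- Elements must occur in order, but need not be consecutive.
  print ("hlo" `isSubsequenceOf` "hello")
  print ("hol" `isSubsequenceOf` "hello")
  -- Stripping a proper prefix, the whole input, and a non-prefix.
  print (stripPrefix "he" "hello")
  print (stripPrefix "hello" "hello")
  print (stripPrefix "x" "hello")
```

-- The stream versions return their results in the underlying monad, and
-- stripPrefix yields the remaining stream instead of a list.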
-- map = fmap ---- -- Same as fmap. -- --
-- > S.toList $ S.map (+1) $ S.fromList [1,2,3] -- [2,3,4] --map :: (IsStream t, Monad m) => (a -> b) -> t m a -> t m b -- |
-- sequence = mapM id ---- -- Replace the elements of a stream of monadic actions with the outputs -- of those actions. -- --
-- > drain $ S.sequence $ S.fromList [putStr "a", putStr "b", putStrLn "c"] -- abc -- -- drain $ S.replicateM 10 (return $ threadDelay 1000000 >> print 1) -- & (serially . S.sequence) -- -- drain $ S.replicateM 10 (return $ threadDelay 1000000 >> print 1) -- & (asyncly . S.sequence) ---- -- Concurrent (do not use with parallely on infinite -- streams) sequence :: (IsStream t, MonadAsync m) => t m (m a) -> t m a -- |
-- mapM f = sequence . map f ---- -- Apply a monadic function to each element of the stream and replace it -- with the output of the resulting action. -- --
-- > drain $ S.mapM putStr $ S.fromList ["a", "b", "c"] -- abc -- -- drain $ S.replicateM 10 (return 1) -- & (serially . S.mapM (\x -> threadDelay 1000000 >> print x)) -- -- drain $ S.replicateM 10 (return 1) -- & (asyncly . S.mapM (\x -> threadDelay 1000000 >> print x)) ---- -- Concurrent (do not use with parallely on infinite -- streams) mapM :: (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m b -- |
-- mapM_ = drain . mapM ---- -- Apply a monadic action to each element of the stream and discard the -- output of the action. This is not really a pure transformation -- operation but a transformation followed by fold. mapM_ :: Monad m => (a -> m b) -> SerialT m a -> m () -- | Apply a monadic function to each element flowing through the stream -- and discard the results. -- --
-- > S.drain $ S.trace print (S.enumerateFromTo 1 2) -- 1 -- 2 ---- -- Compare with tap. trace :: (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m a -- | Tap the data flowing through a stream into a Fold. For example, -- you may add a tap to log the contents flowing through the stream. The -- fold is used only for effects, its result is discarded. -- --
-- Fold m a b -- | -- -----stream m a ---------------stream m a----- ---- --
-- > S.drain $ S.tap (FL.drainBy print) (S.enumerateFromTo 1 2) -- 1 -- 2 ---- -- Compare with trace. tap :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m a -- | Strict left scan. Like map, scanl' too is a one to one -- transformation, however it adds an extra element. -- --
-- > S.toList $ S.scanl' (+) 0 $ S.fromList [1,2,3,4] -- [0,1,3,6,10] ---- --
-- > S.toList $ S.scanl' (flip (:)) [] $ S.fromList [1,2,3,4] -- [[],[1],[2,1],[3,2,1],[4,3,2,1]] ---- -- The output of scanl' is the initial value of the accumulator -- followed by all the intermediate steps and the final result of -- foldl'. -- -- By streaming the accumulated state after each fold step, we can share -- the state across multiple stages of stream composition. Each stage can -- modify or extend the state, do some processing with it and emit it for -- the next stage, thus modularizing the stream processing. This can be -- useful in stateful or event-driven programming. -- -- Consider the following monolithic example, computing the sum and the -- product of the elements in a stream in one go using a foldl': -- --
-- > S.foldl' (\(s, p) x -> (s + x, p * x)) (0,1) $ S.fromList [1,2,3,4] -- (10,24) ---- -- Using scanl' we can make it modular by computing the sum in -- the first stage and passing it down to the next stage for computing -- the product: -- --
-- > S.foldl' (\(_, p) (s, x) -> (s, p * x)) (0,1) -- $ S.scanl' (\(s, _) x -> (s + x, x)) (0,1) -- $ S.fromList [1,2,3,4] -- (10,24) ---- -- IMPORTANT: scanl' evaluates the accumulator to WHNF. To avoid -- building lazy expressions inside the accumulator, it is recommended -- that a strict data structure is used for accumulator. scanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b -- | Like scanl' but with a monadic fold function. scanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> b -> t m a -> t m b -- | Like scanl' but does not stream the initial value of the -- accumulator. -- --
-- postscanl' f z xs = S.drop 1 $ S.scanl' f z xs --postscanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b -- | Like postscanl' but with a monadic step function. postscanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> b -> t m a -> t m b -- | Like scanl' but for a non-empty stream. The first element of -- the stream is used as the initial value of the accumulator. Does -- nothing if the stream is empty. -- --
-- > S.toList $ S.scanl1' (+) $ S.fromList [1,2,3,4] -- [1,3,6,10] --scanl1' :: (IsStream t, Monad m) => (a -> a -> a) -> t m a -> t m a -- | Like scanl1' but with a monadic step function. scanl1M' :: (IsStream t, Monad m) => (a -> a -> m a) -> t m a -> t m a -- | Scan a stream using the given monadic fold. scan :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b -- | Postscan a stream using the given monadic fold. postscan :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b -- | Include only those elements that pass a predicate. filter :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a -- | Same as filter but with a monadic predicate. filterM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a -- | Map a Maybe returning function to a stream, filter out the -- Nothing elements, and return a stream of values extracted from -- Just. -- -- Equivalent to: -- --
-- mapMaybe f = S.map fromJust . S.filter isJust . S.map f --mapMaybe :: (IsStream t, Monad m) => (a -> Maybe b) -> t m a -> t m b -- | Like mapMaybe but maps a monadic function. -- -- Equivalent to: -- --
-- mapMaybeM f = S.map fromJust . S.filter isJust . S.mapM f ---- -- Concurrent (do not use with parallely on infinite -- streams) mapMaybeM :: (IsStream t, MonadAsync m, Functor (t m)) => (a -> m (Maybe b)) -> t m a -> t m b -- | Deletes the first occurrence of the element in the stream that -- satisfies the given equality predicate. -- --
-- > S.toList $ S.deleteBy (==) 3 $ S.fromList [1,3,3,5] -- [1,3,5] --deleteBy :: (IsStream t, Monad m) => (a -> a -> Bool) -> a -> t m a -> t m a -- | Drop repeated elements that are adjacent to each other. uniq :: (Eq a, IsStream t, Monad m) => t m a -> t m a -- | insertBy cmp elem stream inserts elem before the -- first element in stream that is greater than elem when -- compared using cmp. -- --
-- insertBy cmp x = mergeBy cmp (yield x) ---- --
-- > S.toList $ S.insertBy compare 2 $ S.fromList [1,3,5] -- [1,2,3,5] --insertBy :: (IsStream t, Monad m) => (a -> a -> Ordering) -> a -> t m a -> t m a -- | Generate a stream by performing a monadic action between consecutive -- elements of the given stream. -- -- Concurrent (do not use with parallely on infinite -- streams) -- --
-- > S.toList $ S.intersperseM (return ',') $ S.fromList "hello" -- "h,e,l,l,o" --intersperseM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a -- | Generate a stream by inserting a given element between consecutive -- elements of the given stream. -- --
-- > S.toList $ S.intersperse ',' $ S.fromList "hello" -- "h,e,l,l,o" --intersperse :: (IsStream t, MonadAsync m) => a -> t m a -> t m a -- |
-- indexed = S.postscanl' (\(i, _) x -> (i + 1, x)) (-1,undefined) -- indexed = S.zipWith (,) (S.enumerateFrom 0) ---- -- Pair each element in a stream with its index, starting from index 0. -- --
-- > S.toList $ S.indexed $ S.fromList "hello" -- [(0,'h'),(1,'e'),(2,'l'),(3,'l'),(4,'o')] --indexed :: (IsStream t, Monad m) => t m a -> t m (Int, a) -- |
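-- The two defining equations given for indexed (a postscan that threads a
-- counter, and a zip with an enumeration) can be checked against each
-- other on plain lists. This is a list-based sketch; indexedScan and
-- indexedZip are hypothetical names, and drop 1 . scanl stands in for
-- postscanl'.

```haskell
-- Hypothetical list models of 'indexed'; not streamly's implementation.

-- Mirrors the postscan equation: scanl emits the seed first and drop 1
-- discards it, so the undefined placeholder in the seed is never forced.
indexedScan :: [a] -> [(Int, a)]
indexedScan = drop 1 . scanl (\(i, _) x -> (i + 1, x)) (-1, undefined)

-- Mirrors the zipWith equation.
indexedZip :: [a] -> [(Int, a)]
indexedZip = zip [0 ..]

main :: IO ()
main = do
  print (indexedScan "hello")
  print (indexedScan "hello" == indexedZip "hello")
```

-- The seed index is one step before the first emitted index, which is why
-- the counting-up version starts from -1.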
-- indexedR n = S.postscanl' (\(i, _) x -> (i - 1, x)) (n + 1,undefined) -- indexedR n = S.zipWith (,) (S.enumerateFromThen n (n - 1)) ---- -- Pair each element in a stream with its index, starting from the given -- index n and counting down. -- --
-- > S.toList $ S.indexedR 10 $ S.fromList "hello" -- [(10,'h'),(9,'e'),(8,'l'),(7,'l'),(6,'o')] --indexedR :: (IsStream t, Monad m) => Int -> t m a -> t m (Int, a) -- | Returns the elements of the stream in reverse order. The stream must -- be finite. Note that this necessarily buffers the entire stream in -- memory. -- -- Since 0.7.0 (Monad m constraint) -- -- Since: 0.1.1 reverse :: (IsStream t, Monad m) => t m a -> t m a -- | Take first n elements from the stream and discard the rest. take :: (IsStream t, Monad m) => Int -> t m a -> t m a -- | End the stream as soon as the predicate fails on an element. takeWhile :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a -- | Same as takeWhile but with a monadic predicate. takeWhileM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a -- | Discard first n elements from the stream and take the rest. drop :: (IsStream t, Monad m) => Int -> t m a -> t m a -- | Drop elements in the stream as long as the predicate succeeds and then -- take the rest of the stream. dropWhile :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a -- | Same as dropWhile but with a monadic predicate. dropWhileM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a -- | Group the input stream into groups of n elements each and -- then fold each group using the provided fold function. -- --
-- > S.toList $ S.chunksOf 2 FL.sum (S.enumerateFromTo 1 10) -- [3,7,11,15,19] ---- -- This can be considered as an n-fold version of ltake where we -- apply ltake repeatedly on the leftover stream until the -- stream is exhausted. chunksOf :: (IsStream t, Monad m) => Int -> Fold m a b -> t m a -> t m b -- | Group the input stream into windows of n seconds each and then -- fold each group using the provided fold function. intervalsOf :: (IsStream t, MonadAsync m) => Double -> Fold m a b -> t m a -> t m b -- | Find all the indices where the element in the stream satisfies the -- given predicate. findIndices :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m Int -- | Find all the indices where the value of the element in the stream is -- equal to the given value. elemIndices :: (IsStream t, Eq a, Monad m) => a -> t m a -> t m Int -- | Split on an infixed separator element, dropping the separator. Splits -- the stream on separator elements determined by the supplied predicate, -- separator is considered as infixed between two segments, if one side -- of the separator is missing then it is parsed as an empty stream. The -- supplied Fold is applied on the split segments. With - -- representing non-separator elements and . as separator, -- splitOn splits as follows: -- --
-- "--.--" => "--" "--" -- "--." => "--" "" -- ".--" => "" "--" ---- -- splitOn (== x) is an inverse of intercalate (S.yield -- x) -- -- Let's use the following definition for illustration: -- --
-- splitOn' p xs = S.toList $ S.splitOn p (FL.toList) (S.fromList xs) ---- --
-- >>> splitOn' (== '.') "" -- [""] ---- --
-- >>> splitOn' (== '.') "." -- ["",""] ---- --
-- >>> splitOn' (== '.') ".a" -- > ["","a"] ---- --
-- >>> splitOn' (== '.') "a." -- > ["a",""] ---- --
-- >>> splitOn' (== '.') "a.b" -- > ["a","b"] ---- --
-- >>> splitOn' (== '.') "a..b" -- > ["a","","b"] --splitOn :: (IsStream t, Monad m) => (a -> Bool) -> Fold m a b -> t m a -> t m b -- | Like splitOn but the separator is considered as suffixed to the -- segments in the stream. A missing suffix at the end is allowed. A -- separator at the beginning is parsed as empty segment. With - -- representing elements and . as separator, splitOnSuffix -- splits as follows: -- --
-- "--.--." => "--" "--" -- "--.--" => "--" "--" -- ".--." => "" "--" ---- --
-- splitOnSuffix' p xs = S.toList $ S.splitOnSuffix p (FL.toList) (S.fromList xs) ---- --
-- >>> splitOnSuffix' (== '.') "" -- [] ---- --
-- >>> splitOnSuffix' (== '.') "." -- [""] ---- --
-- >>> splitOnSuffix' (== '.') "a" -- ["a"] ---- --
-- >>> splitOnSuffix' (== '.') ".a" -- > ["","a"] ---- --
-- >>> splitOnSuffix' (== '.') "a." -- > ["a"] ---- --
-- >>> splitOnSuffix' (== '.') "a.b" -- > ["a","b"] ---- --
-- >>> splitOnSuffix' (== '.') "a.b." -- > ["a","b"] ---- --
-- >>> splitOnSuffix' (== '.') "a..b.." -- > ["a","","b",""] ---- --
-- lines = splitOnSuffix (== '\n') --splitOnSuffix :: (IsStream t, Monad m) => (a -> Bool) -> Fold m a b -> t m a -> t m b -- | Like splitOnSuffix but keeps the suffix attached to the -- resulting splits. -- --
-- splitWithSuffix' p xs = S.toList $ S.splitWithSuffix p (FL.toList) (S.fromList xs) ---- --
-- >>> splitWithSuffix' (== '.') "" -- [] ---- --
-- >>> splitWithSuffix' (== '.') "." -- ["."] ---- --
-- >>> splitWithSuffix' (== '.') "a" -- ["a"] ---- --
-- >>> splitWithSuffix' (== '.') ".a" -- > [".","a"] ---- --
-- >>> splitWithSuffix' (== '.') "a." -- > ["a."] ---- --
-- >>> splitWithSuffix' (== '.') "a.b" -- > ["a.","b"] ---- --
-- >>> splitWithSuffix' (== '.') "a.b." -- > ["a.","b."] ---- --
-- >>> splitWithSuffix' (== '.') "a..b.." -- > ["a.",".","b.","."] --splitWithSuffix :: (IsStream t, Monad m) => (a -> Bool) -> Fold m a b -> t m a -> t m b -- | Like splitOn after stripping leading, trailing, and repeated -- separators. Therefore, ".a..b." with . as the -- separator would be parsed as ["a","b"]. In other words, it's -- like parsing words from whitespace separated text. -- --
-- wordsBy' p xs = S.toList $ S.wordsBy p (FL.toList) (S.fromList xs) ---- --
-- >>> wordsBy' (== ',') "" -- > [] ---- --
-- >>> wordsBy' (== ',') "," -- > [] ---- --
-- >>> wordsBy' (== ',') ",a,,b," -- > ["a","b"] ---- --
-- words = wordsBy isSpace --wordsBy :: (IsStream t, Monad m) => (a -> Bool) -> Fold m a b -> t m a -> t m b -- |
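-- The differences between splitOn, splitOnSuffix, splitWithSuffix and
-- wordsBy are easiest to see side by side. The sketch below is a list-based
-- model written to reproduce the documented example outputs; the *List
-- names are hypothetical and not part of streamly, and real streams are
-- split incrementally with a Fold rather than with break.

```haskell
-- Hypothetical list models of the splitting rules; not streamly code.

-- Infix separator: a missing side yields an empty segment.
splitOnList :: (a -> Bool) -> [a] -> [[a]]
splitOnList p xs = case break p xs of
  (seg, [])       -> [seg]
  (seg, _ : rest) -> seg : splitOnList p rest

-- Suffix separator: a trailing separator does not open a new segment.
splitOnSuffixList :: (a -> Bool) -> [a] -> [[a]]
splitOnSuffixList _ [] = []
splitOnSuffixList p xs = case break p xs of
  (seg, [])       -> [seg]
  (seg, _ : rest) -> seg : splitOnSuffixList p rest

-- Like splitOnSuffixList, but each separator stays attached to its segment.
splitWithSuffixList :: (a -> Bool) -> [a] -> [[a]]
splitWithSuffixList _ [] = []
splitWithSuffixList p xs = case break p xs of
  (seg, [])       -> [seg]
  (seg, s : rest) -> (seg ++ [s]) : splitWithSuffixList p rest

-- Leading, trailing and repeated separators produce no segments.
wordsByList :: (a -> Bool) -> [a] -> [[a]]
wordsByList p = filter (not . null) . splitOnList p

main :: IO ()
main = do
  print (splitOnList (== '.') "a..b")
  print (splitOnSuffixList (== '.') "a..b..")
  print (splitWithSuffixList (== '.') "a..b..")
  print (wordsByList (== ',') ",a,,b,")
```

-- The only difference between the infix and suffix models is the empty-input
-- case: the infix model yields one empty segment, the suffix model yields
-- none.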
-- groups = groupsBy (==) -- groups = groupsByRolling (==) ---- -- Groups contiguous spans of equal elements together in individual -- groups. -- --
-- >>> S.toList $ S.groups FL.toList $ S.fromList [1,1,2,2] -- > [[1,1],[2,2]] --groups :: (IsStream t, Monad m, Eq a) => Fold m a b -> t m a -> t m b -- | groupsBy cmp f $ S.fromList [a,b,c,...] assigns the element -- a to the first group, if a `cmp` b is True -- then b is also assigned to the same group. If a `cmp` -- c is True then c is also assigned to the same -- group and so on. When the comparison fails a new group is started. -- Each group is folded using the fold f and the result of the -- fold is emitted in the output stream. -- --
-- >>> S.toList $ S.groupsBy (>) FL.toList $ S.fromList [1,3,7,0,2,5] -- > [[1,3,7],[0,2,5]] --groupsBy :: (IsStream t, Monad m) => (a -> a -> Bool) -> Fold m a b -> t m a -> t m b -- | Unlike groupsBy this function performs a rolling comparison -- of two successive elements in the input stream. groupsByRolling -- cmp f $ S.fromList [a,b,c,...] assigns the element a to -- the first group, if a `cmp` b is True then b -- is also assigned to the same group. If b `cmp` c is -- True then c is also assigned to the same group and so -- on. When the comparison fails a new group is started. Each group is -- folded using the fold f. -- --
-- >>> S.toList $ S.groupsByRolling (\a b -> a + 1 == b) FL.toList $ S.fromList [1,2,3,7,8,9] -- > [[1,2,3],[7,8,9]] --groupsByRolling :: (IsStream t, Monad m) => (a -> a -> Bool) -> Fold m a b -> t m a -> t m b -- | Merge two streams using a comparison function. The head elements of -- both the streams are compared and the smaller of the two elements is -- emitted, if both elements are equal then the element from the first -- stream is used first. -- -- If the streams are sorted in ascending order, the resulting stream -- would also remain sorted in ascending order. -- --
-- > S.toList $ S.mergeBy compare (S.fromList [1,3,5]) (S.fromList [2,4,6,8]) -- [1,2,3,4,5,6,8] --mergeBy :: (IsStream t, Monad m) => (a -> a -> Ordering) -> t m a -> t m a -> t m a -- | Like mergeBy but with a monadic comparison function. -- -- Merge two streams randomly: -- --
-- > randomly _ _ = randomIO >>= \x -> return $ if x then LT else GT -- > S.toList $ S.mergeByM randomly (S.fromList [1,1,1,1]) (S.fromList [2,2,2,2]) -- [2,1,2,2,2,1,1,1] ---- -- Merge two streams in a proportion of 2:1: -- --
-- proportionately m n = do -- ref <- newIORef $ cycle $ concat [replicate m LT, replicate n GT] -- return $ \_ _ -> do -- r <- readIORef ref -- writeIORef ref $ tail r -- return $ head r -- -- main = do -- f <- proportionately 2 1 -- xs <- S.toList $ S.mergeByM f (S.fromList [1,1,1,1,1,1]) (S.fromList [2,2,2]) -- print xs ---- --
-- [1,1,2,1,1,2,1,1,2] --mergeByM :: (IsStream t, Monad m) => (a -> a -> m Ordering) -> t m a -> t m a -> t m a -- | Like mergeBy but merges concurrently (i.e. both the elements -- being merged are generated concurrently). mergeAsyncBy :: (IsStream t, MonadAsync m) => (a -> a -> Ordering) -> t m a -> t m a -> t m a -- | Like mergeByM but merges concurrently (i.e. both the elements -- being merged are generated concurrently). mergeAsyncByM :: (IsStream t, MonadAsync m) => (a -> a -> m Ordering) -> t m a -> t m a -> t m a -- | Zip two streams serially using a pure zipping function. -- --
-- > S.toList $ S.zipWith (+) (S.fromList [1,2,3]) (S.fromList [4,5,6]) -- [5,7,9] --zipWith :: (IsStream t, Monad m) => (a -> b -> c) -> t m a -> t m b -> t m c -- | Like zipWith but using a monadic zipping function. zipWithM :: (IsStream t, Monad m) => (a -> b -> m c) -> t m a -> t m b -> t m c -- | Like zipWith but zips concurrently i.e. both the streams -- being zipped are generated concurrently. zipAsyncWith :: (IsStream t, MonadAsync m) => (a -> b -> c) -> t m a -> t m b -> t m c -- | Like zipWithM but zips concurrently i.e. both the streams -- being zipped are generated concurrently. zipAsyncWithM :: (IsStream t, MonadAsync m) => (a -> b -> m c) -> t m a -> t m b -> t m c -- | concatMapWith merge map stream is a two dimensional looping -- combinator. The first argument specifies a merge or concat function -- that is used to merge the streams generated by applying the second -- argument i.e. the map function to each element of the input -- stream. The concat function could be serial, -- parallel, async, ahead or any other zip or -- merge function and the second argument could be any stream generation -- function using a seed. -- -- Compare foldMapWith concatMapWith :: IsStream t => (forall c. t m c -> t m c -> t m c) -> (a -> t m b) -> t m a -> t m b -- | Map a stream producing function on each element of the stream and then -- flatten the results into a single stream. -- --
-- concatMap = concatMapWith serial -- concatMap f = concatMapM (return . f) --concatMap :: (IsStream t, Monad m) => (a -> t m b) -> t m a -> t m b -- | Map a stream producing monadic function on each element of the stream -- and then flatten the results into a single stream. Since the stream -- generation function is monadic, unlike concatMap, it can -- produce an effect at the beginning of each iteration of the inner -- loop. concatMapM :: (IsStream t, Monad m) => (a -> m (t m b)) -> t m a -> t m b -- | Like concatMap but uses an Unfold for stream generation. -- Unlike concatMap this can fuse the Unfold code with the -- inner loop and therefore provide many times better performance. concatUnfold :: (IsStream t, Monad m) => Unfold m a b -> t m a -> t m b -- | Run a side effect before the stream yields its first element. before :: (IsStream t, Monad m) => m b -> t m a -> t m a -- | Run a side effect whenever the stream stops normally. after :: (IsStream t, Monad m) => m b -> t m a -> t m a -- | Run the first action before the stream starts and remember its output, -- generate a stream using the output, run the second action using the -- remembered value as an argument whenever the stream ends normally or -- due to an exception. bracket :: (IsStream t, MonadCatch m) => m b -> (b -> m c) -> (b -> t m a) -> t m a -- | Run a side effect whenever the stream aborts due to an exception. onException :: (IsStream t, MonadCatch m) => m b -> t m a -> t m a -- | Run a side effect whenever the stream stops normally or aborts due to -- an exception. finally :: (IsStream t, MonadCatch m) => m b -> t m a -> t m a -- | When evaluating a stream if an exception occurs, stream evaluation -- aborts and the specified exception handler is run with the exception -- as argument. handle :: (IsStream t, MonadCatch m, Exception e) => (e -> t m a) -> t m a -> t m a -- | Same as yieldM -- | Deprecated: Please use yieldM instead. 
once :: (Monad m, IsStream t) => m a -> t m a -- | Same as fromFoldable. -- | Deprecated: Please use fromFoldable instead. each :: (IsStream t, Foldable f) => f a -> t m a -- | Strict left scan with an extraction function. Like scanl', but -- applies a user supplied extraction function (the third argument) at -- each step. This is designed to work with the foldl library. -- The suffix x is a mnemonic for extraction. -- -- Since: 0.7.0 (Monad m constraint) -- -- Since 0.2.0 -- | Deprecated: Please use scanl followed by map instead. scanx :: (IsStream t, Monad m) => (x -> a -> x) -> x -> (x -> b) -> t m a -> t m b -- | Strict left fold with an extraction function. Like the standard strict -- left fold, but applies a user supplied extraction function (the third -- argument) to the folded value at the end. This is designed to work -- with the foldl library. The suffix x is a mnemonic -- for extraction. -- | Deprecated: Please use foldl' followed by fmap instead. foldx :: Monad m => (x -> a -> x) -> x -> (x -> b) -> SerialT m a -> m b -- | Like foldx, but with a monadic step function. -- | Deprecated: Please use foldlM' followed by fmap instead. foldxM :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> SerialT m a -> m b -- | Lazy right fold for non-empty streams, using first element as the -- starting value. Returns Nothing if the stream is empty. -- | Deprecated: Use foldrM instead. foldr1 :: Monad m => (a -> a -> a) -> SerialT m a -> m (Maybe a) -- | Run a stream, discarding the results. By default it interprets the -- stream as SerialT, to run other types of streams use the type -- adapting combinators for example runStream . -- asyncly. -- | Deprecated: Please use "drain" instead runStream :: Monad m => SerialT m a -> m () -- |
-- runN n = runStream . take n ---- -- Run at most n iterations of a stream. -- | Deprecated: Please use "drainN" instead runN :: Monad m => Int -> SerialT m a -> m () -- |
-- runWhile p = runStream . takeWhile p ---- -- Run a stream as long as the predicate holds true. -- | Deprecated: Please use "drainWhile" instead runWhile :: Monad m => (a -> Bool) -> SerialT m a -> m () -- | Read lines from an IO Handle into a stream of Strings. -- | Deprecated: Please use Streamly.FileSystem.Handle module (see the -- changelog) fromHandle :: (IsStream t, MonadIO m) => Handle -> t m String -- |
-- toHandle h = S.mapM_ $ hPutStrLn h ---- -- Write a stream of Strings to an IO Handle. -- | Deprecated: Please use Streamly.FileSystem.Handle module (see the -- changelog) toHandle :: MonadIO m => Handle -> SerialT m String -> m () -- | Streamly is a general purpose programming framework using the concurrent -- data flow programming paradigm. It can be considered as a -- generalization of Haskell lists to monadic streaming with concurrent -- composition capability. The serial stream type in streamly SerialT -- m a is like the list type [a] parameterized by the monad -- m. For example, SerialT IO a is a moral equivalent -- of [a] in the IO monad. Streams are constructed very much -- like lists, except that they use nil and cons instead of -- '[]' and :. -- --
-- > import Streamly -- > import Streamly.Prelude (cons, consM) -- > import qualified Streamly.Prelude as S -- > -- > S.toList $ 1 `cons` 2 `cons` 3 `cons` S.nil -- [1,2,3] ---- -- Unlike lists, streams can be constructed from monadic effects: -- --
-- > S.toList $ getLine `consM` getLine `consM` S.nil -- hello -- world -- ["hello","world"] ---- -- Streams are processed just like lists, with list like combinators, -- except that they are monadic and work in a streaming fashion. Here is -- a simple console echo program example: -- --
-- > S.drain $ S.repeatM getLine & S.mapM putStrLn ---- -- SerialT Identity a is a moral equivalent of pure lists. -- Streamly utilizes fusion for high performance, therefore, we can -- represent and process strings as streams of Char, encode and -- decode the streams to/from UTF8 and serialize them to Array -- Word8 obviating the need for special purpose libraries like -- bytestring and text. -- -- For more details please see the Streamly.Tutorial module and -- the examples directory in this package. module Streamly -- | A monad that can perform concurrent or parallel IO operations. Streams -- that can be composed concurrently require the underlying monad to be -- MonadAsync. type MonadAsync m = (MonadIO m, MonadBaseControl IO m, MonadThrow m) -- | The Semigroup operation for SerialT behaves like a -- regular append operation. Therefore, when a <> b is -- evaluated, stream a is evaluated first until it exhausts and -- then stream b is evaluated. In other words, the elements of -- stream b are appended to the elements of stream a. -- This operation can be used to fold an infinite lazy container of -- streams. -- --
-- import Streamly -- import qualified Streamly.Prelude as S -- -- main = (S.toList . serially $ (S.fromList [1,2]) <> (S.fromList [3,4])) >>= print ---- --
-- [1,2,3,4] ---- -- The Monad instance runs the monadic continuation for -- each element of the stream, serially. -- --
-- main = S.drain . serially $ do -- x <- return 1 <> return 2 -- S.yieldM $ print x ---- --
-- 1 -- 2 ---- -- SerialT nests streams serially in a depth first manner. -- --
-- main = S.drain . serially $ do -- x <- return 1 <> return 2 -- y <- return 3 <> return 4 -- S.yieldM $ print (x, y) ---- --
-- (1,3) -- (1,4) -- (2,3) -- (2,4) ---- -- We call the monadic code being run for each element of the stream a -- monadic continuation. In imperative paradigm we can think of this -- composition as nested for loops and the monadic continuation -- is the body of the loop. The loop iterates for all elements of the -- stream. -- -- Note that the behavior and semantics of SerialT, including -- Semigroup and Monad instances are exactly like Haskell -- lists except that SerialT can contain effectful actions while -- lists are pure. -- -- In the code above, the serially combinator can be omitted as -- the default stream type is SerialT. data SerialT m a -- | The Semigroup operation for WSerialT interleaves the -- elements from the two streams. Therefore, when a <> b -- is evaluated, stream a is evaluated first to produce the -- first element of the combined stream and then stream b is -- evaluated to produce the next element of the combined stream, and then -- we go back to evaluating stream a and so on. In other words, -- the elements of stream a are interleaved with the elements of -- stream b. -- -- Note that when multiple actions are combined like a <> b -- <> c ... <> z we interleave them in a binary fashion -- i.e. a and b are interleaved with each other and the -- result is interleaved with c and so on. This will not act as -- a true round-robin scheduling across all the streams. Note that this -- operation cannot be used to fold a container of infinite streams as -- the state that it needs to maintain is proportional to the number of -- streams. -- --
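-- The interleaving append described for WSerialT can be modelled on plain
-- lists. The sketch below is a list-based illustration only; interleave is
-- a hypothetical helper, not a streamly function. It also shows why
-- combining three streams pairwise is not a true round-robin.

```haskell
-- Hypothetical list model of the WSerialT append; not streamly code.
-- Emit one element from the first list, then swap the roles of the lists.
interleave :: [a] -> [a] -> [a]
interleave []       ys = ys
interleave (x : xs) ys = x : interleave ys xs

main :: IO ()
main = do
  -- Two streams alternate element by element.
  print (interleave [1, 2] [3, 4 :: Int])
  -- Pairwise combination of three streams: a and b are interleaved first,
  -- and that result is then interleaved with c.
  let a = [1, 2, 3] :: [Int]
      b = [10, 20, 30] :: [Int]
      c = [100, 200, 300] :: [Int]
  print (interleave (interleave a b) c)
```

-- In the second result every other element comes from c until c runs out,
-- while a and b share the remaining positions, so the three inputs are not
-- served equally.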
-- import Streamly -- import qualified Streamly.Prelude as S -- -- main = (S.toList . wSerially $ (S.fromList [1,2]) <> (S.fromList [3,4])) >>= print ---- --
-- [1,3,2,4] ---- -- Similarly, the Monad instance interleaves the iterations of the -- inner and the outer loop, nesting loops in a breadth first manner. -- --
-- main = S.drain . wSerially $ do -- x <- return 1 <> return 2 -- y <- return 3 <> return 4 -- S.yieldM $ print (x, y) ---- --
-- (1,3) -- (2,3) -- (1,4) -- (2,4) --data WSerialT m a -- | The Semigroup operation for AheadT appends two streams. -- The combined stream behaves like a single stream with the actions from -- the second stream appended to the first stream. The combined stream is -- evaluated in the speculative style. This operation can be used to fold -- an infinite lazy container of streams. -- --
-- import Streamly -- import Streamly.Prelude ((|:), nil) -- import qualified Streamly.Prelude as S -- import Control.Concurrent -- -- main = do -- xs <- S.toList . aheadly $ (p 1 |: p 2 |: nil) <> (p 3 |: p 4 |: nil) -- print xs -- where p n = threadDelay 1000000 >> return n ---- --
-- [1,2,3,4] ---- -- Any exceptions generated by a constituent stream are propagated to the -- output stream. -- -- The monad instance of AheadT may run each monadic continuation -- (bind) concurrently in a speculative manner, performing side effects -- in a partially ordered manner but producing the outputs in an ordered -- manner like SerialT. -- --
-- main = S.drain . aheadly $ do -- n <- return 3 <> return 2 <> return 1 -- S.yieldM $ do -- threadDelay (n * 1000000) -- myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n) ---- --
-- ThreadId 40: Delay 1 -- ThreadId 39: Delay 2 -- ThreadId 38: Delay 3 --data AheadT m a -- | The Semigroup operation for AsyncT appends two streams. -- The combined stream behaves like a single stream with the actions from -- the second stream appended to the first stream. The combined stream is -- evaluated in the asynchronous style. This operation can be used to -- fold an infinite lazy container of streams. -- --
-- import Streamly -- import qualified Streamly.Prelude as S -- import Control.Concurrent -- -- main = (S.toList . asyncly $ (S.fromList [1,2]) <> (S.fromList [3,4])) >>= print ---- --
-- [1,2,3,4] ---- -- Any exceptions generated by a constituent stream are propagated to the -- output stream. The output and exceptions from a single stream are -- guaranteed to arrive in the same order in the resulting stream as they -- were generated in the input stream. However, the relative ordering of -- elements from different streams in the resulting stream can vary -- depending on scheduling and generation delays. -- -- Similarly, the monad instance of AsyncT may run each -- iteration concurrently based on demand. More concurrent iterations are -- started only if the previous iterations are not able to produce enough -- output for the consumer. -- --
-- main = S.drain . asyncly $ do -- n <- return 3 <> return 2 <> return 1 -- S.yieldM $ do -- threadDelay (n * 1000000) -- myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n) ---- --
-- ThreadId 40: Delay 1 -- ThreadId 39: Delay 2 -- ThreadId 38: Delay 3 --data AsyncT m a -- | The Semigroup operation for WAsyncT interleaves the -- elements from the two streams. Therefore, when a <> b -- is evaluated, one action is picked from stream a for -- evaluation and then the next action is picked from stream b -- and then the next action is again picked from stream a, going -- around in a round-robin fashion. Many such actions are executed -- concurrently depending on maxThreads and maxBuffer -- limits. Results are served to the consumer in the order of completion of -- the actions. -- -- Note that when multiple actions are combined like a <> b -- <> c ... <> z we go in a round-robin fashion across -- all of them picking one action from each up to z and then -- come back to a. Note that this operation cannot be used to -- fold an infinite container of streams as the state that it needs to -- maintain is proportional to the number of streams. -- --
-- import Streamly -- import qualified Streamly.Prelude as S -- import Control.Concurrent -- -- main = (S.toList . wAsyncly $ (S.fromList [1,2]) <> (S.fromList [3,4])) >>= print ---- --
-- [1,3,2,4] ---- -- Any exceptions generated by a constituent stream are propagated to the -- output stream. The output and exceptions from a single stream are -- guaranteed to arrive in the same order in the resulting stream as they -- were generated in the input stream. However, the relative ordering of -- elements from different streams in the resulting stream can vary -- depending on scheduling and generation delays. -- -- Similarly, the Monad instance of WAsyncT runs all -- iterations fairly and concurrently using round-robin scheduling. -- --
-- main = S.drain . wAsyncly $ do -- n <- return 3 <> return 2 <> return 1 -- S.yieldM $ do -- threadDelay (n * 1000000) -- myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n) ---- --
-- ThreadId 40: Delay 1 -- ThreadId 39: Delay 2 -- ThreadId 38: Delay 3 --data WAsyncT m a -- | Async composition with strict concurrent execution of all streams. -- -- The Semigroup instance of ParallelT executes both the -- streams concurrently without any delay and without waiting for the -- consumer demand and merges the results as they arrive. If the -- consumer does not consume the results, they are buffered up to a -- configured maximum, controlled by the maxBuffer primitive. If -- the buffer becomes full the concurrent tasks will block until there is -- space in the buffer. -- -- Both WAsyncT and ParallelT evaluate the constituent -- streams fairly in a round-robin fashion. The key difference is that -- WAsyncT might wait for the consumer demand before it executes -- the tasks whereas ParallelT starts executing all the tasks -- immediately without waiting for the consumer demand. For -- WAsyncT the maxThreads limit applies whereas for -- ParallelT it does not apply. In other words, WAsyncT -- can be lazy whereas ParallelT is strict. -- -- ParallelT is useful for cases when the streams are required to -- be evaluated simultaneously irrespective of how the consumer consumes -- them, e.g. when we want to race two tasks and want to start both -- strictly at the same time or if we have timers in the parallel tasks -- and our results depend on the timers being started at the same time. -- If we do not have such requirements then AsyncT or -- AheadT are recommended as they can be more efficient than -- ParallelT. -- --
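-- The "race two tasks" use case mentioned above could look roughly like
-- the following. This is only a sketch, assuming streamly 0.7.x; the
-- helper name task, its labels and the delays are illustrative and not
-- part of the library.

```haskell
import Streamly (SerialT, parallel)
import qualified Streamly.Prelude as S
import Control.Concurrent (threadDelay)

-- Hypothetical helper: a one-element stream that yields its label after
-- waiting for the given number of microseconds.
task :: Int -> String -> SerialT IO String
task delay label = S.yieldM (threadDelay delay >> return label)

main :: IO ()
main = do
    -- Both tasks are started immediately by 'parallel'; 'S.take 1' keeps
    -- only the result that arrives first.
    winner <- S.toList . S.take 1 $ task 500000 "slow" `parallel` task 100000 "fast"
    print winner
```

-- With these delays the task labelled "fast" is expected to win, although
-- the outcome ultimately depends on scheduling.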
-- main = (S.toList . parallely $ (S.fromFoldable [1,2]) <> (S.fromFoldable [3,4])) >>= print ---- --
-- [1,3,2,4] ---- -- When streams with more than one element are merged, the result yields -- elements from whichever stream produces first, without any bias, unlike the -- Async style streams. -- -- Any exceptions generated by a constituent stream are propagated to the -- output stream. The output and exceptions from a single stream are -- guaranteed to arrive in the same order in the resulting stream as they -- were generated in the input stream. However, the relative ordering of -- elements from different streams in the resulting stream can vary -- depending on scheduling and generation delays. -- -- Similarly, the Monad instance of ParallelT runs -- all iterations of the loop concurrently. -- --
-- import Streamly -- import qualified Streamly.Prelude as S -- import Control.Concurrent -- -- main = S.drain . parallely $ do -- n <- return 3 <> return 2 <> return 1 -- S.yieldM $ do -- threadDelay (n * 1000000) -- myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n) ---- --
-- ThreadId 40: Delay 1 -- ThreadId 39: Delay 2 -- ThreadId 38: Delay 3 ---- -- Note that parallel composition can only combine a finite number of -- streams as it needs to retain state for each unfinished stream. -- -- Since: 0.7.0 (maxBuffer applies to ParallelT streams) -- -- Since: 0.1.0 data ParallelT m a -- | The applicative instance of ZipSerialM zips a number of streams -- serially i.e. it produces one element from each stream serially and -- then zips all those elements. -- --
-- main = (S.toList . zipSerially $ (,,) <$> s1 <*> s2 <*> s3) >>= print -- where s1 = S.fromFoldable [1, 2] -- s2 = S.fromFoldable [3, 4] -- s3 = S.fromFoldable [5, 6] ---- --
-- [(1,3,5),(2,4,6)] ---- -- The Semigroup instance of this type works the same way as that -- of SerialT. data ZipSerialM m a -- | Like ZipSerialM but zips in parallel: it generates all the -- elements to be zipped concurrently. -- --
-- main = (S.toList . zipAsyncly $ (,,) <$> s1 <*> s2 <*> s3) >>= print -- where s1 = S.fromFoldable [1, 2] -- s2 = S.fromFoldable [3, 4] -- s3 = S.fromFoldable [5, 6] ---- --
-- [(1,3,5),(2,4,6)] ---- -- The Semigroup instance of this type works the same way as that -- of SerialT. data ZipAsyncM m a -- | Parallel function application operator for streams; just like the -- regular function application operator $ except that it is -- concurrent. The following code prints a value every second even though -- each stage adds a 1 second delay. -- --
-- S.drain $ -- S.mapM (\x -> threadDelay 1000000 >> print x) -- |$ S.repeatM (threadDelay 1000000 >> return 1) ---- -- Concurrent (|$) :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> t m a -> t m b infixr 0 |$ -- | Parallel reverse function application operator for streams; just like -- the regular reverse function application operator & -- except that it is concurrent. -- --
-- S.drain $ -- S.repeatM (threadDelay 1000000 >> return 1) -- |& S.mapM (\x -> threadDelay 1000000 >> print x) ---- -- Concurrent (|&) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> t m b) -> t m b infixl 1 |& -- | Parallel function application operator; applies a run or -- fold function to a stream such that the fold consumer and the -- stream producer run in parallel. A run or fold -- function reduces the stream to a value in the underlying monad. The -- . at the end of the operator is a mnemonic for termination of -- the stream. -- --
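-- As a finite illustration of a fold running in parallel with its
-- producer, here is a minimal sketch assuming streamly 0.7.x. The choice
-- of S.sum as the fold, the five-element input and the 0.1 second delay
-- are illustrative assumptions.

```haskell
import Streamly ((|$.))
import qualified Streamly.Prelude as S
import Control.Concurrent (threadDelay)

main :: IO ()
main = do
    -- The producer (the delayed S.mapM stage) and the consumer (S.sum)
    -- run in separate threads, connected by (|$.).
    total <- S.sum |$. S.mapM slowly (S.fromList [1 .. 5 :: Int])
    print total -- 15
  where
    -- Simulate a slow producer: wait 0.1 seconds per element.
    slowly x = threadDelay 100000 >> return x
```

-- The result is the same as with plain function application; only the
-- evaluation of the two stages is made concurrent.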
-- S.foldlM' (\_ a -> threadDelay 1000000 >> print a) () -- |$. S.repeatM (threadDelay 1000000 >> return 1) ---- -- Concurrent (|$.) :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b infixr 0 |$. -- | Parallel reverse function application operator for applying a run or -- fold function to a stream. Just like |$. except that the -- operands are reversed. -- --
-- S.repeatM (threadDelay 1000000 >> return 1) -- |&. S.foldlM' (\_ a -> threadDelay 1000000 >> print a) () ---- -- Concurrent (|&.) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> m b) -> m b infixl 1 |&. -- | Make a stream asynchronous: triggers the computation and returns a -- stream in the underlying monad representing the output generated by -- the original computation. The returned stream is exhaustible and must -- be drained once. If it is not drained fully, a thread may remain blocked -- forever; once exhausted, it always returns empty. mkAsync :: (IsStream t, MonadAsync m) => t m a -> m (t m a) -- | Polymorphic version of the Semigroup operation <> -- of SerialT. Appends two streams sequentially, yielding all -- elements from the first stream, and then all elements from the second -- stream. serial :: IsStream t => t m a -> t m a -> t m a -- | Polymorphic version of the Semigroup operation <> -- of WSerialT. Interleaves two streams, yielding one element from -- each stream alternately. When one stream stops the rest of the other -- stream is used in the output stream. wSerial :: IsStream t => t m a -> t m a -> t m a -- | Polymorphic version of the Semigroup operation <> -- of AheadT. Merges two streams sequentially but with concurrent -- lookahead. ahead :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a -- | Polymorphic version of the Semigroup operation <> -- of AsyncT. Merges two streams possibly concurrently, preferring -- the elements from the left one when available. async :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a -- | Polymorphic version of the Semigroup operation <> -- of WAsyncT. Merges two streams concurrently choosing elements -- from both fairly. wAsync :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a -- | Polymorphic version of the Semigroup operation <> -- of ParallelT. Merges two streams concurrently. 
parallel :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a -- | Specify the maximum number of threads that can be spawned concurrently -- for any concurrent combinator in a stream. A value of 0 resets the -- thread limit to the default; a negative value means there is no limit. The -- default value is 1500. maxThreads does not affect -- ParallelT streams as they can use an unbounded number of -- threads. -- -- When the actions in a stream are IO bound, having blocking IO calls, -- this option can be used to control the maximum number of in-flight IO -- requests. When the actions are CPU bound this option can be used to -- control the amount of CPU used by the stream. maxThreads :: IsStream t => Int -> t m a -> t m a -- | Specify the maximum size of the buffer for storing the results from -- concurrent computations. If the buffer becomes full we stop spawning -- more concurrent tasks until there is space in the buffer. A value of 0 -- resets the buffer size to the default; a negative value means there is no -- limit. The default value is 1500. -- -- CAUTION! Using an unbounded maxBuffer value (i.e. a negative -- value) coupled with an unbounded maxThreads value is a recipe -- for disaster in the presence of infinite streams, or very large streams. -- In particular, it must not be used when pure is used in -- ZipAsyncM streams as pure in applicative zip streams -- generates an infinite stream causing unbounded concurrent generation -- with no limit on the buffer or threads. maxBuffer :: IsStream t => Int -> t m a -> t m a -- | Specifies the stream yield rate in yields per second (Hertz). -- We keep accumulating yield credits at rateGoal. At any point of -- time we allow only as many yields as we have accumulated as per -- rateGoal since the start of time. If the consumer or the -- producer is slower or faster, the actual rate may fall behind or -- exceed rateGoal. We try to recover the gap between the two by -- increasing or decreasing the pull rate from the producer. 
However, if -- the gap becomes more than rateBuffer we try to recover only as -- much as rateBuffer. -- -- rateLow puts a bound on how low the instantaneous rate can go -- when recovering the rate gap. In other words, it determines the -- maximum yield latency. Similarly, rateHigh puts a bound on how -- high the instantaneous rate can go when recovering the rate gap. In -- other words, it determines the minimum yield latency. We reduce the -- latency by increasing concurrency, therefore we can say that it puts -- an upper bound on concurrency. -- -- If the rateGoal is 0 or negative the stream never yields a -- value. If the rateBuffer is 0 or negative we do not attempt to -- recover. data Rate Rate :: Double -> Double -> Double -> Int -> Rate -- | The lower rate limit [rateLow] :: Rate -> Double -- | The target rate we want to achieve [rateGoal] :: Rate -> Double -- | The upper rate limit [rateHigh] :: Rate -> Double -- | Maximum slack from the goal [rateBuffer] :: Rate -> Int -- | Specify the pull rate of a stream. A Nothing value resets the -- rate to default which is unlimited. When the rate is specified, -- concurrent production may be ramped up or down automatically to -- achieve the specified yield rate. The specific behavior for different -- styles of Rate specifications is documented under Rate. -- The effective maximum production rate achieved by a stream is governed -- by: -- --
-- foldWith async $ map return [1..3] ---- -- Equivalent to: -- --
-- foldWith f = S.foldMapWith f id ---- -- Since: 0.1.0 (Streamly) foldWith :: (IsStream t, Foldable f) => (t m a -> t m a -> t m a) -> f (t m a) -> t m a -- | A variant of foldMap that allows you to map a monadic streaming -- action on a Foldable container and then fold it using the -- specified stream merge operation. -- --
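-- A small runnable sketch of foldMapWith, assuming streamly 0.7.x. The
-- helper expand is hypothetical; it turns each element of the container
-- into a two-element stream so that the effect of the merge operation
-- is visible.

```haskell
import Data.List (sort)
import Streamly (SerialT, async, foldMapWith, serial)
import qualified Streamly.Prelude as S

-- Hypothetical helper: map each input to a small stream.
expand :: Int -> SerialT IO Int
expand x = S.fromList [x, x * 10]

main :: IO ()
main = do
    -- Folding with 'serial' appends the generated streams in order.
    xs <- S.toList $ foldMapWith serial expand [1, 2, 3]
    print xs -- [1,10,2,20,3,30]

    -- Folding with 'async' may interleave the streams, so the result is
    -- sorted here before printing to get a stable output.
    ys <- S.toList $ foldMapWith async expand [1, 2, 3]
    print (sort ys) -- [1,2,3,10,20,30]
```

-- Only the merge operation changes between the two calls; the mapping
-- function and the container stay the same.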
-- foldMapWith async return [1..3] ---- -- Equivalent to: -- --
-- foldMapWith f g xs = S.concatMapWith f g (S.fromFoldable xs) ---- -- Since: 0.1.0 (Streamly) foldMapWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> (a -> t m b) -> f a -> t m b -- | Like foldMapWith but with the last two arguments reversed i.e. -- the monadic streaming function is the last argument. -- -- Equivalent to: -- --
-- forEachWith = flip S.foldMapWith ---- -- Since: 0.1.0 (Streamly) forEachWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> f a -> (a -> t m b) -> t m b -- | The class of semigroups (types with an associative binary operation). -- -- Instances should satisfy the associativity law: -- -- class Semigroup a -- | An associative operation. (<>) :: Semigroup a => a -> a -> a -- | Reduce a non-empty list with <> -- -- The default definition should be sufficient, but this can be -- overridden for efficiency. sconcat :: Semigroup a => NonEmpty a -> a -- | Repeat a value n times. -- -- Given that this works on a Semigroup it is allowed to fail if -- you request 0 or fewer repetitions, and the default definition will do -- so. -- -- By making this a member of the class, idempotent semigroups and -- monoids can upgrade this to execute in O(1) by picking -- stimes = stimesIdempotent or stimes = -- stimesIdempotentMonoid respectively. stimes :: (Semigroup a, Integral b) => b -> a -> a infixr 6 <> -- | Same as IsStream. -- | Deprecated: Please use IsStream instead. type Streaming = IsStream -- | Same as "Streamly.Prelude.runStream". -- | Deprecated: Please use Streamly.Prelude.drain instead. runStream :: Monad m => SerialT m a -> m () -- | Same as runStream -- | Deprecated: Please use runStream instead. runStreaming :: (Monad m, IsStream t) => t m a -> m () -- | Same as runStream. -- | Deprecated: Please use runStream instead. runStreamT :: Monad m => SerialT m a -> m () -- | Same as runStream . wSerially. -- | Deprecated: Please use 'runStream . interleaving' instead. runInterleavedT :: Monad m => WSerialT m a -> m () -- | Same as runStream . asyncly. -- | Deprecated: Please use 'runStream . asyncly' instead. runAsyncT :: Monad m => AsyncT m a -> m () -- | Same as runStream . parallely. -- | Deprecated: Please use 'runStream . parallely' instead. runParallelT :: Monad m => ParallelT m a -> m () -- | Same as runStream . zipping. 
-- | Deprecated: Please use 'runStream . zipSerially' instead. runZipStream :: Monad m => ZipSerialM m a -> m () -- | Same as runStream . zippingAsync. -- | Deprecated: Please use 'runStream . zipAsyncly' instead. runZipAsync :: Monad m => ZipAsyncM m a -> m () -- | Deprecated: Please use SerialT instead. type StreamT = SerialT -- | Deprecated: Please use WSerialT instead. type InterleavedT = WSerialT -- | Deprecated: Please use ZipSerialM instead. type ZipStream = ZipSerialM -- | Same as wSerially. -- | Deprecated: Please use wSerially instead. interleaving :: IsStream t => WSerialT m a -> t m a -- | Same as zipSerially. -- | Deprecated: Please use zipSerially instead. zipping :: IsStream t => ZipSerialM m a -> t m a -- | Same as zipAsyncly. -- | Deprecated: Please use zipAsyncly instead. zippingAsync :: IsStream t => ZipAsyncM m a -> t m a -- | Same as wSerial. -- | Deprecated: Please use wSerial instead. (<=>) :: IsStream t => t m a -> t m a -> t m a infixr 5 <=> -- | Same as async. -- | Deprecated: Please use async instead. (<|) :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a -- | Read and write streams and arrays to and from file handles. File -- handle IO APIs are quite similar to Streamly.Memory.Array read -- write APIs. In that regard, arrays can be considered as in-memory -- files or files can be considered as on-disk arrays. -- -- Control over file reading and writing behavior in terms of buffering, -- encoding, decoding is in the hands of the programmer; the -- TextEncoding, NewLineMode, and Buffering -- options of the underlying handle provided by GHC are not needed and -- are ignored. -- --
-- import qualified Streamly.FileSystem.Handle as FH ---- -- For additional, experimental APIs take a look at -- Streamly.Internal.FileSystem.Handle module. -- --
-- import qualified Streamly.Network.Socket as SK ---- -- For additional, experimental APIs take a look at -- Streamly.Internal.Network.Socket module. module Streamly.Network.Socket -- | Specify the socket protocol details. data SockSpec SockSpec :: !Family -> !SocketType -> !ProtocolNumber -> ![(SocketOption, Int)] -> SockSpec [sockFamily] :: SockSpec -> !Family [sockType] :: SockSpec -> !SocketType [sockProto] :: SockSpec -> !ProtocolNumber [sockOpts] :: SockSpec -> ![(SocketOption, Int)] -- | Unfold a three tuple (listenQLen, spec, addr) into a stream -- of connected protocol sockets corresponding to incoming connections. -- listenQLen is the maximum number of pending connections in -- the backlog. spec is the socket protocol and options -- specification and addr is the protocol address where the -- server listens for incoming connections. accept :: MonadIO m => Unfold m (Int, SockSpec, SockAddr) Socket -- | Unfolds a Socket into a byte stream. IO requests to the socket -- are performed in sizes of defaultChunkSize. read :: MonadIO m => Unfold m Socket Word8 -- | Unfolds the tuple (bufsize, socket) into a byte stream, read -- requests to the socket are performed using buffers of -- bufsize. readWithBufferOf :: MonadIO m => Unfold m (Int, Socket) Word8 -- | Unfolds a socket into a stream of Word8 arrays. Requests to the -- socket are performed using a buffer of size defaultChunkSize. -- The size of arrays in the resulting stream are therefore less than or -- equal to defaultChunkSize. readChunks :: MonadIO m => Unfold m Socket (Array Word8) -- | Unfold the tuple (bufsize, socket) into a stream of -- Word8 arrays. Read requests to the socket are performed using a -- buffer of size bufsize. The size of an array in the resulting -- stream is always less than or equal to bufsize. readChunksWithBufferOf :: MonadIO m => Unfold m (Int, Socket) (Array Word8) -- | Write a byte stream to a socket. 
Accumulates the input in chunks of up -- to defaultChunkSize bytes before writing. -- --
-- write = writeWithBufferOf defaultChunkSize --write :: MonadIO m => Socket -> Fold m Word8 () -- | Write a byte stream to a socket. Accumulates the input in chunks of -- specified number of bytes before writing. writeWithBufferOf :: MonadIO m => Int -> Socket -> Fold m Word8 () -- | Write a stream of arrays to a socket. Each array in the stream is -- written to the socket as a separate IO request. writeChunks :: (MonadIO m, Storable a) => Socket -> Fold m (Array a) () -- | Combinators to build Inet/TCP clients and servers. -- --
-- import qualified Streamly.Network.Inet.TCP as TCP --module Streamly.Network.Inet.TCP -- | Unfold a tuple (ipAddr, port) into a stream of connected TCP -- sockets. ipAddr is the local IP address and port is -- the local port on which connections are accepted. acceptOnAddr :: MonadIO m => Unfold m ((Word8, Word8, Word8, Word8), PortNumber) Socket -- | Like acceptOnAddr but binds on the IPv4 address -- 0.0.0.0 i.e. on all IPv4 addresses/interfaces of the machine -- and listens for TCP connections on the specified port. -- --
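-- To show how the accept, read and write combinators fit together, here
-- is a minimal sketch of a sequential echo server, assuming streamly
-- 0.7.x and the network package. The port number 8091 and the helper
-- name echo are arbitrary choices for illustration.

```haskell
import qualified Streamly.Prelude as S
import qualified Streamly.Network.Socket as SK
import qualified Streamly.Network.Inet.TCP as TCP
import Control.Exception (finally)
import Network.Socket (Socket, close)

-- Hypothetical helper: copy every byte read from the socket back to the
-- same socket, closing the socket when the client is done or on error.
echo :: Socket -> IO ()
echo sk = S.fold (SK.write sk) (S.unfold SK.read sk) `finally` close sk

main :: IO ()
main =
    -- Accept connections on all IPv4 interfaces and serve them one at a
    -- time; this serial pipeline handles a single client at once.
    S.drain . S.mapM echo $ S.unfold TCP.acceptOnPort 8091
```

-- Because write accumulates its input in chunks before writing, a client
-- may see its data echoed back only once a chunk fills up or the
-- connection is closed.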
-- acceptOnPort = UF.supplyFirst acceptOnAddr (0,0,0,0) --acceptOnPort :: MonadIO m => Unfold m PortNumber Socket -- | Like acceptOnAddr but binds on the localhost IPv4 address -- 127.0.0.1. The server can only be accessed from the local -- host, it cannot be accessed from other hosts on the network. -- --
-- acceptOnPortLocal = UF.supplyFirst acceptOnAddr (127,0,0,1) --acceptOnPortLocal :: MonadIO m => Unfold m PortNumber Socket -- | Connect to the specified IP address and port number. connect :: (Word8, Word8, Word8, Word8) -> PortNumber -> IO Socket -- | Streamly is a general computing framework based on concurrent data -- flow programming. The IO monad and pure lists are a special case of -- streamly. On one hand, streamly extends the lists of pure values to -- lists of monadic actions, on the other hand it extends the IO monad -- with concurrent non-determinism. In simple imperative terms we can say -- that streamly extends the IO monad with for loops and nested -- for loops with concurrency support. Hopefully, this analogy -- becomes clearer once you go through this tutorial. -- -- Streaming in general enables writing modular, composable and scalable -- applications with ease, and concurrency allows you to make them scale -- and perform well. Streamly enables writing scalable concurrent -- applications without being aware of threads or synchronization. No -- explicit thread control is needed. Where applicable, concurrency rate -- is automatically controlled based on the demand by the consumer. -- However, combinators can be used to fine tune the concurrency control. -- -- Streaming and concurrency together enable expressing reactive -- applications conveniently. See the CirclingSquare example in -- the examples directory for a simple SDL based FRP example. To -- summarize, streamly provides a unified computing framework for -- streaming, non-determinism and functional reactive programming in an -- elegant and simple API that is a natural extension of pure lists to -- monadic streams. -- -- In this tutorial we will go over the basic concepts and how to use the -- library. Before you go through this tutorial we recommend that you -- take a look at: -- --