-- Hoogle documentation, generated by Haddock -- See Hoogle, http://www.haskell.org/hoogle/ -- | Beautiful Streaming, Concurrent and Reactive Composition -- -- Streamly, short for streaming concurrently, provides monadic streams, -- with a simple API, almost identical to standard lists, and an in-built -- support for concurrency. By using stream-style combinators on stream -- composition, streams can be generated, merged, chained, mapped, -- zipped, and consumed concurrently – providing a generalized high level -- programming framework unifying streaming and concurrency. Controlled -- concurrency allows even infinite streams to be evaluated concurrently. -- Concurrency is auto scaled based on feedback from the stream consumer. -- The programmer does not have to be aware of threads, locking or -- synchronization to write scalable concurrent programs. -- -- The basic streaming functionality of streamly is equivalent to that -- provided by streaming libraries like streaming, pipes, -- and conduit. In addition to providing streaming functionality, -- streamly subsumes the functionality of list transformer libraries like -- pipes or list-t and also the logic programming library -- logict. On the concurrency side, it subsumes the functionality -- of the async package. Because it supports streaming with -- concurrency we can write FRP applications similar in concept to -- Yampa or reflex. -- -- For file IO, currently the library provides only one API to stream the -- lines in the file as Strings. Future versions will provide better -- streaming file IO options. Streamly interoperates with the popular -- streaming libraries, see the interoperation section in -- Streamly.Tutorial. -- -- Why use streamly? -- --
-- import qualified Streamly.Prelude as S ---- -- Functions with the suffix M are general functions that work -- on monadic arguments. The corresponding functions without the suffix -- M work on pure arguments and can in general be derived from -- their monadic versions but are provided for convenience and for -- consistency with other pure APIs in the base package. -- -- In many cases, short definitions of the combinators are provided in -- the documentation for illustration. The actual implementation may -- differ for performance reasons. -- -- Functions having a MonadAsync constraint work concurrently when -- used with appropriate stream type combinator. Please be careful to not -- use parallely with infinite streams. -- -- Deconstruction and folds accept a SerialT type instead of a -- polymorphic type to ensure that streams always have a concrete -- monomorphic type by default, reducing type errors. In case you want to -- use any other type of stream you can use one of the type combinators -- provided in the Streamly module to convert the stream type. module Streamly.Prelude -- | An empty stream. -- --
-- > toList nil -- [] --nil :: IsStream t => t m a -- | Construct a stream by adding a pure value at the head of an existing -- stream. For serial streams this is the same as (return a) `consM` -- r but more efficient. For concurrent streams this is not -- concurrent whereas consM is concurrent. For example: -- --
-- > toList $ 1 `cons` 2 `cons` 3 `cons` nil -- [1,2,3] --cons :: IsStream t => a -> t m a -> t m a infixr 5 `cons` -- | Operator equivalent of cons. -- --
-- > toList $ 1 .: 2 .: 3 .: nil -- [1,2,3] --(.:) :: IsStream t => a -> t m a -> t m a infixr 5 .: -- | Constructs a stream by adding a monadic action at the head of an -- existing stream. For example: -- --
-- > toList $ getLine `consM` getLine `consM` nil -- hello -- world -- ["hello","world"] ---- -- Concurrent (do not use parallely to construct infinite -- streams) consM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a infixr 5 `consM` -- | Operator equivalent of consM. We can read it as "parallel -- colon" to remember that | comes before :. -- --
-- > toList $ getLine |: getLine |: nil -- hello -- world -- ["hello","world"] ---- --
-- let delay = threadDelay 1000000 >> print 1 -- runStream $ serially $ delay |: delay |: delay |: nil -- runStream $ parallely $ delay |: delay |: delay |: nil ---- -- Concurrent (do not use parallely to construct infinite -- streams) (|:) :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a infixr 5 |: -- |
-- yield a = a `cons` nil ---- -- Create a singleton stream from a pure value. -- -- The following holds in monadic streams, but not in Zip streams: -- --
-- yield = pure -- yield = yieldM . pure ---- -- In Zip applicative streams yield is not the same as pure -- because in that case pure is equivalent to repeat -- instead. In monadic streams yield and pure are equally efficient; yield -- may be slightly more efficient than the other equivalent -- definitions. yield :: IsStream t => a -> t m a -- |
-- yieldM m = m `consM` nil ---- -- Create a singleton stream from a monadic action. -- --
-- > toList $ yieldM getLine -- hello -- ["hello"] --yieldM :: (Monad m, IsStream t) => m a -> t m a -- |
-- repeat = fix . cons -- repeat = cycle1 . yield ---- -- Generate an infinite stream by repeating a pure value. repeat :: IsStream t => a -> t m a -- |
-- repeatM = fix . consM -- repeatM = cycle1 . yieldM ---- -- Generate a stream by repeatedly executing a monadic action forever. -- --
-- runStream $ serially $ S.take 10 $ S.repeatM $ (threadDelay 1000000 >> print 1) -- runStream $ asyncly $ S.take 10 $ S.repeatM $ (threadDelay 1000000 >> print 1) ---- -- Concurrent, infinite (do not use with parallely) repeatM :: (IsStream t, MonadAsync m) => m a -> t m a -- |
-- replicate n = take n . repeat ---- -- Generate a stream of length n by repeating a value n -- times. replicate :: (IsStream t, Monad m) => Int -> a -> t m a -- |
-- replicateM n = take n . repeatM ---- -- Generate a stream by performing a monadic action n times. -- For example: -- --
-- runStream $ serially $ S.replicateM 10 $ (threadDelay 1000000 >> print 1) -- runStream $ asyncly $ S.replicateM 10 $ (threadDelay 1000000 >> print 1) ---- -- Concurrent replicateM :: (IsStream t, MonadAsync m) => Int -> m a -> t m a -- | Types that can be enumerated as a stream. The operations in this type -- class are equivalent to those in the Enum type class, except -- that these generate a stream instead of a list. Use the functions in -- Streamly.Enumeration module to define new instances. class Enum a => Enumerable a -- | enumerateFrom from generates a stream starting with the -- element from, enumerating up to maxBound when the type -- is Bounded or generating an infinite stream when the type is -- not Bounded. -- --
-- > S.toList $ S.take 4 $ S.enumerateFrom (0 :: Int) -- [0,1,2,3] ---- -- For Fractional types, enumeration is numerically stable. -- However, no overflow or underflow checks are performed. -- --
-- > S.toList $ S.take 4 $ S.enumerateFrom 1.1 -- [1.1,2.1,3.1,4.1] --enumerateFrom :: (Enumerable a, IsStream t, Monad m) => a -> t m a -- | Generate a finite stream starting with the element from, -- enumerating the type up to the value to. If to is -- smaller than from then an empty stream is returned. -- --
-- > S.toList $ S.enumerateFromTo 0 4 -- [0,1,2,3,4] ---- -- For Fractional types, the last element is equal to the -- specified to value after rounding to the nearest integral -- value. -- --
-- > S.toList $ S.enumerateFromTo 1.1 4 -- [1.1,2.1,3.1,4.1] -- > S.toList $ S.enumerateFromTo 1.1 4.6 -- [1.1,2.1,3.1,4.1,5.1] --enumerateFromTo :: (Enumerable a, IsStream t, Monad m) => a -> a -> t m a -- | enumerateFromThen from then generates a stream whose first -- element is from, the second element is then and the -- successive elements are in increments of then - from. -- Enumeration can occur downwards or upwards depending on whether -- then comes before or after from. For Bounded -- types the stream ends when maxBound is reached, for unbounded -- types it keeps enumerating infinitely. -- --
-- > S.toList $ S.take 4 $ S.enumerateFromThen 0 2 -- [0,2,4,6] -- > S.toList $ S.take 4 $ S.enumerateFromThen 0 (-2) -- [0,-2,-4,-6] --enumerateFromThen :: (Enumerable a, IsStream t, Monad m) => a -> a -> t m a -- | enumerateFromThenTo from then to generates a finite stream -- whose first element is from, the second element is -- then and the successive elements are in increments of -- then - from up to to. Enumeration can occur -- downwards or upwards depending on whether then comes before -- or after from. -- --
-- > S.toList $ S.enumerateFromThenTo 0 2 6 -- [0,2,4,6] -- > S.toList $ S.enumerateFromThenTo 0 (-2) (-6) -- [0,-2,-4,-6] --enumerateFromThenTo :: (Enumerable a, IsStream t, Monad m) => a -> a -> a -> t m a -- |
-- enumerate = enumerateFrom minBound ---- -- Enumerate a Bounded type from its minBound to -- maxBound enumerate :: (IsStream t, Monad m, Bounded a, Enumerable a) => t m a -- |
-- enumerateTo = enumerateFromTo minBound ---- -- Enumerate a Bounded type from its minBound to specified -- value. enumerateTo :: (IsStream t, Monad m, Bounded a, Enumerable a) => a -> t m a -- |
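The enumeration functions above mirror the Enum class methods for lists. The following is a minimal sketch using ordinary lists from base as a stand-in for streams, assuming the element sequences are the same for integral types. The names fromThenTo and allBools are hypothetical helpers and are not part of the library.

```haskell
-- List analogue of enumerateFromThenTo (assumption: same element
-- sequence as the stream version for integral types).
fromThenTo :: Int -> Int -> Int -> [Int]
fromThenTo from next to = [from, next .. to]

-- List analogue of enumerate for a small Bounded type.
allBools :: [Bool]
allBools = [minBound .. maxBound]

main :: IO ()
main = do
    print (take 4 [0 :: Int ..])      -- like S.take 4 $ S.enumerateFrom 0
    print (fromThenTo 0 2 6)          -- upward enumeration
    print (fromThenTo 0 (-2) (-6))    -- downward enumeration
    print allBools
```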
-- unfoldr step s = -- case step s of -- Nothing -> nil -- Just (a, b) -> a `cons` unfoldr step b ---- -- Build a stream by unfolding a pure step function step -- starting from a seed s. The step function returns the next -- element in the stream and the next seed value. When it is done it -- returns Nothing and the stream ends. For example, -- --
-- let f b = -- if b > 3 -- then Nothing -- else Just (b, b + 1) -- in toList $ unfoldr f 0 ---- --
-- [0,1,2,3] --unfoldr :: (Monad m, IsStream t) => (b -> Maybe (a, b)) -> b -> t m a -- | Build a stream by unfolding a monadic step function starting -- from a seed. The step function returns the next element in the stream -- and the next seed value. When it is done it returns Nothing and -- the stream ends. For example, -- --
-- let f b = -- if b > 3 -- then return Nothing -- else print b >> return (Just (b, b + 1)) -- in runStream $ unfoldrM f 0 ---- --
-- 0 -- 1 -- 2 -- 3 ---- -- When run concurrently, the next unfold step can run concurrently with -- the processing of the output of the previous step. Note that more than -- one step cannot run concurrently as the next step depends on the -- output of the previous step. -- --
-- (asyncly $ S.unfoldrM (\n -> liftIO (threadDelay 1000000) >> return (Just (n, n + 1))) 0) -- & S.foldlM' (\_ a -> threadDelay 1000000 >> print a) () ---- -- Concurrent -- -- Since: 0.1.0 unfoldrM :: (IsStream t, MonadAsync m) => (b -> m (Maybe (a, b))) -> b -> t m a -- |
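The unfolding behaviour described above can be tried on ordinary lists. The sketch below uses Data.List.unfoldr from base for the pure case and a hypothetical helper, unfoldrListM, as a list analogue of the monadic case. It only models the sequential semantics and says nothing about concurrent evaluation.

```haskell
import Data.List (unfoldr)

-- Step function from the text: emit the seed and increment it,
-- stopping once the seed exceeds 3.
step :: Int -> Maybe (Int, Int)
step b
    | b > 3     = Nothing
    | otherwise = Just (b, b + 1)

-- Hypothetical list analogue of unfoldrM: the step runs in a monad
-- and its effects happen in order, once per unfold step.
unfoldrListM :: Monad m => (b -> m (Maybe (a, b))) -> b -> m [a]
unfoldrListM f s = do
    r <- f s
    case r of
        Nothing      -> return []
        Just (a, s') -> (a :) <$> unfoldrListM f s'

main :: IO ()
main = do
    print (unfoldr step 0)
    -- Monadic step that prints each element before emitting it.
    let stepM b = if b > 3
                      then return Nothing
                      else print b >> return (Just (b, b + 1))
    xs <- unfoldrListM stepM (0 :: Int)
    print xs
```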
-- iterate f x = x `cons` iterate f (f x) ---- -- Generate an infinite stream with x as the first element and -- each successive element derived by applying the function f on -- the previous element. -- --
-- > S.toList $ S.take 5 $ S.iterate (+1) 1 -- [1,2,3,4,5] --iterate :: IsStream t => (a -> a) -> a -> t m a -- |
-- iterateM f m = m `consM` iterateM f m ---- -- Generate an infinite stream with the first element generated by the -- action m and each successive element derived by applying the -- monadic function f on the previous element. -- -- When run concurrently, the next iteration can run concurrently with -- the processing of the previous iteration. Note that more than one -- iteration cannot run concurrently as the next iteration depends on the -- output of the previous iteration. -- --
-- runStream $ serially $ S.take 10 $ S.iterateM -- (\x -> threadDelay 1000000 >> print x >> return (x + 1)) 0 -- -- runStream $ asyncly $ S.take 10 $ S.iterateM -- (\x -> threadDelay 1000000 >> print x >> return (x + 1)) 0 ---- -- Concurrent iterateM :: (IsStream t, MonadAsync m) => (a -> m a) -> a -> t m a -- |
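As an illustration of iteration, here is a minimal sketch on ordinary lists in which each element is obtained by applying the function to the previous one. The names iterateList and iterateListM are hypothetical; the monadic variant takes an element count so that it terminates in a strict monad such as IO, which is an assumption of this sketch rather than a property of the stream API.

```haskell
-- Pure iteration: the seed comes first, then f is applied repeatedly.
iterateList :: (a -> a) -> a -> [a]
iterateList f x = x : iterateList f (f x)

-- Hypothetical monadic analogue producing exactly n elements.
-- The action runs once between each pair of consecutive elements.
iterateListM :: Monad m => Int -> (a -> m a) -> a -> m [a]
iterateListM n f x
    | n <= 0    = return []
    | n == 1    = return [x]
    | otherwise = do
        y  <- f x
        ys <- iterateListM (n - 1) f y
        return (x : ys)

main :: IO ()
main = do
    print (take 5 (iterateList (+ 1) (1 :: Int)))
    xs <- iterateListM 3 (\x -> print x >> return (x + 1)) (0 :: Int)
    print xs
```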
-- fromIndices f = let g i = f i `cons` g (i + 1) in g 0 ---- -- Generate an infinite stream, whose values are the output of a function -- f applied on the corresponding index. Index starts at 0. -- --
-- > S.toList $ S.take 5 $ S.fromIndices id -- [0,1,2,3,4] --fromIndices :: (IsStream t, Monad m) => (Int -> a) -> t m a -- |
-- fromIndicesM f = let g i = f i `consM` g (i + 1) in g 0 ---- -- Generate an infinite stream, whose values are the output of a monadic -- function f applied on the corresponding index. Index starts -- at 0. fromIndicesM :: (IsStream t, Monad m) => (Int -> m a) -> t m a -- |
-- fromList = foldr cons nil ---- -- Construct a stream from a list of pure values. This is more efficient -- than fromFoldable for serial streams. fromList :: (Monad m, IsStream t) => [a] -> t m a -- |
-- fromListM = foldr consM nil ---- -- Construct a stream from a list of monadic actions. This is more -- efficient than fromFoldableM for serial streams. fromListM :: (MonadAsync m, IsStream t) => [m a] -> t m a -- |
-- fromFoldable = foldr cons nil ---- -- Construct a stream from a Foldable containing pure values. fromFoldable :: (IsStream t, Foldable f) => f a -> t m a -- |
-- fromFoldableM = foldr consM nil ---- -- Construct a stream from a Foldable containing monadic actions. -- --
-- runStream $ serially $ S.fromFoldableM $ replicateM 10 (threadDelay 1000000 >> print 1) -- runStream $ asyncly $ S.fromFoldableM $ replicateM 10 (threadDelay 1000000 >> print 1) ---- -- Concurrent (do not use with parallely on infinite -- containers) fromFoldableM :: (IsStream t, MonadAsync m, Foldable f) => f (m a) -> t m a -- | Read lines from an IO Handle into a stream of Strings. fromHandle :: (IsStream t, MonadIO m) => Handle -> t m String -- | Decompose a stream into its head and tail. If the stream is empty, -- returns Nothing. If the stream is non-empty, returns Just -- (a, ma), where a is the head of the stream and -- ma its tail. uncons :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (a, t m a)) -- | Lazy right associative fold. -- -- For lists a foldr looks like: -- --
-- foldr f z [] = z -- foldr f z (x:xs) = x `f` foldr f z xs ---- -- The recursive expression is the second argument of the fold step -- f. Therefore, the evaluation of the recursive call depends on -- f. It can terminate recursion by not inspecting the second -- argument based on a condition. When expanded fully, it results in the -- following right associated expression: -- --
-- foldr f z xs == x1 `f` (x2 `f` ...(xn `f` z)) ---- -- When f is a constructor, we can see that the first -- deconstruction of this expression would be x1 on the left and -- the recursive expression on the right. Therefore, we can deconstruct -- it to access the input elements in the first-in-first-out (FIFO) order -- and consume the reconstructed structure lazily. The recursive -- expression on the right gets evaluated incrementally as demanded by the -- consumer. For example: -- --
-- > S.foldr (:) [] $ S.fromList [1,2,3,4] -- [1,2,3,4] ---- -- When f is a function strict in its second argument, the right -- side of the expression gets evaluated as follows: -- --
-- foldr f z xs == x1 `f` tail1 -- tail1 == x2 `f` tail2 -- tail2 == x3 `f` tail3 -- ... -- tailn == xn `f` z ---- -- In foldl' we have both the arguments of f available -- at each step, therefore, each step can be reduced immediately. -- However, in foldr the second argument to f is a -- recursive call, therefore, it ends up building the whole expression in -- memory before it can be reduced, consuming the whole input. This makes -- foldr much less efficient for reduction compared to -- foldl'. For example: -- --
-- > S.foldr (+) 0 $ S.fromList [1,2,3,4] -- 10 ---- -- When the underlying monad m is strict (e.g. IO), then -- foldr ends up evaluating all of its input because of strict -- evaluation of the recursive call: -- --
-- > S.foldr (\_ _ -> []) [] $ S.fromList (1:undefined) -- *** Exception: Prelude.undefined ---- -- In a lazy monad, we can consume the input lazily, and terminate the -- fold by conditionally not inspecting the recursive expression. -- --
-- > runIdentity $ S.foldr (\x rest -> if x == 3 then [] else x : rest) [] $ S.fromList (4:1:3:undefined) -- [4,1] ---- -- The arguments to the folding function (a -> b -> b) are -- in the head and tail order of the output, a is the head and -- b is the tail. Remember, in a right fold the zero is on the -- right, it is the tail end. foldr :: Monad m => (a -> b -> b) -> b -> SerialT m a -> m b -- | Lazy right fold for non-empty streams, using first element as the -- starting value. Returns Nothing if the stream is empty. foldr1 :: Monad m => (a -> a -> a) -> SerialT m a -> m (Maybe a) -- | Lazy right fold with a monadic step function. For example, to fold a -- stream into a list: -- --
-- > S.foldrM (\x xs -> return (x : xs)) [] $ S.fromList [1,2,3] -- [1,2,3] --foldrM :: Monad m => (a -> b -> m b) -> b -> SerialT m a -> m b -- | Strict left associative fold. -- -- For lists a foldl looks like: -- --
-- foldl f z [] = z -- foldl f z (x:xs) = foldl f (z `f` x) xs ---- -- The recursive call at the head of the output expression is bound to be -- evaluated until recursion terminates, deconstructing the whole -- input container and building the following left associated -- expression: -- --
-- foldl f z xs == (((z `f` x1) `f` x2) ...) `f` xn ---- -- When f is a constructor, we can see that the first -- deconstruction of this expression would be the recursive expression on -- the left and xn on the right. Therefore, it can access the -- input elements only in the reverse (LIFO) order. For example: -- --
-- > S.foldl' (flip (:)) [] $ S.fromList [1,2,3,4] -- [4,3,2,1] ---- -- The strict left fold foldl' forces the reduction of its -- argument z `f` x before using it, therefore it never builds -- the whole expression in memory. Thus, z `f` x1 would get -- reduced to z1 and then z1 `f` x2 would get reduced -- to z2 and so on, incrementally reducing the expression as it -- recurses. However, it evaluates the accumulator only to WHNF, it may -- further help to use a strict data structure as accumulator. For -- example: -- --
-- > S.foldl' (+) 0 $ S.fromList [1,2,3,4] -- 10 ---- --
-- 0 + 1 -- (0 + 1) + 2 -- ((0 + 1) + 2) + 3 -- (((0 + 1) + 2) + 3) + 4 ---- -- foldl strictly deconstructs the whole input container -- irrespective of whether it needs it or not: -- --
-- > S.foldl' (\acc x -> if x == 3 then acc else x : acc) [] $ S.fromList (4:1:3:undefined) -- *** Exception: Prelude.undefined ---- -- However, evaluation of the items contained in the input container is -- lazy as demanded by the fold step function: -- --
-- > S.foldl' (\acc _ -> acc + 1) 0 $ S.fromList [4,1,3,undefined] -- 4 ---- -- To perform a left fold without consuming all the input one can use -- scanl' to stream the intermediate results of the fold and use -- them lazily. -- -- In stateful or event-driven programming, we can consider z as -- the initial state and the stream being folded as a stream of events, -- thus foldl' processes all the events in the stream updating -- the state on each event and then ultimately returning the final state. -- -- The arguments to the folding function (b -> a -> b) are -- in the head and tail order of the output expression, b is the -- head and a is the tail. Remember, in a left fold the zero is -- on the left, at the head of the expression. foldl' :: Monad m => (b -> a -> b) -> b -> SerialT m a -> m b -- | Strict left fold, for non-empty streams, using first element as the -- starting value. Returns Nothing if the stream is empty. foldl1' :: Monad m => (a -> a -> a) -> SerialT m a -> m (Maybe a) -- | Like foldl' but with a monadic step function. foldlM' :: Monad m => (b -> a -> m b) -> b -> SerialT m a -> m b -- | Strict left fold with an extraction function. Like the standard strict -- left fold, but applies a user supplied extraction function (the third -- argument) to the folded value at the end. This is designed to work -- with the foldl library. The suffix x is a mnemonic -- for extraction. foldx :: Monad m => (x -> a -> x) -> x -> (x -> b) -> SerialT m a -> m b -- | Like foldx, but with a monadic step function. foldxM :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> SerialT m a -> m b -- | Run a stream, discarding the results. By default it interprets the -- stream as SerialT; to run other types of streams use the type -- adapting combinators, for example runStream . -- asyncly. runStream :: Monad m => SerialT m a -> m () -- |
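The contrast between the lazy right fold and the strict left fold described above can be reproduced with ordinary lists, which behave like streams in a lazy monad. This is a minimal sketch using only base. The names takeUntil3, sumStrict, foldxList and mean are hypothetical, and foldxList is merely a list model of the step, initial value and extraction shape of foldx.

```haskell
import Data.List (foldl')

-- Right fold that stops at the first 3 by never inspecting 'rest'.
takeUntil3 :: [Int] -> [Int]
takeUntil3 = foldr (\x rest -> if x == 3 then [] else x : rest) []

-- Strict left fold: the accumulator is forced to WHNF at every step.
sumStrict :: [Int] -> Int
sumStrict = foldl' (+) 0

-- Hypothetical list analogue of foldx: step, initial state, extraction.
foldxList :: (x -> a -> x) -> x -> (x -> b) -> [a] -> b
foldxList step begin done = done . foldl' step begin

-- Mean via a (sum, count) accumulator and a final extraction. The pair
-- is only forced to WHNF, so a strict pair would suit large inputs better.
mean :: [Double] -> Double
mean = foldxList (\(s, n) a -> (s + a, n + 1))
                 (0, 0 :: Int)
                 (\(s, n) -> s / fromIntegral n)

main :: IO ()
main = do
    print (takeUntil3 (4 : 1 : 3 : undefined))  -- the tail is never forced
    print (sumStrict [1, 2, 3, 4])
    print (mean [1, 2, 3, 4])
```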
-- runN n = runStream . take n ---- -- Run maximum up to n iterations of a stream. runN :: Monad m => Int -> SerialT m a -> m () -- |
-- runWhile p = runStream . takeWhile p ---- -- Run a stream as long as the predicate holds true. runWhile :: Monad m => (a -> Bool) -> SerialT m a -> m () -- | Lookup the element at the given index. (!!) :: Monad m => SerialT m a -> Int -> m (Maybe a) -- | Extract the first element of the stream, if any. -- --
-- head = (!! 0) --head :: Monad m => SerialT m a -> m (Maybe a) -- | Extract the last element of the stream, if any. -- --
-- last xs = xs !! (length xs - 1) --last :: Monad m => SerialT m a -> m (Maybe a) -- | Returns the first element that satisfies the given predicate. findM :: Monad m => (a -> m Bool) -> SerialT m a -> m (Maybe a) -- | Like findM but with a non-monadic predicate. -- --
-- find p = findM (return . p) --find :: Monad m => (a -> Bool) -> SerialT m a -> m (Maybe a) -- | In a stream of (key-value) pairs (a, b), return the value -- b of the first pair where the key equals the given value -- a. -- --
-- lookup a = fmap (fmap snd) . find ((== a) . fst) --lookup :: (Monad m, Eq a) => a -> SerialT m (a, b) -> m (Maybe b) -- | Returns the first index that satisfies the given predicate. findIndex :: Monad m => (a -> Bool) -> SerialT m a -> m (Maybe Int) -- | Returns the first index where a given value is found in the stream. -- --
-- elemIndex a = findIndex (== a) --elemIndex :: (Monad m, Eq a) => a -> SerialT m a -> m (Maybe Int) -- | Extract all but the first element of the stream, if any. tail :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (t m a)) -- | Extract all but the last element of the stream, if any. init :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (t m a)) -- | Determine whether the stream is empty. null :: Monad m => SerialT m a -> m Bool -- | Determine whether an element is present in the stream. elem :: (Monad m, Eq a) => a -> SerialT m a -> m Bool -- | Determine whether an element is not present in the stream. notElem :: (Monad m, Eq a) => a -> SerialT m a -> m Bool -- | Determine whether all elements of a stream satisfy a predicate. all :: Monad m => (a -> Bool) -> SerialT m a -> m Bool -- | Determine whether any of the elements of a stream satisfy a predicate. any :: Monad m => (a -> Bool) -> SerialT m a -> m Bool -- | Determines if all elements of a boolean stream are True. and :: Monad m => SerialT m Bool -> m Bool -- | Determines whether at least one element of a boolean stream is True. or :: Monad m => SerialT m Bool -> m Bool -- | Determine the length of the stream. length :: Monad m => SerialT m a -> m Int -- | Determine the sum of all elements of a stream of numbers. Returns -- 0 when the stream is empty. Note that this is not numerically -- stable for floating point numbers. sum :: (Monad m, Num a) => SerialT m a -> m a -- | Determine the product of all elements of a stream of numbers. Returns -- 1 when the stream is empty. product :: (Monad m, Num a) => SerialT m a -> m a -- | Determine the maximum element in a stream using the supplied -- comparison function. maximumBy :: Monad m => (a -> a -> Ordering) -> SerialT m a -> m (Maybe a) -- |
-- maximum = maximumBy compare ---- -- Determine the maximum element in a stream. maximum :: (Monad m, Ord a) => SerialT m a -> m (Maybe a) -- | Determine the minimum element in a stream using the supplied -- comparison function. minimumBy :: Monad m => (a -> a -> Ordering) -> SerialT m a -> m (Maybe a) -- |
-- minimum = minimumBy compare ---- -- Determine the minimum element in a stream. minimum :: (Monad m, Ord a) => SerialT m a -> m (Maybe a) -- | Ensures that all the elements of the stream are identical and then -- returns that unique element. the :: (Eq a, Monad m) => SerialT m a -> m (Maybe a) -- |
-- toList = S.foldr (:) [] ---- -- Convert a stream into a list in the underlying monad. toList :: Monad m => SerialT m a -> m [a] -- |
-- toHandle h = S.mapM_ $ hPutStrLn h ---- -- Write a stream of Strings to an IO Handle. toHandle :: MonadIO m => Handle -> SerialT m String -> m () -- | Strict left scan. -- --
-- > S.toList $ S.scanl' (+) 0 $ S.fromList [1,2,3,4] -- [0,1,3,6,10] ---- --
-- > S.toList $ S.scanl' (flip (:)) [] $ S.fromList [1,2,3,4] -- [[],[1],[2,1],[3,2,1],[4,3,2,1]] ---- -- The output of scanl' is the initial value of the accumulator -- followed by all the intermediate steps and the final result of -- foldl'. -- -- By streaming the accumulated state after each fold step, we can share -- the state across multiple stages of stream composition. Each stage can -- modify or extend the state, do some processing with it and emit it for -- the next stage, thus modularizing the stream processing. This can be -- useful in stateful or event-driven programming. -- -- Consider the following example, computing the sum and the product of -- the elements in a stream in one go using a foldl': -- --
-- > S.foldl' (\(s, p) x -> (s + x, p * x)) (0,1) $ S.fromList [1,2,3,4] -- (10,24) ---- -- Using scanl' we can compute the sum in the first stage and -- pass it down to the next stage for computing the product: -- --
-- > S.foldl' (\(_, p) (s, x) -> (s, p * x)) (0,1) -- $ S.scanl' (\(s, _) x -> (s + x, x)) (0,1) -- $ S.fromList [1,2,3,4] -- (10,24) ---- -- IMPORTANT: scanl' evaluates the accumulator to WHNF. To avoid -- building lazy expressions inside the accumulator, it is recommended -- that a strict data structure is used for accumulator. scanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b -- | Like scanl' but with a monadic fold function. scanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> b -> t m a -> t m b -- | Like scanl' but for a non-empty stream. The first element of -- the stream is used as the initial value of the accumulator. Does -- nothing if the stream is empty. -- --
-- > S.toList $ S.scanl1' (+) $ S.fromList [1,2,3,4] -- [1,3,6,10] --scanl1' :: (IsStream t, Monad m) => (a -> a -> a) -> t m a -> t m a -- | Like scanl1' but with a monadic step function. scanl1M' :: (IsStream t, Monad m) => (a -> a -> m a) -> t m a -> t m a -- | Strict left scan with an extraction function. Like scanl', but -- applies a user supplied extraction function (the third argument) at -- each step. This is designed to work with the foldl library. -- The suffix x is a mnemonic for extraction. scanx :: IsStream t => (x -> a -> x) -> x -> (x -> b) -> t m a -> t m b -- |
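The two-stage sum and product computation described for scanl' can be followed step by step on ordinary lists, since Data.List in base provides scanl' and foldl' with the same shapes. This is a sketch of the idea only. The names sumStage and sumAndProduct are hypothetical.

```haskell
import Data.List (foldl', scanl')

-- Stage 1: running sum paired with the current element. The initial
-- accumulator (0, 1) is emitted first, as with any scan.
sumStage :: [Int] -> [(Int, Int)]
sumStage = scanl' (\(s, _) x -> (s + x, x)) (0, 1)

-- Stage 2: keep the latest sum and multiply the elements together.
sumAndProduct :: [Int] -> (Int, Int)
sumAndProduct = foldl' (\(_, p) (s, x) -> (s, p * x)) (0, 1) . sumStage

main :: IO ()
main = do
    print (scanl' (+) 0 [1, 2, 3, 4 :: Int])  -- initial value, then each step
    print (sumStage [1, 2, 3, 4])             -- state passed to the next stage
    print (sumAndProduct [1, 2, 3, 4])
```

The neutral element 1 in the initial pair is what lets the second stage multiply it in without changing the product.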
-- map = fmap ---- -- Same as fmap. -- --
-- > S.toList $ S.map (+1) $ S.fromList [1,2,3] -- [2,3,4] --map :: (IsStream t, Monad m) => (a -> b) -> t m a -> t m b -- |
-- sequence = mapM id ---- -- Replace the elements of a stream of monadic actions with the outputs -- of those actions. -- --
-- > runStream $ S.sequence $ S.fromList [putStr "a", putStr "b", putStrLn "c"] -- abc -- -- runStream $ S.replicateM 10 (return $ threadDelay 1000000 >> print 1) -- & (serially . S.sequence) -- -- runStream $ S.replicateM 10 (return $ threadDelay 1000000 >> print 1) -- & (asyncly . S.sequence) ---- -- Concurrent (do not use with parallely on infinite -- streams) sequence :: (IsStream t, MonadAsync m) => t m (m a) -> t m a -- |
-- mapM f = sequence . map f ---- -- Apply a monadic function to each element of the stream and replace it -- with the output of the resulting action. -- --
-- > runStream $ S.mapM putStr $ S.fromList ["a", "b", "c"] -- abc -- -- runStream $ S.replicateM 10 (return 1) -- & (serially . S.mapM (\x -> threadDelay 1000000 >> print x)) -- -- runStream $ S.replicateM 10 (return 1) -- & (asyncly . S.mapM (\x -> threadDelay 1000000 >> print x)) ---- -- Concurrent (do not use with parallely on infinite -- streams) mapM :: (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m b -- | Include only those elements that pass a predicate. filter :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a -- | Same as filter but with a monadic predicate. filterM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a -- | Take the first n elements from the stream and discard the rest. take :: (IsStream t, Monad m) => Int -> t m a -> t m a -- | End the stream as soon as the predicate fails on an element. takeWhile :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a -- | Same as takeWhile but with a monadic predicate. takeWhileM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a -- | Discard the first n elements from the stream and take the rest. drop :: (IsStream t, Monad m) => Int -> t m a -> t m a -- | Drop elements in the stream as long as the predicate succeeds and then -- take the rest of the stream. dropWhile :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a -- | Same as dropWhile but with a monadic predicate. dropWhileM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a -- | Deletes the first occurrence of the element in the stream that -- satisfies the given equality predicate. -- --
-- > S.toList $ S.deleteBy (==) 3 $ S.fromList [1,3,3,5] -- [1,3,5] --deleteBy :: (IsStream t, Monad m) => (a -> a -> Bool) -> a -> t m a -> t m a -- | Drop repeated elements that are adjacent to each other. uniq :: (Eq a, IsStream t, Monad m) => t m a -> t m a -- | insertBy cmp elem stream inserts elem before the -- first element in stream that is not less than elem when -- compared using cmp. -- --
-- insertBy cmp x = mergeBy cmp (yield x) ---- --
-- > S.toList $ S.insertBy compare 2 $ S.fromList [1,3,5] -- [1,2,3,5] --insertBy :: (IsStream t, Monad m) => (a -> a -> Ordering) -> a -> t m a -> t m a -- | Generate a stream by performing a monadic action between consecutive -- elements of the given stream. -- -- Concurrent (do not use with parallely on infinite -- streams) -- --
-- > S.toList $ S.intersperseM (putChar 'a' >> return ',') $ S.fromList "hello" -- aaaa"h,e,l,l,o" --intersperseM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a -- | Returns the elements of the stream in reverse order. The stream must -- be finite. reverse :: IsStream t => t m a -> t m a -- | Apply a monadic action to each element of the stream and discard the -- output of the action. mapM_ :: Monad m => (a -> m b) -> SerialT m a -> m () -- | Map a Maybe returning function to a stream, filter out the -- Nothing elements, and return a stream of values extracted from -- Just. mapMaybe :: (IsStream t, Monad m) => (a -> Maybe b) -> t m a -> t m b -- | Like mapMaybe but maps a monadic function. -- -- Concurrent (do not use with parallely on infinite -- streams) mapMaybeM :: (IsStream t, MonadAsync m, Functor (t m)) => (a -> m (Maybe b)) -> t m a -> t m b -- | Find all the indices where the element in the stream satisfies the -- given predicate. findIndices :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m Int -- | Find all the indices where the value of the element in the stream is -- equal to the given value. elemIndices :: (IsStream t, Eq a, Monad m) => a -> t m a -> t m Int -- | Merge two streams using a comparison function. The head elements of -- both the streams are compared and the smaller of the two elements is -- emitted, if both elements are equal then the element from the first -- stream is used first. -- -- If the streams are sorted in ascending order, the resulting stream -- would also remain sorted in ascending order. -- --
-- > S.toList $ S.mergeBy compare (S.fromList [1,3,5]) (S.fromList [2,4,6,8]) -- [1,2,3,4,5,6,8] --mergeBy :: (IsStream t, Monad m) => (a -> a -> Ordering) -> t m a -> t m a -> t m a -- | Like mergeBy but with a monadic comparison function. -- -- Merge two streams randomly: -- --
-- > randomly _ _ = randomIO >>= \x -> return $ if x then LT else GT -- > S.toList $ S.mergeByM randomly (S.fromList [1,1,1,1]) (S.fromList [2,2,2,2]) -- [2,1,2,2,2,1,1,1] ---- -- Merge two streams in a proportion of 2:1: -- --
-- proportionately m n = do -- ref <- newIORef $ cycle $ concat [replicate m LT, replicate n GT] -- return $ \_ _ -> do -- r <- readIORef ref -- writeIORef ref $ tail r -- return $ head r -- -- main = do -- f <- proportionately 2 1 -- xs <- S.toList $ S.mergeByM f (S.fromList [1,1,1,1,1,1]) (S.fromList [2,2,2]) -- print xs ---- --
-- [1,1,2,1,1,2,1,1,2] --mergeByM :: (IsStream t, Monad m) => (a -> a -> m Ordering) -> t m a -> t m a -> t m a -- | Like mergeBy but merges concurrently (i.e. both the elements -- being merged are generated concurrently). mergeAsyncBy :: (IsStream t, MonadAsync m) => (a -> a -> Ordering) -> t m a -> t m a -> t m a -- | Like mergeByM but merges concurrently (i.e. both the elements -- being merged are generated concurrently). mergeAsyncByM :: (IsStream t, MonadAsync m) => (a -> a -> m Ordering) -> t m a -> t m a -> t m a -- | Zip two streams serially using a pure zipping function. -- --
-- > S.toList $ S.zipWith (+) (S.fromList [1,2,3]) (S.fromList [4,5,6]) -- [5,7,9] --zipWith :: (IsStream t, Monad m) => (a -> b -> c) -> t m a -> t m b -> t m c -- | Like zipWith but using a monadic zipping function. zipWithM :: (IsStream t, Monad m) => (a -> b -> m c) -> t m a -> t m b -> t m c -- | Like zipWith but zips concurrently i.e. both the streams -- being zipped are generated concurrently. zipAsyncWith :: (IsStream t, MonadAsync m) => (a -> b -> c) -> t m a -> t m b -> t m c -- | Like zipWithM but zips concurrently i.e. both the streams -- being zipped are generated concurrently. zipAsyncWithM :: (IsStream t, MonadAsync m) => (a -> b -> m c) -> t m a -> t m b -> t m c -- |
-- indexed = S.zipWith (,) (S.intFrom 0) ---- -- Pair each element in a stream with its index. -- --
-- > S.toList $ S.indexed $ S.fromList "hello" -- [(0,'h'),(1,'e'),(2,'l'),(3,'l'),(4,'o')] --indexed :: (IsStream t, Monad m) => t m a -> t m (Int, a) -- |
-- indexedR n = S.zipWith (,) (S.intFromThen n (n - 1)) ---- -- Pair each element in a stream with its index, starting from the given -- index n and counting down. -- --
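-- A pure-list reading of the definition above, as a sketch: zipping with the
-- sequence n, n - 1, n - 2, ... means the first element is paired with n
-- itself. The name indexedRModel is hypothetical and uses base only.

```haskell
-- List analogue of: indexedR n = S.zipWith (,) (S.intFromThen n (n - 1))
indexedRModel :: Int -> [a] -> [(Int, a)]
indexedRModel n = zip [n, n - 1 ..]

main :: IO ()
main = print (indexedRModel 10 "hello")
-- prints [(10,'h'),(9,'e'),(8,'l'),(7,'l'),(6,'o')]
```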
-- > S.toList $ S.indexedR 10 $ S.fromList "hello" -- [(10,'h'),(9,'e'),(8,'l'),(7,'l'),(6,'o')] --indexedR :: (IsStream t, Monad m) => Int -> t m a -> t m (Int, a) -- | Map each element to a stream using a monadic function and then flatten -- the results into a single stream. concatMapM :: (IsStream t, Monad m) => (a -> m (t m b)) -> t m a -> t m b -- | Map each element to a stream and then flatten the results into a -- single stream. -- --
-- concatMap f = concatMapM (return . f) --concatMap :: (IsStream t, Monad m) => (a -> t m b) -> t m a -> t m b -- | Compare two streams for equality using an equality function. eqBy :: (IsStream t, Monad m) => (a -> b -> Bool) -> t m a -> t m b -> m Bool -- | Compare two streams lexicographically using a comparison function. cmpBy :: (IsStream t, Monad m) => (a -> b -> Ordering) -> t m a -> t m b -> m Ordering -- | Returns True if the first stream is the same as or a prefix of -- the second. -- --
-- > S.isPrefixOf (S.fromList "hello") (S.fromList "hello" :: SerialT IO Char) -- True --isPrefixOf :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m Bool -- | Returns True if all the elements of the first stream occur, in -- order, in the second stream. The elements do not have to occur -- consecutively. A stream is treated as a subsequence of itself. -- --
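-- The subsequence rule described above (in order, not necessarily
-- consecutive) can be modelled on lists with a single left-to-right scan.
-- This is a sketch of the semantics only; isSubsequenceOfModel is a
-- hypothetical name and the code uses base only.

```haskell
-- Walk the second list once, consuming an element of the first list
-- whenever the heads match.
isSubsequenceOfModel :: Eq a => [a] -> [a] -> Bool
isSubsequenceOfModel [] _ = True
isSubsequenceOfModel _ [] = False
isSubsequenceOfModel (x:xs) (y:ys)
  | x == y    = isSubsequenceOfModel xs ys
  | otherwise = isSubsequenceOfModel (x:xs) ys

main :: IO ()
main = do
  print (isSubsequenceOfModel "hlo" "hello")    -- True: in order, with gaps
  print (isSubsequenceOfModel "ohl" "hello")    -- False: order matters
  print (isSubsequenceOfModel "hello" "hello")  -- True: a list is a subsequence of itself
```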
-- > S.isSubsequenceOf (S.fromList "hlo") (S.fromList "hello" :: SerialT IO Char) -- True --isSubsequenceOf :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m Bool -- | Drops the given prefix from a stream. Returns Nothing if the -- stream does not start with the given prefix. Returns Just nil -- when the prefix is the same as the stream. stripPrefix :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m (Maybe (t m a)) -- | Same as yieldM -- | Deprecated: Please use yieldM instead. once :: (Monad m, IsStream t) => m a -> t m a -- | Same as fromFoldable. -- | Deprecated: Please use fromFoldable instead. each :: (IsStream t, Foldable f) => f a -> t m a -- | Deprecated: Please use scanx instead. scan :: IsStream t => (x -> a -> x) -> x -> (x -> b) -> t m a -> t m b -- | Deprecated: Please use foldx instead. foldl :: Monad m => (x -> a -> x) -> x -> (x -> b) -> SerialT m a -> m b -- | Deprecated: Please use foldxM instead. foldlM :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> SerialT m a -> m b -- | The way a list represents a sequence of pure values, a stream -- represents a sequence of monadic actions. The monadic stream API -- offered by Streamly is very close to the Haskell Prelude pure -- lists' API, it can be considered as a natural extension of lists to -- monadic actions. Streamly streams provide concurrent composition and -- merging of streams. It can be considered as a concurrent list -- transformer. In contrast to the Prelude lists, merging or -- appending streams of arbitrary length is scalable and inexpensive. -- -- The basic stream type is Serial, it represents a sequence of IO -- actions, and is a Monad. The type SerialT is a monad -- transformer that can represent a sequence of actions in an arbitrary -- monad. The type Serial is in fact a synonym for SerialT -- IO. There are a few more types similar to SerialT, all of -- them represent a stream and differ only in the Semigroup, -- Applicative and Monad compositions of the stream. 
-- Serial and WSerial types compose serially whereas -- Async and WAsync types compose concurrently. All these -- types can be freely inter-converted using type combinators without any -- cost. You can freely switch to any type of composition at any point in -- the program. When no type annotation or explicit stream type -- combinators are used, the default stream type is inferred as -- Serial. -- -- Here is a simple console echo program example: -- --
-- > runStream $ S.repeatM getLine & S.mapM putStrLn ---- -- For more details please see the Streamly.Tutorial module and -- the examples directory in this package. -- -- This module exports stream types, instances and some basic operations. -- Functionality exported by this module includes: -- --
-- import Streamly --module Streamly -- | A monad that can perform concurrent or parallel IO operations. Streams -- that can be composed concurrently require the underlying monad to be -- MonadAsync. type MonadAsync m = (MonadIO m, MonadBaseControl IO m, MonadThrow m) -- | Deep serial composition or serial composition with depth first -- traversal. The Semigroup instance of SerialT appends two -- streams serially in a depth first manner, yielding all elements from -- the first stream, and then all elements from the second stream. -- --
-- import Streamly -- import qualified Streamly.Prelude as S -- -- main = (S.toList . serially $ (S.fromFoldable [1,2]) <> (S.fromFoldable [3,4])) >>= print ---- --
-- [1,2,3,4] ---- -- The Monad instance runs the monadic continuation for -- each element of the stream, serially. -- --
-- main = runStream . serially $ do -- x <- return 1 <> return 2 -- S.yieldM $ print x ---- --
-- 1 -- 2 ---- -- SerialT nests streams serially in a depth first manner. -- --
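-- Depth first nesting can be previewed with the ordinary list monad, which
-- nests in the same order. This is only an analogy on pure lists using base;
-- the name pairs is hypothetical.

```haskell
-- Nested "loops" in the list monad: the inner loop runs to completion
-- for each value of the outer loop.
pairs :: [(Int, Int)]
pairs = do
  x <- [1, 2]
  y <- [3, 4]
  return (x, y)

main :: IO ()
main = mapM_ print pairs
-- prints (1,3) (1,4) (2,3) (2,4), one pair per line
```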
-- main = runStream . serially $ do -- x <- return 1 <> return 2 -- y <- return 3 <> return 4 -- S.yieldM $ print (x, y) ---- --
-- (1,3) -- (1,4) -- (2,3) -- (2,4) ---- -- This behavior of SerialT is exactly like a list transformer. We -- call the monadic code being run for each element of the stream a -- monadic continuation. In imperative paradigm we can think of this -- composition as nested for loops and the monadic continuation -- is the body of the loop. The loop iterates for all elements of the -- stream. -- -- The serially combinator can be omitted as the default stream -- type is SerialT. Note that serial composition with depth first -- traversal can be used to combine an infinite number of streams as it -- explores only one stream at a time. data SerialT m a -- | Wide serial composition or serial composition with a breadth first -- traversal. The Semigroup instance of WSerialT traverses -- the two streams in a breadth first manner. In other words, it -- interleaves two streams, yielding one element from each stream -- alternately. -- --
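-- Alternating between two streams can be modelled on lists by swapping the
-- arguments at every step. A minimal sketch using base only; interleave is
-- a hypothetical helper, not a streamly function.

```haskell
-- Take one element from the first list, then continue with the roles of
-- the two lists swapped. When one list runs out, the rest of the other
-- is emitted as is.
interleave :: [a] -> [a] -> [a]
interleave [] ys = ys
interleave (x:xs) ys = x : interleave ys xs

main :: IO ()
main = do
  print (interleave [1, 2] [3, 4 :: Int])
  -- prints [1,3,2,4]

  -- Because only one element is taken per step, infinite inputs are fine.
  print (take 6 (interleave [1, 3 ..] [2, 4 ..] :: [Int]))
  -- prints [1,2,3,4,5,6]
```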
-- import Streamly -- import qualified Streamly.Prelude as S -- -- main = (S.toList . wSerially $ (S.fromFoldable [1,2]) <> (S.fromFoldable [3,4])) >>= print ---- --
-- [1,3,2,4] ---- -- Similarly, the Monad instance interleaves the iterations of the -- inner and the outer loop, nesting loops in a breadth first manner. -- --
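-- Breadth first nesting can be sketched on lists by replacing the list
-- monad's concatenation with interleaving. The helpers interleave and
-- bindInterleaved are hypothetical, use base only, and are meant to
-- illustrate the two-by-two case only, not to reproduce streamly's
-- scheduling in general.

```haskell
-- Alternate between two lists, one element at a time.
interleave :: [a] -> [a] -> [a]
interleave [] ys = ys
interleave (x:xs) ys = x : interleave ys xs

-- Like (>>=) for lists, but the per-element results are interleaved
-- instead of appended.
bindInterleaved :: [a] -> (a -> [b]) -> [b]
bindInterleaved xs f = foldr (interleave . f) [] xs

main :: IO ()
main =
  mapM_ print $
    bindInterleaved [1, 2 :: Int] $ \x ->
      bindInterleaved [3, 4 :: Int] $ \y ->
        [(x, y)]
-- prints (1,3) (2,3) (1,4) (2,4), one pair per line
```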
-- main = runStream . wSerially $ do -- x <- return 1 <> return 2 -- y <- return 3 <> return 4 -- S.yieldM $ print (x, y) ---- --
-- (1,3) -- (2,3) -- (1,4) -- (2,4) ---- -- Note that a serial composition with breadth first traversal can only -- combine a finite number of streams as it needs to retain state for -- each unfinished stream. data WSerialT m a -- | Deep ahead composition or ahead composition with depth first -- traversal. The semigroup composition of AheadT appends streams -- in a depth first manner just like SerialT except that it can -- produce elements concurrently ahead of time. It is like -- AsyncT except that AsyncT produces the output as it -- arrives whereas AheadT orders the output in the traversal -- order. -- --
-- main = (toList . aheadly $ (fromFoldable [1,2]) <> (fromFoldable [3,4])) >>= print ---- --
-- [1,2,3,4] ---- -- Any exceptions generated by a constituent stream are propagated to the -- output stream. -- -- Similarly, the monad instance of AheadT may run each iteration -- concurrently ahead of time but presents the results in the same order -- as SerialT. -- --
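-- The idea of running iterations ahead of time while still presenting
-- results in order can be sketched with plain threads and MVars from base.
-- This is a simplified model under stated assumptions: aheadModel is a
-- hypothetical name, it starts every action eagerly with no bound on
-- concurrency, and it does not propagate exceptions, all of which differ
-- from AheadT.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)

-- Start every action in its own thread, then read the results back in
-- the original order, regardless of which action finishes first.
-- Note: an action that throws would leave its MVar empty and block the
-- reader; a real implementation has to handle that case.
aheadModel :: [IO a] -> IO [a]
aheadModel actions = do
  vars <- mapM spawn actions
  mapM takeMVar vars
  where
    spawn act = do
      var <- newEmptyMVar
      _ <- forkIO (act >>= putMVar var)
      return var

main :: IO ()
main = do
  -- The action returning 1 finishes first, but the results keep the
  -- order of the input list.
  xs <- aheadModel [threadDelay (n * 10000) >> return n | n <- [3, 2, 1]]
  print xs
  -- prints [3,2,1]
```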
-- import Streamly -- import qualified Streamly.Prelude as S -- import Control.Concurrent -- -- main = runStream . aheadly $ do -- n <- return 3 <> return 2 <> return 1 -- S.yieldM $ do -- threadDelay (n * 1000000) -- myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n) ---- --
-- ThreadId 40: Delay 1 -- ThreadId 39: Delay 2 -- ThreadId 38: Delay 3 ---- -- All iterations may run in the same thread if they do not block. -- -- Note that ahead composition with depth first traversal can be used to -- combine an infinite number of streams as it explores only a bounded -- number of streams at a time. data AheadT m a -- | Deep async composition or async composition with depth first -- traversal. In a left to right Semigroup composition it tries to -- yield elements from the left stream as long as it can, but it can run -- the right stream in parallel if it needs to, based on demand. The -- right stream can be run if the left stream blocks on IO or cannot -- produce elements fast enough for the consumer. -- --
-- main = (toList . asyncly $ (fromFoldable [1,2]) <> (fromFoldable [3,4])) >>= print ---- --
-- [1,2,3,4] ---- -- Any exceptions generated by a constituent stream are propagated to the -- output stream. The output and exceptions from a single stream are -- guaranteed to arrive in the same order in the resulting stream as they -- were generated in the input stream. However, the relative ordering of -- elements from different streams in the resulting stream can vary -- depending on scheduling and generation delays. -- -- Similarly, the monad instance of AsyncT may run each -- iteration concurrently based on demand. More concurrent iterations are -- started only if the previous iterations are not able to produce enough -- output for the consumer. -- --
-- import Streamly -- import qualified Streamly.Prelude as S -- import Control.Concurrent -- -- main = runStream . asyncly $ do -- n <- return 3 <> return 2 <> return 1 -- S.yieldM $ do -- threadDelay (n * 1000000) -- myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n) ---- --
-- ThreadId 40: Delay 1 -- ThreadId 39: Delay 2 -- ThreadId 38: Delay 3 ---- -- All iterations may run in the same thread if they do not block. -- -- Note that async composition with depth first traversal can be used to -- combine an infinite number of streams as it explores only a bounded -- number of streams at a time. data AsyncT m a -- | Wide async composition or async composition with breadth first -- traversal. The Semigroup instance of WAsyncT concurrently -- traverses the composed streams using a breadth first traversal, that -- is, in a round robin fashion, yielding elements from both streams -- alternately. -- --
-- main = (toList . wAsyncly $ (fromFoldable [1,2]) <> (fromFoldable [3,4])) >>= print ---- --
-- [1,3,2,4] ---- -- Any exceptions generated by a constituent stream are propagated to the -- output stream. The output and exceptions from a single stream are -- guaranteed to arrive in the same order in the resulting stream as they -- were generated in the input stream. However, the relative ordering of -- elements from different streams in the resulting stream can vary -- depending on scheduling and generation delays. -- -- Similarly, the Monad instance of WAsyncT runs all -- iterations fairly concurrently using a round robin scheduling. -- --
-- import Streamly -- import qualified Streamly.Prelude as S -- import Control.Concurrent -- -- main = runStream . wAsyncly $ do -- n <- return 3 <> return 2 <> return 1 -- S.yieldM $ do -- threadDelay (n * 1000000) -- myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n) ---- --
-- ThreadId 40: Delay 1 -- ThreadId 39: Delay 2 -- ThreadId 38: Delay 3 ---- -- Unlike AsyncT all iterations are guaranteed to run fairly -- concurrently, unconditionally. -- -- Note that async composition with breadth first traversal can only -- combine a finite number of streams as it needs to retain state for -- each unfinished stream. data WAsyncT m a -- | Async composition with simultaneous traversal of all streams. -- -- The Semigroup instance of ParallelT concurrently merges -- two streams, running both strictly concurrently and yielding elements -- from both streams as they arrive. When multiple streams are combined -- using ParallelT each one is evaluated in its own thread and the -- results produced are presented in the combined stream on a first come -- first serve basis. -- -- AsyncT and WAsyncT are concurrent lookahead -- streams each with a specific type of consumption pattern (depth -- first or breadth first). Since they are lookahead, they may introduce -- certain default latency in starting more concurrent tasks for -- efficiency reasons or may put a default limitation on the resource -- consumption (e.g. number of concurrent threads for lookahead). If we -- look at the implementation detail, they both can share a pool of -- worker threads to evaluate the streams in the desired pattern and at -- the desired rate. However, ParallelT uses a separate runtime -- thread to evaluate each stream. -- -- WAsyncT is similar to ParallelT, as both of them -- evaluate the constituent streams fairly in a round robin fashion. -- However, the key difference is that WAsyncT is lazy or pull -- driven whereas ParallelT is strict or push driven. -- ParallelT immediately starts concurrent evaluation of both the -- streams (in separate threads) and later picks the results whereas -- WAsyncT may wait for a certain latency threshold before -- initiating concurrent evaluation of the next stream. 
The concurrent -- scheduling of the next stream or the degree of concurrency is driven -- by the feedback from the consumer. In case of ParallelT each -- stream is evaluated in a separate thread and results are pushed -- to a shared output buffer, the evaluation rate is controlled by -- blocking when the buffer is full. -- -- Concurrent lookahead streams are generally more efficient than -- ParallelT and can work pretty efficiently even for smaller -- tasks because they do not necessarily use a separate thread for each -- task. So they should be preferred over ParallelT especially -- when efficiency is a concern and simultaneous strict evaluation is not -- a requirement. ParallelT is useful for cases when the streams -- are required to be evaluated simultaneously irrespective of how the -- consumer consumes them e.g. when we want to race two tasks and want to -- start both strictly at the same time or if we have timers in the -- parallel tasks and our results depend on the timers being started at -- the same time. We can say that ParallelT is almost the same -- (modulo some implementation differences) as WAsyncT when the -- latter is used with unlimited lookahead and zero latency in initiating -- lookahead. -- --
-- main = (toList . parallely $ (fromFoldable [1,2]) <> (fromFoldable [3,4])) >>= print ---- --
-- [1,3,2,4] ---- -- When streams with more than one element are merged, elements are -- taken from whichever stream yields first, without any bias, unlike the -- Async style streams. -- -- Any exceptions generated by a constituent stream are propagated to the -- output stream. The output and exceptions from a single stream are -- guaranteed to arrive in the same order in the resulting stream as they -- were generated in the input stream. However, the relative ordering of -- elements from different streams in the resulting stream can vary -- depending on scheduling and generation delays. -- -- Similarly, the Monad instance of ParallelT runs -- all iterations of the loop concurrently. -- --
-- import Streamly -- import qualified Streamly.Prelude as S -- import Control.Concurrent -- -- main = runStream . parallely $ do -- n <- return 3 <> return 2 <> return 1 -- S.yieldM $ do -- threadDelay (n * 1000000) -- myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n) ---- --
-- ThreadId 40: Delay 1 -- ThreadId 39: Delay 2 -- ThreadId 38: Delay 3 ---- -- Note that parallel composition can only combine a finite number of -- streams as it needs to retain state for each unfinished stream. data ParallelT m a -- | The applicative instance of ZipSerialM zips a number of streams -- serially i.e. it produces one element from each stream serially and -- then zips all those elements. -- --
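-- The zipping applicative has a close pure-list analogue in ZipList from
-- base, which can serve as a mental model. This sketch uses ZipList only as
-- an analogy; it says nothing about how ZipSerialM is implemented.

```haskell
import Control.Applicative (ZipList (..))

main :: IO ()
main = do
  -- One element is taken from each list and the three are combined.
  print . getZipList $
    (,,) <$> ZipList [1, 2 :: Int] <*> ZipList [3, 4 :: Int] <*> ZipList [5, 6 :: Int]
  -- prints [(1,3,5),(2,4,6)]

  -- In a zipping applicative, pure has to be an infinite repetition so
  -- that it can pair with inputs of any length.
  print (take 3 (getZipList (pure 'x')))
  -- prints "xxx"
```

-- The second line shows why pure in a zipping applicative yields an
-- infinite sequence, which is worth keeping in mind when buffer or thread
-- limits are lifted.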
-- main = (toList . zipSerially $ (,,) <$> s1 <*> s2 <*> s3) >>= print -- where s1 = fromFoldable [1, 2] -- s2 = fromFoldable [3, 4] -- s3 = fromFoldable [5, 6] ---- --
-- [(1,3,5),(2,4,6)] ---- -- The Semigroup instance of this type works the same way as that -- of SerialT. data ZipSerialM m a -- | Like ZipSerialM but zips in parallel, it generates all the -- elements to be zipped concurrently. -- --
-- main = (toList . zipAsyncly $ (,,) <$> s1 <*> s2 <*> s3) >>= print -- where s1 = fromFoldable [1, 2] -- s2 = fromFoldable [3, 4] -- s3 = fromFoldable [5, 6] ---- --
-- [(1,3,5),(2,4,6)] ---- -- The Semigroup instance of this type works the same way as that -- of SerialT. data ZipAsyncM m a -- | Run a stream, discarding the results. By default it interprets the -- stream as SerialT, to run other types of streams use the type -- adapting combinators for example runStream . -- asyncly. runStream :: Monad m => SerialT m a -> m () -- | Parallel function application operator for streams; just like the -- regular function application operator $ except that it is -- concurrent. The following code prints a value every second even though -- each stage adds a 1 second delay. -- --
-- runStream $ -- S.mapM (\x -> threadDelay 1000000 >> print x) -- |$ S.repeatM (threadDelay 1000000 >> return 1) ---- -- Concurrent (|$) :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> t m a -> t m b infixr 0 |$ -- | Parallel reverse function application operator for streams; just like -- the regular reverse function application operator & -- except that it is concurrent. -- --
-- runStream $ -- S.repeatM (threadDelay 1000000 >> return 1) -- |& S.mapM (\x -> threadDelay 1000000 >> print x) ---- -- Concurrent (|&) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> t m b) -> t m b infixl 1 |& -- | Parallel function application operator; applies a run or -- fold function to a stream such that the fold consumer and the -- stream producer run in parallel. A run or fold -- function reduces the stream to a value in the underlying monad. The -- . at the end of the operator is a mnemonic for termination of -- the stream. -- --
-- S.foldlM' (\_ a -> threadDelay 1000000 >> print a) () -- |$. S.repeatM (threadDelay 1000000 >> return 1) ---- -- Concurrent (|$.) :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b infixr 0 |$. -- | Parallel reverse function application operator for applying a run or -- fold function to a stream. Just like |$. except that the -- operands are reversed. -- --
-- S.repeatM (threadDelay 1000000 >> return 1) -- |&. S.foldlM' (\_ a -> threadDelay 1000000 >> print a) () ---- -- Concurrent (|&.) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> m b) -> m b infixl 1 |&. -- | Make a stream asynchronous: trigger the computation and return a -- stream in the underlying monad representing the output generated by -- the original computation. The returned action is exhaustible and must -- be drained once. If it is not drained fully, a thread may remain blocked -- forever; once exhausted, it always returns empty. mkAsync :: (IsStream t, MonadAsync m) => t m a -> m (t m a) -- | Polymorphic version of the Semigroup operation <> -- of SerialT. Appends two streams sequentially, yielding all -- elements from the first stream, and then all elements from the second -- stream. serial :: IsStream t => t m a -> t m a -> t m a -- | Polymorphic version of the Semigroup operation <> -- of WSerialT. Interleaves two streams, yielding one element from -- each stream alternately. wSerial :: IsStream t => t m a -> t m a -> t m a -- | Polymorphic version of the Semigroup operation <> -- of AheadT. Merges two streams sequentially but with concurrent -- lookahead. ahead :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a -- | Polymorphic version of the Semigroup operation <> -- of AsyncT. Merges two streams possibly concurrently, preferring -- the elements from the left one when available. async :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a -- | Polymorphic version of the Semigroup operation <> -- of WAsyncT. Merges two streams concurrently choosing elements -- from both fairly. wAsync :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a -- | Polymorphic version of the Semigroup operation <> -- of ParallelT. Merges two streams concurrently. parallel :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a -- | Specify the maximum number of threads that can be spawned concurrently -- for any concurrent combinator in a stream. 
A value of 0 resets the -- thread limit to default, a negative value means there is no limit. The -- default value is 1500. -- -- When the actions in a stream are IO bound, having blocking IO calls, -- this option can be used to control the maximum number of in-flight IO -- requests. When the actions are CPU bound this option can be used to -- control the amount of CPU used by the stream. maxThreads :: IsStream t => Int -> t m a -> t m a -- | Specify the maximum size of the buffer for storing the results from -- concurrent computations. If the buffer becomes full we stop spawning -- more concurrent tasks until there is space in the buffer. A value of 0 -- resets the buffer size to default, a negative value means there is no -- limit. The default value is 1500. -- -- CAUTION! using an unbounded maxBuffer value (i.e. a negative -- value) coupled with an unbounded maxThreads value is a recipe -- for disaster in presence of infinite streams, or very large streams. -- Especially, it must not be used when pure is used in -- ZipAsyncM streams as pure in applicative zip streams -- generates an infinite stream causing unbounded concurrent generation -- with no limit on the buffer or threads. maxBuffer :: IsStream t => Int -> t m a -> t m a -- | Specifies the stream yield rate in yields per second (Hertz). -- We keep accumulating yield credits at rateGoal. At any point of -- time we allow only as many yields as we have accumulated as per -- rateGoal since the start of time. If the consumer or the -- producer is slower or faster, the actual rate may fall behind or -- exceed rateGoal. We try to recover the gap between the two by -- increasing or decreasing the pull rate from the producer. However, if -- the gap becomes more than rateBuffer we try to recover only as -- much as rateBuffer. -- -- rateLow puts a bound on how low the instantaneous rate can go -- when recovering the rate gap. In other words, it determines the -- maximum yield latency. 
Similarly, rateHigh puts a bound on how -- high the instantaneous rate can go when recovering the rate gap. In -- other words, it determines the minimum yield latency. We reduce the -- latency by increasing concurrency, therefore we can say that it puts -- an upper bound on concurrency. -- -- If the rateGoal is 0 or negative the stream never yields a -- value. If the rateBuffer is 0 or negative we do not attempt to -- recover. data Rate Rate :: Double -> Double -> Double -> Int -> Rate -- | The lower rate limit [rateLow] :: Rate -> Double -- | The target rate we want to achieve [rateGoal] :: Rate -> Double -- | The upper rate limit [rateHigh] :: Rate -> Double -- | Maximum slack from the goal [rateBuffer] :: Rate -> Int -- | Specify the pull rate of a stream. A Nothing value resets the -- rate to default which is unlimited. When the rate is specified, -- concurrent production may be ramped up or down automatically to -- achieve the specified yield rate. The specific behavior for different -- styles of Rate specifications is documented under Rate. -- The effective maximum production rate achieved by a stream is governed -- by: -- --
-- foldWith async $ map return [1..3] --foldWith :: (IsStream t, Foldable f) => (t m a -> t m a -> t m a) -> f (t m a) -> t m a -- | A variant of foldMap that allows you to map a monadic streaming -- action on a Foldable container and then fold it using the -- specified stream sum operation. -- --
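-- Both folds can be read as a right fold with the chosen sum operation. The
-- sketch below models that reading on pure lists, with list append standing
-- in for the stream sum; foldWithModel and foldMapWithModel are hypothetical
-- names using base only, and nothing here is concurrent.

```haskell
-- Combine a container of lists with the given binary operation.
foldWithModel :: Foldable f => ([a] -> [a] -> [a]) -> f [a] -> [a]
foldWithModel op = foldr op []

-- Map each element to a list first, then combine as above.
foldMapWithModel :: Foldable f => ([b] -> [b] -> [b]) -> (a -> [b]) -> f a -> [b]
foldMapWithModel op g = foldr (op . g) []

main :: IO ()
main = do
  print (foldWithModel (++) (map return [1 .. 3 :: Int]))
  -- prints [1,2,3]
  print (foldMapWithModel (++) (\x -> [x, x * 10]) [1 .. 3 :: Int])
  -- prints [1,10,2,20,3,30]
```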
-- foldMapWith async return [1..3] --foldMapWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> (a -> t m b) -> f a -> t m b -- | Like foldMapWith but with the last two arguments reversed i.e. -- the monadic streaming function is the last argument. forEachWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> f a -> (a -> t m b) -> t m b -- | Class of types that can represent a stream of elements of some type -- a in some monad m. class IsStream t -- | Fix the type of a polymorphic stream as SerialT. serially :: IsStream t => SerialT m a -> t m a -- | Fix the type of a polymorphic stream as WSerialT. wSerially :: IsStream t => WSerialT m a -> t m a -- | Fix the type of a polymorphic stream as AsyncT. asyncly :: IsStream t => AsyncT m a -> t m a -- | Fix the type of a polymorphic stream as AheadT. aheadly :: IsStream t => AheadT m a -> t m a -- | Fix the type of a polymorphic stream as WAsyncT. wAsyncly :: IsStream t => WAsyncT m a -> t m a -- | Fix the type of a polymorphic stream as ParallelT. parallely :: IsStream t => ParallelT m a -> t m a -- | Fix the type of a polymorphic stream as ZipSerialM. zipSerially :: IsStream t => ZipSerialM m a -> t m a -- | Fix the type of a polymorphic stream as ZipAsyncM. zipAsyncly :: IsStream t => ZipAsyncM m a -> t m a -- | Adapt any specific stream type to any other specific stream type. adapt :: (IsStream t1, IsStream t2) => t1 m a -> t2 m a -- | A serial IO stream of elements of type a. See SerialT -- documentation for more details. type Serial = SerialT IO -- | An interleaving serial IO stream of elements of type a. See -- WSerialT documentation for more details. type WSerial = WSerialT IO -- | A serial IO stream of elements of type a with concurrent -- lookahead. See AheadT documentation for more details. type Ahead = AheadT IO -- | A demand driven left biased parallely composing IO stream of elements -- of type a. See AsyncT documentation for more details. 
type Async = AsyncT IO -- | A round robin parallely composing IO stream of elements of type -- a. See WAsyncT documentation for more details. type WAsync = WAsyncT IO -- | A parallely composing IO stream of elements of type a. See -- ParallelT documentation for more details. type Parallel = ParallelT IO -- | An IO stream whose applicative instance zips streams serially. type ZipSerial = ZipSerialM IO -- | An IO stream whose applicative instance zips streams wAsyncly. type ZipAsync = ZipAsyncM IO -- | The class of semigroups (types with an associative binary operation). -- -- Instances should satisfy the associativity law: -- -- class Semigroup a -- | An associative operation. (<>) :: Semigroup a => a -> a -> a -- | Reduce a non-empty list with <> -- -- The default definition should be sufficient, but this can be -- overridden for efficiency. sconcat :: Semigroup a => NonEmpty a -> a -- | Repeat a value n times. -- -- Given that this works on a Semigroup it is allowed to fail if -- you request 0 or fewer repetitions, and the default definition will do -- so. -- -- By making this a member of the class, idempotent semigroups and -- monoids can upgrade this to execute in O(1) by picking -- stimes = stimesIdempotent or stimes = -- stimesIdempotentMonoid respectively. stimes :: (Semigroup a, Integral b) => b -> a -> a infixr 6 <> -- | Same as IsStream. -- | Deprecated: Please use IsStream instead. type Streaming = IsStream -- | Same as runStream -- | Deprecated: Please use runStream instead. runStreaming :: (Monad m, IsStream t) => t m a -> m () -- | Same as runStream. -- | Deprecated: Please use runStream instead. runStreamT :: Monad m => SerialT m a -> m () -- | Same as runStream . wSerially. -- | Deprecated: Please use 'runStream . interleaving' instead. runInterleavedT :: Monad m => WSerialT m a -> m () -- | Same as runStream . asyncly. -- | Deprecated: Please use 'runStream . asyncly' instead. runAsyncT :: Monad m => AsyncT m a -> m () -- | Same as runStream . 
parallely. -- | Deprecated: Please use 'runStream . parallely' instead. runParallelT :: Monad m => ParallelT m a -> m () -- | Same as runStream . zipping. -- | Deprecated: Please use 'runStream . zipSerially instead. runZipStream :: Monad m => ZipSerialM m a -> m () -- | Same as runStream . zippingAsync. -- | Deprecated: Please use 'runStream . zipAsyncly instead. runZipAsync :: Monad m => ZipAsyncM m a -> m () -- | Deprecated: Please use SerialT instead. type StreamT = SerialT -- | Deprecated: Please use WSerialT instead. type InterleavedT = WSerialT -- | Deprecated: Please use ZipSerialM instead. type ZipStream = ZipSerialM -- | Same as wSerially. -- | Deprecated: Please use wSerially instead. interleaving :: IsStream t => WSerialT m a -> t m a -- | Same as zipSerially. -- | Deprecated: Please use zipSerially instead. zipping :: IsStream t => ZipSerialM m a -> t m a -- | Same as zipAsyncly. -- | Deprecated: Please use zipAsyncly instead. zippingAsync :: IsStream t => ZipAsyncM m a -> t m a -- | Same as wSerial. -- | Deprecated: Please use wSerial instead. (<=>) :: IsStream t => t m a -> t m a -> t m a infixr 5 <=> -- | Same as async. -- | Deprecated: Please use async instead. (<|) :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a -- | Streamly is a general computing framework based on streaming IO. The -- IO monad and pure lists are a special case of streamly. On one hand, -- streamly extends the lists of pure values to lists of monadic actions, -- on the other hand it extends the IO monad with concurrent -- non-determinism. In simple imperative terms we can say that streamly -- extends the IO monad with for loops and nested for -- loops with concurrency support. You can understand this analogy better -- once you can go through this tutorial. -- -- Streaming in general enables writing modular, composable and scalable -- applications with ease, and concurrency allows you to make them scale -- and perform well. 
Streamly enables writing scalable concurrent -- applications without being aware of threads or synchronization. No -- explicit thread control is needed; where applicable, the concurrency -- rate is automatically controlled based on demand from the consumer. -- However, combinators can be used to fine-tune the concurrency control. -- -- Streaming and concurrency together enable expressing reactive -- applications conveniently. See the CirclingSquare example in -- the examples directory for a simple SDL based FRP example. To -- summarize, streamly provides a unified computing framework for -- streaming, non-determinism and functional reactive programming in an -- elegant and simple API that is a natural extension of pure lists to -- monadic streams. -- -- In this tutorial we will go over the basic concepts and how to use the -- library. See the last section for further reading resources. module Streamly.Tutorial