-- |
-- Module      : Streamly.Internal.Data.Stream.IsStream.Expand
-- Copyright   : (c) 2017 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
-- Expand a stream by combining two or more streams or by combining streams
-- with unfolds.

module Streamly.Internal.Data.Stream.IsStream.Expand
    (
    -- * Binary Combinators (Linear)
    -- | Functions ending in the shape:
    --
    -- @t m a -> t m a -> t m a@.
    --
    -- The functions in this section have linear or flat n-ary combining
    -- characteristics. It means that when combined @n@ times (e.g. @a `serial`
    -- b `serial` c ...@) the resulting expression will have an @O(n)@
    -- complexity (instead of the @O(n^2)@ of the pair wise combinators
    -- described in the next section). These functions can be used efficiently
    -- with 'concatMapWith' et al. combinators that combine streams in a linear
    -- fashion (contrast with 'concatPairsWith' which combines streams as a
    -- binary tree).

      serial
    , ahead
    , async
    , wAsync
    , parallel
    , Par.parallelFst
    , Par.parallelMin

    -- * Binary Combinators (Pair Wise)
    -- | Like the functions in the section above these functions also combine
    -- two streams into a single stream but when used @n@ times linearly they
    -- exhibit O(n^2) complexity. They are best combined in a binary tree
    -- fashion using 'concatPairsWith' giving a @n * log n@ complexity.  Avoid
    -- using these with 'concatMapWith' when combining a large or infinite
    -- number of streams.

    -- ** Append
    , append

    -- ** wSerial
    -- | 'wSerial' is a CPS based stream interleaving function. It can be
    -- used with 'concatMapWith' as well, however, the interleaving behavior of
    -- @n@ streams would be asymmetric giving exponentially more weightage to
    -- streams that come earlier in the composition.
    --
    , wSerial
    , Serial.wSerialFst
    , Serial.wSerialMin

    -- ** Interleave
    -- | 'interleave' is like 'wSerial'  but using a direct style
    -- implementation instead of CPS. It is faster than 'wSerial' due to stream
    -- fusion but has worse efficiency when used with 'concatMapWith' for large
    -- number of streams.
    , interleave
    , interleaveMin
    , interleaveSuffix
    , interleaveInfix

    -- ** Round Robin
    , roundrobin

    -- ** Zip
    , Z.zipWith
    , Z.zipWithM
    , Z.zipAsyncWith
    , Z.zipAsyncWithM

    -- ** Merge
    -- , merge
    , mergeBy
    , mergeByM
    , mergeAsyncBy
    , mergeAsyncByM

    -- * Combine Streams and Unfolds
    -- |
    -- Expand a stream by repeatedly using an unfold and merging the resulting
    -- streams.  Functions generally ending in the shape:
    --
    -- @Unfold m a b -> t m a -> t m b@

    -- ** Append Many (Unfold)
    -- | Unfold and flatten streams.
    , unfoldMany
    , unfoldManyInterleave
    , unfoldManyRoundRobin

    -- ** Interpose
    -- | Insert effects between streams. Like unfoldMany but intersperses an
    -- effect between the streams. A special case of gintercalate.
    , interpose
    , interposeSuffix
    -- , interposeBy

    -- ** Intercalate
    -- | Insert Streams between Streams.
    -- Like unfoldMany but intersperses streams from another source between
    -- the streams from the first source.
    , intercalate
    , intercalateSuffix
    , gintercalate
    , gintercalateSuffix

    -- * Append Many (concatMap)
    -- | Map and serially append streams. 'concatMapM' is a generalization of
    -- the binary append operation to append many streams.
    , concatMapM
    , concatMap
    , concatM
    , concat

    -- * Flatten Containers
    -- | Flatten 'Foldable' containers using the binary stream merging
    -- operations.
    , concatFoldableWith
    , concatMapFoldableWith
    , concatForFoldableWith

    -- * ConcatMapWith
    -- | Map and flatten a stream like 'concatMap' but using a custom binary
    -- stream merging combinator instead of just appending the streams.  The
    -- merging occurs sequentially, it works efficiently for 'serial', 'async',
    -- 'ahead' like merge operations where we consume one stream before the
    -- next or in case of 'wAsync' or 'parallel' where we consume all streams
    -- simultaneously anyway.
    --
    -- However, in cases where the merging consumes streams in a round robin
    -- fashion, a pair wise merging using 'concatPairsWith' would be more
    -- efficient. These cases include operations like 'mergeBy' or 'zipWith'.

    , concatMapWith
    , K.bindWith
    , concatSmapMWith

    -- * ConcatPairsWith
    -- | See the notes about suitable merge functions in the 'concatMapWith'
    -- section.
    , concatPairsWith

    -- * IterateMap
    -- | Map and flatten Trees of Streams
    , iterateMapWith
    , iterateSmapMWith
    , iterateMapLeftsWith

    -- * Deprecated
    , concatUnfold
    )
where

#include "inline.hs"

import Streamly.Internal.Data.Stream.Ahead (ahead)
import Streamly.Internal.Data.Stream.Async (async, wAsync)
import Streamly.Internal.Data.Stream.IsStream.Common
    (concatM, concatMapM, concatMap, smapM, fromPure, fromEffect)
import Streamly.Internal.Data.Stream.Parallel (parallel)
import Streamly.Internal.Data.Stream.Prelude
       ( concatFoldableWith, concatMapFoldableWith
       , concatForFoldableWith, fromStreamS, toStreamS)
import Streamly.Internal.Data.Stream.Serial (serial, wSerial)
import Streamly.Internal.Data.Stream.StreamD (fromStreamD, toStreamD)
import Streamly.Internal.Data.Stream.StreamK (IsStream)
import Streamly.Internal.Data.SVar (MonadAsync)
import Streamly.Internal.Data.Unfold.Type (Unfold)

import qualified Streamly.Internal.Data.Stream.Parallel as Par
import qualified Streamly.Internal.Data.Stream.Serial as Serial
import qualified Streamly.Internal.Data.Stream.StreamD as D
import qualified Streamly.Internal.Data.Stream.StreamK as K
#ifdef USE_STREAMK_ONLY
import qualified Streamly.Internal.Data.Stream.StreamK as S
#else
import qualified Streamly.Internal.Data.Stream.StreamD as S
#endif
import qualified Streamly.Internal.Data.Stream.Zip as Z

import Prelude hiding (concat, concatMap)

-- $setup
-- >>> :m
-- >>> import Data.IORef
-- >>> import Prelude hiding (zipWith, concatMap, concat)
-- >>> import qualified Streamly.Prelude as Stream
-- >>> import Streamly.Internal.Data.Stream.IsStream as Stream
-- >>> import qualified Streamly.Data.Fold as Fold
-- >>> import qualified Streamly.Internal.Data.Fold as Fold
-- >>> import qualified Streamly.Internal.Data.Unfold as Unfold
-- >>> import qualified Streamly.Internal.Data.Parser as Parser
-- >>> import qualified Streamly.Data.Array.Foreign as Array

-------------------------------------------------------------------------------
-- Appending
-------------------------------------------------------------------------------

-- XXX Reconcile the names "serial" and "append".
--
-- | Append the outputs of two streams, yielding all the elements from the
-- first stream and then yielding all the elements from the second stream.
--
-- IMPORTANT NOTE: This could be 100x faster than @serial/<>@ for appending a
-- few (say 100) streams because it can fuse via stream fusion. However, it
-- does not scale for a large number of streams (say 1000s) and becomes
-- quadratically slow. Therefore use this for custom appending of a few streams
-- but use 'concatMap' or 'concatMapWith serial' for appending @n@ streams or
-- infinite containers of streams.
--
-- /Pre-release/
{-# INLINE append #-}
append :: (IsStream t, Monad m) => t m b -> t m b -> t m b
append m1 m2 =
    -- Convert both inputs to the StreamD representation, append them there
    -- and convert the combined stream back to the caller's stream type.
    fromStreamD $ D.append (toStreamD m1) (toStreamD m2)

-------------------------------------------------------------------------------
-- Interleaving
-------------------------------------------------------------------------------

-- XXX Same as 'wSerial'. We should perhaps rename wSerial to interleave.
-- XXX Document the interleaving behavior of side effects in all the
-- interleaving combinators.
-- XXX Write time-domain equivalents of these. In the time domain we can
-- interleave two streams such that the value of second stream is always taken
-- from its last value even if no new value is being yielded, like
-- zipWithLatest. It would be something like interleaveWithLatest.
--
-- | Interleaves the outputs of two streams, yielding elements from each stream
-- alternately, starting from the first stream. If any of the streams finishes
-- early the other stream continues alone until it too finishes.
--
-- >>> :set -XOverloadedStrings
-- >>> import Data.Functor.Identity (Identity)
-- >>> Stream.interleave "ab" ",,,," :: Stream.SerialT Identity Char
-- fromList "a,b,,,"
--
-- >>> Stream.interleave "abcd" ",," :: Stream.SerialT Identity Char
-- fromList "a,b,cd"
--
-- 'interleave' is dual to 'interleaveMin', it can be called @interleaveMax@.
--
-- Do not use at scale in concatMapWith.
--
-- /Pre-release/
{-# INLINE interleave #-}
interleave :: (IsStream t, Monad m) => t m b -> t m b -> t m b
interleave m1 m2 =
    -- The interleaving itself is implemented by 'D.interleave' on the
    -- StreamD representation; this wrapper only converts in and out.
    fromStreamD $ D.interleave (toStreamD m1) (toStreamD m2)

-- | Interleaves the outputs of two streams, yielding elements from each stream
-- alternately, starting from the first stream. As soon as the first stream
-- finishes, the output stops, discarding the remaining part of the second
-- stream. In this case, the last element in the resulting stream would be from
-- the second stream. If the second stream finishes early then the first stream
-- still continues to yield elements until it finishes.
--
-- >>> :set -XOverloadedStrings
-- >>> import Data.Functor.Identity (Identity)
-- >>> Stream.interleaveSuffix "abc" ",,,," :: Stream.SerialT Identity Char
-- fromList "a,b,c,"
-- >>> Stream.interleaveSuffix "abc" "," :: Stream.SerialT Identity Char
-- fromList "a,bc"
--
-- 'interleaveSuffix' is a dual of 'interleaveInfix'.
--
-- Do not use at scale in concatMapWith.
--
-- /Pre-release/
{-# INLINE interleaveSuffix #-}
interleaveSuffix :: (IsStream t, Monad m) => t m b -> t m b -> t m b
interleaveSuffix m1 m2 =
    -- Delegates to 'D.interleaveSuffix' on the StreamD representation and
    -- converts the result back to the caller's stream type.
    fromStreamD $ D.interleaveSuffix (toStreamD m1) (toStreamD m2)

-- | Interleaves the outputs of two streams, yielding elements from each stream
-- alternately, starting from the first stream and ending at the first stream.
-- If the second stream is longer than the first, elements from the second
-- stream are infixed with elements from the first stream. If the first stream
-- is longer then it continues yielding elements even after the second stream
-- has finished.
--
-- >>> :set -XOverloadedStrings
-- >>> import Data.Functor.Identity (Identity)
-- >>> Stream.interleaveInfix "abc" ",,,," :: Stream.SerialT Identity Char
-- fromList "a,b,c"
-- >>> Stream.interleaveInfix "abc" "," :: Stream.SerialT Identity Char
-- fromList "a,bc"
--
-- 'interleaveInfix' is a dual of 'interleaveSuffix'.
--
-- Do not use at scale in concatMapWith.
--
-- /Pre-release/
{-# INLINE interleaveInfix #-}
interleaveInfix :: (IsStream t, Monad m) => t m b -> t m b -> t m b
interleaveInfix m1 m2 =
    -- Delegates to 'D.interleaveInfix' on the StreamD representation and
    -- converts the result back to the caller's stream type.
    fromStreamD $ D.interleaveInfix (toStreamD m1) (toStreamD m2)

-- | Interleaves the outputs of two streams, yielding elements from each stream
-- alternately, starting from the first stream. The output stops as soon as any
-- of the two streams finishes, discarding the remaining part of the other
-- stream. The last element of the resulting stream would be from the longer
-- stream.
--
-- >>> :set -XOverloadedStrings
-- >>> import Data.Functor.Identity (Identity)
-- >>> Stream.interleaveMin "ab" ",,,," :: Stream.SerialT Identity Char
-- fromList "a,b,"
-- >>> Stream.interleaveMin "abcd" ",," :: Stream.SerialT Identity Char
-- fromList "a,b,c"
--
-- 'interleaveMin' is dual to 'interleave'.
--
-- Do not use at scale in concatMapWith.
--
-- /Pre-release/
{-# INLINE interleaveMin #-}
interleaveMin :: (IsStream t, Monad m) => t m b -> t m b -> t m b
interleaveMin m1 m2 =
    -- Delegates to 'D.interleaveMin' on the StreamD representation and
    -- converts the result back to the caller's stream type.
    fromStreamD $ D.interleaveMin (toStreamD m1) (toStreamD m2)

-------------------------------------------------------------------------------
-- Scheduling
-------------------------------------------------------------------------------

-- | Schedule the execution of two streams in a fair round-robin manner,
-- executing each stream once, alternately. Execution of a stream may not
-- necessarily result in an output, a stream may choose to @Skip@ producing an
-- element until later giving the other stream a chance to run. Therefore, this
-- combinator fairly interleaves the execution of two streams rather than
-- fairly interleaving the output of the two streams. This can be useful in
-- co-operative multitasking without using explicit threads. This can be used
-- as an alternative to `async`.
--
-- Do not use at scale in concatMapWith.
--
-- /Pre-release/
{-# INLINE roundrobin #-}
roundrobin :: (IsStream t, Monad m) => t m b -> t m b -> t m b
roundrobin m1 m2 =
    -- Note the differing capitalisation: the public name is 'roundrobin'
    -- while the StreamD implementation it wraps is 'D.roundRobin'.
    fromStreamD $ D.roundRobin (toStreamD m1) (toStreamD m2)

------------------------------------------------------------------------------
-- Merging (sorted streams)
------------------------------------------------------------------------------

-- | Merge two streams using a comparison function. The head elements of both
-- the streams are compared and the smaller of the two elements is emitted, if
-- both elements are equal then the element from the first stream is used
-- first.
--
-- If the streams are sorted in ascending order, the resulting stream would
-- also remain sorted in ascending order.
--
-- @
-- >>> Stream.toList $ Stream.mergeBy compare (Stream.fromList [1,3,5]) (Stream.fromList [2,4,6,8])
-- [1,2,3,4,5,6,8]
--
-- @
--
-- @since 0.6.0
{-# INLINABLE mergeBy #-}
mergeBy ::
       (IsStream t, Monad m) => (a -> a -> Ordering) -> t m a -> t m a -> t m a
mergeBy f m1 m2 =
    -- 'S' is StreamK when USE_STREAMK_ONLY is defined and StreamD otherwise
    -- (see the imports), hence the fromStreamS/toStreamS conversions.
    fromStreamS $ S.mergeBy f (toStreamS m1) (toStreamS m2)

-- | Like 'mergeBy' but with a monadic comparison function.
--
-- Merge two streams randomly:
--
-- @
-- > randomly _ _ = randomIO >>= \x -> return $ if x then LT else GT
-- > Stream.toList $ Stream.mergeByM randomly (Stream.fromList [1,1,1,1]) (Stream.fromList [2,2,2,2])
-- [2,1,2,2,2,1,1,1]
-- @
--
-- Merge two streams in a proportion of 2:1:
--
-- @
-- >>> :{
-- do    
--  let proportionately m n = do
--       ref <- newIORef $ cycle $ Prelude.concat [Prelude.replicate m LT, Prelude.replicate n GT]
--       return $ \_ _ -> do
--          r <- readIORef ref
--          writeIORef ref $ Prelude.tail r
--          return $ Prelude.head r  
--  f <- proportionately 2 1
--  xs <- Stream.toList $ Stream.mergeByM f (Stream.fromList [1,1,1,1,1,1]) (Stream.fromList [2,2,2])
--  print xs
-- :}
-- [1,1,2,1,1,2,1,1,2]
--
-- @
--
-- @since 0.6.0
{-# INLINABLE mergeByM #-}
mergeByM
    :: (IsStream t, Monad m)
    => (a -> a -> m Ordering) -> t m a -> t m a -> t m a
mergeByM f m1 m2 =
    -- 'S' is StreamK when USE_STREAMK_ONLY is defined and StreamD otherwise
    -- (see the imports), hence the fromStreamS/toStreamS conversions.
    fromStreamS $ S.mergeByM f (toStreamS m1) (toStreamS m2)

{-
-- | Like 'mergeByM' but stops merging as soon as any of the two streams stops.
{-# INLINABLE mergeEndByAny #-}
mergeEndByAny
    :: (IsStream t, Monad m)
    => (a -> a -> m Ordering) -> t m a -> t m a -> t m a
mergeEndByAny f m1 m2 = fromStreamD $
    D.mergeEndByAny f (toStreamD m1) (toStreamD m2)

-- Like 'mergeByM' but stops merging as soon as the first stream stops.
{-# INLINABLE mergeEndByFirst #-}
mergeEndByFirst
    :: (IsStream t, Monad m)
    => (a -> a -> m Ordering) -> t m a -> t m a -> t m a
mergeEndByFirst f m1 m2 = fromStreamS $
    D.mergeEndByFirst f (toStreamD m1) (toStreamD m2)

-- Holding this back for now, we may want to use the name "merge" differently
-- | Same as @'mergeBy' 'compare'@.
--
-- @
-- >>> Stream.toList $ Stream.merge (Stream.fromList [1,3,5]) (Stream.fromList [2,4,6,8])
-- [1,2,3,4,5,6,8]
-- @
--
-- @since 0.6.0
{-# INLINABLE merge #-}
merge ::
       (IsStream t, Monad m, Ord a) => t m a -> t m a -> t m a
merge = mergeBy compare
-}

-- | Like 'mergeBy' but merges concurrently (i.e. both the elements being
-- merged are generated concurrently).
--
-- @since 0.6.0
{-# INLINE mergeAsyncBy #-}
mergeAsyncBy :: (IsStream t, MonadAsync m)
    => (a -> a -> Ordering) -> t m a -> t m a -> t m a
-- Lift the pure comparison into the monad and reuse the monadic variant.
mergeAsyncBy f = mergeAsyncByM (\a b -> return $ f a b)

-- | Like 'mergeByM' but merges concurrently (i.e. both the elements being
-- merged are generated concurrently).
--
-- @since 0.6.0
{-# INLINE mergeAsyncByM #-}
mergeAsyncByM :: (IsStream t, MonadAsync m)
    => (a -> a -> m Ordering) -> t m a -> t m a -> t m a
mergeAsyncByM f m1 m2 =
    fromStreamD $
        -- Each input is converted to StreamD and passed through
        -- 'Par.mkParallelD' before the ordinary 'D.mergeByM' merges them.
        let par = Par.mkParallelD . toStreamD
        in D.mergeByM f (par m1) (par m2)


-- @since 0.7.0
{-# DEPRECATED concatUnfold "Please use unfoldMany instead." #-}
{-# INLINE concatUnfold #-}
concatUnfold :: (IsStream t, Monad m) => Unfold m a b -> t m a -> t m b
-- Same implementation as 'unfoldMany'; retained under the old name only for
-- backward compatibility.
concatUnfold u m = fromStreamD $ D.unfoldMany u (toStreamD m)

------------------------------------------------------------------------------
-- Combine N Streams - unfoldMany
------------------------------------------------------------------------------

-- | Like 'concatMap' but uses an 'Unfold' for stream generation. Unlike
-- 'concatMap' this can fuse the 'Unfold' code with the inner loop and
-- therefore provide many times better performance.
--
-- @since 0.8.0
{-# INLINE unfoldMany #-}
unfoldMany :: (IsStream t, Monad m) => Unfold m a b -> t m a -> t m b
unfoldMany u m =
    -- Delegates to 'D.unfoldMany' on the StreamD representation of the
    -- input and converts the flattened result back.
    fromStreamD $ D.unfoldMany u (toStreamD m)

-- | Like 'unfoldMany' but interleaves the streams in the same way as
-- 'interleave' behaves instead of appending them.
--
-- /Pre-release/
{-# INLINE unfoldManyInterleave #-}
unfoldManyInterleave :: (IsStream t, Monad m)
    => Unfold m a b -> t m a -> t m b
unfoldManyInterleave u m =
    -- Delegates to 'D.unfoldManyInterleave' on the StreamD representation
    -- of the input and converts the result back.
    fromStreamD $ D.unfoldManyInterleave u (toStreamD m)

-- | Like 'unfoldMany' but executes the streams in the same way as
-- 'roundrobin'.
--
-- /Pre-release/
{-# INLINE unfoldManyRoundRobin #-}
unfoldManyRoundRobin :: (IsStream t, Monad m)
    => Unfold m a b -> t m a -> t m b
unfoldManyRoundRobin u m =
    -- Delegates to 'D.unfoldManyRoundRobin' on the StreamD representation
    -- of the input and converts the result back.
    fromStreamD $ D.unfoldManyRoundRobin u (toStreamD m)

------------------------------------------------------------------------------
-- Combine N Streams - interpose
------------------------------------------------------------------------------

-- > interpose x unf str = gintercalate unf str UF.identity (repeat x)
--
-- | Unfold the elements of a stream, intersperse the given element between the
-- unfolded streams and then concat them into a single stream.
--
-- > unwords = S.interpose ' '
--
-- /Pre-release/
{-# INLINE interpose #-}
interpose :: (IsStream t, Monad m)
    => c -> Unfold m b c -> t m b -> t m c
interpose x unf str =
    -- 'D.interpose' takes a monadic action for the separator, so the pure
    -- element is wrapped with 'return'.
    D.fromStreamD $ D.interpose (return x) unf (D.toStreamD str)

-- interposeSuffix x unf str = gintercalateSuffix unf str UF.identity (repeat x)
--
-- | Unfold the elements of a stream, append the given element after each
-- unfolded stream and then concat them into a single stream.
--
-- > unlines = S.interposeSuffix '\n'
--
-- /Pre-release/
{-# INLINE interposeSuffix #-}
interposeSuffix :: (IsStream t, Monad m)
    => c -> Unfold m b c -> t m b -> t m c
interposeSuffix x unf str =
    -- 'D.interposeSuffix' takes a monadic action for the suffix, so the pure
    -- element is wrapped with 'return'.
    D.fromStreamD $ D.interposeSuffix (return x) unf (D.toStreamD str)

------------------------------------------------------------------------------
-- Combine N Streams - intercalate
------------------------------------------------------------------------------

-- XXX we can swap the order of arguments to gintercalate so that the
-- definition of unfoldMany becomes simpler? The first stream should be
-- infixed inside the second one. However, if we change the order in
-- "interleave" as well similarly, then that will make it a bit unintuitive.
--
-- > unfoldMany unf str =
-- >     gintercalate unf str (UF.nilM (\_ -> return ())) (repeat ())
--
-- | 'interleaveInfix' followed by unfold and concat.
--
-- /Pre-release/
{-# INLINE gintercalate #-}
gintercalate
    :: (IsStream t, Monad m)
    => Unfold m a c -> t m a -> Unfold m b c -> t m b -> t m c
gintercalate unf1 str1 unf2 str2 =
    -- Each (unfold, stream) pair is passed through together; both streams
    -- are converted to StreamD before being handed to 'D.gintercalate'.
    D.fromStreamD $ D.gintercalate
        unf1 (D.toStreamD str1)
        unf2 (D.toStreamD str2)

-- > intercalate unf seed str = gintercalate unf str unf (repeatM seed)
--
-- | 'intersperse' followed by unfold and concat.
--
-- > intercalate unf a str = unfoldMany unf $ intersperse a str
-- > intersperse = intercalate (Unfold.function id)
-- > unwords = intercalate Unfold.fromList " "
--
-- >>> Stream.toList $ Stream.intercalate Unfold.fromList " " $ Stream.fromList ["abc", "def", "ghi"]
-- "abc def ghi"
--
-- @since 0.8.0
{-# INLINE intercalate #-}
intercalate :: (IsStream t, Monad m)
    => Unfold m b c -> b -> t m b -> t m c
intercalate unf seed str = D.fromStreamD $
    -- First intersperse the seed between the input elements, then unfold
    -- every element (original and interspersed alike) and flatten.
    D.unfoldMany unf $ D.intersperse seed (toStreamD str)

-- | 'interleaveSuffix' followed by unfold and concat.
--
-- /Pre-release/
{-# INLINE gintercalateSuffix #-}
gintercalateSuffix
    :: (IsStream t, Monad m)
    => Unfold m a c -> t m a -> Unfold m b c -> t m b -> t m c
gintercalateSuffix unf1 str1 unf2 str2 =
    -- Each (unfold, stream) pair is passed through together; both streams
    -- are converted to StreamD before being handed to 'D.gintercalateSuffix'.
    D.fromStreamD $ D.gintercalateSuffix
        unf1 (D.toStreamD str1)
        unf2 (D.toStreamD str2)

-- > intercalateSuffix unf seed str = gintercalateSuffix unf str unf (repeat seed)
--
-- | 'intersperseSuffix' followed by unfold and concat.
--
-- The pure value @seed@ is inserted after every element of the input stream
-- and every element of the resulting stream (original elements and inserted
-- seeds alike) is expanded using the unfold @unf@.
--
-- > intercalateSuffix unf a str = unfoldMany unf $ intersperseSuffix a str
-- > intersperseSuffix = intercalateSuffix (Unfold.function id)
-- > unlines = intercalateSuffix Unfold.fromList "\n"
--
-- >>> Stream.toList $ Stream.intercalateSuffix Unfold.fromList "\n" $ Stream.fromList ["abc", "def", "ghi"]
-- "abc\ndef\nghi\n"
--
-- @since 0.8.0
{-# INLINE intercalateSuffix #-}
intercalateSuffix :: (IsStream t, Monad m)
    => Unfold m b c -> b -> t m b -> t m c
intercalateSuffix unf seed str =
    fromStreamD
        $ D.unfoldMany unf
        -- D.intersperseSuffix takes a monadic action, so lift the pure seed.
        $ D.intersperseSuffix (return seed) (D.toStreamD str)

{-
{-# INLINE iterateUnfold #-}
iterateUnfold :: (IsStream t, MonadAsync m)
    => Unfold m a a -> t m a -> t m a
iterateUnfold unf xs = undefined
-}

------------------------------------------------------------------------------
-- Combine N Streams - concatMap
------------------------------------------------------------------------------

-- | Flatten a stream of streams to a single stream.
--
-- This is 'concatMap' applied to the identity function, i.e. every inner
-- stream is used as is:
--
-- @
-- concat = concatMap id
-- @
--
-- /Pre-release/
{-# INLINE concat #-}
concat :: (IsStream t, Monad m) => t m (t m a) -> t m a
concat = concatMap id

------------------------------------------------------------------------------
-- Combine N Streams - concatMapWith
------------------------------------------------------------------------------

-- | @concatMapWith mixer generator stream@ is a two dimensional looping
-- combinator.  The @generator@ function is used to generate streams from the
-- elements in the input @stream@ and the @mixer@ function is used to merge
-- those streams.
--
-- Note we can merge streams concurrently by using a concurrent merge function.
--
-- This is a direct alias of @K.concatMapBy@.
--
-- /Since: 0.7.0/
--
-- /Since: 0.8.0 (signature change)/
{-# INLINE concatMapWith #-}
concatMapWith
    :: IsStream t
    => (t m b -> t m b -> t m b)
    -> (a -> t m b)
    -> t m a
    -> t m b
concatMapWith = K.concatMapBy

-- | Like 'concatMapWith' but carries a state which can be used to share
-- information across multiple steps of concat.
--
-- The stateful step function @f@ is first mapped over the input using 'smapM'
-- (starting from the state produced by @initial@), yielding a stream of
-- streams, which is then flattened using @combine@:
--
-- @
-- concatSmapMWith combine f initial = concatMapWith combine id . smapM f initial
-- @
--
-- /Pre-release/
--
{-# INLINE concatSmapMWith #-}
concatSmapMWith
    :: (IsStream t, Monad m)
    => (t m b -> t m b -> t m b)
    -> (s -> a -> m (s, t m b))
    -> m s
    -> t m a
    -> t m b
concatSmapMWith combine f initial =
    concatMapWith combine id . smapM f initial

-- Keep concating either streams as long as rights are generated, stop as soon
-- as a left is generated and concat the left stream.
--
-- See also: 'handle'
--
-- /Unimplemented/
--
{-
concatMapEitherWith
    :: -- (IsStream t, MonadAsync m) =>
       (forall x. t m x -> t m x -> t m x)
    -> (a -> t m (Either (t m b) b))
    -> t m a
    -> t m b
concatMapEitherWith = undefined
-}

-- XXX Implement a StreamD version for fusion.
--
-- | Combine streams in pairs using a binary stream combinator, then combine
-- the resulting streams in pairs recursively until we get to a single combined
-- stream.
--
-- For example, you can sort a stream using merge sort like this:
--
-- >>> Stream.toList $ Stream.concatPairsWith (Stream.mergeBy compare) Stream.fromPure $ Stream.fromList [5,1,7,9,2]
-- [1,2,5,7,9]
--
-- This is a direct alias of @K.concatPairsWith@.
--
-- /Caution: the stream of streams must be finite/
--
-- /Pre-release/
--
{-# INLINE concatPairsWith #-}
concatPairsWith :: IsStream t =>
       (t m b -> t m b -> t m b)
    -> (a -> t m b)
    -> t m a
    -> t m b
concatPairsWith = K.concatPairsWith

------------------------------------------------------------------------------
-- IterateMap - Map and flatten Trees of Streams
------------------------------------------------------------------------------

-- | Like 'iterateM' but iterates after mapping a stream generator on the
-- output.
--
-- Yield an input element in the output stream, map a stream generator on it
-- and then do the same on the resulting stream. This can be used for a depth
-- first traversal of a tree like structure.
--
-- Note that 'iterateM' is a special case of 'iterateMapWith':
--
-- @
-- iterateM f = iterateMapWith serial (fromEffect . f) . fromEffect
-- @
--
-- It can be used to traverse a tree structure.  For example, to list a
-- directory tree:
--
-- @
-- Stream.iterateMapWith Stream.serial
--     (either Dir.toEither (const nil))
--     (fromPure (Left "tmp"))
-- @
--
-- /Pre-release/
--
{-# INLINE iterateMapWith #-}
iterateMapWith
    :: IsStream t
    => (t m a -> t m a -> t m a)
    -> (a -> t m a)
    -> t m a
    -> t m a
iterateMapWith combine f = concatMapWith combine go

    where

    -- Yield the element itself, then recurse into the stream generated from
    -- it, merging the two with the user supplied combinator.
    go x = fromPure x `combine` concatMapWith combine go (f x)

{-
{-# INLINE iterateUnfold #-}
iterateUnfold :: (IsStream t, MonadAsync m)
    => Unfold m a a -> t m a -> t m a
iterateUnfold unf xs = undefined
-}

------------------------------------------------------------------------------
-- Flattening Graphs
------------------------------------------------------------------------------

-- To traverse graphs we need a state to be carried around in the traversal.
-- For example, we can use a hashmap to store the visited status of nodes.

-- | Like 'iterateMapWith' but carries a state in the stream generation
-- function.  This can be used to traverse graph like structures, we can
-- remember the visited nodes in the state to avoid cycles.
--
-- Note that a combination of 'iterateMapWith' and 'usingState' can also be
-- used to traverse graphs. However, this function provides a more localized
-- state instead of using a global state.
--
-- The @initial@ action supplies the starting state. For every element the
-- generator @f@ is run with the current state and returns a new state along
-- with a stream; every element of that stream is then processed with the new
-- state.
--
-- See also: 'mfix'
--
-- /Pre-release/
--
{-# INLINE iterateSmapMWith #-}
iterateSmapMWith
    :: (IsStream t, Monad m)
    => (t m a -> t m a -> t m a)
    -> (b -> a -> m (b, t m a))
    -> m b
    -> t m a
    -> t m a
iterateSmapMWith combine f initial stream =
    -- Run the initial action to obtain the starting state and process the
    -- input stream with it.
    concatMap
        (\b -> concatMapWith combine (go b) stream)
        (fromEffect initial)

    where

    -- Yield the element itself followed by whatever it expands to.
    go b a = fromPure a `combine` feedback b a

    -- Run the generator on the element to get the next state and the
    -- generated stream, then iterate on that stream with the new state.
    feedback b a =
        concatMap
            (\(b1, s) -> concatMapWith combine (go b1) s)
            (fromEffect $ f b a)

------------------------------------------------------------------------------
-- iterateMap - Either streams
------------------------------------------------------------------------------

-- | In an 'Either' stream iterate on 'Left's.  This is a special case of
-- 'iterateMapWith':
--
-- @
-- iterateMapLeftsWith combine f = iterateMapWith combine (either f (const nil))
-- @
--
-- 'Left' values are expanded using @f@ whereas 'Right' values expand to an
-- empty stream, i.e. they are yielded but not iterated any further.
--
-- To traverse a directory tree:
--
-- @
-- iterateMapLeftsWith serial Dir.toEither (fromPure (Left "tmp"))
-- @
--
-- /Pre-release/
--
{-# INLINE iterateMapLeftsWith #-}
iterateMapLeftsWith
    :: IsStream t
    => (t m (Either a b) -> t m (Either a b) -> t m (Either a b))
    -> (a -> t m (Either a b))
    -> t m (Either a b)
    -> t m (Either a b)
iterateMapLeftsWith combine f =
    iterateMapWith combine (either f (const K.nil))