{-# LANGUAGE CPP #-}
{-# LANGUAGE UndecidableInstances #-}

-- |
-- Module      : Streamly.Internal.Data.Stream.StreamD.Type
-- Copyright   : (c) 2018 Composewell Technologies
--               (c) Roman Leshchinskiy 2008-2010
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC

-- The stream type is inspired by the vector package.  A few functions in this
-- module have been originally adapted from the vector package (c) Roman
-- Leshchinskiy. See the notes in specific functions.

module Streamly.Internal.Data.Stream.StreamD.Type
    (
    -- * The stream type
      Step (..)
    -- XXX UnStream is exported to avoid a performance issue in some
    -- combinators if we use the pattern synonym "Stream".
    , Stream (Stream, UnStream)

    -- * CrossStream type wrapper
    , CrossStream
    , unCross
    , mkCross

    -- * Conversion to StreamK
    , fromStreamK
    , toStreamK

    -- * From Unfold
    , unfold

    -- * Construction
    -- ** Primitives
    , nilM
    , consM

    -- ** From Values
    , fromPure
    , fromEffect

    -- ** From Containers
    , Streamly.Internal.Data.Stream.StreamD.Type.fromList

    -- * Elimination
    -- ** Primitives
    , uncons

    -- ** Strict Left Folds
    , Streamly.Internal.Data.Stream.StreamD.Type.fold
    , foldBreak
    , foldAddLazy
    , foldAdd
    , foldEither

    , Streamly.Internal.Data.Stream.StreamD.Type.foldl'
    , foldlM'
    , foldlx'
    , foldlMx'

    -- ** Lazy Right Folds
    , foldrM
    , foldrMx
    , Streamly.Internal.Data.Stream.StreamD.Type.foldr
    , foldrS

    -- ** Specific Folds
    , drain
    , Streamly.Internal.Data.Stream.StreamD.Type.toList

    -- * Mapping
    , map
    , mapM

    -- * Stateful Filters
    , take
    , takeWhile
    , takeWhileM
    , takeEndBy
    , takeEndByM

    -- * Combining Two Streams
    -- ** Zipping
    , zipWithM
    , zipWith

    -- ** Cross Product
    , crossApply
    , crossApplyFst
    , crossApplySnd
    , crossWith
    , cross

    -- * Unfold Many
    , ConcatMapUState (..)
    , unfoldMany

    -- * Concat
    , concatEffect
    , concatMap
    , concatMapM
    , concat

    -- * Unfold Iterate
    , unfoldIterateDfs
    , unfoldIterateBfs
    , unfoldIterateBfsRev

    -- * Concat Iterate
    , concatIterateScan
    , concatIterateDfs
    , concatIterateBfs
    , concatIterateBfsRev

    -- * Fold Many
    , FoldMany (..) -- for inspection testing
    , FoldManyPost (..)
    , foldMany
    , foldManyPost
    , groupsOf
    , refoldMany

    -- * Fold Iterate
    , reduceIterateBfs
    , foldIterateBfs

    -- * Multi-stream folds
    , eqBy
    , cmpBy
    )
where

#include "inline.hs"

import Control.Applicative (liftA2)
import Control.Monad.Catch (MonadThrow, throwM)
import Control.Monad.Trans.Class (MonadTrans(lift))
import Control.Monad.IO.Class (MonadIO(..))
import Data.Foldable (Foldable(foldl'), fold, foldr)
import Data.Functor (($>))
import Data.Functor.Identity (Identity(..))
import Data.Maybe (fromMaybe)
import Data.Semigroup (Endo(..))
import Fusion.Plugin.Types (Fuse(..))
import GHC.Base (build)
import GHC.Exts (IsList(..), IsString(..), oneShot)
import GHC.Types (SPEC(..))
import Prelude hiding (map, mapM, take, concatMap, takeWhile, zipWith, concat)
import Text.Read
       ( Lexeme(Ident), lexP, parens, prec, readPrec, readListPrec
       , readListPrecDefault)

import Streamly.Internal.BaseCompat ((#.))
import Streamly.Internal.Data.Fold.Type (Fold(..))
import Streamly.Internal.Data.Maybe.Strict (Maybe'(..), toMaybe)
import Streamly.Internal.Data.Refold.Type (Refold(..))
import Streamly.Internal.Data.Stream.StreamD.Step (Step (..))
import Streamly.Internal.Data.SVar.Type (State, adaptState, defState)
import Streamly.Internal.Data.Unfold.Type (Unfold(..))

import qualified Streamly.Internal.Data.Fold.Type as FL hiding (foldr)
import qualified Streamly.Internal.Data.Stream.StreamK.Type as K
#ifdef USE_UNFOLDS_EVERYWHERE
import qualified Streamly.Internal.Data.Unfold.Type as Unfold
#endif

#include "DocTestDataStream.hs"

------------------------------------------------------------------------------
-- The direct style stream type
------------------------------------------------------------------------------

-- gst = global state

-- | A stream consists of a step function that generates the next step given a
-- current state, and the current state.
--
-- The state type @s@ is existentially quantified, so it does not appear in
-- the type of the stream. In addition to its own state the step function
-- receives a @State K.StreamK m a@ argument (named @gst@, the global state, in
-- this module).
data Stream m a =
    forall s. UnStream (State K.StreamK m a -> s -> m (Step s a)) s

-- XXX This causes perf trouble when pattern matching with "Stream"  in a
-- recursive way, e.g. in uncons, foldBreak, concatMap. We need to get rid of
-- this.

-- Rewrap a stream so that the global state handed to its step function is
-- first passed through 'adaptState'. The stream's own state is left untouched.
unShare :: Stream m a -> Stream m a
unShare (UnStream step state) = UnStream step' state

    where

    -- Same step function, but invoked with the adapted global state.
    step' gst = step (adaptState gst)

-- | Bidirectional pattern synonym for 'UnStream'.
--
-- When used as a pattern, the stream is first passed through 'unShare' (via a
-- view pattern) and then matched as 'UnStream'. When used as an expression it
-- is exactly the 'UnStream' constructor.
pattern Stream :: (State K.StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
pattern Stream step state <- (unShare -> UnStream step state)
    where Stream = UnStream

{-# COMPLETE Stream #-}

------------------------------------------------------------------------------
-- Primitives
------------------------------------------------------------------------------

-- | A stream that terminates without producing any output, but produces a side
-- effect.
--
-- >>> Stream.fold Fold.toList (Stream.nilM (print "nil"))
-- "nil"
-- []
--
-- /Pre-release/
{-# INLINE_NORMAL nilM #-}
nilM :: Applicative m => m b -> Stream m a
-- The only step runs the supplied action, discards its result and stops. The
-- stream state is the unit value since there is nothing to remember.
nilM m = Stream (\_ _ -> m $> Stop) ()

-- | Like 'cons' but fuses an effect instead of a pure value.
{-# INLINE_NORMAL consM #-}
consM :: Applicative m => m a -> Stream m a -> Stream m a
consM m (Stream step state) = Stream step1 Nothing

    where

    -- The combined state is 'Nothing' until the head action has been run and
    -- @Just s@ afterwards, where @s@ is the state of the tail stream.
    {-# INLINE_LATE step1 #-}
    -- Run the head action, yield its result and switch to the tail stream
    -- starting from its initial state.
    step1 _ Nothing = (`Yield` Just state) <$> m
    -- Step the tail stream, rewrapping its next state in 'Just'.
    step1 gst (Just st) =
        (\case
            Yield a s -> Yield a (Just s)
            Skip  s   -> Skip (Just s)
            Stop      -> Stop) <$> step gst st

-- | Decompose a stream into its head and tail. If the stream is empty, returns
-- 'Nothing'. If the stream is non-empty, returns @Just (a, ma)@, where @a@ is
-- the head of the stream and @ma@ its tail.
--
-- Properties:
--
-- >>> Nothing <- Stream.uncons Stream.nil
-- >>> Just ("a", t) <- Stream.uncons (Stream.cons "a" Stream.nil)
--
-- This can be used to consume the stream in an imperative manner one element
-- at a time, as it just breaks down the stream into individual elements and we
-- can loop over them as we deem fit. For example, this can be used to convert
-- a streamly stream into other stream types.
--
-- All the folds in this module can be expressed in terms of 'uncons', however,
-- this is generally less efficient than specific folds because it takes apart
-- the stream one element at a time, therefore, does not take advantage of
-- stream fusion.
--
-- 'foldBreak' is a more general way of consuming a stream piecemeal.
--
-- >>> :{
-- uncons xs = do
--     r <- Stream.foldBreak Fold.one xs
--     return $ case r of
--         (Nothing, _) -> Nothing
--         (Just h, t) -> Just (h, t)
-- :}
--
{-# INLINE_NORMAL uncons #-}
uncons :: Monad m => Stream m a -> m (Maybe (a, Stream m a))
uncons (UnStream step state) = go SPEC state
  where
    -- Step the stream (with the default global state) until it either yields
    -- an element or stops. 'Skip' steps are consumed silently.
    go !_ st = do
        r <- step defState st
        case r of
            -- The tail is the same step function resumed from the next state.
            Yield x s -> return $ Just (x, Stream step s)
            Skip  s   -> go SPEC s
            Stop      -> return Nothing

------------------------------------------------------------------------------
-- From 'Unfold'
------------------------------------------------------------------------------

-- Stream state used by 'unfold': 'UnfoldNothing' before the seed has been
-- injected into the unfold, and 'UnfoldJust' carrying the unfold's own state
-- afterwards.
data UnfoldState s = UnfoldNothing | UnfoldJust s

-- | Convert an 'Unfold' into a stream by supplying it an input seed.
--
-- >>> s = Stream.unfold Unfold.replicateM (3, putStrLn "hello")
-- >>> Stream.fold Fold.drain s
-- hello
-- hello
-- hello
--
{-# INLINE_NORMAL unfold #-}
unfold :: Applicative m => Unfold m a b -> a -> Stream m b
unfold (Unfold ustep inject) seed = Stream step UnfoldNothing

    where

    {-# INLINE_LATE step #-}
    -- First step: inject the seed to obtain the unfold's initial state. No
    -- element is produced, so this is a 'Skip'.
    step _ UnfoldNothing = Skip . UnfoldJust <$> inject seed
    -- Subsequent steps: run the unfold's step function, rewrapping its next
    -- state in 'UnfoldJust'.
    step _ (UnfoldJust st) =
        (\case
            Yield x s -> Yield x (UnfoldJust s)
            Skip s    -> Skip (UnfoldJust s)
            Stop      -> Stop) <$> ustep st

------------------------------------------------------------------------------
-- From Values
------------------------------------------------------------------------------

-- | Create a singleton stream from a pure value.
--
-- >>> fromPure a = a `Stream.cons` Stream.nil
-- >>> fromPure = pure
-- >>> fromPure = Stream.fromEffect . pure
--
{-# INLINE_NORMAL fromPure #-}
fromPure :: Applicative m => a -> Stream m a
fromPure x = Stream (\_ s -> pure $ step s) True
  where
    -- The 'Bool' state records whether the single element is still pending:
    -- 'True' yields it and moves to 'False', 'False' stops the stream.
    {-# INLINE_LATE step #-}
    step True  = Yield x False
    step False = Stop

-- | Create a singleton stream from a monadic action.
--
-- >>> fromEffect m = m `Stream.consM` Stream.nil
-- >>> fromEffect = Stream.sequence . Stream.fromPure
--
-- >>> Stream.fold Fold.drain $ Stream.fromEffect (putStrLn "hello")
-- hello
--
{-# INLINE_NORMAL fromEffect #-}
fromEffect :: Applicative m => m a -> Stream m a
fromEffect m = Stream step True

    where

    -- The 'Bool' state records whether the action is still to be run: 'True'
    -- runs it and yields its result, 'False' stops the stream.
    {-# INLINE_LATE step #-}
    step _ True  = (`Yield` False) <$> m
    step _ False = pure Stop

------------------------------------------------------------------------------
-- From Containers
------------------------------------------------------------------------------

-- Adapted from the vector package.

-- | Construct a stream from a list of pure values.
{-# INLINE_LATE fromList #-}
fromList :: Applicative m => [a] -> Stream m a
#ifdef USE_UNFOLDS_EVERYWHERE
fromList = unfold Unfold.fromList
#else
fromList = Stream step
  where
    -- The stream state is the part of the list not yet emitted.
    {-# INLINE_LATE step #-}
    step _ (x:xs) = pure $ Yield x xs
    step _ []     = pure Stop
#endif

------------------------------------------------------------------------------
-- Conversions From/To
------------------------------------------------------------------------------

-- | Convert a CPS encoded StreamK to direct style step encoded StreamD
{-# INLINE_LATE fromStreamK #-}
fromStreamK :: Applicative m => K.StreamK m a -> Stream m a
fromStreamK = Stream step
    where
    -- The stream state is the remaining 'K.StreamK'. Each step runs it with
    -- 'K.foldStreamShared', passing three continuations that translate its
    -- outcome into a 'Step'.
    step gst m1 =
        let -- stream is exhausted
            stop       = pure Stop
            -- last element: yield it and continue with the empty stream
            single a   = pure $ Yield a K.nil
            -- an element followed by the rest of the stream
            yieldk a r = pure $ Yield a r
         in K.foldStreamShared gst yieldk single stop m1

-- | Convert a direct style step encoded StreamD to a CPS encoded StreamK
{-# INLINE_LATE toStreamK #-}
toStreamK :: Monad m => Stream m a -> K.StreamK m a
toStreamK (Stream step state) = go state
    where
    -- Build the CPS stream that resumes the direct style stream from state
    -- @st@. The "single element" continuation is never used.
    go st = K.MkStream $ \gst yld _ stp ->
      let -- Step until an element is produced or the stream stops; 'Skip'
          -- steps are consumed within the same continuation invocation.
          go' ss = do
           r <- step gst ss
           case r of
               Yield x s -> yld x (go s)
               Skip  s   -> go' s
               Stop      -> stp
      in go' st

#ifndef DISABLE_FUSION
{-# RULES "fromStreamK/toStreamK fusion"
    forall s. toStreamK (fromStreamK s) = s #-}
{-# RULES "toStreamK/fromStreamK fusion"
    forall s. fromStreamK (toStreamK s) = s #-}
#endif

------------------------------------------------------------------------------
-- Running a 'Fold'
------------------------------------------------------------------------------

-- >>> fold f = Fold.extractM . Stream.foldAddLazy f
-- >>> fold f = Stream.fold Fold.one . Stream.foldManyPost f
-- >>> fold f = Fold.extractM <=< Stream.foldAdd f

-- | Fold a stream using the supplied left 'Fold' and reducing the resulting
-- expression strictly at each step. The behavior is similar to 'foldl''. A
-- 'Fold' can terminate early without consuming the full stream. See the
-- documentation of individual 'Fold's for termination behavior.
--
-- Definitions:
--
-- >>> fold f = fmap fst . Stream.foldBreak f
-- >>> fold f = Stream.parse (Parser.fromFold f)
--
-- Example:
--
-- >>> Stream.fold Fold.sum (Stream.enumerateFromTo 1 100)
-- 5050
--
{-# INLINE_NORMAL fold #-}
fold :: Monad m => Fold m a b -> Stream m a -> m b
fold fld strm = do
    -- Run the fold via 'foldBreak' and drop the residual stream.
    (b, _) <- foldBreak fld strm
    return b

-- | Fold resulting in either breaking the stream or continuation of the fold.
-- Instead of supplying the input stream in one go we can run the fold multiple
-- times, each time supplying the next segment of the input stream. If the fold
-- has not yet finished it returns a fold that can be run again otherwise it
-- returns the fold result and the residual stream.
--
-- /Internal/
{-# INLINE_NORMAL foldEither #-}
foldEither :: Monad m =>
    Fold m a b -> Stream m a -> m (Either (Fold m a b) (b, Stream m a))
foldEither (Fold fstep begin done) (UnStream step state) = do
    res <- begin
    case res of
        FL.Partial fs -> go SPEC fs state
        -- The fold finished without consuming any input; the whole stream is
        -- the residue.
        FL.Done fb -> return $! Right (fb, Stream step state)

    where

    -- Drive the stream, feeding each yielded element to the fold. @fs@ is the
    -- fold state (kept strict), @st@ is the stream state.
    {-# INLINE go #-}
    go !_ !fs st = do
        r <- step defState st
        case r of
            Yield x s -> do
                res <- fstep fs x
                case res of
                    -- Fold finished: return its result with the rest of the
                    -- stream.
                    FL.Done b -> return $! Right (b, Stream step s)
                    FL.Partial fs1 -> go SPEC fs1 s
            Skip s -> go SPEC fs s
            -- Stream ended first: return a fold whose initial action resumes
            -- from the accumulated fold state.
            Stop -> return $! Left (Fold fstep (return $ FL.Partial fs) done)

-- | Like 'fold' but also returns the remaining stream. The resulting stream
-- would be 'Stream.nil' if the stream finished before the fold.
--
{-# INLINE_NORMAL foldBreak #-}
foldBreak :: Monad m => Fold m a b -> Stream m a -> m (b, Stream m a)
foldBreak fld strm = do
    r <- foldEither fld strm
    case r of
        -- The fold terminated; result and residual stream are ready.
        Right res -> return res
        -- The stream ended first; extract the result from the pending fold.
        Left (Fold _ initial extract) -> do
            res <- initial
            case res of
                -- 'foldEither' builds the returned fold with an initial
                -- action that always produces 'FL.Partial'.
                FL.Done _ -> error "foldBreak: unreachable state"
                FL.Partial s -> do
                    b <- extract s
                    return (b, nil)

    where

    -- An empty stream: stops on the first step.
    nil = Stream (\_ _ -> return Stop) ()

-- | Append a stream to a fold lazily to build an accumulator incrementally.
--
-- Example, to continue folding a list of streams on the same sum fold:
--
-- >>> streams = [Stream.fromList [1..5], Stream.fromList [6..10]]
-- >>> f = Prelude.foldl Stream.foldAddLazy Fold.sum streams
-- >>> Stream.fold f Stream.nil
-- 55
--
{-# INLINE_NORMAL foldAddLazy #-}
foldAddLazy :: Monad m => Fold m a b -> Stream m a -> Fold m a b
foldAddLazy (Fold fstep finitial fextract) (Stream sstep state) =
    -- Only the initial action changes; the step and extract functions are
    -- those of the original fold.
    Fold fstep initial fextract

    where

    -- Run the original initial action and, unless the fold is already done,
    -- feed it the whole stream.
    initial = do
        res <- finitial
        case res of
            FL.Partial fs -> go SPEC fs state
            FL.Done fb -> return $ FL.Done fb

    -- Drive the stream, feeding each yielded element to the fold. @fs@ is the
    -- fold state (kept strict), @st@ is the stream state.
    {-# INLINE go #-}
    go !_ !fs st = do
        r <- sstep defState st
        case r of
            Yield x s -> do
                res <- fstep fs x
                case res of
                    FL.Done b -> return $ FL.Done b
                    FL.Partial fs1 -> go SPEC fs1 s
            Skip s -> go SPEC fs s
            -- Stream exhausted: the fold can continue from the accumulated
            -- state.
            Stop -> return $ FL.Partial fs

-- >>> foldAdd f = Stream.foldAddLazy f >=> Fold.reduce

-- |
-- >>> foldAdd = flip Fold.addStream
--
foldAdd :: Monad m => Fold m a b -> Stream m a -> m (Fold m a b)
-- Implemented by running this module's 'fold' with @FL.duplicate f@, whose
-- result is itself a fold.
foldAdd f =
    Streamly.Internal.Data.Stream.StreamD.Type.fold (FL.duplicate f)

------------------------------------------------------------------------------
-- Right Folds
------------------------------------------------------------------------------

-- Adapted from the vector package.
--
-- XXX Use of SPEC constructor in folds causes 2x performance degradation in
-- one shot operations, but helps immensely in operations composed of multiple
-- combinators or the same combinator many times. There seems to be an
-- opportunity to optimize here, can we get both, better perf for single ops
-- as well as composed ops? Without SPEC, all single operation benchmarks
-- become 2x faster.

-- The way we want a left fold to be strict, dually we want the right fold to
-- be lazy.  The correct signature of the fold function to keep it lazy must be
-- (a -> m b -> m b) instead of (a -> b -> m b). We were using the latter
-- earlier, which is incorrect. In the latter signature we have to feed the
-- value to the fold function after evaluating the monadic action, depending on
-- the bind behavior of the monad, the action may get evaluated immediately
-- introducing unnecessary strictness to the fold. If the implementation is
-- lazy the following example, must work:
--
-- S.foldrM (\x t -> if x then return t else return False) (return True)
--  (S.fromList [False,undefined] :: Stream IO Bool)

-- | Right associative/lazy pull fold. @foldrM build final stream@ constructs
-- an output structure using the step function @build@. @build@ is invoked with
-- the next input element and the remaining (lazy) tail of the output
-- structure. It builds a lazy output expression using the two. When the "tail
-- structure" in the output expression is evaluated it calls @build@ again thus
-- lazily consuming the input @stream@ until either the output expression built
-- by @build@ is free of the "tail" or the input is exhausted in which case
-- @final@ is used as the terminating case for the output structure. For more
-- details see the description in the previous section.
--
-- Example, determine if any element is 'odd' in a stream:
--
-- >>> s = Stream.fromList (2:4:5:undefined)
-- >>> step x xs = if odd x then return True else xs
-- >>> Stream.foldrM step (return False) s
-- True
--
{-# INLINE_NORMAL foldrM #-}
foldrM :: Monad m => (a -> m b -> m b) -> m b -> Stream m a -> m b
foldrM f z (Stream step state) = go SPEC state
  where
    {-# INLINE_LATE go #-}
    go !_ st = do
          r <- step defState st
          case r of
            -- The rest of the fold is passed to @f@ as an unevaluated action;
            -- the stream is stepped further only if @f@ runs that action.
            Yield x s -> f x (go SPEC s)
            Skip s    -> go SPEC s
            Stop      -> z

-- | Like 'foldrM' but with an additional @convert@ function that is applied
-- to the action built by the right fold to produce the final result.
--
-- @foldrMx fstep final convert stream@: @fstep@ combines an element with the
-- action for the rest of the stream, @final@ is used when the stream ends.
{-# INLINE_NORMAL foldrMx #-}
foldrMx :: Monad m
    => (a -> m x -> m x) -> m x -> (m x -> m b) -> Stream m a -> m b
foldrMx fstep final convert (Stream step state) = convert $ go SPEC state
  where
    {-# INLINE_LATE go #-}
    go !_ st = do
          r <- step defState st
          case r of
            -- The rest of the fold is passed to @fstep@ as an action, it is
            -- not run here.
            Yield x s -> fstep x (go SPEC s)
            Skip s    -> go SPEC s
            Stop      -> final

-- XXX Should we make all argument strict wherever we use SPEC?

-- Note that foldr works on pure values, therefore it becomes necessarily
-- strict when the monad m is strict. In that case it cannot terminate early,
-- it would evaluate all of its input.  Though, this should work fine with lazy
-- monads. For example, if "any" is implemented using "foldr" instead of
-- "foldrM" it performs the same with Identity monad but performs 1000x slower
-- with IO monad.

-- | Right fold, lazy for lazy monads and pure streams, and strict for strict
-- monads.
--
-- Please avoid using this routine in strict monads like IO unless you need a
-- strict right fold. This is provided only for use in lazy monads (e.g.
-- Identity) or pure streams. Note that with this signature it is not possible
-- to implement a lazy foldr when the monad @m@ is strict. In that case it
-- would be strict in its accumulator and therefore would necessarily consume
-- all its input.
--
-- >>> foldr f z = Stream.foldrM (\a b -> f a <$> b) (return z)
--
-- Note: This is similar to Fold.foldr' (the right fold via left fold), but
-- could be more efficient.
--
{-# INLINE_NORMAL foldr #-}
foldr :: Monad m => (a -> b -> b) -> b -> Stream m a -> m b
-- Lift the pure step into the monad: each element is wrapped with 'return' and
-- combined with the result of the rest of the fold using 'liftA2'.
foldr f z = foldrM (liftA2 f . return) (return z)

-- this performs horribly, should not be used
{-# INLINE_NORMAL foldrS #-}
foldrS
    :: Monad m
    => (a -> Stream m b -> Stream m b)
    -> Stream m b
    -> Stream m a
    -> Stream m b
foldrS f final (Stream step state) = go SPEC state
  where
    -- Step the input stream once and turn the resulting effect, which
    -- produces a stream, into a stream using 'concatEffect'.
    {-# INLINE_LATE go #-}
    go !_ st = concatEffect $ fmap g $ step defState st

    -- Translate one step of the input into the output stream.
    g r =
        case r of
          Yield x s -> f x (go SPEC s)
          Skip s    -> go SPEC s
          Stop      -> final

------------------------------------------------------------------------------
-- Left Folds
------------------------------------------------------------------------------

-- XXX run begin action only if the stream is not empty.
--
-- Strict left fold with a monadic step function @fstep@, a monadic initial
-- accumulator @begin@ and a monadic extraction function @done@ applied to the
-- final accumulator. Note that @begin@ is run before the first stream step,
-- i.e. even when the stream turns out to be empty.
{-# INLINE_NORMAL foldlMx' #-}
foldlMx' :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> Stream m a -> m b
foldlMx' fstep begin done (Stream step state) =
    begin >>= \x -> go SPEC x state
  where
    -- XXX !acc?
    -- The accumulator is forced to WHNF with 'seq' before each stream step.
    {-# INLINE_LATE go #-}
    go !_ acc st = acc `seq` do
        r <- step defState st
        case r of
            Yield x s -> do
                acc' <- fstep acc x
                go SPEC acc' s
            Skip s -> go SPEC acc s
            Stop   -> done acc

-- Pure variant of 'foldlMx'': the step function, the initial accumulator and
-- the extraction function are all pure and are lifted with 'return'.
{-# INLINE foldlx' #-}
foldlx' :: Monad m => (x -> a -> x) -> x -> (x -> b) -> Stream m a -> m b
foldlx' fstep begin done =
    foldlMx' (\b a -> return (fstep b a)) (return begin) (return . done)

-- Adapted from the vector package.
-- XXX implement in terms of foldlMx'?
--
-- Strict left fold with a monadic step function and a monadic initial
-- accumulator. The initial action @mbegin@ is run first, then the stream is
-- consumed, and the final accumulator is returned.
{-# INLINE_NORMAL foldlM' #-}
foldlM' :: Monad m => (b -> a -> m b) -> m b -> Stream m a -> m b
foldlM' fstep mbegin (Stream step state) = do
    begin <- mbegin
    go SPEC begin state
  where
    -- The accumulator is forced to WHNF with 'seq' before each stream step.
    {-# INLINE_LATE go #-}
    go !_ acc st = acc `seq` do
        r <- step defState st
        case r of
            Yield x s -> do
                acc' <- fstep acc x
                go SPEC acc' s
            Skip s -> go SPEC acc s
            Stop   -> return acc

-- Pure variant of 'foldlM'': the step function and the initial accumulator
-- are pure and are lifted into the monad with 'return'.
{-# INLINE foldl' #-}
foldl' :: Monad m => (b -> a -> b) -> b -> Stream m a -> m b
foldl' fstep begin = foldlM' (\b a -> return (fstep b a)) (return begin)

------------------------------------------------------------------------------
-- Special folds
------------------------------------------------------------------------------

-- >>> drain = mapM_ (\_ -> return ())

-- |
-- Definitions:
--
-- >>> drain = Stream.fold Fold.drain
-- >>> drain = Stream.foldrM (\_ xs -> xs) (return ())
--
-- Run a stream, discarding the results.
--
{-# INLINE_LATE drain #-}
drain :: Monad m => Stream m a -> m ()
-- drain = foldrM (\_ xs -> xs) (return ())
drain (Stream step state) = go SPEC state
  where
    -- Keep stepping the stream, ignoring any yielded element, until it stops.
    go !_ st = do
        r <- step defState st
        case r of
            Yield _ s -> go SPEC s
            Skip s    -> go SPEC s
            Stop      -> return ()

------------------------------------------------------------------------------
-- To Containers
------------------------------------------------------------------------------

-- This toList impl is faster (30% on streaming-benchmarks) than the
-- corresponding left fold. The left fold retains an additional argument in the
-- recursive loop.
--
-- Core for the right fold loop:
--
-- main_$s$wgo1
--   = \ sc_s3e6 sc1_s3e5 ->
--       case ># sc1_s3e5 100000# of {
--         __DEFAULT ->
--           case main_$s$wgo1 sc_s3e6 (+# sc1_s3e5 1#) of
--
-- Core for the left fold loop:
--
--  main_$s$wgo1
--   = \ sc_s3oT sc1_s3oS sc2_s3oR ->
--       case sc2_s3oR of fs2_a2lw { __DEFAULT ->
--       case ># sc1_s3oS 100000# of {
--         __DEFAULT ->
--           let { wild_a2og = I# sc1_s3oS } in
--           main_$s$wgo1
--             sc_s3oT (+# sc1_s3oS 1#) (\ x_X9 -> fs2_a2lw (: wild_a2og x_X9));

-- |
-- Definitions:
--
-- >>> toList = Stream.foldr (:) []
-- >>> toList = Stream.fold Fold.toList
--
-- Convert a stream into a list in the underlying monad. The list can be
-- consumed lazily in a lazy monad (e.g. 'Identity'). In a strict monad (e.g.
-- IO) the whole list is generated and buffered before it can be consumed.
--
-- /Warning!/ working on large lists accumulated as buffers in memory could be
-- very inefficient, consider using "Streamly.Data.Array" instead.
--
-- Note that this could be a bit more efficient compared to @Stream.fold
-- Fold.toList@, and it can fuse with pure list consumers.
--
{-# INLINE_NORMAL toList #-}
toList :: Monad m => Stream m a -> m [a]
toList = Streamly.Internal.Data.Stream.StreamD.Type.foldr (:) []

-- Use foldr/build fusion to fuse with list consumers
-- This can be useful when using the IsList instance
--
-- Right fold over an 'Identity' stream using the supplied cons (@c@) and nil
-- (@n@); this is the function handed to 'build' by 'toListId'.
{-# INLINE_LATE toListFB #-}
toListFB :: (a -> b -> b) -> b -> Stream Identity a -> b
toListFB c n (Stream step state) = go state
  where
    go st = case runIdentity (step defState st) of
             Yield x s -> x `c` go s
             Skip s    -> go s
             Stop      -> n

{-# RULES "toList Identity" Streamly.Internal.Data.Stream.StreamD.Type.toList = toListId #-}
-- 'toList' specialised to 'Identity' (substituted by the rewrite rule above):
-- the list is produced with 'build' over 'toListFB'.
{-# INLINE_EARLY toListId #-}
toListId :: Stream Identity a -> Identity [a]
toListId s = Identity $ build (\c n -> toListFB c n s)

------------------------------------------------------------------------------
-- Multi-stream folds
------------------------------------------------------------------------------

-- Adapted from the vector package.

-- | Compare two streams for equality
{-# INLINE_NORMAL eqBy #-}
eqBy :: Monad m => (a -> b -> Bool) -> Stream m a -> Stream m b -> m Bool
eqBy eq (Stream step1 t1) (Stream step2 t2) = eq_loop0 SPEC t1 t2
  where
    -- Pull the next element from the first stream. When the first stream
    -- stops, the result depends on whether the second one is exhausted too.
    eq_loop0 !_ s1 s2 = do
      r <- step1 defState s1
      case r of
        Yield x s1' -> eq_loop1 SPEC x s1' s2
        Skip    s1' -> eq_loop0 SPEC   s1' s2
        Stop        -> eq_null s2

    -- Holding element x of the first stream, pull the matching element from
    -- the second stream and compare. If the second stream stops here it is
    -- shorter than the first, so the streams differ.
    eq_loop1 !_ x s1 s2 = do
      r <- step2 defState s2
      case r of
        Yield y s2'
          | eq x y    -> eq_loop0 SPEC   s1 s2'
          | otherwise -> return False
        Skip    s2'   -> eq_loop1 SPEC x s1 s2'
        Stop          -> return False

    -- The first stream has ended: the streams are equal only if the second
    -- stream yields no further element.
    eq_null s2 = do
      r <- step2 defState s2
      case r of
        Yield _ _ -> return False
        Skip s2'  -> eq_null s2'
        Stop      -> return True

-- Adapted from the vector package.

-- | Compare two streams lexicographically.
{-# INLINE_NORMAL cmpBy #-}
cmpBy
    :: Monad m
    => (a -> b -> Ordering) -> Stream m a -> Stream m b -> m Ordering
cmpBy cmp (Stream step1 t1) (Stream step2 t2) = cmp_loop0 SPEC t1 t2
  where
    -- Pull the next element from the first stream. When the first stream
    -- stops, the result depends on whether the second one is exhausted too.
    cmp_loop0 !_ s1 s2 = do
      r <- step1 defState s1
      case r of
        Yield x s1' -> cmp_loop1 SPEC x s1' s2
        Skip    s1' -> cmp_loop0 SPEC   s1' s2
        Stop        -> cmp_null s2

    -- Holding element x of the first stream, pull the matching element from
    -- the second stream. The first non-EQ comparison decides the result; if
    -- the second stream stops here the first stream is longer, hence GT.
    cmp_loop1 !_ x s1 s2 = do
      r <- step2 defState s2
      case r of
        Yield y s2' -> case x `cmp` y of
                         EQ -> cmp_loop0 SPEC s1 s2'
                         c  -> return c
        Skip    s2' -> cmp_loop1 SPEC x s1 s2'
        Stop        -> return GT

    -- The first stream has ended: LT if the second stream still yields an
    -- element, EQ if it is exhausted as well.
    cmp_null s2 = do
      r <- step2 defState s2
      case r of
        Yield _ _ -> return LT
        Skip s2'  -> cmp_null s2'
        Stop      -> return EQ

------------------------------------------------------------------------------
-- Transformations
------------------------------------------------------------------------------

-- Adapted from the vector package.

-- |
-- >>> mapM f = Stream.sequence . fmap f
--
-- Apply a monadic function to each element of the stream and replace it with
-- the output of the resulting action.
--
-- >>> s = Stream.fromList ["a", "b", "c"]
-- >>> Stream.fold Fold.drain $ Stream.mapM putStr s
-- abc
--
{-# INLINE_NORMAL mapM #-}
mapM :: Monad m => (a -> m b) -> Stream m a -> Stream m b
mapM f (Stream step state) = Stream step' state
  where
    -- The source stream has a different element type than the output stream,
    -- so the consumer-supplied state is passed through 'adaptState' before
    -- being handed to the source step function.
    {-# INLINE_LATE step' #-}
    step' gst st = do
        r <- step (adaptState gst) st
        case r of
            Yield x s -> f x >>= \a -> return $ Yield a s
            Skip s    -> return $ Skip s
            Stop      -> return Stop

-- Apply a pure function to each element of the stream; implemented as 'mapM'
-- with the function lifted via 'return'.
{-# INLINE map #-}
map :: Monad m => (a -> b) -> Stream m a -> Stream m b
map f = mapM (return . f)

-- (Functor m) based implementation of fmap does not fuse well in
-- streaming-benchmarks. XXX need to investigate why.
instance Monad m => Functor (Stream m) where
    -- 'fmap' is the Monad-constrained 'map' defined in this module.
    {-# INLINE fmap #-}
    fmap = map

    -- Replace every element with a constant value.
    {-# INLINE (<$) #-}
    (<$) = fmap . const

------------------------------------------------------------------------------
-- Lists
------------------------------------------------------------------------------

-- XXX Show instance is 10x slower compared to read, we can do much better.
-- The list show instance itself is really slow.

-- XXX The default definitions of "<" in the Ord instance etc. do not perform
-- well, because they do not get inlined. Need to add INLINE in Ord class in
-- base?

-- Pure ('Identity') streams can be converted from and to lists; both
-- directions delegate to this module's own stream 'fromList' / 'toList'.
instance IsList (Stream Identity a) where
    type (Item (Stream Identity a)) = a

    {-# INLINE fromList #-}
    fromList = Streamly.Internal.Data.Stream.StreamD.Type.fromList

    {-# INLINE toList #-}
    toList = runIdentity . Streamly.Internal.Data.Stream.StreamD.Type.toList

-- Element-wise equality of pure streams, via 'eqBy'.
instance Eq a => Eq (Stream Identity a) where
    {-# INLINE (==) #-}
    (==) xs ys = runIdentity $ eqBy (==) xs ys

-- Lexicographic ordering of pure streams, via 'cmpBy'. All comparison
-- operators are defined explicitly in terms of 'compare' and carry INLINE
-- pragmas instead of relying on the class defaults.
instance Ord a => Ord (Stream Identity a) where
    {-# INLINE compare #-}
    compare xs ys = runIdentity $ cmpBy compare xs ys

    {-# INLINE (<) #-}
    x < y =
        case compare x y of
            LT -> True
            _ -> False

    {-# INLINE (<=) #-}
    x <= y =
        case compare x y of
            GT -> False
            _ -> True

    {-# INLINE (>) #-}
    x > y =
        case compare x y of
            GT -> True
            _ -> False

    {-# INLINE (>=) #-}
    x >= y =
        case compare x y of
            LT -> False
            _ -> True

    -- When the two streams compare equal, 'max' returns the second argument
    -- and 'min' returns the first.
    {-# INLINE max #-}
    max x y = if x <= y then y else x

    {-# INLINE min #-}
    min x y = if x <= y then x else y

-- A pure stream is shown as @fromList@ followed by its elements rendered as a
-- list, parenthesised when the surrounding precedence is above 10.
instance Show a => Show (Stream Identity a) where
    showsPrec p dl = showParen (p > 10) $
        showString "fromList " . shows (GHC.Exts.toList dl)

-- Parses the format produced by the 'Show' instance: the identifier
-- @fromList@ followed by a list of elements.
instance Read a => Read (Stream Identity a) where
    readPrec = parens $ prec 10 $ do
        Ident "fromList" <- lexP
        Streamly.Internal.Data.Stream.StreamD.Type.fromList <$> readPrec

    readListPrec = readListPrecDefault

-- A string literal denotes a pure stream of its characters.
instance (a ~ Char) => IsString (Stream Identity a) where
    {-# INLINE fromString #-}
    fromString = Streamly.Internal.Data.Stream.StreamD.Type.fromList

-------------------------------------------------------------------------------
-- Foldable
-------------------------------------------------------------------------------

-- The default Foldable instance has several issues:
-- 1) several definitions do not have INLINE on them, so we provide
--    re-implementations with INLINE pragmas.
-- 2) the definitions of sum/product/maximum/minimum are inefficient as they
--    use right folds, they cannot run in constant memory. We provide
--    implementations using strict left folds here.

-- There is no Traversable instance because there is no scalable cons for
-- StreamD; use toList and fromList instead.

instance (Foldable m, Monad m) => Foldable (Stream m) where

    -- Right-fold the stream into a monoidal value inside @m@ using this
    -- module's stream 'foldr', then collapse the @m@ layer with the Foldable
    -- instance of @m@.
    {-# INLINE foldMap #-}
    foldMap f =
        Data.Foldable.fold
            . Streamly.Internal.Data.Stream.StreamD.Type.foldr (mappend . f) mempty

    -- Right fold expressed through 'foldMap' over the 'Endo' monoid.
    {-# INLINE foldr #-}
    foldr f z t = appEndo (foldMap (Endo #. f) t) z

    -- Strict left fold expressed through the right fold: each element extends
    -- a continuation which forces the new accumulator ('$!') before
    -- proceeding.
    {-# INLINE foldl' #-}
    foldl' f z0 xs = Data.Foldable.foldr f' id xs z0
        where f' x k = oneShot $ \z -> k $! f z x

    {-# INLINE length #-}
    length = Data.Foldable.foldl' (\n _ -> n + 1) 0

    {-# INLINE elem #-}
    elem = any . (==)

    -- Strict left fold keeping the running maximum in a 'Maybe''; calls
    -- 'errorWithoutStackTrace' if the stream has no elements.
    {-# INLINE maximum #-}
    maximum =
          fromMaybe (errorWithoutStackTrace "maximum: empty stream")
        . toMaybe
        . Data.Foldable.foldl' getMax Nothing'

        where

        getMax Nothing' x = Just' x
        getMax (Just' mx) x = Just' $! max mx x

    -- Strict left fold keeping the running minimum in a 'Maybe''; calls
    -- 'errorWithoutStackTrace' if the stream has no elements.
    {-# INLINE minimum #-}
    minimum =
          fromMaybe (errorWithoutStackTrace "minimum: empty stream")
        . toMaybe
        . Data.Foldable.foldl' getMin Nothing'

        where

        getMin Nothing' x = Just' x
        getMin (Just' mn) x = Just' $! min mn x

    {-# INLINE sum #-}
    sum = Data.Foldable.foldl' (+) 0

    {-# INLINE product #-}
    product = Data.Foldable.foldl' (*) 1

-------------------------------------------------------------------------------
-- Filtering
-------------------------------------------------------------------------------

-- Adapted from the vector package.

-- | Take first 'n' elements from the stream and discard the rest.
--
{-# INLINE_NORMAL take #-}
take :: Applicative m => Int -> Stream m a -> Stream m a
take n (Stream step state) = n `seq` Stream step' (state, 0)

    where

    -- The state is paired with the count of elements yielded so far. The
    -- count starts at 0 and only advances on Yield; once it reaches n (or
    -- immediately, when n <= 0) the stream stops without stepping the source.
    {-# INLINE_LATE step' #-}
    step' gst (st, i) | i < n = do
        (\case
            Yield x s -> Yield x (s, i + 1)
            Skip s    -> Skip (s, i)
            Stop      -> Stop) <$> step gst st
    step' _ (_, _) = pure Stop

-- Adapted from the vector package.

-- | Same as 'takeWhile' but with a monadic predicate.
--
{-# INLINE_NORMAL takeWhileM #-}
takeWhileM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a
-- takeWhileM p = scanMaybe (FL.takingEndByM_ (\x -> not <$> p x))
takeWhileM f (Stream step state) = Stream step' state
  where
    -- Run the predicate on each yielded element; the first element for which
    -- it returns False is dropped and the stream stops.
    {-# INLINE_LATE step' #-}
    step' gst st = do
        r <- step gst st
        case r of
            Yield x s -> do
                b <- f x
                return $ if b then Yield x s else Stop
            Skip s -> return $ Skip s
            Stop   -> return Stop

-- | End the stream as soon as the predicate fails on an element.
--
{-# INLINE takeWhile #-}
takeWhile :: Monad m => (a -> Bool) -> Stream m a -> Stream m a
takeWhile f = takeWhileM (return . f)

-- Like takeWhile but with an inverted condition and also taking
-- the matching element.

{-# INLINE_NORMAL takeEndByM #-}
takeEndByM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a
takeEndByM f (Stream step state) = Stream step' (Just state)
  where
    -- The source state is wrapped in Maybe: 'Just' while the stream is still
    -- running, 'Nothing' once an element satisfying the predicate has been
    -- yielded (that element is emitted, and the next step stops).
    {-# INLINE_LATE step' #-}
    step' gst (Just st) = do
        r <- step gst st
        case r of
            Yield x s -> do
                b <- f x
                return $
                    if not b
                    then Yield x (Just s)
                    else Yield x Nothing
            Skip s -> return $ Skip (Just s)
            Stop   -> return Stop

    step' _ Nothing = return Stop

-- Pure-predicate variant of 'takeEndByM': the predicate is lifted with
-- 'return'.
{-# INLINE takeEndBy #-}
takeEndBy :: Monad m => (a -> Bool) -> Stream m a -> Stream m a
takeEndBy f = takeEndByM (return . f)

------------------------------------------------------------------------------
-- Zipping
------------------------------------------------------------------------------

-- | Like 'zipWith' but using a monadic zipping function.
--
{-# INLINE_NORMAL zipWithM #-}
zipWithM :: Monad m
    => (a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c
zipWithM f (Stream stepa ta) (Stream stepb tb) = Stream step (ta, tb, Nothing)
  where
    -- The state holds both source states plus an optional pending element
    -- from the first stream.
    --
    -- No pending element: step the first stream. A yielded element is stored
    -- in the state (emitting Skip); the zipped stream stops if the first
    -- stream stops.
    {-# INLINE_LATE step #-}
    step gst (sa, sb, Nothing) = do
        r <- stepa (adaptState gst) sa
        return $
          case r of
            Yield x sa' -> Skip (sa', sb, Just x)
            Skip sa'    -> Skip (sa', sb, Nothing)
            Stop        -> Stop

    -- Pending element x from the first stream: step the second stream and,
    -- when it yields, combine the pair with f. If the second stream stops,
    -- the pending element is discarded and the zipped stream stops.
    step gst (sa, sb, Just x) = do
        r <- stepb (adaptState gst) sb
        case r of
            Yield y sb' -> do
                z <- f x y
                return $ Yield z (sa, sb', Nothing)
            Skip sb' -> return $ Skip (sa, sb', Just x)
            Stop     -> return Stop

{-# RULES "zipWithM xs xs"
    forall f xs. zipWithM @Identity f xs xs = mapM (\x -> f x x) xs #-}

-- | Stream @a@ is evaluated first, followed by stream @b@, the resulting
-- elements @a@ and @b@ are then zipped using the supplied zip function and the
-- result @c@ is yielded to the consumer.
--
-- If stream @a@ or stream @b@ ends, the zipped stream ends. If stream @b@ ends
-- first, the element @a@ from previous evaluation of stream @a@ is discarded.
--
-- >>> s1 = Stream.fromList [1,2,3]
-- >>> s2 = Stream.fromList [4,5,6]
-- >>> Stream.fold Fold.toList $ Stream.zipWith (+) s1 s2
-- [5,7,9]
--
{-# INLINE zipWith #-}
zipWith :: Monad m => (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
zipWith f = zipWithM (\a b -> return (f a b))

------------------------------------------------------------------------------
-- Combine N Streams - concatAp
------------------------------------------------------------------------------

-- | Apply a stream of functions to a stream of values and flatten the results.
--
-- Note that the second stream is evaluated multiple times.
--
-- >>> crossApply = Stream.crossWith id
--
{-# INLINE_NORMAL crossApply #-}
crossApply :: Functor f => Stream f (a -> b) -> Stream f a -> Stream f b
crossApply (Stream stepa statea) (Stream stepb stateb) =
    Stream step' (Left statea)

    where

    -- State: @Left s@ while stepping the stream of functions, and
    -- @Right (f, os, s)@ while applying the function @f@ to the elements of
    -- the second stream, where @os@ is the state from which the stream of
    -- functions is resumed afterwards and @s@ is the second stream's state.
    {-# INLINE_LATE step' #-}
    step' gst (Left st) = fmap
        (\case
            -- Got a function: restart the second stream from its initial
            -- state.
            Yield f s -> Skip (Right (f, s, stateb))
            Skip    s -> Skip (Left s)
            Stop      -> Stop)
        (stepa (adaptState gst) st)
    step' gst (Right (f, os, st)) = fmap
        (\case
            Yield a s -> Yield (f a) (Right (f, os, s))
            Skip s    -> Skip (Right (f, os, s))
            -- Second stream exhausted: go back for the next function.
            Stop      -> Skip (Left os))
        (stepb (adaptState gst) st)

-- | For every element yielded by the first stream, run the second stream from
-- its initial state and emit its elements. The elements of the first stream
-- themselves are discarded.
--
-- Note that the second stream is evaluated multiple times.
--
{-# INLINE_NORMAL crossApplySnd #-}
crossApplySnd :: Functor f => Stream f a -> Stream f b -> Stream f b
crossApplySnd (Stream stepa statea) (Stream stepb stateb) =
    Stream step (Left statea)

    where

    -- State: @Left s@ while stepping the first stream, and
    -- @Right (ostate, s)@ while emitting the second stream, where @ostate@ is
    -- the state from which the first stream is resumed afterwards.
    {-# INLINE_LATE step #-}
    step gst (Left st) =
        fmap
            (\case
                 -- The yielded element is ignored; it only triggers one more
                 -- run of the second stream.
                 Yield _ s -> Skip (Right (s, stateb))
                 Skip s -> Skip (Left s)
                 Stop -> Stop)
            (stepa (adaptState gst) st)
    step gst (Right (ostate, st)) =
        fmap
            (\case
                 Yield b s -> Yield b (Right (ostate, s))
                 Skip s -> Skip (Right (ostate, s))
                 Stop -> Skip (Left ostate))
            (stepb gst st)

-- | For every element yielded by the first stream, run the second stream from
-- its initial state and emit the element of the first stream once for each
-- element yielded by the second stream. The elements of the second stream
-- themselves are discarded.
--
-- Note that the second stream is evaluated multiple times.
--
{-# INLINE_NORMAL crossApplyFst #-}
crossApplyFst :: Functor f => Stream f a -> Stream f b -> Stream f a
crossApplyFst (Stream stepa statea) (Stream stepb stateb) =
    Stream step (Left statea)

    where

    -- State: @Left s@ while stepping the first stream, and
    -- @Right (ostate, s, a)@ while stepping the second stream, where @ostate@
    -- is the state from which the first stream is resumed afterwards and @a@
    -- is the element of the first stream being repeated.
    {-# INLINE_LATE step #-}
    step gst (Left st) =
        fmap
            (\case
                 Yield a s -> Skip (Right (s, stateb, a))
                 Skip s -> Skip (Left s)
                 Stop -> Stop)
            (stepa gst st)
    step gst (Right (ostate, st, a)) =
        fmap
            (\case
                 -- The element of the second stream is ignored; it only
                 -- triggers one more emission of @a@.
                 Yield _ s -> Yield a (Right (ostate, s, a))
                 Skip s -> Skip (Right (ostate, s, a))
                 Stop -> Skip (Left ostate))
            (stepb (adaptState gst) st)

{-
instance Applicative f => Applicative (Stream f) where
    {-# INLINE pure #-}
    pure = fromPure

    {-# INLINE (<*>) #-}
    (<*>) = crossApply

    {-# INLINE liftA2 #-}
    liftA2 f x = (<*>) (fmap f x)

    {-# INLINE (*>) #-}
    (*>) = crossApplySnd

    {-# INLINE (<*) #-}
    (<*) = crossApplyFst
-}

-- | Combine every element of the first stream with every element of the
-- second stream using the supplied function.
--
-- Definition:
--
-- >>> crossWith f m1 m2 = fmap f m1 `Stream.crossApply` m2
--
-- Note that the second stream is evaluated multiple times.
--
{-# INLINE crossWith #-}
crossWith :: Monad m => (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
crossWith f m1 m2 = fmap f m1 `crossApply` m2

-- | Given a @Stream m a@ and @Stream m b@ generate a stream with all possible
-- combinations of the tuple @(a, b)@.
--
-- Definition:
--
-- >>> cross = Stream.crossWith (,)
--
-- The second stream is evaluated multiple times. If that is not desired it can
-- be cached in an 'Data.Array.Array' and then generated from the array before
-- calling this function. Caching may also improve performance if the stream is
-- expensive to evaluate.
--
-- See 'Streamly.Internal.Data.Unfold.cross' for a much faster fused
-- alternative.
--
-- Time: O(m x n)
--
-- /Pre-release/
{-# INLINE cross #-}
cross :: Monad m => Stream m a -> Stream m b -> Stream m (a, b)
cross = crossWith (,)

------------------------------------------------------------------------------
-- Combine N Streams - unfoldMany
------------------------------------------------------------------------------

-- State of the step function of 'unfoldMany'. @o@ is the state type of the
-- outer (input) stream and @i@ is the state type of the inner unfold.
{-# ANN type ConcatMapUState Fuse #-}
data ConcatMapUState o i =
      -- The outer stream is being stepped; holds the outer stream state.
      ConcatMapUOuter o
      -- The inner unfold is being stepped; holds the outer stream state to
      -- return to once the unfold stops, and the current unfold state.
    | ConcatMapUInner o i

-- | @unfoldMany unfold stream@ uses @unfold@ to map the input stream elements
-- to streams and then flattens the generated streams into a single output
-- stream.
--
-- This is like 'concatMap' but uses an 'Unfold' with an explicit state to
-- generate the stream instead of a 'Stream' type generator. Unlike
-- 'concatMap' this can fuse the 'Unfold' code with the inner loop and
-- therefore provide many times better performance.
--
{-# INLINE_NORMAL unfoldMany #-}
unfoldMany :: Monad m => Unfold m a b -> Stream m a -> Stream m b
unfoldMany (Unfold istep inject) (Stream ostep ost) =
    Stream step (ConcatMapUOuter ost)
  where
    {-# INLINE_LATE step #-}
    -- Outer loop: fetch the next input element and seed the unfold with it.
    step gst (ConcatMapUOuter o) = do
        r <- ostep (adaptState gst) o
        case r of
            Yield a o' -> do
                i <- inject a
                -- Force the seed state before storing it in the new state.
                i `seq` return (Skip (ConcatMapUInner o' i))
            Skip o' -> return $ Skip (ConcatMapUOuter o')
            Stop -> return Stop

    -- Inner loop: drain the unfold, then go back to the outer stream.
    step _ (ConcatMapUInner o i) = do
        r <- istep i
        return $ case r of
            Yield x i' -> Yield x (ConcatMapUInner o i')
            Skip i'    -> Skip (ConcatMapUInner o i')
            Stop       -> Skip (ConcatMapUOuter o)

------------------------------------------------------------------------------
-- Combine N Streams - concatMap
------------------------------------------------------------------------------

-- Adapted from the vector package.

-- | Map a stream producing monadic function on each element of the stream
-- and then flatten the results into a single stream. Since the stream
-- generation function is monadic, unlike 'concatMap', it can produce an
-- effect at the beginning of each iteration of the inner loop.
--
-- See 'unfoldMany' for a fusible alternative.
--
{-# INLINE_NORMAL concatMapM #-}
concatMapM :: Monad m => (a -> m (Stream m b)) -> Stream m a -> Stream m b
concatMapM f (Stream step state) = Stream step' (Left state)
  where
    -- State: @Left s@ while stepping the outer stream, and
    -- @Right (inner, s)@ while draining the generated inner stream, where @s@
    -- is the state from which the outer stream is resumed afterwards.
    {-# INLINE_LATE step' #-}
    step' gst (Left st) = do
        r <- step (adaptState gst) st
        case r of
            Yield a s -> do
                b_stream <- f a
                return $ Skip (Right (b_stream, s))
            Skip s -> return $ Skip (Left s)
            Stop -> return Stop

    -- XXX flattenArrays is 5x faster than "concatMap fromArray". if somehow we
    -- can get inner_step to inline and fuse here we can perhaps get the same
    -- performance using "concatMap fromArray".
    --
    -- XXX using the pattern synonym "Stream" causes a major performance issue
    -- here even if the synonym does not include an adaptState call. Need to
    -- find out why. Is that something to be fixed in GHC?
    step' gst (Right (UnStream inner_step inner_st, st)) = do
        r <- inner_step (adaptState gst) inner_st
        case r of
            Yield b inner_s ->
                return $ Yield b (Right (Stream inner_step inner_s, st))
            Skip inner_s ->
                return $ Skip (Right (Stream inner_step inner_s, st))
            Stop -> return $ Skip (Left st)

-- | Map a stream producing function on each element of the stream and then
-- flatten the results into a single stream.
--
-- >>> concatMap f = Stream.concatMapM (return . f)
-- >>> concatMap f = Stream.concat . fmap f
-- >>> concatMap f = Stream.unfoldMany (Unfold.lmap f Unfold.fromStream)
--
-- See 'unfoldMany' for a fusible alternative.
--
{-# INLINE concatMap #-}
concatMap :: Monad m => (a -> Stream m b) -> Stream m a -> Stream m b
concatMap f = concatMapM (return . f)

-- | Flatten a stream of streams to a single stream.
--
-- >>> concat = Stream.concatMap id
--
-- /Pre-release/
{-# INLINE concat #-}
concat :: Monad m => Stream m (Stream m a) -> Stream m a
concat = concatMap id

-- XXX The idea behind this rule is to rewrite any calls to "concatMap
-- fromArray" automatically to flattenArrays which is much faster.  However, we
-- need an INLINE_EARLY on concatMap for this rule to fire. But if we use
-- INLINE_EARLY on concatMap or fromArray then direct uses of
-- "concatMap fromArray" (without the RULE) become much slower, this means
-- "concatMap f" in general would become slower. Need to find a solution to
-- this.
--
-- {-# RULES "concatMap Array.toStreamD"
--      concatMap Array.toStreamD = Array.flattenArray #-}

-- >>> concatEffect = Stream.concat . lift    -- requires (MonadTrans t)
-- >>> concatEffect = join . lift             -- requires (MonadTrans t, Monad (Stream m))

-- | Given a stream value in the underlying monad, lift and join the underlying
-- monad with the stream monad.
--
-- >>> concatEffect = Stream.concat . Stream.fromEffect
-- >>> concatEffect eff = Stream.concatMapM (\() -> eff) (Stream.fromPure ())
--
-- See also: 'concat', 'sequence'
--
{-# INLINE concatEffect #-}
concatEffect :: Monad m => m (Stream m a) -> Stream m a
concatEffect generator = concatMapM (\() -> generator) (fromPure ())

{-
-- NOTE: even though concatMap for StreamD is 4x faster compared to StreamK,
-- the monad instance does not seem to be significantly faster.
instance Monad m => Monad (Stream m) where
    {-# INLINE return #-}
    return = pure

    {-# INLINE (>>=) #-}
    (>>=) = flip concatMap

    {-# INLINE (>>) #-}
    (>>) = (*>)
-}

------------------------------------------------------------------------------
-- Traversing a tree top down
------------------------------------------------------------------------------

-- Next stream is to be generated by the return value of the previous stream. A
-- general intuitive way of doing that could be to use an appending monad
-- instance for streams where the result of the previous stream is used to
-- generate the next one. In the first pass we can just emit the values in the
-- stream and keep building a buffered list/stream, once done we can then
-- process the buffered stream.

-- | Generate a stream from an initial state, scan and concat the stream,
-- generate a stream again from the final state of the previous scan and repeat
-- the process.
--
-- The first argument is the scan function which updates the accumulator with
-- each emitted element, the second argument generates the next accumulator and
-- stream from the current accumulator (or returns 'Nothing' to end the
-- resulting stream), and the third argument is the initial accumulator.
--
{-# INLINE_NORMAL concatIterateScan #-}
concatIterateScan :: Monad m =>
       (b -> a -> m b)
    -> (b -> m (Maybe (b, Stream m a)))
    -> b
    -> Stream m a
concatIterateScan scanner generate initial = Stream step (Left initial)

    where

    -- State: @Left acc@ when a new stream has to be generated from the
    -- accumulator, and @Right (acc, stream)@ while the generated stream is
    -- being emitted and scanned.
    {-# INLINE_LATE step #-}
    step _ (Left acc) = do
        r <- generate acc
        case r of
            Nothing -> return Stop
            Just v -> return $ Skip (Right v)

    step gst (Right (st, UnStream inner_step inner_st)) = do
        r <- inner_step (adaptState gst) inner_st
        case r of
            Yield b inner_s -> do
                acc <- scanner st b
                return $ Yield b (Right (acc, Stream inner_step inner_s))
            Skip inner_s ->
                return $ Skip (Right (st, Stream inner_step inner_s))
            -- Current stream is done: generate the next one from the final
            -- accumulator.
            Stop -> return $ Skip (Left st)

-- Note: The iterate function returns a Maybe Stream instead of returning a nil
-- stream for indicating a leaf node. This is to optimize so that we do not
-- have to store any state. This makes the stored state proportional to the
-- number of non-leaf nodes rather than total number of nodes.

-- | Same as 'concatIterateBfs' except that the traversal of the last
-- element on a level is emitted first and then going backwards up to the first
-- element (reversed ordering). This may be slightly faster than
-- 'concatIterateBfs'.
--
{-# INLINE_NORMAL concatIterateBfsRev #-}
concatIterateBfsRev :: Monad m =>
       (a -> Maybe (Stream m a))
    -> Stream m a
    -> Stream m a
concatIterateBfsRev f stream = Stream step (stream, [])

    where

    -- State: the stream currently being emitted and the list of streams
    -- generated so far that are yet to be emitted. New streams are consed on
    -- to the front of the list, so the most recently generated one is taken
    -- up first.
    {-# INLINE_LATE step #-}
    step gst (UnStream step1 st, xs) = do
        r <- step1 (adaptState gst) st
        case r of
            Yield a s -> do
                let xs1 =
                        case f a of
                            Nothing -> xs
                            Just x -> x : xs
                return $ Yield a (Stream step1 s, xs1)
            Skip s -> return $ Skip (Stream step1 s, xs)
            Stop ->
                case xs of
                    (y : ys) -> return $ Skip (y, ys)
                    [] -> return Stop

-- | Similar to 'concatIterateDfs' except that it traverses the stream in
-- breadth first style (BFS). First, all the elements in the input stream are
-- emitted, and then their traversals are emitted.
--
-- Example, list a directory tree using BFS:
--
-- >>> f = either (Just . Dir.readEitherPaths) (const Nothing)
-- >>> input = Stream.fromPure (Left ".")
-- >>> ls = Stream.concatIterateBfs f input
--
-- /Pre-release/
{-# INLINE_NORMAL concatIterateBfs #-}
concatIterateBfs :: Monad m =>
       (a -> Maybe (Stream m a))
    -> Stream m a
    -> Stream m a
concatIterateBfs f stream = Stream step (stream, [], [])

    where

    -- State: the stream currently being emitted, the remaining streams of the
    -- current batch (in emission order), and the streams generated while
    -- emitting the current batch (most recently generated first).
    {-# INLINE_LATE step #-}
    step gst (UnStream step1 st, xs, ys) = do
        r <- step1 (adaptState gst) st
        case r of
            Yield a s -> do
                let ys1 =
                        case f a of
                            Nothing -> ys
                            Just y -> y : ys
                return $ Yield a (Stream step1 s, xs, ys1)
            Skip s -> return $ Skip (Stream step1 s, xs, ys)
            Stop ->
                case xs of
                    (x : xs1) -> return $ Skip (x, xs1, ys)
                    [] ->
                        -- The current batch is exhausted: the accumulated
                        -- streams, put back in generation order, become the
                        -- next batch.
                        case reverse ys of
                            (x : xs1) -> return $ Skip (x, xs1, [])
                            [] -> return Stop

-- | Traverse the stream in depth first style (DFS). Map each element in the
-- input stream to a stream and flatten, recursively map the resulting elements
-- as well to a stream and flatten until no more streams are generated.
--
-- Example, list a directory tree using DFS:
--
-- >>> f = either (Just . Dir.readEitherPaths) (const Nothing)
-- >>> input = Stream.fromPure (Left ".")
-- >>> ls = Stream.concatIterateDfs f input
--
-- This is equivalent to using @concatIterateWith StreamK.append@.
--
-- /Pre-release/
{-# INLINE_NORMAL concatIterateDfs #-}
concatIterateDfs :: Monad m =>
       (a -> Maybe (Stream m a))
    -> Stream m a
    -> Stream m a
concatIterateDfs f stream = Stream step (stream, [])

    where

    -- State: the stream currently being emitted and a stack of suspended
    -- parent streams to resume once the current one stops.
    {-# INLINE_LATE step #-}
    step gst (UnStream step1 st, xs) = do
        r <- step1 (adaptState gst) st
        case r of
            Yield a s -> do
                let st1 =
                        case f a of
                            Nothing -> (Stream step1 s, xs)
                            -- Descend into the generated stream right away,
                            -- suspending the rest of the current stream.
                            Just x -> (x, Stream step1 s : xs)
                return $ Yield a st1
            Skip s -> return $ Skip (Stream step1 s, xs)
            Stop ->
                case xs of
                    (y : ys) -> return $ Skip (y, ys)
                    [] -> return Stop

-- State of the step function of 'unfoldIterateDfs'. @o@ is the state type of
-- the outer (input) stream and @i@ is the state type of the unfold.
{-# ANN type IterateUnfoldState Fuse #-}
data IterateUnfoldState o i =
      -- The outer stream is being stepped; holds the outer stream state.
      IterateUnfoldOuter o
      -- The unfold is being stepped; holds the outer stream state, the current
      -- unfold state and a list of further unfold states.
      -- NOTE(review): the list is presumably a stack of suspended unfold
      -- states to resume later -- confirm against 'unfoldIterateDfs'.
    | IterateUnfoldInner o i [i]

-- | Same as @concatIterateDfs@ but more efficient due to stream fusion.
--
-- Every element of the input stream is emitted and then used as a seed for
-- the supplied unfold. Every element generated by the unfold is emitted and
-- is itself used as a seed immediately, before the remaining output of the
-- generator that produced it is consumed (depth first).
--
-- Example, list a directory tree using DFS:
--
-- >>> f = Unfold.either Dir.eitherReaderPaths Unfold.nil
-- >>> input = Stream.fromPure (Left ".")
-- >>> ls = Stream.unfoldIterateDfs f input
--
-- /Pre-release/
{-# INLINE_NORMAL unfoldIterateDfs #-}
unfoldIterateDfs :: Monad m =>
       Unfold m a a
    -> Stream m a
    -> Stream m a
unfoldIterateDfs (Unfold istep inject) (Stream ostep ost) =
    Stream step (IterateUnfoldOuter ost)

    where

    {-# INLINE_LATE step #-}
    -- Outer state: pull one element from the input stream, emit it and start
    -- unfolding it with an empty stack of suspended generators.
    step gst (IterateUnfoldOuter o) = do
        r <- ostep (adaptState gst) o
        case r of
            Yield a s -> do
                i <- inject a
                -- Force the generator state before storing it.
                i `seq` return (Yield a (IterateUnfoldInner s i []))
            Skip s -> return $ Skip (IterateUnfoldOuter s)
            Stop -> return Stop

    -- Inner state: o = outer stream state, i = current generator state,
    -- ii = stack of suspended generator states.
    step _ (IterateUnfoldInner o i ii) = do
        r <- istep i
        case r of
            Yield x s -> do
                -- Descend into x right away; the rest of the current
                -- generator (s) is suspended on the stack.
                i1 <- inject x
                i1 `seq` return $ Yield x (IterateUnfoldInner o i1 (s:ii))
            Skip s -> return $ Skip (IterateUnfoldInner o s ii)
            Stop ->
                case ii of
                    -- Resume the most recently suspended generator.
                    (y:ys) -> return $ Skip (IterateUnfoldInner o y ys)
                    -- Nothing pending, go back to the input stream.
                    [] -> return $ Skip (IterateUnfoldOuter o)

{-# ANN type IterateUnfoldBFSRevState Fuse #-}
-- State machine used by 'unfoldIterateBfsRev'. @o@ is the state of the outer
-- (input) stream and @i@ is the state of the unfold being iterated.
data IterateUnfoldBFSRevState o i
    -- Draining the outer stream; the list collects the generator states
    -- injected so far, most recent first.
    = IterateUnfoldBFSRevOuter o [i]
    -- Running one generator; the list holds the generator states that are
    -- still pending, most recent first.
    | IterateUnfoldBFSRevInner i [i]

-- | Like 'unfoldIterateBfs' but processes the children in reverse order,
-- therefore, may be slightly faster.
--
-- The whole input stream is emitted first while a generator is created for
-- each of its elements. The pending generators are kept in a single list used
-- in last-in-first-out order, so no list reversal is required.
--
-- /Pre-release/
{-# INLINE_NORMAL unfoldIterateBfsRev #-}
unfoldIterateBfsRev :: Monad m =>
       Unfold m a a
    -> Stream m a
    -> Stream m a
unfoldIterateBfsRev (Unfold istep inject) (Stream ostep ost) =
    Stream step (IterateUnfoldBFSRevOuter ost [])

    where

    {-# INLINE_LATE step #-}
    -- Outer state: emit the input elements, collecting a generator state for
    -- each one in ii (most recent first).
    step gst (IterateUnfoldBFSRevOuter o ii) = do
        r <- ostep (adaptState gst) o
        case r of
            Yield a s -> do
                i <- inject a
                -- Force the generator state before storing it.
                i `seq` return (Yield a (IterateUnfoldBFSRevOuter s (i:ii)))
            Skip s -> return $ Skip (IterateUnfoldBFSRevOuter s ii)
            Stop ->
                case ii of
                    (y:ys) -> return $ Skip (IterateUnfoldBFSRevInner y ys)
                    [] -> return Stop

    -- Inner state: i = current generator state, ii = pending generators.
    step _ (IterateUnfoldBFSRevInner i ii) = do
        r <- istep i
        case r of
            Yield x s -> do
                -- Keep running the current generator; the generator for x is
                -- pushed on the pending list.
                i1 <- inject x
                i1 `seq` return $ Yield x (IterateUnfoldBFSRevInner s (i1:ii))
            Skip s -> return $ Skip (IterateUnfoldBFSRevInner s ii)
            Stop ->
                case ii of
                    (y:ys) -> return $ Skip (IterateUnfoldBFSRevInner y ys)
                    [] -> return Stop

{-# ANN type IterateUnfoldBFSState Fuse #-}
-- State machine used by 'unfoldIterateBfs'. @o@ is the state of the outer
-- (input) stream and @i@ is the state of the unfold being iterated.
data IterateUnfoldBFSState o i
    -- Draining the outer stream; the list collects, in reverse order, the
    -- generator states for the next level.
    = IterateUnfoldBFSOuter o [i]
    -- Running one generator of the current level. The first list holds the
    -- remaining generators of the current level, the second list collects,
    -- in reverse order, the generator states for the next level.
    | IterateUnfoldBFSInner i [i] [i]

-- | Like 'unfoldIterateDfs' but uses breadth first style traversal.
--
-- The whole input stream is emitted first, then everything generated from its
-- elements, then everything generated from those, and so on. Generators for
-- the next level are accumulated in reverse and the list is reversed once
-- when the current level is exhausted, so that each level is processed in the
-- order in which its elements were emitted.
--
-- /Pre-release/
{-# INLINE_NORMAL unfoldIterateBfs #-}
unfoldIterateBfs :: Monad m =>
       Unfold m a a
    -> Stream m a
    -> Stream m a
unfoldIterateBfs (Unfold istep inject) (Stream ostep ost) =
    Stream step (IterateUnfoldBFSOuter ost [])

    where

    {-# INLINE_LATE step #-}
    -- Outer state: emit the input elements, collecting a generator state for
    -- each one in rii (reverse order).
    step gst (IterateUnfoldBFSOuter o rii) = do
        r <- ostep (adaptState gst) o
        case r of
            Yield a s -> do
                i <- inject a
                -- Force the generator state before storing it.
                i `seq` return (Yield a (IterateUnfoldBFSOuter s (i:rii)))
            Skip s -> return $ Skip (IterateUnfoldBFSOuter s rii)
            Stop ->
                case reverse rii of
                    (y:ys) -> return $ Skip (IterateUnfoldBFSInner y ys [])
                    [] -> return Stop

    -- Inner state: i = current generator, ii = rest of the current level,
    -- rii = next level collected so far (reverse order).
    step _ (IterateUnfoldBFSInner i ii rii) = do
        r <- istep i
        case r of
            Yield x s -> do
                i1 <- inject x
                i1 `seq` return $ Yield x (IterateUnfoldBFSInner s ii (i1:rii))
            Skip s -> return $ Skip (IterateUnfoldBFSInner s ii rii)
            Stop ->
                case ii of
                    (y:ys) -> return $ Skip (IterateUnfoldBFSInner y ys rii)
                    [] ->
                        -- Current level exhausted, move on to the next one.
                        case reverse rii of
                            (y:ys) ->
                                return $ Skip (IterateUnfoldBFSInner y ys [])
                            [] -> return Stop

------------------------------------------------------------------------------
-- Folding a tree bottom up
------------------------------------------------------------------------------

-- | Binary BFS style reduce, folds a level entirely using the supplied fold
-- function, collecting the outputs as next level of the tree, then repeats the
-- same process on the next level. The last elements of a previously folded
-- level are folded first.
--
-- Returns 'Nothing' if the stream is empty, otherwise 'Just' the single value
-- left after all levels have been reduced.
{-# INLINE_NORMAL reduceIterateBfs #-}
reduceIterateBfs :: Monad m =>
    (a -> a -> m a) -> Stream m a -> m (Maybe a)
reduceIterateBfs f (Stream step state) = go SPEC state [] Nothing

    where

    -- First level: combine consecutive pairs of stream elements.
    -- xs is the list of results so far (most recent first), the last argument
    -- is the first element of a pair still waiting for its partner.
    go _ st xs Nothing = do
        r <- step defState st
        case r of
            Yield x1 s -> go SPEC s xs (Just x1)
            Skip s -> go SPEC s xs Nothing
            Stop ->
                case xs of
                    [] -> return Nothing
                    _ -> goBuf SPEC xs []
    go _ st xs (Just x1) = do
        r2 <- step defState st
        case r2 of
            Yield x2 s -> do
                x <- f x1 x2
                go SPEC s (x:xs) Nothing
            Skip s -> go SPEC s xs (Just x1)
            Stop ->
                case xs of
                    -- The stream had exactly one element.
                    [] -> return (Just x1)
                    -- The unpaired element joins the buffered level as is.
                    _ -> goBuf SPEC (x1:xs) []

    -- Subsequent levels: the first list is the level being reduced, the
    -- second list (ys) collects the next level. goBuf is never entered with
    -- both lists empty, see the call sites.
    goBuf _ [] ys = goBuf SPEC ys []
    goBuf _ [x1] ys = do
        case ys of
            [] -> return (Just x1)
            -- Odd element left over: pair it with the head of the next level
            -- and continue with the rest of that level.
            (x2:xs) -> do
                y <- f x1 x2
                goBuf SPEC xs [y]
    goBuf _ (x1:x2:xs) ys = do
        y <- f x1 x2
        goBuf SPEC xs (y:ys)

-- | N-Ary BFS style iterative fold, if the input stream finished before the
-- fold then it returns Left otherwise Right. If the fold returns Left we
-- terminate.
--
-- This function is a placeholder, evaluating it raises an error.
--
-- /Unimplemented/
foldIterateBfs ::
    Fold m a (Either a a) -> Stream m a -> m (Maybe a)
foldIterateBfs = error "foldIterateBfs: not implemented"

------------------------------------------------------------------------------
-- Grouping/Splitting
------------------------------------------------------------------------------

-- State machine used by 'foldManyPost'.
-- s = stream state, fs = fold state, b = fold result
{-# ANN type FoldManyPost Fuse #-}
data FoldManyPost s fs b a
    -- Initialize a new fold; no input is consumed in this state.
    = FoldManyPostStart s
    -- Feed input stream elements to the running fold.
    | FoldManyPostLoop s fs
    -- Emit a fold result, then continue with the given state.
    | FoldManyPostYield b (FoldManyPost s fs b a)
    -- The output stream is finished.
    | FoldManyPostDone

-- XXX Need a more intuitive name, and need to reconcile the names
-- foldMany/fold/parse/parseMany/parseManyPost etc.

-- XXX foldManyPost keeps the last fold always partial. If the last fold is
-- complete then another fold is applied on empty input. This is used for
-- applying folds like takeEndBy such that the last element is not the
-- separator (infix style). But that looks like a hack. We should remove this
-- and use a custom combinator for infix parsing.

-- | Like 'foldMany' but evaluates the fold even if the fold did not receive
-- any input, therefore, always results in a non-empty output even on an empty
-- stream (default result of the fold).
--
-- Example, empty stream:
--
-- >>> f = Fold.take 2 Fold.sum
-- >>> fmany = Stream.fold Fold.toList . Stream.foldManyPost f
-- >>> fmany $ Stream.fromList []
-- [0]
--
-- Example, last fold empty:
--
-- >>> fmany $ Stream.fromList [1..4]
-- [3,7,0]
--
-- Example, last fold non-empty:
--
-- >>> fmany $ Stream.fromList [1..5]
-- [3,7,5]
--
-- Note that using a closed fold e.g. @Fold.take 0@, would result in an
-- infinite stream without consuming the input.
--
-- /Pre-release/
--
{-# INLINE_NORMAL foldManyPost #-}
foldManyPost :: Monad m => Fold m a b -> Stream m a -> Stream m b
foldManyPost (Fold fstep initial extract) (Stream step state) =
    Stream step' (FoldManyPostStart state)

    where

    -- Feed one element to the fold. When the fold completes, its result is
    -- scheduled for output and a fresh fold is started on the rest of the
    -- stream.
    {-# INLINE consume #-}
    consume x s fs = do
        res <- fstep fs x
        return
            $ Skip
            $ case res of
                  FL.Done b -> FoldManyPostYield b (FoldManyPostStart s)
                  FL.Partial ps -> FoldManyPostLoop s ps

    {-# INLINE_LATE step' #-}
    -- Initialize the fold. If it is done without any input we come back
    -- here after yielding, without ever stepping the input stream.
    step' _ (FoldManyPostStart st) = do
        r <- initial
        return
            $ Skip
            $ case r of
                  FL.Done b -> FoldManyPostYield b (FoldManyPostStart st)
                  FL.Partial fs -> FoldManyPostLoop st fs
    step' gst (FoldManyPostLoop st fs) = do
        r <- step (adaptState gst) st
        case r of
            Yield x s -> consume x s fs
            Skip s -> return $ Skip (FoldManyPostLoop s fs)
            -- End of input: the pending fold is always extracted, even if it
            -- has not received any element.
            Stop -> do
                b <- extract fs
                return $ Skip (FoldManyPostYield b FoldManyPostDone)
    step' _ (FoldManyPostYield b next) = return $ Yield b next
    step' _ FoldManyPostDone = return Stop

-- State machine used by 'foldMany' and 'refoldMany'.
-- s = stream state, fs = fold state, b = fold result
{-# ANN type FoldMany Fuse #-}
data FoldMany s fs b a
    -- Initialize a new fold; no input is consumed in this state.
    = FoldManyStart s
    -- The fold is initialized but has not received any input yet. If the
    -- stream ends in this state nothing is emitted for this fold.
    | FoldManyFirst fs s
    -- The fold has received at least one element, keep feeding it.
    | FoldManyLoop s fs
    -- Emit a fold result, then continue with the given state.
    | FoldManyYield b (FoldMany s fs b a)
    -- The output stream is finished.
    | FoldManyDone

-- XXX Nested foldMany does not fuse.

-- | Apply a 'Fold' repeatedly on a stream and emit the results in the output
-- stream.
--
-- Definition:
--
-- >>> foldMany f = Stream.parseMany (Parser.fromFold f)
--
-- Example, empty stream:
--
-- >>> f = Fold.take 2 Fold.sum
-- >>> fmany = Stream.fold Fold.toList . Stream.foldMany f
-- >>> fmany $ Stream.fromList []
-- []
--
-- Example, last fold empty:
--
-- >>> fmany $ Stream.fromList [1..4]
-- [3,7]
--
-- Example, last fold non-empty:
--
-- >>> fmany $ Stream.fromList [1..5]
-- [3,7,5]
--
-- Note that using a closed fold e.g. @Fold.take 0@, i.e. a fold whose initial
-- action is already done, would result in an infinite stream without
-- consuming the input: the fold is initialized before the input stream is
-- stepped, so this happens even when the input stream is empty.
--
{-# INLINE_NORMAL foldMany #-}
foldMany :: Monad m => Fold m a b -> Stream m a -> Stream m b
foldMany (Fold fstep initial extract) (Stream step state) =
    Stream step' (FoldManyStart state)

    where

    -- Feed one element to the fold. When the fold completes, its result is
    -- scheduled for output and a fresh fold is started on the rest of the
    -- stream.
    {-# INLINE consume #-}
    consume x s fs = do
        res <- fstep fs x
        return
            $ Skip
            $ case res of
                  FL.Done b -> FoldManyYield b (FoldManyStart s)
                  FL.Partial ps -> FoldManyLoop s ps

    {-# INLINE_LATE step' #-}
    -- Initialize the fold. If it is done without any input we come back
    -- here after yielding, without ever stepping the input stream.
    step' _ (FoldManyStart st) = do
        r <- initial
        return
            $ Skip
            $ case r of
                  FL.Done b -> FoldManyYield b (FoldManyStart st)
                  FL.Partial fs -> FoldManyFirst fs st
    -- The fold has not consumed anything yet: if the input ends here the
    -- fold result is dropped rather than extracted.
    step' gst (FoldManyFirst fs st) = do
        r <- step (adaptState gst) st
        case r of
            Yield x s -> consume x s fs
            Skip s -> return $ Skip (FoldManyFirst fs s)
            Stop -> return Stop
    -- The fold has consumed some input: if the input ends here the partial
    -- result is extracted and emitted.
    step' gst (FoldManyLoop st fs) = do
        r <- step (adaptState gst) st
        case r of
            Yield x s -> consume x s fs
            Skip s -> return $ Skip (FoldManyLoop s fs)
            Stop -> do
                b <- extract fs
                return $ Skip (FoldManyYield b FoldManyDone)
    step' _ (FoldManyYield b next) = return $ Yield b next
    step' _ FoldManyDone = return Stop

-- | @groupsOf n f@ applies the fold @f@ repeatedly, limiting each application
-- to at most @n@ input elements, and emits the result of each application in
-- the output stream.
--
-- Definition:
--
-- >>> groupsOf n f = Stream.foldMany (Fold.take n f)
--
{-# INLINE groupsOf #-}
groupsOf :: Monad m => Int -> Fold m a b -> Stream m a -> Stream m b
groupsOf n f = foldMany (FL.take n f)

-- Keep the argument order consistent with refoldIterateM.

-- | Like 'foldMany' but for the 'Refold' type.  The supplied action is used as
-- the initial value for each refold.
--
-- The action is run every time a new refold is started and its result is
-- injected into the refold to obtain the initial fold state.
--
-- /Internal/
{-# INLINE_NORMAL refoldMany #-}
refoldMany :: Monad m => Refold m x a b -> m x -> Stream m a -> Stream m b
refoldMany (Refold fstep inject extract) action (Stream step state) =
    Stream step' (FoldManyStart state)

    where

    -- Feed one element to the refold. When it completes, its result is
    -- scheduled for output and a fresh refold is started on the rest of the
    -- stream.
    {-# INLINE consume #-}
    consume x s fs = do
        res <- fstep fs x
        return
            $ Skip
            $ case res of
                  FL.Done b -> FoldManyYield b (FoldManyStart s)
                  FL.Partial ps -> FoldManyLoop s ps

    {-# INLINE_LATE step' #-}
    -- Run the action and inject its result to initialize the refold.
    step' _ (FoldManyStart st) = do
        r <- action >>= inject
        return
            $ Skip
            $ case r of
                  FL.Done b -> FoldManyYield b (FoldManyStart st)
                  FL.Partial fs -> FoldManyFirst fs st
    -- The refold has not consumed anything yet: if the input ends here the
    -- result is dropped rather than extracted.
    step' gst (FoldManyFirst fs st) = do
        r <- step (adaptState gst) st
        case r of
            Yield x s -> consume x s fs
            Skip s -> return $ Skip (FoldManyFirst fs s)
            Stop -> return Stop
    -- The refold has consumed some input: if the input ends here the partial
    -- result is extracted and emitted.
    step' gst (FoldManyLoop st fs) = do
        r <- step (adaptState gst) st
        case r of
            Yield x s -> consume x s fs
            Skip s -> return $ Skip (FoldManyLoop s fs)
            Stop -> do
                b <- extract fs
                return $ Skip (FoldManyYield b FoldManyDone)
    step' _ (FoldManyYield b next) = return $ Yield b next
    step' _ FoldManyDone = return Stop

------------------------------------------------------------------------------
-- Stream with a cross product style monad instance
------------------------------------------------------------------------------

-- XXX CrossStream performs better than the CrossStreamK when nesting two
-- loops, however, CrossStreamK seems to be better for more than two nestings,
-- need to do more perf investigation.

-- | A newtype wrapper for the 'Stream' type with a cross product style monad
-- instance.
--
-- A 'Monad' bind behaves like a @for@ loop:
--
-- >>> :{
-- Stream.fold Fold.toList $ Stream.unCross $ do
--     x <- Stream.mkCross $ Stream.fromList [1,2]
--     -- Perform the following actions for each x in the stream
--     return x
-- :}
-- [1,2]
--
-- Nested monad binds behave like nested @for@ loops:
--
-- >>> :{
-- Stream.fold Fold.toList $ Stream.unCross $ do
--     x <- Stream.mkCross $ Stream.fromList [1,2]
--     y <- Stream.mkCross $ Stream.fromList [3,4]
--     -- Perform the following actions for each x, for each y
--     return (x, y)
-- :}
-- [(1,3),(1,4),(2,3),(2,4)]
--
newtype CrossStream m a = CrossStream {forall (m :: * -> *) a. CrossStream m a -> Stream m a
unCrossStream :: Stream m a}
        deriving (forall a b. a -> CrossStream m b -> CrossStream m a
forall a b. (a -> b) -> CrossStream m a -> CrossStream m b
forall (m :: * -> *) a b.
Monad m =>
a -> CrossStream m b -> CrossStream m a
forall (m :: * -> *) a b.
Monad m =>
(a -> b) -> CrossStream m a -> CrossStream m b
forall (f :: * -> *).
(forall a b. (a -> b) -> f a -> f b)
-> (forall a b. a -> f b -> f a) -> Functor f
<$ :: forall a b. a -> CrossStream m b -> CrossStream m a
$c<$ :: forall (m :: * -> *) a b.
Monad m =>
a -> CrossStream m b -> CrossStream m a
fmap :: forall a b. (a -> b) -> CrossStream m a -> CrossStream m b
$cfmap :: forall (m :: * -> *) a b.
Monad m =>
(a -> b) -> CrossStream m a -> CrossStream m b
Functor, forall a. Eq a => a -> CrossStream m a -> Bool
forall a. Num a => CrossStream m a -> a
forall a. Ord a => CrossStream m a -> a
forall m. Monoid m => CrossStream m m -> m
forall a. CrossStream m a -> Bool
forall a. CrossStream m a -> Int
forall a. CrossStream m a -> [a]
forall a. (a -> a -> a) -> CrossStream m a -> a
forall m a. Monoid m => (a -> m) -> CrossStream m a -> m
forall b a. (b -> a -> b) -> b -> CrossStream m a -> b
forall a b. (a -> b -> b) -> b -> CrossStream m a -> b
forall (m :: * -> *) a.
(Foldable m, Monad m, Eq a) =>
a -> CrossStream m a -> Bool
forall (m :: * -> *) a.
(Foldable m, Monad m, Num a) =>
CrossStream m a -> a
forall (m :: * -> *) a.
(Foldable m, Monad m, Ord a) =>
CrossStream m a -> a
forall (m :: * -> *) m.
(Foldable m, Monad m, Monoid m) =>
CrossStream m m -> m
forall (m :: * -> *) a.
(Foldable m, Monad m) =>
CrossStream m a -> Bool
forall (m :: * -> *) a.
(Foldable m, Monad m) =>
CrossStream m a -> Int
forall (m :: * -> *) a.
(Foldable m, Monad m) =>
CrossStream m a -> [a]
forall (m :: * -> *) a.
(Foldable m, Monad m) =>
(a -> a -> a) -> CrossStream m a -> a
forall (m :: * -> *) m a.
(Foldable m, Monad m, Monoid m) =>
(a -> m) -> CrossStream m a -> m
forall (m :: * -> *) b a.
(Foldable m, Monad m) =>
(b -> a -> b) -> b -> CrossStream m a -> b
forall (m :: * -> *) a b.
(Foldable m, Monad m) =>
(a -> b -> b) -> b -> CrossStream m a -> b
forall (t :: * -> *).
(forall m. Monoid m => t m -> m)
-> (forall m a. Monoid m => (a -> m) -> t a -> m)
-> (forall m a. Monoid m => (a -> m) -> t a -> m)
-> (forall a b. (a -> b -> b) -> b -> t a -> b)
-> (forall a b. (a -> b -> b) -> b -> t a -> b)
-> (forall b a. (b -> a -> b) -> b -> t a -> b)
-> (forall b a. (b -> a -> b) -> b -> t a -> b)
-> (forall a. (a -> a -> a) -> t a -> a)
-> (forall a. (a -> a -> a) -> t a -> a)
-> (forall a. t a -> [a])
-> (forall a. t a -> Bool)
-> (forall a. t a -> Int)
-> (forall a. Eq a => a -> t a -> Bool)
-> (forall a. Ord a => t a -> a)
-> (forall a. Ord a => t a -> a)
-> (forall a. Num a => t a -> a)
-> (forall a. Num a => t a -> a)
-> Foldable t
product :: forall a. Num a => CrossStream m a -> a
$cproduct :: forall (m :: * -> *) a.
(Foldable m, Monad m, Num a) =>
CrossStream m a -> a
sum :: forall a. Num a => CrossStream m a -> a
$csum :: forall (m :: * -> *) a.
(Foldable m, Monad m, Num a) =>
CrossStream m a -> a
minimum :: forall a. Ord a => CrossStream m a -> a
$cminimum :: forall (m :: * -> *) a.
(Foldable m, Monad m, Ord a) =>
CrossStream m a -> a
maximum :: forall a. Ord a => CrossStream m a -> a
$cmaximum :: forall (m :: * -> *) a.
(Foldable m, Monad m, Ord a) =>
CrossStream m a -> a
elem :: forall a. Eq a => a -> CrossStream m a -> Bool
$celem :: forall (m :: * -> *) a.
(Foldable m, Monad m, Eq a) =>
a -> CrossStream m a -> Bool
length :: forall a. CrossStream m a -> Int
$clength :: forall (m :: * -> *) a.
(Foldable m, Monad m) =>
CrossStream m a -> Int
null :: forall a. CrossStream m a -> Bool
$cnull :: forall (m :: * -> *) a.
(Foldable m, Monad m) =>
CrossStream m a -> Bool
toList :: forall a. CrossStream m a -> [a]
$ctoList :: forall (m :: * -> *) a.
(Foldable m, Monad m) =>
CrossStream m a -> [a]
foldl1 :: forall a. (a -> a -> a) -> CrossStream m a -> a
$cfoldl1 :: forall (m :: * -> *) a.
(Foldable m, Monad m) =>
(a -> a -> a) -> CrossStream m a -> a
foldr1 :: forall a. (a -> a -> a) -> CrossStream m a -> a
$cfoldr1 :: forall (m :: * -> *) a.
(Foldable m, Monad m) =>
(a -> a -> a) -> CrossStream m a -> a
foldl' :: forall b a. (b -> a -> b) -> b -> CrossStream m a -> b
$cfoldl' :: forall (m :: * -> *) b a.
(Foldable m, Monad m) =>
(b -> a -> b) -> b -> CrossStream m a -> b
foldl :: forall b a. (b -> a -> b) -> b -> CrossStream m a -> b
$cfoldl :: forall (m :: * -> *) b a.
(Foldable m, Monad m) =>
(b -> a -> b) -> b -> CrossStream m a -> b
foldr' :: forall a b. (a -> b -> b) -> b -> CrossStream m a -> b
$cfoldr' :: forall (m :: * -> *) a b.
(Foldable m, Monad m) =>
(a -> b -> b) -> b -> CrossStream m a -> b
foldr :: forall a b. (a -> b -> b) -> b -> CrossStream m a -> b
$cfoldr :: forall (m :: * -> *) a b.
(Foldable m, Monad m) =>
(a -> b -> b) -> b -> CrossStream m a -> b
foldMap' :: forall m a. Monoid m => (a -> m) -> CrossStream m a -> m
$cfoldMap' :: forall (m :: * -> *) m a.
(Foldable m, Monad m, Monoid m) =>
(a -> m) -> CrossStream m a -> m
foldMap :: forall m a. Monoid m => (a -> m) -> CrossStream m a -> m
$cfoldMap :: forall (m :: * -> *) m a.
(Foldable m, Monad m, Monoid m) =>
(a -> m) -> CrossStream m a -> m
fold :: forall m. Monoid m => CrossStream m m -> m
$cfold :: forall (m :: * -> *) m.
(Foldable m, Monad m, Monoid m) =>
CrossStream m m -> m
Foldable)

-- | Wrap a 'Stream' in the 'CrossStream' wrapper. This is the 'CrossStream'
-- constructor exposed as a plain function.
{-# INLINE mkCross #-}
mkCross :: Stream m a -> CrossStream m a
mkCross = CrossStream

-- | Unwrap a 'CrossStream', returning the underlying 'Stream'. Defined as
-- @unCrossStream@.
{-# INLINE unCross #-}
unCross :: CrossStream m a -> Stream m a
unCross = unCrossStream

-- Pure (Identity monad) stream instances
--
-- Standalone-derived instances for 'CrossStream' specialised to the
-- 'Identity' monad. The 'IsString' instance constrains the element type with
-- @a ~ Char@.
--
-- NOTE(review): presumably these reuse the corresponding instances of the
-- wrapped @Stream Identity a@ through newtype deriving -- confirm against the
-- extensions enabled for this module.
deriving instance IsList (CrossStream Identity a)
deriving instance (a ~ Char) => IsString (CrossStream Identity a)
deriving instance Eq a => Eq (CrossStream Identity a)
deriving instance Ord a => Ord (CrossStream Identity a)

-- Do not use automatic derivation for this to show as "fromList" rather than
-- "fromList Identity".
--
-- The instance unwraps the 'CrossStream' constructor and delegates to the
-- 'Show' instance of the wrapped @Stream Identity a@.
instance Show a => Show (CrossStream Identity a) where
    {-# INLINE show #-}
    show (CrossStream xs) = show xs

-- Reads a value using the 'Read' instance of the wrapped @Stream Identity a@
-- and wraps the result in the 'CrossStream' constructor.
instance Read a => Read (CrossStream Identity a) where
    {-# INLINE readPrec #-}
    readPrec = fmap CrossStream readPrec

------------------------------------------------------------------------------
-- Applicative
------------------------------------------------------------------------------

-- Note: we need to define all the typeclass operations because we want to
-- INLINE them.
-- Every method unwraps its 'CrossStream' arguments, delegates to the
-- corresponding 'Stream' level operation and re-wraps the result.
instance Monad m => Applicative (CrossStream m) where
    -- Wraps the stream produced by 'fromPure'.
    {-# INLINE pure #-}
    pure x = CrossStream (fromPure x)

    -- Delegates to 'crossApply' on the underlying streams.
    {-# INLINE (<*>) #-}
    (CrossStream s1) <*> (CrossStream s2) =
        CrossStream (crossApply s1 s2)

    -- Expressed through 'fmap' and '<*>'; the second stream argument is left
    -- implicit (point-free).
    {-# INLINE liftA2 #-}
    liftA2 f x = (<*>) (fmap f x)

    -- Delegates to 'crossApplySnd' on the underlying streams.
    {-# INLINE (*>) #-}
    (CrossStream s1) *> (CrossStream s2) =
        CrossStream (crossApplySnd s1 s2)

    -- Delegates to 'crossApplyFst' on the underlying streams.
    {-# INLINE (<*) #-}
    (CrossStream s1) <* (CrossStream s2) =
        CrossStream (crossApplyFst s1 s2)

------------------------------------------------------------------------------
-- Monad
------------------------------------------------------------------------------

-- Bind is implemented with the 'Stream' level 'concatMap'; 'return' and '>>'
-- reuse the 'Applicative' methods 'pure' and '*>'.
instance Monad m => Monad (CrossStream m) where
    return = pure

    -- Benchmarks better with StreamD bind and pure:
    -- toList, filterAllout, *>, <*, >> (~2x)
    --

    -- Benchmarks better with CPS bind and pure:
    -- Prime sieve (25x)
    -- n binds, breakAfterSome, filterAllIn, state transformer (~2x)
    --
    -- The continuation @f@ returns a 'CrossStream', so it is composed with
    -- @unCrossStream@ to obtain the plain 'Stream' that 'concatMap' expects.
    {-# INLINE (>>=) #-}
    (>>=) (CrossStream m) f = CrossStream (concatMap (unCrossStream . f) m)

    {-# INLINE (>>) #-}
    (>>) = (*>)

------------------------------------------------------------------------------
-- Transformers
------------------------------------------------------------------------------

-- Lifts the 'IO' action into the underlying monad @m@ with 'liftIO', turns the
-- resulting action into a stream with 'fromEffect', and wraps it in
-- 'CrossStream'.
instance (MonadIO m) => MonadIO (CrossStream m) where
    liftIO x = CrossStream (fromEffect $ liftIO x)

-- Lifts an action of the underlying monad by turning it into a stream with
-- 'fromEffect' and wrapping the result in 'CrossStream'.
instance MonadTrans CrossStream where
    {-# INLINE lift #-}
    lift x = CrossStream (fromEffect x)

-- Throws the exception in the underlying monad @m@ and lifts that action into
-- 'CrossStream' using 'lift'.
instance (MonadThrow m) => MonadThrow (CrossStream m) where
    throwM = lift . throwM