{-# LANGUAGE CPP #-}
{-# LANGUAGE UndecidableInstances #-}
-- |
-- Module      : Streamly.Internal.Data.Stream.StreamK.Type
-- Copyright   : (c) 2017 Composewell Technologies
--
-- License     : BSD3
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
--
-- Continuation passing style (CPS) stream implementation. The symbol 'K' below
-- denotes a function as well as a Kontinuation.
--
module Streamly.Internal.Data.Stream.StreamK.Type
    (
    -- * StreamK type
      Stream
    , StreamK (..)

    -- * CrossStreamK type wrapper
    , CrossStreamK
    , unCross
    , mkCross

    -- * foldr/build Fusion
    , mkStream
    , foldStream
    , foldStreamShared
    , foldrM
    , foldrS
    , foldrSShared
    , foldrSM
    , build
    , buildS
    , buildM
    , buildSM
    , augmentS
    , augmentSM
    , unShare

    -- * Construction
    -- ** Primitives
    , fromStopK
    , fromYieldK
    , consK
    , cons
    , (.:)
    , consM
    , consMBy
    , nil
    , nilM

    -- ** Unfolding
    , unfoldr
    , unfoldrMWith
    , unfoldrM

    -- ** From Values
    , fromEffect
    , fromPure
    , repeat
    , repeatMWith
    , replicateMWith

    -- ** From Indices
    , fromIndicesMWith

    -- ** Iteration
    , iterateMWith

    -- ** From Containers
    , fromFoldable
    , fromFoldableM

    -- ** Cyclic
    , mfix

    -- * Elimination
    -- ** Primitives
    , uncons

    -- ** Strict Left Folds
    , Streamly.Internal.Data.Stream.StreamK.Type.foldl'
    , foldlx'

    -- ** Lazy Right Folds
    , Streamly.Internal.Data.Stream.StreamK.Type.foldr

    -- ** Specific Folds
    , drain
    , null
    , tail
    , init

    -- * Mapping
    , map
    , mapMWith
    , mapMSerial

    -- * Combining Two Streams
    -- ** Appending
    , conjoin
    , append

    -- ** Interleave
    , interleave
    , interleaveFst
    , interleaveMin

    -- ** Cross Product
    , crossApplyWith
    , crossApply
    , crossApplySnd
    , crossApplyFst
    , crossWith
    , cross

    -- * Concat
    , before
    , concatEffect
    , concatMapEffect
    , concatMapWith
    , concatMap
    , bindWith
    , concatIterateWith
    , concatIterateLeftsWith
    , concatIterateScanWith

    -- * Merge
    , mergeMapWith
    , mergeIterateWith

    -- * Buffered Operations
    , foldlS
    , reverse
    )
where

#include "inline.hs"

-- import Control.Applicative (liftA2)
import Control.Monad ((>=>))
import Control.Monad.Catch (MonadThrow, throwM)
import Control.Monad.Trans.Class (MonadTrans(lift))
import Control.Applicative (liftA2)
import Control.Monad.IO.Class (MonadIO(..))
import Data.Foldable (Foldable(foldl'), fold, foldr)
import Data.Function (fix)
import Data.Functor.Identity (Identity(..))
import Data.Maybe (fromMaybe)
import Data.Semigroup (Endo(..))
import GHC.Exts (IsList(..), IsString(..), oneShot)
import Streamly.Internal.BaseCompat ((#.))
import Streamly.Internal.Data.Maybe.Strict (Maybe'(..), toMaybe)
import Streamly.Internal.Data.SVar.Type (State, adaptState, defState)
import Text.Read
       ( Lexeme(Ident), lexP, parens, prec, readPrec, readListPrec
       , readListPrecDefault)

import qualified Prelude

import Prelude hiding
    (map, mapM, concatMap, foldr, repeat, null, reverse, tail, init)

#include "DocTestDataStreamK.hs"

------------------------------------------------------------------------------
-- Basic stream type
------------------------------------------------------------------------------

-- It uses stop, singleton and yield continuations equivalent to the following
-- direct style type:
--
-- @
-- data StreamK m a = Stop | Singleton a | Yield a (StreamK m a)
-- @
--
-- To facilitate parallel composition we maintain a local state in an 'SVar'
-- that is shared across and is used for synchronization of the streams being
-- composed.
--
-- The singleton case can be expressed in terms of stop and yield but we have
-- it as a separate case to optimize composition operations for streams with
-- single element.  We build singleton streams in the implementation of 'pure'
-- for Applicative and Monad, and in 'lift' for MonadTrans.

-- XXX remove the State param.

-- | Deprecated alias for 'StreamK'; use 'StreamK' directly.
{-# DEPRECATED Stream "Please use StreamK instead." #-}
type Stream = StreamK

-- | Continuation Passing Style (CPS) version of "Streamly.Data.Stream.Stream".
-- Unlike "Streamly.Data.Stream.Stream", 'StreamK' can be composed recursively
-- without affecting performance.
--
-- Semigroup instance appends two streams:
--
-- >>> (<>) = Stream.append
--
-- A stream is represented as a function that takes a state and three
-- continuations (yield, singleton and stop) and produces a result in the
-- underlying monad by calling one of them.
newtype StreamK m a =
    MkStream (forall r.
               State StreamK m a         -- state
            -> (a -> StreamK m a -> m r) -- yield
            -> (a -> m r)               -- singleton
            -> m r                      -- stop
            -> m r
            )

-- | Construct a 'StreamK' from its continuation passing style
-- representation. This is a plain synonym for the 'MkStream' constructor.
--
-- The supplied function receives, in order: the state, the yield
-- continuation (element and rest of the stream), the singleton continuation
-- (last element) and the stop continuation.
mkStream
    :: (forall r. State StreamK m a
        -> (a -> StreamK m a -> m r)
        -> (a -> m r)
        -> m r
        -> m r)
    -> StreamK m a
mkStream = MkStream

-- | A terminal function that has no continuation to follow. It is given the
-- action to run when the stream stops (@m r@) and returns the action that is
-- actually run; see 'fromStopK'.
type StopK m = forall r. m r -> m r

-- | A monadic continuation: a function that yields a value of type @a@
-- by calling its argument, the continuation @(a -> m r)@, with that value. We
-- can also think of it as a callback with a handler @(a -> m r)@.  Category
-- theorists call it a codensity type, a special type of right Kan extension.
type YieldK m a = forall r. (a -> m r) -> m r

-- Wrap a monadic action as a 'YieldK': the continuation passed to the
-- resulting function is bound to the result of the action.
_wrapM :: Monad m => m a -> YieldK m a
_wrapM m = (m >>=)

-- | Make an empty stream from a stop function.
--
-- The stop function is applied to the stream's stop continuation; the yield
-- and singleton continuations are never invoked, so no element is produced.
fromStopK :: StopK m -> StreamK m a
fromStopK k = mkStream $ \_ _ _ stp -> k stp

-- | Make a singleton stream from a callback function. The callback function
-- calls the one-shot yield continuation to yield an element.
--
-- The callback is handed the stream's singleton continuation, so the element
-- it yields is the only element of the stream.
fromYieldK :: YieldK m a -> StreamK m a
fromYieldK k = mkStream $ \_ _ sng _ -> k sng

-- | Add a yield function at the head of the stream.
--
-- The yield function @k@ is given a continuation that passes the yielded
-- element, together with the remaining stream @r@, to the stream's yield
-- continuation.
consK :: YieldK m a -> StreamK m a -> StreamK m a
consK k r = mkStream $ \_ yld _ _ -> k (`yld` r)

-- XXX Build a stream from a repeating callback function.

------------------------------------------------------------------------------
-- Construction
------------------------------------------------------------------------------

infixr 5 `cons`

-- faster than consM because there is no bind.

-- | A right associative prepend operation to add a pure value at the head of
-- an existing stream:
--
-- >>> s = 1 `StreamK.cons` 2 `StreamK.cons` 3 `StreamK.cons` StreamK.nil
-- >>> Stream.fold Fold.toList (StreamK.toStream s)
-- [1,2,3]
--
-- It can be used efficiently with 'Prelude.foldr':
--
-- >>> fromFoldable = Prelude.foldr StreamK.cons StreamK.nil
--
-- Same as the following but more efficient:
--
-- >>> cons x xs = return x `StreamK.consM` xs
--
{-# INLINE_NORMAL cons #-}
cons :: a -> StreamK m a -> StreamK m a
cons a r =
    -- Only the yield continuation is used: it receives the new head and the
    -- existing stream as the tail.
    mkStream $ \_ yield _ _ -> yield a r

infixr 5 .:

-- | Operator equivalent of 'cons'.
--
-- @
-- > toList $ 1 .: 2 .: 3 .: nil
-- [1,2,3]
-- @
--
{-# INLINE (.:) #-}
(.:) :: a -> StreamK m a -> StreamK m a
(.:) = cons

-- | A stream that terminates without producing any output or side effect.
--
-- >>> Stream.fold Fold.toList (StreamK.toStream StreamK.nil)
-- []
--
{-# INLINE_NORMAL nil #-}
nil :: StreamK m a
nil =
    -- Invoke the stop continuation directly; nothing is yielded.
    mkStream $ \_ _ _ stp -> stp

-- | A stream that terminates without producing any output, but produces a side
-- effect.
--
-- >>> Stream.fold Fold.toList (StreamK.toStream (StreamK.nilM (print "nil")))
-- "nil"
-- []
--
-- /Pre-release/
{-# INLINE_NORMAL nilM #-}
nilM :: Applicative m => m b -> StreamK m a
nilM m =
    -- Run the effect, discard its result, then invoke the stop continuation.
    mkStream $ \_ _ _ stp -> m *> stp

-- | Create a singleton stream from a pure value.
--
-- >>> fromPure a = a `StreamK.cons` StreamK.nil
-- >>> fromPure = pure
-- >>> fromPure = StreamK.fromEffect . pure
--
{-# INLINE_NORMAL fromPure #-}
fromPure :: a -> StreamK m a
fromPure a =
    -- Pass the value to the singleton continuation.
    mkStream $ \_ _ single _ -> single a

-- | Create a singleton stream from a monadic action.
--
-- >>> fromEffect m = m `StreamK.consM` StreamK.nil
--
-- >>> Stream.fold Fold.drain $ StreamK.toStream $ StreamK.fromEffect (putStrLn "hello")
-- hello
--
{-# INLINE_NORMAL fromEffect #-}
fromEffect :: Monad m => m a -> StreamK m a
fromEffect m =
    -- Run the action and pass its result to the singleton continuation.
    mkStream $ \_ _ single _ -> m >>= single

infixr 5 `consM`

-- NOTE: specializing the function outside the instance definition seems to
-- improve performance quite a bit at times, even if we have the same
-- SPECIALIZE in the instance definition.

-- | A right associative prepend operation to add an effectful value at the
-- head of an existing stream:
--
-- >>> s = putStrLn "hello" `StreamK.consM` putStrLn "world" `StreamK.consM` StreamK.nil
-- >>> Stream.fold Fold.drain (StreamK.toStream s)
-- hello
-- world
--
-- It can be used efficiently with 'Prelude.foldr':
--
-- >>> fromFoldableM = Prelude.foldr StreamK.consM StreamK.nil
--
-- Same as the following but more efficient:
--
-- >>> consM x xs = StreamK.fromEffect x `StreamK.append` xs
--
{-# INLINE consM #-}
{-# SPECIALIZE consM :: IO a -> StreamK IO a -> StreamK IO a #-}
consM :: Monad m => m a -> StreamK m a -> StreamK m a
consM m r =
    -- Run the action, then yield its result with @r@ as the rest of the
    -- stream.
    MkStream $ \_ yld _ _ -> m >>= (`yld` r)

-- XXX specialize to IO?

-- | Prepend an effectful value to a stream. The singleton stream
-- @fromEffect m@ and the rest of the stream are joined using the supplied
-- combining function.
{-# INLINE consMBy #-}
consMBy :: Monad m =>
    (StreamK m a -> StreamK m a -> StreamK m a) -> m a -> StreamK m a -> StreamK m a
consMBy f m r = fromEffect m `f` r

------------------------------------------------------------------------------
-- Folding a stream
------------------------------------------------------------------------------

-- | Fold a stream by providing an SVar, a stop continuation, a singleton
-- continuation and a yield continuation. The stream would share the current
-- SVar passed via the State.
--
-- The state is passed to the stream unchanged, unlike 'foldStream' which
-- applies 'adaptState' to it first.
{-# INLINE_EARLY foldStreamShared #-}
foldStreamShared
    :: State StreamK m a
    -> (a -> StreamK m a -> m r)
    -> (a -> m r)
    -> m r
    -> StreamK m a
    -> m r
foldStreamShared s yield single stop (MkStream k) = k s yield single stop

-- | Fold a stream by providing a State, stop continuation, a singleton
-- continuation and a yield continuation. The stream will not use the SVar
-- passed via State.
--
-- The state is passed through 'adaptState' before it is handed to the
-- stream; use 'foldStreamShared' to pass the state unchanged.
{-# INLINE foldStream #-}
foldStream
    :: State StreamK m a
    -> (a -> StreamK m a -> m r)
    -> (a -> m r)
    -> m r
    -> StreamK m a
    -> m r
foldStream s yield single stop (MkStream k) =
    k (adaptState s) yield single stop

-------------------------------------------------------------------------------
-- foldr/build fusion
-------------------------------------------------------------------------------

-- XXX perhaps we can just use foldrSM/buildM everywhere as they are more
-- general and cover foldrS/buildS as well.

-- | Right fold a stream into another stream.
--
-- The function @f@ decides how to reconstruct the stream. We could
-- reconstruct using a shared state (SVar) or without sharing the state.
--
-- * @f@ runs the reconstructed stream with the continuations of the result
--   stream (e.g. 'foldStream' or 'foldStreamShared').
-- * @step@ combines an input element with the fold of the remaining input.
-- * @final@ is the stream used in place of the end of the input.
--
-- The input stream itself is always consumed using 'foldStream' with an
-- adapted state.
--
{-# INLINE foldrSWith #-}
foldrSWith ::
    (forall r. State StreamK m b
        -> (b -> StreamK m b -> m r)
        -> (b -> m r)
        -> m r
        -> StreamK m b
        -> m r)
    -> (a -> StreamK m b -> StreamK m b)
    -> StreamK m b
    -> StreamK m a
    -> StreamK m b
foldrSWith f step final m = go m
    where
    go m1 = mkStream $ \st yld sng stp ->
        let -- Run a reconstructed stream with the result's continuations.
            run x = f st yld sng stp x
            -- Input exhausted: continue with the final stream.
            stop = run final
            -- Last input element: nothing remains to fold but 'final'.
            single a = run $ step a final
            -- More input follows: fold the rest lazily via 'go'.
            yieldk a r = run $ step a (go r)
         -- XXX if type a and b are the same we do not need adaptState, can we
         -- save some perf with that?
         -- XXX since we are using adaptState anyway here we can use
         -- foldStreamShared instead, will that save some perf?
         in foldStream (adaptState st) yieldk single stop m1

-- XXX we can use rewrite rules just for foldrSWith, if the function f is the
-- same we can rewrite it.

-- | Fold sharing the SVar state within the reconstructed stream.
--
-- This is 'foldrSWith' with 'foldStreamShared' used to run the reconstructed
-- stream.
{-# INLINE_NORMAL foldrSShared #-}
foldrSShared ::
       (a -> StreamK m b -> StreamK m b)
    -> StreamK m b
    -> StreamK m a
    -> StreamK m b
foldrSShared = foldrSWith foldStreamShared

-- XXX consM is a typeclass method, therefore rewritten already. Instead maybe
-- we can make consM polymorphic using rewrite rules.
-- {-# RULES "foldrSShared/id"     foldrSShared consM nil = \x -> x #-}

-- Folding the empty stream rewrites to the final stream.
{-# RULES "foldrSShared/nil"
    forall k z. foldrSShared k z nil = z #-}
-- Folding a pure singleton stream rewrites to a single application of the
-- step function to the element and the final stream.
{-# RULES "foldrSShared/single"
    forall k z x. foldrSShared k z (fromPure x) = k x z #-}
-- {-# RULES "foldrSShared/app" [1]
--     forall ys. foldrSShared consM ys = \xs -> xs `conjoin` ys #-}

-- | Right fold to a streaming monad.
--
-- > foldrS StreamK.cons StreamK.nil === id
--
-- 'foldrS' can be used to perform stateless stream to stream transformations
-- like map and filter in general. It can be coupled with a scan to perform
-- stateful transformations. However, note that the custom map and filter
-- routines can be much more efficient than this due to better stream fusion.
--
-- >>> input = StreamK.fromStream $ Stream.fromList [1..5]
-- >>> Stream.fold Fold.toList $ StreamK.toStream $ StreamK.foldrS StreamK.cons StreamK.nil input
-- [1,2,3,4,5]
--
-- Find if any element in the stream is 'True':
--
-- >>> step x xs = if odd x then StreamK.fromPure True else xs
-- >>> input = StreamK.fromStream (Stream.fromList (2:4:5:undefined)) :: StreamK IO Int
-- >>> Stream.fold Fold.toList $ StreamK.toStream $ StreamK.foldrS step (StreamK.fromPure False) input
-- [True]
--
-- Map (+2) on odd elements and filter out the even elements:
--
-- >>> step x xs = if odd x then (x + 2) `StreamK.cons` xs else xs
-- >>> input = StreamK.fromStream (Stream.fromList [1..5]) :: StreamK IO Int
-- >>> Stream.fold Fold.toList $ StreamK.toStream $ StreamK.foldrS step StreamK.nil input
-- [3,5,7]
--
-- /Pre-release/
{-# INLINE_NORMAL foldrS #-}
foldrS ::
       (a -> StreamK m b -> StreamK m b)
    -> StreamK m b
    -> StreamK m a
    -> StreamK m b
-- 'foldrSWith' with 'foldStream' used to run the reconstructed stream.
foldrS = foldrSWith foldStream

-- Rebuilding a stream with 'cons' and 'nil' rewrites to the identity function.
{-# RULES "foldrS/id"     foldrS cons nil = \x -> x #-}
-- Folding the empty stream rewrites to the final stream.
{-# RULES "foldrS/nil"    forall k z.   foldrS k z nil  = z #-}
-- See notes in GHC.Base about this rule
-- {-# RULES "foldr/cons"
--  forall k z x xs. foldrS k z (x `cons` xs) = k x (foldrS k z xs) #-}
-- Folding a pure singleton stream rewrites to a single application of the
-- step function to the element and the final stream.
{-# RULES "foldrS/single" forall k z x. foldrS k z (fromPure x) = k x z #-}
-- {-# RULES "foldrS/app" [1]
--  forall ys. foldrS cons ys = \xs -> xs `conjoin` ys #-}

-------------------------------------------------------------------------------
-- foldrS with monadic cons i.e. consM
-------------------------------------------------------------------------------

-- Right fold a stream into another stream using a step function that takes
-- the element wrapped in the monad (like 'consM' does).
--
-- @f@ runs the reconstructed stream with the continuations of the result
-- stream, @step@ combines an element (passed as @return a@) with the fold of
-- the remaining input, and @final@ is used in place of the end of the input.
-- The input stream is always consumed using 'foldStream' with an adapted
-- state.
{-# INLINE foldrSMWith #-}
foldrSMWith :: Monad m
    => (forall r. State StreamK m b
        -> (b -> StreamK m b -> m r)
        -> (b -> m r)
        -> m r
        -> StreamK m b
        -> m r)
    -> (m a -> StreamK m b -> StreamK m b)
    -> StreamK m b
    -> StreamK m a
    -> StreamK m b
foldrSMWith f step final m = go m
    where
    go m1 = mkStream $ \st yld sng stp ->
        let -- Run a reconstructed stream with the result's continuations.
            run x = f st yld sng stp x
            -- Input exhausted: continue with the final stream.
            stop = run final
            -- Last input element: nothing remains to fold but 'final'.
            single a = run $ step (return a) final
            -- More input follows: fold the rest lazily via 'go'.
            yieldk a r = run $ step (return a) (go r)
         in foldStream (adaptState st) yieldk single stop m1

-- | Like 'foldrS' but the step function receives each element wrapped in the
-- monad. This is 'foldrSMWith' with 'foldStream' used to run the
-- reconstructed stream.
{-# INLINE_NORMAL foldrSM #-}
foldrSM :: Monad m
    => (m a -> StreamK m b -> StreamK m b)
    -> StreamK m b
    -> StreamK m a
    -> StreamK m b
foldrSM = foldrSMWith foldStream

-- {-# RULES "foldrSM/id"     foldrSM consM nil = \x -> x #-}
-- Folding the empty stream rewrites to the final stream.
{-# RULES "foldrSM/nil"    forall k z.   foldrSM k z nil  = z #-}
-- Folding a singleton effect stream rewrites to a single application of the
-- step function to the effect and the final stream.
{-# RULES "foldrSM/single" forall k z x. foldrSM k z (fromEffect x) = k x z #-}
-- {-# RULES "foldrSM/app" [1]
--  forall ys. foldrSM consM ys = \xs -> xs `conjoin` ys #-}

-- Like foldrSM but sharing the SVar state within the reconstructed stream.
-- This is foldrSMWith with foldStreamShared used to run the reconstructed
-- stream.
{-# INLINE_NORMAL foldrSMShared #-}
foldrSMShared :: Monad m
    => (m a -> StreamK m b -> StreamK m b)
    -> StreamK m b
    -> StreamK m a
    -> StreamK m b
foldrSMShared = foldrSMWith foldStreamShared

-- NOTE(review): the commented-out rules in this group name foldrSM rather
-- than foldrSMShared; presumably they were copied from the foldrSM rules --
-- confirm before enabling them.
-- {-# RULES "foldrSM/id"     foldrSM consM nil = \x -> x #-}
-- Folding the empty stream rewrites to the final stream.
{-# RULES "foldrSMShared/nil"
    forall k z. foldrSMShared k z nil = z #-}
-- Folding a singleton effect stream rewrites to a single application of the
-- step function to the effect and the final stream.
{-# RULES "foldrSMShared/single"
    forall k z x. foldrSMShared k z (fromEffect x) = k x z #-}
-- {-# RULES "foldrSM/app" [1]
--  forall ys. foldrSM consM ys = \xs -> xs `conjoin` ys #-}

-------------------------------------------------------------------------------
-- build
-------------------------------------------------------------------------------

-- Build a stream from a generator function that is polymorphic in its result
-- type, by instantiating the generator's "cons" and "nil" arguments with the
-- stream 'cons' and 'nil'.
{-# INLINE_NORMAL build #-}
build :: forall m a. (forall b. (a -> b -> b) -> b -> b) -> StreamK m a
build g = g cons nil

-- Fusion rules for 'build': a right fold over a stream produced by 'build'
-- rewrites to applying the generator directly to the fold's step function and
-- final value, so the intermediate stream is not constructed.
{-# RULES "foldrM/build"
    forall k z (g :: forall b. (a -> b -> b) -> b -> b).
    foldrM k z (build g) = g k z #-}

{-# RULES "foldrS/build"
      forall k z (g :: forall b. (a -> b -> b) -> b -> b).
      foldrS k z (build g) = g k z #-}

-- Same as above when one element has been consed in front of the built
-- stream: the step function is applied to that element first.
{-# RULES "foldrS/cons/build"
      forall k z x (g :: forall b. (a -> b -> b) -> b -> b).
      foldrS k z (x `cons` build g) = k x (g k z) #-}

{-# RULES "foldrSShared/build"
      forall k z (g :: forall b. (a -> b -> b) -> b -> b).
      foldrSShared k z (build g) = g k z #-}

{-# RULES "foldrSShared/cons/build"
      forall k z x (g :: forall b. (a -> b -> b) -> b -> b).
      foldrSShared k z (x `cons` build g) = k x (g k z) #-}

-- Build a stream by applying 'cons' and 'nil' to a build function. Unlike
-- 'build', the build function @g@ is specialised to stream-valued
-- constructors rather than being polymorphic in the result type.
{-# INLINE_NORMAL buildS #-}
buildS ::
       ((a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a)
    -> StreamK m a
buildS g = g cons nil

-- Fusion of stream right folds with 'buildS': the fold's step and seed are
-- passed straight to the build function.
{-# RULES "foldrS/buildS"
      forall k z
        (g :: (a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a).
      foldrS k z (buildS g) = g k z #-}

-- Same rewrite when one element has been consed in front of the built stream.
{-# RULES "foldrS/cons/buildS"
      forall k z x
        (g :: (a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a).
      foldrS k z (x `cons` buildS g) = k x (g k z) #-}

{-# RULES "foldrSShared/buildS"
      forall k z
        (g :: (a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a).
      foldrSShared k z (buildS g) = g k z #-}

{-# RULES "foldrSShared/cons/buildS"
      forall k z x
        (g :: (a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a).
      foldrSShared k z (x `cons` buildS g) = k x (g k z) #-}

-- Build a stream by applying 'consM' and 'nil' to a build function. This is
-- the monadic-element counterpart of 'buildS': the cons operation handed to
-- @g@ takes an effect (@m a@) rather than a pure value.
{-# INLINE_NORMAL buildSM #-}
buildSM :: Monad m
    => ((m a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a)
    -> StreamK m a
buildSM g = g consM nil

-- Fusion of monadic-cons right folds with 'buildSM': the fold's step and seed
-- are passed straight to the build function.
{-# RULES "foldrSM/buildSM"
     forall k z
        (g :: (m a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a).
     foldrSM k z (buildSM g) = g k z #-}

{-# RULES "foldrSMShared/buildSM"
     forall k z
        (g :: (m a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a).
     foldrSMShared k z (buildSM g) = g k z #-}

-- Disabled because this may not fire as consM is a class Op
{-
{-# RULES "foldrS/consM/buildSM"
      forall k z x (g :: (m a -> t m a -> t m a) -> t m a -> t m a)
    . foldrSM k z (x `consM` buildSM g)
    = k x (g k z)
#-}
-}

-- Build using monadic build functions (continuations) instead of
-- reconstructing a stream.
--
-- The build function @g@ receives three continuations:
--
-- * a yield continuation that, given an element @a@ and the rest of the
--   stream @r@, folds @return a `consM` r@ with the consumer's state and
--   continuations,
-- * the consumer's singleton continuation, and
-- * the consumer's stop continuation.
{-# INLINE_NORMAL buildM #-}
buildM :: Monad m
    => (forall r. (a -> StreamK m a -> m r)
        -> (a -> m r)
        -> m r
        -> m r
       )
    -> StreamK m a
buildM g = mkStream $ \st yld sng stp ->
    g (\a r -> foldStream st yld sng stp (return a `consM` r)) sng stp

-- | Like 'buildM' but shares the SVar state across computations.
--
-- The first argument @cns@ is the cons operation used to put a yielded
-- element back in front of the remaining stream; the resulting stream is
-- consumed with 'foldStreamShared' so the consumer's state is passed on as
-- is.
{-# INLINE_NORMAL sharedMWith #-}
sharedMWith :: Monad m
    => (m a -> StreamK m a -> StreamK m a)
    -> (forall r. (a -> StreamK m a -> m r)
        -> (a -> m r)
        -> m r
        -> m r
       )
    -> StreamK m a
sharedMWith cns g = mkStream $ \st yld sng stp ->
    g (\a r -> foldStreamShared st yld sng stp (return a `cns` r)) sng stp

-------------------------------------------------------------------------------
-- augment
-------------------------------------------------------------------------------

-- Like 'buildS', but the build function @g@ is given an existing stream @xs@
-- as its tail instead of 'nil', i.e. whatever @g@ produces is placed in front
-- of @xs@.
{-# INLINE_NORMAL augmentS #-}
augmentS ::
       ((a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a)
    -> StreamK m a
    -> StreamK m a
augmentS g xs = g cons xs

-- Augmenting the empty stream is the same as building.
{-# RULES "augmentS/nil"
    forall (g :: (a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a).
    augmentS g nil = buildS g
    #-}

-- A right fold over an augmented stream folds the tail first and hands the
-- result to the build function as its seed.
{-# RULES "foldrS/augmentS"
    forall k z xs
        (g :: (a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a).
    foldrS k z (augmentS g xs) = g k (foldrS k z xs)
    #-}

-- Augmenting a built stream composes the two build functions into one.
{-# RULES "augmentS/buildS"
    forall (g :: (a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a)
           (h :: (a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a).
    augmentS g (buildS h) = buildS (\c n -> g c (h c n))
    #-}

-- Monadic-cons counterpart of 'augmentS': the build function @g@ is given
-- 'consM' and an existing stream @xs@ to use as the tail.
{-# INLINE_NORMAL augmentSM #-}
augmentSM :: Monad m =>
       ((m a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a)
    -> StreamK m a -> StreamK m a
augmentSM g xs = g consM xs

-- Augmenting the empty stream is the same as building.
{-# RULES "augmentSM/nil"
    forall
        (g :: (m a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a).
    augmentSM g nil = buildSM g
    #-}

-- A right fold over an augmented stream folds the tail first and hands the
-- result to the build function as its seed.
{-# RULES "foldrSM/augmentSM"
    forall k z xs
        (g :: (m a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a).
    foldrSM k z (augmentSM g xs) = g k (foldrSM k z xs)
    #-}

-- Augmenting a built stream composes the two build functions into one.
{-# RULES "augmentSM/buildSM"
    forall
        (g :: (m a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a)
        (h :: (m a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a).
    augmentSM g (buildSM h) = buildSM (\c n -> g c (h c n))
    #-}

-------------------------------------------------------------------------------
-- Experimental foldrM/buildM
-------------------------------------------------------------------------------

-- | Lazy right fold with a monadic step function.
--
-- The step function receives the current element and the (not yet run) fold
-- of the rest of the stream; @acc@ is the result for the empty stream. The
-- stream is consumed with 'foldStream' using 'defState'.
{-# INLINE_NORMAL foldrM #-}
foldrM :: (a -> m b -> m b) -> m b -> StreamK m a -> m b
foldrM step acc m = go m
    where
    go m1 =
        let stop = acc
            single a = step a acc
            yieldk a r = step a (go r)
        in foldStream defState yieldk single stop m1

-- Right fold over a stream given in continuation form, i.e. as a function
-- @k@ expecting yield, singleton and stop continuations.
--
-- The first argument @f@ is the stream eliminator (e.g. 'foldStream') used,
-- with 'defState', to turn the remainder @r@ of the stream back into
-- continuation form so that the fold can recurse on it.
{-# INLINE_NORMAL foldrMKWith #-}
foldrMKWith
    :: (State StreamK m a
        -> (a -> StreamK m a -> m b)
        -> (a -> m b)
        -> m b
        -> StreamK m a
        -> m b)
    -> (a -> m b -> m b)
    -> m b
    -> ((a -> StreamK m a -> m b) -> (a -> m b) -> m b -> m b)
    -> m b
foldrMKWith f step acc = go
    where
    go k =
        let stop = acc
            single a = step a acc
            yieldk a r =
                step a (go (\yld sng stp -> f defState yld sng stp r))
        in k yieldk single stop

{-
{-# RULES "foldrM/buildS"
      forall k z (g :: (a -> t m a -> t m a) -> t m a -> t m a)
    . foldrM k z (buildS g)
    = g k z
#-}
-}
-- XXX in which case will foldrM/buildM fusion be useful?
-- Rewrites a 'foldrM' over a stream made by 'buildM' into a fold that runs
-- directly on the continuation-style build function via 'foldrMKWith'.
{-# RULES "foldrM/buildM"
    forall step acc (g :: (forall r.
           (a -> StreamK m a -> m r)
        -> (a -> m r)
        -> m r
        -> m r
       )).
    foldrM step acc (buildM g) = foldrMKWith foldStream step acc g
    #-}

{-
{-# RULES "foldrM/sharedM"
    forall step acc (g :: (forall r.
           (a -> StreamK m a -> m r)
        -> (a -> m r)
        -> m r
        -> m r
       )).
    foldrM step acc (sharedM g) = foldrMKWith foldStreamShared step acc g
    #-}
-}

------------------------------------------------------------------------------
-- Left fold
------------------------------------------------------------------------------

-- | Strict left fold with an extraction function. Like the standard strict
-- left fold, but applies a user supplied extraction function (the third
-- argument) to the folded value at the end. This is designed to work with the
-- @foldl@ library. The suffix @x@ is a mnemonic for extraction.
--
-- Note that the accumulator is always evaluated including the initial value.
{-# INLINE foldlx' #-}
foldlx' :: forall m a b x. Monad m
    => (x -> a -> x) -> x -> (x -> b) -> StreamK m a -> m b
foldlx' step begin done m = get $ go m begin
    where
    -- Extract the final accumulator from the stream produced by "go". Only
    -- the singleton continuation is supplied; the state, yield and stop
    -- arguments are 'undefined' and must never be forced.
    {-# NOINLINE get #-}
    get :: StreamK m x -> m b
    get m1 =
        -- XXX we are not strictly evaluating the accumulator here. Is this
        -- okay?
        let single = return . done
        -- XXX this is foldSingleton. why foldStreamShared?
         in foldStreamShared undefined undefined single undefined m1

    -- Note, this can be implemented by making a recursive call to "go",
    -- however that is more expensive because of unnecessary recursion
    -- that cannot be tail call optimized. Unfolding recursion explicitly via
    -- continuations is much more efficient.
    --
    -- The bang pattern on the accumulator keeps the fold strict.
    go :: StreamK m a -> x -> StreamK m x
    go m1 !acc = mkStream $ \_ yld sng _ ->
        let stop = sng acc
            single a = sng $ step acc a
            -- XXX this is foldNonEmptyStream
            yieldk a r = foldStream defState yld sng undefined $
                go r (step acc a)
        in foldStream defState yieldk single stop m1

-- | Strict left associative fold.
--
-- Implemented as 'foldlx'' with 'id' as the extraction function.
{-# INLINE foldl' #-}
foldl' :: Monad m => (b -> a -> b) -> b -> StreamK m a -> m b
foldl' step begin = foldlx' step begin id

------------------------------------------------------------------------------
-- Specialized folds
------------------------------------------------------------------------------

-- XXX use foldrM to implement folds where possible
-- XXX This (commented) definition of drain and mapM_ performs much better on
-- some benchmarks but worse on others. Need to investigate why; maybe there is
-- an optimization opportunity that we can exploit.
-- drain = foldrM (\_ xs -> return () >> xs) (return ())

-- | Run the stream for its effects, discarding every element.
--
-- > drain = foldl' (\_ _ -> ()) ()
-- > drain = mapM_ (\_ -> return ())
{-# INLINE drain #-}
drain :: Monad m => StreamK m a -> m ()
drain = foldrM (\_ xs -> xs) (return ())
{-
drain = go
    where
    go m1 =
        let stop = return ()
            single _ = return ()
            yieldk _ r = go r
         in foldStream defState yieldk single stop m1
-}

-- Returns 'True' if the stream stops without producing an element and
-- 'False' if it produces at least one. Only the first continuation invoked by
-- the stream is inspected; the rest of the stream is not consumed.
{-# INLINE null #-}
null :: Monad m => StreamK m a -> m Bool
-- null = foldrM (\_ _ -> return False) (return True)
null m =
    let stop      = return True
        single _  = return False
        yieldk _ _ = return False
    in foldStream defState yieldk single stop m

------------------------------------------------------------------------------
-- Semigroup
------------------------------------------------------------------------------

infixr 6 `append`

-- | Appends two streams sequentially, yielding all elements from the first
-- stream, and then all elements from the second stream.
--
-- >>> s1 = StreamK.fromStream $ Stream.fromList [1,2]
-- >>> s2 = StreamK.fromStream $ Stream.fromList [3,4]
-- >>> Stream.fold Fold.toList $ StreamK.toStream $ s1 `StreamK.append` s2
-- [1,2,3,4]
--
-- This has O(n) append performance where @n@ is the number of streams. It can
-- be used to efficiently fold an infinite lazy container of streams using
-- 'concatMapWith' et. al.
--
{-# INLINE append #-}
append :: StreamK m a -> StreamK m a -> StreamK m a
-- XXX This doubles the time of toNullAp benchmark, may not be fusing properly
-- serial xs ys = augmentS (\c n -> foldrS c n xs) ys
append m1 m2 = go m1
    where
    -- Walk the first stream; when it stops (or yields its last element)
    -- continue with the second stream @m2@.
    go m = mkStream $ \st yld sng stp ->
               let stop       = foldStream st yld sng stp m2
                   single a   = yld a m2
                   yieldk a r = yld a (go r)
               in foldStream st yieldk single stop m

-- join/merge/append streams depending on consM
--
-- Expressed through 'augmentSM' and 'foldrSM' so that it can take part in
-- the build/augment rewrite rules: the first stream @xs@ is re-folded with
-- the supplied cons onto the second stream.
{-# INLINE conjoin #-}
conjoin :: Monad m => StreamK m a -> StreamK m a -> StreamK m a
conjoin xs = augmentSM (\c n -> foldrSM c n xs)

-- Streams combine by sequential 'append'.
instance Semigroup (StreamK m a) where
    (<>) = append

------------------------------------------------------------------------------
-- Monoid
------------------------------------------------------------------------------

-- The empty stream 'nil' is the identity for '<>'.
instance Monoid (StreamK m a) where
    mempty = nil
    mappend = (<>)

-------------------------------------------------------------------------------
-- Functor
-------------------------------------------------------------------------------

-- Map the function @f@ over an element before handing it to the cons
-- operation @c@; used by 'map' to participate in foldr/build fusion.
--
-- IMPORTANT: This is eta expanded on purpose. This should not be eta
-- reduced. This will cause a lot of regressions, probably because of some
-- rewrite rules. Ideally don't run hlint on this file.
{-# INLINE_LATE mapFB #-}
mapFB :: forall b m a.
       (b -> StreamK m b -> StreamK m b)
    -> (a -> b)
    -> a
    -> StreamK m b
    -> StreamK m b
mapFB c f = \x ys -> c (f x) ys

-- "mapFB/mapFB" collapses two nested maps into a single map of the composed
-- function; "mapFB/id" removes a map of the identity function.
{-# RULES
"mapFB/mapFB" forall c f g. mapFB (mapFB c f) g = mapFB c (f . g)
"mapFB/id"    forall c.     mapFB c (\x -> x)   = c
    #-}

-- Apply a pure function to every element of the stream. Written as
-- 'buildS' over 'foldrS' with 'mapFB' so that the build/fold and
-- "mapFB/mapFB" rewrite rules can fuse consecutive maps.
{-# INLINE map #-}
map :: (a -> b) -> StreamK m a -> StreamK m b
map f xs = buildS (\c n -> foldrS (mapFB c f) n xs)

-- XXX This definition might potentially be more efficient, but the cost in the
-- benchmark is dominated by unfoldrM cost so we cannot correctly determine
-- differences in the mapping cost. We should perhaps deduct the cost of
-- unfoldrM from the benchmarks and then compare.
{-
map f m = go m
    where
        go m1 =
            mkStream $ \st yld sng stp ->
            let single     = sng . f
                yieldk a r = yld (f a) (go r)
            in foldStream (adaptState st) yieldk single stp m1
-}

-- Monadic counterpart of 'mapFB': bind the effect @x@ into the monadic
-- function @f@ and hand the resulting effect to the cons operation @c@.
{-# INLINE_LATE mapMFB #-}
mapMFB :: Monad m => (m b -> t m b -> t m b) -> (a -> m b) -> m a -> t m b -> t m b
mapMFB c f x = c (x >>= f)

-- Collapse two nested 'mapMFB's into one. Unfolding the definition,
--
-- > mapMFB (mapMFB c f) g x = mapMFB c f (x >>= g) = c ((x >>= g) >>= f)
--
-- so @g@ runs first and @f@ second, i.e. the fused function is @g >=> f@
-- (compare "mapFB/mapFB", which uses @f . g@).
{-# RULES
    "mapMFB/mapMFB" forall c f g. mapMFB (mapMFB c f) g = mapMFB c (g >=> f)
    #-}
-- XXX These rules may never fire because pure/return type class rules will
-- fire first.
{-
"mapMFB/pure"    forall c.     mapMFB c (\x -> pure x)   = c
"mapMFB/return"  forall c.     mapMFB c (\x -> return x) = c
-}

-- This is experimental serial version supporting fusion.
--
-- XXX what if we do not want to fuse two concurrent mapMs?
-- XXX we can combine two concurrent mapM only if the SVar is of the same type
-- So for now we use it only for serial streams.
-- XXX fusion would be easier for monomorphic stream types.
-- {-# RULES "mapM serial" mapM = mapMSerial #-}
--
-- Written as 'buildSM' over 'foldrSMShared' with 'mapMFB' so that the
-- build/fold rewrite rules can apply.
{-# INLINE mapMSerial #-}
mapMSerial :: Monad m => (a -> m b) -> StreamK m a -> StreamK m b
mapMSerial f xs = buildSM (\c n -> foldrSMShared (mapMFB c f) n xs)

-- Map a monadic function over a stream, using the supplied cons operation
-- @cns@ to attach each resulting effect @f x@ to the mapped remainder of the
-- stream. The input is traversed with 'foldrSShared' starting from 'nil'.
{-# INLINE mapMWith #-}
mapMWith ::
       (m b -> StreamK m b -> StreamK m b)
    -> (a -> m b)
    -> StreamK m a
    -> StreamK m b
mapMWith cns f = foldrSShared (\x xs -> f x `cns` xs) nil

{-
-- See note under map definition above.
mapMWith cns f = go
    where
    go m1 = mkStream $ \st yld sng stp ->
        let single a  = f a >>= sng
            yieldk a r = foldStreamShared st yld sng stp $ f a `cns` go r
         in foldStream (adaptState st) yieldk single stp m1
-}

-- XXX in fact use the Stream type everywhere and only use polymorphism in the
-- high level modules/prelude.
--
-- 'fmap' is the stream 'map' defined in this module.
instance Monad m => Functor (StreamK m) where
    fmap = map

------------------------------------------------------------------------------
-- Lists
------------------------------------------------------------------------------

-- Serial streams can act like regular lists using the Identity monad

-- XXX Show instance is 10x slower compared to read, we can do much better.
-- The list show instance itself is really slow.

-- XXX The default definitions of "<" in the Ord instance etc. do not perform
-- well, because they do not get inlined. Need to add INLINE in Ord class in
-- base?

-- Conversion to and from lists for pure ('Identity') streams: 'fromList'
-- goes through 'fromFoldable' and 'toList' is a right fold with @(:)@.
instance IsList (StreamK Identity a) where
    type (Item (StreamK Identity a)) = a

    {-# INLINE fromList #-}
    fromList = fromFoldable

    {-# INLINE toList #-}
    toList = Data.Foldable.foldr (:) []

-- XXX Fix these
{-
instance Eq a => Eq (StreamK Identity a) where
    {-# INLINE (==) #-}
    (==) xs ys = runIdentity $ eqBy (==) xs ys

instance Ord a => Ord (StreamK Identity a) where
    {-# INLINE compare #-}
    compare xs ys = runIdentity $ cmpBy compare xs ys

    {-# INLINE (<) #-}
    x < y =
        case compare x y of
            LT -> True
            _ -> False

    {-# INLINE (<=) #-}
    x <= y =
        case compare x y of
            GT -> False
            _ -> True

    {-# INLINE (>) #-}
    x > y =
        case compare x y of
            GT -> True
            _ -> False

    {-# INLINE (>=) #-}
    x >= y =
        case compare x y of
            LT -> False
            _ -> True

    {-# INLINE max #-}
    max x y = if x <= y then y else x

    {-# INLINE min #-}
    min x y = if x <= y then x else y
-}

-- Shows a pure stream as @fromList@ followed by the list of its elements,
-- parenthesised when the surrounding precedence is above 10 (function
-- application).
instance Show a => Show (StreamK Identity a) where
    showsPrec p dl = showParen (p > 10) $
        showString "fromList " . shows (toList dl)

-- Parses the format produced by the 'Show' instance: the identifier
-- @fromList@ followed by a list of elements, at application precedence (10).
instance Read a => Read (StreamK Identity a) where
    readPrec = parens $ prec 10 $ do
        Ident "fromList" <- lexP
        fromList <$> readPrec

    readListPrec = readListPrecDefault

-- String literals denote pure streams of 'Char'; the string is converted
-- with 'fromList'.
instance (a ~ Char) => IsString (StreamK Identity a) where
    {-# INLINE fromString #-}
    fromString = fromList

-------------------------------------------------------------------------------
-- Foldable
-------------------------------------------------------------------------------

-- | Lazy right associative fold.
--
-- Implemented with 'foldrM': for every element the fold of the remaining
-- stream is bound in the monad and the pure @step@ is applied to the element
-- and that result. The final accumulator @acc@ is used when the stream ends.
{-# INLINE foldr #-}
foldr :: Monad m => (a -> b -> b) -> b -> StreamK m a -> m b
foldr step acc = foldrM (\x xs -> xs >>= \b -> return (step x b)) (return acc)

-- The default Foldable instance has several issues:
-- 1) several definitions do not have INLINE on them, so we provide
--    re-implementations with INLINE pragmas.
-- 2) the definitions of sum/product/maximum/minimum are inefficient as they
--    use right folds, they cannot run in constant memory. We provide
--    implementations using strict left folds here.

-- | 'Foldable' for streams over any monad that is itself 'Foldable'. Every
-- method bottoms out in 'foldMap', which runs the monadic right fold of this
-- module and then collapses the resulting @m@ layer with 'fold'.
instance (Foldable m, Monad m) => Foldable (StreamK m) where

    -- Fold the stream in the monad, producing an @m@ of the combined monoid
    -- value, then extract it through the 'Foldable' instance of @m@.
    {-# INLINE foldMap #-}
    foldMap f =
          fold
        . Streamly.Internal.Data.Stream.StreamK.Type.foldr (mappend . f) mempty

    -- Right fold expressed through 'foldMap' over the 'Endo' monoid.
    {-# INLINE foldr #-}
    foldr f z t = appEndo (foldMap (Endo #. f) t) z

    -- Strict left fold built on the right fold: each element extends a
    -- continuation that forces the new accumulator ('$!') before passing it
    -- on. 'oneShot' marks the accumulator lambda as being entered once.
    {-# INLINE foldl' #-}
    foldl' f z0 xs = Data.Foldable.foldr f' id xs z0
        where f' x k = oneShot $ \z -> k $! f z x

    -- Count elements with the strict left fold.
    {-# INLINE length #-}
    length = Data.Foldable.foldl' (\n _ -> n + 1) 0

    {-# INLINE elem #-}
    elem = any . (==)

    -- Strict left fold tracking the largest element seen so far in a
    -- 'Maybe''. Raises an error (without a stack trace) on an empty stream.
    {-# INLINE maximum #-}
    maximum =
          fromMaybe (errorWithoutStackTrace "maximum: empty stream")
        . toMaybe
        . Data.Foldable.foldl' getMax Nothing'

        where

        getMax Nothing' x = Just' x
        getMax (Just' mx) x = Just' $! max mx x

    -- Strict left fold tracking the smallest element seen so far in a
    -- 'Maybe''. Raises an error (without a stack trace) on an empty stream.
    {-# INLINE minimum #-}
    minimum =
          fromMaybe (errorWithoutStackTrace "minimum: empty stream")
        . toMaybe
        . Data.Foldable.foldl' getMin Nothing'

        where

        getMin Nothing' x = Just' x
        getMin (Just' mn) x = Just' $! min mn x

    -- Sum and product use the strict left fold instead of the default
    -- right-fold based definitions.
    {-# INLINE sum #-}
    sum = Data.Foldable.foldl' (+) 0

    {-# INLINE product #-}
    product = Data.Foldable.foldl' (*) 1

-------------------------------------------------------------------------------
-- Traversable
-------------------------------------------------------------------------------

-- | Traverse a pure stream: right-fold the stream, running the effectful
-- function on each element and prepending its result (via 'cons') onto the
-- traversed remainder inside the applicative.
instance Traversable (StreamK Identity) where
    {-# INLINE traverse #-}
    traverse f xs =
        runIdentity
            $ Streamly.Internal.Data.Stream.StreamK.Type.foldr
                consA (pure mempty) xs

        where

        -- Combine the effect for one element with the effect that builds the
        -- rest of the output stream.
        consA x ys = liftA2 cons (f x) ys

-------------------------------------------------------------------------------
-- Nesting
-------------------------------------------------------------------------------

-- | Detach a stream from an SVar
--
-- The stream is re-wrapped with 'mkStream' so that it is always consumed
-- through 'foldStream', whatever fold its own consumer uses.
-- NOTE(review): presumably 'foldStream', unlike 'foldStreamShared', does not
-- propagate the shared SVar in the state - confirm against their definitions.
{-# INLINE unShare #-}
unShare :: StreamK m a -> StreamK m a
unShare x = mkStream $ \st yld sng stp ->
    foldStream st yld sng stp x

-- XXX the function stream and value stream can run in parallel

-- | Apply every function of the function stream to every element of the value
-- stream. The result stream produced for one function is combined with the
-- result for the remaining functions using the supplied combinator (first
-- argument). The value stream is traversed once per function.
{-# INLINE crossApplyWith #-}
crossApplyWith ::
       (StreamK m b -> StreamK m b -> StreamK m b)
    -> StreamK m (a -> b)
    -> StreamK m a
    -> StreamK m b
crossApplyWith par fstream stream = go1 fstream

    where

    -- Walk the function stream. For each function @f@ build the mapped value
    -- stream and, if more functions remain, join it with the rest via @par@.
    go1 m =
        mkStream $ \st yld sng stp ->
            let foldShared = foldStreamShared st yld sng stp
                single f   = foldShared $ unShare (go2 f stream)
                yieldk f r = foldShared $ unShare (go2 f stream) `par` go1 r
            -- The function stream has a different element type, hence the
            -- state is re-typed with adaptState before folding it.
            in foldStream (adaptState st) yieldk single stp m

    -- Map a single function over the value stream.
    go2 f m =
        mkStream $ \st yld sng stp ->
            let single a   = sng (f a)
                yieldk a r = yld (f a) (go2 f r)
            in foldStream (adaptState st) yieldk single stp m

-- | Apply a stream of functions to a stream of values and flatten the results.
--
-- Note that the second stream is evaluated multiple times.
--
-- Definition:
--
-- >>> crossApply = StreamK.crossApplyWith StreamK.append
-- >>> crossApply = Stream.crossWith id
--
{-# INLINE crossApply #-}
crossApply ::
       StreamK m (a -> b)
    -> StreamK m a
    -> StreamK m b
crossApply fstream stream = go1 fstream

    where

    -- Walk the function stream. The last function only needs a plain map
    -- (go3); any other function must resume with the remaining functions once
    -- the value stream is exhausted (go2).
    go1 m =
        mkStream $ \st yld sng stp ->
            let foldShared = foldStreamShared st yld sng stp
                single f   = foldShared $ go3 f stream
                yieldk f r = foldShared $ go2 f r stream
            in foldStream (adaptState st) yieldk single stp m

    -- Map @f@ over the value stream @m@; when @m@ ends (or yields its last
    -- element) continue with the remaining functions @r1@.
    go2 f r1 m =
        mkStream $ \st yld sng stp ->
            let foldShared = foldStreamShared st yld sng stp
                stop = foldShared $ go1 r1
                single a   = yld (f a) (go1 r1)
                yieldk a r = yld (f a) (go2 f r1 r)
            in foldStream (adaptState st) yieldk single stop m

    -- Map the final function over the value stream.
    go3 f m =
        mkStream $ \st yld sng stp ->
            let single a   = sng (f a)
                yieldk a r = yld (f a) (go3 f r)
            in foldStream (adaptState st) yieldk single stp m

-- | For every element of the first stream, emit the whole second stream. The
-- values of the first stream are discarded; only their number matters. The
-- second stream is traversed once per element of the first.
{-# INLINE crossApplySnd #-}
crossApplySnd ::
       StreamK m a
    -> StreamK m b
    -> StreamK m b
crossApplySnd fstream stream = go1 fstream

    where

    -- Walk the first stream. For its last element the second stream is the
    -- remainder of the output; otherwise go2 emits the second stream and then
    -- resumes with the rest of the first.
    go1 m =
        mkStream $ \st yld sng stp ->
            let foldShared = foldStreamShared st yld sng stp
                single _   = foldShared stream
                yieldk _ r = foldShared $ go2 r stream
            in foldStream (adaptState st) yieldk single stp m

    -- Emit the elements of @m@ unchanged, then continue with @go1 r1@.
    go2 r1 m =
        mkStream $ \st yld sng stp ->
            let foldShared = foldStreamShared st yld sng stp
                stop = foldShared $ go1 r1
                single a   = yld a (go1 r1)
                yieldk a r = yld a (go2 r1 r)
            -- Same element type as the output, so the state is used as is.
            in foldStream st yieldk single stop m

-- | For every element of the first stream, emit that element once for each
-- element of the second stream. The values of the second stream are
-- discarded; only their number matters. The second stream is traversed once
-- per element of the first.
{-# INLINE crossApplyFst #-}
crossApplyFst ::
       StreamK m a
    -> StreamK m b
    -> StreamK m a
crossApplyFst fstream stream = go1 fstream

    where

    -- Walk the first stream. Its last element is replicated with go3; any
    -- other element uses go2 so that the remaining elements follow.
    go1 m =
        mkStream $ \st yld sng stp ->
            let foldShared = foldStreamShared st yld sng stp
                single f   = foldShared $ go3 f stream
                yieldk f r = foldShared $ go2 f r stream
            -- Same element type as the output, so the state is used as is.
            in foldStream st yieldk single stp m

    -- Yield @f@ for each element of the second stream @m@, then continue with
    -- the remaining first-stream elements @r1@.
    go2 f r1 m =
        mkStream $ \st yld sng stp ->
            let foldShared = foldStreamShared st yld sng stp
                stop = foldShared $ go1 r1
                single _   = yld f (go1 r1)
                yieldk _ r = yld f (go2 f r1 r)
            in foldStream (adaptState st) yieldk single stop m

    -- Yield @f@ for each element of the second stream @m@ and then stop.
    go3 f m =
        mkStream $ \st yld sng stp ->
            let single _   = sng f
                yieldk _ r = yld f (go3 f r)
            in foldStream (adaptState st) yieldk single stp m

-- | Combine every element of the first stream with every element of the
-- second stream using the supplied function.
--
-- Definition:
--
-- >>> crossWith f m1 m2 = fmap f m1 `StreamK.crossApply` m2
--
-- Note that the second stream is evaluated multiple times.
--
{-# INLINE crossWith #-}
crossWith :: Monad m => (a -> b -> c) -> StreamK m a -> StreamK m b -> StreamK m c
crossWith f m1 m2 = fmap f m1 `crossApply` m2

-- | Given a @StreamK m a@ and @StreamK m b@ generate a stream with all possible
-- combinations of the tuple @(a, b)@.
--
-- Definition:
--
-- >>> cross = StreamK.crossWith (,)
--
-- The second stream is evaluated multiple times. If that is not desired it can
-- be cached in an 'Data.Array.Array' and then generated from the array before
-- calling this function. Caching may also improve performance if the stream is
-- expensive to evaluate.
--
-- See 'Streamly.Internal.Data.Unfold.cross' for a much faster fused
-- alternative.
--
-- Time: O(m x n)
--
-- /Pre-release/
{-# INLINE cross #-}
cross :: Monad m => StreamK m a -> StreamK m b -> StreamK m (a, b)
cross = crossWith (,)

-- XXX This is just concatMapWith with arguments flipped. We need to keep this
-- instead of using a concatMap style definition because the bind
-- implementation in Async and WAsync streams show significant perf degradation
-- if the argument order is changed.
--
-- @bindWith par m1 f@ maps @f@ over every element of @m1@ and joins the
-- generated streams using the binary combinator @par@.
{-# INLINE bindWith #-}
bindWith ::
       (StreamK m b -> StreamK m b -> StreamK m b)
    -> StreamK m a
    -> (a -> StreamK m b)
    -> StreamK m b
bindWith par m1 f = go m1
    where
        go m =
            mkStream $ \st yld sng stp ->
                -- Runs a generated stream with the caller's state and
                -- continuations.
                let foldShared = foldStreamShared st yld sng stp
                    -- Last input element: the output is just @f a@.
                    single a   = foldShared $ unShare (f a)
                    -- More input remains: join @f a@ with the stream
                    -- generated from the rest of the input.
                    yieldk a r = foldShared $ unShare (f a) `par` go r
                -- The input stream has a different element type, so the
                -- state is retyped with adaptState before folding it.
                in foldStream (adaptState st) yieldk single stp m

-- XXX express in terms of foldrS?
-- XXX can we use a different stream type for the generated stream being
-- flattened so that we can combine them differently and keep the resulting
-- stream different?
-- XXX do we need specialize to IO?
-- XXX can we optimize when c and a are same, by removing the forall using
-- rewrite rules with type applications?

-- | Perform a 'concatMap' using a specified concat strategy. The first
-- argument specifies a merge or concat function that is used to merge the
-- streams generated by the map function.
--
-- This is 'bindWith' with its last two arguments flipped.
--
{-# INLINE concatMapWith #-}
concatMapWith
    ::
       (StreamK m b -> StreamK m b -> StreamK m b)
    -> (a -> StreamK m b)
    -> StreamK m a
    -> StreamK m b
concatMapWith par f xs = bindWith par xs f

-- | Map a stream generating function on each element of the input stream and
-- join the generated streams using 'append'.
--
-- Definition:
--
-- >>> concatMap = StreamK.concatMapWith StreamK.append
--
{-# INLINE concatMap #-}
concatMap :: (a -> StreamK m b) -> StreamK m a -> StreamK m b
concatMap = concatMapWith append

{-
-- Fused version.
-- XXX This fuses but when the stream is nil this performs poorly.
-- The filterAllOut benchmark degrades. Need to investigate and fix that.
{-# INLINE concatMap #-}
concatMap :: IsStream t => (a -> t m b) -> t m a -> t m b
concatMap f xs = buildS
    (\c n -> foldrS (\x b -> foldrS c b (f x)) n xs)

-- Stream polymorphic concatMap implementation
-- XXX need to use buildSM/foldrSMShared for parallel behavior
-- XXX unShare seems to degrade the fused performance
{-# INLINE_EARLY concatMap_ #-}
concatMap_ :: IsStream t => (a -> t m b) -> t m a -> t m b
concatMap_ f xs = buildS
     (\c n -> foldrSShared (\x b -> foldrSShared c b (unShare $ f x)) n xs)
-}

-- | Combine streams in pairs using a binary combinator, the resulting streams
-- are then combined again in pairs recursively until we get to a single
-- combined stream. The composition would thus form a binary tree.
--
-- For example, you can sort a stream using merge sort like this:
--
-- >>> s = StreamK.fromStream $ Stream.fromList [5,1,7,9,2]
-- >>> generate = StreamK.fromPure
-- >>> combine = StreamK.mergeBy compare
-- >>> Stream.fold Fold.toList $ StreamK.toStream $ StreamK.mergeMapWith combine generate s
-- [1,2,5,7,9]
--
-- Note that if the stream length is not a power of 2, the binary tree composed
-- by mergeMapWith would not be balanced, which may or may not be important
-- depending on what you are trying to achieve.
--
-- /Caution: the stream of streams must be finite/
--
-- /Pre-release/
--
{-# INLINE mergeMapWith #-}
mergeMapWith
    ::
       (StreamK m b -> StreamK m b -> StreamK m b)
    -> (a -> StreamK m b)
    -> StreamK m a
    -> StreamK m b
mergeMapWith combine f str = go (leafPairs str)

    where

    -- Reduce a stream of streams to a single stream. A lone remaining stream
    -- is the result; otherwise the head is handed to "go1" together with the
    -- rest.
    go stream =
        mkStream $ \st yld sng stp ->
            let foldShared = foldStreamShared st yld sng stp
                single a   = foldShared $ unShare a
                yieldk a r = foldShared $ go1 a r
            in foldStream (adaptState st) yieldk single stp stream

    -- Like "go" but with one stream (a1) already taken from the input. If
    -- nothing follows, a1 is the result. If exactly one stream follows, the
    -- result is the two combined. Otherwise the first pair is combined, the
    -- remainder is paired up by "nonLeafPairs", and the next level is reduced
    -- by "go".
    go1 a1 stream =
        mkStream $ \st yld sng stp ->
            let foldShared = foldStreamShared st yld sng stp
                stop = foldShared $ unShare a1
                single a = foldShared $ unShare a1 `combine` a
                yieldk a r =
                    foldShared $ go $ combine a1 a `cons` nonLeafPairs r
            in foldStream (adaptState st) yieldk single stop stream

    -- Exactly the same as "go" except that stop continuation extracts the
    -- stream.
    --
    -- Maps "f" on the input elements and combines the generated streams in
    -- adjacent pairs, producing a stream of streams.
    leafPairs stream =
        mkStream $ \st yld sng stp ->
            let foldShared = foldStreamShared st yld sng stp
                single a   = sng (f a)
                yieldk a r = foldShared $ leafPairs1 a r
            in foldStream (adaptState st) yieldk single stp stream

    -- "leafPairs" with one element (a1) already taken: pair it with the next
    -- element if there is one, otherwise yield "f a1" alone. The outer stop
    -- continuation is unused because this stream always yields something.
    leafPairs1 a1 stream =
        mkStream $ \st yld sng _ ->
            let stop = sng (f a1)
                single a = sng (f a1 `combine` f a)
                yieldk a r = yld (f a1 `combine` f a) $ leafPairs r
            in foldStream (adaptState st) yieldk single stop stream

    -- Exactly the same as "leafPairs" except that it does not map "f"
    nonLeafPairs stream =
        mkStream $ \st yld sng stp ->
            let foldShared = foldStreamShared st yld sng stp
                single a   = sng a
                yieldk a r = foldShared $ nonLeafPairs1 a r
            in foldStream (adaptState st) yieldk single stp stream

    -- "nonLeafPairs" with one stream (a1) already taken; see "leafPairs1".
    nonLeafPairs1 a1 stream =
        mkStream $ \st yld sng _ ->
            let stop = sng a1
                single a = sng (a1 `combine` a)
                yieldk a r = yld (a1 `combine` a) $ nonLeafPairs r
            in foldStream (adaptState st) yieldk single stop stream

{-
instance Monad m => Applicative (StreamK m) where
    {-# INLINE pure #-}
    pure = fromPure

    {-# INLINE (<*>) #-}
    (<*>) = crossApply

    {-# INLINE liftA2 #-}
    liftA2 f x = (<*>) (fmap f x)

    {-# INLINE (*>) #-}
    (*>) = crossApplySnd

    {-# INLINE (<*) #-}
    (<*) = crossApplyFst

-- NOTE: even though concatMap for StreamD is 3x faster compared to StreamK,
-- the monad instance of StreamD is slower than StreamK after foldr/build
-- fusion.
instance Monad m => Monad (StreamK m) where
    {-# INLINE return #-}
    return = pure

    {-# INLINE (>>=) #-}
    (>>=) = flip concatMap
-}

{-
-- Like concatMap but generates stream using an unfold function. Similar to
-- unfoldMany but for StreamK.
concatUnfoldr :: IsStream t
    => (b -> t m (Maybe (a, b))) -> t m b -> t m a
concatUnfoldr = undefined
-}

------------------------------------------------------------------------------
-- concatIterate - Map and flatten Trees of Streams
------------------------------------------------------------------------------

-- | Yield an input element in the output stream, map a stream generator on it
-- and repeat the process on the resulting stream. Resulting streams are
-- flattened using the 'concatMapWith' combinator. This can be used for a depth
-- first style (DFS) traversal of a tree like structure.
--
-- Example, list a directory tree using DFS:
--
-- >>> f = StreamK.fromStream . either Dir.readEitherPaths (const Stream.nil)
-- >>> input = StreamK.fromPure (Left ".")
-- >>> ls = StreamK.concatIterateWith StreamK.append f input
--
-- Note that 'iterateM' is a special case of 'concatIterateWith':
--
-- >>> iterateM f = StreamK.concatIterateWith StreamK.append (StreamK.fromEffect . f) . StreamK.fromEffect
--
-- /Pre-release/
--
{-# INLINE concatIterateWith #-}
concatIterateWith ::
       (StreamK m a -> StreamK m a -> StreamK m a)
    -> (a -> StreamK m a)
    -> StreamK m a
    -> StreamK m a
concatIterateWith combine f = iterateStream

    where

    -- Expand every element of a stream and flatten using "combine".
    iterateStream = concatMapWith combine generate

    -- Emit the element itself, followed by the recursive expansion of the
    -- stream generated from it.
    generate x = x `cons` iterateStream (f x)

-- | Like 'concatIterateWith' but uses the pairwise flattening combinator
-- 'mergeMapWith' for flattening the resulting streams. This can be used for a
-- balanced traversal of a tree like structure.
--
-- Example, list a directory tree using balanced traversal:
--
-- >>> f = StreamK.fromStream . either Dir.readEitherPaths (const Stream.nil)
-- >>> input = StreamK.fromPure (Left ".")
-- >>> ls = StreamK.mergeIterateWith StreamK.interleave f input
--
-- /Pre-release/
--
{-# INLINE mergeIterateWith #-}
mergeIterateWith ::
       (StreamK m a -> StreamK m a -> StreamK m a)
    -> (a -> StreamK m a)
    -> StreamK m a
    -> StreamK m a
mergeIterateWith combine f = iterateStream

    where

    -- Expand every element of a stream and flatten pairwise using "combine".
    iterateStream = mergeMapWith combine generate

    -- Emit the element itself, followed by the recursive expansion of the
    -- stream generated from it.
    generate x = x `cons` iterateStream (f x)

------------------------------------------------------------------------------
-- Flattening Graphs
------------------------------------------------------------------------------

-- To traverse graphs we need a state to be carried around in the traversal.
-- For example, we can use a hashmap to store the visited status of nodes.

-- | Like 'iterateMap' but carries a state in the stream generation function.
-- This can be used to traverse graph like structures, we can remember the
-- visited nodes in the state to avoid cycles.
--
-- Note that a combination of 'iterateMap' and 'usingState' can also be used to
-- traverse graphs. However, this function provides a more localized state
-- instead of using a global state.
--
-- See also: 'mfix'
--
-- /Pre-release/
--
{-# INLINE concatIterateScanWith #-}
concatIterateScanWith
    :: Monad m
    => (StreamK m a -> StreamK m a -> StreamK m a)
    -> (b -> a -> m (b, StreamK m a))
    -> m b
    -> StreamK m a
    -> StreamK m a
concatIterateScanWith combine f initial stream =
    concatEffect $ do
        -- Run the initial state action once, then expand the input stream.
        b <- initial
        iterateStream (b, stream)

    where

    -- Expand every element of the stream with the given state and flatten
    -- using "combine".
    iterateStream (b, s) = pure $ concatMapWith combine (generate b) s

    -- Emit the element itself, followed by the streams generated from it.
    generate b a = a `cons` feedback b a

    -- Run the step function to get the new state and the generated stream,
    -- then expand that stream with the new state.
    feedback b a = concatEffect $ f b a >>= iterateStream

------------------------------------------------------------------------------
-- Either streams
------------------------------------------------------------------------------

-- Keep concating either streams as long as rights are generated, stop as soon
-- as a left is generated and concat the left stream.
--
-- See also: 'handle'
--
-- /Unimplemented/
--
{-
concatMapEitherWith
    :: (forall x. t m x -> t m x -> t m x)
    -> (a -> t m (Either (StreamK m b) b))
    -> StreamK m a
    -> StreamK m b
concatMapEitherWith = undefined
-}

-- XXX We should prefer using the Maybe stream returning signatures over this.
-- This API should perhaps be removed in favor of those.

-- | In an 'Either' stream iterate on 'Left's.  This is a special case of
-- 'concatIterateWith':
--
-- >>> concatIterateLeftsWith combine f = StreamK.concatIterateWith combine (either f (const StreamK.nil))
--
-- To traverse a directory tree:
--
-- >>> input = StreamK.fromPure (Left ".")
-- >>> ls = StreamK.concatIterateLeftsWith StreamK.append (StreamK.fromStream . Dir.readEither) input
--
-- /Pre-release/
--
{-# INLINE concatIterateLeftsWith #-}
concatIterateLeftsWith
    :: (b ~ Either a c)
    => (StreamK m b -> StreamK m b -> StreamK m b)
    -> (a -> StreamK m b)
    -> StreamK m b
    -> StreamK m b
concatIterateLeftsWith combine f =
    -- 'Left' values are expanded with @f@; 'Right' values generate 'nil'.
    concatIterateWith combine (either f (const nil))

------------------------------------------------------------------------------
-- Interleaving
------------------------------------------------------------------------------

infixr 6 `interleave`

-- Additionally we can have m elements yield from the first stream and n
-- elements yielding from the second stream. We can also have time slicing
-- variants of positional interleaving, e.g. run first stream for m seconds and
-- run the second stream for n seconds.

-- | Interleaves two streams, yielding one element from each stream
-- alternately.  When one stream stops the rest of the other stream is used in
-- the output stream.
--
-- When joining many streams in a left associative manner, earlier streams
-- get exponentially higher priority than the ones joined later. Because of
-- this exponential weighting it can be used with 'concatMapWith' even on a
-- large number of streams.
--
{-# INLINE interleave #-}
interleave :: StreamK m a -> StreamK m a -> StreamK m a
interleave m1 m2 = mkStream $ \st yld sng stp -> do
    let -- First stream is exhausted: continue with the second one as is.
        stop       = foldStream st yld sng stp m2
        -- First stream had exactly one element: yield it, then the second.
        single a   = yld a m2
        -- Yield from the first stream, then swap the roles of the streams so
        -- that the next element comes from the second one.
        yieldk a r = yld a (interleave m2 r)
    foldStream st yieldk single stop m1

infixr 6 `interleaveFst`

-- | Like `interleave` but stops interleaving as soon as the first stream stops.
--
-- If the second stream stops first, the remaining elements of the first
-- stream are still emitted.
--
{-# INLINE interleaveFst #-}
interleaveFst :: StreamK m a -> StreamK m a -> StreamK m a
interleaveFst m1 m2 = mkStream $ \st yld sng stp -> do
    -- The single and stop continuations of the first stream are passed
    -- through unchanged, so the output ends when the first stream ends.
    let yieldFirst a r = yld a (yieldSecond r m2)
     in foldStream st yieldFirst sng stp m1

    where

    -- Yield one element from the second stream (s2) and then go back to
    -- interleaving with the remainder of the first stream (s1) in front.
    yieldSecond s1 s2 = mkStream $ \st yld sng stp -> do
            let -- Second stream is exhausted: emit the rest of the first.
                stop       = foldStream st yld sng stp s1
                -- Second stream had one element: emit it, then the rest of
                -- the first stream.
                single a   = yld a s1
                -- Recurse through 'interleaveFst' (not 'interleave') so that
                -- the "stop when the first stream stops" rule keeps applying
                -- to every subsequent round, not only to the first one.
                yieldk a r = yld a (interleaveFst s1 r)
             in foldStream st yieldk single stop s2

infixr 6 `interleaveMin`

-- | Like `interleave` but stops interleaving as soon as any of the two streams
-- stops.
--
{-# INLINE interleaveMin #-}
interleaveMin :: StreamK m a -> StreamK m a -> StreamK m a
interleaveMin m1 m2 = mkStream $ \st yld _ stp -> do
    let -- Whichever stream is currently being consumed has ended: stop.
        stop       = stp
        -- "single a" is defined as "yld a (interleaveMin m2 nil)" instead of
        -- "sng a" to keep the behaviour consistent with the yield
        -- continuation.
        single a   = yld a (interleaveMin m2 nil)
        -- Yield the element and swap the streams for the next round.
        yieldk a r = yld a (interleaveMin m2 r)
    foldStream st yieldk single stop m1

-------------------------------------------------------------------------------
-- Generation
-------------------------------------------------------------------------------

-- Build a stream from a seed using a pure step function. The step function
-- returns 'Nothing' to end the stream, or @Just (element, nextSeed)@ to emit
-- an element and continue from the new seed.
{-# INLINE unfoldr #-}
unfoldr :: (b -> Maybe (a, b)) -> b -> StreamK m a
unfoldr next s0 = build $ \yld stp ->
    let go s =
            case next s of
                Just (a, b) -> yld a (go b)
                Nothing -> stp
    in go s0

-- Monadic unfold parameterised by the stream @cons@ operation. The step
-- action is run on the current seed; 'Nothing' ends the stream, while
-- @Just (element, nextSeed)@ yields the element followed by the stream
-- unfolded from the new seed. The @cons@ operation is handed to
-- 'sharedMWith'.
{-# INLINE unfoldrMWith #-}
unfoldrMWith :: Monad m =>
       (m a -> StreamK m a -> StreamK m a)
    -> (b -> m (Maybe (a, b)))
    -> b
    -> StreamK m a
unfoldrMWith cns step = go

    where

    -- The single-element continuation is never used: every element is
    -- emitted via the yield continuation with the rest of the unfold as tail.
    go s = sharedMWith cns $ \yld _ stp -> do
                r <- step s
                case r of
                    Just (a, b) -> yld a (go b)
                    Nothing -> stp

-- 'unfoldrMWith' specialised to the 'consM' stream constructor.
{-# INLINE unfoldrM #-}
unfoldrM :: Monad m => (b -> m (Maybe (a, b))) -> b -> StreamK m a
unfoldrM = unfoldrMWith consM

-- | Generate an infinite stream by repeating a pure value.
--
-- /Pre-release/
{-# INLINE repeat #-}
repeat :: a -> StreamK m a
repeat a =
    -- A self-referential binding: the tail of the stream is the stream itself.
    let x = cons a x
     in x

-- | Like 'repeatM' but takes a stream 'cons' operation to combine the actions
-- in a stream specific manner. A serial cons would repeat the values serially
-- while an async cons would repeat concurrently.
--
-- /Pre-release/
repeatMWith :: (m a -> t m a -> t m a) -> m a -> t m a
repeatMWith cns = go

    where

    -- Cons the action onto the stream that repeats the same action forever.
    go m = m `cns` go m

-- Generate a stream of @n@ copies of the action @m@, joined using the
-- supplied @cons@ operation. A count that is zero or negative produces the
-- empty stream.
{-# INLINE replicateMWith #-}
replicateMWith :: (m a -> StreamK m a -> StreamK m a) -> Int -> m a -> StreamK m a
replicateMWith cns n m = go n

    where

    -- Count down from n; stop once the counter is no longer positive.
    go cnt = if cnt <= 0 then nil else m `cns` go (cnt - 1)

-- Generate a stream by applying @gen@ to the indices 0, 1, 2, ... and joining
-- the resulting actions with the supplied @cons@ operation. There is no
-- terminating case here: the index keeps increasing for as long as the
-- stream is consumed.
{-# INLINE fromIndicesMWith #-}
fromIndicesMWith ::
    (m a -> StreamK m a -> StreamK m a) -> (Int -> m a) -> StreamK m a
fromIndicesMWith cns gen = go 0

    where

    -- The continuations are passed straight through to the stream built for
    -- the current index, in yield/single/stop order.
    go i = mkStream $ \st yld sng stp -> do
        foldStreamShared st yld sng stp (gen i `cns` go (i + 1))

-- Generate a stream by repeatedly applying the monadic @step@ function,
-- starting from the result of the initial action. Each element is produced
-- by running the current action; the next action is @step@ applied to that
-- element. Elements are joined using the supplied @cons@ operation.
{-# INLINE iterateMWith #-}
iterateMWith :: Monad m =>
    (m a -> StreamK m a -> StreamK m a) -> (a -> m a) -> m a -> StreamK m a
iterateMWith cns step = go

    where

    go s = mkStream $ \st yld sng stp -> do
        -- Run the current action and force its result to WHNF before it is
        -- emitted and fed to the next step.
        !next <- s
        foldStreamShared st yld sng stp (return next `cns` go (step next))

-- Return the first element of the stream.
--
-- Partial: calls 'error' with "head of nil" if the stream is empty.
{-# INLINE headPartial #-}
headPartial :: Monad m => StreamK m a -> m a
headPartial = foldrM (\x _ -> return x) (error "head of nil")

-- Drop the first element of the stream and return the rest.
--
-- Partial: calls 'error' with "tail of nil" if the stream is empty.
{-# INLINE tailPartial #-}
tailPartial :: StreamK m a -> StreamK m a
tailPartial m = mkStream $ \st yld sng stp ->
    let stop      = error "tail of nil"
        -- A one element stream has an empty tail.
        single _  = stp
        -- Discard the head and continue with the remaining stream.
        yieldk _ r = foldStream st yld sng stp r
    in foldStream st yieldk single stop m

-- | We can define cyclic structures using @let@:
--
-- >>> let (a, b) = ([1, b], head a) in (a, b)
-- ([1,1],1)
--
-- The function @fix@ defined as:
--
-- >>> fix f = let x = f x in x
--
-- ensures that the argument of a function and its output refer to the same
-- lazy value @x@ i.e.  the same location in memory.  Thus @x@ can be defined
-- in terms of itself, creating structures with cyclic references.
--
-- >>> f ~(a, b) = ([1, b], head a)
-- >>> fix f
-- ([1,1],1)
--
-- 'Control.Monad.mfix' is essentially the same as @fix@ but for monadic
-- values.
--
-- Using 'mfix' for streams we can construct a stream in which each element of
-- the stream is defined in a cyclic fashion. The argument of the function
-- being fixed represents the current element of the stream which is being
-- returned by the stream monad. Thus, we can use the argument to construct
-- itself.
--
-- In the following example, the argument @action@ of the function @f@
-- represents the tuple @(x,y)@ returned by it in a given iteration. We define
-- the first element of the tuple in terms of the second.
--
-- >>> import System.IO.Unsafe (unsafeInterleaveIO)
--
-- >>> :{
-- main = Stream.fold (Fold.drainMapM print) $ StreamK.toStream $ StreamK.mfix f
--     where
--     f action = StreamK.unCross $ do
--         let incr n act = fmap ((+n) . snd) $ unsafeInterleaveIO act
--         x <- StreamK.mkCross $ StreamK.fromStream $ Stream.sequence $ Stream.fromList [incr 1 action, incr 2 action]
--         y <- StreamK.mkCross $ StreamK.fromStream $ Stream.fromList [4,5]
--         return (x, y)
-- :}
--
-- Note: you cannot achieve this by just changing the order of the monad
-- statements because that would change the order in which the stream elements
-- are generated.
--
-- Note that the function @f@ must be lazy in its argument, that's why we use
-- 'unsafeInterleaveIO' on @action@ because IO monad is strict.
--
-- /Pre-release/
{-# INLINE mfix #-}
mfix :: Monad m => (m a -> StreamK m a) -> StreamK m a
mfix f = mkStream $ \st yld sng stp ->
    -- In both the single and the yield case only the head of @xs@ is used;
    -- the tail of the result always comes from @ys@.
    let single a  = foldStream st yld sng stp $ a `cons` ys
        yieldk a _ = foldStream st yld sng stp $ a `cons` ys
    in foldStream st yieldk single stp xs

    where

    -- fix the head element of the stream
    xs = fix  (f . headPartial)

    -- now fix the tail recursively
    ys = mfix (tailPartial . f)

-------------------------------------------------------------------------------
-- Conversions
-------------------------------------------------------------------------------

-- |
-- >>> fromFoldable = Prelude.foldr StreamK.cons StreamK.nil
--
-- Construct a stream from a 'Foldable' containing pure values:
--
{-# INLINE fromFoldable #-}
fromFoldable :: Foldable f => f a -> StreamK m a
fromFoldable = Prelude.foldr cons nil

-- Construct a stream from a 'Foldable' containing monadic actions, joining
-- the actions with 'consM'.
{-# INLINE fromFoldableM #-}
fromFoldableM :: (Foldable f, Monad m) => f (m a) -> StreamK m a
fromFoldableM = Prelude.foldr consM nil

-------------------------------------------------------------------------------
-- Deconstruction
-------------------------------------------------------------------------------

-- Split a stream into its first element and the remaining stream. Returns
-- 'Nothing' for an empty stream. For a one element stream the remaining
-- stream is 'nil'.
{-# INLINE uncons #-}
uncons :: Applicative m => StreamK m a -> m (Maybe (a, StreamK m a))
uncons m =
    let stop = pure Nothing
        single a = pure (Just (a, nil))
        yieldk a r = pure (Just (a, r))
    in foldStream defState yieldk single stop m

-- Return the stream without its first element, or 'Nothing' if the stream is
-- empty. For a one element stream the result is @Just nil@.
{-# INLINE tail #-}
tail :: Applicative m => StreamK m a -> m (Maybe (StreamK m a))
tail =
    let stop      = pure Nothing
        single _  = pure $ Just nil
        yieldk _ r = pure $ Just r
    in foldStream defState yieldk single stop

-- | Extract all but the last element of the stream, if any. Returns 'Nothing'
-- for an empty stream.
--
-- Note: This will end up buffering the entire stream.
--
-- /Pre-release/
{-# INLINE init #-}
init :: Applicative m => StreamK m a -> m (Maybe (StreamK m a))
init = go1
    where
    -- Peel off the first element; the rest is handled by @go@, which always
    -- holds back one element.
    go1 m1 =
        (\case
            Nothing -> Nothing
            Just (h, t) -> Just $ go h t) <$> uncons m1
    -- @p@ is the element held back from the previous step. It is emitted only
    -- when the remaining stream turns out to have at least one more element.
    go p m1 = mkStream $ \_ yld sng stp ->
        let -- The remaining stream has exactly one element: that is the
            -- last one, so drop it and emit the held back element.
            single _ = sng p
            -- Emit the held back element and hold back the new one.
            yieldk a x = yld p $ go a x
            -- If the remaining stream is empty, @p@ itself was the last
            -- element and nothing is emitted (stop continuation).
         in foldStream defState yieldk single stp m1

------------------------------------------------------------------------------
-- Reordering
------------------------------------------------------------------------------

-- | Lazy left fold to a stream.
--
-- The accumulator is itself a stream. Each input element is combined into
-- the accumulator with the step function; when the input ends, the
-- accumulated stream is evaluated with the consumer's continuations.
{-# INLINE foldlS #-}
foldlS ::
    (StreamK m b -> a -> StreamK m b) -> StreamK m b -> StreamK m a -> StreamK m b
foldlS step = go
    where
    go acc rest = mkStream $ \st yld sng stp ->
        let -- Evaluate a result stream with the consumer's continuations.
            run x = foldStream st yld sng stp x
            stop = run acc
            single a = run $ step acc a
            yieldk a r = run $ go (step acc a) r
            -- The input stream has a different element type than the output,
            -- so the state is adapted before folding the input.
         in foldStream (adaptState st) yieldk single stop rest

-- Reverse a stream by left folding it with a flipped 'cons', starting from
-- the empty stream.
{-# INLINE reverse #-}
reverse :: StreamK m a -> StreamK m a
reverse = foldlS (flip cons) nil

------------------------------------------------------------------------------
-- Running effects
------------------------------------------------------------------------------

-- | Run an action before evaluating the stream. The result of the action is
-- discarded.
{-# INLINE before #-}
before :: Monad m => m b -> StreamK m a -> StreamK m a
before action stream =
    mkStream $ \st yld sng stp ->
        action >> foldStreamShared st yld sng stp stream

-- | Run an effect that produces a stream and continue as that stream.
--
-- Equivalent to @concat . fromEffect@.
--
-- The action is executed when the resulting stream is evaluated; the stream
-- it returns is then folded with the same state and continuations.
{-# INLINE concatEffect #-}
concatEffect :: Monad m => m (StreamK m a) -> StreamK m a
concatEffect action =
    mkStream $ \st yld sng stp ->
        action >>= foldStreamShared st yld sng stp

-- | Run an effect, map its result to a stream using the supplied function and
-- continue as that stream.
--
-- The action is executed when the resulting stream is evaluated; the stream
-- obtained by applying @f@ to its result is then folded with the same state
-- and continuations.
{-# INLINE concatMapEffect #-}
concatMapEffect :: Monad m => (b -> StreamK m a) -> m b -> StreamK m a
concatMapEffect f action =
    mkStream $ \st yld sng stp ->
        action >>= foldStreamShared st yld sng stp . f

------------------------------------------------------------------------------
-- Stream with a cross product style monad instance
------------------------------------------------------------------------------

-- | A newtype wrapper for the 'StreamK' type adding a cross product style
-- monad instance.
--
-- A 'Monad' bind behaves like a @for@ loop:
--
-- >>> :{
-- Stream.fold Fold.toList $ StreamK.toStream $ StreamK.unCross $ do
--     x <- StreamK.mkCross $ StreamK.fromStream $ Stream.fromList [1,2]
--     -- Perform the following actions for each x in the stream
--     return x
-- :}
-- [1,2]
--
-- Nested monad binds behave like nested @for@ loops:
--
-- >>> :{
-- Stream.fold Fold.toList $ StreamK.toStream $ StreamK.unCross $ do
--     x <- StreamK.mkCross $ StreamK.fromStream $ Stream.fromList [1,2]
--     y <- StreamK.mkCross $ StreamK.fromStream $ Stream.fromList [3,4]
--     -- Perform the following actions for each x, for each y
--     return (x, y)
-- :}
-- [(1,3),(1,4),(2,3),(2,4)]
--
newtype CrossStreamK m a = CrossStreamK {unCrossStreamK :: StreamK m a}
        deriving (Functor, Semigroup, Monoid, Foldable)

-- | Wrap the 'StreamK' type in a 'CrossStreamK' newtype to enable cross
-- product style applicative and monad instances.
--
-- This is a type level operation with no runtime overhead.
{-# INLINE mkCross #-}
mkCross :: StreamK m a -> CrossStreamK m a
mkCross = CrossStreamK

-- | Unwrap the 'StreamK' type from 'CrossStreamK' newtype.
--
-- This is a type level operation with no runtime overhead.
{-# INLINE unCross #-}
unCross :: CrossStreamK m a -> StreamK m a
unCross = unCrossStreamK

-- Pure (Identity monad) stream instances.
--
-- These instances exist only when the underlying monad is 'Identity' and are
-- obtained through standalone deriving.
deriving instance Traversable (CrossStreamK Identity)
deriving instance IsList (CrossStreamK Identity a)
-- The @a ~ Char@ equality constraint restricts the 'IsString' instance to
-- streams whose element type is 'Char'.
deriving instance (a ~ Char) => IsString (CrossStreamK Identity a)
-- NOTE(review): the 'Eq' and 'Ord' derivations below are disabled; the reason
-- is not visible in this part of the file.
-- deriving instance Eq a => Eq (CrossStreamK Identity a)
-- deriving instance Ord a => Ord (CrossStreamK Identity a)

-- Do not use automatic derivation for this to show as "fromList" rather than
-- "fromList Identity".
--
-- The instance unwraps the newtype and delegates to the 'Show' instance of the
-- underlying @StreamK Identity a@.
instance Show a => Show (CrossStreamK Identity a) where
    {-# INLINE show #-}
    show (CrossStreamK xs) = show xs

-- Parse using the 'Read' instance of the underlying @StreamK Identity a@ and
-- wrap the result in the 'CrossStreamK' newtype.
instance Read a => Read (CrossStreamK Identity a) where
    {-# INLINE readPrec #-}
    readPrec = fmap CrossStreamK readPrec

------------------------------------------------------------------------------
-- Applicative
------------------------------------------------------------------------------

-- Note: we need to define all the typeclass operations because we want to
-- INLINE them.
--
-- Every operation unwraps the 'CrossStreamK' arguments, delegates to the
-- corresponding 'StreamK' combinator and wraps the result again.
instance Monad m => Applicative (CrossStreamK m) where
    -- A stream built from the given value with 'fromPure'.
    {-# INLINE pure #-}
    pure x = CrossStreamK (fromPure x)

    -- Delegates to 'crossApply' on the wrapped streams.
    {-# INLINE (<*>) #-}
    (CrossStreamK s1) <*> (CrossStreamK s2) =
        CrossStreamK (crossApply s1 s2)

    -- Map the function over the first stream, then apply with '<*>'.
    {-# INLINE liftA2 #-}
    liftA2 f x = (<*>) (fmap f x)

    -- Delegates to 'crossApplySnd' on the wrapped streams.
    {-# INLINE (*>) #-}
    (CrossStreamK s1) *> (CrossStreamK s2) =
        CrossStreamK (crossApplySnd s1 s2)

    -- Delegates to 'crossApplyFst' on the wrapped streams.
    {-# INLINE (<*) #-}
    (CrossStreamK s1) <* (CrossStreamK s2) =
        CrossStreamK (crossApplyFst s1 s2)

------------------------------------------------------------------------------
-- Monad
------------------------------------------------------------------------------

-- Bind maps the continuation over the wrapped stream and combines the
-- resulting streams with 'append' (via 'bindWith').
instance Monad m => Monad (CrossStreamK m) where
    return = pure

    -- Benchmarks better with CPS bind and pure:
    -- Prime sieve (25x)
    -- n binds, breakAfterSome, filterAllIn, state transformer (~2x)
    --
    {-# INLINE (>>=) #-}
    (>>=) (CrossStreamK m) f =
        CrossStreamK (bindWith append m (unCrossStreamK . f))

    -- Same as the 'Applicative' sequencing operator.
    {-# INLINE (>>) #-}
    (>>) = (*>)

------------------------------------------------------------------------------
-- Transformers
------------------------------------------------------------------------------

-- Lift the 'IO' action into the underlying monad with 'liftIO' and turn the
-- resulting effect into a stream with 'fromEffect'.
instance (MonadIO m) => MonadIO (CrossStreamK m) where
    liftIO x = CrossStreamK (fromEffect $ liftIO x)

-- Lift an action of the underlying monad into a stream using 'fromEffect'.
instance MonadTrans CrossStreamK where
    {-# INLINE lift #-}
    lift x = CrossStreamK (fromEffect x)

-- Throw the exception in the underlying monad and 'lift' that action into the
-- stream.
instance (MonadThrow m) => MonadThrow (CrossStreamK m) where
    throwM = lift . throwM