{-# LANGUAGE UndecidableInstances #-}

-- |
-- Module      : Streamly.Internal.Data.Stream.Serial
-- Copyright   : (c) 2017 Composewell Technologies
--
-- License     : BSD3
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
-- To run examples in this module:
--
-- >>> import qualified Streamly.Prelude as Stream
--
module Streamly.Internal.Data.Stream.Serial
    (
    -- * Serial appending stream
      SerialT
    , Serial
    , K.serial
    , fromSerial

    -- * Serial interleaving stream
    , WSerialT
    , WSerial
    , wSerial
    , wSerialFst
    , wSerialMin
    , fromWSerial

    -- * Construction
    , unfoldrM

    -- * Transformation
    , map
    , mapM

    -- * Deprecated
    , StreamT
    , InterleavedT
    , (<=>)
    , interleaving
    )
where

import Control.Applicative (liftA2)
import Control.DeepSeq (NFData(..))
#if MIN_VERSION_deepseq(1,4,3)
import Control.DeepSeq (NFData1(..))
#endif
import Control.Monad.Base (MonadBase(..), liftBaseDefault)
import Control.Monad.Catch (MonadThrow, throwM)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Reader.Class (MonadReader(..))
import Control.Monad.State.Class (MonadState(..))
import Control.Monad.Trans.Class (MonadTrans(lift))
import Data.Foldable (Foldable(foldl'), fold)
import Data.Functor.Identity (Identity(..), runIdentity)
import Data.Maybe (fromMaybe)
import Data.Semigroup (Endo(..))
#if __GLASGOW_HASKELL__ < 808
import Data.Semigroup (Semigroup(..))
#endif
import GHC.Exts (IsList(..), IsString(..))
import Text.Read
       ( Lexeme(Ident), lexP, parens, prec, readPrec, readListPrec
       , readListPrecDefault)
import Streamly.Internal.BaseCompat ((#.), errorWithoutStackTrace, oneShot)
import Streamly.Internal.Data.Stream.StreamK.Type
       (IsStream(..), adapt, Stream, mkStream, foldStream)
import Streamly.Internal.Data.Maybe.Strict (Maybe'(..), toMaybe)

import qualified Streamly.Internal.Data.Stream.Prelude as P
    (cmpBy, foldl', foldr, eqBy, fromList, toList)
import qualified Streamly.Internal.Data.Stream.StreamK as K (withLocal)
import qualified Streamly.Internal.Data.Stream.StreamK.Type as K
import qualified Streamly.Internal.Data.Stream.StreamD.Generate as D (unfoldrM)
import qualified Streamly.Internal.Data.Stream.StreamD.Type as D

import Prelude hiding (map, mapM, errorWithoutStackTrace)

#include "Instances.hs"
#include "inline.hs"

-- $setup
-- >>> import qualified Streamly.Prelude as Stream

------------------------------------------------------------------------------
-- SerialT
------------------------------------------------------------------------------

-- | For 'SerialT' streams:
--
-- @
-- (<>) = 'Streamly.Prelude.serial'                       -- 'Semigroup'
-- (>>=) = flip . 'Streamly.Prelude.concatMapWith' 'Streamly.Prelude.serial' -- 'Monad'
-- @
--
-- A single 'Monad' bind behaves like a @for@ loop:
--
-- >>> :{
-- Stream.toList $ do
--      x <- Stream.fromList [1,2] -- foreach x in stream
--      return x
-- :}
-- [1,2]
--
-- Nested monad binds behave like nested @for@ loops:
--
-- >>> :{
-- Stream.toList $ do
--     x <- Stream.fromList [1,2] -- foreach x in stream
--     y <- Stream.fromList [3,4] -- foreach y in stream
--     return (x, y)
-- :}
-- [(1,3),(1,4),(2,3),(2,4)]
--
-- /Since: 0.2.0 ("Streamly")/
--
-- @since 0.8.0
newtype SerialT m a = SerialT {getSerialT :: Stream m a}
    -- XXX when deriving do we inherit an INLINE?
    -- The 'Semigroup', 'Monoid' and 'MonadTrans' instances are derived from
    -- the wrapped 'Stream' rather than written by hand.
    deriving (Semigroup, Monoid, MonadTrans)

-- | A serial IO stream of elements of type @a@, that is, 'SerialT'
-- specialized to the 'IO' monad. See 'SerialT' documentation for more
-- details.
--
-- /Since: 0.2.0 ("Streamly")/
--
-- @since 0.8.0
type Serial = SerialT IO

-- | Deprecated synonym for 'SerialT'.
--
-- @since 0.1.0
{-# DEPRECATED StreamT "Please use 'SerialT' instead." #-}
type StreamT = SerialT

-- | Fix the type of a polymorphic stream as 'SerialT'.
--
-- /Since: 0.1.0 ("Streamly")/
--
-- @since 0.8.0
fromSerial :: IsStream t => SerialT m a -> t m a
-- Implemented with 'adapt'; the argument type pins the input to 'SerialT'.
fromSerial = adapt

-- | Implementation of 'consM' and '|:' for 'SerialT'.
--
-- Unwraps the stream with 'toStream', delegates to 'K.consMStream' with the
-- given action and rewraps the result with 'fromStream'.
{-# INLINE consMSerial #-}
{-# SPECIALIZE consMSerial :: IO a -> SerialT IO a -> SerialT IO a #-}
consMSerial :: Monad m => m a -> SerialT m a -> SerialT m a
consMSerial m ms = fromStream $ K.consMStream m (toStream ms)

-- | 'SerialT' is a plain newtype over 'Stream', so conversion in either
-- direction is just the field accessor or the constructor. Both cons
-- operations share the 'consMSerial' implementation.
instance IsStream SerialT where
    toStream = getSerialT
    fromStream = SerialT
    consM = consMSerial
    (|:) = consMSerial

------------------------------------------------------------------------------
-- Monad
------------------------------------------------------------------------------

-- | Bind is 'K.bindWith' using 'K.serial' to combine the streams generated
-- for each element; '>>' reuses the 'Applicative' '*>'.
instance Monad m => Monad (SerialT m) where
    return = pure

    -- Benchmarks better with StreamD bind and pure:
    -- toList, filterAllout, *>, *<, >> (~2x)
    --
    -- pure = SerialT . D.fromStreamD . D.fromPure
    -- m >>= f = D.fromStreamD $ D.concatMap (D.toStreamD . f) (D.toStreamD m)

    -- Benchmarks better with CPS bind and pure:
    -- Prime sieve (25x)
    -- n binds, breakAfterSome, filterAllIn, state transformer (~2x)
    --
    {-# INLINE (>>=) #-}
    (>>=) = K.bindWith K.serial

    {-# INLINE (>>) #-}
    (>>)  = (*>)

------------------------------------------------------------------------------
-- Other instances
------------------------------------------------------------------------------

-- | Map a monadic function over the elements of a stream.
--
-- The stream is converted with 'D.toStreamD', transformed with 'D.mapM' and
-- converted back with 'D.fromStreamD'.
{-# INLINE mapM #-}
mapM :: (IsStream t, Monad m) => (a -> m b) -> t m a -> t m b
mapM f m = D.fromStreamD $ D.mapM f $ D.toStreamD m

-- |
-- @
-- map = fmap
-- @
--
-- Same as 'fmap'.
--
-- @
-- > S.toList $ S.map (+1) $ S.fromList [1,2,3]
-- [2,3,4]
-- @
--
-- @since 0.4.0
{-# INLINE map #-}
map :: (IsStream t, Monad m) => (a -> b) -> t m a -> t m b
-- Defined through 'mapM' by lifting the pure function with 'return'.
map f = mapM (return . f)

-- | Implementation of '<*>' for 'SerialT'.
--
-- Both wrapped streams are converted with 'D.toStreamD', combined with the
-- '<*>' of that representation, and converted back with 'D.fromStreamD'.
{-# INLINE apSerial #-}
apSerial :: Monad m => SerialT m (a -> b) -> SerialT m a -> SerialT m b
apSerial (SerialT m1) (SerialT m2) =
    D.fromStreamD $ D.toStreamD m1 <*> D.toStreamD m2

-- | Implementation of '*>' for 'SerialT'.
--
-- Both wrapped streams are converted with 'D.toStreamD', combined with the
-- '*>' of that representation, and converted back with 'D.fromStreamD'.
{-# INLINE apSequence #-}
apSequence :: Monad m => SerialT m a -> SerialT m b -> SerialT m b
apSequence (SerialT m1) (SerialT m2) =
    D.fromStreamD $ D.toStreamD m1 *> D.toStreamD m2

-- | Implementation of '<*' for 'SerialT'.
--
-- Both wrapped streams are converted with 'D.toStreamD', combined with the
-- '<*' of that representation, and converted back with 'D.fromStreamD'.
{-# INLINE apDiscardSnd #-}
apDiscardSnd :: Monad m => SerialT m a -> SerialT m b -> SerialT m a
apDiscardSnd (SerialT m1) (SerialT m2) =
    D.fromStreamD $ D.toStreamD m1 <* D.toStreamD m2

-- Note: we need to define all the typeclass operations because we want to
-- INLINE them.
instance Monad m => Applicative (SerialT m) where
    -- A pure value is wrapped with 'K.fromPure'.
    {-# INLINE pure #-}
    pure = SerialT . K.fromPure

    {-# INLINE (<*>) #-}
    (<*>) = apSerial
    -- (<*>) = K.apSerial

#if MIN_VERSION_base(4,10,0)
    -- 'liftA2' is only defined here when base is at least 4.10.
    {-# INLINE liftA2 #-}
    liftA2 f x = (<*>) (fmap f x)
#endif

    {-# INLINE (*>) #-}
    (*>)  = apSequence
    -- (*>)  = K.apSerialDiscardFst

    {-# INLINE (<*) #-}
    (<*) = apDiscardSnd
    -- (<*)  = K.apSerialDiscardSnd

-- The remaining instances for SerialT are generated by CPP macros. The macro
-- definitions are not in this file (presumably they come from the included
-- Instances.hs -- TODO confirm), so the exact instances are determined there.
MONAD_COMMON_INSTANCES(SerialT,)
LIST_INSTANCES(SerialT)
NFDATA1_INSTANCE(SerialT)
FOLDABLE_INSTANCE(SerialT)
TRAVERSABLE_INSTANCE(SerialT)

------------------------------------------------------------------------------
-- WSerialT
------------------------------------------------------------------------------

-- | For 'WSerialT' streams:
--
-- @
-- (<>) = 'Streamly.Prelude.wSerial'                       -- 'Semigroup'
-- (>>=) = flip . 'Streamly.Prelude.concatMapWith' 'Streamly.Prelude.wSerial' -- 'Monad'
-- @
--
-- Note that '<>' is associative only if we disregard the ordering of elements
-- in the resulting stream.
--
-- A single 'Monad' bind behaves like a @for@ loop:
--
-- >>> :{
-- Stream.toList $ Stream.fromWSerial $ do
--      x <- Stream.fromList [1,2] -- foreach x in stream
--      return x
-- :}
-- [1,2]
--
-- Nested monad binds behave like interleaved nested @for@ loops:
--
-- >>> :{
-- Stream.toList $ Stream.fromWSerial $ do
--     x <- Stream.fromList [1,2] -- foreach x in stream
--     y <- Stream.fromList [3,4] -- foreach y in stream
--     return (x, y)
-- :}
-- [(1,3),(2,3),(1,4),(2,4)]
--
-- It is a result of interleaving all the nested iterations corresponding to
-- element @1@ in the first stream with all the nested iterations of element
-- @2@:
--
-- >>> import Streamly.Prelude (wSerial)
-- >>> Stream.toList $ Stream.fromList [(1,3),(1,4)] `wSerial` Stream.fromList [(2,3),(2,4)]
-- [(1,3),(2,3),(1,4),(2,4)]
--
-- The @W@ in the name stands for @wide@ or breadth wise scheduling in
-- contrast to the depth wise scheduling behavior of 'SerialT'.
--
-- /Since: 0.2.0 ("Streamly")/
--
-- @since 0.8.0
newtype WSerialT m a = WSerialT {getWSerialT :: Stream m a}
    -- Only 'MonadTrans' is derived; 'Semigroup' and 'Monoid' are written by
    -- hand in this module in terms of 'wSerial'.
    deriving (MonadTrans)

-- | An interleaving serial IO stream of elements of type @a@, that is,
-- 'WSerialT' specialized to the 'IO' monad. See 'WSerialT' documentation for
-- more details.
--
-- /Since: 0.2.0 ("Streamly")/
--
-- @since 0.8.0
type WSerial = WSerialT IO

-- | Deprecated synonym for 'WSerialT'.
--
-- @since 0.1.0
{-# DEPRECATED InterleavedT "Please use 'WSerialT' instead." #-}
type InterleavedT = WSerialT

-- | Fix the type of a polymorphic stream as 'WSerialT'.
--
-- /Since: 0.2.0 ("Streamly")/
--
-- @since 0.8.0
fromWSerial :: IsStream t => WSerialT m a -> t m a
-- Implemented with 'adapt'; the argument type pins the input to 'WSerialT'.
fromWSerial = adapt

-- | Same as 'fromWSerial'.
--
-- @since 0.1.0
{-# DEPRECATED interleaving "Please use fromWSerial instead." #-}
interleaving :: IsStream t => WSerialT m a -> t m a
-- Kept only for backward compatibility; simply forwards to 'fromWSerial'.
interleaving = fromWSerial

-- | Implementation of 'consM' and '|:' for 'WSerialT'.
--
-- Unwraps the stream with 'toStream', delegates to 'K.consMStream' with the
-- given action and rewraps the result with 'fromStream'.
consMWSerial :: Monad m => m a -> WSerialT m a -> WSerialT m a
consMWSerial m ms = fromStream $ K.consMStream m (toStream ms)

-- | 'WSerialT' is a plain newtype over 'Stream', so conversion in either
-- direction is just the field accessor or the constructor. Both cons
-- operations share the 'consMWSerial' implementation.
instance IsStream WSerialT where
    toStream = getWSerialT
    fromStream = WSerialT

    {-# INLINE consM #-}
    {-# SPECIALIZE consM :: IO a -> WSerialT IO a -> WSerialT IO a #-}
    consM :: Monad m => m a -> WSerialT m a -> WSerialT m a
    consM = consMWSerial

    {-# INLINE (|:) #-}
    {-# SPECIALIZE (|:) :: IO a -> WSerialT IO a -> WSerialT IO a #-}
    (|:) :: Monad m => m a -> WSerialT m a -> WSerialT m a
    (|:) = consMWSerial

------------------------------------------------------------------------------
-- Semigroup
------------------------------------------------------------------------------

-- When used infix, 'wSerial' is right associative with precedence 6.
infixr 6 `wSerial`

-- Additionally we can have m elements yield from the first stream and n
-- elements yielding from the second stream. We can also have time slicing
-- variants of positional interleaving, e.g. run first stream for m seconds and
-- run the second stream for n seconds.
--
-- Similar combinators can be implemented using WAhead style.

-- | Interleaves two streams, yielding one element from each stream
-- alternately.  When one stream stops the rest of the other stream is used in
-- the output stream.
--
-- >>> import Streamly.Prelude (wSerial)
-- >>> stream1 = Stream.fromList [1,2]
-- >>> stream2 = Stream.fromList [3,4]
-- >>> Stream.toList $ Stream.fromWSerial $ stream1 `wSerial` stream2
-- [1,3,2,4]
--
-- Note, for singleton streams 'wSerial' and 'serial' are identical.
--
-- Note that this operation cannot be used to fold a container of infinite
-- streams but it can be used for very large streams as the state that it needs
-- to maintain is proportional to the logarithm of the number of streams.
--
-- @since 0.8.0
--
-- /Since: 0.2.0 ("Streamly")/

-- Scheduling Notes:
--
-- Note that evaluation of @a \`wSerial` b \`wSerial` c@ does not interleave
-- @a@, @b@ and @c@ with equal priority.  This expression is equivalent to @a
-- \`wSerial` (b \`wSerial` c)@, therefore, it fairly interleaves @a@ with the
-- result of @b \`wSerial` c@.  For example, @Stream.fromList [1,2] \`wSerial`
-- Stream.fromList [3,4] \`wSerial` Stream.fromList [5,6]@ would result in
-- [1,3,2,5,4,6].  In other words, the leftmost stream gets the same scheduling
-- priority as the rest of the streams taken together. The same is true for
-- each subexpression on the right.
--
{-# INLINE wSerial #-}
wSerial :: IsStream t => t m a -> t m a -> t m a
wSerial m1 m2 = mkStream $ \st yld sng stp -> do
    -- The three bindings below are the continuations handed to 'foldStream'
    -- when running the first stream @m1@:
    --
    -- * stop: @m1@ has no more elements, so run @m2@ with the caller's own
    --   continuations.
    -- * single: @m1@ produced its final element; yield it with @m2@ as the
    --   remainder of the output.
    -- * yieldk: @m1@ produced an element and a remainder @r@; yield the
    --   element and swap the argument order so that @m2@ is run next.
    let stop       = foldStream st yld sng stp m2
        single a   = yld a m2
        yieldk a r = yld a (wSerial m2 r)
    foldStream st yieldk single stop m1

-- | Like `wSerial` but stops interleaving as soon as the first stream stops.
-- If the second stream stops first, the remaining elements of the first
-- stream are still emitted.
--
-- @since 0.7.0
{-# INLINE wSerialFst #-}
wSerialFst :: IsStream t => t m a -> t m a -> t m a
wSerialFst m1 m2 = mkStream $ \st yld sng stp -> do
    -- Run the first stream with the caller's single and stop continuations
    -- unchanged, so the output ends when the first stream ends. Only the
    -- yield continuation is replaced, to schedule the second stream next.
    let yieldFirst a r = yld a (yieldSecond r m2)
     in foldStream st yieldFirst sng stp m1

    where

    -- Take one step of the second stream @s2@, given the remainder @s1@ of
    -- the first stream:
    --
    -- * stop: @s2@ is exhausted, continue with @s1@ alone.
    -- * single: yield the final element of @s2@ with @s1@ as the remainder.
    -- * yieldk: yield the element and hand control back to the first stream.
    --
    -- The recursion in yieldk goes through 'wSerialFst', not 'wSerial':
    -- continuing with plain 'wSerial' would keep draining the second stream
    -- after the first one ends, which contradicts the documented contract.
    yieldSecond s1 s2 = mkStream $ \st yld sng stp -> do
            let stop       = foldStream st yld sng stp s1
                single a   = yld a s1
                yieldk a r = yld a (wSerialFst s1 r)
             in foldStream st yieldk single stop s2

-- | Like `wSerial` but stops interleaving as soon as any of the two streams
-- stops.
--
-- @since 0.7.0
{-# INLINE wSerialMin #-}
wSerialMin :: IsStream t => t m a -> t m a -> t m a
wSerialMin m1 m2 = mkStream $ \st yld _ stp -> do
    -- When the stream being run stops, the whole output stops: the caller's
    -- stop continuation is used as is and the other stream is dropped.
    let stop       = stp
        -- "single a" is defined as "yld a (wSerialMin m2 K.nil)" instead of
        -- "sng a" to keep the behaviour consistent with the yield continuation.
        single a   = yld a (wSerialMin m2 K.nil)
        -- Yield the element and swap the argument order so that the other
        -- stream is run next.
        yieldk a r = yld a (wSerialMin m2 r)
    foldStream st yieldk single stop m1

-- | Appending two 'WSerialT' streams interleaves them with 'wSerial'.
instance Semigroup (WSerialT m a) where
    (<>) = wSerial

-- The deprecated interleaving operator is right associative with precedence 5.
infixr 5 <=>

-- | Same as 'wSerial'.
--
-- @since 0.1.0
{-# DEPRECATED (<=>) "Please use 'wSerial' instead." #-}
{-# INLINE (<=>) #-}
(<=>) :: IsStream t => t m a -> t m a -> t m a
-- Kept only for backward compatibility; simply forwards to 'wSerial'.
(<=>) = wSerial

------------------------------------------------------------------------------
-- Monoid
------------------------------------------------------------------------------

-- | The identity element is 'K.nil'; 'mappend' is the 'Semigroup' append.
instance Monoid (WSerialT m a) where
    mempty = K.nil
    mappend = (<>)

-- | Implementation of '<*>' for 'WSerialT'.
--
-- Built from two nested 'K.concatMapBy' traversals, both combining their
-- intermediate streams with 'wSerial'. The outer one runs over the stream of
-- functions, the inner one over the stream of arguments.
{-# INLINE apWSerial #-}
apWSerial :: Monad m => WSerialT m (a -> b) -> WSerialT m a -> WSerialT m b
apWSerial (WSerialT m1) (WSerialT m2) =
    -- For one function @x1@, the stream of @x1@ applied to every element of
    -- the argument stream.
    let f x1 = K.concatMapBy wSerial (pure . x1) m2
    in WSerialT $ K.concatMapBy wSerial f m1

-- | 'pure' wraps a value with 'K.fromPure'; '<*>' is 'apWSerial'.
instance Monad m => Applicative (WSerialT m) where
    {-# INLINE pure #-}
    pure = WSerialT . K.fromPure
    {-# INLINE (<*>) #-}
    (<*>) = apWSerial

------------------------------------------------------------------------------
-- Monad
------------------------------------------------------------------------------

-- | Bind is 'K.bindWith' using 'wSerial' to combine the streams generated
-- for each element.
instance Monad m => Monad (WSerialT m) where
    return = pure
    {-# INLINE (>>=) #-}
    (>>=) = K.bindWith wSerial

------------------------------------------------------------------------------
-- Other instances
------------------------------------------------------------------------------

-- The remaining instances for WSerialT are generated by CPP macros. The macro
-- definitions are not in this file (presumably they come from the included
-- Instances.hs -- TODO confirm), so the exact instances are determined there.
MONAD_COMMON_INSTANCES(WSerialT,)
LIST_INSTANCES(WSerialT)
NFDATA1_INSTANCE(WSerialT)
FOLDABLE_INSTANCE(WSerialT)
TRAVERSABLE_INSTANCE(WSerialT)

------------------------------------------------------------------------------
-- Construction
------------------------------------------------------------------------------

-- | Build a stream by unfolding a /monadic/ step function starting from a
-- seed.  The step function returns the next element in the stream and the next
-- seed value. When it is done it returns 'Nothing' and the stream ends. For
-- example,
--
-- @
-- let f b =
--         if b > 3
--         then return Nothing
--         else print b >> return (Just (b, b + 1))
-- in drain $ unfoldrM f 0
-- @
-- @
--  0
--  1
--  2
--  3
-- @
--
-- /Pre-release/
--
{-# INLINE unfoldrM #-}
unfoldrM :: (IsStream t, Monad m) => (b -> m (Maybe (a, b))) -> b -> t m a
-- Delegates to 'D.unfoldrM' and converts the result with 'D.fromStreamD'.
unfoldrM step seed = D.fromStreamD (D.unfoldrM step seed)