{-# LANGUAGE CPP                       #-}
{-# LANGUAGE ConstraintKinds           #-}
{-# LANGUAGE FlexibleContexts          #-}
{-# LANGUAGE FlexibleInstances         #-}
{-# LANGUAGE GeneralizedNewtypeDeriving#-}
{-# LANGUAGE InstanceSigs              #-}
{-# LANGUAGE MultiParamTypeClasses     #-}
{-# LANGUAGE TypeFamilies              #-}
{-# LANGUAGE UndecidableInstances      #-} -- XXX

-- |
-- Module      : Streamly.Internal.Data.Stream.Serial
-- Copyright   : (c) 2017 Harendra Kumar
--
-- License     : BSD3
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
--
module Streamly.Internal.Data.Stream.Serial
    (
    -- * Serial appending stream
      SerialT
    , Serial
    , K.serial
    , serially

    -- * Serial interleaving stream
    , WSerialT
    , WSerial
    , wSerial
    , wSerialFst
    , wSerialMin
    , wSerially

    -- * Construction
    , unfoldrM

    -- * Transformation
    , map
    , mapM

    -- * Deprecated
    , StreamT
    , InterleavedT
    , (<=>)
    , interleaving
    )
where

import Control.Applicative (liftA2)
import Control.DeepSeq (NFData(..))
#if MIN_VERSION_deepseq(1,4,3)
import Control.DeepSeq (NFData1(..))
#endif
import Control.Monad.Base (MonadBase(..), liftBaseDefault)
import Control.Monad.Catch (MonadThrow, throwM)
-- import Control.Monad.Error.Class   (MonadError(..))
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Reader.Class (MonadReader(..))
import Control.Monad.State.Class (MonadState(..))
import Control.Monad.Trans.Class (MonadTrans(lift))
import Data.Foldable (Foldable(foldl'), fold)
import Data.Functor.Identity (Identity(..), runIdentity)
import Data.Maybe (fromMaybe)
import Data.Semigroup (Endo(..))
#if __GLASGOW_HASKELL__ < 808
import Data.Semigroup (Semigroup(..))
#endif
import GHC.Exts (IsList(..), IsString(..))
import Text.Read (Lexeme(Ident), lexP, parens, prec, readPrec, readListPrec,
                  readListPrecDefault)
import Prelude hiding (map, mapM, errorWithoutStackTrace)

import Streamly.Internal.BaseCompat ((#.), errorWithoutStackTrace)
import Streamly.Internal.Data.Stream.StreamK (IsStream(..), adapt, Stream, mkStream,
                                 foldStream)
import Streamly.Internal.Data.Strict (Maybe'(..), toMaybe)
import qualified Streamly.Internal.Data.Stream.Prelude as P
import qualified Streamly.Internal.Data.Stream.StreamK as K
import qualified Streamly.Internal.Data.Stream.StreamD as D

#include "Instances.hs"
#include "inline.hs"

------------------------------------------------------------------------------
-- SerialT
------------------------------------------------------------------------------

-- | The 'Semigroup' operation for 'SerialT' behaves like a regular append
-- operation.  Therefore, when @a <> b@ is evaluated, stream @a@ is evaluated
-- first until it exhausts and then stream @b@ is evaluated. In other words,
-- the elements of stream @b@ are appended to the elements of stream @a@. This
-- operation can be used to fold an infinite lazy container of streams.
--
-- @
-- import Streamly
-- import qualified "Streamly.Prelude" as S
--
-- main = (S.toList . 'serially' $ (S.fromList [1,2]) \<\> (S.fromList [3,4])) >>= print
-- @
-- @
-- [1,2,3,4]
-- @
--
-- The 'Monad' instance runs the /monadic continuation/ for each
-- element of the stream, serially.
--
-- @
-- main = S.drain . 'serially' $ do
--     x <- return 1 \<\> return 2
--     S.yieldM $ print x
-- @
-- @
-- 1
-- 2
-- @
--
-- 'SerialT' nests streams serially in a depth first manner.
--
-- @
-- main = S.drain . 'serially' $ do
--     x <- return 1 \<\> return 2
--     y <- return 3 \<\> return 4
--     S.yieldM $ print (x, y)
-- @
-- @
-- (1,3)
-- (1,4)
-- (2,3)
-- (2,4)
-- @
--
-- We call the monadic code being run for each element of the stream a monadic
-- continuation. In imperative paradigm we can think of this composition as
-- nested @for@ loops and the monadic continuation is the body of the loop. The
-- loop iterates for all elements of the stream.
--
-- Note that the behavior and semantics of 'SerialT', including its 'Semigroup'
-- and 'Monad' instances, are exactly like Haskell lists except that 'SerialT'
-- can contain effectful actions while lists are pure.
--
-- In the code above, the 'serially' combinator can be omitted as the default
-- stream type is 'SerialT'.
--
-- @since 0.2.0
newtype SerialT m a = SerialT
    { getSerialT :: Stream m a -- ^ Unwrap to the underlying 'Stream'.
    }
    -- All three instances are lifted from the wrapped 'Stream' via
    -- GeneralizedNewtypeDeriving.
    deriving (Semigroup, Monoid, MonadTrans)

-- | A serial IO stream of elements of type @a@, i.e. 'SerialT' specialized to
-- the 'IO' monad. See 'SerialT' documentation for more details.
--
-- @since 0.2.0
type Serial = SerialT IO

-- | Deprecated alias of 'SerialT'.
--
-- @since 0.1.0
{-# DEPRECATED StreamT "Please use 'SerialT' instead." #-}
type StreamT = SerialT

-- | Fix the type of a polymorphic stream as 'SerialT'.
--
-- This is 'adapt' with the source stream type pinned to 'SerialT'; the result
-- can be any 'IsStream' type.
--
-- @since 0.1.0
serially :: IsStream t => SerialT m a -> t m a
serially = adapt

-- Prepend a monadic action to a 'SerialT' stream: the tail is unwrapped with
-- 'toStream', the action is consed onto it by 'K.consMStream' and the result
-- is wrapped back with 'fromStream'.
{-# INLINE consMSerial #-}
{-# SPECIALIZE consMSerial :: IO a -> SerialT IO a -> SerialT IO a #-}
consMSerial :: Monad m => m a -> SerialT m a -> SerialT m a
consMSerial m ms = fromStream $ K.consMStream m (toStream ms)

instance IsStream SerialT where
    -- Conversion to and from 'Stream' is the newtype accessor and constructor.
    toStream = getSerialT
    fromStream = SerialT
    -- Both spellings of monadic cons share a single implementation.
    consM = consMSerial
    (|:) = consMSerial

------------------------------------------------------------------------------
-- Monad
------------------------------------------------------------------------------

instance Monad m => Monad (SerialT m) where
    return = pure
    -- Bind combines the streams generated for each element using 'K.serial'.
    {-# INLINE (>>=) #-}
    (>>=) = K.bindWith K.serial
    -- Sequencing is delegated to the Applicative instance.
    {-# INLINE (>>) #-}
    (>>)  = (*>)

    -- StreamD based implementation
    -- return = SerialT . D.fromStreamD . D.yield
    -- m >>= f = D.fromStreamD $ D.concatMap (\a -> D.toStreamD (f a)) (D.toStreamD m)

------------------------------------------------------------------------------
-- Other instances
------------------------------------------------------------------------------

-- | Apply a monadic function to each element of the stream and produce the
-- stream of results.
--
-- The input is converted with 'D.toStreamD', transformed by 'D.mapM' and
-- converted back with 'D.fromStreamD'.
{-# INLINE mapM #-}
mapM :: (IsStream t, Monad m) => (a -> m b) -> t m a -> t m b
mapM f m = D.fromStreamD $ D.mapM f $ D.toStreamD m

-- |
-- @
-- map = fmap
-- @
--
-- Same as 'fmap'. Implemented as 'mapM' with the pure function lifted into
-- the monad using 'return'.
--
-- @
-- > S.toList $ S.map (+1) $ S.fromList [1,2,3]
-- [2,3,4]
-- @
--
-- @since 0.4.0
{-# INLINE map #-}
map :: (IsStream t, Monad m) => (a -> b) -> t m a -> t m b
map f = mapM (return . f)

-- Applicative apply for 'SerialT'. Both streams are converted with
-- 'D.toStreamD', combined with the '<*>' of that representation, and the
-- result is converted back with 'D.fromStreamD'.
{-# INLINE apSerial #-}
apSerial :: Monad m => SerialT m (a -> b) -> SerialT m a -> SerialT m b
apSerial (SerialT m1) (SerialT m2) =
    D.fromStreamD $ D.toStreamD m1 <*> D.toStreamD m2

-- Applicative sequencing for 'SerialT', keeping only the results of the second
-- stream. Both streams are converted with 'D.toStreamD', combined with the
-- '*>' of that representation, and converted back with 'D.fromStreamD'.
{-# INLINE apSequence #-}
apSequence :: Monad m => SerialT m a -> SerialT m b -> SerialT m b
apSequence (SerialT m1) (SerialT m2) =
    D.fromStreamD $ D.toStreamD m1 *> D.toStreamD m2

instance Monad m => Applicative (SerialT m) where
    -- A singleton stream built with 'K.yield'.
    {-# INLINE pure #-}
    pure = SerialT . K.yield
    {-# INLINE (<*>) #-}
    (<*>) = apSerial
    {-# INLINE (*>) #-}
    (*>)  = apSequence

-- The remaining SerialT instances are generated by CPP macros. The macro
-- definitions are not in this file; presumably they come from the included
-- "Instances.hs" (TODO confirm).
MONAD_COMMON_INSTANCES(SerialT,)
LIST_INSTANCES(SerialT)
NFDATA1_INSTANCE(SerialT)
FOLDABLE_INSTANCE(SerialT)
TRAVERSABLE_INSTANCE(SerialT)

------------------------------------------------------------------------------
-- WSerialT
------------------------------------------------------------------------------

-- | The 'Semigroup' operation for 'WSerialT' interleaves the elements from the
-- two streams.  Therefore, when @a <> b@ is evaluated, stream @a@ is evaluated
-- first to produce the first element of the combined stream and then stream
-- @b@ is evaluated to produce the next element of the combined stream, and
-- then we go back to evaluating stream @a@ and so on. In other words, the
-- elements of stream @a@ are interleaved with the elements of stream @b@.
--
-- Note that evaluation of @a <> b <> c@ does not schedule @a@, @b@ and @c@
-- with equal priority.  This expression is equivalent to @a <> (b <> c)@,
-- therefore, it fairly interleaves @a@ with the result of @b <> c@.  For
-- example, @S.fromList [1,2] <> S.fromList [3,4] <> S.fromList [5,6] ::
-- WSerialT Identity Int@ would result in [1,3,2,5,4,6].  In other words, the
-- leftmost stream gets the same scheduling priority as the rest of the
-- streams taken together. The same is true for each subexpression on the right.
--
-- Note that this operation cannot be used to fold an infinite container of
-- streams as the state that it needs to maintain is proportional to the number
-- of streams.
--
-- The @W@ in the name stands for @wide@ or breadth wise scheduling in
-- contrast to the depth wise scheduling behavior of 'SerialT'.
--
-- @
-- import Streamly
-- import qualified "Streamly.Prelude" as S
--
-- main = (S.toList . 'wSerially' $ (S.fromList [1,2]) \<\> (S.fromList [3,4])) >>= print
-- @
-- @
-- [1,3,2,4]
-- @
--
-- Similarly, the 'Monad' instance interleaves the iterations of the
-- inner and the outer loop, nesting loops in a breadth first manner.
--
--
-- @
-- main = S.drain . 'wSerially' $ do
--     x <- return 1 \<\> return 2
--     y <- return 3 \<\> return 4
--     S.yieldM $ print (x, y)
-- @
-- @
-- (1,3)
-- (2,3)
-- (1,4)
-- (2,4)
-- @
--
-- @since 0.2.0
newtype WSerialT m a = WSerialT
    { getWSerialT :: Stream m a -- ^ Unwrap to the underlying 'Stream'.
    }
    -- Only 'MonadTrans' is derived; 'Semigroup' and 'Monoid' are written by
    -- hand further down because they interleave rather than append.
    deriving (MonadTrans)

-- | An interleaving serial IO stream of elements of type @a@, i.e. 'WSerialT'
-- specialized to the 'IO' monad. See 'WSerialT' documentation for more
-- details.
--
-- @since 0.2.0
type WSerial = WSerialT IO

-- | Deprecated alias of 'WSerialT'.
--
-- @since 0.1.0
{-# DEPRECATED InterleavedT "Please use 'WSerialT' instead." #-}
type InterleavedT = WSerialT

-- | Fix the type of a polymorphic stream as 'WSerialT'.
--
-- This is 'adapt' with the source stream type pinned to 'WSerialT'; the
-- result can be any 'IsStream' type.
--
-- @since 0.2.0
wSerially :: IsStream t => WSerialT m a -> t m a
wSerially = adapt

-- | Same as 'wSerially'. Kept only for backward compatibility.
--
-- @since 0.1.0
{-# DEPRECATED interleaving "Please use wSerially instead." #-}
interleaving :: IsStream t => WSerialT m a -> t m a
interleaving = wSerially

-- Prepend a monadic action to a 'WSerialT' stream: the tail is unwrapped with
-- 'toStream', the action is consed onto it by 'K.consMStream' and the result
-- is wrapped back with 'fromStream'.
consMWSerial :: Monad m => m a -> WSerialT m a -> WSerialT m a
consMWSerial m ms = fromStream $ K.consMStream m (toStream ms)

instance IsStream WSerialT where
    -- Conversion to and from 'Stream' is the newtype accessor and constructor.
    toStream = getWSerialT
    fromStream = WSerialT

    -- Both spellings of monadic cons share 'consMWSerial'. The explicit
    -- signatures (InstanceSigs) accompany the SPECIALIZE pragmas.
    {-# INLINE consM #-}
    {-# SPECIALIZE consM :: IO a -> WSerialT IO a -> WSerialT IO a #-}
    consM :: Monad m => m a -> WSerialT m a -> WSerialT m a
    consM = consMWSerial

    {-# INLINE (|:) #-}
    {-# SPECIALIZE (|:) :: IO a -> WSerialT IO a -> WSerialT IO a #-}
    (|:) :: Monad m => m a -> WSerialT m a -> WSerialT m a
    (|:) = consMWSerial

------------------------------------------------------------------------------
-- Semigroup
------------------------------------------------------------------------------

-- Additionally we can have m elements yield from the first stream and n
-- elements yielding from the second stream. We can also have time slicing
-- variants of positional interleaving, e.g. run first stream for m seconds and
-- run the second stream for n seconds.
--
-- Similar combinators can be implemented using WAhead style.

-- | Polymorphic version of the 'Semigroup' operation '<>' of 'WSerialT'.
-- Interleaves two streams, yielding one element from each stream alternately.
-- When one stream stops the rest of the other stream is used in the output
-- stream.
--
-- @since 0.2.0
{-# INLINE wSerial #-}
wSerial :: IsStream t => t m a -> t m a -> t m a
wSerial m1 m2 = mkStream $ \st yld sng stp ->
    let -- First stream is exhausted: continue with the second stream alone.
        stop       = foldStream st yld sng stp m2
        -- First stream produced its last element: emit it, then the whole
        -- of the second stream.
        single a   = yld a m2
        -- Emit the element and swap the roles of the two streams so that
        -- the next element is taken from the second one.
        yieldk a r = yld a (wSerial m2 r)
    in foldStream st yieldk single stop m1

-- | Like `wSerial` but stops interleaving as soon as the first stream stops.
--
-- The two streams are consumed alternately, starting with the first one. If
-- the second stream ends early, the remainder of the first stream is still
-- emitted.
--
-- @since 0.7.0
{-# INLINE wSerialFst #-}
wSerialFst :: IsStream t => t m a -> t m a -> t m a
wSerialFst m1 m2 = mkStream $ \st yld sng stp ->
    -- It is the turn of the first stream. Its single and stop continuations
    -- are passed through unchanged, so the output ends when it ends.
    let yieldFirst a r = yld a (yieldSecond r m2)
     in foldStream st yieldFirst sng stp m1

    where

    -- It is the turn of the second stream (s2); s1 is what remains of the
    -- first stream.
    yieldSecond s1 s2 = mkStream $ \st yld sng stp ->
        let -- Second stream is exhausted: drain the rest of the first.
            stop       = foldStream st yld sng stp s1
            -- Last element of the second stream, followed by the rest of
            -- the first.
            single a   = yld a s1
            -- Hand the turn back to the first stream. This must recurse into
            -- wSerialFst rather than wSerial, otherwise the rest of the
            -- second stream would be emitted after the first stream ends.
            yieldk a r = yld a (wSerialFst s1 r)
         in foldStream st yieldk single stop s2

-- | Like `wSerial` but stops interleaving as soon as any of the two streams
-- stops.
--
-- @since 0.7.0
{-# INLINE wSerialMin #-}
wSerialMin :: IsStream t => t m a -> t m a -> t m a
wSerialMin m1 m2 = mkStream $ \st yld _ stp ->
    let -- The stream whose turn it is has ended: end the output as well.
        stop       = stp
        -- A final element is handled like "element followed by an empty
        -- stream", so that the outcome is the same as for the yield
        -- continuation and does not depend on how the stream was built.
        single a   = yld a (wSerialMin m2 K.nil)
        -- Emit the element and give the turn to the other stream. This must
        -- recurse into wSerialMin rather than wSerial, otherwise the longer
        -- stream would be drained after the shorter one ends.
        yieldk a r = yld a (wSerialMin m2 r)
    in foldStream st yieldk single stop m1

-- '<>' on 'WSerialT' interleaves its arguments using 'wSerial'.
instance Semigroup (WSerialT m a) where
    (<>) = wSerial

infixr 5 <=>

-- | Same as 'wSerial'. Kept only for backward compatibility.
--
-- @since 0.1.0
{-# DEPRECATED (<=>) "Please use 'wSerial' instead." #-}
{-# INLINE (<=>) #-}
(<=>) :: IsStream t => t m a -> t m a -> t m a
(<=>) = wSerial

------------------------------------------------------------------------------
-- Monoid
------------------------------------------------------------------------------

-- The identity is the empty stream 'K.nil'; 'mappend' is the 'Semigroup'
-- operation.
instance Monoid (WSerialT m a) where
    mempty = K.nil
    mappend = (<>)

-- Applicative apply for 'WSerialT'. For every function in the first stream a
-- stream of results is built by applying it to each element of the second
-- stream; all of these streams are combined with 'wSerial' via
-- 'K.concatMapBy'.
{-# INLINE apWSerial #-}
apWSerial :: Monad m => WSerialT m (a -> b) -> WSerialT m a -> WSerialT m b
apWSerial (WSerialT m1) (WSerialT m2) =
    let -- Results of applying one function to the whole second stream.
        f x1 = K.concatMapBy wSerial (pure . x1) m2
    in WSerialT $ K.concatMapBy wSerial f m1

instance Monad m => Applicative (WSerialT m) where
    -- A singleton stream built with 'K.yield'.
    {-# INLINE pure #-}
    pure = WSerialT . K.yield
    {-# INLINE (<*>) #-}
    (<*>) = apWSerial

------------------------------------------------------------------------------
-- Monad
------------------------------------------------------------------------------

instance Monad m => Monad (WSerialT m) where
    return = pure
    -- Bind combines the streams generated for each element using 'wSerial',
    -- i.e. by interleaving them.
    {-# INLINE (>>=) #-}
    (>>=) = K.bindWith wSerial

------------------------------------------------------------------------------
-- Other instances
------------------------------------------------------------------------------

-- The remaining WSerialT instances are generated by CPP macros. The macro
-- definitions are not in this file; presumably they come from the included
-- "Instances.hs" (TODO confirm).
MONAD_COMMON_INSTANCES(WSerialT,)
LIST_INSTANCES(WSerialT)
NFDATA1_INSTANCE(WSerialT)
FOLDABLE_INSTANCE(WSerialT)
TRAVERSABLE_INSTANCE(WSerialT)

------------------------------------------------------------------------------
-- Construction
------------------------------------------------------------------------------

-- | Build a stream by unfolding a /monadic/ step function starting from a
-- seed.  The step function returns the next element in the stream and the next
-- seed value. When it is done it returns 'Nothing' and the stream ends. For
-- example,
--
-- @
-- let f b =
--         if b > 3
--         then return Nothing
--         else print b >> return (Just (b, b + 1))
-- in drain $ unfoldrM f 0
-- @
-- @
--  0
--  1
--  2
--  3
-- @
--
-- The stream is produced by 'D.unfoldrM' and converted to the requested
-- stream type with 'D.fromStreamD'.
--
-- /Internal/
--
{-# INLINE unfoldrM #-}
unfoldrM :: (IsStream t, Monad m) => (b -> m (Maybe (a, b))) -> b -> t m a
unfoldrM step seed = D.fromStreamD (D.unfoldrM step seed)