{-# OPTIONS_GHC -Wno-deprecations #-}
{-# LANGUAGE UndecidableInstances #-}

-- |
-- Module      : Streamly.Internal.Data.Stream.ZipAsync
-- Copyright   : (c) 2017 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
-- To run examples in this module:
--
-- >>> import qualified Streamly.Prelude as Stream
--
module Streamly.Internal.Data.Stream.ZipAsync
    {-# DEPRECATED "Use \"Streamly.Data.Stream.MkType\" instead." #-}
    ( ZipAsyncM(..)
    , ZipAsync
    , consMZipAsync
    , zipAsyncWithK
    , zipAsyncWithMK
    )
where

import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.StreamK (Stream)

import qualified Streamly.Internal.Data.StreamK as K
    (mkStream, foldStream, zipWithM, consM)
import qualified Streamly.Internal.Data.Stream as D (fromStreamK)
import qualified Streamly.Internal.Data.Stream.Serial as Serial
import qualified Streamly.Internal.Data.Stream.SVar.Eliminate as SVar
import qualified Streamly.Internal.Data.Stream.SVar.Generate as SVar
import qualified Streamly.Internal.Data.Stream.Serial as Stream
    (fromStreamK, toStreamK)
import Streamly.Internal.Data.SVar

import Prelude hiding (map, repeat, zipWith, errorWithoutStackTrace)

#include "Instances.hs"

-- $setup
-- >>> :set -fno-warn-deprecations
-- >>> import qualified Streamly.Prelude as Stream
-- >>> import Control.Concurrent (threadDelay)
-- >>> :{
--  delay n = do
--      threadDelay (n * 1000000)   -- sleep for n seconds
--      putStrLn (show n ++ " sec") -- print "n sec"
--      return n                    -- IO Int
-- :}

------------------------------------------------------------------------------
-- Parallel Zipping
------------------------------------------------------------------------------

-- | Like 'zipAsyncWith' but with a monadic zipping function.
--
-- @since 0.4.0
{-# INLINE zipAsyncWithMK #-}
zipAsyncWithMK :: MonadAsync m
    => (a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c
zipAsyncWithMK :: forall (m :: * -> *) a b c.
MonadAsync m =>
(a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c
zipAsyncWithMK a -> b -> m c
f Stream m a
m1 Stream m b
m2 = (forall r.
 State StreamK m c
 -> (c -> StreamK m c -> m r) -> (c -> m r) -> m r -> m r)
-> StreamK m c
forall (m :: * -> *) a.
(forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
K.mkStream ((forall r.
  State StreamK m c
  -> (c -> StreamK m c -> m r) -> (c -> m r) -> m r -> m r)
 -> StreamK m c)
-> (forall r.
    State StreamK m c
    -> (c -> StreamK m c -> m r) -> (c -> m r) -> m r -> m r)
-> StreamK m c
forall a b. (a -> b) -> a -> b
$ \State StreamK m c
st c -> StreamK m c -> m r
yld c -> m r
sng m r
stp -> do
    SVar StreamK m b
sv <- SVarStopStyle -> State StreamK m b -> m (SVar StreamK m b)
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVarStopStyle -> State t m a -> m (SVar t m a)
newParallelVar SVarStopStyle
StopNone (State StreamK m c -> State StreamK m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m c
st)
    State StreamK m b -> SVar StreamK m b -> Stream m b -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
State t m a -> SVar t m a -> Stream m a -> m ()
SVar.toSVarParallel (State StreamK m c -> State StreamK m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m c
st) SVar StreamK m b
sv (Stream m b -> m ()) -> Stream m b -> m ()
forall a b. (a -> b) -> a -> b
$ Stream m b -> Stream m b
forall (m :: * -> *) a. Applicative m => StreamK m a -> Stream m a
D.fromStreamK Stream m b
m2
    State StreamK m c
-> (c -> StreamK m c -> m r)
-> (c -> m r)
-> m r
-> StreamK m c
-> m r
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStream State StreamK m c
st c -> StreamK m c -> m r
yld c -> m r
sng m r
stp
        (StreamK m c -> m r) -> StreamK m c -> m r
forall a b. (a -> b) -> a -> b
$ (a -> b -> m c) -> Stream m a -> Stream m b -> StreamK m c
forall (m :: * -> *) a b c.
Monad m =>
(a -> b -> m c) -> StreamK m a -> StreamK m b -> StreamK m c
K.zipWithM a -> b -> m c
f Stream m a
m1 (SerialT m b -> Stream m b
forall (m :: * -> *) a. SerialT m a -> Stream m a
Stream.toStreamK (SVar StreamK m b -> SerialT m b
forall (m :: * -> *) a.
MonadAsync m =>
SVar StreamK m a -> SerialT m a
SVar.fromSVar SVar StreamK m b
sv))

-- XXX Should we rename this to zipParWith or zipParallelWith? This can happen
-- along with the change of behvaior to end the stream concurrently.
--
-- | Like 'zipWith' but zips concurrently i.e. both the streams being zipped
-- are evaluated concurrently using the 'ParallelT' concurrent evaluation
-- style. The maximum number of elements of each stream evaluated in advance
-- can be controlled by 'maxBuffer'.
--
-- The stream ends if stream @a@ or stream @b@ ends. However, if stream @b@
-- ends while we are still evaluating stream @a@ and waiting for a result then
-- stream will not end until after the evaluation of stream @a@ finishes. This
-- behavior can potentially be changed in future to end the stream immediately
-- as soon as any of the stream end is detected.
--
-- @since 0.1.0
{-# INLINE zipAsyncWithK #-}
zipAsyncWithK :: MonadAsync m
    => (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
zipAsyncWithK :: forall (m :: * -> *) a b c.
MonadAsync m =>
(a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
zipAsyncWithK a -> b -> c
f = (a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c
forall (m :: * -> *) a b c.
MonadAsync m =>
(a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c
zipAsyncWithMK (\a
a b
b -> c -> m c
forall (m :: * -> *) a. Monad m => a -> m a
return (a -> b -> c
f a
a b
b))

------------------------------------------------------------------------------
-- Parallely Zipping Streams
------------------------------------------------------------------------------
--
-- | For 'ZipAsyncM' streams:
--
-- @
-- (<>) = 'Streamly.Prelude.serial'
-- (<*>) = 'Streamly.Prelude.serial.zipAsyncWith' id
-- @
--
-- Applicative evaluates the streams being zipped concurrently, the following
-- would take half the time that it would take in serial zipping:
--
-- >>> s = Stream.fromFoldableM $ Prelude.map delay [1, 1, 1]
-- >>> Stream.toList $ Stream.fromZipAsync $ (,) <$> s <*> s
-- ...
-- [(1,1),(1,1),(1,1)]
--
-- /Since: 0.2.0 ("Streamly")/
--
-- @since 0.8.0
newtype ZipAsyncM m a = ZipAsyncM {forall (m :: * -> *) a. ZipAsyncM m a -> Stream m a
getZipAsyncM :: Stream m a}
        deriving (NonEmpty (ZipAsyncM m a) -> ZipAsyncM m a
ZipAsyncM m a -> ZipAsyncM m a -> ZipAsyncM m a
(ZipAsyncM m a -> ZipAsyncM m a -> ZipAsyncM m a)
-> (NonEmpty (ZipAsyncM m a) -> ZipAsyncM m a)
-> (forall b. Integral b => b -> ZipAsyncM m a -> ZipAsyncM m a)
-> Semigroup (ZipAsyncM m a)
forall b. Integral b => b -> ZipAsyncM m a -> ZipAsyncM m a
forall a.
(a -> a -> a)
-> (NonEmpty a -> a)
-> (forall b. Integral b => b -> a -> a)
-> Semigroup a
forall (m :: * -> *) a. NonEmpty (ZipAsyncM m a) -> ZipAsyncM m a
forall (m :: * -> *) a.
ZipAsyncM m a -> ZipAsyncM m a -> ZipAsyncM m a
forall (m :: * -> *) a b.
Integral b =>
b -> ZipAsyncM m a -> ZipAsyncM m a
stimes :: forall b. Integral b => b -> ZipAsyncM m a -> ZipAsyncM m a
$cstimes :: forall (m :: * -> *) a b.
Integral b =>
b -> ZipAsyncM m a -> ZipAsyncM m a
sconcat :: NonEmpty (ZipAsyncM m a) -> ZipAsyncM m a
$csconcat :: forall (m :: * -> *) a. NonEmpty (ZipAsyncM m a) -> ZipAsyncM m a
<> :: ZipAsyncM m a -> ZipAsyncM m a -> ZipAsyncM m a
$c<> :: forall (m :: * -> *) a.
ZipAsyncM m a -> ZipAsyncM m a -> ZipAsyncM m a
Semigroup, Semigroup (ZipAsyncM m a)
ZipAsyncM m a
Semigroup (ZipAsyncM m a)
-> ZipAsyncM m a
-> (ZipAsyncM m a -> ZipAsyncM m a -> ZipAsyncM m a)
-> ([ZipAsyncM m a] -> ZipAsyncM m a)
-> Monoid (ZipAsyncM m a)
[ZipAsyncM m a] -> ZipAsyncM m a
ZipAsyncM m a -> ZipAsyncM m a -> ZipAsyncM m a
forall a.
Semigroup a -> a -> (a -> a -> a) -> ([a] -> a) -> Monoid a
forall (m :: * -> *) a. Semigroup (ZipAsyncM m a)
forall (m :: * -> *) a. ZipAsyncM m a
forall (m :: * -> *) a. [ZipAsyncM m a] -> ZipAsyncM m a
forall (m :: * -> *) a.
ZipAsyncM m a -> ZipAsyncM m a -> ZipAsyncM m a
mconcat :: [ZipAsyncM m a] -> ZipAsyncM m a
$cmconcat :: forall (m :: * -> *) a. [ZipAsyncM m a] -> ZipAsyncM m a
mappend :: ZipAsyncM m a -> ZipAsyncM m a -> ZipAsyncM m a
$cmappend :: forall (m :: * -> *) a.
ZipAsyncM m a -> ZipAsyncM m a -> ZipAsyncM m a
mempty :: ZipAsyncM m a
$cmempty :: forall (m :: * -> *) a. ZipAsyncM m a
Monoid)

-- | An IO stream whose applicative instance zips streams wAsyncly.
--
-- /Since: 0.2.0 ("Streamly")/
--
-- @since 0.8.0
type ZipAsync = ZipAsyncM IO

consMZipAsync :: Monad m => m a -> ZipAsyncM m a -> ZipAsyncM m a
consMZipAsync :: forall (m :: * -> *) a.
Monad m =>
m a -> ZipAsyncM m a -> ZipAsyncM m a
consMZipAsync m a
m (ZipAsyncM Stream m a
r) = Stream m a -> ZipAsyncM m a
forall (m :: * -> *) a. Stream m a -> ZipAsyncM m a
ZipAsyncM (Stream m a -> ZipAsyncM m a) -> Stream m a -> ZipAsyncM m a
forall a b. (a -> b) -> a -> b
$ m a -> Stream m a -> Stream m a
forall (m :: * -> *) a.
Monad m =>
m a -> StreamK m a -> StreamK m a
K.consM m a
m Stream m a
r

instance Monad m => Functor (ZipAsyncM m) where
    {-# INLINE fmap #-}
    fmap :: forall a b. (a -> b) -> ZipAsyncM m a -> ZipAsyncM m b
fmap a -> b
f (ZipAsyncM Stream m a
m) =
        Stream m b -> ZipAsyncM m b
forall (m :: * -> *) a. Stream m a -> ZipAsyncM m a
ZipAsyncM (Stream m b -> ZipAsyncM m b) -> Stream m b -> ZipAsyncM m b
forall a b. (a -> b) -> a -> b
$ SerialT m b -> Stream m b
forall (m :: * -> *) a. SerialT m a -> Stream m a
Stream.toStreamK (SerialT m b -> Stream m b) -> SerialT m b -> Stream m b
forall a b. (a -> b) -> a -> b
$ (a -> b) -> SerialT m a -> SerialT m b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap a -> b
f (Stream m a -> SerialT m a
forall (m :: * -> *) a. Stream m a -> SerialT m a
Stream.fromStreamK Stream m a
m)

instance MonadAsync m => Applicative (ZipAsyncM m) where
    pure :: forall a. a -> ZipAsyncM m a
pure = Stream m a -> ZipAsyncM m a
forall (m :: * -> *) a. Stream m a -> ZipAsyncM m a
ZipAsyncM (Stream m a -> ZipAsyncM m a)
-> (a -> Stream m a) -> a -> ZipAsyncM m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SerialT m a -> Stream m a
forall (m :: * -> *) a. SerialT m a -> Stream m a
Stream.toStreamK (SerialT m a -> Stream m a)
-> (a -> SerialT m a) -> a -> Stream m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> SerialT m a
forall (m :: * -> *) a. Monad m => a -> SerialT m a
Serial.repeat

    {-# INLINE (<*>) #-}
    ZipAsyncM Stream m (a -> b)
m1 <*> :: forall a b. ZipAsyncM m (a -> b) -> ZipAsyncM m a -> ZipAsyncM m b
<*> ZipAsyncM Stream m a
m2 = Stream m b -> ZipAsyncM m b
forall (m :: * -> *) a. Stream m a -> ZipAsyncM m a
ZipAsyncM (Stream m b -> ZipAsyncM m b) -> Stream m b -> ZipAsyncM m b
forall a b. (a -> b) -> a -> b
$ ((a -> b) -> a -> b)
-> Stream m (a -> b) -> Stream m a -> Stream m b
forall (m :: * -> *) a b c.
MonadAsync m =>
(a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
zipAsyncWithK (a -> b) -> a -> b
forall a. a -> a
id Stream m (a -> b)
m1 Stream m a
m2