{-# LANGUAGE UndecidableInstances #-}

-- |
-- Module      : Streamly.Internal.Data.Stream.ZipAsync
-- Copyright   : (c) 2017 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
-- To run examples in this module:
--
-- >>> import qualified Streamly.Prelude as Stream
--
module Streamly.Internal.Data.Stream.ZipAsync
    ( ZipAsyncM(..)
    , ZipAsync
    , consMZipAsync
    , zipAsyncWithK
    , zipAsyncWithMK
    )
where

#if __GLASGOW_HASKELL__ < 808
import Data.Semigroup (Semigroup(..))
#endif
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Stream.Serial (SerialT(..))
import Streamly.Internal.Data.Stream.StreamK.Type (Stream)

import qualified Streamly.Internal.Data.Stream.StreamK.Type as K
import qualified Streamly.Internal.Data.Stream.StreamK as K
import qualified Streamly.Internal.Data.Stream.StreamD as D
import qualified Streamly.Internal.Data.Stream.Serial as Serial
import qualified Streamly.Internal.Data.Stream.SVar.Eliminate as SVar
import qualified Streamly.Internal.Data.Stream.SVar.Generate as SVar
import Streamly.Internal.Data.SVar

import Prelude hiding (map, repeat, zipWith, errorWithoutStackTrace)

#include "Instances.hs"

-- $setup
-- >>> import qualified Streamly.Prelude as Stream
-- >>> import Control.Concurrent (threadDelay)
-- >>> :{
--  delay n = do
--      threadDelay (n * 1000000)   -- sleep for n seconds
--      putStrLn (show n ++ " sec") -- print "n sec"
--      return n                    -- IO Int
-- :}

------------------------------------------------------------------------------
-- Parallel Zipping
------------------------------------------------------------------------------

-- | Like 'zipAsyncWith' but with a monadic zipping function.
--
-- The second stream is pushed into a freshly created parallel 'SVar' (created
-- with the 'StopNone' stop style) and read back from it, and the result is
-- zipped with the first stream using 'K.zipWithM'.
--
-- @since 0.4.0
{-# INLINE zipAsyncWithMK #-}
zipAsyncWithMK :: MonadAsync m
    => (a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c
zipAsyncWithMK f m1 m2 = K.mkStream $ \st yld sng stp -> do
    -- The incoming state is typed for the output stream; 'adaptState'
    -- retypes it for the element type of the second stream.
    sv <- newParallelVar StopNone (adaptState st)
    -- Feed the second stream into the SVar.
    SVar.toSVarParallel (adaptState st) sv $ D.fromStreamK m2
    -- Zip the first stream with whatever is read back from the SVar and hand
    -- the result to the caller's continuations.
    K.foldStream st yld sng stp
        $ K.zipWithM f m1 (getSerialT (SVar.fromSVar sv))

-- XXX Should we rename this to zipParWith or zipParallelWith? This can happen
-- along with the change of behavior to end the stream concurrently.
--
-- | Like 'zipWith' but zips concurrently i.e. both the streams being zipped
-- are evaluated concurrently using the 'ParallelT' concurrent evaluation
-- style. The maximum number of elements of each stream evaluated in advance
-- can be controlled by 'maxBuffer'.
--
-- The stream ends if stream @a@ or stream @b@ ends. However, if stream @b@
-- ends while we are still evaluating stream @a@ and waiting for a result then
-- the stream will not end until after the evaluation of stream @a@ finishes.
-- This behavior can potentially be changed in future to end the stream
-- immediately, as soon as the end of either stream is detected.
--
-- This is 'zipAsyncWithMK' with the pure zipping function lifted into the
-- monad using 'return'.
--
-- @since 0.1.0
{-# INLINE zipAsyncWithK #-}
zipAsyncWithK :: MonadAsync m
    => (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
zipAsyncWithK f = zipAsyncWithMK (\a b -> return (f a b))

------------------------------------------------------------------------------
-- Zipping Streams in Parallel
------------------------------------------------------------------------------
--
-- | For 'ZipAsyncM' streams:
--
-- @
-- (<>) = 'Streamly.Prelude.serial'
-- (<*>) = 'Streamly.Prelude.zipAsyncWith' id
-- @
--
-- Applicative evaluates the streams being zipped concurrently, the following
-- would take half the time that it would take in serial zipping:
--
-- >>> s = Stream.fromFoldableM $ Prelude.map delay [1, 1, 1]
-- >>> Stream.toList $ Stream.fromZipAsync $ (,) <$> s <*> s
-- ...
-- [(1,1),(1,1),(1,1)]
--
-- /Since: 0.2.0 ("Streamly")/
--
-- @since 0.8.0
newtype ZipAsyncM m a = ZipAsyncM {getZipAsyncM :: Stream m a}
        deriving (Semigroup, Monoid)

-- | An IO stream whose applicative instance zips streams concurrently; this
-- is 'ZipAsyncM' specialized to 'IO'.
--
-- /Since: 0.2.0 ("Streamly")/
--
-- @since 0.8.0
type ZipAsync = ZipAsyncM IO

-- | Prepend a monadic action to a 'ZipAsyncM' stream. The underlying stream
-- is unwrapped, the action is consed onto it with 'K.consM', and the result
-- is wrapped back into 'ZipAsyncM'.
consMZipAsync :: Monad m => m a -> ZipAsyncM m a -> ZipAsyncM m a
consMZipAsync m (ZipAsyncM r) = ZipAsyncM $ K.consM m r

-- | Mapping is delegated to the 'Functor' instance of 'SerialT': the
-- underlying stream is wrapped in 'SerialT', mapped, and unwrapped again.
instance Monad m => Functor (ZipAsyncM m) where
    {-# INLINE fmap #-}
    fmap f (ZipAsyncM m) = ZipAsyncM $ getSerialT $ fmap f (SerialT m)

-- | Zipping applicative: '<*>' pairs up the elements of the two streams
-- using 'zipAsyncWithK' and applies each function to its argument.
instance MonadAsync m => Applicative (ZipAsyncM m) where
    -- 'pure' is built from 'Serial.repeat' rather than a singleton stream.
    -- NOTE(review): presumably this repeats the value indefinitely so that
    -- zipping against 'pure x' is never cut short -- confirm against
    -- 'Serial.repeat'.
    pure = ZipAsyncM . getSerialT . Serial.repeat

    {-# INLINE (<*>) #-}
    ZipAsyncM m1 <*> ZipAsyncM m2 = ZipAsyncM $ zipAsyncWithK id m1 m2