{-# OPTIONS_GHC -Wno-deprecations #-}
{-# LANGUAGE UndecidableInstances #-}

-- |
-- Module      : Streamly.Internal.Data.Stream.ZipAsync
-- Copyright   : (c) 2017 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
-- To run examples in this module:
--
-- >>> import qualified Streamly.Prelude as Stream
--
module Streamly.Internal.Data.Stream.ZipAsync {-# DEPRECATED "Please use \"Streamly.Internal.Data.Stream.Zip.Concurrent\" instead." #-}
    ( ZipAsyncM(..)
    , ZipAsync
    , consMZipAsync
    , zipAsyncWithK
    , zipAsyncWithMK
    )
where

import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Stream.StreamK.Type (Stream)

import qualified Streamly.Internal.Data.Stream.StreamK as K
    (mkStream, foldStream, zipWithM, consM)
import qualified Streamly.Internal.Data.Stream.StreamD as D (fromStreamK)
import qualified Streamly.Internal.Data.Stream.Serial as Serial
import qualified Streamly.Internal.Data.Stream.SVar.Eliminate as SVar
import qualified Streamly.Internal.Data.Stream.SVar.Generate as SVar
import qualified Streamly.Internal.Data.Stream.Serial as Stream
    (fromStreamK, toStreamK)
import Streamly.Internal.Data.SVar

import Prelude hiding (map, repeat, zipWith, errorWithoutStackTrace)

#include "Instances.hs"

-- $setup
-- >>> :set -fno-warn-deprecations
-- >>> import qualified Streamly.Prelude as Stream
-- >>> import Control.Concurrent (threadDelay)
-- >>> :{
--  delay n = do
--      threadDelay (n * 1000000)   -- sleep for n seconds
--      putStrLn (show n ++ " sec") -- print "n sec"
--      return n                    -- IO Int
-- :}

------------------------------------------------------------------------------
-- Parallel Zipping
------------------------------------------------------------------------------

-- | Like 'zipAsyncWith' but with a monadic zipping function.
--
-- @since 0.4.0
{-# INLINE zipAsyncWithMK #-}
zipAsyncWithMK :: MonadAsync m
    => (a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c
zipAsyncWithMK :: forall (m :: * -> *) a b c.
MonadAsync m =>
(a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c
zipAsyncWithMK a -> b -> m c
f Stream m a
m1 Stream m b
m2 = forall (m :: * -> *) a.
(forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
K.mkStream forall a b. (a -> b) -> a -> b
$ \State StreamK m c
st c -> StreamK m c -> m r
yld c -> m r
sng m r
stp -> do
    SVar StreamK m b
sv <- forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVarStopStyle -> State t m a -> m (SVar t m a)
newParallelVar SVarStopStyle
StopNone (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m c
st)
    forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
State t m a -> SVar t m a -> Stream m a -> m ()
SVar.toSVarParallel (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m c
st) SVar StreamK m b
sv forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Applicative m => StreamK m a -> Stream m a
D.fromStreamK Stream m b
m2
    forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStream State StreamK m c
st c -> StreamK m c -> m r
yld c -> m r
sng m r
stp
        forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a b c.
Monad m =>
(a -> b -> m c) -> StreamK m a -> StreamK m b -> StreamK m c
K.zipWithM a -> b -> m c
f Stream m a
m1 (forall (m :: * -> *) a. SerialT m a -> Stream m a
Stream.toStreamK (forall (m :: * -> *) a.
MonadAsync m =>
SVar StreamK m a -> SerialT m a
SVar.fromSVar SVar StreamK m b
sv))

-- XXX Should we rename this to zipParWith or zipParallelWith? This can happen
-- along with the change of behvaior to end the stream concurrently.
--
-- | Like 'zipWith' but zips concurrently i.e. both the streams being zipped
-- are evaluated concurrently using the 'ParallelT' concurrent evaluation
-- style. The maximum number of elements of each stream evaluated in advance
-- can be controlled by 'maxBuffer'.
--
-- The stream ends if stream @a@ or stream @b@ ends. However, if stream @b@
-- ends while we are still evaluating stream @a@ and waiting for a result then
-- stream will not end until after the evaluation of stream @a@ finishes. This
-- behavior can potentially be changed in future to end the stream immediately
-- as soon as any of the stream end is detected.
--
-- @since 0.1.0
{-# INLINE zipAsyncWithK #-}
zipAsyncWithK :: MonadAsync m
    => (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
zipAsyncWithK :: forall (m :: * -> *) a b c.
MonadAsync m =>
(a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
zipAsyncWithK a -> b -> c
f = forall (m :: * -> *) a b c.
MonadAsync m =>
(a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c
zipAsyncWithMK (\a
a b
b -> forall (m :: * -> *) a. Monad m => a -> m a
return (a -> b -> c
f a
a b
b))

------------------------------------------------------------------------------
-- Parallely Zipping Streams
------------------------------------------------------------------------------
--
-- | For 'ZipAsyncM' streams:
--
-- @
-- (<>) = 'Streamly.Prelude.serial'
-- (<*>) = 'Streamly.Prelude.serial.zipAsyncWith' id
-- @
--
-- Applicative evaluates the streams being zipped concurrently, the following
-- would take half the time that it would take in serial zipping:
--
-- >>> s = Stream.fromFoldableM $ Prelude.map delay [1, 1, 1]
-- >>> Stream.toList $ Stream.fromZipAsync $ (,) <$> s <*> s
-- ...
-- [(1,1),(1,1),(1,1)]
--
-- /Since: 0.2.0 ("Streamly")/
--
-- @since 0.8.0
newtype ZipAsyncM m a = ZipAsyncM {forall (m :: * -> *) a. ZipAsyncM m a -> Stream m a
getZipAsyncM :: Stream m a}
        deriving (NonEmpty (ZipAsyncM m a) -> ZipAsyncM m a
ZipAsyncM m a -> ZipAsyncM m a -> ZipAsyncM m a
forall b. Integral b => b -> ZipAsyncM m a -> ZipAsyncM m a
forall a.
(a -> a -> a)
-> (NonEmpty a -> a)
-> (forall b. Integral b => b -> a -> a)
-> Semigroup a
forall (m :: * -> *) a. NonEmpty (ZipAsyncM m a) -> ZipAsyncM m a
forall (m :: * -> *) a.
ZipAsyncM m a -> ZipAsyncM m a -> ZipAsyncM m a
forall (m :: * -> *) a b.
Integral b =>
b -> ZipAsyncM m a -> ZipAsyncM m a
stimes :: forall b. Integral b => b -> ZipAsyncM m a -> ZipAsyncM m a
$cstimes :: forall (m :: * -> *) a b.
Integral b =>
b -> ZipAsyncM m a -> ZipAsyncM m a
sconcat :: NonEmpty (ZipAsyncM m a) -> ZipAsyncM m a
$csconcat :: forall (m :: * -> *) a. NonEmpty (ZipAsyncM m a) -> ZipAsyncM m a
<> :: ZipAsyncM m a -> ZipAsyncM m a -> ZipAsyncM m a
$c<> :: forall (m :: * -> *) a.
ZipAsyncM m a -> ZipAsyncM m a -> ZipAsyncM m a
Semigroup, ZipAsyncM m a
[ZipAsyncM m a] -> ZipAsyncM m a
ZipAsyncM m a -> ZipAsyncM m a -> ZipAsyncM m a
forall a.
Semigroup a -> a -> (a -> a -> a) -> ([a] -> a) -> Monoid a
forall (m :: * -> *) a. Semigroup (ZipAsyncM m a)
forall (m :: * -> *) a. ZipAsyncM m a
forall (m :: * -> *) a. [ZipAsyncM m a] -> ZipAsyncM m a
forall (m :: * -> *) a.
ZipAsyncM m a -> ZipAsyncM m a -> ZipAsyncM m a
mconcat :: [ZipAsyncM m a] -> ZipAsyncM m a
$cmconcat :: forall (m :: * -> *) a. [ZipAsyncM m a] -> ZipAsyncM m a
mappend :: ZipAsyncM m a -> ZipAsyncM m a -> ZipAsyncM m a
$cmappend :: forall (m :: * -> *) a.
ZipAsyncM m a -> ZipAsyncM m a -> ZipAsyncM m a
mempty :: ZipAsyncM m a
$cmempty :: forall (m :: * -> *) a. ZipAsyncM m a
Monoid)

-- | An IO stream whose applicative instance zips streams wAsyncly.
--
-- /Since: 0.2.0 ("Streamly")/
--
-- @since 0.8.0
type ZipAsync = ZipAsyncM IO

consMZipAsync :: Monad m => m a -> ZipAsyncM m a -> ZipAsyncM m a
consMZipAsync :: forall (m :: * -> *) a.
Monad m =>
m a -> ZipAsyncM m a -> ZipAsyncM m a
consMZipAsync m a
m (ZipAsyncM Stream m a
r) = forall (m :: * -> *) a. Stream m a -> ZipAsyncM m a
ZipAsyncM forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
Monad m =>
m a -> StreamK m a -> StreamK m a
K.consM m a
m Stream m a
r

instance Monad m => Functor (ZipAsyncM m) where
    {-# INLINE fmap #-}
    fmap :: forall a b. (a -> b) -> ZipAsyncM m a -> ZipAsyncM m b
fmap a -> b
f (ZipAsyncM Stream m a
m) =
        forall (m :: * -> *) a. Stream m a -> ZipAsyncM m a
ZipAsyncM forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. SerialT m a -> Stream m a
Stream.toStreamK forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap a -> b
f (forall (m :: * -> *) a. Stream m a -> SerialT m a
Stream.fromStreamK Stream m a
m)

instance MonadAsync m => Applicative (ZipAsyncM m) where
    pure :: forall a. a -> ZipAsyncM m a
pure = forall (m :: * -> *) a. Stream m a -> ZipAsyncM m a
ZipAsyncM forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a. SerialT m a -> Stream m a
Stream.toStreamK forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a. Monad m => a -> SerialT m a
Serial.repeat

    {-# INLINE (<*>) #-}
    ZipAsyncM Stream m (a -> b)
m1 <*> :: forall a b. ZipAsyncM m (a -> b) -> ZipAsyncM m a -> ZipAsyncM m b
<*> ZipAsyncM Stream m a
m2 = forall (m :: * -> *) a. Stream m a -> ZipAsyncM m a
ZipAsyncM forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a b c.
MonadAsync m =>
(a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
zipAsyncWithK forall a. a -> a
id Stream m (a -> b)
m1 Stream m a
m2