{-# OPTIONS_GHC -Wno-deprecations #-}
{-# LANGUAGE UndecidableInstances #-}
module Streamly.Internal.Data.Stream.ZipAsync {-# DEPRECATED "Please use \"Streamly.Internal.Data.Stream.Zip.Concurrent\" instead." #-}
( ZipAsyncM(..)
, ZipAsync
, consMZipAsync
, zipAsyncWithK
, zipAsyncWithMK
)
where
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Stream.StreamK.Type (Stream)
import qualified Streamly.Internal.Data.Stream.StreamK as K
(mkStream, foldStream, zipWithM, consM)
import qualified Streamly.Internal.Data.Stream.StreamD as D (fromStreamK)
import qualified Streamly.Internal.Data.Stream.Serial as Serial
import qualified Streamly.Internal.Data.Stream.SVar.Eliminate as SVar
import qualified Streamly.Internal.Data.Stream.SVar.Generate as SVar
import qualified Streamly.Internal.Data.Stream.Serial as Stream
(fromStreamK, toStreamK)
import Streamly.Internal.Data.SVar
import Prelude hiding (map, repeat, zipWith, errorWithoutStackTrace)
#include "Instances.hs"
{-# INLINE zipAsyncWithMK #-}
zipAsyncWithMK :: MonadAsync m
=> (a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c
zipAsyncWithMK :: forall (m :: * -> *) a b c.
MonadAsync m =>
(a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c
zipAsyncWithMK a -> b -> m c
f Stream m a
m1 Stream m b
m2 = forall (m :: * -> *) a.
(forall r.
State StreamK m a
-> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
K.mkStream forall a b. (a -> b) -> a -> b
$ \State StreamK m c
st c -> StreamK m c -> m r
yld c -> m r
sng m r
stp -> do
SVar StreamK m b
sv <- forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVarStopStyle -> State t m a -> m (SVar t m a)
newParallelVar SVarStopStyle
StopNone (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m c
st)
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
State t m a -> SVar t m a -> Stream m a -> m ()
SVar.toSVarParallel (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m c
st) SVar StreamK m b
sv forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Applicative m => StreamK m a -> Stream m a
D.fromStreamK Stream m b
m2
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStream State StreamK m c
st c -> StreamK m c -> m r
yld c -> m r
sng m r
stp
forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a b c.
Monad m =>
(a -> b -> m c) -> StreamK m a -> StreamK m b -> StreamK m c
K.zipWithM a -> b -> m c
f Stream m a
m1 (forall (m :: * -> *) a. SerialT m a -> Stream m a
Stream.toStreamK (forall (m :: * -> *) a.
MonadAsync m =>
SVar StreamK m a -> SerialT m a
SVar.fromSVar SVar StreamK m b
sv))
{-# INLINE zipAsyncWithK #-}
zipAsyncWithK :: MonadAsync m
=> (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
zipAsyncWithK :: forall (m :: * -> *) a b c.
MonadAsync m =>
(a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
zipAsyncWithK a -> b -> c
f = forall (m :: * -> *) a b c.
MonadAsync m =>
(a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c
zipAsyncWithMK (\a
a b
b -> forall (m :: * -> *) a. Monad m => a -> m a
return (a -> b -> c
f a
a b
b))
newtype ZipAsyncM m a = ZipAsyncM {forall (m :: * -> *) a. ZipAsyncM m a -> Stream m a
getZipAsyncM :: Stream m a}
deriving (NonEmpty (ZipAsyncM m a) -> ZipAsyncM m a
ZipAsyncM m a -> ZipAsyncM m a -> ZipAsyncM m a
forall b. Integral b => b -> ZipAsyncM m a -> ZipAsyncM m a
forall a.
(a -> a -> a)
-> (NonEmpty a -> a)
-> (forall b. Integral b => b -> a -> a)
-> Semigroup a
forall (m :: * -> *) a. NonEmpty (ZipAsyncM m a) -> ZipAsyncM m a
forall (m :: * -> *) a.
ZipAsyncM m a -> ZipAsyncM m a -> ZipAsyncM m a
forall (m :: * -> *) a b.
Integral b =>
b -> ZipAsyncM m a -> ZipAsyncM m a
stimes :: forall b. Integral b => b -> ZipAsyncM m a -> ZipAsyncM m a
$cstimes :: forall (m :: * -> *) a b.
Integral b =>
b -> ZipAsyncM m a -> ZipAsyncM m a
sconcat :: NonEmpty (ZipAsyncM m a) -> ZipAsyncM m a
$csconcat :: forall (m :: * -> *) a. NonEmpty (ZipAsyncM m a) -> ZipAsyncM m a
<> :: ZipAsyncM m a -> ZipAsyncM m a -> ZipAsyncM m a
$c<> :: forall (m :: * -> *) a.
ZipAsyncM m a -> ZipAsyncM m a -> ZipAsyncM m a
Semigroup, ZipAsyncM m a
[ZipAsyncM m a] -> ZipAsyncM m a
ZipAsyncM m a -> ZipAsyncM m a -> ZipAsyncM m a
forall a.
Semigroup a -> a -> (a -> a -> a) -> ([a] -> a) -> Monoid a
forall (m :: * -> *) a. Semigroup (ZipAsyncM m a)
forall (m :: * -> *) a. ZipAsyncM m a
forall (m :: * -> *) a. [ZipAsyncM m a] -> ZipAsyncM m a
forall (m :: * -> *) a.
ZipAsyncM m a -> ZipAsyncM m a -> ZipAsyncM m a
mconcat :: [ZipAsyncM m a] -> ZipAsyncM m a
$cmconcat :: forall (m :: * -> *) a. [ZipAsyncM m a] -> ZipAsyncM m a
mappend :: ZipAsyncM m a -> ZipAsyncM m a -> ZipAsyncM m a
$cmappend :: forall (m :: * -> *) a.
ZipAsyncM m a -> ZipAsyncM m a -> ZipAsyncM m a
mempty :: ZipAsyncM m a
$cmempty :: forall (m :: * -> *) a. ZipAsyncM m a
Monoid)
type ZipAsync = ZipAsyncM IO
consMZipAsync :: Monad m => m a -> ZipAsyncM m a -> ZipAsyncM m a
consMZipAsync :: forall (m :: * -> *) a.
Monad m =>
m a -> ZipAsyncM m a -> ZipAsyncM m a
consMZipAsync m a
m (ZipAsyncM Stream m a
r) = forall (m :: * -> *) a. Stream m a -> ZipAsyncM m a
ZipAsyncM forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
Monad m =>
m a -> StreamK m a -> StreamK m a
K.consM m a
m Stream m a
r
instance Monad m => Functor (ZipAsyncM m) where
{-# INLINE fmap #-}
fmap :: forall a b. (a -> b) -> ZipAsyncM m a -> ZipAsyncM m b
fmap a -> b
f (ZipAsyncM Stream m a
m) =
forall (m :: * -> *) a. Stream m a -> ZipAsyncM m a
ZipAsyncM forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. SerialT m a -> Stream m a
Stream.toStreamK forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap a -> b
f (forall (m :: * -> *) a. Stream m a -> SerialT m a
Stream.fromStreamK Stream m a
m)
instance MonadAsync m => Applicative (ZipAsyncM m) where
pure :: forall a. a -> ZipAsyncM m a
pure = forall (m :: * -> *) a. Stream m a -> ZipAsyncM m a
ZipAsyncM forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a. SerialT m a -> Stream m a
Stream.toStreamK forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a. Monad m => a -> SerialT m a
Serial.repeat
{-# INLINE (<*>) #-}
ZipAsyncM Stream m (a -> b)
m1 <*> :: forall a b. ZipAsyncM m (a -> b) -> ZipAsyncM m a -> ZipAsyncM m b
<*> ZipAsyncM Stream m a
m2 = forall (m :: * -> *) a. Stream m a -> ZipAsyncM m a
ZipAsyncM forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a b c.
MonadAsync m =>
(a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
zipAsyncWithK forall a. a -> a
id Stream m (a -> b)
m1 Stream m a
m2