{-# LANGUAGE UndecidableInstances #-}
-- |
-- Module      : Streamly.Internal.Data.Stream.Zip.Concurrent
-- Copyright   : (c) 2017 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
-- To run examples in this module:
--
-- >>> import qualified Streamly.Internal.Data.Stream as Stream
-- >>> import qualified Streamly.Data.Fold as Fold
--
module Streamly.Internal.Data.Stream.Zip.Concurrent
    (
      ZipConcurrent (..)
    )
where

import Streamly.Internal.Data.Stream.StreamD (Stream)
import Streamly.Internal.Data.Stream.Concurrent (MonadAsync, parZipWith)

import qualified Streamly.Internal.Data.Stream.StreamD as Stream (repeat)
import Prelude hiding (map, repeat, zipWith)

-- $setup
-- >>> import qualified Streamly.Internal.Data.Stream as Stream
-- >>> import qualified Streamly.Data.Fold as Fold
--

-- | A newtype wrapper around 'Stream' that carries its own class instances.
-- Wrap a stream with the 'ZipConcurrent' constructor and recover it with
-- 'getZipConcurrent'.
--
-- The 'Functor' instance is derived.
newtype ZipConcurrent m a = ZipConcurrent {getZipConcurrent :: Stream m a}
      deriving (Functor)

-- | A stream whose applicative instance zips streams concurrently. Note
-- that it uses the default concurrency options.
--
-- >>> s = ZipConcurrent $ Stream.fromList [1, 2, 3]
-- >>> x = (,,) <$> s <*> s <*> s
-- >>> Stream.fold Fold.toList (getZipConcurrent x)
-- [(1,1,1),(2,2,2),(3,3,3)]
--
-- @since 0.9.0

instance MonadAsync m => Applicative (ZipConcurrent m) where
    -- Lift a value by wrapping 'Stream.repeat' of it.
    pure = ZipConcurrent . Stream.repeat

    -- Unwrap both sides and combine them with 'parZipWith'. The first 'id'
    -- is the @Config -> Config@ modifier (configuration left untouched); the
    -- second 'id' is the zipping function, used at type
    -- @(a -> b) -> a -> b@, i.e. it applies each function from the left
    -- stream to the corresponding element of the right stream.
    {-# INLINE (<*>) #-}
    ZipConcurrent m1 <*> ZipConcurrent m2 =
        ZipConcurrent $ parZipWith id id m1 m2