module Streamly.Internal.Data.Stream.Concurrent
(
MonadAsync
, Config
, maxThreads
, maxBuffer
, eager
, StopWhen (..)
, stopWhen
, ordered
, interleaved
, Rate(..)
, rate
, avgRate
, minRate
, maxRate
, constRate
, inspect
, parEval
, parRepeatM
, parReplicateM
, parMapM
, parSequence
, parTwo
, parZipWithM
, parZipWith
, parMergeByM
, parMergeBy
, parListLazy
, parListOrdered
, parListInterleaved
, parListEager
, parListEagerFst
, parListEagerMin
, parList
, parApply
, parConcat
, parConcatMap
, parConcatIterate
, fromCallback
, tapCountD
, tapCount
)
where
#include "inline.hs"
import Control.Concurrent (myThreadId, killThread)
import Control.Monad (void, when)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Streamly.Internal.Control.Concurrent (MonadAsync, askRunInIO)
import Streamly.Internal.Control.ForkLifted (forkManaged)
import Streamly.Internal.Data.Stream.Channel.Dispatcher (modifyThread)
import Streamly.Internal.Data.Stream.Channel.Types
( ChildEvent(..)
, concatMapDivK
)
import Streamly.Internal.Data.Stream.Channel.Worker (sendWithDoorBell)
import Streamly.Internal.Data.Stream.StreamD.Type (Stream)
import Streamly.Internal.Data.Stream.StreamD (Step(..))
import qualified Streamly.Internal.Data.IORef.Unboxed as Unboxed
import qualified Streamly.Internal.Data.Stream.StreamD as Stream
import qualified Streamly.Internal.Data.Stream.StreamD as D
import qualified Streamly.Internal.Data.Stream.StreamK as K
import qualified Streamly.Internal.Data.Stream.StreamK.Type as K
import Prelude hiding (mapM, sequence, concat, concatMap, zipWith)
import Streamly.Internal.Data.Stream.Concurrent.Channel
{-# INLINE parEval #-}
parEval :: MonadAsync m => (Config -> Config) -> Stream m a -> Stream m a
parEval :: forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> Stream m a -> Stream m a
parEval Config -> Config
modifier Stream m a
input = forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config)
-> Stream m a
-> (Channel m b -> Stream m a -> Stream m b)
-> Stream m b
withChannel Config -> Config
modifier Stream m a
input (forall a b. a -> b -> a
const forall a. a -> a
id)
{-# INLINE _appendGeneric #-}
_appendGeneric :: MonadAsync m =>
((Config -> Config) -> m (Channel m a))
-> (Config -> Config)
-> K.StreamK m a
-> K.StreamK m a
-> K.StreamK m a
_appendGeneric :: forall (m :: * -> *) a.
MonadAsync m =>
((Config -> Config) -> m (Channel m a))
-> (Config -> Config) -> StreamK m a -> StreamK m a -> StreamK m a
_appendGeneric (Config -> Config) -> m (Channel m a)
newChan Config -> Config
modifier StreamK m a
stream1 StreamK m a
stream2 = forall (m :: * -> *) a. Monad m => m (StreamK m a) -> StreamK m a
K.concatEffect m (StreamK m a)
action
where
action :: m (StreamK m a)
action = do
Channel m a
chan <- (Config -> Config) -> m (Channel m a)
newChan Config -> Config
modifier
let cfg :: Config
cfg = Config -> Config
modifier Config
defaultConfig
done :: StreamK m a
done = forall (m :: * -> *) b a. Applicative m => m b -> StreamK m a
K.nilM (forall (m :: * -> *) a. MonadIO m => Channel m a -> m ()
stopChannel Channel m a
chan)
case Config -> StopWhen
getStopWhen Config
cfg of
StopWhen
AllStop -> do
forall (m :: * -> *) a.
MonadRunInIO m =>
Channel m a -> StreamK m a -> m ()
toChannelK Channel m a
chan StreamK m a
stream2
forall (m :: * -> *) a.
MonadRunInIO m =>
Channel m a -> StreamK m a -> m ()
toChannelK Channel m a
chan StreamK m a
stream1
StopWhen
FirstStops -> do
forall (m :: * -> *) a.
MonadRunInIO m =>
Channel m a -> StreamK m a -> m ()
toChannelK Channel m a
chan StreamK m a
stream2
forall (m :: * -> *) a.
MonadRunInIO m =>
Channel m a -> StreamK m a -> m ()
toChannelK Channel m a
chan (forall (m :: * -> *) a. StreamK m a -> StreamK m a -> StreamK m a
K.append StreamK m a
stream1 forall {a}. StreamK m a
done)
StopWhen
AnyStops -> do
forall (m :: * -> *) a.
MonadRunInIO m =>
Channel m a -> StreamK m a -> m ()
toChannelK Channel m a
chan (forall (m :: * -> *) a. StreamK m a -> StreamK m a -> StreamK m a
K.append StreamK m a
stream2 forall {a}. StreamK m a
done)
forall (m :: * -> *) a.
MonadRunInIO m =>
Channel m a -> StreamK m a -> m ()
toChannelK Channel m a
chan (forall (m :: * -> *) a. StreamK m a -> StreamK m a -> StreamK m a
K.append StreamK m a
stream1 forall {a}. StreamK m a
done)
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Monad m => Stream m a -> StreamK m a
Stream.toStreamK forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. MonadAsync m => Channel m a -> Stream m a
fromChannel Channel m a
chan
{-# INLINE appendWithK #-}
appendWithK :: MonadAsync m =>
(Config -> Config) -> K.StreamK m a -> K.StreamK m a -> K.StreamK m a
appendWithK :: forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> StreamK m a -> StreamK m a -> StreamK m a
appendWithK Config -> Config
modifier StreamK m a
stream1 StreamK m a
stream2 =
forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config)
-> (a -> StreamK m b) -> StreamK m a -> StreamK m b
parConcatMapK Config -> Config
modifier forall a. a -> a
id (StreamK m a
stream1 forall a (m :: * -> *). a -> StreamK m a -> StreamK m a
`K.cons` forall a (m :: * -> *). a -> StreamK m a
K.fromPure StreamK m a
stream2)
{-# INLINE _appendWithChanK #-}
_appendWithChanK :: MonadAsync m =>
Channel m a -> K.StreamK m a -> K.StreamK m a -> K.StreamK m a
_appendWithChanK :: forall (m :: * -> *) a.
MonadAsync m =>
Channel m a -> StreamK m a -> StreamK m a -> StreamK m a
_appendWithChanK Channel m a
chan StreamK m a
stream1 StreamK m a
stream2 =
forall (m :: * -> *) b a.
Monad m =>
m b -> StreamK m a -> StreamK m a
K.before (forall (m :: * -> *) a.
MonadRunInIO m =>
Channel m a -> StreamK m a -> m ()
toChannelK Channel m a
chan StreamK m a
stream2) StreamK m a
stream1
{-# INLINE parTwo #-}
parTwo :: MonadAsync m =>
(Config -> Config) -> Stream m a -> Stream m a -> Stream m a
parTwo :: forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> Stream m a -> Stream m a -> Stream m a
parTwo Config -> Config
modifier Stream m a
stream1 Stream m a
stream2 =
forall (m :: * -> *) a. Applicative m => StreamK m a -> Stream m a
Stream.fromStreamK
forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> StreamK m a -> StreamK m a -> StreamK m a
appendWithK
Config -> Config
modifier (forall (m :: * -> *) a. Monad m => Stream m a -> StreamK m a
Stream.toStreamK Stream m a
stream1) (forall (m :: * -> *) a. Monad m => Stream m a -> StreamK m a
Stream.toStreamK Stream m a
stream2)
{-# INLINE mkEnqueue #-}
mkEnqueue :: MonadAsync m =>
Channel m b
-> ((K.StreamK m a -> m ()) -> K.StreamK m a -> K.StreamK m b)
-> m (K.StreamK m a -> m ())
mkEnqueue :: forall (m :: * -> *) b a.
MonadAsync m =>
Channel m b
-> ((StreamK m a -> m ()) -> StreamK m a -> StreamK m b)
-> m (StreamK m a -> m ())
mkEnqueue Channel m b
chan (StreamK m a -> m ()) -> StreamK m a -> StreamK m b
runner = do
RunInIO m
runInIO <- forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
forall (m :: * -> *) a. Monad m => a -> m a
return
forall a b. (a -> b) -> a -> b
$ let q :: StreamK m a -> m ()
q StreamK m a
stream = do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
Channel m a -> Bool -> (RunInIO m, StreamK m a) -> IO ()
enqueue Channel m b
chan Bool
False (RunInIO m
runInIO, (StreamK m a -> m ()) -> StreamK m a -> StreamK m b
runner StreamK m a -> m ()
q StreamK m a
stream)
forall (m :: * -> *) a. Channel m a -> m ()
eagerDispatch Channel m b
chan
in StreamK m a -> m ()
q
{-# INLINE parConcatMapChanK #-}
parConcatMapChanK :: MonadAsync m =>
Channel m b -> (a -> K.StreamK m b) -> K.StreamK m a -> K.StreamK m b
parConcatMapChanK :: forall (m :: * -> *) b a.
MonadAsync m =>
Channel m b -> (a -> StreamK m b) -> StreamK m a -> StreamK m b
parConcatMapChanK Channel m b
chan a -> StreamK m b
f StreamK m a
stream =
let run :: (StreamK m a -> m ()) -> StreamK m a -> StreamK m b
run StreamK m a -> m ()
q = forall (m :: * -> *) a b.
Monad m =>
(StreamK m a -> m ())
-> (a -> StreamK m b) -> StreamK m a -> StreamK m b
concatMapDivK StreamK m a -> m ()
q a -> StreamK m b
f
in forall (m :: * -> *) b a.
Monad m =>
(b -> StreamK m a) -> m b -> StreamK m a
K.concatMapEffect ((StreamK m a -> m ()) -> StreamK m a -> StreamK m b
`run` StreamK m a
stream) (forall (m :: * -> *) b a.
MonadAsync m =>
Channel m b
-> ((StreamK m a -> m ()) -> StreamK m a -> StreamK m b)
-> m (StreamK m a -> m ())
mkEnqueue Channel m b
chan (StreamK m a -> m ()) -> StreamK m a -> StreamK m b
run)
{-# INLINE parConcatMapChanKAny #-}
parConcatMapChanKAny :: MonadAsync m =>
Channel m b -> (a -> K.StreamK m b) -> K.StreamK m a -> K.StreamK m b
parConcatMapChanKAny :: forall (m :: * -> *) b a.
MonadAsync m =>
Channel m b -> (a -> StreamK m b) -> StreamK m a -> StreamK m b
parConcatMapChanKAny Channel m b
chan a -> StreamK m b
f StreamK m a
stream =
let done :: StreamK m a
done = forall (m :: * -> *) b a. Applicative m => m b -> StreamK m a
K.nilM (forall (m :: * -> *) a. MonadIO m => Channel m a -> m ()
stopChannel Channel m b
chan)
run :: (StreamK m a -> m ()) -> StreamK m a -> StreamK m b
run StreamK m a -> m ()
q = forall (m :: * -> *) a b.
Monad m =>
(StreamK m a -> m ())
-> (a -> StreamK m b) -> StreamK m a -> StreamK m b
concatMapDivK StreamK m a -> m ()
q (\a
x -> forall (m :: * -> *) a. StreamK m a -> StreamK m a -> StreamK m a
K.append (a -> StreamK m b
f a
x) forall {a}. StreamK m a
done)
in forall (m :: * -> *) b a.
Monad m =>
(b -> StreamK m a) -> m b -> StreamK m a
K.concatMapEffect ((StreamK m a -> m ()) -> StreamK m a -> StreamK m b
`run` StreamK m a
stream) (forall (m :: * -> *) b a.
MonadAsync m =>
Channel m b
-> ((StreamK m a -> m ()) -> StreamK m a -> StreamK m b)
-> m (StreamK m a -> m ())
mkEnqueue Channel m b
chan (StreamK m a -> m ()) -> StreamK m a -> StreamK m b
run)
{-# INLINE parConcatMapChanKFirst #-}
parConcatMapChanKFirst :: MonadAsync m =>
Channel m b -> (a -> K.StreamK m b) -> K.StreamK m a -> K.StreamK m b
parConcatMapChanKFirst :: forall (m :: * -> *) b a.
MonadAsync m =>
Channel m b -> (a -> StreamK m b) -> StreamK m a -> StreamK m b
parConcatMapChanKFirst Channel m b
chan a -> StreamK m b
f StreamK m a
stream =
let done :: StreamK m a
done = forall (m :: * -> *) b a. Applicative m => m b -> StreamK m a
K.nilM (forall (m :: * -> *) a. MonadIO m => Channel m a -> m ()
stopChannel Channel m b
chan)
run :: (StreamK m a -> m ()) -> StreamK m a -> StreamK m b
run StreamK m a -> m ()
q = forall (m :: * -> *) a b.
Monad m =>
(StreamK m a -> m ())
-> (a -> StreamK m b) -> StreamK m a -> StreamK m b
concatMapDivK StreamK m a -> m ()
q a -> StreamK m b
f
in forall (m :: * -> *) a. Monad m => m (StreamK m a) -> StreamK m a
K.concatEffect forall a b. (a -> b) -> a -> b
$ do
Maybe (a, StreamK m a)
res <- forall (m :: * -> *) a.
Applicative m =>
StreamK m a -> m (Maybe (a, StreamK m a))
K.uncons StreamK m a
stream
case Maybe (a, StreamK m a)
res of
Maybe (a, StreamK m a)
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return forall (m :: * -> *) a. StreamK m a
K.nil
Just (a
h, StreamK m a
t) -> do
StreamK m a -> m ()
q <- forall (m :: * -> *) b a.
MonadAsync m =>
Channel m b
-> ((StreamK m a -> m ()) -> StreamK m a -> StreamK m b)
-> m (StreamK m a -> m ())
mkEnqueue Channel m b
chan (StreamK m a -> m ()) -> StreamK m a -> StreamK m b
run
StreamK m a -> m ()
q StreamK m a
t
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. StreamK m a -> StreamK m a -> StreamK m a
K.append (a -> StreamK m b
f a
h) forall {a}. StreamK m a
done
{-# INLINE parConcatMapChanKGeneric #-}
parConcatMapChanKGeneric :: MonadAsync m =>
(Config -> Config)
-> Channel m b
-> (a -> K.StreamK m b)
-> K.StreamK m a
-> K.StreamK m b
parConcatMapChanKGeneric :: forall (m :: * -> *) b a.
MonadAsync m =>
(Config -> Config)
-> Channel m b -> (a -> StreamK m b) -> StreamK m a -> StreamK m b
parConcatMapChanKGeneric Config -> Config
modifier Channel m b
chan a -> StreamK m b
f StreamK m a
stream = do
let cfg :: Config
cfg = Config -> Config
modifier Config
defaultConfig
case Config -> StopWhen
getStopWhen Config
cfg of
StopWhen
AllStop -> forall (m :: * -> *) b a.
MonadAsync m =>
Channel m b -> (a -> StreamK m b) -> StreamK m a -> StreamK m b
parConcatMapChanK Channel m b
chan a -> StreamK m b
f StreamK m a
stream
StopWhen
FirstStops -> forall (m :: * -> *) b a.
MonadAsync m =>
Channel m b -> (a -> StreamK m b) -> StreamK m a -> StreamK m b
parConcatMapChanKFirst Channel m b
chan a -> StreamK m b
f StreamK m a
stream
StopWhen
AnyStops -> forall (m :: * -> *) b a.
MonadAsync m =>
Channel m b -> (a -> StreamK m b) -> StreamK m a -> StreamK m b
parConcatMapChanKAny Channel m b
chan a -> StreamK m b
f StreamK m a
stream
{-# INLINE parConcatMapK #-}
parConcatMapK :: MonadAsync m =>
(Config -> Config) -> (a -> K.StreamK m b) -> K.StreamK m a -> K.StreamK m b
parConcatMapK :: forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config)
-> (a -> StreamK m b) -> StreamK m a -> StreamK m b
parConcatMapK Config -> Config
modifier a -> StreamK m b
f StreamK m a
input =
let g :: Channel m b -> (a -> StreamK m b) -> StreamK m a -> StreamK m b
g = forall (m :: * -> *) b a.
MonadAsync m =>
(Config -> Config)
-> Channel m b -> (a -> StreamK m b) -> StreamK m a -> StreamK m b
parConcatMapChanKGeneric Config -> Config
modifier
in forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config)
-> StreamK m a
-> (Channel m b -> StreamK m a -> StreamK m b)
-> StreamK m b
withChannelK Config -> Config
modifier StreamK m a
input (forall {b} {a}.
Channel m b -> (a -> StreamK m b) -> StreamK m a -> StreamK m b
`g` a -> StreamK m b
f)
{-# INLINE parConcatMap #-}
parConcatMap :: MonadAsync m =>
(Config -> Config) -> (a -> Stream m b) -> Stream m a -> Stream m b
parConcatMap :: forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config) -> (a -> Stream m b) -> Stream m a -> Stream m b
parConcatMap Config -> Config
modifier a -> Stream m b
f Stream m a
stream =
forall (m :: * -> *) a. Applicative m => StreamK m a -> Stream m a
Stream.fromStreamK
forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config)
-> (a -> StreamK m b) -> StreamK m a -> StreamK m b
parConcatMapK
Config -> Config
modifier (forall (m :: * -> *) a. Monad m => Stream m a -> StreamK m a
Stream.toStreamK forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> Stream m b
f) (forall (m :: * -> *) a. Monad m => Stream m a -> StreamK m a
Stream.toStreamK Stream m a
stream)
{-# INLINE parConcat #-}
parConcat :: MonadAsync m =>
(Config -> Config) -> Stream m (Stream m a) -> Stream m a
parConcat :: forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> Stream m (Stream m a) -> Stream m a
parConcat Config -> Config
modifier = forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config) -> (a -> Stream m b) -> Stream m a -> Stream m b
parConcatMap Config -> Config
modifier forall a. a -> a
id
{-# INLINE parList #-}
parList :: MonadAsync m => (Config -> Config) -> [Stream m a] -> Stream m a
parList :: forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> [Stream m a] -> Stream m a
parList Config -> Config
modifier = forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> Stream m (Stream m a) -> Stream m a
parConcat Config -> Config
modifier forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a. Applicative m => [a] -> Stream m a
Stream.fromList
{-# INLINE parListLazy #-}
parListLazy :: MonadAsync m => [Stream m a] -> Stream m a
parListLazy :: forall (m :: * -> *) a. MonadAsync m => [Stream m a] -> Stream m a
parListLazy = forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> [Stream m a] -> Stream m a
parList forall a. a -> a
id
{-# INLINE parListInterleaved #-}
parListInterleaved :: MonadAsync m => [Stream m a] -> Stream m a
parListInterleaved :: forall (m :: * -> *) a. MonadAsync m => [Stream m a] -> Stream m a
parListInterleaved = forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> [Stream m a] -> Stream m a
parList (Bool -> Config -> Config
interleaved Bool
True)
{-# INLINE parListOrdered #-}
parListOrdered :: MonadAsync m => [Stream m a] -> Stream m a
parListOrdered :: forall (m :: * -> *) a. MonadAsync m => [Stream m a] -> Stream m a
parListOrdered = forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> [Stream m a] -> Stream m a
parList (Bool -> Config -> Config
ordered Bool
True)
{-# INLINE parListEager #-}
parListEager :: MonadAsync m => [Stream m a] -> Stream m a
parListEager :: forall (m :: * -> *) a. MonadAsync m => [Stream m a] -> Stream m a
parListEager = forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> [Stream m a] -> Stream m a
parList (Bool -> Config -> Config
eager Bool
True)
{-# INLINE parListEagerFst #-}
parListEagerFst :: MonadAsync m => [Stream m a] -> Stream m a
parListEagerFst :: forall (m :: * -> *) a. MonadAsync m => [Stream m a] -> Stream m a
parListEagerFst = forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> [Stream m a] -> Stream m a
parList (Bool -> Config -> Config
eager Bool
True forall b c a. (b -> c) -> (a -> b) -> a -> c
. StopWhen -> Config -> Config
stopWhen StopWhen
FirstStops)
{-# INLINE parListEagerMin #-}
parListEagerMin :: MonadAsync m => [Stream m a] -> Stream m a
parListEagerMin :: forall (m :: * -> *) a. MonadAsync m => [Stream m a] -> Stream m a
parListEagerMin = forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> [Stream m a] -> Stream m a
parList (Bool -> Config -> Config
eager Bool
True forall b c a. (b -> c) -> (a -> b) -> a -> c
. StopWhen -> Config -> Config
stopWhen StopWhen
AnyStops)
{-# INLINE parApply #-}
{-# SPECIALIZE parApply ::
(Config -> Config) -> Stream IO (a -> b) -> Stream IO a -> Stream IO b #-}
parApply :: MonadAsync m =>
(Config -> Config) -> Stream m (a -> b) -> Stream m a -> Stream m b
parApply :: forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config) -> Stream m (a -> b) -> Stream m a -> Stream m b
parApply Config -> Config
modifier Stream m (a -> b)
stream1 Stream m a
stream2 =
forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config) -> (a -> Stream m b) -> Stream m a -> Stream m b
parConcatMap
Config -> Config
modifier
(\a -> b
g -> forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config) -> (a -> Stream m b) -> Stream m a -> Stream m b
parConcatMap Config -> Config
modifier (forall (m :: * -> *) a. Applicative m => a -> Stream m a
Stream.fromPure forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> b
g) Stream m a
stream2)
Stream m (a -> b)
stream1
{-# INLINE parMapM #-}
parMapM :: MonadAsync m =>
(Config -> Config) -> (a -> m b) -> Stream m a -> Stream m b
parMapM :: forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config) -> (a -> m b) -> Stream m a -> Stream m b
parMapM Config -> Config
modifier a -> m b
f = forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config) -> (a -> Stream m b) -> Stream m a -> Stream m b
parConcatMap Config -> Config
modifier (forall (m :: * -> *) a. Applicative m => m a -> Stream m a
Stream.fromEffect forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> m b
f)
{-# INLINE parSequence #-}
parSequence :: MonadAsync m =>
(Config -> Config) -> Stream m (m a) -> Stream m a
parSequence :: forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> Stream m (m a) -> Stream m a
parSequence Config -> Config
modifier = forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config) -> (a -> m b) -> Stream m a -> Stream m b
parMapM Config -> Config
modifier forall a. a -> a
id
{-# INLINE parZipWithM #-}
parZipWithM :: MonadAsync m
=> (Config -> Config)
-> (a -> b -> m c)
-> Stream m a
-> Stream m b
-> Stream m c
parZipWithM :: forall (m :: * -> *) a b c.
MonadAsync m =>
(Config -> Config)
-> (a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c
parZipWithM Config -> Config
cfg a -> b -> m c
f Stream m a
m1 Stream m b
m2 = forall (m :: * -> *) a b c.
Monad m =>
(a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c
Stream.zipWithM a -> b -> m c
f (forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> Stream m a -> Stream m a
parEval Config -> Config
cfg Stream m a
m1) (forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> Stream m a -> Stream m a
parEval Config -> Config
cfg Stream m b
m2)
{-# INLINE parZipWith #-}
parZipWith :: MonadAsync m
=> (Config -> Config)
-> (a -> b -> c)
-> Stream m a
-> Stream m b
-> Stream m c
parZipWith :: forall (m :: * -> *) a b c.
MonadAsync m =>
(Config -> Config)
-> (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
parZipWith Config -> Config
cfg a -> b -> c
f = forall (m :: * -> *) a b c.
MonadAsync m =>
(Config -> Config)
-> (a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c
parZipWithM Config -> Config
cfg (\a
a b
b -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ a -> b -> c
f a
a b
b)
{-# INLINE parMergeByM #-}
parMergeByM :: MonadAsync m
=> (Config -> Config)
-> (a -> a -> m Ordering)
-> Stream m a
-> Stream m a
-> Stream m a
parMergeByM :: forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config)
-> (a -> a -> m Ordering) -> Stream m a -> Stream m a -> Stream m a
parMergeByM Config -> Config
cfg a -> a -> m Ordering
f Stream m a
m1 Stream m a
m2 = forall (m :: * -> *) a.
Monad m =>
(a -> a -> m Ordering) -> Stream m a -> Stream m a -> Stream m a
Stream.mergeByM a -> a -> m Ordering
f (forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> Stream m a -> Stream m a
parEval Config -> Config
cfg Stream m a
m1) (forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> Stream m a -> Stream m a
parEval Config -> Config
cfg Stream m a
m2)
{-# INLINE parMergeBy #-}
parMergeBy :: MonadAsync m
=> (Config -> Config)
-> (a -> a -> Ordering)
-> Stream m a
-> Stream m a
-> Stream m a
parMergeBy :: forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config)
-> (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a
parMergeBy Config -> Config
cfg a -> a -> Ordering
f = forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config)
-> (a -> a -> m Ordering) -> Stream m a -> Stream m a -> Stream m a
parMergeByM Config -> Config
cfg (\a
a a
b -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ a -> a -> Ordering
f a
a a
b)
{-# INLINE parConcatIterate #-}
parConcatIterate :: MonadAsync m =>
(Config -> Config)
-> (a -> Stream m a)
-> Stream m a
-> Stream m a
parConcatIterate :: forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> (a -> Stream m a) -> Stream m a -> Stream m a
parConcatIterate Config -> Config
modifier a -> Stream m a
f Stream m a
input =
forall (m :: * -> *) a. Applicative m => StreamK m a -> Stream m a
Stream.fromStreamK
forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config)
-> StreamK m a
-> (Channel m b -> StreamK m a -> StreamK m b)
-> StreamK m b
withChannelK Config -> Config
modifier (forall (m :: * -> *) a. Monad m => Stream m a -> StreamK m a
Stream.toStreamK Stream m a
input) Channel m a -> StreamK m a -> StreamK m a
iterateStream
where
iterateStream :: Channel m a -> StreamK m a -> StreamK m a
iterateStream Channel m a
channel StreamK m a
stream =
forall (m :: * -> *) b a.
MonadAsync m =>
(Config -> Config)
-> Channel m b -> (a -> StreamK m b) -> StreamK m a -> StreamK m b
parConcatMapChanKGeneric Config -> Config
modifier Channel m a
channel (Channel m a -> a -> StreamK m a
generate Channel m a
channel) StreamK m a
stream
generate :: Channel m a -> a -> StreamK m a
generate Channel m a
channel a
x =
a
x forall a (m :: * -> *). a -> StreamK m a -> StreamK m a
`K.cons` Channel m a -> StreamK m a -> StreamK m a
iterateStream Channel m a
channel (forall (m :: * -> *) a. Monad m => Stream m a -> StreamK m a
Stream.toStreamK forall a b. (a -> b) -> a -> b
$ a -> Stream m a
f a
x)
{-# INLINE parRepeatM #-}
parRepeatM :: MonadAsync m => (Config -> Config) -> m a -> Stream m a
parRepeatM :: forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> m a -> Stream m a
parRepeatM Config -> Config
cfg = forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> Stream m (m a) -> Stream m a
parSequence Config -> Config
cfg forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a. Monad m => a -> Stream m a
Stream.repeat
{-# INLINE parReplicateM #-}
parReplicateM :: MonadAsync m => (Config -> Config) -> Int -> m a -> Stream m a
parReplicateM :: forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> Int -> m a -> Stream m a
parReplicateM Config -> Config
cfg Int
n = forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> Stream m (m a) -> Stream m a
parSequence Config -> Config
cfg forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a. Monad m => Int -> a -> Stream m a
Stream.replicate Int
n
{-# INLINE_NORMAL newCallbackStream #-}
newCallbackStream :: MonadAsync m => m (a -> m (), Stream m a)
newCallbackStream :: forall (m :: * -> *) a. MonadAsync m => m (a -> m (), Stream m a)
newCallbackStream = do
Channel m a
chan <- forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> m (Channel m a)
newChannel (Bool -> Config -> Config
eager Bool
True)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO ThreadId
myThreadId
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= forall (m :: * -> *).
MonadIO m =>
IORef (Set ThreadId) -> MVar () -> ThreadId -> m ()
modifyThread (forall (m :: * -> *) a. Channel m a -> IORef (Set ThreadId)
workerThreads Channel m a
chan) (forall (m :: * -> *) a. Channel m a -> MVar ()
outputDoorBell Channel m a
chan)
let callback :: a -> m ()
callback a
a =
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) a. Functor f => f a -> f ()
void
forall a b. (a -> b) -> a -> b
$ forall a.
IORef ([ChildEvent a], Int) -> MVar () -> ChildEvent a -> IO Int
sendWithDoorBell
(forall (m :: * -> *) a. Channel m a -> IORef ([ChildEvent a], Int)
outputQueue Channel m a
chan) (forall (m :: * -> *) a. Channel m a -> MVar ()
outputDoorBell Channel m a
chan) (forall a. a -> ChildEvent a
ChildYield a
a)
forall (m :: * -> *) a. Monad m => a -> m a
return (forall {m :: * -> *}. MonadIO m => a -> m ()
callback, forall (m :: * -> *) a. MonadAsync m => Channel m a -> Stream m a
fromChannel Channel m a
chan)
{-# INLINE fromCallback #-}
fromCallback :: MonadAsync m => ((a -> m ()) -> m ()) -> Stream m a
fromCallback :: forall (m :: * -> *) a.
MonadAsync m =>
((a -> m ()) -> m ()) -> Stream m a
fromCallback (a -> m ()) -> m ()
setCallback = forall (m :: * -> *) a. Monad m => m (Stream m a) -> Stream m a
Stream.concatEffect forall a b. (a -> b) -> a -> b
$ do
(a -> m ()
callback, Stream m a
stream) <- forall (m :: * -> *) a. MonadAsync m => m (a -> m (), Stream m a)
newCallbackStream
(a -> m ()) -> m ()
setCallback a -> m ()
callback
forall (m :: * -> *) a. Monad m => a -> m a
return Stream m a
stream
{-# INLINE_NORMAL tapCountD #-}
tapCountD
:: MonadAsync m
=> (a -> Bool)
-> (D.Stream m Int -> m b)
-> D.Stream m a
-> D.Stream m a
tapCountD :: forall (m :: * -> *) a b.
MonadAsync m =>
(a -> Bool) -> (Stream m Int -> m b) -> Stream m a -> Stream m a
tapCountD a -> Bool
predicate Stream m Int -> m b
fld (D.Stream State StreamK m a -> s -> m (Step s a)
step s
state) = forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State StreamK m a
-> Maybe (IORef Int, ThreadId, s)
-> m (Step (Maybe (IORef Int, ThreadId, s)) a)
step' forall a. Maybe a
Nothing
where
{-# INLINE_LATE step' #-}
step' :: State StreamK m a
-> Maybe (IORef Int, ThreadId, s)
-> m (Step (Maybe (IORef Int, ThreadId, s)) a)
step' State StreamK m a
_ Maybe (IORef Int, ThreadId, s)
Nothing = do
IORef Int
countVar <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. Unbox a => a -> IO (IORef a)
Unboxed.newIORef (Int
0 :: Int)
ThreadId
tid <- forall (m :: * -> *). MonadRunInIO m => m () -> m ThreadId
forkManaged
forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ Stream m Int -> m b
fld
forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
(MonadIO m, Unbox a) =>
IORef a -> Stream m a
Unboxed.toStreamD IORef Int
countVar
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (forall a. a -> Maybe a
Just (IORef Int
countVar, ThreadId
tid, s
state))
step' State StreamK m a
gst (Just (IORef Int
countVar, ThreadId
tid, s
st)) = do
Step s a
r <- State StreamK m a -> s -> m (Step s a)
step State StreamK m a
gst s
st
case Step s a
r of
Yield a
x s
s -> do
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (a -> Bool
predicate a
x)
forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. Unbox a => IORef a -> (a -> a) -> IO ()
Unboxed.modifyIORef' IORef Int
countVar (forall a. Num a => a -> a -> a
+ Int
1)
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield a
x (forall a. a -> Maybe a
Just (IORef Int
countVar, ThreadId
tid, s
s))
Skip s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (forall a. a -> Maybe a
Just (IORef Int
countVar, ThreadId
tid, s
s))
Step s a
Stop -> do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ ThreadId -> IO ()
killThread ThreadId
tid
forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop
{-# INLINE tapCount #-}
tapCount ::
(MonadAsync m)
=> (a -> Bool)
-> (Stream m Int -> m b)
-> Stream m a
-> Stream m a
tapCount :: forall (m :: * -> *) a b.
MonadAsync m =>
(a -> Bool) -> (Stream m Int -> m b) -> Stream m a -> Stream m a
tapCount = forall (m :: * -> *) a b.
MonadAsync m =>
(a -> Bool) -> (Stream m Int -> m b) -> Stream m a -> Stream m a
tapCountD