module Streamly.Internal.Data.Stream.Concurrent
(
module Streamly.Internal.Data.Stream.Concurrent.Channel
, MonadAsync
, parEval
, parRepeatM
, parReplicateM
, parMapM
, parSequence
, parTwo
, parZipWithM
, parZipWith
, parMergeByM
, parMergeBy
, parListLazy
, parListOrdered
, parListInterleaved
, parListEager
, parListEagerFst
, parListEagerMin
, parList
, parApply
, parConcat
, parConcatMap
, parConcatIterate
, fromCallback
, parTapCount
, tapCount
)
where
#include "inline.hs"
import Control.Concurrent (myThreadId, killThread)
import Control.Monad (void, when)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Streamly.Internal.Control.Concurrent (MonadAsync, askRunInIO)
import Streamly.Internal.Control.ForkLifted (forkManaged)
import Streamly.Internal.Data.Channel.Dispatcher (modifyThread)
import Streamly.Internal.Data.Channel.Types (ChildEvent(..))
import Streamly.Internal.Data.Channel.Worker (sendWithDoorBell)
import Streamly.Internal.Data.Stream (Stream, Step(..))
import Streamly.Internal.Data.SVar.Type (adaptState)
import qualified Streamly.Internal.Data.MutArray as Unboxed
import qualified Streamly.Internal.Data.Stream as Stream
import qualified Streamly.Internal.Data.Stream as D
import qualified Streamly.Internal.Data.StreamK as K
import Prelude hiding (mapM, sequence, concat, concatMap, zipWith)
import Streamly.Internal.Data.Stream.Concurrent.Channel
{-# INLINE parEval #-}
parEval :: MonadAsync m => (Config -> Config) -> Stream m a -> Stream m a
parEval :: forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> Stream m a -> Stream m a
parEval Config -> Config
modifier Stream m a
input = forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config)
-> Stream m a
-> (Channel m b -> Stream m a -> Stream m b)
-> Stream m b
withChannel Config -> Config
modifier Stream m a
input (forall a b. a -> b -> a
const forall a. a -> a
id)
{-# INLINE _appendGeneric #-}
_appendGeneric :: MonadAsync m =>
((Config -> Config) -> m (Channel m a))
-> (Config -> Config)
-> K.StreamK m a
-> K.StreamK m a
-> K.StreamK m a
_appendGeneric :: forall (m :: * -> *) a.
MonadAsync m =>
((Config -> Config) -> m (Channel m a))
-> (Config -> Config) -> StreamK m a -> StreamK m a -> StreamK m a
_appendGeneric (Config -> Config) -> m (Channel m a)
newChan Config -> Config
modifier StreamK m a
stream1 StreamK m a
stream2 = forall (m :: * -> *) a. Monad m => m (StreamK m a) -> StreamK m a
K.concatEffect m (StreamK m a)
action
where
action :: m (StreamK m a)
action = do
Channel m a
chan <- (Config -> Config) -> m (Channel m a)
newChan Config -> Config
modifier
let cfg :: Config
cfg = Config -> Config
modifier Config
defaultConfig
done :: StreamK m a
done = forall (m :: * -> *) b a. Applicative m => m b -> StreamK m a
K.nilM (forall (m :: * -> *) a. MonadIO m => Channel m a -> m ()
stopChannel Channel m a
chan)
case Config -> StopWhen
getStopWhen Config
cfg of
StopWhen
AllStop -> do
forall (m :: * -> *) a.
MonadRunInIO m =>
Channel m a -> StreamK m a -> m ()
toChannelK Channel m a
chan StreamK m a
stream2
forall (m :: * -> *) a.
MonadRunInIO m =>
Channel m a -> StreamK m a -> m ()
toChannelK Channel m a
chan StreamK m a
stream1
StopWhen
FirstStops -> do
forall (m :: * -> *) a.
MonadRunInIO m =>
Channel m a -> StreamK m a -> m ()
toChannelK Channel m a
chan StreamK m a
stream2
forall (m :: * -> *) a.
MonadRunInIO m =>
Channel m a -> StreamK m a -> m ()
toChannelK Channel m a
chan (forall (m :: * -> *) a. StreamK m a -> StreamK m a -> StreamK m a
K.append StreamK m a
stream1 forall {a}. StreamK m a
done)
StopWhen
AnyStops -> do
forall (m :: * -> *) a.
MonadRunInIO m =>
Channel m a -> StreamK m a -> m ()
toChannelK Channel m a
chan (forall (m :: * -> *) a. StreamK m a -> StreamK m a -> StreamK m a
K.append StreamK m a
stream2 forall {a}. StreamK m a
done)
forall (m :: * -> *) a.
MonadRunInIO m =>
Channel m a -> StreamK m a -> m ()
toChannelK Channel m a
chan (forall (m :: * -> *) a. StreamK m a -> StreamK m a -> StreamK m a
K.append StreamK m a
stream1 forall {a}. StreamK m a
done)
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Monad m => Stream m a -> StreamK m a
Stream.toStreamK forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. MonadAsync m => Channel m a -> Stream m a
fromChannel Channel m a
chan
{-# INLINE appendWithK #-}
appendWithK :: MonadAsync m =>
(Config -> Config) -> K.StreamK m a -> K.StreamK m a -> K.StreamK m a
appendWithK :: forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> StreamK m a -> StreamK m a -> StreamK m a
appendWithK Config -> Config
modifier StreamK m a
stream1 StreamK m a
stream2 =
forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config)
-> (a -> StreamK m b) -> StreamK m a -> StreamK m b
parConcatMapK Config -> Config
modifier forall a. a -> a
id (StreamK m a
stream1 forall a (m :: * -> *). a -> StreamK m a -> StreamK m a
`K.cons` forall a (m :: * -> *). a -> StreamK m a
K.fromPure StreamK m a
stream2)
{-# INLINE _appendWithChanK #-}
_appendWithChanK :: MonadAsync m =>
Channel m a -> K.StreamK m a -> K.StreamK m a -> K.StreamK m a
_appendWithChanK :: forall (m :: * -> *) a.
MonadAsync m =>
Channel m a -> StreamK m a -> StreamK m a -> StreamK m a
_appendWithChanK Channel m a
chan StreamK m a
stream1 StreamK m a
stream2 =
forall (m :: * -> *) b a.
Monad m =>
m b -> StreamK m a -> StreamK m a
K.before (forall (m :: * -> *) a.
MonadRunInIO m =>
Channel m a -> StreamK m a -> m ()
toChannelK Channel m a
chan StreamK m a
stream2) StreamK m a
stream1
{-# INLINE parTwo #-}
parTwo :: MonadAsync m =>
(Config -> Config) -> Stream m a -> Stream m a -> Stream m a
parTwo :: forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> Stream m a -> Stream m a -> Stream m a
parTwo Config -> Config
modifier Stream m a
stream1 Stream m a
stream2 =
forall (m :: * -> *) a. Applicative m => StreamK m a -> Stream m a
Stream.fromStreamK
forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> StreamK m a -> StreamK m a -> StreamK m a
appendWithK
Config -> Config
modifier (forall (m :: * -> *) a. Monad m => Stream m a -> StreamK m a
Stream.toStreamK Stream m a
stream1) (forall (m :: * -> *) a. Monad m => Stream m a -> StreamK m a
Stream.toStreamK Stream m a
stream2)
{-# INLINE concatMapDivK #-}
concatMapDivK :: Monad m =>
(K.StreamK m a -> m ())
-> (a -> K.StreamK m b)
-> K.StreamK m a
-> K.StreamK m b
concatMapDivK :: forall (m :: * -> *) a b.
Monad m =>
(StreamK m a -> m ())
-> (a -> StreamK m b) -> StreamK m a -> StreamK m b
concatMapDivK StreamK m a -> m ()
useTail a -> StreamK m b
useHead StreamK m a
stream =
forall (m :: * -> *) a.
(forall r.
State StreamK m a
-> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
K.mkStream forall a b. (a -> b) -> a -> b
$ \State StreamK m b
st b -> StreamK m b -> m r
yld b -> m r
sng m r
stp -> do
let foldShared :: StreamK m b -> m r
foldShared = forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m b
st b -> StreamK m b -> m r
yld b -> m r
sng m r
stp
single :: a -> m r
single a
a = StreamK m b -> m r
foldShared forall a b. (a -> b) -> a -> b
$ a -> StreamK m b
useHead a
a
yieldk :: a -> StreamK m a -> m r
yieldk a
a StreamK m a
r = StreamK m a -> m ()
useTail StreamK m a
r forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> a -> m r
single a
a
in forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m b
st) a -> StreamK m a -> m r
yieldk a -> m r
single m r
stp StreamK m a
stream
{-# INLINE mkEnqueue #-}
mkEnqueue :: MonadAsync m =>
Channel m b
-> ((K.StreamK m a -> m ()) -> K.StreamK m a -> K.StreamK m b)
-> m (K.StreamK m a -> m ())
mkEnqueue :: forall (m :: * -> *) b a.
MonadAsync m =>
Channel m b
-> ((StreamK m a -> m ()) -> StreamK m a -> StreamK m b)
-> m (StreamK m a -> m ())
mkEnqueue Channel m b
chan (StreamK m a -> m ()) -> StreamK m a -> StreamK m b
runner = do
RunInIO m
runInIO <- forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
forall (m :: * -> *) a. Monad m => a -> m a
return
forall a b. (a -> b) -> a -> b
$ let q :: StreamK m a -> m ()
q StreamK m a
stream = do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
Channel m a -> Bool -> (RunInIO m, StreamK m a) -> IO ()
enqueue Channel m b
chan Bool
False (RunInIO m
runInIO, (StreamK m a -> m ()) -> StreamK m a -> StreamK m b
runner StreamK m a -> m ()
q StreamK m a
stream)
forall (m :: * -> *) a. Channel m a -> m ()
eagerDispatch Channel m b
chan
in StreamK m a -> m ()
q
{-# INLINE parConcatMapChanK #-}
parConcatMapChanK :: MonadAsync m =>
Channel m b -> (a -> K.StreamK m b) -> K.StreamK m a -> K.StreamK m b
parConcatMapChanK :: forall (m :: * -> *) b a.
MonadAsync m =>
Channel m b -> (a -> StreamK m b) -> StreamK m a -> StreamK m b
parConcatMapChanK Channel m b
chan a -> StreamK m b
f StreamK m a
stream =
let run :: (StreamK m a -> m ()) -> StreamK m a -> StreamK m b
run StreamK m a -> m ()
q = forall (m :: * -> *) a b.
Monad m =>
(StreamK m a -> m ())
-> (a -> StreamK m b) -> StreamK m a -> StreamK m b
concatMapDivK StreamK m a -> m ()
q a -> StreamK m b
f
in forall (m :: * -> *) b a.
Monad m =>
(b -> StreamK m a) -> m b -> StreamK m a
K.concatMapEffect ((StreamK m a -> m ()) -> StreamK m a -> StreamK m b
`run` StreamK m a
stream) (forall (m :: * -> *) b a.
MonadAsync m =>
Channel m b
-> ((StreamK m a -> m ()) -> StreamK m a -> StreamK m b)
-> m (StreamK m a -> m ())
mkEnqueue Channel m b
chan (StreamK m a -> m ()) -> StreamK m a -> StreamK m b
run)
{-# INLINE parConcatMapChanKAny #-}
parConcatMapChanKAny :: MonadAsync m =>
Channel m b -> (a -> K.StreamK m b) -> K.StreamK m a -> K.StreamK m b
parConcatMapChanKAny :: forall (m :: * -> *) b a.
MonadAsync m =>
Channel m b -> (a -> StreamK m b) -> StreamK m a -> StreamK m b
parConcatMapChanKAny Channel m b
chan a -> StreamK m b
f StreamK m a
stream =
let done :: StreamK m a
done = forall (m :: * -> *) b a. Applicative m => m b -> StreamK m a
K.nilM (forall (m :: * -> *) a. MonadIO m => Channel m a -> m ()
stopChannel Channel m b
chan)
run :: (StreamK m a -> m ()) -> StreamK m a -> StreamK m b
run StreamK m a -> m ()
q = forall (m :: * -> *) a b.
Monad m =>
(StreamK m a -> m ())
-> (a -> StreamK m b) -> StreamK m a -> StreamK m b
concatMapDivK StreamK m a -> m ()
q (\a
x -> forall (m :: * -> *) a. StreamK m a -> StreamK m a -> StreamK m a
K.append (a -> StreamK m b
f a
x) forall {a}. StreamK m a
done)
in forall (m :: * -> *) b a.
Monad m =>
(b -> StreamK m a) -> m b -> StreamK m a
K.concatMapEffect ((StreamK m a -> m ()) -> StreamK m a -> StreamK m b
`run` StreamK m a
stream) (forall (m :: * -> *) b a.
MonadAsync m =>
Channel m b
-> ((StreamK m a -> m ()) -> StreamK m a -> StreamK m b)
-> m (StreamK m a -> m ())
mkEnqueue Channel m b
chan (StreamK m a -> m ()) -> StreamK m a -> StreamK m b
run)
{-# INLINE parConcatMapChanKFirst #-}
parConcatMapChanKFirst :: MonadAsync m =>
Channel m b -> (a -> K.StreamK m b) -> K.StreamK m a -> K.StreamK m b
parConcatMapChanKFirst :: forall (m :: * -> *) b a.
MonadAsync m =>
Channel m b -> (a -> StreamK m b) -> StreamK m a -> StreamK m b
parConcatMapChanKFirst Channel m b
chan a -> StreamK m b
f StreamK m a
stream =
let done :: StreamK m a
done = forall (m :: * -> *) b a. Applicative m => m b -> StreamK m a
K.nilM (forall (m :: * -> *) a. MonadIO m => Channel m a -> m ()
stopChannel Channel m b
chan)
run :: (StreamK m a -> m ()) -> StreamK m a -> StreamK m b
run StreamK m a -> m ()
q = forall (m :: * -> *) a b.
Monad m =>
(StreamK m a -> m ())
-> (a -> StreamK m b) -> StreamK m a -> StreamK m b
concatMapDivK StreamK m a -> m ()
q a -> StreamK m b
f
in forall (m :: * -> *) a. Monad m => m (StreamK m a) -> StreamK m a
K.concatEffect forall a b. (a -> b) -> a -> b
$ do
Maybe (a, StreamK m a)
res <- forall (m :: * -> *) a.
Applicative m =>
StreamK m a -> m (Maybe (a, StreamK m a))
K.uncons StreamK m a
stream
case Maybe (a, StreamK m a)
res of
Maybe (a, StreamK m a)
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return forall (m :: * -> *) a. StreamK m a
K.nil
Just (a
h, StreamK m a
t) -> do
StreamK m a -> m ()
q <- forall (m :: * -> *) b a.
MonadAsync m =>
Channel m b
-> ((StreamK m a -> m ()) -> StreamK m a -> StreamK m b)
-> m (StreamK m a -> m ())
mkEnqueue Channel m b
chan (StreamK m a -> m ()) -> StreamK m a -> StreamK m b
run
StreamK m a -> m ()
q StreamK m a
t
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. StreamK m a -> StreamK m a -> StreamK m a
K.append (a -> StreamK m b
f a
h) forall {a}. StreamK m a
done
{-# INLINE parConcatMapChanKGeneric #-}
parConcatMapChanKGeneric :: MonadAsync m =>
(Config -> Config)
-> Channel m b
-> (a -> K.StreamK m b)
-> K.StreamK m a
-> K.StreamK m b
parConcatMapChanKGeneric :: forall (m :: * -> *) b a.
MonadAsync m =>
(Config -> Config)
-> Channel m b -> (a -> StreamK m b) -> StreamK m a -> StreamK m b
parConcatMapChanKGeneric Config -> Config
modifier Channel m b
chan a -> StreamK m b
f StreamK m a
stream = do
let cfg :: Config
cfg = Config -> Config
modifier Config
defaultConfig
case Config -> StopWhen
getStopWhen Config
cfg of
StopWhen
AllStop -> forall (m :: * -> *) b a.
MonadAsync m =>
Channel m b -> (a -> StreamK m b) -> StreamK m a -> StreamK m b
parConcatMapChanK Channel m b
chan a -> StreamK m b
f StreamK m a
stream
StopWhen
FirstStops -> forall (m :: * -> *) b a.
MonadAsync m =>
Channel m b -> (a -> StreamK m b) -> StreamK m a -> StreamK m b
parConcatMapChanKFirst Channel m b
chan a -> StreamK m b
f StreamK m a
stream
StopWhen
AnyStops -> forall (m :: * -> *) b a.
MonadAsync m =>
Channel m b -> (a -> StreamK m b) -> StreamK m a -> StreamK m b
parConcatMapChanKAny Channel m b
chan a -> StreamK m b
f StreamK m a
stream
{-# INLINE parConcatMapK #-}
parConcatMapK :: MonadAsync m =>
(Config -> Config) -> (a -> K.StreamK m b) -> K.StreamK m a -> K.StreamK m b
parConcatMapK :: forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config)
-> (a -> StreamK m b) -> StreamK m a -> StreamK m b
parConcatMapK Config -> Config
modifier a -> StreamK m b
f StreamK m a
input =
let g :: Channel m b -> (a -> StreamK m b) -> StreamK m a -> StreamK m b
g = forall (m :: * -> *) b a.
MonadAsync m =>
(Config -> Config)
-> Channel m b -> (a -> StreamK m b) -> StreamK m a -> StreamK m b
parConcatMapChanKGeneric Config -> Config
modifier
in forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config)
-> StreamK m a
-> (Channel m b -> StreamK m a -> StreamK m b)
-> StreamK m b
withChannelK Config -> Config
modifier StreamK m a
input (forall {b} {a}.
Channel m b -> (a -> StreamK m b) -> StreamK m a -> StreamK m b
`g` a -> StreamK m b
f)
{-# INLINE parConcatMap #-}
parConcatMap :: MonadAsync m =>
(Config -> Config) -> (a -> Stream m b) -> Stream m a -> Stream m b
parConcatMap :: forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config) -> (a -> Stream m b) -> Stream m a -> Stream m b
parConcatMap Config -> Config
modifier a -> Stream m b
f Stream m a
stream =
forall (m :: * -> *) a. Applicative m => StreamK m a -> Stream m a
Stream.fromStreamK
forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config)
-> (a -> StreamK m b) -> StreamK m a -> StreamK m b
parConcatMapK
Config -> Config
modifier (forall (m :: * -> *) a. Monad m => Stream m a -> StreamK m a
Stream.toStreamK forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> Stream m b
f) (forall (m :: * -> *) a. Monad m => Stream m a -> StreamK m a
Stream.toStreamK Stream m a
stream)
{-# INLINE parConcat #-}
parConcat :: MonadAsync m =>
(Config -> Config) -> Stream m (Stream m a) -> Stream m a
parConcat :: forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> Stream m (Stream m a) -> Stream m a
parConcat Config -> Config
modifier = forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config) -> (a -> Stream m b) -> Stream m a -> Stream m b
parConcatMap Config -> Config
modifier forall a. a -> a
id
{-# INLINE parList #-}
parList :: MonadAsync m => (Config -> Config) -> [Stream m a] -> Stream m a
parList :: forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> [Stream m a] -> Stream m a
parList Config -> Config
modifier = forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> Stream m (Stream m a) -> Stream m a
parConcat Config -> Config
modifier forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a. Applicative m => [a] -> Stream m a
Stream.fromList
{-# INLINE parListLazy #-}
parListLazy :: MonadAsync m => [Stream m a] -> Stream m a
parListLazy :: forall (m :: * -> *) a. MonadAsync m => [Stream m a] -> Stream m a
parListLazy = forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> [Stream m a] -> Stream m a
parList forall a. a -> a
id
{-# INLINE parListInterleaved #-}
parListInterleaved :: MonadAsync m => [Stream m a] -> Stream m a
parListInterleaved :: forall (m :: * -> *) a. MonadAsync m => [Stream m a] -> Stream m a
parListInterleaved = forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> [Stream m a] -> Stream m a
parList (Bool -> Config -> Config
interleaved Bool
True)
{-# INLINE parListOrdered #-}
parListOrdered :: MonadAsync m => [Stream m a] -> Stream m a
parListOrdered :: forall (m :: * -> *) a. MonadAsync m => [Stream m a] -> Stream m a
parListOrdered = forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> [Stream m a] -> Stream m a
parList (Bool -> Config -> Config
ordered Bool
True)
{-# INLINE parListEager #-}
parListEager :: MonadAsync m => [Stream m a] -> Stream m a
parListEager :: forall (m :: * -> *) a. MonadAsync m => [Stream m a] -> Stream m a
parListEager = forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> [Stream m a] -> Stream m a
parList (Bool -> Config -> Config
eager Bool
True)
{-# INLINE parListEagerFst #-}
parListEagerFst :: MonadAsync m => [Stream m a] -> Stream m a
parListEagerFst :: forall (m :: * -> *) a. MonadAsync m => [Stream m a] -> Stream m a
parListEagerFst = forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> [Stream m a] -> Stream m a
parList (Bool -> Config -> Config
eager Bool
True forall b c a. (b -> c) -> (a -> b) -> a -> c
. StopWhen -> Config -> Config
stopWhen StopWhen
FirstStops)
{-# INLINE parListEagerMin #-}
parListEagerMin :: MonadAsync m => [Stream m a] -> Stream m a
parListEagerMin :: forall (m :: * -> *) a. MonadAsync m => [Stream m a] -> Stream m a
parListEagerMin = forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> [Stream m a] -> Stream m a
parList (Bool -> Config -> Config
eager Bool
True forall b c a. (b -> c) -> (a -> b) -> a -> c
. StopWhen -> Config -> Config
stopWhen StopWhen
AnyStops)
{-# INLINE parApply #-}
{-# SPECIALIZE parApply ::
(Config -> Config) -> Stream IO (a -> b) -> Stream IO a -> Stream IO b #-}
parApply :: MonadAsync m =>
(Config -> Config) -> Stream m (a -> b) -> Stream m a -> Stream m b
parApply :: forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config) -> Stream m (a -> b) -> Stream m a -> Stream m b
parApply Config -> Config
modifier Stream m (a -> b)
stream1 Stream m a
stream2 =
forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config) -> (a -> Stream m b) -> Stream m a -> Stream m b
parConcatMap
Config -> Config
modifier
(\a -> b
g -> forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config) -> (a -> Stream m b) -> Stream m a -> Stream m b
parConcatMap Config -> Config
modifier (forall (m :: * -> *) a. Applicative m => a -> Stream m a
Stream.fromPure forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> b
g) Stream m a
stream2)
Stream m (a -> b)
stream1
{-# INLINE parMapM #-}
parMapM :: MonadAsync m =>
(Config -> Config) -> (a -> m b) -> Stream m a -> Stream m b
parMapM :: forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config) -> (a -> m b) -> Stream m a -> Stream m b
parMapM Config -> Config
modifier a -> m b
f = forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config) -> (a -> Stream m b) -> Stream m a -> Stream m b
parConcatMap Config -> Config
modifier (forall (m :: * -> *) a. Applicative m => m a -> Stream m a
Stream.fromEffect forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> m b
f)
{-# INLINE parSequence #-}
parSequence :: MonadAsync m =>
(Config -> Config) -> Stream m (m a) -> Stream m a
parSequence :: forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> Stream m (m a) -> Stream m a
parSequence Config -> Config
modifier = forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config) -> (a -> m b) -> Stream m a -> Stream m b
parMapM Config -> Config
modifier forall a. a -> a
id
{-# INLINE parZipWithM #-}
parZipWithM :: MonadAsync m
=> (Config -> Config)
-> (a -> b -> m c)
-> Stream m a
-> Stream m b
-> Stream m c
parZipWithM :: forall (m :: * -> *) a b c.
MonadAsync m =>
(Config -> Config)
-> (a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c
parZipWithM Config -> Config
cfg a -> b -> m c
f Stream m a
m1 Stream m b
m2 = forall (m :: * -> *) a b c.
Monad m =>
(a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c
Stream.zipWithM a -> b -> m c
f (forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> Stream m a -> Stream m a
parEval Config -> Config
cfg Stream m a
m1) (forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> Stream m a -> Stream m a
parEval Config -> Config
cfg Stream m b
m2)
{-# INLINE parZipWith #-}
parZipWith :: MonadAsync m
=> (Config -> Config)
-> (a -> b -> c)
-> Stream m a
-> Stream m b
-> Stream m c
parZipWith :: forall (m :: * -> *) a b c.
MonadAsync m =>
(Config -> Config)
-> (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
parZipWith Config -> Config
cfg a -> b -> c
f = forall (m :: * -> *) a b c.
MonadAsync m =>
(Config -> Config)
-> (a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c
parZipWithM Config -> Config
cfg (\a
a b
b -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ a -> b -> c
f a
a b
b)
{-# INLINE parMergeByM #-}
parMergeByM :: MonadAsync m
=> (Config -> Config)
-> (a -> a -> m Ordering)
-> Stream m a
-> Stream m a
-> Stream m a
parMergeByM :: forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config)
-> (a -> a -> m Ordering) -> Stream m a -> Stream m a -> Stream m a
parMergeByM Config -> Config
cfg a -> a -> m Ordering
f Stream m a
m1 Stream m a
m2 = forall (m :: * -> *) a.
Monad m =>
(a -> a -> m Ordering) -> Stream m a -> Stream m a -> Stream m a
Stream.mergeByM a -> a -> m Ordering
f (forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> Stream m a -> Stream m a
parEval Config -> Config
cfg Stream m a
m1) (forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> Stream m a -> Stream m a
parEval Config -> Config
cfg Stream m a
m2)
{-# INLINE parMergeBy #-}
parMergeBy :: MonadAsync m
=> (Config -> Config)
-> (a -> a -> Ordering)
-> Stream m a
-> Stream m a
-> Stream m a
parMergeBy :: forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config)
-> (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a
parMergeBy Config -> Config
cfg a -> a -> Ordering
f = forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config)
-> (a -> a -> m Ordering) -> Stream m a -> Stream m a -> Stream m a
parMergeByM Config -> Config
cfg (\a
a a
b -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ a -> a -> Ordering
f a
a a
b)
{-# INLINE parConcatIterate #-}
parConcatIterate :: MonadAsync m =>
(Config -> Config)
-> (a -> Stream m a)
-> Stream m a
-> Stream m a
parConcatIterate :: forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> (a -> Stream m a) -> Stream m a -> Stream m a
parConcatIterate Config -> Config
modifier a -> Stream m a
f Stream m a
input =
forall (m :: * -> *) a. Applicative m => StreamK m a -> Stream m a
Stream.fromStreamK
forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config)
-> StreamK m a
-> (Channel m b -> StreamK m a -> StreamK m b)
-> StreamK m b
withChannelK Config -> Config
modifier (forall (m :: * -> *) a. Monad m => Stream m a -> StreamK m a
Stream.toStreamK Stream m a
input) Channel m a -> StreamK m a -> StreamK m a
iterateStream
where
iterateStream :: Channel m a -> StreamK m a -> StreamK m a
iterateStream Channel m a
channel StreamK m a
stream =
forall (m :: * -> *) b a.
MonadAsync m =>
(Config -> Config)
-> Channel m b -> (a -> StreamK m b) -> StreamK m a -> StreamK m b
parConcatMapChanKGeneric Config -> Config
modifier Channel m a
channel (Channel m a -> a -> StreamK m a
generate Channel m a
channel) StreamK m a
stream
generate :: Channel m a -> a -> StreamK m a
generate Channel m a
channel a
x =
a
x forall a (m :: * -> *). a -> StreamK m a -> StreamK m a
`K.cons` Channel m a -> StreamK m a -> StreamK m a
iterateStream Channel m a
channel (forall (m :: * -> *) a. Monad m => Stream m a -> StreamK m a
Stream.toStreamK forall a b. (a -> b) -> a -> b
$ a -> Stream m a
f a
x)
{-# INLINE parRepeatM #-}
parRepeatM :: MonadAsync m => (Config -> Config) -> m a -> Stream m a
parRepeatM :: forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> m a -> Stream m a
parRepeatM Config -> Config
cfg = forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> Stream m (m a) -> Stream m a
parSequence Config -> Config
cfg forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a. Monad m => a -> Stream m a
Stream.repeat
{-# INLINE parReplicateM #-}
parReplicateM :: MonadAsync m => (Config -> Config) -> Int -> m a -> Stream m a
parReplicateM :: forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> Int -> m a -> Stream m a
parReplicateM Config -> Config
cfg Int
n = forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> Stream m (m a) -> Stream m a
parSequence Config -> Config
cfg forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a. Monad m => Int -> a -> Stream m a
Stream.replicate Int
n
{-# INLINE_NORMAL newCallbackStream #-}
newCallbackStream :: MonadAsync m => m (a -> m (), Stream m a)
newCallbackStream :: forall (m :: * -> *) a. MonadAsync m => m (a -> m (), Stream m a)
newCallbackStream = do
Channel m a
chan <- forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> m (Channel m a)
newChannel (Bool -> Config -> Config
eager Bool
True)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO ThreadId
myThreadId
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= forall (m :: * -> *).
MonadIO m =>
IORef (Set ThreadId) -> MVar () -> ThreadId -> m ()
modifyThread (forall (m :: * -> *) a. Channel m a -> IORef (Set ThreadId)
workerThreads Channel m a
chan) (forall (m :: * -> *) a. Channel m a -> MVar ()
outputDoorBell Channel m a
chan)
let callback :: a -> m ()
callback a
a =
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) a. Functor f => f a -> f ()
void
forall a b. (a -> b) -> a -> b
$ forall a.
IORef ([ChildEvent a], Int) -> MVar () -> ChildEvent a -> IO Int
sendWithDoorBell
(forall (m :: * -> *) a. Channel m a -> IORef ([ChildEvent a], Int)
outputQueue Channel m a
chan) (forall (m :: * -> *) a. Channel m a -> MVar ()
outputDoorBell Channel m a
chan) (forall a. a -> ChildEvent a
ChildYield a
a)
forall (m :: * -> *) a. Monad m => a -> m a
return (forall {m :: * -> *}. MonadIO m => a -> m ()
callback, forall (m :: * -> *) a. MonadAsync m => Channel m a -> Stream m a
fromChannel Channel m a
chan)
{-# INLINE fromCallback #-}
fromCallback :: MonadAsync m => ((a -> m ()) -> m ()) -> Stream m a
fromCallback :: forall (m :: * -> *) a.
MonadAsync m =>
((a -> m ()) -> m ()) -> Stream m a
fromCallback (a -> m ()) -> m ()
setCallback = forall (m :: * -> *) a. Monad m => m (Stream m a) -> Stream m a
Stream.concatEffect forall a b. (a -> b) -> a -> b
$ do
(a -> m ()
callback, Stream m a
stream) <- forall (m :: * -> *) a. MonadAsync m => m (a -> m (), Stream m a)
newCallbackStream
(a -> m ()) -> m ()
setCallback a -> m ()
callback
forall (m :: * -> *) a. Monad m => a -> m a
return Stream m a
stream
{-# INLINE_NORMAL parTapCount #-}
parTapCount
:: MonadAsync m
=> (a -> Bool)
-> (D.Stream m Int -> m b)
-> D.Stream m a
-> D.Stream m a
parTapCount :: forall (m :: * -> *) a b.
MonadAsync m =>
(a -> Bool) -> (Stream m Int -> m b) -> Stream m a -> Stream m a
parTapCount a -> Bool
predicate Stream m Int -> m b
fld (D.Stream State StreamK m a -> s -> m (Step s a)
step s
state) = forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State StreamK m a
-> Maybe (IORef Int, ThreadId, s)
-> m (Step (Maybe (IORef Int, ThreadId, s)) a)
step' forall a. Maybe a
Nothing
where
{-# INLINE_LATE step' #-}
step' :: State StreamK m a
-> Maybe (IORef Int, ThreadId, s)
-> m (Step (Maybe (IORef Int, ThreadId, s)) a)
step' State StreamK m a
_ Maybe (IORef Int, ThreadId, s)
Nothing = do
IORef Int
countVar <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. Unbox a => a -> IO (IORef a)
Unboxed.newIORef (Int
0 :: Int)
ThreadId
tid <- forall (m :: * -> *). MonadRunInIO m => m () -> m ThreadId
forkManaged
forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ Stream m Int -> m b
fld
forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
(MonadIO m, Unbox a) =>
IORef a -> Stream m a
Unboxed.pollIntIORef IORef Int
countVar
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (forall a. a -> Maybe a
Just (IORef Int
countVar, ThreadId
tid, s
state))
step' State StreamK m a
gst (Just (IORef Int
countVar, ThreadId
tid, s
st)) = do
Step s a
r <- State StreamK m a -> s -> m (Step s a)
step State StreamK m a
gst s
st
case Step s a
r of
Yield a
x s
s -> do
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (a -> Bool
predicate a
x)
forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. Unbox a => IORef a -> (a -> a) -> IO ()
Unboxed.modifyIORef' IORef Int
countVar (forall a. Num a => a -> a -> a
+ Int
1)
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield a
x (forall a. a -> Maybe a
Just (IORef Int
countVar, ThreadId
tid, s
s))
Skip s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (forall a. a -> Maybe a
Just (IORef Int
countVar, ThreadId
tid, s
s))
Step s a
Stop -> do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ ThreadId -> IO ()
killThread ThreadId
tid
forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop
{-# DEPRECATED tapCount "Please use parTapCount instead." #-}
{-# INLINE tapCount #-}
tapCount ::
(MonadAsync m)
=> (a -> Bool)
-> (Stream m Int -> m b)
-> Stream m a
-> Stream m a
tapCount :: forall (m :: * -> *) a b.
MonadAsync m =>
(a -> Bool) -> (Stream m Int -> m b) -> Stream m a -> Stream m a
tapCount = forall (m :: * -> *) a b.
MonadAsync m =>
(a -> Bool) -> (Stream m Int -> m b) -> Stream m a -> Stream m a
parTapCount