{-# LANGUAGE UndecidableInstances #-}
{-# OPTIONS_GHC -fno-warn-deprecations #-}
module Streamly.Internal.Data.Stream.Parallel {-# DEPRECATED "Please use \"Streamly.Internal.Data.Stream.Concurrent\" instead." #-}
(
ParallelT(..)
, Parallel
, consM
, parallelK
, parallelFstK
, parallelMinK
, mkParallelD
, mkParallelK
, tapAsyncK
, tapAsyncF
, newCallbackStream
)
where
import Control.Concurrent (myThreadId, takeMVar)
import Control.Monad (when)
import Control.Monad.Base (MonadBase(..), liftBaseDefault)
import Control.Monad.Catch (MonadThrow, throwM)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Reader.Class (MonadReader(..))
import Control.Monad.State.Class (MonadState(..))
import Control.Monad.Trans.Class (MonadTrans(lift))
import Data.Functor (void)
import Data.IORef (readIORef, writeIORef)
import Data.Maybe (fromJust)
import Streamly.Data.Fold (Fold)
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Stream.StreamD.Type (Step(..))
import qualified Data.Set as Set
import qualified Streamly.Internal.Data.Stream.StreamK.Type as K
(StreamK, foldStreamShared, mkStream, foldStream, fromEffect
, nil, concatMapWith, fromPure, bindWith)
import qualified Streamly.Internal.Data.Stream.StreamD.Type as D
(Stream(..), mapM, toStreamK, fromStreamK)
import qualified Streamly.Internal.Data.Stream.SVar.Generate as SVar
import qualified Streamly.Internal.Data.Stream.SVar.Eliminate as SVar
import qualified Streamly.Internal.Data.Stream.Serial as Stream
import Streamly.Internal.Data.SVar
import Prelude hiding (map)
#include "inline.hs"
#include "Instances.hs"
{-# INLINABLE withLocal #-}
withLocal :: MonadReader r m => (r -> r) -> K.StreamK m a -> K.StreamK m a
withLocal :: forall r (m :: * -> *) a.
MonadReader r m =>
(r -> r) -> StreamK m a -> StreamK m a
withLocal r -> r
f StreamK m a
m =
forall (m :: * -> *) a.
(forall r.
State StreamK m a
-> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
K.mkStream forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp ->
let single :: a -> m r
single = forall r (m :: * -> *) a. MonadReader r m => (r -> r) -> m a -> m a
local r -> r
f forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> m r
sng
yieldk :: a -> StreamK m a -> m r
yieldk a
a StreamK m a
r = forall r (m :: * -> *) a. MonadReader r m => (r -> r) -> m a -> m a
local r -> r
f forall a b. (a -> b) -> a -> b
$ a -> StreamK m a -> m r
yld a
a (forall r (m :: * -> *) a.
MonadReader r m =>
(r -> r) -> StreamK m a -> StreamK m a
withLocal r -> r
f StreamK m a
r)
in forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStream State StreamK m a
st a -> StreamK m a -> m r
yieldk a -> m r
single (forall r (m :: * -> *) a. MonadReader r m => (r -> r) -> m a -> m a
local r -> r
f m r
stp) StreamK m a
m
{-# NOINLINE runOne #-}
runOne
:: MonadIO m
=> State K.StreamK m a -> K.StreamK m a -> Maybe WorkerInfo -> m ()
runOne :: forall (m :: * -> *) a.
MonadIO m =>
State StreamK m a -> StreamK m a -> Maybe WorkerInfo -> m ()
runOne State StreamK m a
st StreamK m a
m0 Maybe WorkerInfo
winfo =
case forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Count
getYieldLimit State StreamK m a
st of
Maybe Count
Nothing -> StreamK m a -> m ()
go StreamK m a
m0
Just Count
_ -> forall (m :: * -> *) a.
MonadIO m =>
State StreamK m a -> StreamK m a -> Maybe WorkerInfo -> m ()
runOneLimited State StreamK m a
st StreamK m a
m0 Maybe WorkerInfo
winfo
where
go :: StreamK m a -> m ()
go StreamK m a
m = do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
decrementBufferLimit SVar StreamK m a
sv
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
st a -> StreamK m a -> m ()
yieldk forall {m :: * -> *}. MonadIO m => a -> m ()
single m ()
stop StreamK m a
m
sv :: SVar StreamK m a
sv = forall a. HasCallStack => Maybe a -> a
fromJust forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe (SVar t m a)
streamVar State StreamK m a
st
stop :: m ()
stop = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementBufferLimit SVar StreamK m a
sv
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar StreamK m a
sv Maybe WorkerInfo
winfo
sendit :: a -> m ()
sendit a
a = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ChildEvent a -> IO Int
send SVar StreamK m a
sv (forall a. a -> ChildEvent a
ChildYield a
a)
single :: a -> m ()
single a
a = forall {m :: * -> *}. MonadIO m => a -> m ()
sendit a
a forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar StreamK m a
sv Maybe WorkerInfo
winfo)
yieldk :: a -> StreamK m a -> m ()
yieldk a
a StreamK m a
r = forall {m :: * -> *}. MonadIO m => a -> m ()
sendit a
a forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> StreamK m a -> m ()
go StreamK m a
r
runOneLimited
:: MonadIO m
=> State K.StreamK m a -> K.StreamK m a -> Maybe WorkerInfo -> m ()
runOneLimited :: forall (m :: * -> *) a.
MonadIO m =>
State StreamK m a -> StreamK m a -> Maybe WorkerInfo -> m ()
runOneLimited State StreamK m a
st StreamK m a
m0 Maybe WorkerInfo
winfo = StreamK m a -> m ()
go StreamK m a
m0
where
go :: StreamK m a -> m ()
go StreamK m a
m = do
Bool
yieldLimitOk <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
decrementYieldLimit SVar StreamK m a
sv
if Bool
yieldLimitOk
then do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
decrementBufferLimit SVar StreamK m a
sv
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
st a -> StreamK m a -> m ()
yieldk forall {m :: * -> *}. MonadIO m => a -> m ()
single m ()
stop StreamK m a
m
else do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
cleanupSVarFromWorker SVar StreamK m a
sv
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar StreamK m a
sv Maybe WorkerInfo
winfo
sv :: SVar StreamK m a
sv = forall a. HasCallStack => Maybe a -> a
fromJust forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe (SVar t m a)
streamVar State StreamK m a
st
stop :: m ()
stop = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementBufferLimit SVar StreamK m a
sv
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar StreamK m a
sv
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar StreamK m a
sv Maybe WorkerInfo
winfo
sendit :: a -> m ()
sendit a
a = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ChildEvent a -> IO Int
send SVar StreamK m a
sv (forall a. a -> ChildEvent a
ChildYield a
a)
single :: a -> m ()
single a
a = forall {m :: * -> *}. MonadIO m => a -> m ()
sendit a
a forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar StreamK m a
sv Maybe WorkerInfo
winfo)
yieldk :: a -> StreamK m a -> m ()
yieldk a
a StreamK m a
r = forall {m :: * -> *}. MonadIO m => a -> m ()
sendit a
a forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> StreamK m a -> m ()
go StreamK m a
r
{-# NOINLINE forkSVarPar #-}
forkSVarPar :: MonadAsync m
=> SVarStopStyle -> K.StreamK m a -> K.StreamK m a -> K.StreamK m a
forkSVarPar :: forall (m :: * -> *) a.
MonadAsync m =>
SVarStopStyle -> StreamK m a -> StreamK m a -> StreamK m a
forkSVarPar SVarStopStyle
ss StreamK m a
m StreamK m a
r = forall (m :: * -> *) a.
(forall r.
State StreamK m a
-> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
K.mkStream forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp -> do
SVar StreamK m a
sv <- forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVarStopStyle -> State t m a -> m (SVar t m a)
newParallelVar SVarStopStyle
ss State StreamK m a
st
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> (Maybe WorkerInfo -> m ()) -> m ()
pushWorkerPar SVar StreamK m a
sv (forall (m :: * -> *) a.
MonadIO m =>
State StreamK m a -> StreamK m a -> Maybe WorkerInfo -> m ()
runOne State StreamK m a
st{streamVar :: Maybe (SVar StreamK m a)
streamVar = forall a. a -> Maybe a
Just SVar StreamK m a
sv} StreamK m a
m)
case SVarStopStyle
ss of
SVarStopStyle
StopBy -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
Set ThreadId
set <- forall a. IORef a -> IO a
readIORef (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef (Set ThreadId)
workerThreads SVar StreamK m a
sv)
forall a. IORef a -> a -> IO ()
writeIORef (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef ThreadId
svarStopBy SVar StreamK m a
sv) forall a b. (a -> b) -> a -> b
$ forall a. Int -> Set a -> a
Set.elemAt Int
0 Set ThreadId
set
SVarStopStyle
_ -> forall (m :: * -> *) a. Monad m => a -> m a
return ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> (Maybe WorkerInfo -> m ()) -> m ()
pushWorkerPar SVar StreamK m a
sv (forall (m :: * -> *) a.
MonadIO m =>
State StreamK m a -> StreamK m a -> Maybe WorkerInfo -> m ()
runOne State StreamK m a
st{streamVar :: Maybe (SVar StreamK m a)
streamVar = forall a. a -> Maybe a
Just SVar StreamK m a
sv} StreamK m a
r)
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStream State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. SerialT m a -> Stream m a
Stream.toStreamK (forall (m :: * -> *) a.
MonadAsync m =>
SVar StreamK m a -> SerialT m a
SVar.fromSVar SVar StreamK m a
sv)
{-# INLINE joinStreamVarPar #-}
joinStreamVarPar ::
MonadAsync m
=> SVarStyle
-> SVarStopStyle
-> K.StreamK m a
-> K.StreamK m a
-> K.StreamK m a
joinStreamVarPar :: forall (m :: * -> *) a.
MonadAsync m =>
SVarStyle
-> SVarStopStyle -> StreamK m a -> StreamK m a -> StreamK m a
joinStreamVarPar SVarStyle
style SVarStopStyle
ss StreamK m a
m1 StreamK m a
m2 = forall (m :: * -> *) a.
(forall r.
State StreamK m a
-> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
K.mkStream forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp ->
case forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe (SVar t m a)
streamVar State StreamK m a
st of
Just SVar StreamK m a
sv | forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStyle
svarStyle SVar StreamK m a
sv forall a. Eq a => a -> a -> Bool
== SVarStyle
style Bool -> Bool -> Bool
&& forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStopStyle
svarStopStyle SVar StreamK m a
sv forall a. Eq a => a -> a -> Bool
== SVarStopStyle
ss -> do
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> (Maybe WorkerInfo -> m ()) -> m ()
pushWorkerPar SVar StreamK m a
sv (forall (m :: * -> *) a.
MonadIO m =>
State StreamK m a -> StreamK m a -> Maybe WorkerInfo -> m ()
runOne State StreamK m a
st StreamK m a
m1)
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp StreamK m a
m2
Maybe (SVar StreamK m a)
_ ->
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp (forall (m :: * -> *) a.
MonadAsync m =>
SVarStopStyle -> StreamK m a -> StreamK m a -> StreamK m a
forkSVarPar SVarStopStyle
ss StreamK m a
m1 StreamK m a
m2)
{-# INLINE parallelK #-}
parallelK :: MonadAsync m => K.StreamK m a -> K.StreamK m a -> K.StreamK m a
parallelK :: forall (m :: * -> *) a.
MonadAsync m =>
StreamK m a -> StreamK m a -> StreamK m a
parallelK = forall (m :: * -> *) a.
MonadAsync m =>
SVarStyle
-> SVarStopStyle -> StreamK m a -> StreamK m a -> StreamK m a
joinStreamVarPar SVarStyle
ParallelVar SVarStopStyle
StopNone
{-# INLINE consM #-}
{-# SPECIALIZE consM :: IO a -> ParallelT IO a -> ParallelT IO a #-}
consM :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a
consM :: forall (m :: * -> *) a.
MonadAsync m =>
m a -> ParallelT m a -> ParallelT m a
consM m a
m (ParallelT StreamK m a
r) = forall (m :: * -> *) a. StreamK m a -> ParallelT m a
ParallelT forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
MonadAsync m =>
StreamK m a -> StreamK m a -> StreamK m a
parallelK (forall (m :: * -> *) a. Monad m => m a -> StreamK m a
K.fromEffect m a
m) StreamK m a
r
{-# INLINE parallelFstK #-}
parallelFstK :: MonadAsync m => K.StreamK m a -> K.StreamK m a -> K.StreamK m a
parallelFstK :: forall (m :: * -> *) a.
MonadAsync m =>
StreamK m a -> StreamK m a -> StreamK m a
parallelFstK = forall (m :: * -> *) a.
MonadAsync m =>
SVarStyle
-> SVarStopStyle -> StreamK m a -> StreamK m a -> StreamK m a
joinStreamVarPar SVarStyle
ParallelVar SVarStopStyle
StopBy
{-# INLINE parallelMinK #-}
parallelMinK :: MonadAsync m => K.StreamK m a -> K.StreamK m a -> K.StreamK m a
parallelMinK :: forall (m :: * -> *) a.
MonadAsync m =>
StreamK m a -> StreamK m a -> StreamK m a
parallelMinK = forall (m :: * -> *) a.
MonadAsync m =>
SVarStyle
-> SVarStopStyle -> StreamK m a -> StreamK m a -> StreamK m a
joinStreamVarPar SVarStyle
ParallelVar SVarStopStyle
StopAny
mkParallelK :: MonadAsync m => K.StreamK m a -> K.StreamK m a
mkParallelK :: forall (m :: * -> *) a. MonadAsync m => StreamK m a -> StreamK m a
mkParallelK StreamK m a
m = forall (m :: * -> *) a.
(forall r.
State StreamK m a
-> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
K.mkStream forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp -> do
SVar StreamK m a
sv <- forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVarStopStyle -> State t m a -> m (SVar t m a)
newParallelVar SVarStopStyle
StopNone (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m a
st)
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
State t m a -> SVar t m a -> Stream m a -> m ()
SVar.toSVarParallel State StreamK m a
st SVar StreamK m a
sv forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Applicative m => StreamK m a -> Stream m a
D.fromStreamK StreamK m a
m
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStream State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. SerialT m a -> Stream m a
Stream.toStreamK forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
MonadAsync m =>
SVar StreamK m a -> SerialT m a
SVar.fromSVar SVar StreamK m a
sv
{-# INLINE_NORMAL mkParallelD #-}
mkParallelD :: MonadAsync m => D.Stream m a -> D.Stream m a
mkParallelD :: forall (m :: * -> *) a. MonadAsync m => Stream m a -> Stream m a
mkParallelD Stream m a
m = forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State StreamK m a
-> Maybe (Stream m a) -> m (Step (Maybe (Stream m a)) a)
step forall a. Maybe a
Nothing
where
step :: State StreamK m a
-> Maybe (Stream m a) -> m (Step (Maybe (Stream m a)) a)
step State StreamK m a
gst Maybe (Stream m a)
Nothing = do
SVar StreamK m a
sv <- forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVarStopStyle -> State t m a -> m (SVar t m a)
newParallelVar SVarStopStyle
StopNone State StreamK m a
gst
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
State t m a -> SVar t m a -> Stream m a -> m ()
SVar.toSVarParallel State StreamK m a
gst SVar StreamK m a
sv Stream m a
m
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip forall a b. (a -> b) -> a -> b
$ forall a. a -> Maybe a
Just forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> Stream m a
SVar.fromSVarD SVar StreamK m a
sv
step State StreamK m a
gst (Just (D.UnStream State StreamK m a -> s -> m (Step s a)
step1 s
st)) = do
Step s a
r <- State StreamK m a -> s -> m (Step s a)
step1 State StreamK m a
gst s
st
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ case Step s a
r of
Yield a
a s
s -> forall s a. a -> s -> Step s a
Yield a
a (forall a. a -> Maybe a
Just forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State StreamK m a -> s -> m (Step s a)
step1 s
s)
Skip s
s -> forall s a. s -> Step s a
Skip (forall a. a -> Maybe a
Just forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State StreamK m a -> s -> m (Step s a)
step1 s
s)
Step s a
Stop -> forall s a. Step s a
Stop
{-# INLINE tapAsyncK #-}
tapAsyncK ::
MonadAsync m => (K.StreamK m a -> m b) -> K.StreamK m a -> K.StreamK m a
tapAsyncK :: forall (m :: * -> *) a b.
MonadAsync m =>
(StreamK m a -> m b) -> StreamK m a -> StreamK m a
tapAsyncK StreamK m a -> m b
f StreamK m a
m = forall (m :: * -> *) a.
(forall r.
State StreamK m a
-> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
K.mkStream forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp -> do
SVar StreamK m a
sv <- forall (m :: * -> *) a b.
MonadAsync m =>
State StreamK m a -> (SerialT m a -> m b) -> m (SVar StreamK m a)
SVar.newFoldSVar State StreamK m a
st (StreamK m a -> m b
f forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a. SerialT m a -> Stream m a
Stream.toStreamK)
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp
forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. SerialT m a -> Stream m a
Stream.toStreamK (forall (m :: * -> *) a.
MonadAsync m =>
SVar StreamK m a -> SerialT m a -> SerialT m a
SVar.teeToSVar SVar StreamK m a
sv forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Stream m a -> SerialT m a
Stream.fromStreamK StreamK m a
m)
data TapState fs st a = TapInit | Tapping !fs st | TapDone st
{-# INLINE_NORMAL tapAsyncF #-}
tapAsyncF :: MonadAsync m => Fold m a b -> D.Stream m a -> D.Stream m a
tapAsyncF :: forall (m :: * -> *) a b.
MonadAsync m =>
Fold m a b -> Stream m a -> Stream m a
tapAsyncF Fold m a b
f (D.Stream State StreamK m a -> s -> m (Step s a)
step1 s
state1) = forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream forall {a} {a}.
State StreamK m a
-> TapState (SVar StreamK m a) s a
-> m (Step (TapState (SVar StreamK m a) s a) a)
step forall fs st a. TapState fs st a
TapInit
where
drainFold :: SVar StreamK m a -> m ()
drainFold SVar StreamK m a
svr = do
Bool
done <- forall (m :: * -> *) a. MonadAsync m => SVar StreamK m a -> m Bool
SVar.fromConsumer SVar StreamK m a
svr
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool -> Bool
not Bool
done) forall a b. (a -> b) -> a -> b
$ do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> String -> IO () -> IO ()
withDiagMVar SVar StreamK m a
svr String
"teeToSVar: waiting to drain"
forall a b. (a -> b) -> a -> b
$ forall a. MVar a -> IO a
takeMVar (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> MVar ()
outputDoorBellFromConsumer SVar StreamK m a
svr)
SVar StreamK m a -> m ()
drainFold SVar StreamK m a
svr
stopFold :: SVar StreamK m a -> m ()
stopFold SVar StreamK m a
svr = do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar StreamK m a
svr forall a. Maybe a
Nothing
forall {m :: * -> *} {a}.
(MonadIO m, MonadBaseControl IO m, MonadThrow m) =>
SVar StreamK m a -> m ()
drainFold SVar StreamK m a
svr
{-# INLINE_LATE step #-}
step :: State StreamK m a
-> TapState (SVar StreamK m a) s a
-> m (Step (TapState (SVar StreamK m a) s a) a)
step State StreamK m a
gst TapState (SVar StreamK m a) s a
TapInit = do
SVar StreamK m a
sv <- forall (m :: * -> *) (t :: (* -> *) -> * -> *) a b.
MonadAsync m =>
State t m a -> Fold m a b -> m (SVar t m a)
SVar.newFoldSVarF State StreamK m a
gst Fold m a b
f
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (forall fs st a. fs -> st -> TapState fs st a
Tapping SVar StreamK m a
sv s
state1)
step State StreamK m a
gst (Tapping SVar StreamK m a
sv s
st) = do
Step s a
r <- State StreamK m a -> s -> m (Step s a)
step1 State StreamK m a
gst s
st
case Step s a
r of
Yield a
a s
s -> do
Bool
done <- forall (m :: * -> *) a.
MonadAsync m =>
SVar StreamK m a -> a -> m Bool
SVar.pushToFold SVar StreamK m a
sv a
a
if Bool
done
then do
forall {m :: * -> *} {a}.
(MonadIO m, MonadBaseControl IO m, MonadThrow m) =>
SVar StreamK m a -> m ()
stopFold SVar StreamK m a
sv
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield a
a (forall fs st a. st -> TapState fs st a
TapDone s
s)
else forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield a
a (forall fs st a. fs -> st -> TapState fs st a
Tapping SVar StreamK m a
sv s
s)
Skip s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (forall fs st a. fs -> st -> TapState fs st a
Tapping SVar StreamK m a
sv s
s)
Step s a
Stop -> do
forall {m :: * -> *} {a}.
(MonadIO m, MonadBaseControl IO m, MonadThrow m) =>
SVar StreamK m a -> m ()
stopFold SVar StreamK m a
sv
forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop
step State StreamK m a
gst (TapDone s
st) = do
Step s a
r <- State StreamK m a -> s -> m (Step s a)
step1 State StreamK m a
gst s
st
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ case Step s a
r of
Yield a
a s
s -> forall s a. a -> s -> Step s a
Yield a
a (forall fs st a. st -> TapState fs st a
TapDone s
s)
Skip s
s -> forall s a. s -> Step s a
Skip (forall fs st a. st -> TapState fs st a
TapDone s
s)
Step s a
Stop -> forall s a. Step s a
Stop
newtype ParallelT m a = ParallelT {forall (m :: * -> *) a. ParallelT m a -> StreamK m a
getParallelT :: K.StreamK m a}
instance MonadTrans ParallelT where
{-# INLINE lift #-}
lift :: forall (m :: * -> *) a. Monad m => m a -> ParallelT m a
lift = forall (m :: * -> *) a. StreamK m a -> ParallelT m a
ParallelT forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a. Monad m => m a -> StreamK m a
K.fromEffect
type Parallel = ParallelT IO
{-# INLINE append #-}
{-# SPECIALIZE append :: ParallelT IO a -> ParallelT IO a -> ParallelT IO a #-}
append :: MonadAsync m => ParallelT m a -> ParallelT m a -> ParallelT m a
append :: forall (m :: * -> *) a.
MonadAsync m =>
ParallelT m a -> ParallelT m a -> ParallelT m a
append (ParallelT StreamK m a
m1) (ParallelT StreamK m a
m2) = forall (m :: * -> *) a. StreamK m a -> ParallelT m a
ParallelT forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
MonadAsync m =>
StreamK m a -> StreamK m a -> StreamK m a
parallelK StreamK m a
m1 StreamK m a
m2
instance MonadAsync m => Semigroup (ParallelT m a) where
<> :: ParallelT m a -> ParallelT m a -> ParallelT m a
(<>) = forall (m :: * -> *) a.
MonadAsync m =>
ParallelT m a -> ParallelT m a -> ParallelT m a
append
instance MonadAsync m => Monoid (ParallelT m a) where
mempty :: ParallelT m a
mempty = forall (m :: * -> *) a. StreamK m a -> ParallelT m a
ParallelT forall (m :: * -> *) a. StreamK m a
K.nil
mappend :: ParallelT m a -> ParallelT m a -> ParallelT m a
mappend = forall a. Semigroup a => a -> a -> a
(<>)
{-# INLINE apParallel #-}
{-# SPECIALIZE apParallel ::
ParallelT IO (a -> b) -> ParallelT IO a -> ParallelT IO b #-}
apParallel :: MonadAsync m =>
ParallelT m (a -> b) -> ParallelT m a -> ParallelT m b
apParallel :: forall (m :: * -> *) a b.
MonadAsync m =>
ParallelT m (a -> b) -> ParallelT m a -> ParallelT m b
apParallel (ParallelT StreamK m (a -> b)
m1) (ParallelT StreamK m a
m2) =
let f :: (a -> b) -> StreamK m b
f a -> b
x1 = forall (m :: * -> *) b a.
(StreamK m b -> StreamK m b -> StreamK m b)
-> (a -> StreamK m b) -> StreamK m a -> StreamK m b
K.concatMapWith forall (m :: * -> *) a.
MonadAsync m =>
StreamK m a -> StreamK m a -> StreamK m a
parallelK (forall a (m :: * -> *). a -> StreamK m a
K.fromPure forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> b
x1) StreamK m a
m2
in forall (m :: * -> *) a. StreamK m a -> ParallelT m a
ParallelT forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) b a.
(StreamK m b -> StreamK m b -> StreamK m b)
-> (a -> StreamK m b) -> StreamK m a -> StreamK m b
K.concatMapWith forall (m :: * -> *) a.
MonadAsync m =>
StreamK m a -> StreamK m a -> StreamK m a
parallelK forall {b}. (a -> b) -> StreamK m b
f StreamK m (a -> b)
m1
instance (Monad m, MonadAsync m) => Applicative (ParallelT m) where
{-# INLINE pure #-}
pure :: forall a. a -> ParallelT m a
pure = forall (m :: * -> *) a. StreamK m a -> ParallelT m a
ParallelT forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a (m :: * -> *). a -> StreamK m a
K.fromPure
{-# INLINE (<*>) #-}
<*> :: forall a b. ParallelT m (a -> b) -> ParallelT m a -> ParallelT m b
(<*>) = forall (m :: * -> *) a b.
MonadAsync m =>
ParallelT m (a -> b) -> ParallelT m a -> ParallelT m b
apParallel
{-# INLINE bind #-}
{-# SPECIALIZE bind ::
ParallelT IO a -> (a -> ParallelT IO b) -> ParallelT IO b #-}
bind :: MonadAsync m => ParallelT m a -> (a -> ParallelT m b) -> ParallelT m b
bind :: forall (m :: * -> *) a b.
MonadAsync m =>
ParallelT m a -> (a -> ParallelT m b) -> ParallelT m b
bind (ParallelT StreamK m a
m) a -> ParallelT m b
f = forall (m :: * -> *) a. StreamK m a -> ParallelT m a
ParallelT forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) b a.
(StreamK m b -> StreamK m b -> StreamK m b)
-> StreamK m a -> (a -> StreamK m b) -> StreamK m b
K.bindWith forall (m :: * -> *) a.
MonadAsync m =>
StreamK m a -> StreamK m a -> StreamK m a
parallelK StreamK m a
m (forall (m :: * -> *) a. ParallelT m a -> StreamK m a
getParallelT forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> ParallelT m b
f)
instance MonadAsync m => Monad (ParallelT m) where
return :: forall a. a -> ParallelT m a
return = forall (f :: * -> *) a. Applicative f => a -> f a
pure
{-# INLINE (>>=) #-}
>>= :: forall a b. ParallelT m a -> (a -> ParallelT m b) -> ParallelT m b
(>>=) = forall (m :: * -> *) a b.
MonadAsync m =>
ParallelT m a -> (a -> ParallelT m b) -> ParallelT m b
bind
MONAD_COMMON_INSTANCES(ParallelT, MONADPARALLEL)
{-# INLINE_NORMAL newCallbackStream #-}
newCallbackStream :: MonadAsync m => m (a -> m (), K.StreamK m a)
newCallbackStream :: forall (m :: * -> *) a. MonadAsync m => m (a -> m (), StreamK m a)
newCallbackStream = do
SVar Any m a
sv <- forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVarStopStyle -> State t m a -> m (SVar t m a)
newParallelVar SVarStopStyle
StopNone forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO ThreadId
myThreadId forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> ThreadId -> m ()
modifyThread SVar Any m a
sv
let callback :: a -> m ()
callback a
a = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ChildEvent a -> IO Int
send SVar Any m a
sv (forall a. a -> ChildEvent a
ChildYield a
a)
forall (m :: * -> *) a. Monad m => a -> m a
return (forall {m :: * -> *}. MonadIO m => a -> m ()
callback, forall (m :: * -> *) a. Monad m => Stream m a -> StreamK m a
D.toStreamK (forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> Stream m a
SVar.fromSVarD SVar Any m a
sv))