{-# LANGUAGE UndecidableInstances #-}
{-# OPTIONS_GHC -fno-warn-deprecations #-}
{-# OPTIONS_GHC -Wno-redundant-constraints #-}
#include "inline.hs"
module Streamly.Internal.Data.Stream.Async {-# DEPRECATED "Please use \"Streamly.Internal.Data.Stream.Concurrent\" from streamly package instead." #-}
(
AsyncT(..)
, Async
, consMAsync
, asyncK
, mkAsyncK
, mkAsyncD
, WAsyncT(..)
, WAsync
, consMWAsync
, wAsyncK
)
where
import Control.Concurrent (myThreadId)
#if !(MIN_VERSION_transformers(0,6,0))
import Control.Monad.Base (MonadBase(..), liftBaseDefault)
#endif
import Control.Monad.Catch (MonadThrow, throwM)
import Control.Concurrent.MVar (newEmptyMVar)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Reader.Class (MonadReader(..))
import Control.Monad.State.Class (MonadState(..))
#if !(MIN_VERSION_transformers(0,6,0))
import Control.Monad.Trans.Class (MonadTrans(lift))
#endif
import Data.Concurrent.Queue.MichaelScott (LinkedQueue, newQ, nullQ, tryPopR, pushL)
import Data.IORef (IORef, newIORef, readIORef)
import Data.Maybe (fromJust)
import Prelude hiding (map)
import qualified Data.Set as S
import Streamly.Internal.Control.Concurrent
(MonadRunInIO, MonadAsync, askRunInIO, restoreM)
import Streamly.Internal.Data.Atomics
(atomicModifyIORefCAS, atomicModifyIORefCAS_)
import Streamly.Internal.Data.StreamK (Stream)
import Streamly.Internal.Data.Stream.SVar.Generate (fromSVar, fromSVarD)
import qualified Streamly.Internal.Data.StreamK as K
(foldStreamShared, mkStream, foldStream, fromEffect
, nil, concatMapWith, fromPure, bindWith)
import qualified Streamly.Internal.Data.Stream as D
(Stream(..), Step(..), mapM, toStreamK, fromStreamK)
import qualified Streamly.Internal.Data.Stream.Serial as Stream (toStreamK)
import Streamly.Internal.Data.SVar
#include "Instances.hs"
-- | Lift 'local' to streams: rebuild the stream so that each continuation it
-- invokes (yield, singleton and stop) runs under @local modify@. The tail
-- handed to the yield continuation is wrapped with 'withLocal' again, so the
-- modification applies to the rest of the stream as well.
{-# INLINABLE withLocal #-}
withLocal :: MonadReader r m => (r -> r) -> Stream m a -> Stream m a
withLocal modify strm =
    K.mkStream $ \st yld sng stp ->
        K.foldStream
            st
            -- yield: pass on the element and the re-wrapped remainder
            (\x rest -> local modify (yld x (withLocal modify rest)))
            -- singleton: last element of the stream
            (\x -> local modify (sng x))
            -- stop: end of stream
            (local modify stp)
            strm
-- | Push a work item (a stream paired with the runner of the monad it
-- belongs to) onto the front of the list held in the 'IORef', then ring the
-- door bell of the 'SVar'.
{-# INLINE enqueueLIFO #-}
enqueueLIFO ::
   SVar t m a -> IORef [(RunInIO m, t m a)] -> (RunInIO m, t m a) -> IO ()
enqueueLIFO sv stack work =
    -- Consing at the head is what makes the list behave as a stack.
    atomicModifyIORefCAS_ stack (work :) >> ringDoorBell sv
-- | Outcome of evaluating one unit of work inside a worker loop.
data WorkerStatus
    = Continue -- ^ The worker loop should go on and pick up more work.
    | Suspend  -- ^ The worker loop should send a stop event and return.
-- | Worker loop over a stack of pending streams kept in an 'IORef'.
--
-- Repeatedly pops the most recently pushed stream and evaluates it, sending
-- every element to the 'SVar' with 'sendYield'. When 'sendYield' returns
-- 'False' the unevaluated remainder of the stream is pushed back on the stack
-- and the worker sends a stop event. The worker also stops when the stack is
-- found empty.
{-# INLINE workLoopLIFO #-}
workLoopLIFO
    :: MonadRunInIO m
    => IORef [(RunInIO m, Stream m a)]
    -> State Stream m a
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> m ()
workLoopLIFO q st sv winfo = loop

    where

    -- Tell the SVar that this worker is done.
    finish = liftIO (sendStop sv winfo)

    -- Hand one element to the SVar; the result says whether to keep going.
    publish x = liftIO (sendYield sv winfo (ChildYield x))

    toStatus ok = if ok then Continue else Suspend

    -- Evaluate a stream with this loop's continuations; a stream that ends
    -- normally reports 'Continue'.
    drive strm = K.foldStreamShared st onYield onSingle (return Continue) strm

    loop = do
        next <- popWork
        case next of
            Nothing -> finish
            Just (RunInIO runin, strm) -> do
                -- Run the stream with the runner that was stored alongside
                -- it, then restore the resulting monadic state here.
                saved <- liftIO (runin (drive strm))
                status <- restoreM saved
                case status of
                    Continue -> loop
                    Suspend -> finish

    onSingle x = toStatus <$> publish x

    onYield x rest = do
        ok <- publish x
        if ok
        then drive rest
        else do
            -- Park the remainder so that it can be resumed later.
            runInIO <- askRunInIO
            liftIO (enqueueLIFO sv q (runInIO, rest))
            return Suspend

    -- Atomically take the top of the stack, if there is one.
    popWork = liftIO $ atomicModifyIORefCAS q $ \stack ->
        case stack of
            [] -> ([], Nothing)
            top : below -> (below, Just top)
-- | Variant of 'workLoopLIFO' that also consults the yield limit of the
-- 'SVar'.
--
-- 'decrementYieldLimit' is called before a popped stream is evaluated and
-- again after each element that has a remainder. Whenever it returns 'False'
-- (or 'sendYield' does) the pending stream is pushed back on the stack, the
-- limit is incremented again and the worker stops. A stream that reaches its
-- stop continuation also increments the limit before reporting 'Continue'.
{-# INLINE workLoopLIFOLimited #-}
workLoopLIFOLimited
    :: forall m a. MonadRunInIO m
    => IORef [(RunInIO m, Stream m a)]
    -> State Stream m a
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> m ()
workLoopLIFOLimited q st sv winfo = loop

    where

    -- Tell the SVar that this worker is done.
    finish = liftIO (sendStop sv winfo)

    -- Undo one 'decrementYieldLimit'.
    refund = incrementYieldLimit sv

    -- Stop continuation: undo one decrement and keep the loop running.
    refundAndContinue = liftIO refund >> return Continue

    -- Hand one element to the SVar; the result says whether to keep going.
    publish x = liftIO (sendYield sv winfo (ChildYield x))

    toStatus ok = if ok then Continue else Suspend

    drive strm = K.foldStreamShared st onYield onSingle refundAndContinue strm

    loop = do
        next <- popWork
        case next of
            Nothing -> finish
            Just item@(RunInIO runin, strm) -> do
                granted <- liftIO (decrementYieldLimit sv)
                if granted
                then do
                    saved <- liftIO (runin (drive strm))
                    status <- restoreM saved
                    case status of
                        Continue -> loop
                        Suspend -> finish
                else liftIO $ do
                    -- Put the untouched item back, undo the decrement, stop.
                    enqueueLIFO sv q item
                    refund
                    sendStop sv winfo

    onSingle x = toStatus <$> publish x

    onYield x rest = do
        ok <- publish x
        granted <- liftIO (decrementYieldLimit sv)
        if ok && granted
        then drive rest
        else do
            -- Undo the decrement and park the remainder for later.
            runInIO <- askRunInIO
            liftIO refund
            liftIO (enqueueLIFO sv q (runInIO, rest))
            return Suspend

    -- Atomically take the top of the stack, if there is one.
    popWork = liftIO $ atomicModifyIORefCAS q $ \stack ->
        case stack of
            [] -> ([], Nothing)
            top : below -> (below, Just top)
-- | Add a work item (a stream paired with the runner of the monad it belongs
-- to) to the linked queue with 'pushL', then ring the door bell of the
-- 'SVar'.
{-# INLINE enqueueFIFO #-}
enqueueFIFO ::
       SVar t m a
    -> LinkedQueue (RunInIO m, t m a)
    -> (RunInIO m, t m a)
    -> IO ()
enqueueFIFO sv queue work = pushL queue work >> ringDoorBell sv
-- | Worker loop over a linked queue of pending streams.
--
-- Repeatedly pops a stream with 'tryPopR' and evaluates it for one step. An
-- element that has a remainder is sent to the 'SVar' and the remainder is
-- always pushed back on the queue with 'enqueueFIFO'; the loop then
-- continues or stops depending on what 'sendYield' returned. The worker also
-- stops when the queue is found empty.
{-# INLINE workLoopFIFO #-}
workLoopFIFO
    :: MonadRunInIO m
    => LinkedQueue (RunInIO m, Stream m a)
    -> State Stream m a
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> m ()
workLoopFIFO q st sv winfo = loop

    where

    -- Tell the SVar that this worker is done.
    finish = liftIO (sendStop sv winfo)

    -- Hand one element to the SVar; the result says whether to keep going.
    publish x = liftIO (sendYield sv winfo (ChildYield x))

    toStatus ok = if ok then Continue else Suspend

    -- A stream that ends normally reports 'Continue'.
    drive strm = K.foldStreamShared st onYield onSingle (return Continue) strm

    loop = do
        next <- liftIO (tryPopR q)
        case next of
            Nothing -> finish
            Just (RunInIO runin, strm) -> do
                -- Run the stream with the runner that was stored alongside
                -- it, then restore the resulting monadic state here.
                saved <- liftIO (runin (drive strm))
                status <- restoreM saved
                case status of
                    Continue -> loop
                    Suspend -> finish

    onSingle x = toStatus <$> publish x

    onYield x rest = do
        ok <- publish x
        -- The remainder goes back on the queue whatever 'sendYield' said.
        runInIO <- askRunInIO
        liftIO (enqueueFIFO sv q (runInIO, rest))
        return (toStatus ok)
-- | Variant of 'workLoopFIFO' that also consults the yield limit of the
-- 'SVar'.
--
-- 'decrementYieldLimit' is called before a popped stream is evaluated; when
-- it returns 'False' the stream is put back on the queue, the limit is
-- incremented again and the worker stops. After an element with a remainder
-- has been sent and the remainder re-queued, the limit is decremented once
-- more; if that or 'sendYield' returned 'False' the decrement is undone and
-- the worker suspends.
{-# INLINE workLoopFIFOLimited #-}
workLoopFIFOLimited
    :: forall m a. MonadRunInIO m
    => LinkedQueue (RunInIO m, Stream m a)
    -> State Stream m a
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> m ()
workLoopFIFOLimited q st sv winfo = loop

    where

    -- Tell the SVar that this worker is done.
    finish = liftIO (sendStop sv winfo)

    -- Undo one 'decrementYieldLimit'.
    refund = incrementYieldLimit sv

    -- Stop continuation: undo one decrement and keep the loop running.
    refundAndContinue = liftIO refund >> return Continue

    -- Hand one element to the SVar; the result says whether to keep going.
    publish x = liftIO (sendYield sv winfo (ChildYield x))

    toStatus ok = if ok then Continue else Suspend

    drive strm = K.foldStreamShared st onYield onSingle refundAndContinue strm

    loop = do
        next <- liftIO (tryPopR q)
        case next of
            Nothing -> finish
            Just item@(RunInIO runin, strm) -> do
                granted <- liftIO (decrementYieldLimit sv)
                if granted
                then do
                    saved <- liftIO (runin (drive strm))
                    status <- restoreM saved
                    case status of
                        Continue -> loop
                        Suspend -> finish
                else liftIO $ do
                    -- Put the untouched item back, undo the decrement, stop.
                    enqueueFIFO sv q item
                    refund
                    sendStop sv winfo

    onSingle x = toStatus <$> publish x

    onYield x rest = do
        ok <- publish x
        -- The remainder is re-queued before the limit is consulted.
        runInIO <- askRunInIO
        liftIO (enqueueFIFO sv q (runInIO, rest))
        granted <- liftIO (decrementYieldLimit sv)
        if ok && granted
        then return Continue
        else liftIO $ do
            refund
            return Suspend
-- | Allocate an 'SVar' of style 'AsyncVar' whose work queue is a list kept in
-- an 'IORef' and used as a stack (see 'enqueueLIFO').
--
-- Which helpers are plugged in depends on the configuration in the 'State':
--
-- * with a stream rate the paced output reader and post-processor are used,
--   otherwise the bounded ones;
-- * with a yield limit 'workLoopLIFOLimited' and a limit-aware "work done"
--   check are used, otherwise 'workLoopLIFO' and a plain "stack is empty"
--   check.
getLifoSVar :: forall m a. MonadAsync m
    => State Stream m a -> RunInIO m -> IO (SVar Stream m a)
getLifoSVar st mrun = do
    outQ <- newIORef ([], 0)
    outQMv <- newEmptyMVar
    active <- newIORef 0
    wfw <- newIORef False
    running <- newIORef S.empty
    q <- newIORef ([] :: [(RunInIO m, Stream m a)])
    -- A counter is allocated only when a yield limit is configured.
    yl <- traverse newIORef (getYieldLimit st)
    rateInfo <- getYieldRateInfo st
    stats <- newSVarStats
    tid <- myThreadId

    let -- Work is finished when the stack holds no more streams.
        stackIsEmpty _ = null <$> readIORef q

        -- With a yield limit, work is also finished once the remaining
        -- count has dropped to zero or below.
        stackEmptyOrLimitHit svar = do
            limitHit <-
                case remainingWork svar of
                    Nothing -> return False
                    Just ref -> (<= 0) <$> readIORef ref
            stackEmpty <- null <$> readIORef q
            return (stackEmpty || limitHit)

        -- Build the record. The SVar itself is an argument because several
        -- fields are closures over it.
        -- NOTE(review): the fields set to 'undefined' are presumably never
        -- read for this SVar style; evaluating one would crash -- confirm.
        mkSVar :: SVar Stream m a
            -> (SVar Stream m a -> m [ChildEvent a])
            -> (SVar Stream m a -> m Bool)
            -> (SVar Stream m a -> IO Bool)
            -> (IORef [(RunInIO m, Stream m a)]
                -> State Stream m a
                -> SVar Stream m a
                -> Maybe WorkerInfo
                -> m ())
            -> SVar Stream m a
        mkSVar self readOutput postProc workDone wloop = SVar
            { outputQueue = outQ
            , outputQueueFromConsumer = undefined
            , remainingWork = yl
            , maxBufferLimit = getMaxBuffer st
            , pushBufferSpace = undefined
            , pushBufferPolicy = undefined
            , pushBufferMVar = undefined
            , maxWorkerLimit = min (getMaxThreads st) (getMaxBuffer st)
            , yieldRateInfo = rateInfo
            , outputDoorBell = outQMv
            , outputDoorBellFromConsumer = undefined
            , readOutputQ = readOutput self
            , postProcess = postProc self
            , workerThreads = running
            , workLoop = wloop q (st {streamVar = Just self}) self
            , enqueue = enqueueLIFO self q
            , isWorkDone = workDone self
            , isQueueDone = workDone self
            , needDoorBell = wfw
            , svarStyle = AsyncVar
            , svarStopStyle = StopNone
            , svarStopBy = undefined
            , svarMrun = mrun
            , workerCount = active
            , accountThread = delThread self
            , workerStopMVar = undefined
            , svarRef = Nothing
            , svarInspectMode = getInspectMode st
            , svarCreator = tid
            , aheadWorkQueue = undefined
            , outputHeap = undefined
            , svarStats = stats
            }

        -- The definition is recursive: the record refers to itself.
        sv =
            case (getStreamRate st, getYieldLimit st) of
                (Nothing, Nothing) ->
                    mkSVar sv readOutputQBounded postProcessBounded
                        stackIsEmpty workLoopLIFO
                (Nothing, Just _) ->
                    mkSVar sv readOutputQBounded postProcessBounded
                        stackEmptyOrLimitHit workLoopLIFOLimited
                (Just _, Nothing) ->
                    mkSVar sv readOutputQPaced postProcessPaced
                        stackIsEmpty workLoopLIFO
                (Just _, Just _) ->
                    mkSVar sv readOutputQPaced postProcessPaced
                        stackEmptyOrLimitHit workLoopLIFOLimited

    return sv
-- | Allocate a fresh 'SVar' of style 'WAsyncVar' whose pending work lives in
-- a 'LinkedQueue' and is scheduled through 'enqueueFIFO'.
--
-- Two independent settings of the supplied 'State' select the callbacks that
-- are installed in the record:
--
-- * 'getStreamRate' picks the paced or the bounded output reader and
--   post-processor.
-- * 'getYieldLimit' picks the completion check and the work loop (the
--   limited variants also consult 'remainingWork').
--
-- Fields this constructor does not populate are left as 'undefined'.
getFifoSVar :: forall m a. MonadAsync m
    => State Stream m a -> RunInIO m -> IO (SVar Stream m a)
getFifoSVar st mrun = do
    outputQ <- newIORef ([], 0)
    doorBell <- newEmptyMVar
    nWorkers <- newIORef 0
    wantDoorBell <- newIORef False
    threadSet <- newIORef S.empty
    workQ <- newQ
    -- A yield counter is only allocated when a yield limit is configured.
    yieldsLeft <- traverse newIORef (getYieldLimit st)
    rateInfo <- getYieldRateInfo st
    stats <- newSVarStats
    creator <- myThreadId

    let -- Without a yield limit, work is finished once the queue is empty.
        queueDrained _ = nullQ workQ

        -- With a yield limit, work is also finished once the remaining
        -- yield count has dropped to zero or below.
        queueDrainedOrLimitHit svar = do
            limitHit <-
                maybe
                    (return False)
                    (fmap (<= 0) . readIORef)
                    (remainingWork svar)
            drained <- nullQ workQ
            return (drained || limitHit)

        (readOutput, postProc) =
            case getStreamRate st of
                Nothing -> (readOutputQBounded, postProcessBounded)
                Just _ -> (readOutputQPaced, postProcessPaced)

        (workDone, runLoop) =
            case getYieldLimit st of
                Nothing -> (queueDrained, workLoopFIFO)
                Just _ -> (queueDrainedOrLimitHit, workLoopFIFOLimited)

        -- The record refers to itself lazily: several callbacks need the
        -- finished SVar as an argument.
        sv :: SVar Stream m a
        sv = SVar
            { outputQueue = outputQ
            , outputQueueFromConsumer = undefined
            , remainingWork = yieldsLeft
            , maxBufferLimit = getMaxBuffer st
            , pushBufferSpace = undefined
            , pushBufferPolicy = undefined
            , pushBufferMVar = undefined
            , maxWorkerLimit = min (getMaxThreads st) (getMaxBuffer st)
            , yieldRateInfo = rateInfo
            , outputDoorBell = doorBell
            , outputDoorBellFromConsumer = undefined
            , readOutputQ = readOutput sv
            , postProcess = postProc sv
            , workerThreads = threadSet
            , workLoop = runLoop workQ st{streamVar = Just sv} sv
            , enqueue = enqueueFIFO sv workQ
            , isWorkDone = workDone sv
            , isQueueDone = workDone sv
            , needDoorBell = wantDoorBell
            , svarStyle = WAsyncVar
            , svarStopStyle = StopNone
            , svarStopBy = undefined
            , svarMrun = mrun
            , workerCount = nWorkers
            , accountThread = delThread sv
            , workerStopMVar = undefined
            , svarRef = Nothing
            , svarInspectMode = getInspectMode st
            , svarCreator = creator
            , aheadWorkQueue = undefined
            , outputHeap = undefined
            , svarStats = stats
            }

    return sv
-- | Create an 'SVar' with 'getLifoSVar', using the run-in-IO function of the
-- current monad, and pass it together with the stream @m@ to
-- 'sendFirstWorker'. The 'SVar' returned by 'sendFirstWorker' is the result.
{-# INLINABLE newAsyncVar #-}
newAsyncVar :: MonadAsync m
    => State Stream m a -> Stream m a -> m (SVar Stream m a)
newAsyncVar st m =
    askRunInIO
        >>= liftIO . getLifoSVar st
        >>= \sv -> sendFirstWorker sv m
-- | Wrap a stream so that, when it is consumed, a new 'SVar' is created for
-- it with 'newAsyncVar' (on an adapted copy of the consumer state) and the
-- consumer is fed from that 'SVar' via 'fromSVar'.
{-# INLINABLE mkAsyncK #-}
mkAsyncK :: MonadAsync m => Stream m a -> Stream m a
mkAsyncK m = K.mkStream $ \st yieldk single stop ->
    newAsyncVar (adaptState st) m
        >>= K.foldStream st yieldk single stop . Stream.toStreamK . fromSVar
-- | Direct-style counterpart of 'mkAsyncK'.
--
-- The stream state is 'Nothing' until the first step. That step creates an
-- 'SVar' for the source stream with 'newAsyncVar' and switches to the stream
-- produced by 'fromSVarD'. Every later step delegates to that stream.
{-# INLINE_NORMAL mkAsyncD #-}
mkAsyncD :: MonadAsync m => D.Stream m a -> D.Stream m a
mkAsyncD m = D.Stream step Nothing

    where

    -- First step: start evaluation and skip to the SVar-backed stream.
    step gst Nothing = do
        sv <- newAsyncVar gst (D.toStreamK m)
        return (D.Skip (Just (fromSVarD sv)))

    -- Later steps: run one step of the SVar-backed stream and re-wrap its
    -- successor state.
    step gst (Just (D.UnStream next s)) = do
        res <- next gst s
        let resume = Just . D.Stream next
        return $ case res of
            D.Stop -> D.Stop
            D.Skip s1 -> D.Skip (resume s1)
            D.Yield x s1 -> D.Yield x (resume s1)
-- | Create an 'SVar' with 'getFifoSVar', using the run-in-IO function of the
-- current monad, and pass it together with the stream @m@ to
-- 'sendFirstWorker'. The 'SVar' returned by 'sendFirstWorker' is the result.
{-# INLINABLE newWAsyncVar #-}
newWAsyncVar :: MonadAsync m
    => State Stream m a -> Stream m a -> m (SVar Stream m a)
newWAsyncVar st m =
    askRunInIO
        >>= liftIO . getFifoSVar st
        >>= \sv -> sendFirstWorker sv m
-- | Evaluate two streams under a newly created 'SVar' of the given style.
--
-- 'AsyncVar' uses 'newAsyncVar' and 'WAsyncVar' uses 'newWAsyncVar'; any
-- other style fails with the error @"illegal svar type"@. The consumer is
-- then fed from the new 'SVar' via 'fromSVar'.
forkSVarAsync :: MonadAsync m
    => SVarStyle -> Stream m a -> Stream m a -> Stream m a
forkSVarAsync style m1 m2 = K.mkStream $ \st yieldk single stop -> do
    let both = concurrently m1 m2
    sv <- case style of
        AsyncVar -> newAsyncVar st both
        WAsyncVar -> newWAsyncVar st both
        _ -> error "illegal svar type"
    K.foldStream st yieldk single stop (Stream.toStreamK (fromSVar sv))

    where

    -- Enqueue the second stream on the SVar carried by the state, then keep
    -- evaluating the first stream with that state shared. The state is
    -- expected to carry an SVar here; 'fromJust' fails otherwise.
    concurrently ma mb = K.mkStream $ \st1 yieldk single stop -> do
        runInIO <- askRunInIO
        liftIO $ enqueue (fromJust $ streamVar st1) (runInIO, mb)
        K.foldStreamShared st1 yieldk single stop ma
-- | Combine two streams under an 'SVar' of the given style.
--
-- If the consumer state already carries an 'SVar' whose 'svarStyle' equals
-- @style@, the second stream is enqueued on it and the first stream continues
-- with the state shared. Otherwise the pair is handed to 'forkSVarAsync',
-- which sets up a new 'SVar'.
{-# INLINE joinStreamVarAsync #-}
joinStreamVarAsync :: MonadAsync m
    => SVarStyle -> Stream m a -> Stream m a -> Stream m a
joinStreamVarAsync style m1 m2 = K.mkStream $ \st yieldk single stop ->
    let continueWith = K.foldStreamShared st yieldk single stop
     in case streamVar st of
            Just sv
                | svarStyle sv == style -> do
                    runInIO <- askRunInIO
                    liftIO (enqueue sv (runInIO, m2))
                    continueWith m1
            _ -> continueWith (forkSVarAsync style m1 m2)
-- | Binary stream combinator: 'joinStreamVarAsync' fixed to the 'AsyncVar'
-- style.
{-# INLINE asyncK #-}
asyncK :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
asyncK = joinStreamVarAsync AsyncVar
-- | Prepend a monadic action to an 'AsyncT' stream. The action is lifted to
-- a one-element stream with 'K.fromEffect' and joined to the rest with
-- 'asyncK'.
{-# INLINE consMAsync #-}
{-# SPECIALIZE consMAsync :: IO a -> AsyncT IO a -> AsyncT IO a #-}
consMAsync :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a
consMAsync m rest = AsyncT (K.fromEffect m `asyncK` getAsyncT rest)
-- | Newtype wrapper around 'Stream'. Its 'Semigroup', 'Applicative' and
-- 'Monad' instances in this module combine streams with 'asyncK'.
newtype AsyncT m a = AsyncT
    { getAsyncT :: Stream m a
    }
#if !(MIN_VERSION_transformers(0,6,0))
-- | 'lift' turns an action of the underlying monad into a stream via
-- 'K.fromEffect' and wraps it in 'AsyncT'.
instance MonadTrans AsyncT where
    {-# INLINE lift #-}
    lift = AsyncT . K.fromEffect
#endif
-- | 'AsyncT' specialised to the 'IO' monad.
type Async = AsyncT IO
-- | Combine two 'AsyncT' streams by joining the underlying streams with
-- 'asyncK'. This is the '<>' of the 'Semigroup' instance.
{-# INLINE append #-}
{-# SPECIALIZE append :: AsyncT IO a -> AsyncT IO a -> AsyncT IO a #-}
append :: MonadAsync m => AsyncT m a -> AsyncT m a -> AsyncT m a
append left right = AsyncT (getAsyncT left `asyncK` getAsyncT right)
-- | '<>' is 'append'.
instance MonadAsync m => Semigroup (AsyncT m a) where
    (<>) = append
-- | The identity is the wrapped empty stream 'K.nil'; 'mappend' defers to the
-- 'Semigroup' instance.
instance MonadAsync m => Monoid (AsyncT m a) where
    mempty = AsyncT K.nil
    mappend = (<>)
-- | Applicative application for 'AsyncT'.
--
-- Each function from the first stream is mapped over the whole second
-- stream. Both the per-function result streams and the inner one-element
-- streams are merged with 'asyncK' through 'K.concatMapWith'.
{-# INLINE apAsync #-}
{-# SPECIALIZE apAsync :: AsyncT IO (a -> b) -> AsyncT IO a -> AsyncT IO b #-}
apAsync :: MonadAsync m => AsyncT m (a -> b) -> AsyncT m a -> AsyncT m b
apAsync (AsyncT fs) (AsyncT xs) =
    AsyncT $ K.concatMapWith asyncK applyToAll fs

    where

    -- Apply one function to every element of the argument stream.
    applyToAll g = K.concatMapWith asyncK (\x -> K.fromPure (g x)) xs
-- | 'pure' wraps a one-element stream built with 'K.fromPure'; '<*>' is
-- 'apAsync'.
instance (Monad m, MonadAsync m) => Applicative (AsyncT m) where
    {-# INLINE pure #-}
    pure = AsyncT . K.fromPure

    {-# INLINE (<*>) #-}
    (<*>) = apAsync
-- | Monadic bind for 'AsyncT': the streams generated by @f@ for the elements
-- of the source stream are merged with 'asyncK' through 'K.bindWith'.
{-# INLINE bindAsync #-}
{-# SPECIALIZE bindAsync ::
    AsyncT IO a -> (a -> AsyncT IO b) -> AsyncT IO b #-}
bindAsync :: MonadAsync m => AsyncT m a -> (a -> AsyncT m b) -> AsyncT m b
bindAsync src f =
    AsyncT (K.bindWith asyncK (getAsyncT src) (\x -> getAsyncT (f x)))
-- | 'return' is 'pure' and '>>=' is 'bindAsync'.
instance MonadAsync m => Monad (AsyncT m) where
    return = pure
    (>>=) = bindAsync
#if !(MIN_VERSION_transformers(0,6,0))
-- | 'liftBase' is 'liftBaseDefault', which goes through the 'MonadTrans'
-- instance of 'AsyncT'.
instance (MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AsyncT m) where
    liftBase = liftBaseDefault
#endif
-- CPP macro invocation for AsyncT. Presumably it expands to the remaining
-- shared monad instances and is defined in the included "Instances.hs";
-- TODO confirm against that file.
MONAD_COMMON_INSTANCES(AsyncT, MONADPARALLEL)
-- | Binary stream combinator: 'joinStreamVarAsync' fixed to the 'WAsyncVar'
-- style.
{-# INLINE wAsyncK #-}
wAsyncK :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
wAsyncK = joinStreamVarAsync WAsyncVar
-- | Prepend a monadic action to a 'WAsyncT' stream. The action is lifted to
-- a one-element stream with 'K.fromEffect' and joined to the rest with
-- 'wAsyncK'.
{-# INLINE consMWAsync #-}
{-# SPECIALIZE consMWAsync :: IO a -> WAsyncT IO a -> WAsyncT IO a #-}
consMWAsync :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a
consMWAsync m rest = WAsyncT (K.fromEffect m `wAsyncK` getWAsyncT rest)
-- | Newtype wrapper around 'Stream'. Its 'Semigroup', 'Applicative' and
-- 'Monad' instances in this module combine streams with 'wAsyncK'.
newtype WAsyncT m a = WAsyncT
    { getWAsyncT :: Stream m a
    }
#if !(MIN_VERSION_transformers(0,6,0))
-- | 'lift' turns an action of the underlying monad into a stream via
-- 'K.fromEffect' and wraps it in 'WAsyncT'.
instance MonadTrans WAsyncT where
    {-# INLINE lift #-}
    lift = WAsyncT . K.fromEffect
#endif
-- | 'WAsyncT' specialised to the 'IO' monad.
type WAsync = WAsyncT IO
-- | Combine two 'WAsyncT' streams by joining the underlying streams with
-- 'wAsyncK'. This is the '<>' of the 'Semigroup' instance.
{-# INLINE wAppend #-}
{-# SPECIALIZE wAppend :: WAsyncT IO a -> WAsyncT IO a -> WAsyncT IO a #-}
wAppend :: MonadAsync m => WAsyncT m a -> WAsyncT m a -> WAsyncT m a
wAppend left right = WAsyncT (getWAsyncT left `wAsyncK` getWAsyncT right)
-- | '<>' is 'wAppend'.
instance MonadAsync m => Semigroup (WAsyncT m a) where
    (<>) = wAppend
-- | The identity is the wrapped empty stream 'K.nil'; 'mappend' defers to the
-- 'Semigroup' instance.
instance MonadAsync m => Monoid (WAsyncT m a) where
    mempty = WAsyncT K.nil
    mappend = (<>)
-- | Applicative application for 'WAsyncT'.
--
-- Each function from the first stream is mapped over the whole second
-- stream. Both the per-function result streams and the inner one-element
-- streams are merged with 'wAsyncK' through 'K.concatMapWith'.
{-# INLINE apWAsync #-}
{-# SPECIALIZE apWAsync ::
    WAsyncT IO (a -> b) -> WAsyncT IO a -> WAsyncT IO b #-}
apWAsync :: MonadAsync m => WAsyncT m (a -> b) -> WAsyncT m a -> WAsyncT m b
apWAsync (WAsyncT fs) (WAsyncT xs) =
    WAsyncT $ K.concatMapWith wAsyncK applyToAll fs

    where

    -- Apply one function to every element of the argument stream.
    applyToAll g = K.concatMapWith wAsyncK (\x -> K.fromPure (g x)) xs
-- | 'pure' wraps a one-element stream built with 'K.fromPure'; '<*>' is
-- 'apWAsync'.
--
-- Both methods carry INLINE pragmas so that this instance matches the
-- parallel @Applicative (AsyncT m)@ instance in this module.
instance (Monad m, MonadAsync m) => Applicative (WAsyncT m) where
    {-# INLINE pure #-}
    pure = WAsyncT . K.fromPure

    {-# INLINE (<*>) #-}
    (<*>) = apWAsync
-- | Monadic bind for 'WAsyncT': the streams generated by @f@ for the
-- elements of the source stream are merged with 'wAsyncK' through
-- 'K.bindWith'.
{-# INLINE bindWAsync #-}
{-# SPECIALIZE bindWAsync ::
    WAsyncT IO a -> (a -> WAsyncT IO b) -> WAsyncT IO b #-}
bindWAsync :: MonadAsync m => WAsyncT m a -> (a -> WAsyncT m b) -> WAsyncT m b
bindWAsync src f =
    WAsyncT (K.bindWith wAsyncK (getWAsyncT src) (\x -> getWAsyncT (f x)))
-- | 'return' is 'pure' and '>>=' is 'bindWAsync'.
instance MonadAsync m => Monad (WAsyncT m) where
    return = pure
    (>>=) = bindWAsync
#if !(MIN_VERSION_transformers(0,6,0))
-- | 'liftBase' is 'liftBaseDefault', which goes through the 'MonadTrans'
-- instance of 'WAsyncT'.
instance (MonadBase b m, Monad m, MonadAsync m) => MonadBase b (WAsyncT m) where
    liftBase = liftBaseDefault
#endif
-- CPP macro invocation for WAsyncT. Presumably it expands to the remaining
-- shared monad instances and is defined in the included "Instances.hs";
-- TODO confirm against that file.
MONAD_COMMON_INSTANCES(WAsyncT, MONADPARALLEL)