module Streamly.Internal.Data.Stream.Concurrent.Channel.Dispatcher
(
pushWorker
, dispatchWorker
, dispatchWorkerPaced
, sendWorkerWait
, startChannel
, sendWorkerDelay
, sendWorkerDelayPaced
)
where
import Control.Concurrent (takeMVar, threadDelay)
import Control.Exception (assert)
import Control.Monad (when, void)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.Maybe (fromJust, fromMaybe)
import Data.IORef (modifyIORef, newIORef, readIORef, writeIORef)
import Streamly.Internal.Control.Concurrent (MonadRunInIO)
import Streamly.Internal.Control.ForkLifted (doFork)
import Streamly.Internal.Data.Atomics (atomicModifyIORefCAS_, storeLoadBarrier)
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)
import Streamly.Internal.Data.Time.Units
(MicroSecond64(..), diffAbsTime64, fromRelTime64, toRelTime64)
import Streamly.Internal.Data.Stream.Concurrent.Channel.Type
import Streamly.Internal.Data.Stream.Channel.Dispatcher
import Streamly.Internal.Data.Stream.Channel.Types
import Streamly.Internal.Data.Stream.Channel.Worker
{-# NOINLINE pushWorker #-}
pushWorker :: MonadRunInIO m => Count -> Channel m a -> m ()
pushWorker :: forall (m :: * -> *) a.
MonadRunInIO m =>
Count -> Channel m a -> m ()
pushWorker Count
yieldMax Channel m a
sv = do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall t. IORef t -> (t -> t) -> IO ()
atomicModifyIORefCAS_ (forall (m :: * -> *) a. Channel m a -> IORef Int
workerCount Channel m a
sv) forall a b. (a -> b) -> a -> b
$ \Int
n -> Int
n forall a. Num a => a -> a -> a
+ Int
1
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (forall (m :: * -> *) a. Channel m a -> Bool
svarInspectMode Channel m a
sv)
forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *). MonadIO m => IORef Int -> SVarStats -> m ()
recordMaxWorkers (forall (m :: * -> *) a. Channel m a -> IORef Int
workerCount Channel m a
sv) (forall (m :: * -> *) a. Channel m a -> SVarStats
svarStats Channel m a
sv)
Maybe WorkerInfo
winfo <-
case forall (m :: * -> *) a. Channel m a -> Maybe YieldRateInfo
yieldRateInfo Channel m a
sv of
Maybe YieldRateInfo
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a. Maybe a
Nothing
Just YieldRateInfo
_ -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
IORef Count
cntRef <- forall a. a -> IO (IORef a)
newIORef Count
0
AbsTime
t <- Clock -> IO AbsTime
getTime Clock
Monotonic
IORef (Count, AbsTime)
lat <- forall a. a -> IO (IORef a)
newIORef (Count
0, AbsTime
t)
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a. a -> Maybe a
Just WorkerInfo
{ workerYieldMax :: Count
workerYieldMax = Count
yieldMax
, workerYieldCount :: IORef Count
workerYieldCount = IORef Count
cntRef
, workerLatencyStart :: IORef (Count, AbsTime)
workerLatencyStart = IORef (Count, AbsTime)
lat
}
forall (m :: * -> *).
MonadRunInIO m =>
m () -> RunInIO m -> (SomeException -> IO ()) -> m ThreadId
doFork (forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> m ()
workLoop Channel m a
sv Maybe WorkerInfo
winfo) (forall (m :: * -> *) a. Channel m a -> RunInIO m
svarMrun Channel m a
sv) SomeException -> IO ()
exception forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= ThreadId -> m ()
modThread
where
modThread :: ThreadId -> m ()
modThread = forall (m :: * -> *).
MonadIO m =>
IORef (Set ThreadId) -> MVar () -> ThreadId -> m ()
modifyThread (forall (m :: * -> *) a. Channel m a -> IORef (Set ThreadId)
workerThreads Channel m a
sv) (forall (m :: * -> *) a. Channel m a -> MVar ()
outputDoorBell Channel m a
sv)
exception :: SomeException -> IO ()
exception = forall a.
IORef ([ChildEvent a], Int) -> MVar () -> SomeException -> IO ()
handleChildException (forall (m :: * -> *) a. Channel m a -> IORef ([ChildEvent a], Int)
outputQueue Channel m a
sv) (forall (m :: * -> *) a. Channel m a -> MVar ()
outputDoorBell Channel m a
sv)
{-# INLINE getEffectiveWorkerLimit #-}
getEffectiveWorkerLimit :: MonadIO m => Channel m a -> m Limit
getEffectiveWorkerLimit :: forall (m :: * -> *) a. MonadIO m => Channel m a -> m Limit
getEffectiveWorkerLimit Channel m a
sv = do
let workerLimit :: Limit
workerLimit = forall (m :: * -> *) a. Channel m a -> Limit
maxWorkerLimit Channel m a
sv
case forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv of
Maybe (IORef Count)
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return Limit
workerLimit
Just IORef Count
ref -> do
Count
n <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> IO a
readIORef IORef Count
ref
case forall (m :: * -> *) a. Channel m a -> Maybe YieldRateInfo
yieldRateInfo Channel m a
sv of
Just YieldRateInfo
_ -> forall (m :: * -> *) a. Monad m => a -> m a
return Limit
workerLimit
Maybe YieldRateInfo
Nothing ->
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$
case Limit
workerLimit of
Limit
Unlimited -> Word -> Limit
Limited (forall a b. (Integral a, Num b) => a -> b
fromIntegral Count
n)
Limited Word
lim -> Word -> Limit
Limited forall a b. (a -> b) -> a -> b
$ forall a. Ord a => a -> a -> a
min Word
lim (forall a b. (Integral a, Num b) => a -> b
fromIntegral Count
n)
{-# INLINE checkMaxThreads #-}
checkMaxThreads :: MonadIO m => Int -> Channel m a -> m Bool
checkMaxThreads :: forall (m :: * -> *) a. MonadIO m => Int -> Channel m a -> m Bool
checkMaxThreads Int
active Channel m a
sv = do
Limit
limit <- forall (m :: * -> *) a. MonadIO m => Channel m a -> m Limit
getEffectiveWorkerLimit Channel m a
sv
forall (m :: * -> *) a. Monad m => a -> m a
return
forall a b. (a -> b) -> a -> b
$ case Limit
limit of
Limit
Unlimited -> Bool
True
Limited Word
lim -> forall a b. (Integral a, Num b) => a -> b
fromIntegral Word
lim forall a. Ord a => a -> a -> Bool
> Int
active
{-# INLINE checkMaxBuffer #-}
checkMaxBuffer :: MonadIO m => Int -> Channel m a -> m Bool
checkMaxBuffer :: forall (m :: * -> *) a. MonadIO m => Int -> Channel m a -> m Bool
checkMaxBuffer Int
active Channel m a
sv = do
let limit :: Limit
limit = forall (m :: * -> *) a. Channel m a -> Limit
maxBufferLimit Channel m a
sv
case Limit
limit of
Limit
Unlimited -> forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
Limited Word
lim -> do
([ChildEvent a]
_, Int
n) <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> IO a
readIORef (forall (m :: * -> *) a. Channel m a -> IORef ([ChildEvent a], Int)
outputQueue Channel m a
sv)
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a b. (Integral a, Num b) => a -> b
fromIntegral Word
lim forall a. Ord a => a -> a -> Bool
> Int
n forall a. Num a => a -> a -> a
+ Int
active
dispatchWorker :: MonadRunInIO m =>
Count -> Channel m a -> m Bool
dispatchWorker :: forall (m :: * -> *) a.
MonadRunInIO m =>
Count -> Channel m a -> m Bool
dispatchWorker Count
yieldCount Channel m a
sv = do
Bool
done <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Channel m a -> IO Bool
isWorkDone Channel m a
sv
if Bool -> Bool
not Bool
done
then do
Bool
qDone <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Channel m a -> IO Bool
isQueueDone Channel m a
sv
Int
active <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> IO a
readIORef forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Channel m a -> IORef Int
workerCount Channel m a
sv
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
active forall a. Ord a => a -> a -> Bool
< Int
0) forall a b. (a -> b) -> a -> b
$ forall a. HasCallStack => [Char] -> a
error [Char]
"dispatchWorker active negative"
if Bool -> Bool
not Bool
qDone
then do
Bool
r <- forall (m :: * -> *) a. MonadIO m => Int -> Channel m a -> m Bool
checkMaxThreads Int
active Channel m a
sv
if Bool
r
then do
Bool
r1 <- forall (m :: * -> *) a. MonadIO m => Int -> Channel m a -> m Bool
checkMaxBuffer Int
active Channel m a
sv
if Bool
r1
then forall (m :: * -> *) a.
MonadRunInIO m =>
Count -> Channel m a -> m ()
pushWorker Count
yieldCount Channel m a
sv forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
else forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
else forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
else do
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
active forall a. Ord a => a -> a -> Bool
<= Int
0) forall a b. (a -> b) -> a -> b
$ do
Bool
r <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Channel m a -> IO Bool
isWorkDone Channel m a
sv
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool -> Bool
not Bool
r) forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
MonadRunInIO m =>
Count -> Channel m a -> m ()
pushWorker Count
0 Channel m a
sv
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
else forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
dispatchWorkerPaced :: MonadRunInIO m =>
Channel m a -> m Bool
dispatchWorkerPaced :: forall (m :: * -> *) a. MonadRunInIO m => Channel m a -> m Bool
dispatchWorkerPaced Channel m a
sv = do
let yinfo :: YieldRateInfo
yinfo = forall a. HasCallStack => Maybe a -> a
fromJust forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Channel m a -> Maybe YieldRateInfo
yieldRateInfo Channel m a
sv
(Count
svarYields, NanoSecond64
svarElapsed, NanoSecond64
wLatency) <- do
AbsTime
now <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Clock -> IO AbsTime
getTime Clock
Monotonic
(Count
yieldCount, AbsTime
baseTime, NanoSecond64
lat) <-
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
forall a b. (a -> b) -> a -> b
$ Bool
-> SVarStats
-> YieldRateInfo
-> Bool
-> IO (Count, AbsTime, NanoSecond64)
collectLatency
(forall (m :: * -> *) a. Channel m a -> Bool
svarInspectMode Channel m a
sv) (forall (m :: * -> *) a. Channel m a -> SVarStats
svarStats Channel m a
sv) YieldRateInfo
yinfo Bool
False
let elapsed :: NanoSecond64
elapsed = forall a. TimeUnit64 a => RelTime64 -> a
fromRelTime64 forall a b. (a -> b) -> a -> b
$ AbsTime -> AbsTime -> RelTime64
diffAbsTime64 AbsTime
now AbsTime
baseTime
let latency :: NanoSecond64
latency =
if NanoSecond64
lat forall a. Eq a => a -> a -> Bool
== NanoSecond64
0
then forall a. a -> Maybe a -> a
fromMaybe NanoSecond64
lat (YieldRateInfo -> Maybe NanoSecond64
workerBootstrapLatency YieldRateInfo
yinfo)
else NanoSecond64
lat
forall (m :: * -> *) a. Monad m => a -> m a
return (Count
yieldCount, NanoSecond64
elapsed, NanoSecond64
latency)
if NanoSecond64
wLatency forall a. Eq a => a -> a -> Bool
== NanoSecond64
0
then forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
else do
let workerLimit :: Limit
workerLimit = forall (m :: * -> *) a. Channel m a -> Limit
maxWorkerLimit Channel m a
sv
let targetLat :: NanoSecond64
targetLat = YieldRateInfo -> NanoSecond64
svarLatencyTarget YieldRateInfo
yinfo
let range :: LatencyRange
range = YieldRateInfo -> LatencyRange
svarLatencyRange YieldRateInfo
yinfo
Count
gainLoss <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> IO a
readIORef (YieldRateInfo -> IORef Count
svarGainedLostYields YieldRateInfo
yinfo)
let work :: Work
work = Limit
-> Count
-> Count
-> NanoSecond64
-> NanoSecond64
-> NanoSecond64
-> LatencyRange
-> Work
estimateWorkers Limit
workerLimit Count
svarYields Count
gainLoss NanoSecond64
svarElapsed
NanoSecond64
wLatency NanoSecond64
targetLat LatencyRange
range
case Work
work of
BlockWait NanoSecond64
s -> do
forall a. HasCallStack => Bool -> a -> a
assert (NanoSecond64
s forall a. Ord a => a -> a -> Bool
>= NanoSecond64
0) (forall (m :: * -> *) a. Monad m => a -> m a
return ())
Bool
done <- forall (m :: * -> *). MonadIO m => IORef (Set ThreadId) -> m Bool
allThreadsDone (forall (m :: * -> *) a. Channel m a -> IORef (Set ThreadId)
workerThreads Channel m a
sv)
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
done forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ do
let us :: MicroSecond64
us = forall a. TimeUnit64 a => RelTime64 -> a
fromRelTime64 (forall a. TimeUnit64 a => a -> RelTime64
toRelTime64 NanoSecond64
s) :: MicroSecond64
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Int -> IO ()
threadDelay (forall a b. (Integral a, Num b) => a -> b
fromIntegral MicroSecond64
us)
forall (m :: * -> *) a.
MonadRunInIO m =>
Count -> Channel m a -> m Bool
dispatchWorker Count
1 Channel m a
sv
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
PartialWorker Count
yields -> do
forall a. HasCallStack => Bool -> a -> a
assert (Count
yields forall a. Ord a => a -> a -> Bool
> Count
0) (forall (m :: * -> *) a. Monad m => a -> m a
return ())
forall {f :: * -> *}. MonadIO f => YieldRateInfo -> Count -> f ()
updateGainedLostYields YieldRateInfo
yinfo Count
yields
Bool
done <- forall (m :: * -> *). MonadIO m => IORef (Set ThreadId) -> m Bool
allThreadsDone (forall (m :: * -> *) a. Channel m a -> IORef (Set ThreadId)
workerThreads Channel m a
sv)
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
done forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
MonadRunInIO m =>
Count -> Channel m a -> m Bool
dispatchWorker Count
yields Channel m a
sv
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
ManyWorkers Int
netWorkers Count
yields -> do
forall a. HasCallStack => Bool -> a -> a
assert (Int
netWorkers forall a. Ord a => a -> a -> Bool
>= Int
1) (forall (m :: * -> *) a. Monad m => a -> m a
return ())
forall a. HasCallStack => Bool -> a -> a
assert (Count
yields forall a. Ord a => a -> a -> Bool
>= Count
0) (forall (m :: * -> *) a. Monad m => a -> m a
return ())
forall {f :: * -> *}. MonadIO f => YieldRateInfo -> Count -> f ()
updateGainedLostYields YieldRateInfo
yinfo Count
yields
let periodRef :: IORef Count
periodRef = YieldRateInfo -> IORef Count
workerPollingInterval YieldRateInfo
yinfo
ycnt :: Count
ycnt = forall a. Ord a => a -> a -> a
max Count
1 forall a b. (a -> b) -> a -> b
$ Count
yields forall a. Integral a => a -> a -> a
`div` forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
netWorkers
period :: Count
period = forall a. Ord a => a -> a -> a
min Count
ycnt (forall a b. (Integral a, Num b) => a -> b
fromIntegral Word
magicMaxBuffer)
Count
old <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> IO a
readIORef IORef Count
periodRef
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Count
period forall a. Ord a => a -> a -> Bool
< Count
old) forall a b. (a -> b) -> a -> b
$
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> a -> IO ()
writeIORef IORef Count
periodRef Count
period
Int
cnt <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> IO a
readIORef forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Channel m a -> IORef Int
workerCount Channel m a
sv
if Int
cnt forall a. Ord a => a -> a -> Bool
< Int
netWorkers
then do
let total :: Int
total = Int
netWorkers forall a. Num a => a -> a -> a
- Int
cnt
batch :: Int
batch = forall a. Ord a => a -> a -> a
max Int
1 forall a b. (a -> b) -> a -> b
$ forall a b. (Integral a, Num b) => a -> b
fromIntegral forall a b. (a -> b) -> a -> b
$
NanoSecond64
minThreadDelay forall a. Integral a => a -> a -> a
`div` NanoSecond64
targetLat
forall {t}. (Eq t, Num t) => t -> m Bool
dispatchN (forall a. Ord a => a -> a -> a
min Int
total Int
batch)
else forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
where
updateGainedLostYields :: YieldRateInfo -> Count -> f ()
updateGainedLostYields YieldRateInfo
yinfo Count
yields = do
let buf :: Count
buf = forall a b. (Integral a, Num b) => a -> b
fromIntegral forall a b. (a -> b) -> a -> b
$ YieldRateInfo -> Int
svarRateBuffer YieldRateInfo
yinfo
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Count
yields forall a. Eq a => a -> a -> Bool
/= Count
0 Bool -> Bool -> Bool
&& forall a. Num a => a -> a
abs Count
yields forall a. Ord a => a -> a -> Bool
> Count
buf) forall a b. (a -> b) -> a -> b
$ do
let delta :: Count
delta =
if Count
yields forall a. Ord a => a -> a -> Bool
> Count
0
then Count
yields forall a. Num a => a -> a -> a
- Count
buf
else Count
yields forall a. Num a => a -> a -> a
+ Count
buf
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall t. IORef t -> (t -> t) -> IO ()
modifyIORef (YieldRateInfo -> IORef Count
svarGainedLostYields YieldRateInfo
yinfo) (forall a. Num a => a -> a -> a
+ Count
delta)
dispatchN :: t -> m Bool
dispatchN t
n =
if t
n forall a. Eq a => a -> a -> Bool
== t
0
then forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
else do
Bool
r <- forall (m :: * -> *) a.
MonadRunInIO m =>
Count -> Channel m a -> m Bool
dispatchWorker Count
0 Channel m a
sv
if Bool
r
then t -> m Bool
dispatchN (t
n forall a. Num a => a -> a -> a
- t
1)
else forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
{-# NOINLINE sendWorkerWait #-}
sendWorkerWait
:: MonadIO m
=> Bool
-> (Channel m a -> IO ())
-> (Channel m a -> m Bool)
-> Channel m a
-> m ()
sendWorkerWait :: forall (m :: * -> *) a.
MonadIO m =>
Bool
-> (Channel m a -> IO ())
-> (Channel m a -> m Bool)
-> Channel m a
-> m ()
sendWorkerWait Bool
eagerEval Channel m a -> IO ()
delay Channel m a -> m Bool
dispatch Channel m a
sv = m ()
go
where
go :: m ()
go = do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Channel m a -> IO ()
delay Channel m a
sv
([ChildEvent a]
_, Int
n) <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> IO a
readIORef (forall (m :: * -> *) a. Channel m a -> IORef ([ChildEvent a], Int)
outputQueue Channel m a
sv)
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
n forall a. Ord a => a -> a -> Bool
<= Int
0 Bool -> Bool -> Bool
|| Bool
eagerEval) forall a b. (a -> b) -> a -> b
$ do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall t. IORef t -> (t -> t) -> IO ()
atomicModifyIORefCAS_ (forall (m :: * -> *) a. Channel m a -> IORef Bool
doorBellOnWorkQ Channel m a
sv) forall a b. (a -> b) -> a -> b
$ forall a b. a -> b -> a
const Bool
True
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO ()
storeLoadBarrier
Bool
canDoMore <- Channel m a -> m Bool
dispatch Channel m a
sv
if Bool
canDoMore
then m ()
go
else do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
forall a b. (a -> b) -> a -> b
$ Bool -> IO [Char] -> [Char] -> IO () -> IO ()
withDiagMVar
(forall (m :: * -> *) a. Channel m a -> Bool
svarInspectMode Channel m a
sv)
(forall (m :: * -> *) a. Channel m a -> IO [Char]
dumpSVar Channel m a
sv)
[Char]
"sendWorkerWait: nothing to do"
forall a b. (a -> b) -> a -> b
$ forall a. MVar a -> IO a
takeMVar (forall (m :: * -> *) a. Channel m a -> MVar ()
outputDoorBell Channel m a
sv)
([ChildEvent a]
_, Int
len) <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> IO a
readIORef (forall (m :: * -> *) a. Channel m a -> IORef ([ChildEvent a], Int)
outputQueue Channel m a
sv)
if Int
len forall a. Ord a => a -> a -> Bool
<= Int
0
then m ()
go
else
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
forall a b. (a -> b) -> a -> b
$ forall t. IORef t -> (t -> t) -> IO ()
atomicModifyIORefCAS_ (forall (m :: * -> *) a. Channel m a -> IORef Bool
doorBellOnWorkQ Channel m a
sv)
forall a b. (a -> b) -> a -> b
$ forall a b. a -> b -> a
const Bool
False
startChannel :: MonadRunInIO m =>
Channel m a -> m ()
startChannel :: forall (m :: * -> *) a. MonadRunInIO m => Channel m a -> m ()
startChannel Channel m a
chan = do
case forall (m :: * -> *) a. Channel m a -> Maybe YieldRateInfo
yieldRateInfo Channel m a
chan of
Maybe YieldRateInfo
Nothing -> forall (m :: * -> *) a.
MonadRunInIO m =>
Count -> Channel m a -> m ()
pushWorker Count
0 Channel m a
chan
Just YieldRateInfo
yinfo ->
if YieldRateInfo -> NanoSecond64
svarLatencyTarget YieldRateInfo
yinfo forall a. Eq a => a -> a -> Bool
== forall a. Bounded a => a
maxBound
then forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Int -> IO ()
threadDelay forall a. Bounded a => a
maxBound
else forall (m :: * -> *) a.
MonadRunInIO m =>
Count -> Channel m a -> m ()
pushWorker Count
1 Channel m a
chan
sendWorkerDelayPaced :: Channel m a -> IO ()
sendWorkerDelayPaced :: forall (m :: * -> *) a. Channel m a -> IO ()
sendWorkerDelayPaced Channel m a
_ = forall (m :: * -> *) a. Monad m => a -> m a
return ()
sendWorkerDelay :: Channel m a -> IO ()
sendWorkerDelay :: forall (m :: * -> *) a. Channel m a -> IO ()
sendWorkerDelay Channel m a
_sv =
forall (m :: * -> *) a. Monad m => a -> m a
return ()