-- |
-- Module      : Streamly.Internal.Data.SVar.Dispatch
-- Copyright   : (c) 2017 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
--
module Streamly.Internal.Data.SVar.Dispatch
    (
    -- * Latency collection
      collectLatency

    -- * Diagnostics
    , withDiagMVar
    , dumpSVar
    , printSVar

    -- * Thread accounting
    , delThread
    , modifyThread
    , allThreadsDone

    -- * Dispatching
    , recordMaxWorkers
    , pushWorker
    , pushWorkerPar
    , dispatchWorker
    , dispatchWorkerPaced
    , sendWorkerWait
    , sendFirstWorker
    , sendWorkerDelay
    , sendWorkerDelayPaced
    )
where

#include "inline.hs"

import Control.Concurrent (takeMVar, tryReadMVar, ThreadId, threadDelay)
import Control.Concurrent.MVar (tryPutMVar)
import Control.Exception
       (assert, catches, throwIO, Handler(..), BlockedIndefinitelyOnMVar(..),
        BlockedIndefinitelyOnSTM(..))
import Control.Monad (when, void)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.Maybe (fromJust, fromMaybe)
import Data.IORef (IORef, modifyIORef, newIORef, readIORef, writeIORef)
#if __GLASGOW_HASKELL__ < 804
import Data.Semigroup ((<>))
#endif
import Streamly.Internal.Control.Concurrent (MonadAsync, askRunInIO)
import Streamly.Internal.Control.ForkLifted (doFork)
import Streamly.Internal.Data.Atomics
       (atomicModifyIORefCAS, atomicModifyIORefCAS_, writeBarrier,
        storeLoadBarrier)
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)
import Streamly.Internal.Data.Time.Units
       (AbsTime, NanoSecond64(..), MicroSecond64(..), diffAbsTime64,
        fromRelTime64, toRelTime64, showNanoSecond64, showRelTime64)
import System.IO (hPutStrLn, stderr)

import qualified Data.Heap as H
import qualified Data.Set as S

import Streamly.Internal.Data.SVar.Type
import Streamly.Internal.Data.SVar.Worker

-------------------------------------------------------------------------------
-- Worker latency data processing
-------------------------------------------------------------------------------

-- Every once in a while workers update the latencies and check the yield rate.
-- They return if we are above the expected yield rate. If we check too often
-- it may impact performance, if we check less often we may have a stale
-- picture. We update every minThreadDelay but we translate that into a yield
-- count based on latency so that the checking overhead is little.
--
-- XXX use a generation count to indicate that the value is updated. If the
-- value is updated, an existing worker must check it again on the next yield.
-- Otherwise it is possible that we keep updating it and, because of the mod,
-- the worker keeps skipping it.
updateWorkerPollingInterval :: YieldRateInfo -> NanoSecond64 -> IO ()
updateWorkerPollingInterval yinfo latency = do
    -- The caller may pass a latency of 0 (an average that rounded down to
    -- zero while no previous latency was recorded). Dividing by it would
    -- throw 'DivideByZero', so treat it as the smallest measurable latency.
    let safeLatency = if latency == 0 then 1 else latency
        periodRef = workerPollingInterval yinfo
        -- How many yields fit into one 'minThreadDelay'; at least one.
        cnt = max 1 $ minThreadDelay `div` safeLatency
        -- Cap the polling interval at 'magicMaxBuffer' yields.
        period = min cnt (fromIntegral magicMaxBuffer)

    writeIORef periodRef (fromIntegral period)

-- Fold a new latency sample into the min/max worker latency stats stored in
-- the SVar's 'svarStats'.
--
-- The minimum is replaced when the sample is smaller than the stored value,
-- or when the stored value is 0 (presumably the initial "not yet recorded"
-- value -- confirm against the SVarStats initializer). The maximum is
-- replaced when the sample is larger than the stored value.
{-# INLINE recordMinMaxLatency #-}
recordMinMaxLatency :: SVar t m a -> NanoSecond64 -> IO ()
recordMinMaxLatency sv new = do
    let ss = svarStats sv
    minLat <- readIORef (minWorkerLatency ss)
    when (new < minLat || minLat == 0) $
        writeIORef (minWorkerLatency ss) new

    maxLat <- readIORef (maxWorkerLatency ss)
    when (new > maxLat) $ writeIORef (maxWorkerLatency ss) new

-- Accumulate a (yield count, total time) sample into the running
-- 'avgWorkerLatency' totals of the SVar's stats. The average itself is not
-- computed here; readers divide the accumulated time by the accumulated
-- count.
recordAvgLatency :: SVar t m a -> (Count, NanoSecond64) -> IO ()
recordAvgLatency sv (count, time) = do
    let ss = svarStats sv
    modifyIORef (avgWorkerLatency ss) $
        \(cnt, t) -> (cnt + count, t + time)

-- Pour the pending latency stats into a collection bucket.
--
-- The first IORef (pending) is atomically swapped with (0,0,0) and the values
-- taken from it are added component-wise to the second IORef (collected).
--
-- Returns the collected total of the first component, and
-- @Just (count, time)@ built from the collected second and third components
-- when both are greater than zero, otherwise 'Nothing'.
{-# INLINE collectWorkerPendingLatency #-}
collectWorkerPendingLatency
    :: IORef (Count, Count, NanoSecond64)
    -> IORef (Count, Count, NanoSecond64)
    -> IO (Count, Maybe (Count, NanoSecond64))
collectWorkerPendingLatency cur col = do
    -- Take whatever the workers have posted and reset the pending bucket in
    -- a single atomic step.
    (fcount, count, time) <- atomicModifyIORefCAS cur $ \v -> ((0,0,0), v)

    (fcnt, cnt, t) <- readIORef col
    let totalCount = fcnt + fcount
        latCount   = cnt + count
        latTime    = t + time
    writeIORef col (totalCount, latCount, latTime)

    -- A non-zero latency count must come with a non-zero latency time.
    assert (latCount == 0 || latTime /= 0) (return ())
    let latPair =
            if latCount > 0 && latTime > 0
            then Just (latCount, latTime)
            else Nothing
    return (totalCount, latPair)

-- Decide whether the collected latency batch should be used to compute a new
-- measured latency. The answer is 'True' when any of the following holds:
--
-- * more than 'magicMaxBuffer' yields have been collected,
-- * the collected time exceeds 'minThreadDelay',
-- * a previous latency exists and the new latency is more than twice or less
--   than half of it,
-- * there is no previous latency (it is 0).
{-# INLINE shouldUseCollectedBatch #-}
shouldUseCollectedBatch
    :: Count
    -> NanoSecond64
    -> NanoSecond64
    -> NanoSecond64
    -> Bool
shouldUseCollectedBatch collectedYields collectedTime newLat prevLat =
    -- The ratio is only inspected when prevLat > 0; it is a lazily evaluated
    -- Double so a zero prevLat does not raise an exception.
    let r = fromIntegral newLat / fromIntegral prevLat :: Double
    in     (collectedYields > fromIntegral magicMaxBuffer)
        || (collectedTime > minThreadDelay)
        || (prevLat > 0 && (r > 2 || r < 0.5))
        || (prevLat == 0)

-- Returns a triple, (1) yield count since last collection, (2) the base time
-- when we started counting, (3) average latency in the last measurement
-- period. The former two are used for accurate measurement of the going rate
-- whereas the average is used for future estimates e.g. how many workers
-- should be maintained to maintain the rate.
--
-- When the third argument (drain) is 'True' the collected batch is always
-- used to compute a new latency, regardless of 'shouldUseCollectedBatch'.
--
-- CAUTION! keep it in sync with getWorkerLatency
collectLatency :: SVar t m a
               -> YieldRateInfo
               -> Bool
               -> IO (Count, AbsTime, NanoSecond64)
collectLatency sv yinfo drain = do
    let cur      = workerPendingLatency yinfo
        col      = workerCollectedLatency yinfo
        longTerm = svarAllTimeLatency yinfo
        measured = workerMeasuredLatency yinfo

    (newCount, newLatPair) <- collectWorkerPendingLatency cur col
    (lcount, ltime) <- readIORef longTerm
    prevLat <- readIORef measured

    let newLcount = lcount + newCount
        retWith lat = return (newLcount, ltime, lat)

    case newLatPair of
        Nothing -> retWith prevLat
        Just (count, time) -> do
            -- count is non-zero here: a 'Just' is produced only for a
            -- positive count and a positive time.
            let newLat = time `div` fromIntegral count
            when (svarInspectMode sv) $ recordMinMaxLatency sv newLat
            -- When we have collected a significant sized batch we compute the
            -- new latency using that batch and return the new latency,
            -- otherwise we return the previous latency derived from the
            -- previous batch.
            if shouldUseCollectedBatch newCount time newLat prevLat || drain
            then do
                -- XXX make this NOINLINE?
                updateWorkerPollingInterval yinfo (max newLat prevLat)
                when (svarInspectMode sv) $ recordAvgLatency sv (count, time)
                -- The batch has been consumed; start a fresh collection.
                writeIORef col (0, 0, 0)
                -- Store the mean of the previous and new latency, but
                -- return the new latency itself to the caller.
                writeIORef measured ((prevLat + newLat) `div` 2)
                modifyIORef longTerm $ \(_, t) -> (newLcount, t)
                retWith newLat
            else retWith prevLat

-------------------------------------------------------------------------------
-- Dumping the SVar for debug/diag
-------------------------------------------------------------------------------

-- Render the statistics of an SVar as a multi-line string.
--
-- If the SVar has rate control info, the pending latencies are first
-- collected (in drain mode) so that the stats read afterwards include them.
-- The first two lines (total dispatches, max workers) are always present;
-- the remaining fields are appended to the "max outQSize" line, each on its
-- own line, only when their value is greater than zero (the heap size only
-- for 'AheadVar' style).
dumpSVarStats :: SVar t m a -> SVarStats -> SVarStyle -> IO String
dumpSVarStats sv ss style = do
    case yieldRateInfo sv of
        Nothing -> return ()
        Just yinfo -> void $ collectLatency sv yinfo True

    dispatches <- readIORef $ totalDispatches ss
    maxWrk <- readIORef $ maxWorkers ss
    maxOq <- readIORef $ maxOutQSize ss
    maxHp <- readIORef $ maxHeapSize ss
    minLat <- readIORef $ minWorkerLatency ss
    maxLat <- readIORef $ maxWorkerLatency ss
    (avgCnt, avgTime) <- readIORef $ avgWorkerLatency ss
    (svarCnt, svarGainLossCnt, svarLat) <- case yieldRateInfo sv of
        Nothing -> return (0, 0, 0)
        Just yinfo -> do
            (cnt, startTime) <- readIORef $ svarAllTimeLatency yinfo
            if cnt > 0
            then do
                stopped <- readIORef (svarStopTime ss)
                gl <- readIORef (svarGainedLostYields yinfo)
                -- Measure up to the recorded stop time if there is one,
                -- otherwise up to now.
                endTime <- maybe (getTime Monotonic) return stopped
                let interval = diffAbsTime64 endTime startTime
                return (cnt, gl, interval `div` fromIntegral cnt)
            else return (0, 0, 0)

    -- Include an optional field only when its condition holds. The text is
    -- evaluated lazily, so guarded divisions below are safe.
    let showIf cond text = if cond then text else ""

    return $ unlines
        [ "total dispatches = " <> show dispatches
        , "max workers = " <> show maxWrk
        , "max outQSize = " <> show maxOq
            <> showIf (style == AheadVar)
                   ("\nheap max size = " <> show maxHp)
            <> showIf (minLat > 0)
                   ("\nmin worker latency = " <> showNanoSecond64 minLat)
            <> showIf (maxLat > 0)
                   ("\nmax worker latency = " <> showNanoSecond64 maxLat)
            <> showIf (avgCnt > 0)
                   (let lat = avgTime `div` fromIntegral avgCnt
                     in "\navg worker latency = " <> showNanoSecond64 lat)
            <> showIf (svarLat > 0)
                   ("\nSVar latency = " <> showRelTime64 svarLat)
            <> showIf (svarCnt > 0)
                   ("\nSVar yield count = " <> show svarCnt)
            -- NOTE(review): a negative gain/loss count is not printed;
            -- confirm whether that is intended.
            <> showIf (svarGainLossCnt > 0)
                   ("\nSVar gain/loss yield count = " <> show svarGainLossCnt)
        ]

-- Render the current state and the statistics of an SVar as a multi-line
-- string for diagnostics.
--
-- The output queue, the door bell, the worker thread set and the worker
-- count are read individually, so the result is not a consistent snapshot if
-- they are being modified concurrently. The heap and work queue details are
-- included only for 'AheadVar' style; 'needDoorBell' is read only when the
-- style is not 'ParallelVar' and reported as 'False' otherwise.
{-# NOINLINE dumpSVar #-}
dumpSVar :: SVar t m a -> IO String
dumpSVar sv = do
    let style = svarStyle sv
    (oqList, oqLen) <- readIORef $ outputQueue sv
    db <- tryReadMVar $ outputDoorBell sv
    aheadDump <-
        if style == AheadVar
        then do
            (oheap, oheapSeq) <- readIORef $ outputHeap sv
            (wq, wqSeq) <- readIORef $ aheadWorkQueue sv
            return $ unlines
                [ "heap length = " <> show (H.size oheap)
                , "heap sequence = " <> show oheapSeq
                , "work queue length = " <> show (length wq)
                , "work queue sequence = " <> show wqSeq
                ]
        else return []

    waiting <-
        if style /= ParallelVar
        then readIORef $ needDoorBell sv
        else return False
    rthread <- readIORef $ workerThreads sv
    workers <- readIORef $ workerCount sv
    stats <- dumpSVarStats sv (svarStats sv) style

    return $ unlines
        [ "Creator tid = " <> show (svarCreator sv)
        , "style = " <> show style
        , "---------CURRENT STATE-----------"
        , "outputQueue length computed  = " <> show (length oqList)
        , "outputQueue length maintained = " <> show oqLen
        -- XXX print the types of events in the outputQueue, first 5
        , "outputDoorBell = " <> show db
        ]
        <> aheadDump
        <> unlines
        [ "needDoorBell = " <> show waiting
        , "running threads = " <> show rthread
        -- XXX print the status of first 5 threads
        , "running thread count = " <> show workers
        ]
        <> "---------STATS-----------\n"
        <> stats

-- MVar diagnostics has some overhead - around 5% on AsyncT null benchmark, we
-- can keep it on in production to debug problems quickly if and when they
-- happen, but it may result in unexpected output when threads are left hanging
-- until they are GCed because the consumer went away.

-- Handler for 'BlockedIndefinitelyOnMVar': print the given label and a dump
-- of the SVar to stderr, then rethrow the exception.
{-# NOINLINE mvarExcHandler #-}
mvarExcHandler :: SVar t m a -> String -> BlockedIndefinitelyOnMVar -> IO ()
mvarExcHandler sv label e@BlockedIndefinitelyOnMVar = do
    svInfo <- dumpSVar sv
    hPutStrLn stderr $ label <> " " <> "BlockedIndefinitelyOnMVar\n" <> svInfo
    throwIO e

-- Handler for 'BlockedIndefinitelyOnSTM': print the given label and a dump
-- of the SVar to stderr, then rethrow the exception.
{-# NOINLINE stmExcHandler #-}
stmExcHandler :: SVar t m a -> String -> BlockedIndefinitelyOnSTM -> IO ()
stmExcHandler sv label e@BlockedIndefinitelyOnSTM = do
    svInfo <- dumpSVar sv
    hPutStrLn stderr $ label <> " " <> "BlockedIndefinitelyOnSTM\n" <> svInfo
    throwIO e

-- Run an action; when the SVar is in inspect mode, wrap it so that a
-- 'BlockedIndefinitelyOnMVar' or 'BlockedIndefinitelyOnSTM' exception dumps
-- the SVar (tagged with the given label) to stderr before being rethrown.
-- When inspect mode is off the action is run as is.
withDiagMVar :: SVar t m a -> String -> IO () -> IO ()
withDiagMVar sv label action =
    if svarInspectMode sv
    then
        action `catches` [ Handler (mvarExcHandler sv label)
                         , Handler (stmExcHandler sv label)
                         ]
    else action

-- Print a dump of the SVar to stderr, preceded by a blank line and the given
-- heading string.
printSVar :: SVar t m a -> String -> IO ()
printSVar sv how = do
    svInfo <- dumpSVar sv
    hPutStrLn stderr $ "\n" <> how <> "\n" <> svInfo

-------------------------------------------------------------------------------
-- Thread accounting
-------------------------------------------------------------------------------

-- Thread tracking is needed for two reasons:
--
-- 1) Killing threads on exceptions. Threads may not be left to go away by
-- themselves because they may run for significant times before going away or
-- worse they may be stuck in IO and never go away.
--
-- 2) To know when all threads are done and the stream has ended.

-- Insert a thread id into the SVar's 'workerThreads' set.
--
-- NOTE(review): this uses the non-atomic 'modifyIORef'; presumably it is only
-- called from a context where no concurrent update can race with it --
-- confirm against the callers.
{-# NOINLINE addThread #-}
addThread :: MonadIO m => SVar t m a -> ThreadId -> m ()
addThread sv tid =
    liftIO $ modifyIORef (workerThreads sv) (S.insert tid)

-- Remove a thread id from the SVar's 'workerThreads' set.
--
-- This is cheaper than modifyThread because we do not have to send an
-- outputDoorBell. This can make a difference when more workers are being
-- dispatched.
{-# INLINE delThread #-}
delThread :: MonadIO m => SVar t m a -> ThreadId -> m ()
delThread sv tid =
    liftIO $ modifyIORef (workerThreads sv) (S.delete tid)

-- If present then delete else add. This takes care of out of order add and
-- delete i.e. a delete arriving before we even added a thread.
-- This occurs when the forked thread is done even before the 'addThread' right
-- after the fork gets a chance to run.
--
-- The update is done atomically. It yields the set after the deletion when
-- the thread was present, and the set before the insertion otherwise. If
-- that set is empty the 'outputDoorBell' is rung, after a write barrier.
{-# INLINE modifyThread #-}
modifyThread :: MonadIO m => SVar t m a -> ThreadId -> m ()
modifyThread sv tid = do
    changed <- liftIO $ atomicModifyIORefCAS (workerThreads sv) $ \old ->
        if S.member tid old
        then let new = S.delete tid old in (new, new)
        else let new = S.insert tid old in (new, old)
    when (null changed) $
         liftIO $ do
            writeBarrier
            -- The put may fail if the door bell is already rung; that is
            -- fine, so the result is ignored.
            void $ tryPutMVar (outputDoorBell sv) ()

-- | Returns 'True' when the 'workerThreads' set of the 'SVar' is empty at the
-- time it is read.
--
-- This is safe even if we are adding more threads concurrently because if
-- a child thread is adding another thread then anyway 'workerThreads' will
-- not be empty.
{-# INLINE allThreadsDone #-}
allThreadsDone :: MonadIO m => SVar t m a -> m Bool
allThreadsDone sv = liftIO $ S.null <$> readIORef (workerThreads sv)

-------------------------------------------------------------------------------
-- Dispatching workers
-------------------------------------------------------------------------------

-- | Update the dispatch statistics kept in 'svarStats': raise 'maxWorkers' to
-- the current 'workerCount' if that is larger, and increment
-- 'totalDispatches' by one.
--
-- The reads and writes are plain (non-atomic) 'IORef' operations. The
-- 'totalDispatches' counter is written strictly so that repeated calls do not
-- accumulate a chain of unevaluated @(+1)@ thunks in the 'IORef'.
{-# NOINLINE recordMaxWorkers #-}
recordMaxWorkers :: MonadIO m => SVar t m a -> m ()
recordMaxWorkers sv = liftIO $ do
    let stats = svarStats sv
    active <- readIORef (workerCount sv)
    maxWrk <- readIORef (maxWorkers stats)
    when (active > maxWrk) $ writeIORef (maxWorkers stats) active
    dispatches <- readIORef (totalDispatches stats)
    writeIORef (totalDispatches stats) $! dispatches + 1

-- | Fork one more worker running the 'workLoop' of the 'SVar'.
--
-- The steps, in order:
--
-- 1. 'workerCount' is incremented using 'atomicModifyIORefCAS_'.
-- 2. In 'svarInspectMode' the dispatch statistics are recorded.
-- 3. A 'WorkerInfo' is allocated only when 'yieldRateInfo' is present; its
--    'workerYieldMax' is the supplied @yieldMax@.
-- 4. The worker is forked with 'doFork' using 'svarMrun' and
--    'handleChildException', and the resulting thread id is registered using
--    'addThread'.
{-# NOINLINE pushWorker #-}
pushWorker :: MonadAsync m => Count -> SVar t m a -> m ()
pushWorker yieldMax sv = do
    liftIO $ atomicModifyIORefCAS_ (workerCount sv) (+ 1)
    when (svarInspectMode sv) $ recordMaxWorkers sv
    -- This allocation matters when significant number of workers are being
    -- sent. We allocate it only when needed.
    winfo <-
        case yieldRateInfo sv of
            Nothing -> return Nothing
            Just _ -> liftIO $ Just <$> newWorkerInfo
    tid <- doFork (workLoop sv winfo) (svarMrun sv) (handleChildException sv)
    addThread sv tid

    where

    -- Fresh per-worker accounting: a zeroed yield counter and a latency
    -- measurement start of (0 yields, current monotonic time).
    newWorkerInfo = do
        cntRef <- newIORef 0
        t <- getTime Monotonic
        lat <- newIORef (0, t)
        return WorkerInfo
            { workerYieldMax = yieldMax
            , workerYieldCount = cntRef
            , workerLatencyStart = lat
            }

-- XXX we can push the workerCount modification in accountThread and use the
-- same pushWorker for Parallel case as well.
--
-- | In contrast to pushWorker which always happens only from the consumer
-- thread, a pushWorkerPar can happen concurrently from multiple threads on the
-- producer side. So we need to use a thread safe modification of
-- workerThreads. Alternatively, we can use a CreateThread event to avoid
-- using a CAS based modification.
--
-- The supplied worker loop is forked with 'doFork' and the new thread id is
-- registered using 'modifyThread'. Outside 'svarInspectMode' the loop is
-- passed 'Nothing'; in 'svarInspectMode' 'workerCount' and the dispatch
-- statistics are maintained as well, and the loop is passed a 'WorkerInfo'
-- (with 'workerYieldMax' set to 0) when 'yieldRateInfo' is present.
{-# INLINE pushWorkerPar #-}
pushWorkerPar
    :: MonadAsync m
    => SVar t m a -> (Maybe WorkerInfo -> m ()) -> m ()
pushWorkerPar sv wloop =
    if svarInspectMode sv
    then forkWithDiag
    else forkWorker Nothing

    where

    -- Fork the worker loop and register the resulting thread id.
    forkWorker winfo =
        doFork (wloop winfo) (svarMrun sv) (handleChildException sv)
            >>= modifyThread sv

    {-# NOINLINE forkWithDiag #-}
    forkWithDiag = do
        -- We do not use workerCount in case of ParallelVar but still there is
        -- no harm in maintaining it correctly.
        liftIO $ atomicModifyIORefCAS_ (workerCount sv) (+ 1)
        recordMaxWorkers sv
        -- This allocation matters when significant number of workers are being
        -- sent. We allocate it only when needed. The overhead increases by 4x.
        winfo <-
            case yieldRateInfo sv of
                Nothing -> return Nothing
                Just _ -> liftIO $ do
                    cntRef <- newIORef 0
                    t <- getTime Monotonic
                    lat <- newIORef (0, t)
                    return $ Just WorkerInfo
                        { workerYieldMax = 0
                        , workerYieldCount = cntRef
                        , workerLatencyStart = lat
                        }

        forkWorker winfo

-- | Dispatch at most one worker, subject to the worker limit.
--
-- * If 'isWorkDone' reports that the work is done nothing is dispatched.
-- * If the work is not done but 'isQueueDone' is true, a worker (with a yield
--   limit of 0) is pushed only when 'workerCount' is not positive.
-- * Otherwise a worker with the supplied yield limit is pushed if the
--   effective limit is 'Unlimited' or greater than 'workerCount'. The
--   effective limit is 'maxWorkerLimit', further capped by 'remainingWork'
--   when that is present and there is no 'yieldRateInfo'.
--
-- Returns:
-- True: can dispatch more
-- False: cannot dispatch any more
dispatchWorker :: MonadAsync m => Count -> SVar t m a -> m Bool
dispatchWorker yieldCount sv = do
    -- XXX in case of Ahead streams we should not send more than one worker
    -- when the work queue is done but heap is not done.
    done <- liftIO $ isWorkDone sv
    -- Note, "done" may not mean that the work is actually finished if there
    -- are workers active, because there may be a worker which has not yet
    -- queued the leftover work.
    if done
    then return False
    else do
        qDone <- liftIO $ isQueueDone sv
        -- This count may not be accurate as it is decremented by the workers
        -- and we have no synchronization with that decrement.
        active <- liftIO $ readIORef $ workerCount sv
        if qDone
        then do
            when (active <= 0) $ pushWorker 0 sv
            return False
        else do
            -- Note that we may deadlock if the previous workers (tasks in the
            -- stream) wait/depend on the future workers (tasks in the stream)
            -- executing. In that case we should either configure the maxWorker
            -- count to higher or use parallel style instead of ahead or async
            -- style.
            limit <- liftIO effectiveLimit

            -- XXX for ahead streams shall we take the heap yields into account
            -- for controlling the dispatch? We should not dispatch if the heap
            -- has already got the limit covered.
            case limit of
                Unlimited -> dispatch
                -- Note that the use of remainingWork and workerCount is not
                -- atomic and the counts may even have changed between reading
                -- and using them here, so this is just approximate logic and
                -- we cannot rely on it for correctness. We may actually
                -- dispatch more workers than required.
                --
                -- NOTE(review): 'fromIntegral' to Word wraps around for a
                -- negative count -- confirm that 'workerCount' cannot be
                -- negative here.
                Limited lim | lim > fromIntegral active -> dispatch
                _ -> return False

    where

    workerLimit = maxWorkerLimit sv

    dispatch = pushWorker yieldCount sv >> return True

    -- The worker limit, capped by the remaining work when there is a
    -- 'remainingWork' counter and rate control is not in use.
    --
    -- NOTE(review): 'fromIntegral' to Word wraps around for a negative
    -- remaining count -- confirm that the counter cannot be negative here.
    effectiveLimit =
        case remainingWork sv of
            Nothing -> return workerLimit
            Just ref -> do
                n <- readIORef ref
                return $
                    case yieldRateInfo sv of
                        Just _ -> workerLimit
                        Nothing -> capLimit (fromIntegral n) workerLimit

    capLimit n Unlimited = Limited n
    capLimit n (Limited lim) = Limited $ min lim n

-- XXX in case of ahead style stream we need to take the heap size into account
-- because we return the workers on the basis of that which causes a condition
-- where we keep dispatching and they keep returning. So we must have exactly
-- the same logic for not dispatching and for returning.
--
-- Dispatch workers for an SVar that has rate control information. The latency
-- data is collected using 'collectLatency' and 'estimateWorkers' decides what
-- to do:
--
-- * 'BlockWait': if no worker threads are registered, sleep for the given
--   time and then dispatch a single worker with a yield limit of 1.
-- * 'PartialWorker': if no worker threads are registered, dispatch a single
--   worker limited to the given number of yields.
-- * 'ManyWorkers': lower 'workerPollingInterval' if needed and dispatch a
--   batch of workers when 'workerCount' is below the estimated number.
--
-- 'yieldRateInfo' is extracted using 'fromJust', so this must only be used
-- when it is 'Just'.
--
-- Returns:
-- True: can dispatch more
-- False: full, no more dispatches
dispatchWorkerPaced :: MonadAsync m => SVar t m a -> m Bool
dispatchWorkerPaced sv = do
    let yinfo = fromJust $ yieldRateInfo sv
    now <- liftIO $ getTime Monotonic
    (svarYields, baseTime, lat) <- liftIO $ collectLatency sv yinfo False
    let svarElapsed = fromRelTime64 $ diffAbsTime64 now baseTime
        -- When no latency has been measured yet fall back to the bootstrap
        -- latency, if there is one.
        wLatency
            | lat == 0 = fromMaybe lat (workerBootstrapLatency yinfo)
            | otherwise = lat

    if wLatency == 0
    -- Need to measure the latency with a single worker before we can perform
    -- any computation.
    then return False
    else do
        let workerLimit = maxWorkerLimit sv
        let targetLat = svarLatencyTarget yinfo
        let range = svarLatencyRange yinfo
        gainLoss <- liftIO $ readIORef (svarGainedLostYields yinfo)
        let work = estimateWorkers workerLimit svarYields gainLoss svarElapsed
                                   wLatency targetLat range

        -- XXX we need to take yieldLimit into account here. If we are at the
        -- end of the limit as well as the time, we should not be sleeping.
        -- If we are not actually planning to dispatch any more workers we need
        -- to take that in account.
        case work of
            BlockWait s -> do
                assert (s >= 0) (return ())
                -- XXX note that when we return from here we will block waiting
                -- for the result from the existing worker. If that takes too
                -- long we won't be able to send another worker until the
                -- result arrives.
                --
                -- Sleep only if there are no active workers, otherwise we will
                -- defer the output of those. Note we cannot use workerCount
                -- here as it is not a reliable way to ensure there are
                -- definitely no active workers. When workerCount is 0 we may
                -- still have a Stop event waiting in the outputQueue.
                done <- allThreadsDone sv
                when done $ do
                    let us = fromRelTime64 (toRelTime64 s) :: MicroSecond64
                    liftIO $ threadDelay (fromIntegral us)
                    void $ dispatchWorker 1 sv
                return False
            PartialWorker yields -> do
                assert (yields > 0) (return ())
                updateGainedLostYields yinfo yields

                done <- allThreadsDone sv
                when done $ void $ dispatchWorker yields sv
                return False
            ManyWorkers netWorkers yields -> do
                assert (netWorkers >= 1) (return ())
                assert (yields >= 0) (return ())
                updateGainedLostYields yinfo yields

                let periodRef = workerPollingInterval yinfo
                    ycnt = max 1 $ yields `div` fromIntegral netWorkers
                    period = min ycnt (fromIntegral magicMaxBuffer)

                -- The polling interval is only ever lowered here.
                old <- liftIO $ readIORef periodRef
                when (period < old) $
                    liftIO $ writeIORef periodRef period

                cnt <- liftIO $ readIORef $ workerCount sv
                if cnt < netWorkers
                then do
                    -- NOTE(review): 'div' fails if targetLat is 0 -- confirm
                    -- that 'svarLatencyTarget' is always positive.
                    let total = netWorkers - cnt
                        batch = max 1 $ fromIntegral $
                                    minThreadDelay `div` targetLat
                    -- XXX stagger the workers over a period?
                    -- XXX cannot sleep, as that would mean we cannot process
                    -- the outputs. need to try a different mechanism to
                    -- stagger.
                    -- when (total > batch) $
                       -- liftIO $ threadDelay $ nanoToMicroSecs minThreadDelay
                    dispatchN (min total batch)
                else return False

    where

    -- Add the part of @yields@ whose magnitude exceeds 'svarRateBuffer' to
    -- 'svarGainedLostYields'. Nothing is done when @yields@ is within the
    -- buffer. The new value is written strictly so that no thunk chain builds
    -- up in the IORef.
    updateGainedLostYields yinfo yields = do
        let buf = fromIntegral $ svarRateBuffer yinfo
        when (yields /= 0 && abs yields > buf) $ do
            let delta =
                   if yields > 0
                   then yields - buf
                   else yields + buf
                ref = svarGainedLostYields yinfo
            liftIO $ do
                old <- readIORef ref
                writeIORef ref $! old + delta

    -- Call 'dispatchWorker' (with a yield limit of 0) up to @n@ times,
    -- stopping early and returning False as soon as it returns False.
    dispatchN n
        | n == 0 = return True
        | otherwise = do
            r <- dispatchWorker 0 sv
            if r
            then dispatchN (n - 1)
            else return False

-- | Run the supplied delay action and then, as long as the 'outputQueue' is
-- found empty, keep invoking the supplied dispatch action. When the dispatch
-- action returns 'False' we block on the 'outputDoorBell' and retry if the
-- 'outputQueue' is still empty after waking up.
--
-- Arguments: the delay action, the dispatch action (returning 'True' if more
-- workers can be dispatched) and the 'SVar'.
{-# NOINLINE sendWorkerWait #-}
sendWorkerWait
    :: MonadAsync m
    => (SVar t m a -> IO ())
    -> (SVar t m a -> m Bool)
    -> SVar t m a
    -> m ()
sendWorkerWait delay dispatch sv = do
    -- Note that we are guaranteed to have at least one outstanding worker when
    -- we enter this function. So if we sleep we are guaranteed to be woken up
    -- by an outputDoorBell, when the worker exits.

    liftIO $ delay sv
    n <- outputCount
    when (n <= 0) $ do
        -- The queue may be empty temporarily if the worker has dequeued the
        -- work item but has not enqueued the remaining part yet. For the same
        -- reason, a worker may come back if it tries to dequeue and finds the
        -- queue empty, even though the whole work has not finished yet.

        -- If we find that the queue is empty, but it may be empty
        -- temporarily, when we checked it. If that's the case we might
        -- sleep indefinitely unless the active workers produce some
        -- output. We may deadlock specially if the output from the active
        -- workers depends on the future workers that we may never send.
        -- So in case the queue was temporarily empty set a flag to inform
        -- the enqueue to send us a doorbell.

        -- Note that this is just a best effort mechanism to avoid a
        -- deadlock. Deadlocks may still happen if for some weird reason
        -- the consuming computation shares an MVar or some other resource
        -- with the producing computation and gets blocked on that resource
        -- and therefore cannot do any pushworker to add more threads to
        -- the producer. In such cases the programmer should use a parallel
        -- style so that all the producers are scheduled immediately and
        -- unconditionally. We can also use a separate monitor thread to
        -- push workers instead of pushing them from the consumer, but then
        -- we are no longer using pull based concurrency rate adaptation.
        --
        -- XXX update this in the tutorial.
        --
        -- Having pending active workers does not mean that we are guaranteed
        -- to be woken up if we sleep. In case of Ahead streams, there may be
        -- queued items in the heap even though the outputQueue is empty, and
        -- we may have active workers which are deadlocked on those items to be
        -- processed by the consumer. We should either guarantee that any
        -- worker, before returning, clears the heap or we send a worker to
        -- clear it. Normally we always send a worker if no output is seen, but
        -- if the thread limit is reached or we are using pacing then we may
        -- not send a worker. See the concurrentApplication test in the tests,
        -- that test case requires at least one yield from the producer to not
        -- deadlock, if the last workers output is stuck in the heap then this
        -- test fails.  This problem can be extended to n threads when the
        -- consumer may depend on the evaluation of next n items in the
        -- producer stream.

        -- register for the outputDoorBell before we check the queue so that if
        -- we sleep because the queue was empty we are guaranteed to get a
        -- doorbell on the next enqueue.

        liftIO $ atomicModifyIORefCAS_ (needDoorBell sv) $ const True
        liftIO storeLoadBarrier
        canDoMore <- dispatch sv

        -- XXX test for the case when we miss sending a worker when the worker
        -- count is more than 1500.
        --
        -- XXX Assert here that if the heap is not empty then there is at
        -- least one outstanding worker. Otherwise we could be sleeping
        -- forever.

        if canDoMore
        then retry
        else do
            liftIO $ withDiagMVar sv "sendWorkerWait: nothing to do"
                             $ takeMVar (outputDoorBell sv)
            len <- outputCount
            when (len <= 0) retry

    where

    -- The count stored as the second component of the 'outputQueue'.
    outputCount = liftIO $ snd <$> readIORef (outputQueue sv)

    retry = sendWorkerWait delay dispatch sv

-- | Queue the given stream on the 'SVar' and then start the first worker for
-- it, returning the same 'SVar'.
--
-- When the 'SVar' has no 'YieldRateInfo', a worker is pushed with a count of
-- 0. Otherwise a worker is pushed with a count of 1, unless the latency
-- target equals 'maxBound', in which case no worker is pushed and the calling
-- thread sleeps in 'threadDelay' instead.
sendFirstWorker :: MonadAsync m => SVar t m a -> t m a -> m (SVar t m a)
sendFirstWorker sv stream = do
    -- The work has to be on the queue before the worker is pushed; a worker
    -- that starts first may exit before we even get a chance to enqueue.
    runner <- askRunInIO
    liftIO $ enqueue sv (runner, stream)
    case yieldRateInfo sv of
        Nothing -> pushWorker 0 sv
        Just yinfo
            -- NOTE(review): presumably a 'maxBound' latency target means
            -- that no output should ever be produced, hence the sleep
            -- instead of a worker; confirm against the rate configuration.
            | svarLatencyTarget yinfo == maxBound ->
                liftIO $ threadDelay maxBound
            | otherwise -> pushWorker 1 sv
    return sv

-- | Delay action for the paced dispatch path. It ignores the 'SVar' and
-- returns immediately without doing anything.
sendWorkerDelayPaced :: SVar t m a -> IO ()
sendWorkerDelayPaced _sv = pure ()

-- | Delay action for the non-paced dispatch path. It currently ignores the
-- 'SVar' and returns immediately without doing anything.
--
-- XXX We need a better way to handle this than hardcoded delays, as suitable
-- delays may differ from system to system. If a use case requiring a delay
-- turns up, we can create a combinator to set it as a config in the state.
--
-- The previous hardcoded implementation is kept here, disabled, for
-- reference:
--
{-
sendWorkerDelay sv = do
    ncpu <- getNumCapabilities
    if ncpu <= 1
    then
        if (svarStyle sv == AheadVar)
        then threadDelay 100
        else threadDelay 25
    else
        if (svarStyle sv == AheadVar)
        then threadDelay 100
        else threadDelay 10
-}
sendWorkerDelay :: SVar t m a -> IO ()
sendWorkerDelay _sv = pure ()