{-# LANGUAGE CPP #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MagicHash #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE UnboxedTuples #-}
#ifdef DIAGNOSTICS_VERBOSE
#define DIAGNOSTICS
#endif
module Streamly.SVar
(
MonadAsync
, SVarStyle (..)
, SVar (..)
, Limit (..)
, State (streamVar)
, defState
, rstState
, getMaxThreads
, setMaxThreads
, getMaxBuffer
, setMaxBuffer
, getStreamRate
, setStreamRate
, setStreamLatency
, getYieldLimit
, setYieldLimit
, cleanupSVar
, cleanupSVarFromWorker
, newAheadVar
, newParallelVar
, atomicModifyIORefCAS
, WorkerInfo (..)
, YieldRateInfo (..)
, ThreadAbort (..)
, ChildEvent (..)
, AheadHeapEntry (..)
, send
, sendYield
, sendStop
, enqueueLIFO
, enqueueFIFO
, enqueueAhead
, reEnqueueAhead
, pushWorkerPar
, queueEmptyAhead
, dequeueAhead
, HeapDequeueResult(..)
, dequeueFromHeap
, dequeueFromHeapSeq
, requeueOnHeapTop
, updateHeapSeq
, withIORef
, Rate (..)
, getYieldRateInfo
, collectLatency
, workerUpdateLatency
, isBeyondMaxRate
, workerRateControl
, updateYieldCount
, decrementYieldLimit
, decrementYieldLimitPost
, incrementYieldLimit
, postProcessBounded
, postProcessPaced
, readOutputQBounded
, readOutputQPaced
, dispatchWorkerPaced
, sendFirstWorker
, delThread
, toStreamVar
, SVarStats (..)
, NanoSecs (..)
#ifdef DIAGNOSTICS
, dumpSVar
#endif
)
where
import Control.Concurrent
(ThreadId, myThreadId, threadDelay, throwTo)
import Control.Concurrent.MVar
(MVar, newEmptyMVar, tryPutMVar, takeMVar, newMVar)
import Control.Exception (SomeException(..), catch, mask, assert, Exception)
import Control.Monad (when)
import Control.Monad.Catch (MonadThrow)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Trans.Control (MonadBaseControl, control)
import Data.Atomics
(casIORef, readForCAS, peekTicket, atomicModifyIORefCAS_,
writeBarrier, storeLoadBarrier)
import Data.Concurrent.Queue.MichaelScott (LinkedQueue, pushL)
import Data.Functor (void)
import Data.Heap (Heap, Entry(..))
import Data.Int (Int64)
import Data.IORef
(IORef, modifyIORef, newIORef, readIORef, writeIORef, atomicModifyIORef)
import Data.List ((\\))
import Data.Maybe (fromJust)
import Data.Set (Set)
import GHC.Conc (ThreadId(..))
import GHC.Exts
import GHC.IO (IO(..))
import System.Clock (TimeSpec, Clock(Monotonic), getTime, toNanoSecs)
import qualified Data.Heap as H
import qualified Data.Set as S
#ifdef DIAGNOSTICS
import Control.Concurrent.MVar (tryTakeMVar)
import Control.Exception
(catches, throwIO, Handler(..), BlockedIndefinitelyOnMVar(..),
BlockedIndefinitelyOnSTM(..))
import System.IO (hPutStrLn, stderr)
import Text.Printf (printf)
#endif
-- | A duration in nanoseconds, stored as a signed 64-bit integer.
newtype NanoSecs = NanoSecs Int64
    deriving (Eq, Ord, Read, Show, Enum, Bounded, Num, Real, Integral)

-- | A generic count (for example of yields, see 'workerYieldCount'),
-- stored as a signed 64-bit integer.
newtype Count = Count Int64
    deriving (Eq, Ord, Read, Show, Enum, Bounded, Num, Real, Integral)
-- | Asynchronous exception thrown to worker threads to abort them
-- (see 'cleanupSVar' and 'cleanupSVarFromWorker').
data ThreadAbort = ThreadAbort
    deriving Show

instance Exception ThreadAbort

-- | An event sent by a worker thread to the consumer of an 'SVar'.
data ChildEvent a
    = ChildYield a
      -- ^ The worker produced a value.
    | ChildStop ThreadId (Maybe SomeException)
      -- ^ The worker stopped; the exception is present if it died with one.

-- | An entry of the output heap of an 'AheadVar' style 'SVar': either a
-- plain value or a stream.
data AheadHeapEntry (t :: (* -> *) -> * -> *) m a
    = AheadEntryPure a
    | AheadEntryStream (t m a)

-- | The concurrency style of an 'SVar'.
data SVarStyle
    = AsyncVar
    | WAsyncVar
    | ParallelVar
    | AheadVar
    deriving (Eq, Show)
-- | Per-worker bookkeeping used for rate control.
data WorkerInfo = WorkerInfo
    { workerYieldMax     :: Count
      -- ^ Yield budget of the worker; 0 means no budget
      -- (see 'isBeyondMaxYield').
    , workerYieldCount   :: IORef Count
      -- ^ Number of yields performed by the worker so far.
    , workerLatencyStart :: IORef (Count, TimeSpec)
      -- ^ Yield count and timestamp at the start of the current latency
      -- measurement window.
    }

-- | Requested stream rate. The units and exact meaning of the fields are
-- defined by the code that consumes them, which is not in this section.
data Rate = Rate
    { rateLow    :: Double
    , rateGoal   :: Double
    , rateHigh   :: Double
    , rateBuffer :: Int
    }

-- | Bounds applied to the adjusted worker latency in 'estimateWorkers'.
data LatencyRange = LatencyRange
    { minLatency :: NanoSecs
    , maxLatency :: NanoSecs
    }
    deriving Show
-- | Shared rate-control state of an 'SVar'. Latencies are 'NanoSecs'.
data YieldRateInfo = YieldRateInfo
    { svarLatencyTarget      :: NanoSecs
      -- ^ Target latency per yield (targetLat in 'estimateWorkers').
    , svarLatencyRange       :: LatencyRange
      -- ^ Bounds the adjusted worker latency is clamped to.
    , svarRateBuffer         :: Int
      -- ^ Yield deficit or surplus tolerated before the excess is added to
      -- svarGainedLostYields (see dispatchWorkerPaced).
    , svarGainedLostYields   :: IORef Count
      -- ^ Accumulated yields gained or lost beyond the rate buffer; added to
      -- the yield count when estimating workers.
    , svarAllTimeLatency     :: IORef (Count, TimeSpec)
      -- ^ All-time yield count and the timestamp elapsed time is measured
      -- from.
    , workerBootstrapLatency :: Maybe NanoSecs
      -- ^ Latency assumed while no latency has been measured yet.
    , workerPollingInterval  :: IORef Count
      -- ^ Workers check the rate every this many yields; 0 disables checks.
    , workerPendingLatency   :: IORef (Count, NanoSecs)
      -- ^ Yields and time reported by workers and not yet collected.
    , workerCollectedLatency :: IORef (Count, NanoSecs)
      -- ^ Collected samples not yet folded into the measured latency.
    , workerMeasuredLatency  :: IORef NanoSecs
      -- ^ Current measured worker latency per yield.
    }
-- | Statistics of an 'SVar', reported by dumpSVar in DIAGNOSTICS builds.
data SVarStats = SVarStats {
      totalDispatches  :: IORef Int
      -- ^ Number of workers dispatched (see recordMaxWorkers).
    , maxWorkers       :: IORef Int
      -- ^ Maximum number of concurrently active workers seen.
    , maxOutQSize      :: IORef Int
      -- ^ Maximum output queue size seen.
    , maxHeapSize      :: IORef Int
      -- ^ Maximum output heap size seen (ahead style).
    , maxWorkQSize     :: IORef Int
      -- ^ NOTE(review): not read or written in this section; confirm use.
    , avgWorkerLatency :: IORef (Count, NanoSecs)
      -- ^ Total yields and total time, averaged when reported.
    , minWorkerLatency :: IORef NanoSecs
      -- ^ Smallest measured worker latency (0 means none yet).
    , maxWorkerLatency :: IORef NanoSecs
      -- ^ Largest measured worker latency.
    , svarStopTime     :: IORef (Maybe TimeSpec)
      -- ^ When the SVar stopped, if it did; used to compute SVar latency.
    }
-- | An optional upper bound: no bound at all, or at most the given number.
data Limit
    = Unlimited
    | Limited Word
    deriving Show
-- | Shared state of one concurrent stream evaluation: the output queue that
-- workers write events to, the operations used by the consumer, and the
-- bookkeeping of workers, limits, rate control and statistics.
data SVar t m a = SVar
    { svarStyle      :: SVarStyle
      -- ^ Concurrency style of this SVar.
    , outputQueue    :: IORef ([ChildEvent a], Int)
      -- ^ Events from workers, newest first (send conses onto the list),
      -- together with the queue length maintained by send.
    , outputDoorBell :: MVar ()
      -- ^ Filled with tryPutMVar by send and the enqueue functions,
      -- presumably to wake up a waiting consumer.
    , readOutputQ    :: m [ChildEvent a]
      -- ^ Reads the output queue; implementation chosen per SVar.
    , postProcess    :: m Bool
      -- ^ Consumer-side post processing; implementation chosen per SVar.
    , maxWorkerLimit :: Limit
      -- ^ Worker limit used by dispatchWorker and estimateWorkers.
    , maxBufferLimit :: Limit
      -- ^ Output buffer limit used by send.
    , remainingWork  :: Maybe (IORef Count)
      -- ^ Remaining yield budget when a yield limit is set.
    , yieldRateInfo  :: Maybe YieldRateInfo
      -- ^ Rate control state; Nothing when rate control is not used.
    , enqueue        :: t m a -> IO ()
      -- ^ Adds a stream to the work queue.
    , isWorkDone     :: IO Bool
      -- ^ Checked by dispatchWorker before dispatching.
    , isQueueDone    :: IO Bool
      -- ^ Checked by dispatchWorker to see if the work queue is exhausted.
    , needDoorBell   :: IORef Bool
      -- ^ Set to True in sendWorkerWait; the enqueue functions reset it and
      -- ring the door bell when they see it set.
    , workLoop       :: Maybe WorkerInfo -> m ()
      -- ^ The loop run by each worker forked by pushWorker.
    , workerThreads  :: IORef (Set ThreadId)
      -- ^ Registered worker threads.
    , workerCount    :: IORef Int
      -- ^ Active worker count (incremented by pushWorker, decremented by
      -- sendStop).
    , accountThread  :: ThreadId -> m ()
      -- ^ Thread accounting action; implementation chosen per SVar.
    , workerStopMVar :: MVar ()
      -- ^ NOTE(review): not used in this section; purpose to be confirmed.
    , svarStats      :: SVarStats
      -- ^ Statistics, see 'SVarStats'.
    , svarRef        :: Maybe (IORef ())
      -- ^ NOTE(review): purpose not visible here (possibly used for
      -- finalization); confirm.
#ifdef DIAGNOSTICS
    , svarCreator    :: ThreadId
      -- ^ Thread that created the SVar (reported by dumpSVar).
    , outputHeap     :: IORef ( Heap (Entry Int (AheadHeapEntry t m a))
                              , Maybe Int)
      -- ^ Output heap of an ahead style SVar (reported by dumpSVar).
    , aheadWorkQueue :: IORef ([t m a], Int)
      -- ^ Work queue of an ahead style SVar (reported by dumpSVar).
#endif
    }
-------------------------------------------------------------------------------
-- State for concurrency control
-------------------------------------------------------------------------------
-- XXX We can put the resettable fields in a oneShotConfig field and the
-- others in a persistentConfig field. That way reset would be fast and
-- scalable irrespective of the number of fields.
--
-- XXX Make all these Limited types and use phantom types to distinguish
-- them.

-- | Configuration carried along a stream for concurrent evaluation. Fields
-- prefixed with an underscore are accessed through the get/set functions.
data State t m a = State
    { -- one shot configuration, automatically reset for each API call
      -- (see rstState)
      streamVar   :: Maybe (SVar t m a)
    , _yieldLimit :: Maybe Count
    -- persistent configuration, state that remains valid until changed by
    -- an explicit setting via a combinator.
    , _threadsHigh :: Limit
    , _bufferHigh  :: Limit
    -- XXX these two can be collapsed into a single type
    , _streamLatency :: Maybe NanoSecs -- bootstrap latency
    , _maxStreamRate :: Maybe Rate
    }
-------------------------------------------------------------------------------
-- State defaults and reset
-------------------------------------------------------------------------------
-- | Default buffer size, found empirically by running the smallest
-- possible task and measuring the best buffer size for it. The figure is
-- hardware dependent; it was obtained on a 2.2GHz Intel Core i7 processor.
magicMaxBuffer :: Word
magicMaxBuffer = 1500

-- | Default limits, used by defState and when a limit of zero is requested.
-- The default thread limit uses the same figure as the buffer limit.
defaultMaxThreads, defaultMaxBuffer :: Limit
defaultMaxBuffer = Limited magicMaxBuffer
defaultMaxThreads = defaultMaxBuffer
-- | The initial 'State': no SVar, no yield limit, no latency or rate
-- settings, and the default thread and buffer limits.
--
-- Fields prefixed with an underscore must not be read or updated directly;
-- use the smart accessor functions instead.
defState :: State t m a
defState = State
    { streamVar      = Nothing
    , _yieldLimit    = Nothing
    , _threadsHigh   = defaultMaxThreads
    , _bufferHigh    = defaultMaxBuffer
    , _streamLatency = Nothing
    , _maxStreamRate = Nothing
    }
-- | Reset the one-shot part of a 'State' (the SVar and the yield limit) and
-- keep the persistent configuration. The element type may change because
-- the SVar is dropped.
--
-- XXX If performance suffers we can group all the Nothing parameters into a
-- single structure so that reset is fast. We can also use rewrite rules so
-- that the reset happens only in concurrent streams, reducing the impact on
-- serial streams.
--
-- Clearing the fields only when they hold a Just value gives slightly
-- better performance for zip/zipM, but scan gets much worse because it no
-- longer fuses.
rstState :: State t m a -> State t m b
rstState st = st { _yieldLimit = Nothing, streamVar = Nothing }
-------------------------------------------------------------------------------
-- Smart get/set routines for State
-------------------------------------------------------------------------------
-- Use get/set routines instead of directly accessing the State fields
-- | Set the yield limit. 'Nothing' removes the limit; a non-positive limit
-- is stored as zero.
setYieldLimit :: Maybe Int64 -> State t m a -> State t m a
setYieldLimit lim st = st { _yieldLimit = fmap (fromIntegral . max 0) lim }

-- | The current yield limit, if any.
getYieldLimit :: State t m a -> Maybe Count
getYieldLimit st = _yieldLimit st
-- | Set the maximum number of worker threads. A negative value means no
-- limit, zero selects the default limit, and a positive value is used as
-- the limit.
setMaxThreads :: Int -> State t m a -> State t m a
setMaxThreads n st = st { _threadsHigh = threadLimit }
  where
    threadLimit = case compare n 0 of
        LT -> Unlimited
        EQ -> defaultMaxThreads
        GT -> Limited (fromIntegral n)

-- | The current worker thread limit.
getMaxThreads :: State t m a -> Limit
getMaxThreads st = _threadsHigh st
-- | Set the maximum output buffer size. A negative value means no limit,
-- zero selects the default limit, and a positive value is used as the limit.
setMaxBuffer :: Int -> State t m a -> State t m a
setMaxBuffer n st = st { _bufferHigh = bufferLimit }
  where
    bufferLimit = case compare n 0 of
        LT -> Unlimited
        EQ -> defaultMaxBuffer
        GT -> Limited (fromIntegral n)

-- | The current output buffer limit.
getMaxBuffer :: State t m a -> Limit
getMaxBuffer st = _bufferHigh st
-- | Set the requested stream rate, or clear it with 'Nothing'.
setStreamRate :: Maybe Rate -> State t m a -> State t m a
setStreamRate rate st = st { _maxStreamRate = rate }

-- | The requested stream rate, if any.
getStreamRate :: State t m a -> Maybe Rate
getStreamRate st = _maxStreamRate st
-- | Set the bootstrap stream latency in nanoseconds; any non-positive value
-- clears it.
setStreamLatency :: Int -> State t m a -> State t m a
setStreamLatency n st = st { _streamLatency = latency }
  where
    latency
        | n <= 0    = Nothing
        | otherwise = Just (fromIntegral n)

-- | The bootstrap stream latency, if set.
getStreamLatency :: State t m a -> Maybe NanoSecs
getStreamLatency st = _streamLatency st
-------------------------------------------------------------------------------
-- Cleanup
-------------------------------------------------------------------------------
-- | Abort every worker thread registered in the 'SVar' by throwing
-- 'ThreadAbort' to each of them.
cleanupSVar :: SVar t m a -> IO ()
cleanupSVar sv =
    readIORef (workerThreads sv)
        >>= Prelude.mapM_ (`throwTo` ThreadAbort) . S.toList
-- | Like 'cleanupSVar', but the calling thread is skipped: 'ThreadAbort'
-- is thrown to every other registered worker thread.
cleanupSVarFromWorker :: SVar t m a -> IO ()
cleanupSVarFromWorker sv = do
    registered <- S.toList <$> readIORef (workerThreads sv)
    me <- myThreadId
    let others = registered \\ [me]
    mapM_ (`throwTo` ThreadAbort) others
-------------------------------------------------------------------------------
-- Dumping the SVar for debug/diag
-------------------------------------------------------------------------------
#ifdef DIAGNOSTICS
-- | Format a duration given in seconds for display. It uses the largest
-- unit among s, ms, us (micro), ns, ps, fs and as in which the value is at
-- least 1. The number of decimals shrinks as the scaled value grows.
-- Negative durations get a leading minus sign. Values below 1e-18,
-- including zero, fall back to printf with %g in seconds. The micro unit is
-- spelled us on Windows and with the Greek letter mu elsewhere.
secs :: Double -> String
secs k
    | k < 0      = '-' : secs (-k)
    | k >= 1     = k        `with` "s"
    | k >= 1e-3  = (k*1e3)  `with` "ms"
#ifdef mingw32_HOST_OS
    | k >= 1e-6  = (k*1e6)  `with` "us"
#else
    | k >= 1e-6  = (k*1e6)  `with` "μs"
#endif
    | k >= 1e-9  = (k*1e9)  `with` "ns"
    | k >= 1e-12 = (k*1e12) `with` "ps"
    | k >= 1e-15 = (k*1e15) `with` "fs"
    | k >= 1e-18 = (k*1e18) `with` "as"
    | otherwise  = printf "%g s" k
  where
    -- Render a value already scaled to unit u with a magnitude dependent
    -- precision.
    with (t :: Double) (u :: String)
        | t >= 1e9  = printf "%.4g %s" t u
        | t >= 1e3  = printf "%.0f %s" t u
        | t >= 1e2  = printf "%.1f %s" t u
        | t >= 1e1  = printf "%.2f %s" t u
        | otherwise = printf "%.3f %s" t u
-- | Fold all pending and collected worker latency samples into the measured
-- latency, with no thresholds (unlike 'collectLatency').
--
-- It atomically resets the pending counter. When there are samples it also:
--
-- * records min/max/average worker latency statistics,
-- * resets the collected accumulator,
-- * stores the new latency directly (without averaging it with the previous
--   value),
-- * adds the yields to the all-time count.
--
-- Returns the all-time yield count including the drained samples, the
-- all-time start timestamp, and the new latency. When nothing was pending,
-- the previous measured latency is returned instead.
--
-- NOTE(review): the nested DIAGNOSTICS guards below are redundant, because
-- this whole definition is already compiled only with DIAGNOSTICS.
drainLatency :: SVarStats -> YieldRateInfo -> IO (Count, TimeSpec, NanoSecs)
drainLatency _ss yinfo = do
    let cur = workerPendingLatency yinfo
        col = workerCollectedLatency yinfo
        longTerm = svarAllTimeLatency yinfo
        measured = workerMeasuredLatency yinfo
    -- Take the pending samples and reset them in one atomic step.
    (count, time) <- atomicModifyIORefCAS cur $ \v -> ((0,0), v)
    (colCount, colTime) <- readIORef col
    (lcount, ltime) <- readIORef longTerm
    prev <- readIORef measured
    let pendingCount = colCount + count
        pendingTime = colTime + time
        lcount' = lcount + pendingCount
        notUpdated = (lcount', ltime, prev)
    if (pendingCount > 0)
    then do
        let new = pendingTime `div` (fromIntegral pendingCount)
#ifdef DIAGNOSTICS
        minLat <- readIORef (minWorkerLatency _ss)
        when (new < minLat || minLat == 0) $
            writeIORef (minWorkerLatency _ss) new
        maxLat <- readIORef (maxWorkerLatency _ss)
        when (new > maxLat) $ writeIORef (maxWorkerLatency _ss) new
#endif
        writeIORef col (0, 0)
        writeIORef measured new
#ifdef DIAGNOSTICS
        modifyIORef (avgWorkerLatency _ss) $
            \(cnt, t) -> (cnt + pendingCount, t + pendingTime)
#endif
        modifyIORef longTerm $ \(_, t) -> (lcount', t)
        return (lcount', ltime, new)
    else return notUpdated
-- | Render the statistics of an 'SVar' as text. The report covers:
--
-- * the dispatch count and maximum worker count,
-- * the queue and heap maxima,
-- * the worker latency extremes and average,
-- * for rate controlled SVars, the overall SVar latency and yield counts.
--
-- Pending worker latency samples are drained into the given stats record
-- first, so the report is up to date.
dumpSVarStats :: SVar t m a -> SVarStats -> SVarStyle -> IO String
dumpSVarStats sv ss style = do
    -- Drain into the same stats record that is reported below.
    case yieldRateInfo sv of
        Nothing -> return ()
        Just yinfo -> void $ drainLatency ss yinfo
    dispatches <- readIORef $ totalDispatches ss
    maxWrk <- readIORef $ maxWorkers ss
    maxOq <- readIORef $ maxOutQSize ss
    maxHp <- readIORef $ maxHeapSize ss
    minLat <- readIORef $ minWorkerLatency ss
    maxLat <- readIORef $ maxWorkerLatency ss
    (avgCnt, avgTime) <- readIORef $ avgWorkerLatency ss
    (svarCnt, svarGainLossCnt, svarLat) <- case yieldRateInfo sv of
        Nothing -> return (0, 0, 0)
        Just yinfo -> do
            (cnt, startTime) <- readIORef $ svarAllTimeLatency yinfo
            if cnt > 0
                then do
                    t <- readIORef (svarStopTime ss)
                    gl <- readIORef (svarGainedLostYields yinfo)
                    -- Measure up to the stop time if the SVar has stopped,
                    -- otherwise up to now.
                    endTime <- maybe (getTime Monotonic) return t
                    let interval = toNanoSecs (endTime - startTime)
                    return (cnt, gl, interval `div` fromIntegral cnt)
                else return (0, 0, 0)
    return $ unlines
        [ "total dispatches = " ++ show dispatches
        , "max workers = " ++ show maxWrk
        , "max outQSize = " ++ show maxOq
            ++ (if style == AheadVar
                then "\nheap max size = " ++ show maxHp
                else "")
            ++ (if minLat > 0
                then "\nmin worker latency = "
                     ++ secs (fromIntegral minLat * 1e-9)
                else "")
            ++ (if maxLat > 0
                then "\nmax worker latency = "
                     ++ secs (fromIntegral maxLat * 1e-9)
                else "")
            ++ (if avgCnt > 0
                then let lat = avgTime `div` fromIntegral avgCnt
                     in "\navg worker latency = "
                        ++ secs (fromIntegral lat * 1e-9)
                else "")
            ++ (if svarLat > 0
                then "\nSVar latency = "
                     ++ secs (fromIntegral svarLat * 1e-9)
                else "")
            ++ (if svarCnt > 0
                then "\nSVar yield count = " ++ show svarCnt
                else "")
            -- The gain/loss count is signed (negative means yields were
            -- lost), so any non-zero value is reported.
            ++ (if svarGainLossCnt /= 0
                then "\nSVar gain/loss yield count = " ++ show svarGainLossCnt
                else "")
        ]
{-# NOINLINE dumpSVar #-}
-- | Render the current state of an 'SVar' for debugging, followed by its
-- statistics. The state covers the output queue, the door bell, the ahead
-- heap and work queue, the waiting flag and the worker threads.
dumpSVar :: SVar t m a -> IO String
dumpSVar sv = do
    (oqList, oqLen) <- readIORef $ outputQueue sv
    -- Inspect the door bell. tryTakeMVar empties it, so put the token back
    -- if there was one. Otherwise dumping could swallow a wake-up signal
    -- meant for the consumer.
    db <- tryTakeMVar $ outputDoorBell sv
    case db of
        Just () -> void $ tryPutMVar (outputDoorBell sv) ()
        Nothing -> return ()
    let style = svarStyle sv
    aheadDump <-
        if style == AheadVar
        then do
            (oheap, oheapSeq) <- readIORef $ outputHeap sv
            (wq, wqSeq) <- readIORef $ aheadWorkQueue sv
            return $ unlines
                [ "heap length = " ++ show (H.size oheap)
                , "heap sequence = " ++ show oheapSeq
                , "work queue length = " ++ show (length wq)
                , "work queue sequence = " ++ show wqSeq
                ]
        else return []
    waiting <-
        if style /= ParallelVar
        then readIORef $ needDoorBell sv
        else return False
    rthread <- readIORef $ workerThreads sv
    workers <- readIORef $ workerCount sv
    stats <- dumpSVarStats sv (svarStats sv) style
    return $ unlines
        [ "Creator tid = " ++ show (svarCreator sv)
        , "style = " ++ show style
        , "---------CURRENT STATE-----------"
        , "outputQueue length computed = " ++ show (length oqList)
        , "outputQueue length maintained = " ++ show oqLen
        , "outputDoorBell = " ++ show db
        ]
        ++ aheadDump ++ unlines
        [ "needDoorBell = " ++ show waiting
        , "running threads = " ++ show rthread
        , "running thread count = " ++ show workers
        ]
        ++ "---------STATS-----------\n"
        ++ stats
{-# NOINLINE mvarExcHandler #-}
-- | Handler used by withDBGMVar: print the label and a dump of the 'SVar'
-- to stderr, then rethrow the exception.
mvarExcHandler :: SVar t m a -> String -> BlockedIndefinitelyOnMVar -> IO ()
mvarExcHandler sv label e@BlockedIndefinitelyOnMVar =
    dumpSVar sv >>= \svInfo -> do
        hPutStrLn stderr $
            label ++ " " ++ "BlockedIndefinitelyOnMVar\n" ++ svInfo
        throwIO e
{-# NOINLINE stmExcHandler #-}
-- | Handler used by withDBGMVar: print the label and a dump of the 'SVar'
-- to stderr, then rethrow the exception.
stmExcHandler :: SVar t m a -> String -> BlockedIndefinitelyOnSTM -> IO ()
stmExcHandler sv label e@BlockedIndefinitelyOnSTM =
    dumpSVar sv >>= \svInfo -> do
        hPutStrLn stderr $
            label ++ " " ++ "BlockedIndefinitelyOnSTM\n" ++ svInfo
        throwIO e
-- | Run an action. If it fails with BlockedIndefinitelyOnMVar or
-- BlockedIndefinitelyOnSTM, dump the 'SVar' to stderr and rethrow.
withDBGMVar :: SVar t m a -> String -> IO () -> IO ()
withDBGMVar sv label action = catches action handlers
  where
    handlers =
        [ Handler (mvarExcHandler sv label)
        , Handler (stmExcHandler sv label)
        ]
#else
-- | Without DIAGNOSTICS this simply runs the action unchanged.
withDBGMVar :: SVar t m a -> String -> IO () -> IO ()
withDBGMVar _ _ = id
#endif
{-# INLINE atomicModifyIORefCAS #-}
-- | Atomically modify an 'IORef' using compare-and-swap, returning the
-- extra result of the function. After 25 failed CAS attempts it falls back
-- to 'atomicModifyIORef'.
atomicModifyIORefCAS :: IORef a -> (a -> (a,b)) -> IO b
atomicModifyIORefCAS ref fn = readForCAS ref >>= attempt (25 :: Int)
  where
    attempt 0 _ = atomicModifyIORef ref fn
    attempt remaining ticket = do
        let (newVal, res) = fn (peekTicket ticket)
        (ok, ticket') <- casIORef ref ticket newVal
        if ok
            then return res
            else attempt (remaining - 1) ticket'
-- | Constraints on a monad used for concurrent stream evaluation. It must
-- support IO, run in IO via 'control' (used by doFork), and throw
-- exceptions.
type MonadAsync m = (MonadIO m, MonadBaseControl IO m, MonadThrow m)
{-# INLINE rawForkIO #-}
-- | Fork a thread by invoking the fork# primop directly. No exception
-- handling is added here; callers handle exceptions themselves (doFork
-- wraps the action in catch).
rawForkIO :: IO () -> IO ThreadId
rawForkIO action = IO $ \ s ->
   case (fork# action s) of (# s1, tid #) -> (# s1, ThreadId tid #)
{-# INLINE doFork #-}
-- | Fork a thread running a monadic action in the current monadic context
-- (captured with 'control').
--
-- Asynchronous exceptions are masked while forking. The child runs the
-- action under restore, that is in the masking state of the caller, and
-- passes any exception to the given handler. The new thread id is returned
-- inside the monad.
doFork :: MonadBaseControl IO m
    => m ()
    -> (SomeException -> IO ())
    -> m ThreadId
doFork action exHandler =
    control $ \runInIO ->
        mask $ \restore -> do
                tid <- rawForkIO $ catch (restore $ void $ runInIO action)
                                         exHandler
                runInIO (return tid)
{-# INLINE decrementYieldLimit #-}
-- | Atomically consume one unit of the remaining yield budget, if the SVar
-- has a budget. Returns True when the budget was at least 1 before this
-- call, and always True without a budget. The counter is decremented even
-- when False is returned, so callers may need incrementYieldLimit to undo it.
decrementYieldLimit :: SVar t m a -> IO Bool
decrementYieldLimit sv = maybe (return True) consume (remainingWork sv)
  where
    consume ref = (>= 1) <$> atomicModifyIORefCAS ref (\x -> (x - 1, x))

{-# INLINE decrementYieldLimitPost #-}
-- | Like decrementYieldLimit, but returns True only if budget still
-- remains after this decrement, that is if it was greater than 1 before.
decrementYieldLimitPost :: SVar t m a -> IO Bool
decrementYieldLimitPost sv = maybe (return True) consume (remainingWork sv)
  where
    consume ref = (> 1) <$> atomicModifyIORefCAS ref (\x -> (x - 1, x))

{-# INLINE incrementYieldLimit #-}
-- | Atomically give one unit back to the remaining yield budget, if any.
incrementYieldLimit :: SVar t m a -> IO ()
incrementYieldLimit sv =
    maybe (return ()) (`atomicModifyIORefCAS_` (+ 1)) (remainingWork sv)
-- | Append an event to the output queue of the 'SVar'. The door bell is
-- rung when the queue was empty (length 0) before this push.
--
-- Returns whether the worker may keep producing. With an 'Unlimited'
-- buffer it is always True. With a 'Limited' buffer it is True when the
-- queue length before the push is below the buffer limit minus the number
-- of active workers.
send :: SVar t m a -> ChildEvent a -> IO Bool
send sv msg = do
    len <- atomicModifyIORefCAS (outputQueue sv) $ \(es, n) ->
        ((msg : es, n + 1), n)
    when (len <= 0) $ do
        -- Order the queue update before the door bell write.
        writeBarrier
        void $ tryPutMVar (outputDoorBell sv) ()
    let limit = maxBufferLimit sv
    case limit of
        Unlimited -> return True
        Limited lim -> do
            active <- readIORef (workerCount sv)
            -- Leave room in the buffer for one event per active worker.
            return $ len < ((fromIntegral lim) - active)
-- | Close the current latency measurement window of a worker. If the worker
-- has yielded since the window started, start a new window now and return
-- the number of yields and the elapsed nanoseconds; otherwise return
-- Nothing and keep the window open.
workerCollectLatency :: WorkerInfo -> IO (Maybe (Count, NanoSecs))
workerCollectLatency winfo = do
    (startCount, startTime) <- readIORef (workerLatencyStart winfo)
    nowCount <- readIORef (workerYieldCount winfo)
    let yields = nowCount - startCount
    if yields <= 0
        then return Nothing
        else do
            nowTime <- getTime Monotonic
            let elapsed = fromInteger (toNanoSecs (nowTime - startTime))
            writeIORef (workerLatencyStart winfo) (nowCount, nowTime)
            return (Just (yields, elapsed))
-- | Close the latency window of a worker that has yielded, and atomically
-- add its yields and elapsed time to the shared pending latency.
workerUpdateLatency :: YieldRateInfo -> WorkerInfo -> IO ()
workerUpdateLatency yinfo winfo =
    workerCollectLatency winfo >>= maybe (return ()) addPending
  where
    addPending (cnt, period) =
        atomicModifyIORefCAS_ (workerPendingLatency yinfo) $
            \(n, t) -> (n + cnt, t + period)
-- | Increment the yield counter of a worker and return the new count.
updateYieldCount :: WorkerInfo -> IO Count
updateYieldCount winfo = do
    let counter = workerYieldCount winfo
    next <- (+ 1) <$> readIORef counter
    writeIORef counter next
    return next
-- | True when the worker has a yield budget (a non-zero workerYieldMax)
-- and the given count has reached it.
isBeyondMaxYield :: Count -> WorkerInfo -> Bool
isBeyondMaxYield cnt WorkerInfo { workerYieldMax = ymax }
    | ymax == 0 = False
    | otherwise = cnt >= ymax
{-# NOINLINE checkRatePeriodic #-}
-- | On every yield count that is a multiple of the polling interval (when
-- the interval is non-zero), publish the worker latency and report whether
-- the SVar is beyond its maximum rate. Returns False on all other yields.
checkRatePeriodic :: SVar t m a
                  -> YieldRateInfo
                  -> WorkerInfo
                  -> Count
                  -> IO Bool
checkRatePeriodic sv yinfo winfo ycnt = do
    interval <- readIORef (workerPollingInterval yinfo)
    let due = interval /= 0 && ycnt `mod` interval == 0
    if not due
        then return False
        else workerUpdateLatency yinfo winfo >> isBeyondMaxRate sv yinfo
{-# NOINLINE workerRateControl #-}
-- | Account for one yield of a worker and decide whether it may continue.
-- It may not once it has reached its yield budget or the SVar is beyond
-- its maximum rate.
workerRateControl :: SVar t m a -> YieldRateInfo -> WorkerInfo -> IO Bool
workerRateControl sv yinfo winfo = do
    cnt <- updateYieldCount winfo
    beyondMaxRate <- checkRatePeriodic sv yinfo winfo cnt
    return $ not (isBeyondMaxYield cnt winfo) && not beyondMaxRate
{-# INLINE sendYield #-}
-- | Send an event and report whether the worker should keep going. The
-- buffer must have room (see send) and, when the worker has WorkerInfo
-- and the SVar is rate controlled, the worker must be within its limits.
sendYield :: SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
sendYield sv mwinfo msg = do
    r <- send sv msg
    rateLimitOk <- case (mwinfo, yieldRateInfo sv) of
        (Just winfo, Just yinfo) -> workerRateControl sv yinfo winfo
        _ -> return True
    return $ r && rateLimitOk
{-# INLINE workerStopUpdate #-}
-- | When a worker stops, publish its last latency window. Nothing is done
-- when latency polling is disabled (interval 0).
workerStopUpdate :: WorkerInfo -> YieldRateInfo -> IO ()
workerStopUpdate winfo info = do
    interval <- readIORef (workerPollingInterval info)
    if interval == 0
        then return ()
        else workerUpdateLatency info winfo
{-# INLINABLE sendStop #-}
-- | Called when a worker stops. It runs three steps, in this order:
--
-- 1. decrement the active worker count,
-- 2. publish the final latency window of the worker (only when it has
--    WorkerInfo and the SVar is rate controlled),
-- 3. send a ChildStop event, without exception, for the calling thread.
sendStop :: SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop sv mwinfo = do
    atomicModifyIORefCAS_ (workerCount sv) $ \n -> n - 1
    case mwinfo of
      Just winfo ->
          case yieldRateInfo sv of
              Nothing -> return ()
              Just info -> workerStopUpdate winfo info
      Nothing -> return ()
    myThreadId >>= \tid -> void $ send sv (ChildStop tid Nothing)
{-# INLINE enqueueLIFO #-}
-- | Push a stream on the front of a list based (LIFO) work queue. If the
-- consumer has set needDoorBell, clear the flag and ring the door bell.
enqueueLIFO :: SVar t m a -> IORef [t m a] -> t m a -> IO ()
enqueueLIFO sv q m = do
    atomicModifyIORefCAS_ q $ \ms -> m : ms
    -- Full barrier: order the queue write before reading needDoorBell.
    -- NOTE(review): presumably pairs with the barrier sendWorkerWait
    -- issues after setting needDoorBell; confirm.
    storeLoadBarrier
    w <- readIORef $ needDoorBell sv
    when w $ do
        atomicModifyIORefCAS_ (needDoorBell sv) (const False)
        void $ tryPutMVar (outputDoorBell sv) ()

{-# INLINE enqueueFIFO #-}
-- | Push a stream on a LinkedQueue (FIFO) work queue, then ring the door
-- bell if requested, exactly as enqueueLIFO does.
enqueueFIFO :: SVar t m a -> LinkedQueue (t m a) -> t m a -> IO ()
enqueueFIFO sv q m = do
    pushL q m
    -- Order the queue write before reading needDoorBell.
    storeLoadBarrier
    w <- readIORef $ needDoorBell sv
    when w $ do
        atomicModifyIORefCAS_ (needDoorBell sv) (const False)
        void $ tryPutMVar (outputDoorBell sv) ()

{-# INLINE enqueueAhead #-}
-- | Put a stream on the single-slot ahead work queue and increment the Int
-- counter stored with it (presumably a sequence number). The queue must be
-- empty; otherwise this fails with error. The door bell is then rung if
-- requested.
enqueueAhead :: SVar t m a -> IORef ([t m a], Int) -> t m a -> IO ()
enqueueAhead sv q m = do
    atomicModifyIORefCAS_ q $ \ case
        ([], n) -> ([m], n + 1)
        _ -> error "not empty"
    -- Order the queue write before reading needDoorBell.
    storeLoadBarrier
    w <- readIORef $ needDoorBell sv
    when w $ do
        atomicModifyIORefCAS_ (needDoorBell sv) (const False)
        void $ tryPutMVar (outputDoorBell sv) ()

{-# INLINE reEnqueueAhead #-}
-- | Like enqueueAhead, but leaves the counter unchanged. It is used to put a
-- stream back rather than add a new one.
reEnqueueAhead :: SVar t m a -> IORef ([t m a], Int) -> t m a -> IO ()
reEnqueueAhead sv q m = do
    atomicModifyIORefCAS_ q $ \ case
        ([], n) -> ([m], n)
        _ -> error "not empty"
    -- Order the queue write before reading needDoorBell.
    storeLoadBarrier
    w <- readIORef $ needDoorBell sv
    when w $ do
        atomicModifyIORefCAS_ (needDoorBell sv) (const False)
        void $ tryPutMVar (outputDoorBell sv) ()
{-# INLINE queueEmptyAhead #-}
-- | True when the ahead work queue currently holds no stream.
queueEmptyAhead :: MonadIO m => IORef ([t m a], Int) -> m Bool
queueEmptyAhead q =
    liftIO $ readIORef q >>= \(pending, _) -> return (null pending)
{-# INLINE dequeueAhead #-}
-- | Atomically take the stream out of the ahead work queue, together with
-- the Int stored alongside it. Returns Nothing when the queue is empty. The
-- queue must never hold more than one stream; if it does, this fails with
-- error.
dequeueAhead :: MonadIO m
    => IORef ([t m a], Int) -> m (Maybe (t m a, Int))
dequeueAhead q = liftIO $ atomicModifyIORefCAS q takeSingle
  where
    takeSingle ([], n)  = (([], n), Nothing)
    takeSingle ([x], n) = (([], n), Just (x, n))
    takeSingle _        = error "more than one item on queue"
-- | Read an 'IORef' and pass its current value to an IO action.
withIORef :: IORef a -> (a -> IO b) -> IO b
withIORef ref f = f =<< readIORef ref
-- | Atomically apply a function to the contents of an 'IORef' and discard
-- the result. Like atomicModifyIORef, the new value is stored lazily.
atomicModifyIORef_ :: IORef a -> (a -> a) -> IO ()
atomicModifyIORef_ ref f = atomicModifyIORef ref (flip (,) () . f)
-- | Result of trying to take the next entry from the ahead output heap.
data HeapDequeueResult t m a
    = Clearing
      -- ^ No expected sequence number is recorded (Nothing), presumably
      -- because another thread is draining the heap.
    | Waiting Int
      -- ^ The entry with this sequence number is not at the top yet.
    | Ready (Entry Int (AheadHeapEntry t m a))
      -- ^ The entry with the expected sequence number.
{-# INLINE dequeueFromHeap #-}
-- | Atomically take the top entry of the heap if its sequence number is
-- the expected one, clearing the expected number on success.
--
-- Returns 'Clearing' when no expected number is recorded. Returns
-- 'Waiting' when the heap is empty or its top entry is out of sequence.
dequeueFromHeap
    :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> IO (HeapDequeueResult t m a)
dequeueFromHeap hpVar = atomicModifyIORef hpVar step
  where
    step st@(_, Nothing) = (st, Clearing)
    step st@(hp, Just n) =
        case H.uncons hp of
            Just (ent@(Entry seqNo _), rest)
                | seqNo == n -> ((rest, Nothing), Ready ent)
            _ -> (st, Waiting n)
{-# INLINE dequeueFromHeapSeq #-}
-- | Take the top entry of the heap if its sequence number is the given
-- one. Otherwise record that number as the expected one and return
-- 'Waiting'. It must only be called when no expected number is recorded;
-- otherwise it fails with error.
dequeueFromHeapSeq
    :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> Int
    -> IO (HeapDequeueResult t m a)
dequeueFromHeapSeq hpVar i = atomicModifyIORef hpVar step
  where
    step (hp, Nothing) =
        case H.uncons hp of
            Just (ent@(Entry seqNo _), rest)
                | seqNo == i -> ((rest, Nothing), Ready ent)
            _ -> ((hp, Just i), Waiting i)
    step (_, Just _) = error "dequeueFromHeapSeq: unreachable"
{-# INLINE requeueOnHeapTop #-}
-- | Put an entry back on the heap and record the given sequence number as
-- the expected one.
requeueOnHeapTop
    :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> Entry Int (AheadHeapEntry t m a)
    -> Int
    -> IO ()
requeueOnHeapTop hpVar ent seqNo = atomicModifyIORef_ hpVar push
  where
    push (hp, _) = (H.insert ent hp, Just seqNo)

{-# INLINE updateHeapSeq #-}
-- | Record the given sequence number as the expected one, leaving the heap
-- itself unchanged.
updateHeapSeq
    :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> Int
    -> IO ()
updateHeapSeq hpVar seqNo = atomicModifyIORef_ hpVar setSeq
  where
    setSeq (hp, _) = (hp, Just seqNo)
{-# NOINLINE addThread #-}
-- | Register a worker thread in the worker set of the SVar. This is a plain,
-- non-atomic modifyIORef.
addThread :: MonadIO m => SVar t m a -> ThreadId -> m ()
addThread sv tid = liftIO $ modifyIORef (workerThreads sv) (S.insert tid)

{-# INLINE delThread #-}
-- | Remove a worker thread from the worker set of the SVar. This is a plain,
-- non-atomic modifyIORef.
delThread :: MonadIO m => SVar t m a -> ThreadId -> m ()
delThread sv tid = liftIO $ modifyIORef (workerThreads sv) (S.delete tid)
{-# INLINE modifyThread #-}
-- | Toggle the membership of a thread in the worker set: remove it if
-- present, otherwise add it. The atomic update yields the new set after a
-- removal and the old set after an insertion. If that set is empty, ring
-- the door bell.
--
-- NOTE(review): toggling presumably tolerates the stop of a thread being
-- processed before the thread was added. The door bell presumably tells
-- the consumer that the last worker is gone. Note that the insertion case
-- also rings when the set was previously empty; confirm this is intended.
modifyThread :: MonadIO m => SVar t m a -> ThreadId -> m ()
modifyThread sv tid = do
    changed <- liftIO $ atomicModifyIORefCAS (workerThreads sv) $ \old ->
        if (S.member tid old)
        then let new = (S.delete tid old) in (new, new)
        else let new = (S.insert tid old) in (new, old)
    if null changed
    then liftIO $ do
        -- Order the set update before the door bell write.
        writeBarrier
        void $ tryPutMVar (outputDoorBell sv) ()
    else return ()
{-# INLINE allThreadsDone #-}
-- | True when no worker thread is registered in the SVar.
allThreadsDone :: MonadIO m => SVar t m a -> m Bool
allThreadsDone sv = liftIO $ fmap S.null (readIORef (workerThreads sv))
{-# NOINLINE handleChildException #-}
-- | Exception handler for worker threads (passed to doFork). It reports the
-- exception to the consumer as a ChildStop event for the current thread.
handleChildException :: SVar t m a -> SomeException -> IO ()
handleChildException sv e =
    myThreadId >>= \tid -> void (send sv (ChildStop tid (Just e)))
#ifdef DIAGNOSTICS
-- | Diagnostics only: update the maximum number of active workers seen so
-- far, and count one more dispatch.
recordMaxWorkers :: MonadIO m => SVar t m a -> m ()
recordMaxWorkers sv = liftIO $ do
    let stats = svarStats sv
    active <- readIORef (workerCount sv)
    maxWrk <- readIORef (maxWorkers stats)
    when (active > maxWrk) $ writeIORef (maxWorkers stats) active
    -- Increment strictly. The counter is only read when the stats are
    -- dumped, so a lazy modifyIORef would build a chain of (+1) thunks as
    -- long as the number of dispatches.
    dispatches <- readIORef (totalDispatches stats)
    writeIORef (totalDispatches stats) $! dispatches + 1
#endif
{-# NOINLINE pushWorker #-}
-- | Fork a new worker running the workLoop of the SVar and register it with
-- addThread. The active worker count is incremented before forking.
--
-- For rate controlled SVars the worker gets a fresh WorkerInfo with the
-- given yield budget (0 means no budget, see isBeyondMaxYield). Otherwise
-- it gets Nothing.
pushWorker :: MonadAsync m => Count -> SVar t m a -> m ()
pushWorker yieldMax sv = do
    liftIO $ atomicModifyIORefCAS_ (workerCount sv) $ \n -> n + 1
#ifdef DIAGNOSTICS
    recordMaxWorkers sv
#endif
    -- Allocate per-worker rate control state only when it is needed.
    winfo <-
        case yieldRateInfo sv of
            Nothing -> return Nothing
            Just _ -> liftIO $ do
                cntRef <- newIORef 0
                t <- getTime Monotonic
                lat <- newIORef (0, t)
                return $ Just $ WorkerInfo
                    { workerYieldMax = yieldMax
                    , workerYieldCount = cntRef
                    , workerLatencyStart = lat
                    }
    doFork (workLoop sv winfo) (handleChildException sv) >>= addThread sv
{-# NOINLINE pushWorkerPar #-}
-- | Fork a worker running the given loop and record it with modifyThread.
--
-- Without DIAGNOSTICS, the worker count is not incremented and the loop
-- always receives Nothing. With DIAGNOSTICS:
--
-- * the worker count is incremented,
-- * the maximum worker count is recorded,
-- * when the SVar has rate info, the loop receives a fresh WorkerInfo with
--   no yield budget (0).
--
-- NOTE(review): sendStop decrements workerCount unconditionally, so without
-- DIAGNOSTICS the count may go negative if workers forked here call it.
-- Presumably the count is not relied upon for these SVars; confirm.
pushWorkerPar :: MonadAsync m => SVar t m a -> (Maybe WorkerInfo -> m ()) -> m ()
pushWorkerPar sv wloop = do
#ifdef DIAGNOSTICS
    liftIO $ atomicModifyIORefCAS_ (workerCount sv) $ \n -> n + 1
    recordMaxWorkers sv
    winfo <-
        case yieldRateInfo sv of
            Nothing -> return Nothing
            Just _ -> liftIO $ do
                cntRef <- newIORef 0
                t <- getTime Monotonic
                lat <- newIORef (0, t)
                return $ Just $ WorkerInfo
                    { workerYieldMax = 0
                    , workerYieldCount = cntRef
                    , workerLatencyStart = lat
                    }
    doFork (wloop winfo) (handleChildException sv) >>= modifyThread sv
#else
    doFork (wloop Nothing) (handleChildException sv) >>= modifyThread sv
#endif
-- | Try to dispatch one more worker with the given yield budget.
--
-- While work is not done and the work queue is not done, a worker is
-- dispatched when the effective limit is not zero, and True is returned.
-- The effective limit is the worker limit, further bounded by the
-- remaining yield budget when one is set. The limit is only tested for
-- zero here; it is not compared with the number of active workers.
--
-- When the queue is done but the work is not, a worker with no yield
-- budget is dispatched only if no worker is active, and False is returned.
dispatchWorker :: MonadAsync m => Count -> SVar t m a -> m Bool
dispatchWorker yieldCount sv = do
    let workerLimit = maxWorkerLimit sv
    done <- liftIO $ isWorkDone sv
    if (not done)
    then do
        qDone <- liftIO $ isQueueDone sv
        active <- liftIO $ readIORef $ workerCount sv
        if (not qDone)
        then do
            limit <- case remainingWork sv of
                Nothing -> return workerLimit
                Just ref -> do
                    n <- liftIO $ readIORef ref
                    -- The remaining yield count can transiently be negative,
                    -- because decrementYieldLimit decrements unconditionally.
                    -- Clamp it before converting to Word; otherwise a
                    -- negative count wraps around to a huge limit and a
                    -- worker is dispatched although no yields remain.
                    let remaining = fromIntegral (max 0 n) :: Word
                    return $
                        case workerLimit of
                            Unlimited -> Limited remaining
                            Limited lim -> Limited $ min lim remaining
            let dispatch = pushWorker yieldCount sv >> return True
            case limit of
                Unlimited -> dispatch
                Limited lim | lim > 0 -> dispatch
                _ -> return False
        else do
            when (active <= 0) $ pushWorker 0 sv
            return False
    else return False
-- | One millisecond, in nanoseconds. The latency polling and worker
-- dispatch logic below uses it as a time quantum.
minThreadDelay :: NanoSecs
minThreadDelay = 1000000

-- | Period (1 ms, in nanoseconds) over which estimateWorkers aims to
-- recover a yield deficit.
rateRecoveryTime :: NanoSecs
rateRecoveryTime = 1000000

-- | Convert nanoseconds to microseconds, the unit taken by threadDelay.
-- The conversion uses integer division (div) after converting to Int.
nanoToMicroSecs :: NanoSecs -> Int
nanoToMicroSecs ns = div (fromIntegral ns) 1000
-- | Decision produced by estimateWorkers.
data Work
    = BlockWait NanoSecs
      -- ^ Ahead of schedule: wait this long before dispatching.
    | PartialWorker Count
      -- ^ One worker is fast enough; carries the yield deficit.
    | ManyWorkers Int Count
      -- ^ Number of workers wanted, and the yield deficit.
    deriving Show
-- | Estimate what the dispatcher should do to keep the SVar on schedule.
--
-- Arguments: worker limit, yields done, gained/lost yields, elapsed time,
-- measured worker latency, target latency, latency range.
--
-- * targetYields: yields expected by now, that is elapsed time plus one
--   worker latency divided by the target latency, rounded up.
-- * deltaYields: targetYields minus the yields done (including gain/loss).
--
-- When behind schedule (deltaYields > 0), compute the latency needed to
-- also recover the deficit within rateRecoveryTime, clamped to the range.
-- If the worker latency already meets it, return PartialWorker. Otherwise
-- return ManyWorkers with (worker latency / needed latency) workers,
-- capped by the worker limit and by the deficit.
--
-- When on or ahead of schedule, compute how long to sleep until the
-- schedule catches up, capped by (maxLatency - worker latency). Return
-- BlockWait if that is positive, otherwise ask for one worker.
estimateWorkers
    :: Limit
    -> Count
    -> Count
    -> NanoSecs
    -> NanoSecs
    -> NanoSecs
    -> LatencyRange
    -> Work
estimateWorkers workerLimit svarYields gainLossYields
                svarElapsed wLatency targetLat range =
    let
        -- Ceiling division: yields that should have been produced by the
        -- time one more worker latency has passed.
        targetYields = (svarElapsed + wLatency + targetLat - 1) `div` targetLat
        effectiveYields = svarYields + gainLossYields
        deltaYields = fromIntegral targetYields - effectiveYields
    in if deltaYields > 0
        then
            -- Yield frequency needed to recover the deficit in
            -- rateRecoveryTime on top of the target frequency.
            let deltaYieldsFreq :: Double
                deltaYieldsFreq =
                    fromIntegral deltaYields /
                        fromIntegral rateRecoveryTime
                yieldsFreq = 1.0 / fromIntegral targetLat
                totalYieldsFreq = yieldsFreq + deltaYieldsFreq
                requiredLat = NanoSecs $ round $ 1.0 / totalYieldsFreq
                adjustedLat = min (max requiredLat (minLatency range))
                                  (maxLatency range)
            in assert (adjustedLat > 0) $
                if wLatency <= adjustedLat
                then PartialWorker deltaYields
                else let workers = withLimit $ wLatency `div` adjustedLat
                         limited = min workers (fromIntegral deltaYields)
                     in ManyWorkers (fromIntegral limited) deltaYields
        else
            -- Ahead of schedule: time at which the yields done are due.
            let expectedDuration = fromIntegral effectiveYields * targetLat
                sleepTime = expectedDuration - svarElapsed
                maxSleepTime = maxLatency range - wLatency
                s = min sleepTime maxSleepTime
            in assert (sleepTime >= 0) $
                if (s > 0) then BlockWait s else ManyWorkers 1 (Count 0)
  where
    -- Cap a worker count by the worker limit, if any.
    withLimit n =
        case workerLimit of
            Unlimited -> n
            Limited x -> min n (fromIntegral x)
-- | Read the latency statistics without modifying them. The returned triple
-- holds:
--
-- * the all-time yield count, plus the samples not yet folded in,
-- * the all-time start timestamp,
-- * a latency estimate: the average of the pending latency and the last
--   measured latency, or the last measured latency alone when nothing is
--   pending.
getWorkerLatency :: YieldRateInfo -> IO (Count, TimeSpec, NanoSecs)
getWorkerLatency yinfo = do
    (pendCnt, pendTime) <- readIORef (workerPendingLatency yinfo)
    (colCnt, colTime) <- readIORef (workerCollectedLatency yinfo)
    (allCnt, allStart) <- readIORef (svarAllTimeLatency yinfo)
    prev <- readIORef (workerMeasuredLatency yinfo)
    let cnt = colCnt + pendCnt
        time = colTime + pendTime
        estimate
            | cnt > 0   = (time `div` fromIntegral cnt + prev) `div` 2
            | otherwise = prev
    return (allCnt + cnt, allStart, estimate)
-- | Decide whether the SVar has more active workers than the rate estimate
-- calls for. That is the case when it should block, or when more than one
-- worker is active while one suffices, or when more workers are active than
-- the estimated number.
isBeyondMaxRate :: SVar t m a -> YieldRateInfo -> IO Bool
isBeyondMaxRate sv yinfo = do
    (count, tstamp, wLatency) <- getWorkerLatency yinfo
    now <- getTime Monotonic
    gainLoss <- readIORef (svarGainedLostYields yinfo)
    let elapsed = fromInteger (toNanoSecs (now - tstamp))
        work = estimateWorkers (maxWorkerLimit sv) count gainLoss elapsed
                   wLatency (svarLatencyTarget yinfo) (svarLatencyRange yinfo)
    active <- readIORef (workerCount sv)
    return $ case work of
        BlockWait _     -> True
        PartialWorker _ -> active > 1
        ManyWorkers n _ -> active > n
-- | Set how many yields a worker performs between rate checks. The interval
-- is minThreadDelay divided by the per-yield latency, bounded to the range
-- [1, magicMaxBuffer].
updateWorkerPollingInterval :: YieldRateInfo -> NanoSecs -> IO ()
updateWorkerPollingInterval yinfo latency = do
    let periodRef = workerPollingInterval yinfo
        -- A measured latency of 0 is possible with a coarse clock (a window
        -- with no measurable elapsed time). Dividing by it would throw
        -- DivideByZero, so treat it as 1 ns.
        cnt = max 1 $ minThreadDelay `div` max 1 latency
        period = min cnt (fromIntegral magicMaxBuffer)
    -- Store an evaluated value rather than a thunk, since worker threads
    -- read the interval on every yield.
    writeIORef periodRef $! fromIntegral period
-- | Move the pending worker latency samples (atomically resetting them)
-- into the collected accumulator, and decide whether to refresh the
-- measured latency. A refresh happens when any of these holds:
--
-- * more than magicMaxBuffer yields have accumulated,
-- * more than minThreadDelay of time has accumulated,
-- * the new latency differs from the previous one by more than a factor
--   of 2,
-- * there is no previous measurement yet.
--
-- On refresh it recomputes the polling interval, resets the collected
-- accumulator, stores the average of the previous and the new latency, and
-- adds the yields to the all-time count. Otherwise the samples stay
-- accumulated.
--
-- Returns a triple:
--
-- * the all-time yield count, always including the collected samples,
-- * the all-time start timestamp,
-- * the new (not averaged) latency when refreshed, else the previous one.
collectLatency :: SVarStats -> YieldRateInfo -> IO (Count, TimeSpec, NanoSecs)
collectLatency _ss yinfo = do
    let cur      = workerPendingLatency yinfo
        col      = workerCollectedLatency yinfo
        longTerm = svarAllTimeLatency yinfo
        measured = workerMeasuredLatency yinfo
    -- Take the pending samples and reset them in one atomic step.
    (count, time)       <- atomicModifyIORefCAS cur $ \v -> ((0,0), v)
    (colCount, colTime) <- readIORef col
    (lcount, ltime)     <- readIORef longTerm
    prev                <- readIORef measured
    let pendingCount = colCount + count
        pendingTime  = colTime + time
        lcount' = lcount + pendingCount
        tripleWith lat = (lcount', ltime, lat)
    if (pendingCount > 0)
    then do
        let new = pendingTime `div` (fromIntegral pendingCount)
#ifdef DIAGNOSTICS
        minLat <- readIORef (minWorkerLatency _ss)
        when (new < minLat || minLat == 0) $
            writeIORef (minWorkerLatency _ss) new
        maxLat <- readIORef (maxWorkerLatency _ss)
        when (new > maxLat) $ writeIORef (maxWorkerLatency _ss) new
#endif
        -- The ratio is only evaluated when prev > 0 (short circuit), so
        -- there is no division by zero here.
        if     (pendingCount > fromIntegral magicMaxBuffer)
            || (pendingTime > minThreadDelay)
            || (let r = (fromIntegral new) / (fromIntegral prev) :: Double
                 in prev > 0 && (r > 2 || r < 0.5))
            || (prev == 0)
        then do
            updateWorkerPollingInterval yinfo (max new prev)
            writeIORef col (0, 0)
            writeIORef measured ((prev + new) `div` 2)
#ifdef DIAGNOSTICS
            modifyIORef (avgWorkerLatency _ss) $
                \(cnt, t) -> (cnt + pendingCount, t + pendingTime)
#endif
            modifyIORef longTerm $ \(_, t) -> (lcount', t)
            return $ tripleWith new
        else do
            -- Not enough data yet: keep accumulating.
            writeIORef col (pendingCount, pendingTime)
            return $ tripleWith prev
    else return $ tripleWith prev
-- | Dispatcher for rate controlled SVars. It assumes yieldRateInfo is Just
-- (fromJust below is partial).
--
-- It collects the latency samples and uses the bootstrap latency while
-- none has been measured. If the latency is still 0 it returns False.
-- Otherwise it asks estimateWorkers what to do:
--
-- * BlockWait: only if all workers are done, sleep for the given time and
--   then dispatch one worker with a budget of 1 yield.
-- * PartialWorker: account the deficit, then dispatch one worker with that
--   budget, only if all workers are done.
-- * ManyWorkers: account the deficit and lower the polling interval to
--   yields per worker (bounded to [1, magicMaxBuffer]) if that is smaller.
--   If fewer workers are active than wanted, dispatch up to the missing
--   count, at most minThreadDelay / targetLat (at least 1) at a time.
--
-- Returns the result of that batch dispatch in the ManyWorkers case, and
-- False otherwise.
dispatchWorkerPaced :: MonadAsync m => SVar t m a -> m Bool
dispatchWorkerPaced sv = do
    let yinfo = fromJust $ yieldRateInfo sv
    (svarYields, svarElapsed, wLatency) <- do
        now <- liftIO $ getTime Monotonic
        (yieldCount, baseTime, lat) <-
            liftIO $ collectLatency (svarStats sv) yinfo
        let elapsed = fromInteger $ toNanoSecs $ now - baseTime
        -- Fall back to the bootstrap latency until one is measured.
        let latency =
                if lat == 0
                then
                    case workerBootstrapLatency yinfo of
                        Nothing -> lat
                        Just t -> t
                else lat
        return (yieldCount, elapsed, latency)
    if wLatency == 0
    then return False
    else do
        let workerLimit = maxWorkerLimit sv
        let targetLat = svarLatencyTarget yinfo
        let range = svarLatencyRange yinfo
        gainLoss <- liftIO $ readIORef (svarGainedLostYields yinfo)
        let work = estimateWorkers workerLimit svarYields gainLoss svarElapsed
                                   wLatency targetLat range
        case work of
            BlockWait s -> do
                assert (s >= 0) (return ())
                done <- allThreadsDone sv
                when done $ void $ do
                    liftIO $ threadDelay $ nanoToMicroSecs s
                    dispatchWorker 1 sv
                return False
            PartialWorker yields -> do
                assert (yields > 0) (return ())
                updateGainedLostYields yinfo yields
                done <- allThreadsDone sv
                when done $ void $ dispatchWorker yields sv
                return False
            ManyWorkers netWorkers yields -> do
                assert (netWorkers >= 1) (return ())
                assert (yields >= 0) (return ())
                updateGainedLostYields yinfo yields
                -- Make workers check the rate more often if needed.
                let periodRef = workerPollingInterval yinfo
                    ycnt = max 1 $ yields `div` fromIntegral netWorkers
                    period = min ycnt (fromIntegral magicMaxBuffer)
                old <- liftIO $ readIORef periodRef
                when (period < old) $
                    liftIO $ writeIORef periodRef period
                cnt <- liftIO $ readIORef $ workerCount sv
                if (cnt < netWorkers)
                then do
                    let total = netWorkers - cnt
                        batch = max 1 $ fromIntegral $
                                    minThreadDelay `div` targetLat
                    r <- dispatchN (min total batch)
                    return r
                else return False
  where
    -- Add to svarGainedLostYields the part of the deficit (or surplus)
    -- that exceeds the rate buffer.
    updateGainedLostYields yinfo yields = do
        let buf = fromIntegral $ svarRateBuffer yinfo
        when (yields /= 0 && abs yields > buf) $ do
            let delta =
                    if yields > 0
                    then yields - buf
                    else yields + buf
            liftIO $ modifyIORef (svarGainedLostYields yinfo) (+ delta)

    -- Dispatch up to n workers with no yield budget. Returns True if all
    -- n were dispatched, False at the first refusal.
    dispatchN n = do
        if n == 0
        then return True
        else do
            r <- dispatchWorker 0 sv
            if r
            then dispatchN (n - 1)
            else return False
-- | Delay run before each dispatch round of 'sendWorkerWait' when it is
-- called by the paced reader ('readOutputQPaced'). Currently a no-op.
sendWorkerDelayPaced :: SVar t m a -> IO ()
sendWorkerDelayPaced = const (return ())
-- | Delay run before each dispatch round of 'sendWorkerWait' when it is
-- called by the bounded reader ('readOutputQBounded'). Currently a no-op.
sendWorkerDelay :: SVar t m a -> IO ()
sendWorkerDelay = \_ -> return ()
{-# NOINLINE sendWorkerWait #-}
-- | Block the consumer until the output queue has something in it,
-- dispatching workers along the way.
--
-- Each round runs @delay sv@ and then reads the length stored with the
-- output queue. If the length is positive the function returns. Otherwise it
-- sets 'needDoorBell' to 'True', issues a store-load barrier, and runs
-- @dispatch sv@:
--
-- * if @dispatch@ returned 'True', another round starts;
-- * if it returned 'False', the consumer blocks on 'outputDoorBell'. After
--   waking it starts another round only if the queue is still empty.
--
-- NOTE(review): 'needDoorBell' presumably asks producers to ring
-- 'outputDoorBell' on their next enqueue. Setting it before dispatching and
-- sleeping would be what prevents a missed wakeup. Confirm against the
-- sending side.
sendWorkerWait
    :: MonadAsync m
    => (SVar t m a -> IO ())
    -> (SVar t m a -> m Bool)
    -> SVar t m a
    -> m ()
sendWorkerWait delay dispatch sv = loop
  where
    queuedCount = snd <$> readIORef (outputQueue sv)

    loop = do
        liftIO $ delay sv
        n <- liftIO queuedCount
        when (n <= 0) $ do
            liftIO $ atomicModifyIORefCAS_ (needDoorBell sv) $ const True
            liftIO storeLoadBarrier
            canDoMore <- dispatch sv
            if canDoMore
            then loop
            else do
                liftIO $ withDBGMVar sv "sendWorkerWait: nothing to do"
                       $ takeMVar (outputDoorBell sv)
                len <- liftIO queuedCount
                when (len <= 0) loop
{-# INLINE readOutputQRaw #-}
-- | Atomically take the whole output queue, leaving @([], 0)@ in its place.
-- Returns the taken events together with the count stored alongside them.
--
-- With @DIAGNOSTICS@ enabled, that count also updates the 'maxOutQSize'
-- high-water mark in the SVar's statistics.
readOutputQRaw :: SVar t m a -> IO ([ChildEvent a], Int)
readOutputQRaw sv = do
    (events, count) <- atomicModifyIORefCAS (outputQueue sv) takeAll
#ifdef DIAGNOSTICS
    let maxRef = maxOutQSize $ svarStats sv
    seen <- readIORef maxRef
    when (count > seen) $ writeIORef maxRef count
#endif
    return (events, count)
  where
    takeAll old = (([], 0), old)
-- | Read the pending output of an SVar that has no rate control.
--
-- If the queue held events, they are returned. First, if 'workerCount' is
-- not positive and 'isWorkDone' reports work remaining, @pushWorker 0@ is
-- called. If the queue was empty, the consumer waits in 'sendWorkerWait'
-- (dispatching with @dispatchWorker 0@) and then returns whatever the queue
-- holds.
readOutputQBounded :: MonadAsync m => SVar t m a -> m [ChildEvent a]
readOutputQBounded sv = do
    (events, count) <- liftIO $ readOutputQRaw sv
    if count > 0
    then sendOneWorker >> return events
    else blockingRead
  where
    sendOneWorker = do
        cnt <- liftIO $ readIORef $ workerCount sv
        when (cnt <= 0) $ do
            done <- liftIO $ isWorkDone sv
            when (not done) $ pushWorker 0 sv

    {-# INLINE blockingRead #-}
    blockingRead = do
        sendWorkerWait sendWorkerDelay (dispatchWorker 0) sv
        liftIO $ fst <$> readOutputQRaw sv
-- | Read the pending output of a rate-controlled SVar.
--
-- If the queue held events, 'dispatchWorkerPaced' is run (its result is
-- ignored) and the events are returned. Otherwise the consumer waits in
-- 'sendWorkerWait' using the paced delay and dispatcher, then returns
-- whatever the queue holds.
readOutputQPaced :: MonadAsync m => SVar t m a -> m [ChildEvent a]
readOutputQPaced sv = do
    (events, count) <- liftIO $ readOutputQRaw sv
    if count > 0
    then do
        _ <- dispatchWorkerPaced sv
        return events
    else blockingRead
  where
    {-# INLINE blockingRead #-}
    blockingRead = do
        sendWorkerWait sendWorkerDelayPaced dispatchWorkerPaced sv
        liftIO $ fst <$> readOutputQRaw sv
-- | Post-processing step for an SVar without rate control.
--
-- Returns 'False' while 'allThreadsDone' reports running threads. Once all
-- threads are done it returns the result of 'isWorkDone', and it first calls
-- @pushWorker 0@ when work remains.
postProcessBounded :: MonadAsync m => SVar t m a -> m Bool
postProcessBounded sv = do
    workersDone <- allThreadsDone sv
    if not workersDone
    then return False
    else do
        finished <- liftIO $ isWorkDone sv
        when (not finished) $ pushWorker 0 sv
        return finished
-- | Post-processing step for a rate-controlled SVar.
--
-- Returns 'False' while 'allThreadsDone' reports running threads. Once all
-- threads are done it returns the result of 'isWorkDone'. If work remains,
-- it first runs 'dispatchWorkerPaced' and then calls @pushWorker 0@, but
-- only if that still left no threads running.
postProcessPaced :: MonadAsync m => SVar t m a -> m Bool
postProcessPaced sv = do
    workersDone <- allThreadsDone sv
    if not workersDone
    then return False
    else do
        finished <- liftIO $ isWorkDone sv
        when (not finished) $ do
            void $ dispatchWorkerPaced sv
            stillIdle <- allThreadsDone sv
            when stillIdle $ pushWorker 0 sv
        return finished
-- | Build the rate-control state for an SVar from the stream's rate setting
-- ('getStreamRate'). Returns 'Nothing' when no rate is set.
--
-- Rates, in yields per second, are converted to per-yield latencies in
-- nanoseconds:
--
-- * the goal rate gives the latency target;
-- * the high rate gives the minimum of the allowed latency range;
-- * the low rate gives the maximum of that range.
getYieldRateInfo :: State t m a -> IO (Maybe YieldRateInfo)
getYieldRateInfo st =
    case getStreamRate st of
        Nothing -> return Nothing
        Just (Rate low goal high buf) ->
            let target = rateToLatency goal
                bounds = LatencyRange (rateToLatency high) (rateToLatency low)
            in mkYieldRateInfo target bounds buf
  where
    -- A non-positive rate maps to a 'maxBound' latency. Positive rates are
    -- clamped into [1, maxBound] nanoseconds:
    --
    -- * a rate above 2e9 yields/sec would otherwise round to a latency of 0,
    --   and the target latency is used as a divisor downstream (e.g.
    --   @minThreadDelay `div` targetLat@ in 'dispatchWorkerPaced');
    -- * a very small positive rate would otherwise overflow 'NanoSecs' in
    --   'round'.
    rateToLatency :: RealFrac r => r -> NanoSecs
    rateToLatency r
        | r <= 0 = maxBound
        | lat >= fromIntegral (maxBound :: NanoSecs) = maxBound
        | otherwise = max 1 (round lat)
      where
        lat = 1.0e9 / r

    -- Fresh measurement state:
    -- * measured latency 0 and empty pending/collected counters;
    -- * polling interval 1 and no gained/lost yields;
    -- * the all-time baseline stamped with the current monotonic time.
    mkYieldRateInfo latency latRange buf = do
        measured <- newIORef 0
        wcur     <- newIORef (0, 0)
        wcol     <- newIORef (0, 0)
        now      <- getTime Monotonic
        wlong    <- newIORef (0, now)
        period   <- newIORef 1
        gainLoss <- newIORef (Count 0)
        return $ Just YieldRateInfo
            { svarLatencyTarget      = latency
            , svarLatencyRange       = latRange
            , svarRateBuffer         = buf
            , svarGainedLostYields   = gainLoss
            , workerBootstrapLatency = getStreamLatency st
            , workerPollingInterval  = period
            , workerMeasuredLatency  = measured
            , workerPendingLatency   = wcur
            , workerCollectedLatency = wcol
            , svarAllTimeLatency     = wlong
            }
-- | Allocate a fresh 'AheadVar' SVar.
--
-- @f@ is the worker loop. It is run with:
--
-- * the work queue;
-- * the output heap;
-- * a copy of @st@ whose 'streamVar' points at the new SVar;
-- * the SVar itself.
--
-- The output reader and post-processing step are the paced variants
-- ('readOutputQPaced', 'postProcessPaced') when @st@ has a stream rate set.
-- Otherwise they are the bounded variants.
getAheadSVar :: MonadAsync m
    => State t m a
    -> ( IORef ([t m a], Int)
       -> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
       -> State t m a
       -> SVar t m a
       -> Maybe WorkerInfo
       -> m ())
    -> IO (SVar t m a)
getAheadSVar st f = do
    outQ     <- newIORef ([], 0)
    outH     <- newIORef (H.empty, Just 0)
    outQMv   <- newEmptyMVar
    active   <- newIORef 0
    wfw      <- newIORef False
    running  <- newIORef S.empty
    q        <- newIORef ([], -1)
    stopMVar <- newMVar ()
    yl       <- case getYieldLimit st of
        Nothing -> return Nothing
        Just x  -> Just <$> newIORef x
    rateInfo <- getYieldRateInfo st
    stats    <- newStats
#ifdef DIAGNOSTICS
    tid <- myThreadId
#endif
    let mkSVar self readOutput postProc = SVar
            { outputQueue    = outQ
            , remainingWork  = yl
            , maxBufferLimit = getMaxBuffer st
            , maxWorkerLimit = getMaxThreads st
            , yieldRateInfo  = rateInfo
            , outputDoorBell = outQMv
            , readOutputQ    = readOutput self
            , postProcess    = postProc self
            , workerThreads  = running
            , workLoop       = f q outH st { streamVar = Just self } self
            , enqueue        = enqueueAhead self q
            , isWorkDone     = isWorkDoneAhead self q outH
            , isQueueDone    = isQueueDoneAhead self q
            , needDoorBell   = wfw
            , svarStyle      = AheadVar
            , workerCount    = active
            , accountThread  = delThread self
            , workerStopMVar = stopMVar
            , svarRef        = Nothing
#ifdef DIAGNOSTICS
            , svarCreator    = tid
            , aheadWorkQueue = q
            , outputHeap     = outH
#endif
            , svarStats      = stats
            }
        -- The SVar refers to itself (reader, post-processing, worker loop).
        sv = case getStreamRate st of
            Nothing -> mkSVar sv readOutputQBounded postProcessBounded
            Just _  -> mkSVar sv readOutputQPaced postProcessPaced
    return sv
  where
    -- Statistics counters: all zero, with no stop time recorded.
    newStats = do
        disp    <- newIORef 0
        maxWrk  <- newIORef 0
        maxOq   <- newIORef 0
        maxHs   <- newIORef 0
        maxWq   <- newIORef 0
        avgLat  <- newIORef (0, NanoSecs 0)
        maxLat  <- newIORef (NanoSecs 0)
        minLat  <- newIORef (NanoSecs 0)
        stpTime <- newIORef Nothing
        return SVarStats
            { totalDispatches  = disp
            , maxWorkers       = maxWrk
            , maxOutQSize      = maxOq
            , maxHeapSize      = maxHs
            , maxWorkQSize     = maxWq
            , avgWorkerLatency = avgLat
            , minWorkerLatency = minLat
            , maxWorkerLatency = maxLat
            , svarStopTime     = stpTime
            }

    -- The work queue counts as done when it is empty, or when a yield limit
    -- is set and has reached zero or below.
    {-# INLINE isQueueDoneAhead #-}
    isQueueDoneAhead sv q = do
        queueDone <- checkEmpty q
        yieldsDone <- case remainingWork sv of
            Just yref -> (<= 0) <$> readIORef yref
            Nothing   -> return False
        -- XXX note that yieldsDone is authoritative only when there are no
        -- workers running. If there are active workers they can later
        -- increment the yield count and therefore change the result.
        return $ yieldsDone || queueDone

    -- All work is done when the output heap is empty and the queue is done.
    {-# INLINE isWorkDoneAhead #-}
    isWorkDoneAhead sv q ref = do
        (hp, _) <- readIORef ref
        queueDone <- isQueueDoneAhead sv q
        return $ H.size hp <= 0 && queueDone

    checkEmpty q = do
        (pending, _) <- readIORef q
        return (null pending)
-- | Allocate a fresh 'ParallelVar' SVar.
--
-- Its reader ('readOutputQPar') blocks on the output doorbell and then
-- drains the output queue. If the SVar is rate-controlled, it first collects
-- the latency statistics.
--
-- Fields a parallel SVar does not support are filled with errors that name
-- the field. Any accidental use (for example 'enqueue' via 'toStreamVar')
-- therefore fails with a descriptive message rather than a bare
-- 'undefined'.
getParallelSVar :: MonadIO m => State t m a -> IO (SVar t m a)
getParallelSVar st = do
    outQ    <- newIORef ([], 0)
    outQMv  <- newEmptyMVar
    active  <- newIORef 0
    running <- newIORef S.empty
    yl <- case getYieldLimit st of
        Nothing -> return Nothing
        Just x  -> Just <$> newIORef x
    rateInfo <- getYieldRateInfo st
    disp    <- newIORef 0
    maxWrk  <- newIORef 0
    maxOq   <- newIORef 0
    maxHs   <- newIORef 0
    maxWq   <- newIORef 0
    avgLat  <- newIORef (0, NanoSecs 0)
    maxLat  <- newIORef (NanoSecs 0)
    minLat  <- newIORef (NanoSecs 0)
    stpTime <- newIORef Nothing
#ifdef DIAGNOSTICS
    tid <- myThreadId
#endif
    let sv = SVar
            { outputQueue    = outQ
            , remainingWork  = yl
            , maxBufferLimit = Unlimited
            , maxWorkerLimit = Unlimited
            , yieldRateInfo  = rateInfo
            , outputDoorBell = outQMv
            , readOutputQ    = readOutputQPar sv
            , postProcess    = allThreadsDone sv
            , workerThreads  = running
            , workLoop       = unsupported "workLoop"
            , enqueue        = unsupported "enqueue"
            , isWorkDone     = unsupported "isWorkDone"
            , isQueueDone    = unsupported "isQueueDone"
            , needDoorBell   = unsupported "needDoorBell"
            , svarStyle      = ParallelVar
            , workerCount    = active
            , accountThread  = modifyThread sv
            , workerStopMVar = unsupported "workerStopMVar"
            , svarRef        = Nothing
#ifdef DIAGNOSTICS
            , svarCreator    = tid
            , aheadWorkQueue = unsupported "aheadWorkQueue"
            , outputHeap     = unsupported "outputHeap"
#endif
            , svarStats      = SVarStats
                { totalDispatches  = disp
                , maxWorkers       = maxWrk
                , maxOutQSize      = maxOq
                , maxHeapSize      = maxHs
                , maxWorkQSize     = maxWq
                , avgWorkerLatency = avgLat
                , minWorkerLatency = minLat
                , maxWorkerLatency = maxLat
                , svarStopTime     = stpTime
                }
            }
    return sv
  where
    -- Placeholder for a field that has no meaning for a ParallelVar SVar.
    unsupported :: String -> b
    unsupported field = error $
        "Streamly.SVar.getParallelSVar: field '" ++ field
            ++ "' is not supported by ParallelVar SVars"

    -- Wait for the doorbell, collect latency statistics when the SVar is
    -- rate-controlled (the result is discarded), then drain the output queue.
    readOutputQPar sv = liftIO $ do
        withDBGMVar sv "readOutputQPar: doorbell" $ takeMVar (outputDoorBell sv)
        case yieldRateInfo sv of
            Nothing -> return ()
            Just yinfo -> void $ collectLatency (svarStats sv) yinfo
        fst <$> readOutputQRaw sv
-- | Put the first stream on the SVar's work queue and start processing it.
-- Returns the same SVar.
--
-- * Without rate control, @pushWorker 0@ is called.
-- * With rate control, @pushWorker 1@ is called, unless the latency target
--   is 'maxBound'. 'getYieldRateInfo' produces that value e.g. for a
--   non-positive goal rate; in that case the calling thread just sleeps via
--   @threadDelay maxBound@.
sendFirstWorker :: MonadAsync m => SVar t m a -> t m a -> m (SVar t m a)
sendFirstWorker sv m = do
    -- Note: We must have all the work on the queue before sending the
    -- pushworker, otherwise the pushworker may exit before we even get a
    -- chance to push.
    liftIO $ enqueue sv m
    case yieldRateInfo sv of
        Just yinfo | svarLatencyTarget yinfo == maxBound ->
            liftIO $ threadDelay maxBound
        Just _  -> pushWorker 1 sv
        Nothing -> pushWorker 0 sv
    return sv
{-# INLINABLE newAheadVar #-}
-- | Create an 'AheadVar' SVar (see 'getAheadSVar') running the given worker
-- loop, and hand it its first stream through 'sendFirstWorker'.
newAheadVar :: MonadAsync m
    => State t m a
    -> t m a
    -> ( IORef ([t m a], Int)
       -> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
       -> State t m a
       -> SVar t m a
       -> Maybe WorkerInfo
       -> m ())
    -> m (SVar t m a)
newAheadVar st m wloop =
    liftIO (getAheadSVar st wloop) >>= \sv -> sendFirstWorker sv m
{-# INLINABLE newParallelVar #-}
-- | Create a 'ParallelVar' SVar (see 'getParallelSVar') in any 'MonadAsync'
-- monad. No worker is started.
newParallelVar :: MonadAsync m => State t m a -> m (SVar t m a)
newParallelVar = liftIO . getParallelSVar
-- XXX this errors out for Parallel/Ahead SVars
-- | Write a stream to an 'SVar' in a non-blocking manner. The stream can then
-- be read back from the SVar using 'fromSVar'.
--
-- The stream is passed to the SVar's 'enqueue'. If 'allThreadsDone' then
-- reports no running threads, 'pushWorker' is called: with 1 for a
-- rate-controlled SVar, with 0 otherwise.
toStreamVar :: MonadAsync m => SVar t m a -> t m a -> m ()
toStreamVar sv m = do
    liftIO $ enqueue sv m
    idle <- allThreadsDone sv
    -- XXX This is safe only when called from the consumer thread or when no
    -- consumer is present. There may be a race if we are not running in the
    -- consumer thread.
    -- XXX do this only if the work queue is not empty. The work may have been
    -- carried out by existing workers.
    when idle $ pushWorker (maybe 0 (const 1) (yieldRateInfo sv)) sv