-- |
-- Module      : Streamly.Internal.Data.SVar.Worker
-- Copyright   : (c) 2017 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
--
module Streamly.Internal.Data.SVar.Worker
    (
    -- * Adjusting Limits
      decrementYieldLimit
    , incrementYieldLimit
    , decrementBufferLimit
    , incrementBufferLimit
    , resetBufferLimit

    -- * Rate Control
    , Work (..)
    , isBeyondMaxRate
    , estimateWorkers
    , updateYieldCount
    , minThreadDelay
    , workerRateControl
    , workerUpdateLatency

    -- * Send Events
    , send
    , ringDoorBell

    -- ** Yield
    , sendYield
    , sendToProducer

    -- ** Stop
    , sendStop
    , sendStopToProducer

    -- ** Exception
    , handleChildException
    , handleFoldException
    )
where

#include "inline.hs"

import Control.Concurrent (myThreadId, takeMVar)
import Control.Concurrent.MVar (MVar, tryPutMVar)
import Control.Exception (SomeException(..), assert)
import Control.Monad (when, void)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.IORef (IORef, readIORef, writeIORef)
import Streamly.Internal.Data.Atomics
       (atomicModifyIORefCAS, atomicModifyIORefCAS_, writeBarrier,
        storeLoadBarrier)
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)
import Streamly.Internal.Data.Time.Units
       (AbsTime, NanoSecond64(..), diffAbsTime64, fromRelTime64)

import Streamly.Internal.Data.SVar.Type

------------------------------------------------------------------------------
-- Collecting results from child workers in a streamed fashion
------------------------------------------------------------------------------

-- XXX Can we make access to remainingWork and yieldRateInfo fields in sv
-- faster, along with the fields in sv required by send?
-- XXX make it noinline
--
-- XXX we may want to employ an increment and decrement in batches when the
-- throughput is high or when the cost of synchronization is high. For example
-- if the application is distributed then inc/dec of a shared variable may be
-- very costly.
--
-- | Take one unit out of the SVar's 'remainingWork' counter.
--
-- A worker decrements the yield limit before it executes an action. However,
-- the action may not result in an element being yielded, in that case the
-- worker has to undo the decrement using 'incrementYieldLimit'.
--
-- Returns 'True' when there is no limit ('remainingWork' is 'Nothing') or when
-- the counter was at least 1 before this decrement, 'False' otherwise.
--
-- The counter is decremented unconditionally, so it can go below zero. This is
-- why it has to be a signed type: we need the ability to undo a decrement that
-- takes it below zero.
{-# INLINE decrementYieldLimit #-}
decrementYieldLimit :: SVar t m a -> IO Bool
decrementYieldLimit sv =
    case remainingWork sv of
        Nothing -> return True
        Just ref -> do
            -- Atomically store (x - 1) and get back the value seen before
            -- the decrement.
            r <- atomicModifyIORefCAS ref $ \x -> (x - 1, x)
            return $ r >= 1

-- | Add one unit back to the SVar's 'remainingWork' counter. This is the
-- inverse of 'decrementYieldLimit'.
--
-- Does nothing when there is no limit ('remainingWork' is 'Nothing').
{-# INLINE incrementYieldLimit #-}
incrementYieldLimit :: SVar t m a -> IO ()
incrementYieldLimit sv =
    case remainingWork sv of
        Nothing -> return ()
        Just ref -> atomicModifyIORefCAS_ ref (+ 1)

-- XXX exception safety of all atomic/MVar operations

-- TBD Each worker can have their own queue and the consumer can empty one
-- queue at a time, that way contention can be reduced.

-- XXX Only yields should be counted in the buffer limit and not the Stop
-- events.

-- | Take one unit out of 'pushBufferSpace'.
--
-- Does nothing when 'maxBufferLimit' is 'Unlimited'. Otherwise the counter is
-- atomically decremented if it is at least 1. If the counter was already zero
-- (or less) the behavior depends on 'pushBufferPolicy':
--
-- * 'PushBufferBlock': block on 'pushBufferMVar' and retry until a unit of
--   space can be taken.
-- * 'PushBufferDropNew': drop the event at the head of 'outputQueue' and
--   proceed; if the queue is empty, block and retry as above.
-- * 'PushBufferDropOld': not implemented, raises an error.
{-# INLINE decrementBufferLimit #-}
decrementBufferLimit :: SVar t m a -> IO ()
decrementBufferLimit sv =
    case maxBufferLimit sv of
        Unlimited -> return ()
        Limited _ -> do
            let ref = pushBufferSpace sv
            -- Decrement only if there is space; "old" is the value seen
            -- before the (possible) decrement.
            old <- atomicModifyIORefCAS ref $ \x ->
                        (if x >= 1 then x - 1 else x, x)
            when (old <= 0) $
                case pushBufferPolicy sv of
                    PushBufferBlock -> blockAndRetry
                    PushBufferDropNew -> do
                        -- We just drop one item and proceed. It is possible
                        -- that by the time we drop the item the consumer
                        -- thread might have run and created space in the
                        -- buffer, but we do not care about that condition.
                        -- This is not pedantically correct but it should be
                        -- fine in practice.
                        --
                        -- The item dropped is the head of the list;
                        -- sendWithDoorBell conses new events onto the front,
                        -- so the head is the most recently queued event.
                        --
                        -- XXX we may want to drop only if n == maxBuf
                        -- otherwise we must have space in the buffer and a
                        -- decrement should be possible.
                        block <- atomicModifyIORefCAS (outputQueue sv) $
                            \(es, n) ->
                                case es of
                                    [] -> (([], n), True)
                                    _ : xs -> ((xs, n - 1), False)
                        -- Nothing could be dropped, wait for space instead.
                        when block blockAndRetry
                    -- XXX need a dequeue or ring buffer for this
                    PushBufferDropOld ->
                        error
                            "decrementBufferLimit: PushBufferDropOld is not implemented"

    where

    -- Sleep on pushBufferMVar until woken up, then try to take a unit of
    -- buffer space; go back to sleep if none is available.
    blockAndRetry = do
        let ref = pushBufferSpace sv
        liftIO $ takeMVar (pushBufferMVar sv)
        old <- atomicModifyIORefCAS ref $ \x ->
                        (if x >= 1 then x - 1 else x, x)
        -- When multiple threads sleep on takeMVar, the first thread would
        -- wakeup due to a putMVar by the consumer, but the rest of the threads
        -- would have to put back the MVar after taking it and decrementing the
        -- buffer count, otherwise all other threads will remain asleep.
        if old >= 1
            then void $ liftIO $ tryPutMVar (pushBufferMVar sv) ()
            -- We do not put the MVar back in this case, instead we
            -- wait for the consumer to put it.
            else blockAndRetry

-- | Give one unit back to 'pushBufferSpace' and signal 'pushBufferMVar' so
-- that a thread blocked in 'decrementBufferLimit' can retry.
--
-- Does nothing when 'maxBufferLimit' is 'Unlimited'.
{-# INLINE incrementBufferLimit #-}
incrementBufferLimit :: SVar t m a -> IO ()
incrementBufferLimit sv =
    case maxBufferLimit sv of
        Unlimited -> return ()
        Limited _ -> do
            atomicModifyIORefCAS_ (pushBufferSpace sv) (+ 1)
            -- Issue a write barrier between the counter update and the
            -- wake up.
            writeBarrier
            -- tryPutMVar never blocks; if the MVar is already full the
            -- signal is already pending and the result is ignored.
            void $ liftIO $ tryPutMVar (pushBufferMVar sv) ()

-- | Reset 'pushBufferSpace' to the full capacity configured in
-- 'maxBufferLimit'.
--
-- Does nothing when 'maxBufferLimit' is 'Unlimited'.
{-# INLINE resetBufferLimit #-}
resetBufferLimit :: SVar t m a -> IO ()
resetBufferLimit sv =
    case maxBufferLimit sv of
        Unlimited -> return ()
        Limited n ->
            -- Overwrite whatever value is there with the configured limit.
            atomicModifyIORefCAS_ (pushBufferSpace sv) (const (fromIntegral n))

-------------------------------------------------------------------------------
-- Yield control
-------------------------------------------------------------------------------

-- | Increment the worker's 'workerYieldCount' by one and return the new
-- count.
--
-- The update is a plain read followed by a write, it is not atomic.
-- NOTE(review): presumably each 'WorkerInfo' is updated only by the worker
-- owning it - confirm against the callers.
updateYieldCount :: WorkerInfo -> IO Count
updateYieldCount winfo = do
    cnt <- readIORef (workerYieldCount winfo)
    let cnt1 = cnt + 1
    writeIORef (workerYieldCount winfo) cnt1
    return cnt1

-- | Check whether the given yield count has reached the worker's
-- 'workerYieldMax'.
--
-- A 'workerYieldMax' of 0 is treated as "no maximum": the result is always
-- 'False' in that case.
isBeyondMaxYield :: Count -> WorkerInfo -> Bool
isBeyondMaxYield cnt winfo =
    let ymax = workerYieldMax winfo
    in ymax /= 0 && cnt >= ymax

-------------------------------------------------------------------------------
-- Sending results from worker
-------------------------------------------------------------------------------

-- | Signal 'outputDoorBell' if the 'needDoorBell' flag of the SVar is set,
-- clearing the flag before doing so. Does nothing if the flag is not set.
{-# INLINE ringDoorBell #-}
ringDoorBell :: SVar t m a -> IO ()
ringDoorBell sv = do
    -- Issue a store-load barrier before reading the flag.
    storeLoadBarrier
    w <- readIORef $ needDoorBell sv
    when w $ do
        -- Note: the sequence of operations is important for correctness here.
        -- We need to set the flag to False strictly before sending the
        -- outputDoorBell, otherwise the outputDoorBell may get processed too
        -- early and we may then set the flag to False later, making the
        -- consumer lose the flag without even receiving an outputDoorBell.
        atomicModifyIORefCAS_ (needDoorBell sv) (const False)
        -- tryPutMVar never blocks; an already pending doorbell is enough.
        void $ tryPutMVar (outputDoorBell sv) ()

-- | Add an event to the front of the given queue and ring the given doorbell
-- if the queue was empty before the addition.
--
-- The queue is a pair of the list of events and the number of events in it.
-- Returns the number of events that were in the queue before this event was
-- added.
{-# INLINE sendWithDoorBell #-}
sendWithDoorBell ::
    IORef ([ChildEvent a], Int) -> MVar () -> ChildEvent a -> IO Int
sendWithDoorBell q bell msg = do
    -- XXX can the access to outputQueue be made faster somehow?
    oldlen <- atomicModifyIORefCAS q $ \(es, n) ->
        ((msg : es, n + 1), n)
    when (oldlen <= 0) $ do
        -- The wake up must happen only after the store has finished otherwise
        -- we can have lost wakeup problems.
        writeBarrier
        -- Since multiple workers can try this at the same time, it is possible
        -- that we may put a spurious MVar after the consumer has already seen
        -- the output. But that's harmless, at worst it may cause the consumer
        -- to read the queue again and find it empty.
        -- The important point is that the consumer is guaranteed to receive a
        -- doorbell if something was added to the queue after it empties it.
        void $ tryPutMVar bell ()
    return oldlen

-- | This function is used by the producer threads to queue output for the
-- consumer thread to consume. The event is added to 'outputQueue' and
-- 'outputDoorBell' is rung if the queue was empty.
--
-- Returns the number of events that were in the queue before this event was
-- added.
send :: SVar t m a -> ChildEvent a -> IO Int
send sv = sendWithDoorBell (outputQueue sv) (outputDoorBell sv)

-- | Queue an event on 'outputQueueFromConsumer', ringing
-- 'outputDoorBellFromConsumer' if that queue was empty.
--
-- There is no bound implemented on the buffer, this is assumed to be low
-- traffic.
--
-- Returns the number of events that were in the queue before this event was
-- added.
sendToProducer :: SVar t m a -> ChildEvent a -> IO Int
sendToProducer sv msg = do
    -- In case the producer stream is blocked on pushing to the fold buffer
    -- then wake it up so that it can check for the stop event or exception
    -- being sent to it otherwise we will be deadlocked.
    void $ tryPutMVar (pushBufferMVar sv) ()
    sendWithDoorBell
        (outputQueueFromConsumer sv) (outputDoorBellFromConsumer sv) msg

-------------------------------------------------------------------------------
-- Collect and update worker latency
-------------------------------------------------------------------------------

-- | Measure how many items the worker has yielded, and how much time has
-- passed, since the last measurement recorded in 'workerLatencyStart'.
--
-- When at least one item has been yielded since then, 'workerLatencyStart' is
-- advanced to the current yield count and the current monotonic time, and
-- @Just (yields, elapsed)@ is returned. Otherwise nothing is modified and
-- 'Nothing' is returned.
workerCollectLatency :: WorkerInfo -> IO (Maybe (Count, NanoSecond64))
workerCollectLatency winfo = do
    (cnt0, t0) <- readIORef (workerLatencyStart winfo)
    cnt1 <- readIORef (workerYieldCount winfo)
    let cnt = cnt1 - cnt0

    if cnt > 0
        then do
            t1 <- getTime Monotonic
            let period = fromRelTime64 $ diffAbsTime64 t1 t0
            -- Start the next measurement period from here.
            writeIORef (workerLatencyStart winfo) (cnt1, t1)
            return $ Just (cnt, period)
        else return Nothing

-- Collect the latency of this worker since its last measurement (see
-- workerCollectLatency) and add it to the workerPendingLatency accumulator.
--
-- The accumulator is a triple. The first component always gets the yield count
-- added. The second and third components get the yield count and the elapsed
-- time added only when the elapsed time is more than zero.
--
-- XXX There are a number of gotchas in measuring latencies.
-- 1) We measure latencies only when a worker yields a value
-- 2) It is possible that a stream calls the stop continuation, in which case
-- the worker would not yield a value and we would not account that worker in
-- latencies. Even though this case should ideally be accounted we do not
-- account it because we cannot or do not distinguish it from the case
-- described next.
-- 3) It is possible that a worker returns without yielding anything because it
-- never got a chance to pick up work.
-- 4) If the system timer resolution is lower than the latency, the latency
-- computation turns out to be zero.
--
-- We can fix this if we measure the latencies by counting the work items
-- picked rather than based on the outputs yielded.
workerUpdateLatency :: YieldRateInfo -> WorkerInfo -> IO ()
workerUpdateLatency yinfo winfo = do
    r <- workerCollectLatency winfo
    case r of
        Just (cnt, period) -> do
            -- NOTE: On JS platform the timer resolution could be pretty low.
            -- When the timer resolution is low, measurement of latencies could
            -- be tricky. All the worker latencies will turn out to be zero if
            -- they are lower than the resolution. We only take into account
            -- those measurements which are more than the timer resolution.
            let ref = workerPendingLatency yinfo
                (cnt1, t1) = if period > 0 then (cnt, period) else (0, 0)
            atomicModifyIORefCAS_ ref $
                \(fc, n, t) -> (fc + cnt, n + cnt1, t + t1)
        Nothing -> return ()

-------------------------------------------------------------------------------
-- Worker rate control
-------------------------------------------------------------------------------

-- | The result of a worker estimate, see 'estimateWorkers'.
--
-- We either block, or send one worker with limited yield count or one or more
-- workers with unlimited yield count.
data Work
    = BlockWait NanoSecond64
    -- ^ Block for the given amount of time.
    | PartialWorker Count
    -- ^ One worker; the count is the number of yields we are lagging by.
    | ManyWorkers Int Count
    -- ^ The given number of workers; the count is the number of yields we
    -- are lagging by.
    deriving Show

-- | This is a magic number and it is overloaded, and used at several places to
-- achieve batching:
--
-- 1. If we have to sleep to slowdown this is the minimum period that we
--    accumulate before we sleep. Also, workers do not stop until this much
--    sleep time is accumulated.
-- 2. Collected latencies are computed and transferred to measured latency
--    after a minimum of this period.
--
-- The value is 1000000 nanoseconds, i.e. one millisecond.
minThreadDelay :: NanoSecond64
minThreadDelay = 1000000

-- | Another magic number! When we have to start more workers to cover up a
-- number of yields that we are lagging by then we cannot start one worker for
-- each yield because that may be a very big number and if the latency of the
-- workers is low these number of yields could be very high. We assume that we
-- run each extra worker for at least this much time.
--
-- The value is 1000000 nanoseconds, i.e. one millisecond.
rateRecoveryTime :: NanoSecond64
rateRecoveryTime = 1000000

-- | Get the worker latency without resetting workerPendingLatency
-- Returns (total yield count, base time, measured latency)
--
-- The total yield count is the count stored in 'svarAllTimeLatency' plus the
-- total counts in the pending and collected accumulators. The base time is
-- the time stored in 'svarAllTimeLatency'. The measured latency is the
-- average of the previous 'workerMeasuredLatency' and the per-yield latency
-- computed from the pending and collected accumulators; when those hold no
-- usable measurement the previous measured latency is returned as is.
--
-- This only reads the state, none of the references is modified.
--
-- CAUTION! keep it in sync with collectLatency
getWorkerLatency :: YieldRateInfo -> IO (Count, AbsTime, NanoSecond64)
getWorkerLatency yinfo = do
    let cur      = workerPendingLatency yinfo
        col      = workerCollectedLatency yinfo
        longTerm = svarAllTimeLatency yinfo
        measured = workerMeasuredLatency yinfo

    (curTotalCount, curCount, curTime) <- readIORef cur
    (colTotalCount, colCount, colTime) <- readIORef col
    (lcount, ltime)                    <- readIORef longTerm
    prevLat                            <- readIORef measured

    let latCount = colCount + curCount
        latTime  = colTime + curTime
        totalCount = colTotalCount + curTotalCount
        newLat =
            if latCount > 0 && latTime > 0
            then
                let lat = latTime `div` fromIntegral latCount
                -- XXX Give more weight to new?
                in (lat + prevLat) `div` 2
            else prevLat
    return (lcount + totalCount, ltime, newLat)

-- | Estimate what needs to be done to keep the yield rate on target.
--
-- Arguments, in order:
--
-- * the limit on the number of workers
-- * the number of yields so far
-- * the gained or lost yields, added to the number of yields so far
-- * the time elapsed so far
-- * the worker latency
-- * the target latency, i.e. the target time per yield
-- * the range within which the adjusted latency is kept
--
-- If we are lagging behind the target by some yields the result is:
--
-- * @PartialWorker lag@ when the worker latency is not more than the adjusted
--   latency.
-- * @ManyWorkers n lag@ otherwise, where @n@ is the worker latency divided by
--   the adjusted latency, capped by the worker limit and by @lag@.
--
-- If we are not lagging, the result is @BlockWait s@ when the computed sleep
-- time @s@ is more than zero and @ManyWorkers 1 (Count 0)@ otherwise.
--
-- The sanity checks use 'assert', so they are in effect only when assertions
-- are enabled. NOTE(review): the target latency is used as a divisor,
-- presumably callers never pass zero - confirm.
--
-- XXX we can use phantom types to distinguish the duration/latency/expectedLat
estimateWorkers
    :: Limit
    -> Count
    -> Count
    -> NanoSecond64
    -> NanoSecond64
    -> NanoSecond64
    -> LatencyRange
    -> Work
estimateWorkers workerLimit svarYields gainLossYields
                svarElapsed wLatency targetLat range =
    -- XXX we can have a maxEfficiency combinator as well which runs the
    -- producer at the maximal efficiency i.e. the number of workers are chosen
    -- such that the latency is minimum or within a range. Or we can call it
    -- maxWorkerLatency.
    --
    let
        -- How many workers do we need to achieve the required rate?
        --
        -- When the workers are IO bound we can increase the throughput by
        -- increasing the number of workers as long as the IO device has enough
        -- capacity to process all the requests concurrently. If the IO
        -- bandwidth is saturated increasing the workers won't help. Also, if
        -- the CPU utilization in processing all these requests exceeds the CPU
        -- bandwidth, then increasing the number of workers won't help.
        --
        -- When the workers are purely CPU bound, increasing the workers beyond
        -- the number of CPUs won't help.
        --
        -- TODO - measure the CPU and IO requirements of the workers. Have a
        -- way to specify the max bandwidth of the underlying IO mechanism and
        -- use that to determine the max rate of workers, and also take the CPU
        -- bandwidth into account. We can also discover the IO bandwidth if we
        -- know that we are not CPU bound, then how much steady state rate are
        -- we able to achieve. Design tests for CPU bound and IO bound cases.

        -- Calculate how many yields are we ahead or behind to match the exact
        -- required rate. Based on that we increase or decrease the effective
        -- workers.
        --
        -- When the worker latency is lower than required latency we begin with
        -- a yield and then wait rather than first waiting and then yielding.
        --
        -- This is a division rounding up: the number of target latency
        -- periods needed to cover the elapsed time plus one worker latency.
        targetYields = (svarElapsed + wLatency + targetLat - 1) `div` targetLat
        effectiveYields = svarYields + gainLossYields
        deltaYields = fromIntegral targetYields - effectiveYields

    -- We recover the deficit by running at a higher/lower rate for a
    -- certain amount of time. To keep the effective rate in reasonable
    -- limits we use rateRecoveryTime, minLatency and maxLatency.
    in  if deltaYields > 0
        then
            let deltaYieldsFreq :: Double
                deltaYieldsFreq =
                    fromIntegral deltaYields /
                        fromIntegral rateRecoveryTime
                yieldsFreq = 1.0 / fromIntegral targetLat
                totalYieldsFreq = yieldsFreq + deltaYieldsFreq
                requiredLat = NanoSecond64 $ round $ 1.0 / totalYieldsFreq
                -- Clamp the required latency to the allowed range.
                adjustedLat = min (max requiredLat (minLatency range))
                                  (maxLatency range)
            in  assert (adjustedLat > 0) $
                if wLatency <= adjustedLat
                then PartialWorker deltaYields
                else
                    let workers = withLimit $ wLatency `div` adjustedLat
                        limited = min workers (fromIntegral deltaYields)
                    in ManyWorkers (fromIntegral limited) deltaYields
        else
            let expectedDuration = fromIntegral effectiveYields * targetLat
                sleepTime = expectedDuration - svarElapsed
                maxSleepTime = maxLatency range - wLatency
                s = min sleepTime maxSleepTime
            in assert (sleepTime >= 0) $
                -- if s is less than 0 it means our maxSleepTime is less
                -- than the worker latency.
                if s > 0 then BlockWait s else ManyWorkers 1 (Count 0)

    where

    -- Cap the number of workers by the worker limit, if there is one.
    withLimit n =
        case workerLimit of
            Unlimited -> n
            Limited x -> min n (fromIntegral x)

-- | Check whether the SVar currently has more workers than needed to keep up
-- with the target rate.
--
-- The current latency figures are fed to 'estimateWorkers' and the resulting
-- estimate is compared with 'workerCount':
--
-- * 'PartialWorker': 'True' if there is more than one worker.
-- * @ManyWorkers n@: 'True' if there are more than @n@ workers.
-- * 'BlockWait': always 'True'.
isBeyondMaxRate :: SVar t m a -> YieldRateInfo -> IO Bool
isBeyondMaxRate sv yinfo = do
    (count, tstamp, wLatency) <- getWorkerLatency yinfo
    now <- getTime Monotonic
    -- Time elapsed since the base time returned by getWorkerLatency.
    let duration = fromRelTime64 $ diffAbsTime64 now tstamp
    let targetLat = svarLatencyTarget yinfo
    gainLoss <- readIORef (svarGainedLostYields yinfo)
    let work = estimateWorkers (maxWorkerLimit sv) count gainLoss duration
                               wLatency targetLat (svarLatencyRange yinfo)
    cnt <- readIORef $ workerCount sv
    return $ case work of
        -- XXX set the worker's maxYields or polling interval based on yields
        PartialWorker _yields -> cnt > 1
        ManyWorkers n _ -> cnt > n
        BlockWait _ -> True

-- XXX we should do rate control periodically based on the total yields rather
-- than based on the worker local yields as other workers may have yielded more
-- and we should stop based on the aggregate yields. However, latency update
-- period can be based on individual worker yields.
--
-- Run the latency update and the rate check once every polling interval.
--
-- The polling interval is read from 'workerPollingInterval'. When it is
-- non-zero and the supplied yield count @ycnt@ is an exact multiple of it,
-- the worker latency is updated via 'workerUpdateLatency' and the result of
-- 'isBeyondMaxRate' is returned. In every other case (including an interval
-- of zero) the result is 'False' and nothing is updated.
{-# NOINLINE checkRatePeriodic #-}
checkRatePeriodic :: SVar t m a
                  -> YieldRateInfo
                  -> WorkerInfo
                  -> Count
                  -> IO Bool
checkRatePeriodic sv yinfo winfo ycnt = do
    i <- readIORef (workerPollingInterval yinfo)
    -- XXX use generation count to check if the interval has been updated
    -- The zero test comes first so that `mod` is never given a zero divisor.
    if i /= 0 && (ycnt `mod` i) == 0
    then do
        workerUpdateLatency yinfo winfo
        -- XXX not required for parallel streams
        isBeyondMaxRate sv yinfo
    else return False

-- CAUTION! this also updates the yield count and therefore should be called
-- only when we are actually yielding an element.
{-# NOINLINE workerRateControl #-}
-- | Account for one yield by the worker and report whether it is still
-- within its limits.
--
-- The worker's yield count is first advanced with 'updateYieldCount'; the new
-- count is then used for the periodic rate check ('checkRatePeriodic') and
-- for the 'isBeyondMaxYield' test.
--
-- Returns 'True' when neither the yield limit nor the rate limit has been
-- exceeded, 'False' otherwise.
workerRateControl :: SVar t m a -> YieldRateInfo -> WorkerInfo -> IO Bool
workerRateControl sv yinfo winfo = do
    cnt <- updateYieldCount winfo
    -- The periodic check always runs, even if the yield limit is already
    -- exceeded, because it may also update the worker latency.
    beyondMaxRate <- checkRatePeriodic sv yinfo winfo cnt
    return $ not (isBeyondMaxYield cnt winfo || beyondMaxRate)

-------------------------------------------------------------------------------
-- Send a yield event
-------------------------------------------------------------------------------

-- XXX we should do rate control here but not latency update in case of ahead
-- streams. latency update must be done when we yield directly to outputQueue
-- or when we yield to heap.
--
-- returns whether the worker should continue (True) or stop (False).
{-# INLINE sendYield #-}
-- | Send a yield event to the SVar and tell the calling worker whether it
-- may keep going.
--
-- The event is always sent first. The result is then the conjunction of two
-- checks:
--
-- * Buffer space: always satisfied for an 'Unlimited' buffer. For a
--   @'Limited' lim@ buffer it holds only while the length reported by 'send'
--   plus one is strictly less than @lim@ minus the current 'workerCount'.
--
-- * Rate limit: consulted via 'workerRateControl' only when both a
--   'WorkerInfo' is supplied and the SVar carries a 'YieldRateInfo';
--   otherwise it is treated as satisfied.
sendYield :: SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
sendYield sv mwinfo msg = do
    -- NOTE(review): 'send' presumably returns the queue length before this
    -- event was added, hence the name and the @+ 1@ below -- confirm.
    oldlen <- send sv msg
    let limit = maxBufferLimit sv
    bufferSpaceOk <- case limit of
            Unlimited -> return True
            Limited lim -> do
                active <- readIORef (workerCount sv)
                return $ (oldlen + 1) < (fromIntegral lim - active)
    rateLimitOk <-
        case mwinfo of
            Just winfo ->
                case yieldRateInfo sv of
                    Nothing -> return True
                    Just yinfo -> workerRateControl sv yinfo winfo
            Nothing -> return True
    return $ bufferSpaceOk && rateLimitOk

-------------------------------------------------------------------------------
-- Send a Stop event
-------------------------------------------------------------------------------

{-# INLINE workerStopUpdate #-}
-- Update the worker latency when a worker stops.
--
-- The update via 'workerUpdateLatency' is performed only when the polling
-- interval stored in 'workerPollingInterval' is non-zero; with a zero
-- interval this is a no-op.
workerStopUpdate :: WorkerInfo -> YieldRateInfo -> IO ()
workerStopUpdate winfo info = do
    i <- readIORef (workerPollingInterval info)
    when (i /= 0) $ workerUpdateLatency info winfo

{-# INLINABLE sendStop #-}
-- | Announce that the calling worker thread is stopping.
--
-- Performs, in this order:
--
-- 1. Atomically decrements the SVar's 'workerCount'.
-- 2. If both a 'WorkerInfo' is supplied and the SVar has a 'YieldRateInfo',
--    runs 'workerStopUpdate' for them.
-- 3. Sends a 'ChildStop' event carrying the current thread's id and no
--    exception. The value returned by 'send' is discarded.
sendStop :: SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop sv mwinfo = do
    atomicModifyIORefCAS_ (workerCount sv) $ \n -> n - 1
    case (mwinfo, yieldRateInfo sv) of
      (Just winfo, Just info) ->
          workerStopUpdate winfo info
      _ ->
          return ()
    myThreadId >>= \tid -> void $ send sv (ChildStop tid Nothing)

-- {-# NOINLINE sendStopToProducer #-}
-- | Send a 'ChildStop' event, tagged with the current thread's id and
-- carrying no exception, through 'sendToProducer'. The value returned by
-- 'sendToProducer' is discarded.
sendStopToProducer :: MonadIO m => SVar t m a -> m ()
sendStopToProducer sv = liftIO $ do
    tid <- myThreadId
    void $ sendToProducer sv (ChildStop tid Nothing)

-------------------------------------------------------------------------------
-- Send exceptions
-------------------------------------------------------------------------------

{-# NOINLINE handleFoldException #-}
-- | Report an exception through 'sendToProducer'.
--
-- Wraps the exception @e@ in a 'ChildStop' event tagged with the current
-- thread's id and sends it. The value returned by 'sendToProducer' is
-- discarded.
handleFoldException :: SVar t m a -> SomeException -> IO ()
handleFoldException sv e = do
    tid <- myThreadId
    void $ sendToProducer sv (ChildStop tid (Just e))

{-# NOINLINE handleChildException #-}
-- | Report an exception through 'send'.
--
-- Wraps the exception @e@ in a 'ChildStop' event tagged with the current
-- thread's id and sends it. The value returned by 'send' is discarded.
--
-- Unlike 'sendStop', this does not decrement 'workerCount' or update the
-- worker latency.
handleChildException :: SVar t m a -> SomeException -> IO ()
handleChildException sv e = do
    tid <- myThreadId
    void $ send sv (ChildStop tid (Just e))