-- |
-- Module      : Streamly.Internal.Data.Stream.Channel.Types
-- Copyright   : (c) 2017 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
-- A Channel is a place where streams join and new streams start. This module
-- defines low level data structures and functions to build channels. For
-- concrete Channels see the Channel modules of specific stream types.
--
-- A Channel is a conduit to the output from multiple streams running
-- concurrently and asynchronously. A channel can be thought of as an
-- asynchronous IO handle. We can write any number of streams to a channel in a
-- non-blocking manner and then read them back at any time at any pace.  The
-- channel would run the streams asynchronously and accumulate results. A
-- channel may not really execute the stream completely and accumulate all the
-- results. However, it ensures that the reader can read the results at
-- whatever pace it wants to read. The channel monitors and adapts to the
-- consumer's pace.
--
-- A channel is a mini scheduler, it has an associated workLoop that holds the
-- stream tasks to be picked and run by a pool of worker threads. It has an
-- associated output queue where the output stream elements are placed by the
-- worker threads. An outputDoorBell is used by the worker threads to intimate the
-- consumer thread about availability of new results in the output queue. More
-- workers are added to the channel by 'fromChannel' on demand if the output
-- produced is not keeping pace with the consumer. On bounded channels, workers
-- block on the output queue to provide throttling of the producer  when the
-- consumer is not pulling fast enough.  The number of workers may even get
-- reduced depending on the consuming pace.
--
module Streamly.Internal.Data.Stream.Channel.Types
    (
    -- * Types
      Count (..)
    , Limit (..)
    , ThreadAbort (..)
    , ChildEvent (..)

    -- * Stats
    , SVarStats (..)
    , newSVarStats

    -- * Rate Control
    , WorkerInfo (..)
    , LatencyRange (..)
    , YieldRateInfo (..)
    , newRateInfo

    -- * Output queue
    , readOutputQRaw
    , readOutputQBasic
    , ringDoorBell

    -- * Yield Limit
    , decrementYieldLimit
    , incrementYieldLimit

    -- * Configuration
    , Rate (..)
    , StopWhen (..)
    , Config

    -- ** Default config
    , magicMaxBuffer
    , defaultConfig

    -- ** Set config
    , maxThreads
    , maxBuffer
    , maxYields
    , inspect
    , eager
    , stopWhen
    , ordered
    , interleaved

    , rate
    , avgRate
    , minRate
    , maxRate
    , constRate

    -- ** Get config
    , getMaxThreads
    , getMaxBuffer
    , getStreamRate
    , getStreamLatency
    , setStreamLatency
    , getYieldLimit
    , getInspectMode
    , getEagerDispatch
    , getStopWhen
    , getOrdered
    , getInterleaved

    -- * Cleanup
    , cleanupSVar

    -- * Diagnostics
    , dumpCreator
    , dumpOutputQ
    , dumpDoorBell
    , dumpNeedDoorBell
    , dumpRunningThreads
    , dumpWorkerCount

    , withDiagMVar
    , printSVar

    , concatMapDivK
    )
where

import Control.Concurrent (ThreadId, throwTo, MVar, tryReadMVar)
import Control.Concurrent.MVar (tryPutMVar)
import Control.Exception
    ( SomeException(..), Exception, catches, throwIO, Handler(..)
    , BlockedIndefinitelyOnMVar(..), BlockedIndefinitelyOnSTM(..))
import Control.Monad (void, when)
import Data.Int (Int64)
import Data.IORef (IORef, newIORef, readIORef, writeIORef)
import Data.Set (Set)
import Streamly.Internal.Data.Atomics
    (atomicModifyIORefCAS, atomicModifyIORefCAS_, storeLoadBarrier)
import Streamly.Internal.Data.SVar.Type (adaptState)
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)
import Streamly.Internal.Data.Time.Units (AbsTime, NanoSecond64(..))
import System.IO (hPutStrLn, stderr)

import qualified Data.Set as Set
import qualified Streamly.Internal.Data.Stream.StreamK.Type as K

------------------------------------------------------------------------------
-- Common types
------------------------------------------------------------------------------

-- | A counter backed by an 'Int64'. All the listed instances are derived
-- from the underlying type so that a 'Count' can be used directly in
-- arithmetic and comparisons.
newtype Count = Count Int64
    deriving ( Eq
             , Read
             , Show
             , Enum
             , Bounded
             , Num
             , Real
             , Integral
             , Ord
             )

-- XXX We can use maxBound for unlimited?

-- | A limit that is either absent ('Unlimited') or a concrete 'Word' value
-- ('Limited'). This is essentially a 'Maybe Word' type.
data Limit = Unlimited | Limited Word deriving Show

-- | Two limits are equal when both are 'Unlimited' or both are 'Limited' with
-- the same value.
instance Eq Limit where
    Unlimited == Unlimited = True
    Limited x == Limited y = x == y
    _ == _ = False

-- | 'Unlimited' is the greatest element: every limit is @<=@ 'Unlimited', and
-- 'Unlimited' is @<=@ only itself. Two 'Limited' values compare by their
-- payload. Only '<=' is defined; the remaining methods use the class
-- defaults.
instance Ord Limit where
    Unlimited <= Unlimited = True
    Unlimited <= Limited _ = False
    Limited _ <= Unlimited = True
    Limited x <= Limited y = x <= y

------------------------------------------------------------------------------
-- Parent child thread communication type
------------------------------------------------------------------------------

-- | Channel driver throws this exception to all active workers to clean up
-- the channel.
data ThreadAbort = ThreadAbort deriving Show

-- The 'Exception' instance uses the class defaults.
instance Exception ThreadAbort

-- XXX Use a ChildSingle event to speed up mapM?

-- | The events that a child thread can send to its parent thread.
data ChildEvent a
    = ChildYield a
    -- ^ Carries a value of the element type @a@.
    | ChildStopChannel
    -- ^ Carries no payload.
    | ChildStop ThreadId (Maybe SomeException)
    -- ^ Carries a 'ThreadId' and, optionally, an exception.

-- | We measure the individual worker latencies to estimate the number of workers
-- needed or the amount of time we have to sleep between dispatches to achieve
-- a particular rate when controlled pace mode is used.
data WorkerInfo = WorkerInfo
    {
    -- | 0 means unlimited
      workerYieldMax   :: Count
    -- | total number of yields by the worker till now
    , workerYieldCount    :: IORef Count
    -- | yieldCount at start, timestamp
    , workerLatencyStart  :: IORef (Count, AbsTime)
    }

-- | A pair of latency bounds, each expressed as a 'NanoSecond64'.
data LatencyRange = LatencyRange
    { minLatency :: NanoSecond64
    , maxLatency :: NanoSecond64
    } deriving Show

-- | Rate control.
data YieldRateInfo = YieldRateInfo
    { svarLatencyTarget    :: NanoSecond64
    , svarLatencyRange     :: LatencyRange
    , svarRateBuffer       :: Int

    -- | [LOCKING] Unlocked access. Modified by the consumer thread and unsafely
    -- read by the worker threads
    , svarGainedLostYields :: IORef Count

    -- | Actual latency/throughput as seen from the consumer side, we count the
    -- yields and the time it took to generate those yields. This is used to
    -- increase or decrease the number of workers needed to achieve the desired
    -- rate. The idle time of workers is adjusted in this, so that we only
    -- account for the rate when the consumer actually demands data.
    -- XXX interval latency is enough, we can move this under diagnostics build
    -- [LOCKING] Unlocked access. Modified by the consumer thread and unsafely
    -- read by the worker threads
    , svarAllTimeLatency :: IORef (Count, AbsTime)

    -- XXX Worker latency specified by the user to be used before the first
    -- actual measurement arrives. Not yet implemented
    , workerBootstrapLatency :: Maybe NanoSecond64

    -- | After how many yields the worker should update the latency information.
    -- If the latency is high, this count is kept lower and vice-versa.  XXX If
    -- the latency suddenly becomes too high this count may remain too high for
    -- long time, in such cases the consumer can change it.
    -- 0 means no latency computation
    -- XXX this is derivable from workerMeasuredLatency, can be removed.
    -- [LOCKING] Unlocked access. Modified by the consumer thread and unsafely
    -- read by the worker threads
    , workerPollingInterval :: IORef Count

    -- | This is in progress latency stats maintained by the workers which we
    -- empty into workerCollectedLatency stats at certain intervals - whenever
    -- we process the stream elements yielded in this period. The first count
    -- is all yields, the second count is only those yields for which the
    -- latency was measured to be non-zero (note that if the timer resolution
    -- is low the measured latency may be zero e.g. on JS platform).
    -- [LOCKING] Locked access. Modified by the consumer thread as well as
    -- worker threads. Workers modify it periodically based on
    -- workerPollingInterval and not on every yield to reduce the locking
    -- overhead.
    -- (allYieldCount, yieldCount, timeTaken)
    , workerPendingLatency   :: IORef (Count, Count, NanoSecond64)

    -- | This is the second level stat which is an accumulation from
    -- workerPendingLatency stats. We keep accumulating latencies in this
    -- bucket until we have stats for a sufficient period and then we reset it
    -- to start collecting for the next period and retain the computed average
    -- latency for the last period in workerMeasuredLatency.
    -- [LOCKING] Unlocked access. Modified by the consumer thread and unsafely
    -- read by the worker threads
    -- (allYieldCount, yieldCount, timeTaken)
    , workerCollectedLatency :: IORef (Count, Count, NanoSecond64)

    -- | Latency as measured by workers, aggregated for the last period.
    -- [LOCKING] Unlocked access. Modified by the consumer thread and unsafely
    -- read by the worker threads
    , workerMeasuredLatency :: IORef NanoSecond64
    }

-- | Mutable statistics slots for a channel. Every field is an 'IORef' so it
-- can be updated in place; 'newSVarStats' allocates a fresh set.
data SVarStats = SVarStats {
      totalDispatches  :: IORef Int
    , maxWorkers       :: IORef Int
    , maxOutQSize      :: IORef Int
    , maxHeapSize      :: IORef Int
    , maxWorkQSize     :: IORef Int
    , avgWorkerLatency :: IORef (Count, NanoSecond64)
    , minWorkerLatency :: IORef NanoSecond64
    , maxWorkerLatency :: IORef NanoSecond64
    , svarStopTime     :: IORef (Maybe AbsTime)
}

-------------------------------------------------------------------------------
-- Config
-------------------------------------------------------------------------------

-- | Specifies the stream yield rate in yields per second (@Hertz@).
-- We keep accumulating yield credits at 'rateGoal'. At any point of time we
-- allow only as many yields as we have accumulated as per 'rateGoal' since the
-- start of time. If the consumer or the producer is slower or faster, the
-- actual rate may fall behind or exceed 'rateGoal'.  We try to recover the gap
-- between the two by increasing or decreasing the pull rate from the producer.
-- However, if the gap becomes more than 'rateBuffer' we try to recover only as
-- much as 'rateBuffer'.
--
-- 'rateLow' puts a bound on how low the instantaneous rate can go when
-- recovering the rate gap.  In other words, it determines the maximum yield
-- latency.  Similarly, 'rateHigh' puts a bound on how high the instantaneous
-- rate can go when recovering the rate gap.  In other words, it determines the
-- minimum yield latency. We reduce the latency by increasing concurrency,
-- therefore we can say that it puts an upper bound on concurrency.
--
-- If the 'rateGoal' is 0 or negative the stream never yields a value.
-- If the 'rateBuffer' is 0 or negative we do not attempt to recover.
--
data Rate = Rate
    { rateLow    :: Double -- ^ The lower rate limit
    , rateGoal   :: Double -- ^ The target rate we want to achieve
    , rateHigh   :: Double -- ^ The upper rate limit
    , rateBuffer :: Int    -- ^ Maximum slack from the goal
    }

-- | The condition under which a 'Channel' should stop.
data StopWhen
    = FirstStops -- ^ Stop as soon as the first stream ends.
    | AllStop    -- ^ Stop once all the streams have ended.
    | AnyStops   -- ^ Stop as soon as any one stream ends.

-- XXX we can put the resettable fields in a oneShotConfig field and others in
-- a persistentConfig field. That way reset would be fast and scalable
-- irrespective of the number of fields.
--
-- XXX make all these Limited types and use phantom types to distinguish them

-- | An abstract type for specifying the configuration parameters of a
-- 'Channel'. Use @Config -> Config@ modifier functions to modify the default
-- configuration. See the individual modifier documentation for default values.
--
data Config = Config
    { -- one shot configuration, automatically reset for each API call
      -- streamVar   :: Maybe (SVar t m a)
      _yieldLimit  :: Maybe Count

    -- persistent configuration, state that remains valid until changed by
    -- an explicit setting via a combinator.
    , _threadsHigh    :: Limit
    , _bufferHigh     :: Limit

    -- XXX these two can be collapsed into a single type
    , _streamLatency  :: Maybe NanoSecond64 -- bootstrap latency
    , _maxStreamRate  :: Maybe Rate
    , _inspect    :: Bool
    , _eagerDispatch  :: Bool
    , _stopWhen :: StopWhen
    , _ordered :: Bool
    , _interleaved :: Bool
    }

-------------------------------------------------------------------------------
-- State defaults and reset
-------------------------------------------------------------------------------

-- | A magical value for the buffer size arrived at by running the smallest
-- possible task and measuring the optimal value of the buffer for that.  This
-- is obviously dependent on hardware, this figure is based on a 2.2GHz intel
-- core-i7 processor.
magicMaxBuffer :: Word
magicMaxBuffer = 1500

-- Default thread and buffer limits; both are 'magicMaxBuffer' wrapped in
-- 'Limited'.
defaultMaxThreads, defaultMaxBuffer :: Limit
defaultMaxThreads = Limited magicMaxBuffer
defaultMaxBuffer = Limited magicMaxBuffer

-- | The fields prefixed by an _ are not to be accessed or updated directly but
-- via smart accessor APIs. Use get/set routines instead of directly accessing
-- the Config fields
defaultConfig :: Config
defaultConfig = Config
    { -- streamVar = Nothing
      _yieldLimit = Nothing
    , _threadsHigh = defaultMaxThreads
    , _bufferHigh = defaultMaxBuffer
    , _maxStreamRate = Nothing
    , _streamLatency = Nothing
    , _inspect = False
    -- XXX Set it to True when Rate is not set?
    , _eagerDispatch = False
    , _stopWhen = AllStop
    , _ordered = False
    , _interleaved = False
    }

-------------------------------------------------------------------------------
-- Smart get/set routines for State
-------------------------------------------------------------------------------

-- | Set the yield limit stored in the 'Config'. 'Nothing' is stored as
-- 'Nothing'; a limit that is zero or negative is stored as @Just 0@; any
-- other limit is converted to a 'Count' unchanged.
maxYields :: Maybe Int64 -> Config -> Config
maxYields lim st = st { _yieldLimit = fmap clamp lim }

    where

    -- Non-positive limits collapse to zero.
    clamp n
        | n <= 0 = 0
        | otherwise = fromIntegral n

-- | Read the yield limit stored in the 'Config'.
getYieldLimit :: Config -> Maybe Count
getYieldLimit = _yieldLimit

-- | Specify the maximum number of threads that can be spawned by the channel.
-- A value of 0 resets the thread limit to default, a negative value means
-- there is no limit. The default value is 1500.
--
-- When the actions in a stream are IO bound, having blocking IO calls, this
-- option can be used to control the maximum number of in-flight IO requests.
-- When the actions are CPU bound this option can be used to control the amount
-- of CPU used by the stream.
--
maxThreads :: Int -> Config -> Config
maxThreads n st = st { _threadsHigh = limit }

    where

    limit
        | n < 0 = Unlimited
        | n == 0 = defaultMaxThreads
        | otherwise = Limited (fromIntegral n)

-- | Read the thread limit stored in the 'Config'.
getMaxThreads :: Config -> Limit
getMaxThreads = _threadsHigh

-- | Specify the maximum size of the buffer for storing the results from
-- concurrent computations. If the buffer becomes full we stop spawning more
-- concurrent tasks until there is space in the buffer.
-- A value of 0 resets the buffer size to default, a negative value means
-- there is no limit. The default value is 1500.
--
-- CAUTION! using an unbounded 'maxBuffer' value (i.e. a negative value)
-- coupled with an unbounded 'maxThreads' value is a recipe for disaster in
-- presence of infinite streams, or very large streams.  Especially, it must
-- not be used when 'pure' is used in 'ZipAsyncM' streams as 'pure' in
-- applicative zip streams generates an infinite stream causing unbounded
-- concurrent generation with no limit on the buffer or threads.
--
maxBuffer :: Int -> Config -> Config
maxBuffer n st = st { _bufferHigh = limit }

    where

    limit
        | n < 0 = Unlimited
        | n == 0 = defaultMaxBuffer
        | otherwise = Limited (fromIntegral n)

-- | Read the buffer limit stored in the 'Config'.
getMaxBuffer :: Config -> Limit
getMaxBuffer = _bufferHigh

-- | Specify the stream evaluation rate of a channel.
--
-- A 'Nothing' value means there is no smart rate control, concurrent execution
-- blocks only if 'maxThreads' or 'maxBuffer' is reached, or there are no more
-- concurrent tasks to execute. This is the default.
--
-- When rate (throughput) is specified, concurrent production may be ramped
-- up or down automatically to achieve the specified stream throughput. The
-- specific behavior for different styles of 'Rate' specifications is
-- documented under 'Rate'.  The effective maximum production rate achieved by
-- a channel is governed by:
--
-- * The 'maxThreads' limit
-- * The 'maxBuffer' limit
-- * The maximum rate that the stream producer can achieve
-- * The maximum rate that the stream consumer can achieve
--
-- Maximum production rate is given by:
--
-- \(rate = \frac{maxThreads}{latency}\)
--
-- If we know the average latency of the tasks we can set 'maxThreads'
-- accordingly.
--
rate :: Maybe Rate -> Config -> Config
rate r st = st { _maxStreamRate = r }

-- | Read the stream rate stored in the 'Config'.
getStreamRate :: Config -> Maybe Rate
getStreamRate = _maxStreamRate

-- | Set the stream latency stored in the 'Config'. A value that is zero or
-- negative clears the setting ('Nothing'); any other value is converted to a
-- 'NanoSecond64' with 'fromIntegral'.
setStreamLatency :: Int -> Config -> Config
setStreamLatency n st = st { _streamLatency = latency }

    where

    latency
        | n <= 0 = Nothing
        | otherwise = Just (fromIntegral n)

-- | Read the stream latency stored in the 'Config'.
getStreamLatency :: Config -> Maybe NanoSecond64
getStreamLatency = _streamLatency

-- | Print debug information about the 'Channel' when the stream ends.
--
inspect :: Bool -> Config -> Config
inspect flag st = st { _inspect = flag }

-- | Read the inspect flag stored in the 'Config'.
getInspectMode :: Config -> Bool
getInspectMode = _inspect

-- | By default, processing of output from the worker threads is given priority
-- over dispatching new workers. More workers are dispatched only when there is
-- no output to process. With 'eager' on, workers are dispatched aggressively as
-- long as there is more work to do irrespective of whether there is output
-- pending to be processed. However, dispatching may stop if 'maxThreads' or
-- 'maxBuffer' is reached.
--
-- /Note:/ This option has no effect when rate has been specified.
--
-- /Note:/ Not supported with 'interleaved'.
--
eager :: Bool -> Config -> Config
eager flag st = st { _eagerDispatch = flag }

-- | Read the eager-dispatch flag stored in the 'Config'.
getEagerDispatch :: Config -> Bool
getEagerDispatch = _eagerDispatch

-- | Specify when the 'Channel' should stop.
stopWhen :: StopWhen -> Config -> Config
stopWhen cond st = st { _stopWhen = cond }

-- | Read the stop condition stored in the 'Config'.
getStopWhen :: Config -> StopWhen
getStopWhen = _stopWhen

-- | When enabled the streams may be evaluated concurrently but the results are
-- produced in the same sequence as a serial evaluation would produce.
--
-- /Note:/ Not supported with 'interleaved'.
--
ordered :: Bool -> Config -> Config
ordered flag st = st { _ordered = flag }

-- | Read the ordered flag stored in the 'Config'.
getOrdered :: Config -> Bool
getOrdered = _ordered

-- | Interleave the streams fairly instead of prioritizing the left stream.
-- This schedules all streams in a round robin fashion over limited number of
-- threads.
--
-- /Note:/ Can only be used on finite number of streams.
--
-- /Note:/ Not supported with 'ordered'.
--
interleaved :: Bool -> Config -> Config
interleaved flag st = st { _interleaved = flag }

-- | Read the interleaved flag stored in the 'Config'.
getInterleaved :: Config -> Bool
getInterleaved = _interleaved

-------------------------------------------------------------------------------
-- Initialization
-------------------------------------------------------------------------------

-- | Build the rate control state for a 'Config'.
--
-- Returns 'Nothing' when the config has no stream rate. Otherwise the three
-- rates of the 'Rate' are converted to latencies: the goal rate becomes the
-- latency target, the high rate becomes the minimum latency and the low rate
-- becomes the maximum latency. All the mutable slots are freshly allocated.
newRateInfo :: Config -> IO (Maybe YieldRateInfo)
newRateInfo st = do
    -- convert rate in Hertz to latency in Nanoseconds; a rate that is zero
    -- or negative maps to the largest representable latency
    let rateToLatency r = if r <= 0 then maxBound else round $ 1.0e9 / r
    case getStreamRate st of
        Just (Rate low goal high buf) ->
            let l    = rateToLatency goal
                minl = rateToLatency high
                maxl = rateToLatency low
            in mkYieldRateInfo l (LatencyRange minl maxl) buf
        Nothing -> return Nothing

    where

    mkYieldRateInfo ::
        NanoSecond64 -> LatencyRange -> Int -> IO (Maybe YieldRateInfo)
    mkYieldRateInfo latency latRange buf = do
        measured <- newIORef 0
        wcur     <- newIORef (0,0,0)
        wcol     <- newIORef (0,0,0)
        -- the all-time latency slot starts at zero yields as of now
        now      <- getTime Monotonic
        wlong    <- newIORef (0,now)
        period   <- newIORef 1
        gainLoss <- newIORef (Count 0)

        return $ Just YieldRateInfo
            { svarLatencyTarget      = latency
            , svarLatencyRange       = latRange
            , svarRateBuffer         = buf
            , svarGainedLostYields   = gainLoss
            , workerBootstrapLatency = getStreamLatency st
            , workerPollingInterval  = period
            , workerMeasuredLatency  = measured
            , workerPendingLatency   = wcur
            , workerCollectedLatency = wcol
            , svarAllTimeLatency     = wlong
            }

-- | Create a fresh 'SVarStats' record. Every counter and every latency
-- figure (including the minimum worker latency) starts at zero, and the stop
-- time starts as 'Nothing'.
newSVarStats :: IO SVarStats
newSVarStats = do
    disp   <- newIORef 0
    maxWrk <- newIORef 0
    maxOq  <- newIORef 0
    maxHs  <- newIORef 0
    maxWq  <- newIORef 0
    avgLat <- newIORef (0, NanoSecond64 0)
    maxLat <- newIORef (NanoSecond64 0)
    minLat <- newIORef (NanoSecond64 0)
    stpTime <- newIORef Nothing

    return SVarStats
        { totalDispatches  = disp
        , maxWorkers       = maxWrk
        , maxOutQSize      = maxOq
        , maxHeapSize      = maxHs
        , maxWorkQSize     = maxWq
        , avgWorkerLatency = avgLat
        , minWorkerLatency = minLat
        , maxWorkerLatency = maxLat
        , svarStopTime     = stpTime
        }

-------------------------------------------------------------------------------
-- Rate
-------------------------------------------------------------------------------

-- | Same as @rate (Just $ Rate (r/2) r (2*r) maxBound)@
--
-- Specifies the average production rate of a stream in number of yields
-- per second (i.e.  @Hertz@).  Concurrent production is ramped up or down
-- automatically to achieve the specified average yield rate. The rate can
-- go down to half of the specified rate on the lower side and double of
-- the specified rate on the higher side.
--
avgRate :: Double -> Config -> Config
avgRate r = rate (Just $ Rate (r/2) r (2*r) maxBound)

-- | Same as @rate (Just $ Rate r r (2*r) maxBound)@
--
-- Specifies the minimum rate at which the stream should yield values. As
-- far as possible the yield rate would never be allowed to go below the
-- specified rate, even though it may possibly go above it at times, the
-- upper limit is double of the specified rate.
--
minRate :: Double -> Config -> Config
minRate r = rate (Just $ Rate r r (2*r) maxBound)

-- | Same as @rate (Just $ Rate (r/2) r r maxBound)@
--
-- Specifies the maximum rate at which the stream should yield values. As
-- far as possible the yield rate would never be allowed to go above the
-- specified rate, even though it may possibly go below it at times, the
-- lower limit is half of the specified rate. This can be useful in
-- applications where certain resource usage must not be allowed to go
-- beyond certain limits.
--
maxRate :: Double -> Config -> Config
maxRate r = rate (Just $ Rate (r/2) r r maxBound)

-- | Same as @rate (Just $ Rate r r r 0)@
--
-- Specifies a constant yield rate. If for some reason the actual rate
-- goes above or below the specified rate we do not try to recover it by
-- increasing or decreasing the rate in future.  This can be useful in
-- applications like graphics frame refresh where we need to maintain a
-- constant refresh rate.
--
constRate :: Double -> Config -> Config
constRate r = rate (Just $ Rate r r r 0)

-------------------------------------------------------------------------------
-- Channel yield count
-------------------------------------------------------------------------------

-- XXX Can we make access to remainingWork and yieldRateInfo fields in sv
-- faster, along with the fields in sv required by send?
-- XXX make it noinline
--
-- XXX we may want to employ an increment and decrement in batches when the
-- throughput is high or when the cost of synchronization is high. For example
-- if the application is distributed then inc/dec of a shared variable may be
-- very costly.

-- | A worker decrements the yield limit before it executes an action. However,
-- the action may not result in an element being yielded, in that case we have
-- to increment the yield limit (see 'incrementYieldLimit').
--
-- When no limit is configured ('Nothing') this always returns 'True'.
-- Otherwise the counter is atomically decremented and the result is 'True'
-- only if the value /before/ the decrement was at least 1. The decrement is
-- performed unconditionally, so the counter can go below zero; this is what
-- allows a later increment to undo a decrement that took it below zero.
{-# INLINE decrementYieldLimit #-}
decrementYieldLimit :: Maybe (IORef Count) -> IO Bool
decrementYieldLimit remaining =
    case remaining of
        Nothing -> return True
        Just ref -> do
            -- Store the decremented value, hand back the previous one.
            r <- atomicModifyIORefCAS ref $ \x -> (x - 1, x)
            return $ r >= 1

-- | Atomically add one to the yield limit counter, undoing a previous
-- 'decrementYieldLimit'. Does nothing when no limit is configured
-- ('Nothing').
{-# INLINE incrementYieldLimit #-}
incrementYieldLimit :: Maybe (IORef Count) -> IO ()
incrementYieldLimit remaining =
    case remaining of
        Nothing -> return ()
        Just ref -> atomicModifyIORefCAS_ ref (+ 1)

-------------------------------------------------------------------------------
-- Output queue
-------------------------------------------------------------------------------

-- | Atomically take everything out of the output queue: the queue is reset to
-- an empty list with a length of 0, and the previous contents (event list and
-- its maintained length) are returned.
{-# INLINE readOutputQBasic #-}
readOutputQBasic :: IORef ([ChildEvent a], Int) -> IO ([ChildEvent a], Int)
readOutputQBasic q = atomicModifyIORefCAS q $ \x -> (([],0), x)

-- | Drain the output queue using 'readOutputQBasic' and return what was in
-- it. If stats are being collected ('Just'), 'maxOutQSize' is raised to the
-- drained length whenever that length exceeds the recorded maximum.
--
-- Note that the stats update is a plain read followed by a plain write, not
-- an atomic modification.
{-# INLINE readOutputQRaw #-}
readOutputQRaw ::
    IORef ([ChildEvent a], Int) -> Maybe SVarStats -> IO ([ChildEvent a], Int)
readOutputQRaw q stats = do
    (list, len) <- readOutputQBasic q
    case stats of
        Just ss -> do
            let ref = maxOutQSize ss
            oqLen <- readIORef ref
            when (len > oqLen) $ writeIORef ref len
        Nothing -> return ()
    return (list, len)

-- | Wake up the consumer if it has asked to be woken up.
--
-- @needBell@ is the flag saying a doorbell is wanted; @bell@ is the doorbell
-- itself. If the flag is set, it is cleared and a non-blocking put is
-- attempted on the bell (the result of the put is ignored, so an already
-- rung bell is not an error). If the flag is not set nothing is done.
{-# INLINE ringDoorBell #-}
ringDoorBell :: IORef Bool -> MVar () -> IO ()
ringDoorBell needBell bell = do
    -- NOTE(review): presumably orders the caller's preceding writes (e.g.
    -- queueing output) before the flag read below -- confirm with callers.
    storeLoadBarrier
    w <- readIORef needBell
    when w $ do
        -- Note: the sequence of operations is important for correctness here.
        -- We need to set the flag to False strictly before sending the
        -- outputDoorBell. If we rang the bell first, the consumer could
        -- process it too early and set the flag again; our later reset to
        -- False would then wipe out the consumer's request, and it would
        -- never receive an outputDoorBell for it.
        atomicModifyIORefCAS_ needBell (const False)
        void $ tryPutMVar bell ()

-------------------------------------------------------------------------------
-- Diagnostics
-------------------------------------------------------------------------------

-- | Render the id of the creator thread (or any 'Show'able value) as a
-- diagnostic line of the form @Creator tid = ...@.
dumpCreator :: Show a => a -> String
dumpCreator tid = "Creator tid = " <> show tid

-- | Describe the output queue for diagnostics. Two lines are produced: the
-- length computed by actually counting the queued elements, and the length
-- value stored alongside the queue. A mismatch between the two indicates an
-- inconsistency in the maintained length.
dumpOutputQ :: (Foldable t, Show a1) => IORef (t a2, a1) -> IO String
dumpOutputQ q = do
    (oqList, oqLen) <- readIORef q
    return $ unlines
        [ "outputQueue length computed  = " <> show (length oqList)
        , "outputQueue length maintained = " <> show oqLen
        ]

-- | Describe the current state of the output doorbell without blocking and
-- without taking its contents: shows @Just x@ if the 'MVar' is full and
-- @Nothing@ if it is empty.
dumpDoorBell :: Show a => MVar a -> IO String
dumpDoorBell mvar =  do
    db <- tryReadMVar mvar
    return $ "outputDoorBell = " <> show db

-- | Describe the current value of the "need doorbell" reference as a
-- diagnostic line of the form @needDoorBell = ...@.
dumpNeedDoorBell :: Show a => IORef a -> IO String
dumpNeedDoorBell ref = do
    waiting <- readIORef ref
    return $ "needDoorBell = " <> show waiting

-- | Describe the current contents of the running-threads reference as a
-- diagnostic line of the form @running threads = ...@.
dumpRunningThreads :: Show a => IORef a -> IO String
dumpRunningThreads ref = do
    rthread <- readIORef ref
    return $ "running threads = " <> show rthread

-- | Describe the current value of the worker-count reference as a diagnostic
-- line of the form @running thread count = ...@.
dumpWorkerCount :: Show a => IORef a -> IO String
dumpWorkerCount ref = do
    workers <- readIORef ref
    return $ "running thread count = " <> show workers

-- | Handler for 'BlockedIndefinitelyOnMVar'. Runs the @dump@ action to obtain
-- a state description, writes the label, the exception name and that
-- description to 'stderr', and then rethrows the exception. It never returns
-- normally.
{-# NOINLINE mvarExcHandler #-}
mvarExcHandler :: IO String -> String -> BlockedIndefinitelyOnMVar -> IO ()
mvarExcHandler dump label e@BlockedIndefinitelyOnMVar = do
    svInfo <- dump
    hPutStrLn stderr $ label <> " " <> "BlockedIndefinitelyOnMVar\n" <> svInfo
    throwIO e

-- | Handler for 'BlockedIndefinitelyOnSTM'. Runs the @dump@ action to obtain
-- a state description, writes the label, the exception name and that
-- description to 'stderr', and then rethrows the exception. It never returns
-- normally.
{-# NOINLINE stmExcHandler #-}
stmExcHandler :: IO String -> String -> BlockedIndefinitelyOnSTM -> IO ()
stmExcHandler dump label e@BlockedIndefinitelyOnSTM = do
    svInfo <- dump
    hPutStrLn stderr $ label <> " " <> "BlockedIndefinitelyOnSTM\n" <> svInfo
    throwIO e

-- | Run an action, optionally with diagnostics for indefinitely blocked
-- threads.
--
-- When @inspecting@ is 'True' the action is run with handlers for
-- 'BlockedIndefinitelyOnMVar' and 'BlockedIndefinitelyOnSTM' installed; the
-- handlers ('mvarExcHandler', 'stmExcHandler') print @label@ and the output
-- of @dump@ to stderr and rethrow. When it is 'False' the action is run as
-- is.
--
-- MVar diagnostics has some overhead - around 5% on AsyncT null benchmark, we
-- can keep it on in production to debug problems quickly if and when they
-- happen, but it may result in unexpected output when threads are left hanging
-- until they are GCed because the consumer went away.
withDiagMVar :: Bool -> IO String -> String -> IO () -> IO ()
withDiagMVar inspecting dump label action =
    if inspecting
    then
        action `catches` [ Handler (mvarExcHandler dump label)
                         , Handler (stmExcHandler dump label)
                         ]
    else action

-- | Print a diagnostic report to 'stderr': a blank line, the @how@ heading,
-- and then the state description produced by running @dump@.
printSVar :: IO String -> String -> IO ()
printSVar dump how = do
    svInfo <- dump
    hPutStrLn stderr $ "\n" <> how <> "\n" <> svInfo

-------------------------------------------------------------------------------
-- Cleanup
-------------------------------------------------------------------------------

-- | Throw 'ThreadAbort' to every thread currently recorded in the given
-- worker set. The set itself is only read, not modified.
--
-- Never called from a worker thread. The calling thread is not filtered out
-- of the set, so calling this from a worker would also abort the caller.
cleanupSVar :: IORef (Set ThreadId) -> IO ()
cleanupSVar workerSet = do
    workers <- readIORef workerSet
    -- self <- myThreadId
    Prelude.mapM_ (`throwTo` ThreadAbort)
          -- (Prelude.filter (/= self) $ Set.toList workers)
          (Set.toList workers)

-------------------------------------------------------------------------------
-- Evaluator
-------------------------------------------------------------------------------

-- | @concatMapDivK useTail useHead stream@, divides the stream in head and
-- tail, maps a stream generator on the head and maps an action on the tail of
-- a stream. Returns the stream generated by the head.
--
-- Case by case:
--
-- * If the input yields an element and a remaining stream, @useTail@ is run
--   on the remaining stream first, and then the stream @useHead element@ is
--   evaluated in place of the result.
-- * If the input is a single element, only @useHead@ is applied; @useTail@ is
--   not invoked.
-- * If the input is empty the result is empty.
--
-- Used for concurrent evaluation of streams using a Channel.
{-# INLINE concatMapDivK #-}
concatMapDivK :: Monad m =>
       (K.StreamK m a -> m ())
    -> (a -> K.StreamK m b)
    -> K.StreamK m a
    -> K.StreamK m b
concatMapDivK useTail useHead stream =
    K.mkStream $ \st yld sng stp -> do
        -- Continue with the caller's continuations on a generated stream.
        let foldShared = K.foldStreamShared st yld sng stp
            single a = foldShared $ useHead a
            -- Hand off the tail before evaluating the head's stream.
            yieldk a r = useTail r >> single a
         in K.foldStreamShared (adaptState st) yieldk single stp stream