-- |
-- Module      : Streamly.Internal.Data.SVar
-- Copyright   : (c) 2017 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
--
module Streamly.Internal.Data.SVar
    (
      module Streamly.Internal.Data.SVar.Type
    , module Streamly.Internal.Data.SVar.Worker
    , module Streamly.Internal.Data.SVar.Dispatch
    , module Streamly.Internal.Data.SVar.Pull

    -- * New SVar
    , getYieldRateInfo
    , newSVarStats

    -- ** Parallel
    , newParallelVar

    -- ** Ahead
    , enqueueAhead
    , reEnqueueAhead
    , queueEmptyAhead
    , dequeueAhead
    , HeapDequeueResult(..)
    , dequeueFromHeap
    , dequeueFromHeapSeq
    , requeueOnHeapTop
    , updateHeapSeq
    , withIORef
    , heapIsSane
    , newAheadVar
    )
where

#include "inline.hs"

import Control.Concurrent (myThreadId, takeMVar)
import Control.Concurrent.MVar (newEmptyMVar, tryPutMVar, tryTakeMVar, newMVar)
import Control.Exception (assert)
import Control.Monad (void)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.Heap (Heap, Entry(..))
import Data.IORef (newIORef, readIORef)
import Data.IORef (IORef, atomicModifyIORef)
import Streamly.Internal.Control.Concurrent
    (MonadAsync, askRunInIO, RunInIO)
import Streamly.Internal.Data.Atomics
       (atomicModifyIORefCAS, atomicModifyIORefCAS_, writeBarrier)
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)
import Streamly.Internal.Data.Time.Units (NanoSecond64(..))

import qualified Data.Heap as H
import qualified Data.Set as S

import Streamly.Internal.Data.SVar.Dispatch
import Streamly.Internal.Data.SVar.Pull
import Streamly.Internal.Data.SVar.Type
import Streamly.Internal.Data.SVar.Worker

-------------------------------------------------------------------------------
-- Creating an SVar
-------------------------------------------------------------------------------

-- | Build the rate-control bookkeeping for a stream from its configuration.
--
-- Returns 'Nothing' when 'getStreamRate' reports no rate for the given
-- 'State'. Otherwise the low, goal and high rates (in Hertz) are converted to
-- latencies in nanoseconds and a freshly initialised 'YieldRateInfo' is
-- returned.
getYieldRateInfo :: State t m a -> IO (Maybe YieldRateInfo)
getYieldRateInfo st =
    case getStreamRate st of
        Nothing -> return Nothing
        Just (Rate low goal high buf) ->
            -- Rate and latency are inversely related: the high rate yields
            -- the minimum latency and the low rate the maximum latency.
            let l    = rateToLatency goal
                minl = rateToLatency high
                maxl = rateToLatency low
             in mkYieldRateInfo l (LatencyRange minl maxl) buf

    where

    -- Convert a rate in Hertz to a latency in nanoseconds. A non-positive
    -- rate maps to 'maxBound' instead of dividing by zero or going negative.
    rateToLatency :: Double -> NanoSecond64
    rateToLatency r
        | r <= 0 = maxBound
        | otherwise = round $ 1.0e9 / r

    -- Allocate the mutable counters and assemble the record. The all-time
    -- latency reference is seeded with the current monotonic clock reading.
    mkYieldRateInfo
        :: NanoSecond64 -> LatencyRange -> Int -> IO (Maybe YieldRateInfo)
    mkYieldRateInfo latency latRange buf = do
        measured <- newIORef 0
        wcur     <- newIORef (0,0,0)
        wcol     <- newIORef (0,0,0)
        now      <- getTime Monotonic
        wlong    <- newIORef (0,now)
        period   <- newIORef 1
        gainLoss <- newIORef (Count 0)

        return $ Just YieldRateInfo
            { svarLatencyTarget      = latency
            , svarLatencyRange       = latRange
            , svarRateBuffer         = buf
            , svarGainedLostYields   = gainLoss
            , workerBootstrapLatency = getStreamLatency st
            , workerPollingInterval  = period
            , workerMeasuredLatency  = measured
            , workerPendingLatency   = wcur
            , workerCollectedLatency = wcol
            , svarAllTimeLatency     = wlong
            }

-- | Allocate a fresh 'SVarStats' record. Every numeric statistic starts at
-- zero and the stop time starts as 'Nothing'.
newSVarStats :: IO SVarStats
newSVarStats = do
    disp    <- newIORef 0
    maxWrk  <- newIORef 0
    maxOq   <- newIORef 0
    maxHs   <- newIORef 0
    maxWq   <- newIORef 0
    avgLat  <- newIORef (0, NanoSecond64 0)
    maxLat  <- newIORef (NanoSecond64 0)
    minLat  <- newIORef (NanoSecond64 0)
    stpTime <- newIORef Nothing

    return SVarStats
        { totalDispatches  = disp
        , maxWorkers       = maxWrk
        , maxOutQSize      = maxOq
        , maxHeapSize      = maxHs
        , maxWorkQSize     = maxWq
        , avgWorkerLatency = avgLat
        , minWorkerLatency = minLat
        , maxWorkerLatency = maxLat
        , svarStopTime     = stpTime
        }

-------------------------------------------------------------------------------
-- Ahead
-------------------------------------------------------------------------------

-- Lookahead streams can execute multiple tasks concurrently, ahead of time,
-- but always serve them in the same order as they appear in the stream. To
-- implement lookahead streams efficiently we assign a sequence number to each
-- task when the task is picked up for execution. When the task finishes, the
-- output is tagged with the same sequence number and we rearrange the outputs
-- in sequence based on that number.
--
-- To explain the mechanism imagine that the current task at the head of the
-- stream has a "token" to yield to the outputQueue. The ownership of the token
-- is determined by the current sequence number maintained in outputHeap.
-- Sequence number is assigned when a task is queued. When a thread dequeues a
-- task it picks up the sequence number as well and when the output is ready it
-- uses the sequence number to queue the output to the outputQueue.
--
-- The thread with current sequence number sends the output directly to the
-- outputQueue. Other threads push the output to the outputHeap. When the task
-- being queued on the heap is a stream of many elements we evaluate only the
-- first element and keep the rest of the unevaluated computation in the heap.
-- When such a task gets the "token" for outputQueue it evaluates and directly
-- yields all the elements to the outputQueue without checking for the
-- "token".
--
-- Note that no two outputs in the heap can have the same sequence numbers and
-- therefore we do not need a stable heap. We have also separated the buffer
-- for the current task (outputQueue) and the pending tasks (outputHeap) so
-- that the pending tasks cannot interfere with the current task. Note that for
-- a single task just the outputQueue is enough and for the case of many
-- threads just a heap is good enough. However we balance between these two
-- cases, so that both are efficient.
--
-- For bigger streams it may make sense to have separate buffers for each
-- stream. However, for singleton streams this may become inefficient. However,
-- if we do not have separate buffers, then the streams that come later in
-- sequence may hog the buffer, hindering the streams that are ahead. For this
-- reason we have a single element buffer limitation for the streams being
-- executed in advance.
--
-- This scheme works pretty efficiently with less than 40% extra overhead
-- compared to the Async streams where we do not have any kind of sequencing of
-- the outputs. It is especially devised so that we are most efficient when we
-- have short tasks and need just a single thread. Also when a thread yields
-- many items it can hold lockfree access to the outputQueue and do it
-- efficiently.
--
-- XXX Maybe we can start the ahead threads at a lower cpu and IO priority so
-- that they do not hog the resources and hinder the progress of the threads in
-- front of them.

-- XXX Left associated ahead expressions are expensive. We start a new SVar for
-- each left associative expression. The queue is used only for right
-- associated expression, we queue the right expression and execute the left.
-- Therefore the queue never has more than one item in it.
--
-- XXX we can fix this. When we queue more than one item on the queue we can
-- mark the previously queued item as not-runnable. The not-runnable item is
-- not dequeued until the already running one has finished and at that time we
-- would also know the exact sequence number of the already queued item.
--
-- we can even run the already queued items but they will have to be sorted in
-- layers in the heap. We can use a list of heaps for that.
{-# INLINE enqueueAhead #-}
-- | Put a task on the (single-slot) ahead work queue, bumping the sequence
-- number stored alongside it, and then ring the SVar's doorbell.
--
-- Only the stream component of the pair is queued; the 'RunInIO' component
-- is ignored. Calls 'error' if the queue already holds an item.
enqueueAhead :: SVar t m a -> IORef ([t m a], Int) -> (RunInIO m, t m a) -> IO ()
enqueueAhead sv q m = do
    atomicModifyIORefCAS_ q $ \case
        ([], n) -> ([snd m], n + 1)  -- increment sequence
        _ -> error "enqueueAhead: queue is not empty"
    ringDoorBell sv

-- | Like 'enqueueAhead' but leaves the sequence number untouched: the task is
-- put on the empty work queue as is and the doorbell is rung.
--
-- Calls 'error' if the queue already holds an item.
{-# INLINE reEnqueueAhead #-}
reEnqueueAhead :: SVar t m a -> IORef ([t m a], Int) -> t m a -> IO ()
reEnqueueAhead sv q m = do
    atomicModifyIORefCAS_ q $ \case
        ([], n) -> ([m], n)  -- DO NOT increment sequence
        _ -> error "reEnqueueAhead: queue is not empty"
    ringDoorBell sv

-- Normally the thread that has the token should never go away. The token gets
-- handed over to another thread, but someone or the other has the token at any
-- point of time. But if the task that has the token finds that the outputQueue
-- is full, in that case it can go away without even handing over the token to
-- another thread. In that case it sets the nextSequence number in the heap to
-- its own sequence number before going away. To handle this case, any task that
-- does not have the token tries to dequeue from the heap first before
-- dequeuing from the work queue. If it finds that the task at the top of the
-- heap is the one that owns the current sequence number then it grabs the
-- token and starts with that.
--
-- XXX instead of queueing just the head element and the remaining computation
-- on the heap, evaluate as many as we can and place them on the heap. But we
-- need to give higher priority to the lower sequence numbers so that lower
-- priority tasks do not fill up the heap making higher priority tasks block
-- due to full heap. Maybe we can have a weighted space for them in the heap.
-- The weight is inversely proportional to the sequence number.
--
-- XXX review for livelock
--
{-# INLINE queueEmptyAhead #-}
-- | Report whether the ahead work queue currently holds no task. This is a
-- plain read of the reference; the sequence number is ignored.
queueEmptyAhead :: MonadIO m => IORef ([t m a], Int) -> m Bool
queueEmptyAhead q = liftIO $ do
    (xs, _) <- readIORef q
    return $ null xs

-- | Atomically take the task, if any, off the ahead work queue.
--
-- Returns 'Nothing' when the queue is empty, otherwise the task paired with
-- the sequence number currently stored in the queue. The sequence number is
-- left unchanged. Calls 'error' if the queue holds more than one item.
{-# INLINE dequeueAhead #-}
dequeueAhead :: MonadIO m
    => IORef ([t m a], Int) -> m (Maybe (t m a, Int))
dequeueAhead q = liftIO $
    atomicModifyIORefCAS q $ \case
        ([], n) -> (([], n), Nothing)
        ([x], n) -> (([], n), Just (x, n))
        _ -> error "more than one item on queue"

-------------------------------------------------------------------------------
-- Heap manipulation
-------------------------------------------------------------------------------

-- | Read the current contents of a reference and pass them to an action.
-- The read is a plain 'readIORef'; nothing is written back.
withIORef :: IORef a -> (a -> IO b) -> IO b
withIORef ref f = readIORef ref >>= f

-- | 'atomicModifyIORef' specialised to updates that produce no result.
atomicModifyIORef_ :: IORef a -> (a -> a) -> IO ()
atomicModifyIORef_ ref f =
    atomicModifyIORef ref $ \x -> (f x, ())

-- | Outcome of trying to take the next in-sequence entry off the output heap
-- (see 'dequeueFromHeap' and 'dequeueFromHeapSeq').
data HeapDequeueResult t m a =
      Clearing
      -- ^ The heap's expected-sequence slot was 'Nothing'; nothing was
      -- dequeued.
    | Waiting Int
      -- ^ The heap was empty or its top entry did not carry the expected
      -- sequence number, which is returned here.
    | Ready (Entry Int (AheadHeapEntry t m a))
      -- ^ The top entry carried the expected sequence number and has been
      -- removed from the heap.

-- | Atomically try to take the entry with the expected sequence number off
-- the top of the output heap.
--
-- * If the expected-sequence slot is 'Nothing' the heap is left untouched and
--   'Clearing' is returned.
-- * If the slot is @Just n@ and the top entry has sequence number @n@, that
--   entry is removed, the slot is reset to 'Nothing' and 'Ready' is returned.
-- * Otherwise (heap empty, or the top entry has a different sequence number)
--   nothing changes and @'Waiting' n@ is returned.
{-# INLINE dequeueFromHeap #-}
dequeueFromHeap
    :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> IO (HeapDequeueResult t m a)
dequeueFromHeap hpVar =
    atomicModifyIORef hpVar $ \pair@(hp, snum) ->
        case snum of
            Nothing -> (pair, Clearing)
            Just n ->
                case H.uncons hp of
                    Just (ent@(Entry seqNo _ev), hp')
                        | seqNo == n -> ((hp', Nothing), Ready ent)
                        -- An entry older than the expected sequence number
                        -- must never be sitting in the heap.
                        | otherwise ->
                            assert (seqNo >= n) (pair, Waiting n)
                    Nothing -> (pair, Waiting n)

-- | Atomically try to take the entry with sequence number @i@ off the top of
-- the output heap. The expected-sequence slot must be 'Nothing' on entry.
--
-- * If the top entry has sequence number @i@ it is removed, the slot stays
--   'Nothing' and 'Ready' is returned.
-- * Otherwise (heap empty, or the top entry has a different sequence number)
--   the heap is kept, the slot is set to @Just i@ and @'Waiting' i@ is
--   returned.
--
-- Calls 'error' if the slot already holds a sequence number.
{-# INLINE dequeueFromHeapSeq #-}
dequeueFromHeapSeq
    :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> Int
    -> IO (HeapDequeueResult t m a)
dequeueFromHeapSeq hpVar i =
    atomicModifyIORef hpVar $ \(hp, snum) ->
        case snum of
            Nothing ->
                case H.uncons hp of
                    Just (ent@(Entry seqNo _ev), hp')
                        | seqNo == i -> ((hp', Nothing), Ready ent)
                        -- An entry older than the requested sequence number
                        -- must never be sitting in the heap.
                        | otherwise ->
                            assert (seqNo >= i) ((hp, Just i), Waiting i)
                    Nothing -> ((hp, Just i), Waiting i)
            Just _ -> error "dequeueFromHeapSeq: unreachable"

-- | Sanity predicate used in assertions on the output heap: when an expected
-- sequence number is recorded, the given sequence number must not be smaller
-- than it. With no expected sequence number recorded any value is accepted.
heapIsSane :: Maybe Int -> Int -> Bool
heapIsSane snum seqNo = maybe True (seqNo >=) snum

-- | Atomically insert an entry into the output heap and set the expected
-- sequence number to @seqNo@. Asserts 'heapIsSane' against the previously
-- recorded sequence number.
{-# INLINE requeueOnHeapTop #-}
requeueOnHeapTop
    :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> Entry Int (AheadHeapEntry t m a)
    -> Int
    -> IO ()
requeueOnHeapTop hpVar ent seqNo =
    atomicModifyIORef_ hpVar $ \(hp, snum) ->
        assert (heapIsSane snum seqNo) (H.insert ent hp, Just seqNo)

-- | Atomically set the output heap's expected sequence number to @seqNo@,
-- leaving the heap contents unchanged. Asserts 'heapIsSane' against the
-- previously recorded sequence number.
{-# INLINE updateHeapSeq #-}
updateHeapSeq
    :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> Int
    -> IO ()
updateHeapSeq hpVar seqNo =
    atomicModifyIORef_ hpVar $ \(hp, snum) ->
        assert (heapIsSane snum seqNo) (hp, Just seqNo)

-- XXX remove polymorphism in t, inline f
--
-- Allocate the mutable state for an ahead style SVar and assemble the 'SVar'
-- record around it.
--
-- The worker loop @f@ is stored in the record partially applied to the work
-- queue, the output heap, the given state (with 'streamVar' pointing at the
-- new SVar) and the SVar itself. The output reader and post-processing
-- actions are the "paced" variants when 'getStreamRate' reports a rate and
-- the "bounded" variants otherwise. Several fields are left as 'undefined'
-- in this constructor.
getAheadSVar :: MonadAsync m
    => State t m a
    -> (   IORef ([t m a], Int)
        -> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
        -> State t m a
        -> SVar t m a
        -> Maybe WorkerInfo
        -> m ())
    -> RunInIO m
    -> IO (SVar t m a)
getAheadSVar st f mrun = do
    outQ    <- newIORef ([], 0)
    -- the second component of the tuple is "Nothing" when heap is being
    -- cleared, "Just n" when we are expecting sequence number n to arrive
    -- before we can start clearing the heap.
    outH    <- newIORef (H.empty, Just 0)
    outQMv  <- newEmptyMVar
    active  <- newIORef 0
    wfw     <- newIORef False
    running <- newIORef S.empty
    -- Sequence number is incremented whenever something is queued, therefore,
    -- first sequence number would be 0
    q <- newIORef ([], -1)
    stopMVar <- newMVar ()
    yl <- case getYieldLimit st of
            Nothing -> return Nothing
            Just x -> Just <$> newIORef x
    rateInfo <- getYieldRateInfo st

    stats <- newSVarStats
    tid <- myThreadId

    let getSVar sv readOutput postProc = SVar
            { outputQueue      = outQ
            , outputQueueFromConsumer = undefined
            , remainingWork  = yl
            , maxBufferLimit   = getMaxBuffer st
            , pushBufferSpace = undefined
            , pushBufferPolicy = undefined
            , pushBufferMVar   = undefined
            , maxWorkerLimit   = min (getMaxThreads st) (getMaxBuffer st)
            , yieldRateInfo    = rateInfo
            , outputDoorBell   = outQMv
            , outputDoorBellFromConsumer = undefined
            , readOutputQ      = readOutput sv
            , postProcess      = postProc sv
            , workerThreads    = running
            , workLoop         = f q outH st{streamVar = Just sv} sv
            , enqueue          = enqueueAhead sv q
            , isWorkDone       = isWorkDoneAhead sv q outH
            , isQueueDone      = isQueueDoneAhead sv q
            , needDoorBell     = wfw
            , svarStyle        = AheadVar
            , svarStopStyle    = StopNone
            , svarStopBy       = undefined
            , svarMrun         = mrun
            , workerCount      = active
            , accountThread    = delThread sv
            , workerStopMVar   = stopMVar
            , svarRef          = Nothing
            , svarInspectMode  = getInspectMode st
            , svarCreator      = tid
            , aheadWorkQueue   = q
            , outputHeap       = outH
            , svarStats        = stats
            }

    -- The binding is recursive: the record's fields close over the very SVar
    -- being constructed.
    let sv =
            case getStreamRate st of
                Nothing -> getSVar sv readOutputQBounded postProcessBounded
                Just _  -> getSVar sv readOutputQPaced postProcessPaced
     in return sv

    where

    -- The queue is considered done when it is empty or when the remaining
    -- yield budget (if there is one) has dropped to zero or below.
    {-# INLINE isQueueDoneAhead #-}
    isQueueDoneAhead sv q = do
        queueDone <- checkEmpty q
        yieldsDone <-
                case remainingWork sv of
                    Just yref -> do
                        n <- readIORef yref
                        return (n <= 0)
                    Nothing -> return False
        -- XXX note that yieldsDone can be authoritative only when there
        -- are no workers running. If there are active workers they can
        -- later increment the yield count and therefore change the result.
        return $ yieldsDone || queueDone

    -- All work is done when the output heap is empty and the queue is done.
    {-# INLINE isWorkDoneAhead #-}
    isWorkDoneAhead sv q ref = do
        heapDone <- do
                (hp, _) <- readIORef ref
                return (H.size hp <= 0)
        queueDone <- isQueueDoneAhead sv q
        return $ heapDone && queueDone

    -- True when the first component of the referenced pair is empty.
    checkEmpty q = do
        (xs, _) <- readIORef q
        return $ null xs

{-# INLINABLE newAheadVar #-}
-- | Create a new 'SVar' with 'getAheadSVar' and dispatch the first worker on
-- it.
--
-- The arguments are, in order: the stream 'State' passed on to
-- 'getAheadSVar', the stream handed to 'sendFirstWorker', and the worker loop
-- that 'getAheadSVar' receives along with the monad's 'RunInIO' runner.
--
-- The result is whatever 'sendFirstWorker' returns for the freshly created
-- 'SVar' and the supplied stream.
newAheadVar :: MonadAsync m
    => State t m a
    -> t m a
    -> (   IORef ([t m a], Int)
        -> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
        -> State t m a
        -> SVar t m a
        -> Maybe WorkerInfo
        -> m ())
    -> m (SVar t m a)
newAheadVar st m wloop = do
    -- Capture the runner for the current monad; it is passed to the SVar
    -- constructor, which itself runs in plain IO.
    mrun <- askRunInIO
    sv <- liftIO $ getAheadSVar st wloop mrun
    sendFirstWorker sv m

-------------------------------------------------------------------------------
-- WAhead
-------------------------------------------------------------------------------

-- XXX To be implemented. Use a linked queue like WAsync and put back the
-- remaining computation at the back of the queue instead of the heap, and
-- increment the sequence number.

-------------------------------------------------------------------------------
-- Parallel
-------------------------------------------------------------------------------

-- | Allocate and assemble an 'SVar' whose 'svarStyle' is 'ParallelVar'.
--
-- The arguments are the stop style recorded in 'svarStopStyle', the stream
-- 'State' from which the yield limit, rate info, buffer limit and inspect
-- mode are read, and the 'RunInIO' runner stored in 'svarMrun'.
--
-- Several fields ('workLoop', 'enqueue', 'isWorkDone', 'isQueueDone',
-- 'needDoorBell', 'workerStopMVar', 'aheadWorkQueue', 'outputHeap') are set to
-- 'undefined' here, so forcing any of them on the returned 'SVar' throws.
-- NOTE(review): presumably these are never used for the parallel style --
-- confirm against the consumers of 'SVar'.
getParallelSVar :: MonadIO m
    => SVarStopStyle -> State t m a -> RunInIO m -> IO (SVar t m a)
getParallelSVar ss st mrun = do
    -- Output queues and their doorbells, one pair per direction.
    outQ    <- newIORef ([], 0)
    outQRev <- newIORef ([], 0)
    outQMv  <- newEmptyMVar
    outQMvRev <- newEmptyMVar
    active  <- newIORef 0
    running <- newIORef S.empty
    -- A yield counter is allocated only when a yield limit is configured.
    yl <- case getYieldLimit st of
            Nothing -> return Nothing
            Just x -> Just <$> newIORef x
    rateInfo <- getYieldRateInfo st
    -- For an 'Unlimited' buffer this is bottom. 'newIORef' does not force its
    -- argument, so the error surfaces only if the stored value is evaluated.
    let bufLim =
            case getMaxBuffer st of
                Unlimited -> undefined
                Limited x -> fromIntegral x
    remBuf <- newIORef bufLim
    pbMVar <- newMVar ()

    stats <- newSVarStats
    tid <- myThreadId

    -- Only the 'StopBy' style gets a real IORef (with an undefined initial
    -- content); for every other style the reference itself is bottom.
    stopBy <-
        case ss of
            StopBy -> liftIO $ newIORef undefined
            _ -> return undefined

    let sv =
            SVar { outputQueue      = outQ
                 , outputQueueFromConsumer = outQRev
                 , remainingWork    = yl
                 , maxBufferLimit   = getMaxBuffer st
                 , pushBufferSpace  = remBuf
                 , pushBufferPolicy = PushBufferBlock
                 , pushBufferMVar   = pbMVar
                 , maxWorkerLimit   = Unlimited
                 -- Used only for diagnostics
                 , yieldRateInfo    = rateInfo
                 , outputDoorBell   = outQMv
                 , outputDoorBellFromConsumer = outQMvRev
                 , readOutputQ      = readOutputQPar sv
                 , postProcess      = allThreadsDone sv
                 , workerThreads    = running
                 , workLoop         = undefined
                 , enqueue          = undefined
                 , isWorkDone       = undefined
                 , isQueueDone      = undefined
                 , needDoorBell     = undefined
                 , svarStyle        = ParallelVar
                 , svarStopStyle    = ss
                 , svarStopBy       = stopBy
                 , svarMrun         = mrun
                 , workerCount      = active
                 , accountThread    = modifyThread sv
                 , workerStopMVar   = undefined
                 , svarRef          = Nothing
                 , svarInspectMode  = getInspectMode st
                 , svarCreator      = tid
                 , aheadWorkQueue   = undefined
                 , outputHeap       = undefined
                 , svarStats        = stats
                 }
     in return sv

    where

    -- Block on the output doorbell, collect latency when rate info is
    -- present, read the raw output queue, then reset the push buffer limit
    -- and signal 'pushBufferMVar'. Returns the events that were read.
    readOutputQPar sv = liftIO $ do
        withDiagMVar sv "readOutputQPar: doorbell"
            $ takeMVar (outputDoorBell sv)
        case yieldRateInfo sv of
            Nothing -> return ()
            Just yinfo -> void $ collectLatency sv yinfo False
        r <- fst `fmap` readOutputQRaw sv
        liftIO $ do
            -- Empty the MVar first so that the put below cannot be a no-op
            -- because of a stale value; the barrier orders the limit reset
            -- before the put.
            void $ tryTakeMVar (pushBufferMVar sv)
            resetBufferLimit sv
            writeBarrier
            void $ tryPutMVar (pushBufferMVar sv) ()
        return r

{-# INLINABLE newParallelVar #-}
-- | Create a new parallel-style 'SVar' in the current monad.
--
-- Obtains the monad's 'RunInIO' runner and passes it, together with the given
-- stop style and stream 'State', to 'getParallelSVar', lifting the resulting
-- IO action into @m@.
newParallelVar :: MonadAsync m
    => SVarStopStyle -> State t m a -> m (SVar t m a)
newParallelVar ss st = do
    mrun <- askRunInIO
    liftIO $ getParallelSVar ss st mrun