-- |
-- Module      : Streamly.Internal.Data.Stream.Concurrent.Channel.Append
-- Copyright   : (c) 2017 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
-- The functions in this module are separated from the combinators using
-- these because of a GHC issue. We need to have newChannel specialized but
-- not inlined. If we keep it in the same module as its users we cannot achieve
-- that and the code becomes bloated. But if it is in a separate module we can
-- use INLINABLE and SPECIALIZE on it which makes it specialized but it is not
-- actually inlined.

module Streamly.Internal.Data.Stream.Concurrent.Channel.Append
    (
      newChannel
    )
where

import Control.Concurrent (myThreadId)
import Control.Concurrent.MVar (newEmptyMVar, newMVar, putMVar, takeMVar)
import Control.Exception (assert)
import Control.Monad (when, void)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.Heap (Heap, Entry(..))
import Data.IORef (IORef, newIORef, readIORef, atomicModifyIORef, writeIORef)
import Data.Kind (Type)
import GHC.Exts (inline)
import Streamly.Internal.Control.Concurrent
    (MonadRunInIO, RunInIO(..), askRunInIO, restoreM)
import Streamly.Internal.Data.Atomics
    (atomicModifyIORefCAS, atomicModifyIORefCAS_)
import Streamly.Internal.Data.Stream.Channel.Dispatcher (modifyThread)

import qualified Data.Heap as H
import qualified Data.Set as Set
import qualified Streamly.Internal.Data.Stream.StreamK.Type as K

import Streamly.Internal.Data.Stream.Concurrent.Channel.Consumer
import Streamly.Internal.Data.Stream.Concurrent.Channel.Dispatcher
import Streamly.Internal.Data.Stream.Concurrent.Channel.Type
import Streamly.Internal.Data.Stream.Channel.Types
import Streamly.Internal.Data.Stream.Channel.Worker

------------------------------------------------------------------------------
-- Concurrent streams with first-come-first serve results
------------------------------------------------------------------------------

-- Note: For purely right associated expressions this queue should have at most
-- one element. It grows to more than one when we have left associcated
-- expressions. Large left associated compositions can grow this to a
-- large size
{-# INLINE enqueueLIFO #-}
enqueueLIFO ::
      Channel m a
   -> IORef ([(RunInIO m, K.StreamK m a)], [(RunInIO m, K.StreamK m a)])
   -> Bool
   -> (RunInIO m, K.StreamK m a)
   -> IO ()
enqueueLIFO :: forall (m :: * -> *) a.
Channel m a
-> IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Bool
-> (RunInIO m, StreamK m a)
-> IO ()
enqueueLIFO Channel m a
sv IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
q Bool
inner (RunInIO m, StreamK m a)
m = do
    forall t. IORef t -> (t -> t) -> IO ()
atomicModifyIORefCAS_ IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
q forall a b. (a -> b) -> a -> b
$ \([(RunInIO m, StreamK m a)]
xs, [(RunInIO m, StreamK m a)]
ys) ->
        if Bool
inner then ([(RunInIO m, StreamK m a)]
xs, (RunInIO m, StreamK m a)
m forall a. a -> [a] -> [a]
: [(RunInIO m, StreamK m a)]
ys) else ((RunInIO m, StreamK m a)
m forall a. a -> [a] -> [a]
: [(RunInIO m, StreamK m a)]
xs, [(RunInIO m, StreamK m a)]
ys)
    IORef Bool -> MVar () -> IO ()
ringDoorBell (forall (m :: * -> *) a. Channel m a -> IORef Bool
doorBellOnWorkQ Channel m a
sv) (forall (m :: * -> *) a. Channel m a -> MVar ()
outputDoorBell Channel m a
sv)

data QResult a = QEmpty | QOuter a | QInner a

{-# INLINE dequeue #-}
dequeue :: MonadIO m =>
       IORef ([(RunInIO m, K.StreamK m a)], [(RunInIO m, K.StreamK m a)])
    -> m (QResult (RunInIO m, K.StreamK m a))
dequeue :: forall (m :: * -> *) a.
MonadIO m =>
IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> m (QResult (RunInIO m, StreamK m a))
dequeue IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
qref =
    forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
        forall a b. (a -> b) -> a -> b
$ forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORefCAS IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
qref
        forall a b. (a -> b) -> a -> b
$ \case
            ([(RunInIO m, StreamK m a)]
xs, (RunInIO m, StreamK m a)
y : [(RunInIO m, StreamK m a)]
ys) -> (([(RunInIO m, StreamK m a)]
xs, [(RunInIO m, StreamK m a)]
ys), forall a. a -> QResult a
QInner (RunInIO m, StreamK m a)
y)
            ((RunInIO m, StreamK m a)
x : [(RunInIO m, StreamK m a)]
xs, [(RunInIO m, StreamK m a)]
ys) -> (([(RunInIO m, StreamK m a)]
xs, [(RunInIO m, StreamK m a)]
ys), forall a. a -> QResult a
QOuter (RunInIO m, StreamK m a)
x)
            ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
x -> (([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
x, forall a. QResult a
QEmpty)

data WorkerStatus = Continue | Suspend

{-# INLINE workLoopLIFO #-}
workLoopLIFO
    :: MonadRunInIO m
    => IORef ([(RunInIO m, K.StreamK m a)], [(RunInIO m, K.StreamK m a)])
    -> Channel m a
    -> Maybe WorkerInfo
    -> m ()
workLoopLIFO :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Channel m a -> Maybe WorkerInfo -> m ()
workLoopLIFO IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
qref Channel m a
sv Maybe WorkerInfo
winfo = m ()
run

    where

    run :: m ()
run = do
        QResult (RunInIO m, StreamK m a)
work <- forall (m :: * -> *) a.
MonadIO m =>
IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> m (QResult (RunInIO m, StreamK m a))
dequeue IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
qref
        case QResult (RunInIO m, StreamK m a)
work of
            QResult (RunInIO m, StreamK m a)
QEmpty ->
                forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
            QInner (RunInIO forall b. m b -> IO (StM m b)
runin, StreamK m a
m) ->
                (m WorkerStatus -> IO (StM m WorkerStatus))
-> StreamK m a -> Bool -> m ()
process forall b. m b -> IO (StM m b)
runin StreamK m a
m Bool
True
            QOuter (RunInIO forall b. m b -> IO (StM m b)
runin, StreamK m a
m) ->
                (m WorkerStatus -> IO (StM m WorkerStatus))
-> StreamK m a -> Bool -> m ()
process forall b. m b -> IO (StM m b)
runin StreamK m a
m Bool
False

    process :: (m WorkerStatus -> IO (StM m WorkerStatus))
-> StreamK m a -> Bool -> m ()
process m WorkerStatus -> IO (StM m WorkerStatus)
runin StreamK m a
m Bool
inner = do
        -- XXX when we finish we need to send the monadic state back to
        -- the parent so that the state can be merged back. We capture
        -- and return the state in the stop continuation.
        --
        -- Instead of using the run function we can just restore the
        -- monad state here. That way it can work easily for
        -- distributed case as well.
        StM m WorkerStatus
r <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ m WorkerStatus -> IO (StM m WorkerStatus)
runin forall a b. (a -> b) -> a -> b
$
                forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared
                    forall a. HasCallStack => a
undefined
                    a -> StreamK m a -> m WorkerStatus
yieldk
                    forall {m :: * -> *}. MonadIO m => a -> m WorkerStatus
single
                    (forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Continue)
                    StreamK m a
m
        WorkerStatus
res <- forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m WorkerStatus
r
        case WorkerStatus
res of
            WorkerStatus
Continue -> m ()
run
            WorkerStatus
Suspend -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo

        where

        single :: a -> m WorkerStatus
single a
a = do
            Bool
res <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
Channel m a -> Maybe WorkerInfo -> a -> IO Bool
yield Channel m a
sv Maybe WorkerInfo
winfo a
a
            forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ if Bool
res then WorkerStatus
Continue else WorkerStatus
Suspend

        yieldk :: a -> StreamK m a -> m WorkerStatus
yieldk a
a StreamK m a
r = do
            Bool
res <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
Channel m a -> Maybe WorkerInfo -> a -> IO Bool
yield Channel m a
sv Maybe WorkerInfo
winfo a
a
            if Bool
res
            then forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared forall a. HasCallStack => a
undefined a -> StreamK m a -> m WorkerStatus
yieldk forall {m :: * -> *}. MonadIO m => a -> m WorkerStatus
single (forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Continue) StreamK m a
r
            else do
                RunInIO m
runInIO <- forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
                forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
Channel m a
-> IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Bool
-> (RunInIO m, StreamK m a)
-> IO ()
enqueueLIFO Channel m a
sv IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
qref Bool
inner (RunInIO m
runInIO, StreamK m a
r)
                forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Suspend

-- We duplicate workLoop for yield limit and no limit cases because it has
-- around 40% performance overhead in the worst case.
--
-- XXX we can pass yinfo directly as an argument here so that we do not have to
-- make a check every time.
{-# INLINE workLoopLIFOLimited #-}
workLoopLIFOLimited
    :: forall m a. MonadRunInIO m
    => IORef ([(RunInIO m, K.StreamK m a)], [(RunInIO m, K.StreamK m a)])
    -> Channel m a
    -> Maybe WorkerInfo
    -> m ()
workLoopLIFOLimited :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Channel m a -> Maybe WorkerInfo -> m ()
workLoopLIFOLimited IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
qref Channel m a
sv Maybe WorkerInfo
winfo = m ()
run

    where

    incrContinue :: m WorkerStatus
incrContinue =
        forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Maybe (IORef Count) -> IO ()
incrementYieldLimit (forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)) forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Continue

    run :: m ()
run = do
        QResult (RunInIO m, StreamK m a)
work <- forall (m :: * -> *) a.
MonadIO m =>
IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> m (QResult (RunInIO m, StreamK m a))
dequeue IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
qref
        case QResult (RunInIO m, StreamK m a)
work of
            QResult (RunInIO m, StreamK m a)
QEmpty ->
                forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
            QInner (RunInIO m, StreamK m a)
item ->
                (RunInIO m, StreamK m a) -> Bool -> m ()
process (RunInIO m, StreamK m a)
item Bool
True
            QOuter (RunInIO m, StreamK m a)
item ->
                (RunInIO m, StreamK m a) -> Bool -> m ()
process (RunInIO m, StreamK m a)
item Bool
False

    process :: (RunInIO m, StreamK m a) -> Bool -> m ()
process item :: (RunInIO m, StreamK m a)
item@(RunInIO forall b. m b -> IO (StM m b)
runin, StreamK m a
m) Bool
inner = do
        -- XXX This is just a best effort minimization of concurrency
        -- to the yield limit. If the stream is made of concurrent
        -- streams we do not reserve the yield limit in the constituent
        -- streams before executing the action. This can be done
        -- though, by sharing the yield limit ref with downstream
        -- actions via state passing. Just a todo.
        Bool
yieldLimitOk <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Maybe (IORef Count) -> IO Bool
decrementYieldLimit (forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
        if Bool
yieldLimitOk
        then do
            StM m WorkerStatus
r <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall b. m b -> IO (StM m b)
runin forall a b. (a -> b) -> a -> b
$
                    forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared
                        forall a. HasCallStack => a
undefined
                        a -> StreamK m a -> m WorkerStatus
yieldk
                        forall {m :: * -> *}. MonadIO m => a -> m WorkerStatus
single
                        m WorkerStatus
incrContinue
                        StreamK m a
m
            WorkerStatus
res <- forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m WorkerStatus
r
            case WorkerStatus
res of
                WorkerStatus
Continue -> m ()
run
                WorkerStatus
Suspend -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
        -- Avoid any side effects, undo the yield limit decrement if we
        -- never yielded anything.
        else forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
            forall (m :: * -> *) a.
Channel m a
-> IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Bool
-> (RunInIO m, StreamK m a)
-> IO ()
enqueueLIFO Channel m a
sv IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
qref Bool
inner (RunInIO m, StreamK m a)
item
            Maybe (IORef Count) -> IO ()
incrementYieldLimit (forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
            forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo

        where

        single :: a -> m WorkerStatus
single a
a = do
            Bool
res <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
Channel m a -> Maybe WorkerInfo -> a -> IO Bool
yield Channel m a
sv Maybe WorkerInfo
winfo a
a
            forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ if Bool
res then WorkerStatus
Continue else WorkerStatus
Suspend

        -- XXX can we pass on the yield limit downstream to limit the
        -- concurrency of constituent streams.
        yieldk :: a -> StreamK m a -> m WorkerStatus
yieldk a
a StreamK m a
r = do
            Bool
res <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
Channel m a -> Maybe WorkerInfo -> a -> IO Bool
yield Channel m a
sv Maybe WorkerInfo
winfo a
a
            Bool
yieldLimitOk <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Maybe (IORef Count) -> IO Bool
decrementYieldLimit (forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
            if Bool
res Bool -> Bool -> Bool
&& Bool
yieldLimitOk
            then forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared forall a. HasCallStack => a
undefined a -> StreamK m a -> m WorkerStatus
yieldk forall {m :: * -> *}. MonadIO m => a -> m WorkerStatus
single m WorkerStatus
incrContinue StreamK m a
r
            else do
                RunInIO m
runInIO <- forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
                forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Maybe (IORef Count) -> IO ()
incrementYieldLimit (forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
                forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
Channel m a
-> IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Bool
-> (RunInIO m, StreamK m a)
-> IO ()
enqueueLIFO Channel m a
sv IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
qref Bool
inner (RunInIO m
runInIO, StreamK m a
r)
                forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Suspend

-------------------------------------------------------------------------------
-- Ahead Channel Data Structures
-------------------------------------------------------------------------------

-- XXX Left associated ahead expressions are expensive. We start a new SVar for
-- each left associative expression. The queue is used only for right
-- associated expression, we queue the right expression and execute the left.
-- Therefore the queue never has more than one item in it. However, in case of
-- parIterateConcatMap the iteration may add more items at the end of the
-- queue.
--
-- XXX we can fix this. When we queue more than one item on the queue we can
-- mark the previously queued item as not-runnable. The not-runnable item is
-- not dequeued until the already running one has finished and at that time we
-- would also know the exact sequence number of the already queued item.
--
-- we can even run the already queued items but they will have to be sorted in
-- layers in the heap. We can use a list of heaps for that.
{-# ANN enqueueAhead "HLint: ignore" #-}
{-# INLINE enqueueAhead #-}
enqueueAhead ::
       Channel m a
    -> IORef ([K.StreamK m a], Int)
    -> (RunInIO m, K.StreamK m a)
    -> IO ()
enqueueAhead :: forall (m :: * -> *) a.
Channel m a
-> IORef ([StreamK m a], Int) -> (RunInIO m, StreamK m a) -> IO ()
enqueueAhead Channel m a
sv IORef ([StreamK m a], Int)
q (RunInIO m, StreamK m a)
m = do
    -- XXX The queue is LIFO. When parConcatIterate queues more than one items
    -- to the queue it will perform a DFS style traversal. For BFS we will have
    -- to use a FIFO data structure here. That would require another Config
    -- option.
    forall t. IORef t -> (t -> t) -> IO ()
atomicModifyIORefCAS_ IORef ([StreamK m a], Int)
q forall a b. (a -> b) -> a -> b
$ \([StreamK m a]
xs, Int
n) -> (forall a b. (a, b) -> b
snd (RunInIO m, StreamK m a)
mforall a. a -> [a] -> [a]
:[StreamK m a]
xs, Int
n)
    IORef Bool -> MVar () -> IO ()
ringDoorBell (forall (m :: * -> *) a. Channel m a -> IORef Bool
doorBellOnWorkQ Channel m a
sv) (forall (m :: * -> *) a. Channel m a -> MVar ()
outputDoorBell Channel m a
sv)

-- Normally the thread that has the token should never go away. The token gets
-- handed over to another thread, but someone or the other has the token at any
-- point of time. But if the task that has the token finds that the outputQueue
-- is full, in that case it can go away without even handing over the token to
-- another thread. In that case it sets the nextSequence number in the heap its
-- own sequence number before going away. To handle this case, any task that
-- does not have the token tries to dequeue from the heap first before
-- dequeuing from the work queue. If it finds that the task at the top of the
-- heap is the one that owns the current sequence number then it grabs the
-- token and starts with that.
--
-- XXX instead of queueing just the head element and the remaining computation
-- on the heap, evaluate as many as we can and place them on the heap. But we
-- need to give higher priority to the lower sequence numbers so that lower
-- priority tasks do not fill up the heap making higher priority tasks block
-- due to full heap. Maybe we can have a weighted space for them in the heap.
-- The weight is inversely proportional to the sequence number.
--
-- XXX review for livelock

{-# INLINE dequeueAhead #-}
dequeueAhead :: MonadIO m
    => IORef ([t m a], Int) -> m (Maybe (t m a, Int))
dequeueAhead :: forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
IORef ([t m a], Int) -> m (Maybe (t m a, Int))
dequeueAhead IORef ([t m a], Int)
q = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$
    forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORefCAS IORef ([t m a], Int)
q forall a b. (a -> b) -> a -> b
$ \case
            ([], Int
n) -> (([], Int
n), forall a. Maybe a
Nothing)
            (t m a
x : [t m a]
xs, Int
n) -> (([t m a]
xs, Int
n forall a. Num a => a -> a -> a
+ Int
1), forall a. a -> Maybe a
Just (t m a
x, Int
n forall a. Num a => a -> a -> a
+ Int
1))

-- Dequeue only if the seq number matches the expected seq number.
{-# INLINE dequeueAheadSeqCheck #-}
dequeueAheadSeqCheck :: MonadIO m
    => IORef ([t m a], Int) -> Int -> m (Maybe (t m a))
dequeueAheadSeqCheck :: forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
IORef ([t m a], Int) -> Int -> m (Maybe (t m a))
dequeueAheadSeqCheck IORef ([t m a], Int)
q Int
seqNo = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$
    forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORefCAS IORef ([t m a], Int)
q forall a b. (a -> b) -> a -> b
$ \case
            ([], Int
n) -> (([], Int
n), forall a. Maybe a
Nothing)
            (t m a
x : [t m a]
xs, Int
n) ->
                if Int
n forall a. Num a => a -> a -> a
+ Int
1 forall a. Eq a => a -> a -> Bool
== Int
seqNo
                then (([t m a]
xs, Int
n forall a. Num a => a -> a -> a
+ Int
1), forall a. a -> Maybe a
Just t m a
x)
                else ((t m a
x forall a. a -> [a] -> [a]
: [t m a]
xs, Int
n), forall a. Maybe a
Nothing)

-------------------------------------------------------------------------------
-- Heap manipulation
-------------------------------------------------------------------------------

withIORef :: IORef a -> (a -> IO b) -> IO b
withIORef :: forall a b. IORef a -> (a -> IO b) -> IO b
withIORef IORef a
ref a -> IO b
f = forall a. IORef a -> IO a
readIORef IORef a
ref forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= a -> IO b
f

atomicModifyIORef_ :: IORef a -> (a -> a) -> IO ()
atomicModifyIORef_ :: forall t. IORef t -> (t -> t) -> IO ()
atomicModifyIORef_ IORef a
ref a -> a
f =
    forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef IORef a
ref forall a b. (a -> b) -> a -> b
$ \a
x -> (a -> a
f a
x, ())

data AheadHeapEntry (t :: (Type -> Type) -> Type -> Type) m a =
      AheadEntryNull
    | AheadEntryPure a
    | AheadEntryStream (RunInIO m, t m a)

data HeapDequeueResult t m a =
      Clearing
    | Waiting Int
    | Ready (Entry Int (AheadHeapEntry t m a))

{-# INLINE dequeueFromHeap #-}
dequeueFromHeap
    :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> IO (HeapDequeueResult t m a)
dequeueFromHeap :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> IO (HeapDequeueResult t m a)
dequeueFromHeap IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
hpVar =
    forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
hpVar forall a b. (a -> b) -> a -> b
$ \pair :: (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
pair@(Heap (Entry Int (AheadHeapEntry t m a))
hp, Maybe Int
snum) ->
        case Maybe Int
snum of
            Maybe Int
Nothing -> ((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
pair, forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
HeapDequeueResult t m a
Clearing)
            Just Int
n -> do
                let r :: Maybe
  (Entry Int (AheadHeapEntry t m a),
   Heap (Entry Int (AheadHeapEntry t m a)))
r = forall a. Heap a -> Maybe (a, Heap a)
H.uncons Heap (Entry Int (AheadHeapEntry t m a))
hp
                case Maybe
  (Entry Int (AheadHeapEntry t m a),
   Heap (Entry Int (AheadHeapEntry t m a)))
r of
                    Just (ent :: Entry Int (AheadHeapEntry t m a)
ent@(Entry Int
seqNo AheadHeapEntry t m a
_ev), Heap (Entry Int (AheadHeapEntry t m a))
hp') ->
                            if Int
seqNo forall a. Eq a => a -> a -> Bool
== Int
n
                            then ((Heap (Entry Int (AheadHeapEntry t m a))
hp', forall a. Maybe a
Nothing), forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
Entry Int (AheadHeapEntry t m a) -> HeapDequeueResult t m a
Ready Entry Int (AheadHeapEntry t m a)
ent)
                            else forall a. HasCallStack => Bool -> a -> a
assert (Int
seqNo forall a. Ord a => a -> a -> Bool
>= Int
n) ((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
pair, forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
Int -> HeapDequeueResult t m a
Waiting Int
n)
                    Maybe
  (Entry Int (AheadHeapEntry t m a),
   Heap (Entry Int (AheadHeapEntry t m a)))
Nothing -> ((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
pair, forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
Int -> HeapDequeueResult t m a
Waiting Int
n)

{-# INLINE dequeueFromHeapSeq #-}
dequeueFromHeapSeq
    :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> Int
    -> IO (HeapDequeueResult t m a)
dequeueFromHeapSeq :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Int -> IO (HeapDequeueResult t m a)
dequeueFromHeapSeq IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
hpVar Int
i =
    forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
hpVar forall a b. (a -> b) -> a -> b
$ \(Heap (Entry Int (AheadHeapEntry t m a))
hp, Maybe Int
snum) ->
        case Maybe Int
snum of
            Maybe Int
Nothing -> do
                let r :: Maybe
  (Entry Int (AheadHeapEntry t m a),
   Heap (Entry Int (AheadHeapEntry t m a)))
r = forall a. Heap a -> Maybe (a, Heap a)
H.uncons Heap (Entry Int (AheadHeapEntry t m a))
hp
                case Maybe
  (Entry Int (AheadHeapEntry t m a),
   Heap (Entry Int (AheadHeapEntry t m a)))
r of
                    Just (ent :: Entry Int (AheadHeapEntry t m a)
ent@(Entry Int
seqNo AheadHeapEntry t m a
_ev), Heap (Entry Int (AheadHeapEntry t m a))
hp') ->
                        if Int
seqNo forall a. Eq a => a -> a -> Bool
== Int
i
                        then ((Heap (Entry Int (AheadHeapEntry t m a))
hp', forall a. Maybe a
Nothing), forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
Entry Int (AheadHeapEntry t m a) -> HeapDequeueResult t m a
Ready Entry Int (AheadHeapEntry t m a)
ent)
                        else forall a. HasCallStack => Bool -> a -> a
assert (Int
seqNo forall a. Ord a => a -> a -> Bool
>= Int
i) ((Heap (Entry Int (AheadHeapEntry t m a))
hp, forall a. a -> Maybe a
Just Int
i), forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
Int -> HeapDequeueResult t m a
Waiting Int
i)
                    Maybe
  (Entry Int (AheadHeapEntry t m a),
   Heap (Entry Int (AheadHeapEntry t m a)))
Nothing -> ((Heap (Entry Int (AheadHeapEntry t m a))
hp, forall a. a -> Maybe a
Just Int
i), forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
Int -> HeapDequeueResult t m a
Waiting Int
i)
            Just Int
_ -> forall a. HasCallStack => [Char] -> a
error [Char]
"dequeueFromHeapSeq: unreachable"

heapIsSane :: Maybe Int -> Int -> Bool
heapIsSane :: Maybe Int -> Int -> Bool
heapIsSane Maybe Int
snum Int
seqNo =
    case Maybe Int
snum of
        Maybe Int
Nothing -> Bool
True
        Just Int
n -> Int
seqNo forall a. Ord a => a -> a -> Bool
>= Int
n

{-# INLINE requeueOnHeapTop #-}
requeueOnHeapTop
    :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> Entry Int (AheadHeapEntry t m a)
    -> Int
    -> IO ()
requeueOnHeapTop :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Entry Int (AheadHeapEntry t m a) -> Int -> IO ()
requeueOnHeapTop IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
hpVar Entry Int (AheadHeapEntry t m a)
ent Int
seqNo =
    forall t. IORef t -> (t -> t) -> IO ()
atomicModifyIORef_ IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
hpVar forall a b. (a -> b) -> a -> b
$ \(Heap (Entry Int (AheadHeapEntry t m a))
hp, Maybe Int
snum) ->
        forall a. HasCallStack => Bool -> a -> a
assert (Maybe Int -> Int -> Bool
heapIsSane Maybe Int
snum Int
seqNo) (forall a. Ord a => a -> Heap a -> Heap a
H.insert Entry Int (AheadHeapEntry t m a)
ent Heap (Entry Int (AheadHeapEntry t m a))
hp, forall a. a -> Maybe a
Just Int
seqNo)

{-# INLINE updateHeapSeq #-}
updateHeapSeq
    :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> Int
    -> IO ()
updateHeapSeq :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Int -> IO ()
updateHeapSeq IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
hpVar Int
seqNo =
    forall t. IORef t -> (t -> t) -> IO ()
atomicModifyIORef_ IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
hpVar forall a b. (a -> b) -> a -> b
$ \(Heap (Entry Int (AheadHeapEntry t m a))
hp, Maybe Int
snum) ->
        forall a. HasCallStack => Bool -> a -> a
assert (Maybe Int -> Int -> Bool
heapIsSane Maybe Int
snum Int
seqNo) (Heap (Entry Int (AheadHeapEntry t m a))
hp, forall a. a -> Maybe a
Just Int
seqNo)

------------------------------------------------------------------------------
-- Ahead: Concurrent streams with ordered results
------------------------------------------------------------------------------

-- Lookahead streams can execute multiple tasks concurrently, ahead of time,
-- but always serve them in the same order as they appear in the stream. To
-- implement lookahead streams efficiently we assign a sequence number to each
-- task when the task is picked up for execution. When the task finishes, the
-- output is tagged with the same sequence number and we rearrange the outputs
-- in sequence based on that number.
--
-- To explain the mechanism imagine that the current task at the head of the
-- stream has a "token" to yield to the outputQueue. The ownership of the token
-- is determined by the current sequence number is maintained in outputHeap.
-- Sequence number is assigned when a task is queued. When a thread dequeues a
-- task it picks up the sequence number as well and when the output is ready it
-- uses the sequence number to queue the output to the outputQueue.
--
-- The thread with current sequence number sends the output directly to the
-- outputQueue. Other threads push the output to the outputHeap. When the task
-- being queued on the heap is a stream of many elements we evaluate only the
-- first element and keep the rest of the unevaluated computation in the heap.
-- When such a task gets the "token" for outputQueue it evaluates and directly
-- yields all the elements to the outputQueue without checking for the
-- "token".
--
-- Note that no two outputs in the heap can have the same sequence numbers and
-- therefore we do not need a stable heap. We have also separated the buffer
-- for the current task (outputQueue) and the pending tasks (outputHeap) so
-- that the pending tasks cannot interfere with the current task. Note that for
-- a single task just the outputQueue is enough and for the case of many
-- threads just a heap is good enough. However we balance between these two
-- cases, so that both are efficient.
--
-- For bigger streams it may make sense to have separate buffers for each
-- stream. However, for singleton streams this may become inefficient. However,
-- if we do not have separate buffers, then the streams that come later in
-- sequence may hog the buffer, hindering the streams that are ahead. For this
-- reason we have a single element buffer limitation for the streams being
-- executed in advance.
--
-- This scheme works pretty efficiently with less than 40% extra overhead
-- compared to the Async streams where we do not have any kind of sequencing of
-- the outputs. It is especially devised so that we are most efficient when we
-- have short tasks and need just a single thread. Also when a thread yields
-- many items it can hold lockfree access to the outputQueue and do it
-- efficiently.
--
-- XXX Maybe we can start the ahead threads at a lower cpu and IO priority so
-- that they do not hog the resources and hinder the progress of the threads in
-- front of them.

-- Left associated ahead expressions are expensive. We start a new SVar for
-- each left associative expression. The queue is used only for right
-- associated expression, we queue the right expression and execute the left.
-- Thererefore the queue never has more than on item in it.
--
-- XXX Also note that limiting concurrency for cases like "take 10" would not
-- work well with left associative expressions, because we have no visibility
-- about how much the left side of the expression would yield.
--
-- XXX It may be a good idea to increment sequence numbers for each yield,
-- currently a stream on the left side of the expression may yield many
-- elements with the same sequene number. We can then use the seq number to
-- enforce yieldMax and yieldLImit as well.

-- Invariants:
--
-- * A worker should always ensure that it pushes all the consecutive items in
-- the heap to the outputQueue especially the items on behalf of the workers
-- that have already left when we were holding the token. This avoids deadlock
-- conditions when the later workers completion depends on the consumption of
-- earlier results. For more details see comments in the consumer pull side
-- code.

{-# INLINE underMaxHeap #-}
underMaxHeap ::
       Channel m a
    -> Heap (Entry Int (AheadHeapEntry K.StreamK m a))
    -> IO Bool
underMaxHeap :: forall (m :: * -> *) a.
Channel m a
-> Heap (Entry Int (AheadHeapEntry StreamK m a)) -> IO Bool
underMaxHeap Channel m a
sv Heap (Entry Int (AheadHeapEntry StreamK m a))
hp = do
    ([ChildEvent a]
_, Int
len) <- forall a. IORef a -> IO a
readIORef (forall (m :: * -> *) a. Channel m a -> IORef ([ChildEvent a], Int)
outputQueue Channel m a
sv)

    -- XXX simplify this
    let maxHeap :: Limit
maxHeap = case forall (m :: * -> *) a. Channel m a -> Limit
maxBufferLimit Channel m a
sv of
            Limited Word
lim -> Word -> Limit
Limited forall a b. (a -> b) -> a -> b
$
                forall a. Ord a => a -> a -> a
max Word
0 (Word
lim forall a. Num a => a -> a -> a
- forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
len)
            Limit
Unlimited -> Limit
Unlimited

    case Limit
maxHeap of
        Limited Word
lim -> do
            Int
active <- forall a. IORef a -> IO a
readIORef (forall (m :: * -> *) a. Channel m a -> IORef Int
workerCount Channel m a
sv)
            forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a. Heap a -> Int
H.size Heap (Entry Int (AheadHeapEntry StreamK m a))
hp forall a. Num a => a -> a -> a
+ Int
active forall a. Ord a => a -> a -> Bool
<= forall a b. (Integral a, Num b) => a -> b
fromIntegral Word
lim
        Limit
Unlimited -> forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True

-- Return value:
-- True => stop
-- False => continue
preStopCheck ::
       Channel m a
    -> IORef (Heap (Entry Int (AheadHeapEntry K.StreamK m a)) , Maybe Int)
    -> IO Bool
preStopCheck :: forall (m :: * -> *) a.
Channel m a
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> IO Bool
preStopCheck Channel m a
sv IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap =
    -- check the stop condition under a lock before actually
    -- stopping so that the whole herd does not stop at once.
    forall a b. IORef a -> (a -> IO b) -> IO b
withIORef IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap forall a b. (a -> b) -> a -> b
$ \(Heap (Entry Int (AheadHeapEntry StreamK m a))
hp, Maybe Int
_) -> do
        Bool
heapOk <- forall (m :: * -> *) a.
Channel m a
-> Heap (Entry Int (AheadHeapEntry StreamK m a)) -> IO Bool
underMaxHeap Channel m a
sv Heap (Entry Int (AheadHeapEntry StreamK m a))
hp
        forall a. MVar a -> IO a
takeMVar (forall (m :: * -> *) a. Channel m a -> MVar ()
workerStopMVar Channel m a
sv)
        let stopping :: IO Bool
stopping = do
                forall a. MVar a -> a -> IO ()
putMVar (forall (m :: * -> *) a. Channel m a -> MVar ()
workerStopMVar Channel m a
sv) ()
                forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
            continue :: IO Bool
continue = do
                forall a. MVar a -> a -> IO ()
putMVar (forall (m :: * -> *) a. Channel m a -> MVar ()
workerStopMVar Channel m a
sv) ()
                forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
        if Bool
heapOk
        then
            case forall (m :: * -> *) a. Channel m a -> Maybe YieldRateInfo
yieldRateInfo Channel m a
sv of
                Maybe YieldRateInfo
Nothing -> IO Bool
continue
                Just YieldRateInfo
yinfo -> do
                    Bool
rateOk <-
                        Limit -> IORef Int -> YieldRateInfo -> IO Bool
isBeyondMaxRate
                            (forall (m :: * -> *) a. Channel m a -> Limit
maxWorkerLimit Channel m a
sv) (forall (m :: * -> *) a. Channel m a -> IORef Int
workerCount Channel m a
sv) YieldRateInfo
yinfo
                    if Bool
rateOk then IO Bool
continue else IO Bool
stopping
        else IO Bool
stopping

abortExecution :: Channel m a -> Maybe WorkerInfo -> IO ()
abortExecution :: forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
abortExecution Channel m a
sv Maybe WorkerInfo
winfo = do
    Maybe (IORef Count) -> IO ()
incrementYieldLimit (forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
    forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo

-- XXX In absence of a "noyield" primitive (i.e. do not pre-empt inside a
-- critical section) from GHC RTS, we have a difficult problem. Assume we have
-- a 100,000 threads producing output and queuing it to the heap for
-- sequencing. The heap can be drained only by one thread at a time, any thread
-- that finds that heap can be drained now, takes a lock and starts draining
-- it, however the thread may get prempted in the middle of it holding the
-- lock. Since that thread is holding the lock, the other threads cannot pick
-- up the draining task, therefore they proceed to picking up the next task to
-- execute. If the draining thread could yield voluntarily at a point where it
-- has released the lock, then the next threads could pick up the draining
-- instead of executing more tasks. When there are 100,000 threads the drainer
-- gets a cpu share to run only 1:100000 of the time. This makes the heap
-- accumulate a lot of output when we the buffer size is large.
--
-- The solutions to this problem are:
-- 1) make the other threads wait in a queue until the draining finishes
-- 2) make the other threads queue and go away if draining is in progress
--
-- In both cases we give the drainer a chance to run more often.
--
processHeap
    :: MonadRunInIO m
    => IORef ([K.StreamK m a], Int)
    -> IORef (Heap (Entry Int (AheadHeapEntry K.StreamK m a)), Maybe Int)
    -> Channel m a
    -> Maybe WorkerInfo
    -> AheadHeapEntry K.StreamK m a
    -> Int
    -> Bool -- we are draining the heap before we stop
    -> m ()
processHeap :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> AheadHeapEntry StreamK m a
-> Int
-> Bool
-> m ()
processHeap IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo AheadHeapEntry StreamK m a
entry Int
sno Bool
stopping = Int -> AheadHeapEntry StreamK m a -> m ()
loopHeap Int
sno AheadHeapEntry StreamK m a
entry

    where

    stopIfNeeded :: AheadHeapEntry StreamK m a -> Int -> StreamK m a -> m ()
stopIfNeeded AheadHeapEntry StreamK m a
ent Int
seqNo StreamK m a
r = do
        Bool
stopIt <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
Channel m a
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> IO Bool
preStopCheck Channel m a
sv IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap
        if Bool
stopIt
        then forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
            -- put the entry back in the heap and stop
            forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Entry Int (AheadHeapEntry t m a) -> Int -> IO ()
requeueOnHeapTop IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap (forall p a. p -> a -> Entry p a
Entry Int
seqNo AheadHeapEntry StreamK m a
ent) Int
seqNo
            forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
        else Bool -> Int -> StreamK m a -> m ()
runStreamWithYieldLimit Bool
True Int
seqNo StreamK m a
r

    loopHeap :: Int -> AheadHeapEntry StreamK m a -> m ()
loopHeap Int
seqNo AheadHeapEntry StreamK m a
ent =
        case AheadHeapEntry StreamK m a
ent of
            AheadHeapEntry StreamK m a
AheadEntryNull -> Int -> m ()
nextHeap Int
seqNo
            AheadEntryPure a
a -> do
                -- Use 'send' directly so that we do not account this in worker
                -- latency as this will not be the real latency.
                -- Don't stop the worker in this case as we are just
                -- transferring available results from heap to outputQueue.
                forall (f :: * -> *) a. Functor f => f a -> f ()
void
                    forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
                    forall a b. (a -> b) -> a -> b
$ forall a.
IORef ([ChildEvent a], Int) -> MVar () -> ChildEvent a -> IO Int
sendWithDoorBell
                        (forall (m :: * -> *) a. Channel m a -> IORef ([ChildEvent a], Int)
outputQueue Channel m a
sv) (forall (m :: * -> *) a. Channel m a -> MVar ()
outputDoorBell Channel m a
sv) (forall a. a -> ChildEvent a
ChildYield a
a)
                Int -> m ()
nextHeap Int
seqNo
            AheadEntryStream (RunInIO forall b. m b -> IO (StM m b)
runin, StreamK m a
r) -> do
                if Bool
stopping
                then AheadHeapEntry StreamK m a -> Int -> StreamK m a -> m ()
stopIfNeeded AheadHeapEntry StreamK m a
ent Int
seqNo StreamK m a
r
                else do
                    StM m ()
res <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall b. m b -> IO (StM m b)
runin (Bool -> Int -> StreamK m a -> m ()
runStreamWithYieldLimit Bool
True Int
seqNo StreamK m a
r)
                    forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m ()
res

    nextHeap :: Int -> m ()
nextHeap Int
prevSeqNo = do
        HeapDequeueResult StreamK m a
res <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Int -> IO (HeapDequeueResult t m a)
dequeueFromHeapSeq IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap (Int
prevSeqNo forall a. Num a => a -> a -> a
+ Int
1)
        case HeapDequeueResult StreamK m a
res of
            Ready (Entry Int
seqNo AheadHeapEntry StreamK m a
hent) -> Int -> AheadHeapEntry StreamK m a -> m ()
loopHeap Int
seqNo AheadHeapEntry StreamK m a
hent
            HeapDequeueResult StreamK m a
Clearing -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
            Waiting Int
_ ->
                if Bool
stopping
                then do
                    Bool
r <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
Channel m a
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> IO Bool
preStopCheck Channel m a
sv IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap
                    if Bool
r
                    then forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
                    else Int -> m ()
processWorkQueue Int
prevSeqNo
                else forall a. a -> a
inline Int -> m ()
processWorkQueue Int
prevSeqNo

    processWorkQueue :: Int -> m ()
processWorkQueue Int
prevSeqNo = do
        Bool
yieldLimitOk <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Maybe (IORef Count) -> IO Bool
decrementYieldLimit (forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
        if Bool
yieldLimitOk
        then do
            Maybe (StreamK m a, Int)
work <- forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
IORef ([t m a], Int) -> m (Maybe (t m a, Int))
dequeueAhead IORef ([StreamK m a], Int)
q
            case Maybe (StreamK m a, Int)
work of
                Maybe (StreamK m a, Int)
Nothing -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
                Just (StreamK m a
m, Int
seqNo) -> do
                    if Int
seqNo forall a. Eq a => a -> a -> Bool
== Int
prevSeqNo forall a. Num a => a -> a -> a
+ Int
1
                    then forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> StreamK m a
-> Int
-> m ()
processWithToken IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo StreamK m a
m Int
seqNo
                    else forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> StreamK m a
-> Int
-> m ()
processWithoutToken IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo StreamK m a
m Int
seqNo
        else forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
abortExecution Channel m a
sv Maybe WorkerInfo
winfo

    -- We do not stop the worker on buffer full here as we want to proceed to
    -- nextHeap anyway so that we can clear any subsequent entries. We stop
    -- only in yield continuation where we may have a remaining stream to be
    -- pushed on the heap.
    singleStreamFromHeap :: Int -> a -> m ()
singleStreamFromHeap Int
seqNo a
a = do
        forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
Channel m a -> Maybe WorkerInfo -> a -> IO Bool
yield Channel m a
sv Maybe WorkerInfo
winfo a
a
        Int -> m ()
nextHeap Int
seqNo

    -- XXX when we have an unfinished stream on the heap we cannot account all
    -- the yields of that stream until it finishes, so if we have picked up
    -- and executed more actions beyond that in the parent stream and put them
    -- on the heap then they would eat up some yield limit which is not
    -- correct, we will think that our yield limit is over even though we have
    -- to yield items from unfinished stream before them. For this reason, if
    -- there are pending items in the heap we drain them unconditionally
    -- without considering the yield limit.
    runStreamWithYieldLimit :: Bool -> Int -> StreamK m a -> m ()
runStreamWithYieldLimit Bool
continue Int
seqNo StreamK m a
r = do
        Bool
_ <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Maybe (IORef Count) -> IO Bool
decrementYieldLimit (forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
        if Bool
continue -- see comment above -- && yieldLimitOk
        then do
            let stopk :: m ()
stopk = do
                  forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Maybe (IORef Count) -> IO ()
incrementYieldLimit (forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv))
                  Int -> m ()
nextHeap Int
seqNo
            forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared forall a. HasCallStack => a
undefined
                          (Int -> a -> StreamK m a -> m ()
yieldStreamFromHeap Int
seqNo)
                          (Int -> a -> m ()
singleStreamFromHeap Int
seqNo)
                          m ()
stopk
                          StreamK m a
r
        else do
            RunInIO m
runIn <- forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
            let ent :: Entry Int (AheadHeapEntry StreamK m a)
ent = forall p a. p -> a -> Entry p a
Entry Int
seqNo (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(RunInIO m, t m a) -> AheadHeapEntry t m a
AheadEntryStream (RunInIO m
runIn, StreamK m a
r))
            forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
                forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Entry Int (AheadHeapEntry t m a) -> Int -> IO ()
requeueOnHeapTop IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Entry Int (AheadHeapEntry StreamK m a)
ent Int
seqNo
                Maybe (IORef Count) -> IO ()
incrementYieldLimit (forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
                forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo

    yieldStreamFromHeap :: Int -> a -> StreamK m a -> m ()
yieldStreamFromHeap Int
seqNo a
a StreamK m a
r = do
        Bool
continue <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
Channel m a -> Maybe WorkerInfo -> a -> IO Bool
yield Channel m a
sv Maybe WorkerInfo
winfo a
a
        Bool -> Int -> StreamK m a -> m ()
runStreamWithYieldLimit Bool
continue Int
seqNo StreamK m a
r

{-# NOINLINE drainHeap #-}
drainHeap
    :: MonadRunInIO m
    => IORef ([K.StreamK m a], Int)
    -> IORef (Heap (Entry Int (AheadHeapEntry K.StreamK m a)), Maybe Int)
    -> Channel m a
    -> Maybe WorkerInfo
    -> m ()
drainHeap :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
drainHeap IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo = do
    HeapDequeueResult StreamK m a
r <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> IO (HeapDequeueResult t m a)
dequeueFromHeap IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap
    case HeapDequeueResult StreamK m a
r of
        Ready (Entry Int
seqNo AheadHeapEntry StreamK m a
hent) ->
            forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> AheadHeapEntry StreamK m a
-> Int
-> Bool
-> m ()
processHeap IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo AheadHeapEntry StreamK m a
hent Int
seqNo Bool
True
        HeapDequeueResult StreamK m a
_ -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo

data HeapStatus = HContinue | HStop

processWithoutToken
    :: MonadRunInIO m
    => IORef ([K.StreamK m a], Int)
    -> IORef (Heap (Entry Int (AheadHeapEntry K.StreamK m a)), Maybe Int)
    -> Channel m a
    -> Maybe WorkerInfo
    -> K.StreamK m a
    -> Int
    -> m ()
processWithoutToken :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> StreamK m a
-> Int
-> m ()
processWithoutToken IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo StreamK m a
m Int
seqNo = do
    -- we have already decremented the yield limit for m
    let stopk :: m WorkerStatus
stopk = do
            forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Maybe (IORef Count) -> IO ()
incrementYieldLimit (forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv))
            -- If the stream stops without yielding anything, and we do not put
            -- anything on heap, but if heap was waiting for this seq number
            -- then it will keep waiting forever, because we are never going to
            -- put it on heap. So we have to put a null entry on heap even when
            -- we stop.
            forall {m :: * -> *}.
MonadIO m =>
AheadHeapEntry StreamK m a -> m WorkerStatus
toHeap forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
AheadHeapEntry t m a
AheadEntryNull
        mrun :: m b -> IO (StM m b)
mrun = forall (m :: * -> *). RunInIO m -> forall b. m b -> IO (StM m b)
runInIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Channel m a -> RunInIO m
svarMrun Channel m a
sv

    StM m WorkerStatus
r <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall b. m b -> IO (StM m b)
mrun forall a b. (a -> b) -> a -> b
$
            forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared forall a. HasCallStack => a
undefined
                (\a
a StreamK m a
r -> do
                    RunInIO m
runIn <- forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
                    forall {m :: * -> *}.
MonadIO m =>
AheadHeapEntry StreamK m a -> m WorkerStatus
toHeap forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(RunInIO m, t m a) -> AheadHeapEntry t m a
AheadEntryStream (RunInIO m
runIn, forall a (m :: * -> *). a -> StreamK m a -> StreamK m a
K.cons a
a StreamK m a
r))
                (forall {m :: * -> *}.
MonadIO m =>
AheadHeapEntry StreamK m a -> m WorkerStatus
toHeap forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
a -> AheadHeapEntry t m a
AheadEntryPure)
                m WorkerStatus
stopk
                StreamK m a
m
    WorkerStatus
res <- forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m WorkerStatus
r
    case WorkerStatus
res of
        WorkerStatus
Continue -> forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
workLoopAhead IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo
        WorkerStatus
Suspend -> forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
drainHeap IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo

    where

    -- XXX to reduce contention each CPU can have its own heap
    toHeap :: AheadHeapEntry StreamK m a -> m WorkerStatus
toHeap AheadHeapEntry StreamK m a
ent = do
        -- Heap insertion is an expensive affair so we use a non CAS based
        -- modification, otherwise contention and retries can make a thread
        -- context switch and throw it behind other threads which come later in
        -- sequence.
        Heap (Entry Int (AheadHeapEntry StreamK m a))
newHp <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap forall a b. (a -> b) -> a -> b
$ \(Heap (Entry Int (AheadHeapEntry StreamK m a))
hp, Maybe Int
snum) ->
            let hp' :: Heap (Entry Int (AheadHeapEntry StreamK m a))
hp' = forall a. Ord a => a -> Heap a -> Heap a
H.insert (forall p a. p -> a -> Entry p a
Entry Int
seqNo AheadHeapEntry StreamK m a
ent) Heap (Entry Int (AheadHeapEntry StreamK m a))
hp
            in forall a. HasCallStack => Bool -> a -> a
assert (Maybe Int -> Int -> Bool
heapIsSane Maybe Int
snum Int
seqNo) ((Heap (Entry Int (AheadHeapEntry StreamK m a))
hp', Maybe Int
snum), Heap (Entry Int (AheadHeapEntry StreamK m a))
hp')

        forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (forall (m :: * -> *) a. Channel m a -> Bool
svarInspectMode Channel m a
sv) forall a b. (a -> b) -> a -> b
$
            forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
                Int
maxHp <- forall a. IORef a -> IO a
readIORef (SVarStats -> IORef Int
maxHeapSize forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Channel m a -> SVarStats
svarStats Channel m a
sv)
                forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (forall a. Heap a -> Int
H.size Heap (Entry Int (AheadHeapEntry StreamK m a))
newHp forall a. Ord a => a -> a -> Bool
> Int
maxHp) forall a b. (a -> b) -> a -> b
$
                    forall a. IORef a -> a -> IO ()
writeIORef (SVarStats -> IORef Int
maxHeapSize forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Channel m a -> SVarStats
svarStats Channel m a
sv) (forall a. Heap a -> Int
H.size Heap (Entry Int (AheadHeapEntry StreamK m a))
newHp)

        Bool
heapOk <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
Channel m a
-> Heap (Entry Int (AheadHeapEntry StreamK m a)) -> IO Bool
underMaxHeap Channel m a
sv Heap (Entry Int (AheadHeapEntry StreamK m a))
newHp
        HeapStatus
status <-
            case forall (m :: * -> *) a. Channel m a -> Maybe YieldRateInfo
yieldRateInfo Channel m a
sv of
                Maybe YieldRateInfo
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return HeapStatus
HContinue
                Just YieldRateInfo
yinfo ->
                    case Maybe WorkerInfo
winfo of
                        Just WorkerInfo
info -> do
                            Bool
rateOk <-
                                forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
                                    forall a b. (a -> b) -> a -> b
$ Limit -> IORef Int -> YieldRateInfo -> WorkerInfo -> IO Bool
workerRateControl
                                        (forall (m :: * -> *) a. Channel m a -> Limit
maxWorkerLimit Channel m a
sv)
                                        (forall (m :: * -> *) a. Channel m a -> IORef Int
workerCount Channel m a
sv)
                                        YieldRateInfo
yinfo
                                        WorkerInfo
info
                            if Bool
rateOk
                            then forall (m :: * -> *) a. Monad m => a -> m a
return HeapStatus
HContinue
                            else forall (m :: * -> *) a. Monad m => a -> m a
return HeapStatus
HStop
                        Maybe WorkerInfo
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return HeapStatus
HContinue

        if Bool
heapOk
        then
            case HeapStatus
status of
                HeapStatus
HContinue -> forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Continue
                HeapStatus
HStop -> forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Suspend
        else forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Suspend

data TokenWorkerStatus = TokenContinue Int | TokenSuspend

processWithToken
    :: MonadRunInIO m
    => IORef ([K.StreamK m a], Int)
    -> IORef (Heap (Entry Int (AheadHeapEntry K.StreamK m a)), Maybe Int)
    -> Channel m a
    -> Maybe WorkerInfo
    -> K.StreamK m a
    -> Int
    -> m ()
processWithToken :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> StreamK m a
-> Int
-> m ()
processWithToken IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo StreamK m a
action Int
sno = do
    -- Note, we enter this function with yield limit already decremented
    -- XXX deduplicate stop in all invocations
    let stopk :: m TokenWorkerStatus
stopk = do
            forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Maybe (IORef Count) -> IO ()
incrementYieldLimit (forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv))
            forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ Int -> TokenWorkerStatus
TokenContinue (Int
sno forall a. Num a => a -> a -> a
+ Int
1)
        mrun :: m b -> IO (StM m b)
mrun = forall (m :: * -> *). RunInIO m -> forall b. m b -> IO (StM m b)
runInIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Channel m a -> RunInIO m
svarMrun Channel m a
sv

    StM m TokenWorkerStatus
r <-
        forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
            forall a b. (a -> b) -> a -> b
$ forall b. m b -> IO (StM m b)
mrun
            forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared
                forall a. HasCallStack => a
undefined (Int -> a -> StreamK m a -> m TokenWorkerStatus
yieldOutput Int
sno) (forall {m :: * -> *}. MonadIO m => Int -> a -> m TokenWorkerStatus
singleOutput Int
sno) m TokenWorkerStatus
stopk StreamK m a
action

    TokenWorkerStatus
res <- forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m TokenWorkerStatus
r
    case TokenWorkerStatus
res of
        TokenContinue Int
seqNo -> Int -> m ()
loopWithToken Int
seqNo
        TokenWorkerStatus
TokenSuspend -> forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
drainHeap IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo

    where

    singleOutput :: Int -> a -> m TokenWorkerStatus
singleOutput Int
seqNo a
a = do
        Bool
continue <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
Channel m a -> Maybe WorkerInfo -> a -> IO Bool
yield Channel m a
sv Maybe WorkerInfo
winfo a
a
        if Bool
continue
        then forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ Int -> TokenWorkerStatus
TokenContinue (Int
seqNo forall a. Num a => a -> a -> a
+ Int
1)
        else do
            forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Int -> IO ()
updateHeapSeq IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap (Int
seqNo forall a. Num a => a -> a -> a
+ Int
1)
            forall (m :: * -> *) a. Monad m => a -> m a
return TokenWorkerStatus
TokenSuspend

    -- XXX use a wrapper function around stop so that we never miss
    -- incrementing the yield in a stop continuation. Essentiatlly all
    -- "unstream" calls in this function must increment yield limit on stop.
    yieldOutput :: Int -> a -> StreamK m a -> m TokenWorkerStatus
yieldOutput Int
seqNo a
a StreamK m a
r = do
        Bool
continue <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
Channel m a -> Maybe WorkerInfo -> a -> IO Bool
yield Channel m a
sv Maybe WorkerInfo
winfo a
a
        Bool
yieldLimitOk <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Maybe (IORef Count) -> IO Bool
decrementYieldLimit (forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
        if Bool
continue Bool -> Bool -> Bool
&& Bool
yieldLimitOk
        then do
            let stopk :: m TokenWorkerStatus
stopk = do
                    forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Maybe (IORef Count) -> IO ()
incrementYieldLimit (forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv))
                    forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ Int -> TokenWorkerStatus
TokenContinue (Int
seqNo forall a. Num a => a -> a -> a
+ Int
1)
            forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared forall a. HasCallStack => a
undefined
                          (Int -> a -> StreamK m a -> m TokenWorkerStatus
yieldOutput Int
seqNo)
                          (forall {m :: * -> *}. MonadIO m => Int -> a -> m TokenWorkerStatus
singleOutput Int
seqNo)
                          m TokenWorkerStatus
stopk
                          StreamK m a
r
        else do
            RunInIO m
runIn <- forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
            let ent :: Entry Int (AheadHeapEntry StreamK m a)
ent = forall p a. p -> a -> Entry p a
Entry Int
seqNo (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(RunInIO m, t m a) -> AheadHeapEntry t m a
AheadEntryStream (RunInIO m
runIn, StreamK m a
r))
            forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Entry Int (AheadHeapEntry t m a) -> Int -> IO ()
requeueOnHeapTop IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Entry Int (AheadHeapEntry StreamK m a)
ent Int
seqNo
            forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Maybe (IORef Count) -> IO ()
incrementYieldLimit (forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
            forall (m :: * -> *) a. Monad m => a -> m a
return TokenWorkerStatus
TokenSuspend

    loopWithToken :: Int -> m ()
loopWithToken Int
nextSeqNo = do
        let preExit :: m ()
preExit = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
                forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Int -> IO ()
updateHeapSeq IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Int
nextSeqNo
                Maybe (IORef Count) -> IO ()
incrementYieldLimit (forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
        Bool
yieldLimitOk <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Maybe (IORef Count) -> IO Bool
decrementYieldLimit (forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
        -- To avoid a race when another thread puts something
        -- on the heap and goes away, the consumer will not get
        -- a doorBell and we will not clear the heap before
        -- executing the next action. If the consumer depends
        -- on the output that is stuck in the heap then this
        -- will result in a deadlock. So we always clear the
        -- heap before executing the next action.
        if Bool
yieldLimitOk
        then do
            -- XXX Instead of checking seqno inside dequeue we can dequeue
            -- unconditionally and if the seqNo is not the same as nextSeqNo
            -- then release the token and call processWithoutToken. Need
            -- to check the performance though.
            Maybe (StreamK m a)
work <- forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
IORef ([t m a], Int) -> Int -> m (Maybe (t m a))
dequeueAheadSeqCheck IORef ([StreamK m a], Int)
q Int
nextSeqNo
            case Maybe (StreamK m a)
work of
                Maybe (StreamK m a)
Nothing -> m ()
preExit forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
workLoopAhead IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo
                Just StreamK m a
m -> do
                    let stopk :: m TokenWorkerStatus
stopk = do
                            forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Maybe (IORef Count) -> IO ()
incrementYieldLimit (forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv))
                            forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ Int -> TokenWorkerStatus
TokenContinue (Int
nextSeqNo forall a. Num a => a -> a -> a
+ Int
1)
                        mrun :: m b -> IO (StM m b)
mrun = forall (m :: * -> *). RunInIO m -> forall b. m b -> IO (StM m b)
runInIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Channel m a -> RunInIO m
svarMrun Channel m a
sv
                    StM m TokenWorkerStatus
r <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall b. m b -> IO (StM m b)
mrun forall a b. (a -> b) -> a -> b
$
                        forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared forall a. HasCallStack => a
undefined
                                      (Int -> a -> StreamK m a -> m TokenWorkerStatus
yieldOutput Int
nextSeqNo)
                                      (forall {m :: * -> *}. MonadIO m => Int -> a -> m TokenWorkerStatus
singleOutput Int
nextSeqNo)
                                      m TokenWorkerStatus
stopk
                                      StreamK m a
m
                    TokenWorkerStatus
res <- forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m TokenWorkerStatus
r
                    case TokenWorkerStatus
res of
                        TokenContinue Int
seqNo -> Int -> m ()
loopWithToken Int
seqNo
                        TokenWorkerStatus
TokenSuspend -> forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
drainHeap IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo
        else m ()
preExit forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
drainHeap IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo

-- XXX the yield limit changes increased the performance overhead by 30-40%.
-- Just like AsyncT we can use an implementation without yeidlimit and even
-- without pacing code to keep the performance higher in the unlimited and
-- unpaced case.
--
-- XXX The yieldLimit stuff is pretty invasive. We can instead do it by using
-- three hooks, a pre-execute hook, a yield hook and a stop hook. In fact these
-- hooks can be used for a more general implementation to even check predicates
-- and not just yield limit.

workLoopAhead
    :: MonadRunInIO m
    => IORef ([K.StreamK m a], Int)
    -> IORef (Heap (Entry Int (AheadHeapEntry K.StreamK m a)), Maybe Int)
    -> Channel m a
    -> Maybe WorkerInfo
    -> m ()
workLoopAhead :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
workLoopAhead IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo = do
        HeapDequeueResult StreamK m a
r <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> IO (HeapDequeueResult t m a)
dequeueFromHeap IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap
        case HeapDequeueResult StreamK m a
r of
            Ready (Entry Int
seqNo AheadHeapEntry StreamK m a
hent) ->
                forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> AheadHeapEntry StreamK m a
-> Int
-> Bool
-> m ()
processHeap IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo AheadHeapEntry StreamK m a
hent Int
seqNo Bool
False
            HeapDequeueResult StreamK m a
Clearing -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
            Waiting Int
_ -> do
                -- Before we execute the next item from the work queue we check
                -- if we are beyond the yield limit. It is better to check the
                -- yield limit before we pick up the next item. Otherwise we
                -- may have already started more tasks even though we may have
                -- reached the yield limit.  We can avoid this by taking active
                -- workers into account, but that is not as reliable, because
                -- workers may go away without picking up work and yielding a
                -- value.
                --
                -- Rate control can be done either based on actual yields in
                -- the output queue or based on any yield either to the heap or
                -- to the output queue. In both cases we may have one issue or
                -- the other. We chose to do this based on actual yields to the
                -- output queue because it makes the code common to both async
                -- and ahead streams.
                --
                Bool
yieldLimitOk <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Maybe (IORef Count) -> IO Bool
decrementYieldLimit (forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
                if Bool
yieldLimitOk
                then do
                    Maybe (StreamK m a, Int)
work <- forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
IORef ([t m a], Int) -> m (Maybe (t m a, Int))
dequeueAhead IORef ([StreamK m a], Int)
q
                    case Maybe (StreamK m a, Int)
work of
                        Maybe (StreamK m a, Int)
Nothing -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
                        Just (StreamK m a
m, Int
seqNo) -> do
                            if Int
seqNo forall a. Eq a => a -> a -> Bool
== Int
0
                            then forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> StreamK m a
-> Int
-> m ()
processWithToken IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo StreamK m a
m Int
seqNo
                            else forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> StreamK m a
-> Int
-> m ()
processWithoutToken IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo StreamK m a
m Int
seqNo
                else forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
abortExecution Channel m a
sv Maybe WorkerInfo
winfo

-------------------------------------------------------------------------------
-- SVar creation
-- This code belongs in SVar.hs but is kept here for perf reasons
-------------------------------------------------------------------------------

-- XXX we have this function in this file because passing runStreamLIFO as a
-- function argument to this function results in a perf degradation of more
-- than 10%.  Need to investigate what the root cause is.
-- Interestingly, the same thing does not make any difference for Ahead.
-- {-# INLINABLE getLifoSVar #-}
getLifoSVar :: forall m a. MonadRunInIO m =>
    RunInIO m -> Config -> IO (Channel m a)
getLifoSVar :: forall (m :: * -> *) a.
MonadRunInIO m =>
RunInIO m -> Config -> IO (Channel m a)
getLifoSVar RunInIO m
mrun Config
cfg = do
    IORef ([ChildEvent a], Int)
outQ    <- forall a. a -> IO (IORef a)
newIORef ([], Int
0)
    -- the second component of the tuple is "Nothing" when heap is being
    -- cleared, "Just n" when we are expecting sequence number n to arrive
    -- before we can start clearing the heap.
    IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
outH    <- forall a. a -> IO (IORef a)
newIORef (forall a. Heap a
H.empty, forall a. a -> Maybe a
Just Int
0)
    MVar ()
outQMv  <- forall a. IO (MVar a)
newEmptyMVar
    IORef Int
active  <- forall a. a -> IO (IORef a)
newIORef Int
0
    IORef Bool
wfw     <- forall a. a -> IO (IORef a)
newIORef Bool
False
    IORef (Set ThreadId)
running <- forall a. a -> IO (IORef a)
newIORef forall a. Set a
Set.empty
    IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
q       <- forall a. a -> IO (IORef a)
newIORef
                ( [] :: [(RunInIO m, K.StreamK m a)]
                , [] :: [(RunInIO m, K.StreamK m a)]
                )
    -- Sequence number is incremented whenever something is de-queued,
    -- therefore, first sequence number would be 0
    IORef ([StreamK m a], Int)
aheadQ <- forall a. a -> IO (IORef a)
newIORef ([], -Int
1)
    MVar ()
stopMVar <- forall a. a -> IO (MVar a)
newMVar ()
    Maybe (IORef Count)
yl <-
        case Config -> Maybe Count
getYieldLimit Config
cfg of
            Maybe Count
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a. Maybe a
Nothing
            Just Count
x -> forall a. a -> Maybe a
Just forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. a -> IO (IORef a)
newIORef Count
x
    Maybe YieldRateInfo
rateInfo <- Config -> IO (Maybe YieldRateInfo)
newRateInfo Config
cfg

    SVarStats
stats <- IO SVarStats
newSVarStats
    ThreadId
tid <- IO ThreadId
myThreadId

    -- We are reading it without lock, the result would be reliable only if no
    -- worker is pending.
    let isWorkFinished :: p -> IO Bool
isWorkFinished p
_ = do
            ([(RunInIO m, StreamK m a)]
xs, [(RunInIO m, StreamK m a)]
ys) <- forall a. IORef a -> IO a
readIORef IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
q
            forall (m :: * -> *) a. Monad m => a -> m a
return (forall (t :: * -> *) a. Foldable t => t a -> Bool
null [(RunInIO m, StreamK m a)]
xs Bool -> Bool -> Bool
&& forall (t :: * -> *) a. Foldable t => t a -> Bool
null [(RunInIO m, StreamK m a)]
ys)

    let isWorkFinishedLimited :: Channel m a -> IO Bool
isWorkFinishedLimited Channel m a
sv = do
            Bool
yieldsDone <-
                    case forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv of
                        Just IORef Count
ref -> do
                            Count
n <- forall a. IORef a -> IO a
readIORef IORef Count
ref
                            forall (m :: * -> *) a. Monad m => a -> m a
return (Count
n forall a. Ord a => a -> a -> Bool
<= Count
0)
                        Maybe (IORef Count)
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
            Bool
qEmpty <- forall {p}. p -> IO Bool
isWorkFinished Channel m a
sv
            forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ Bool
qEmpty Bool -> Bool -> Bool
|| Bool
yieldsDone

    let eagerEval :: Bool
eagerEval = Config -> Bool
getEagerDispatch Config
cfg
        inOrder :: Bool
inOrder = Config -> Bool
getOrdered Config
cfg

    let getSVar :: Channel m a
            -> (Channel m a -> m [ChildEvent a])
            -> (Channel m a -> m Bool)
            -> (Channel m a -> IO Bool)
            -> (IORef ([(RunInIO m, K.StreamK m a)], [(RunInIO m, K.StreamK m a)])
                -> Channel m a
                -> Maybe WorkerInfo
                -> m())
            -> Channel m a
        getSVar :: Channel m a
-> (Channel m a -> m [ChildEvent a])
-> (Channel m a -> m Bool)
-> (Channel m a -> IO Bool)
-> (IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
    -> Channel m a -> Maybe WorkerInfo -> m ())
-> Channel m a
getSVar Channel m a
sv Channel m a -> m [ChildEvent a]
readOutput Channel m a -> m Bool
postProc Channel m a -> IO Bool
workDone IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Channel m a -> Maybe WorkerInfo -> m ()
wloop = Channel
            { outputQueue :: IORef ([ChildEvent a], Int)
outputQueue      = IORef ([ChildEvent a], Int)
outQ
            , remainingWork :: Maybe (IORef Count)
remainingWork    = Maybe (IORef Count)
yl
            , maxBufferLimit :: Limit
maxBufferLimit   = Config -> Limit
getMaxBuffer Config
cfg
            , maxWorkerLimit :: Limit
maxWorkerLimit   = forall a. Ord a => a -> a -> a
min (Config -> Limit
getMaxThreads Config
cfg) (Config -> Limit
getMaxBuffer Config
cfg)
            , yieldRateInfo :: Maybe YieldRateInfo
yieldRateInfo    = Maybe YieldRateInfo
rateInfo
            , outputDoorBell :: MVar ()
outputDoorBell   = MVar ()
outQMv
            , readOutputQ :: m [ChildEvent a]
readOutputQ      = Channel m a -> m [ChildEvent a]
readOutput Channel m a
sv
            , postProcess :: m Bool
postProcess      = Channel m a -> m Bool
postProc Channel m a
sv
            , workerThreads :: IORef (Set ThreadId)
workerThreads    = IORef (Set ThreadId)
running

            , workLoop :: Maybe WorkerInfo -> m ()
workLoop =
                if Bool
inOrder
                then forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
workLoopAhead IORef ([StreamK m a], Int)
aheadQ IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
outH Channel m a
sv
                else IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Channel m a -> Maybe WorkerInfo -> m ()
wloop IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
q Channel m a
sv
            , enqueue :: Bool -> (RunInIO m, StreamK m a) -> IO ()
enqueue =
                \Bool
inner ->
                    if Bool
inOrder
                    then forall (m :: * -> *) a.
Channel m a
-> IORef ([StreamK m a], Int) -> (RunInIO m, StreamK m a) -> IO ()
enqueueAhead Channel m a
sv IORef ([StreamK m a], Int)
aheadQ
                    else forall (m :: * -> *) a.
Channel m a
-> IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Bool
-> (RunInIO m, StreamK m a)
-> IO ()
enqueueLIFO Channel m a
sv IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
q Bool
inner
            , eagerDispatch :: m ()
eagerDispatch = forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
eagerEval forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
MonadRunInIO m =>
Count -> Channel m a -> m Bool
dispatchWorker Count
0 Channel m a
sv
            , isWorkDone :: IO Bool
isWorkDone =
                if Bool
inOrder
                then forall {t :: * -> *} {m :: * -> *} {a} {a} {b} {a} {b}.
Foldable t =>
Channel m a -> IORef (t a, b) -> IORef (Heap a, b) -> IO Bool
isWorkDoneAhead Channel m a
sv IORef ([StreamK m a], Int)
aheadQ IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
outH
                else Channel m a -> IO Bool
workDone Channel m a
sv
            , isQueueDone :: IO Bool
isQueueDone =
                if Bool
inOrder
                then forall {t :: * -> *} {m :: * -> *} {a} {a} {b}.
Foldable t =>
Channel m a -> IORef (t a, b) -> IO Bool
isQueueDoneAhead Channel m a
sv IORef ([StreamK m a], Int)
aheadQ
                else Channel m a -> IO Bool
workDone Channel m a
sv

            , doorBellOnWorkQ :: IORef Bool
doorBellOnWorkQ  = IORef Bool
wfw
            , svarMrun :: RunInIO m
svarMrun         = RunInIO m
mrun
            , workerCount :: IORef Int
workerCount      = IORef Int
active
            -- XXX We can use delThread or modThread based on eager flag.
            , accountThread :: ThreadId -> m ()
accountThread    = forall (m :: * -> *).
MonadIO m =>
IORef (Set ThreadId) -> MVar () -> ThreadId -> m ()
modifyThread IORef (Set ThreadId)
running MVar ()
outQMv
            , workerStopMVar :: MVar ()
workerStopMVar   = MVar ()
stopMVar
            , svarRef :: Maybe (IORef ())
svarRef          = forall a. Maybe a
Nothing
            , svarInspectMode :: Bool
svarInspectMode  = Config -> Bool
getInspectMode Config
cfg
            , svarCreator :: ThreadId
svarCreator      = ThreadId
tid
            , svarStats :: SVarStats
svarStats        = SVarStats
stats
            }

    let sv :: Channel m a
sv =
            case Config -> Maybe Rate
getStreamRate Config
cfg of
                Maybe Rate
Nothing ->
                    case Config -> Maybe Count
getYieldLimit Config
cfg of
                        Maybe Count
Nothing -> Channel m a
-> (Channel m a -> m [ChildEvent a])
-> (Channel m a -> m Bool)
-> (Channel m a -> IO Bool)
-> (IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
    -> Channel m a -> Maybe WorkerInfo -> m ())
-> Channel m a
getSVar Channel m a
sv (forall (m :: * -> *) a.
MonadRunInIO m =>
Bool -> Channel m a -> m [ChildEvent a]
readOutputQBounded Bool
eagerEval)
                                              forall (m :: * -> *) a. MonadRunInIO m => Channel m a -> m Bool
postProcessBounded
                                              forall {p}. p -> IO Bool
isWorkFinished
                                              forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Channel m a -> Maybe WorkerInfo -> m ()
workLoopLIFO
                        Just Count
_  -> Channel m a
-> (Channel m a -> m [ChildEvent a])
-> (Channel m a -> m Bool)
-> (Channel m a -> IO Bool)
-> (IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
    -> Channel m a -> Maybe WorkerInfo -> m ())
-> Channel m a
getSVar Channel m a
sv (forall (m :: * -> *) a.
MonadRunInIO m =>
Bool -> Channel m a -> m [ChildEvent a]
readOutputQBounded Bool
eagerEval)
                                              forall (m :: * -> *) a. MonadRunInIO m => Channel m a -> m Bool
postProcessBounded
                                              forall {m :: * -> *} {a}. Channel m a -> IO Bool
isWorkFinishedLimited
                                              forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Channel m a -> Maybe WorkerInfo -> m ()
workLoopLIFOLimited
                Just Rate
_  ->
                    case Config -> Maybe Count
getYieldLimit Config
cfg of
                        Maybe Count
Nothing -> Channel m a
-> (Channel m a -> m [ChildEvent a])
-> (Channel m a -> m Bool)
-> (Channel m a -> IO Bool)
-> (IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
    -> Channel m a -> Maybe WorkerInfo -> m ())
-> Channel m a
getSVar Channel m a
sv forall (m :: * -> *) a.
MonadRunInIO m =>
Channel m a -> m [ChildEvent a]
readOutputQPaced
                                              forall (m :: * -> *) a. MonadRunInIO m => Channel m a -> m Bool
postProcessPaced
                                              forall {p}. p -> IO Bool
isWorkFinished
                                              forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Channel m a -> Maybe WorkerInfo -> m ()
workLoopLIFO
                        Just Count
_  -> Channel m a
-> (Channel m a -> m [ChildEvent a])
-> (Channel m a -> m Bool)
-> (Channel m a -> IO Bool)
-> (IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
    -> Channel m a -> Maybe WorkerInfo -> m ())
-> Channel m a
getSVar Channel m a
sv forall (m :: * -> *) a.
MonadRunInIO m =>
Channel m a -> m [ChildEvent a]
readOutputQPaced
                                              forall (m :: * -> *) a. MonadRunInIO m => Channel m a -> m Bool
postProcessPaced
                                              forall {m :: * -> *} {a}. Channel m a -> IO Bool
isWorkFinishedLimited
                                              forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Channel m a -> Maybe WorkerInfo -> m ()
workLoopLIFOLimited
     in forall (m :: * -> *) a. Monad m => a -> m a
return Channel m a
sv

    where

    {-# INLINE isQueueDoneAhead #-}
    isQueueDoneAhead :: Channel m a -> IORef (t a, b) -> IO Bool
isQueueDoneAhead Channel m a
sv IORef (t a, b)
q = do
        Bool
queueDone <- forall {t :: * -> *} {a} {b}.
Foldable t =>
IORef (t a, b) -> IO Bool
checkEmpty IORef (t a, b)
q
        Bool
yieldsDone <-
                case forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv of
                    Just IORef Count
yref -> do
                        Count
n <- forall a. IORef a -> IO a
readIORef IORef Count
yref
                        forall (m :: * -> *) a. Monad m => a -> m a
return (Count
n forall a. Ord a => a -> a -> Bool
<= Count
0)
                    Maybe (IORef Count)
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
        -- XXX note that yieldsDone can only be authoritative only when there
        -- are no workers running. If there are active workers they can
        -- later increment the yield count and therefore change the result.
        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ Bool
yieldsDone Bool -> Bool -> Bool
|| Bool
queueDone

    {-# INLINE isWorkDoneAhead #-}
    isWorkDoneAhead :: Channel m a -> IORef (t a, b) -> IORef (Heap a, b) -> IO Bool
isWorkDoneAhead Channel m a
sv IORef (t a, b)
q IORef (Heap a, b)
ref = do
        Bool
heapDone <- do
                (Heap a
hp, b
_) <- forall a. IORef a -> IO a
readIORef IORef (Heap a, b)
ref
                forall (m :: * -> *) a. Monad m => a -> m a
return (forall a. Heap a -> Int
H.size Heap a
hp forall a. Ord a => a -> a -> Bool
<= Int
0)
        Bool
queueDone <- forall {t :: * -> *} {m :: * -> *} {a} {a} {b}.
Foldable t =>
Channel m a -> IORef (t a, b) -> IO Bool
isQueueDoneAhead Channel m a
sv IORef (t a, b)
q
        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ Bool
heapDone Bool -> Bool -> Bool
&& Bool
queueDone

    checkEmpty :: IORef (t a, b) -> IO Bool
checkEmpty IORef (t a, b)
q = do
        (t a
xs, b
_) <- forall a. IORef a -> IO a
readIORef IORef (t a, b)
q
        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall (t :: * -> *) a. Foldable t => t a -> Bool
null t a
xs

-- | Create a new async style concurrent stream evaluation channel. The monad
-- state used to run the stream actions is taken from the call site of
-- newChannel.
{-# INLINABLE newChannel #-}
{-# SPECIALIZE newChannel :: (Config -> Config) -> IO (Channel IO a) #-}
newChannel :: MonadRunInIO m => (Config -> Config) -> m (Channel m a)
newChannel :: forall (m :: * -> *) a.
MonadRunInIO m =>
(Config -> Config) -> m (Channel m a)
newChannel Config -> Config
modifier = do
    RunInIO m
mrun <- forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
    forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
MonadRunInIO m =>
RunInIO m -> Config -> IO (Channel m a)
getLifoSVar RunInIO m
mrun (Config -> Config
modifier Config
defaultConfig)