-- |
-- Module      : Streamly.Internal.Data.Stream.Concurrent.Channel.Append
-- Copyright   : (c) 2017 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
-- The functions in this module are separated from the combinators using
-- these because of a GHC issue. We need to have newAppendChannel specialized but
-- not inlined. If we keep it in the same module as its users we cannot achieve
-- that and the code becomes bloated. But if it is in a separate module we can
-- use INLINABLE and SPECIALIZE on it which makes it specialized but it is not
-- actually inlined.

module Streamly.Internal.Data.Stream.Concurrent.Channel.Append
    (
      newAppendChannel
    )
where

import Control.Concurrent (myThreadId)
import Control.Concurrent.MVar (newEmptyMVar, newMVar, putMVar, takeMVar)
import Control.Exception (assert)
import Control.Monad (when, void)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.Heap (Heap, Entry(..))
import Data.IORef (IORef, newIORef, readIORef, atomicModifyIORef, writeIORef)
import Data.Kind (Type)
import GHC.Exts (inline)
import Streamly.Internal.Control.Concurrent
    (MonadRunInIO, RunInIO(..), askRunInIO, restoreM)
import Streamly.Internal.Data.Atomics
    (atomicModifyIORefCAS, atomicModifyIORefCAS_)
import Streamly.Internal.Data.Channel.Dispatcher (modifyThread)

import qualified Data.Heap as H
import qualified Data.Set as Set
import qualified Streamly.Internal.Data.StreamK as K

import Streamly.Internal.Data.Stream.Concurrent.Channel.Consumer
import Streamly.Internal.Data.Stream.Concurrent.Channel.Dispatcher
import Streamly.Internal.Data.Stream.Concurrent.Channel.Type
import Streamly.Internal.Data.Channel.Types
import Streamly.Internal.Data.Channel.Worker

------------------------------------------------------------------------------
-- Concurrent streams with first-come-first serve results
------------------------------------------------------------------------------

-- Note: For purely right associated expressions this queue should have at most
-- one element. It grows to more than one when we have left associcated
-- expressions. Large left associated compositions can grow this to a
-- large size
{-# INLINE enqueueLIFO #-}
enqueueLIFO ::
      Channel m a
   -> IORef ([(RunInIO m, K.StreamK m a)], [(RunInIO m, K.StreamK m a)])
   -> Bool
   -> (RunInIO m, K.StreamK m a)
   -> IO ()
enqueueLIFO :: forall (m :: * -> *) a.
Channel m a
-> IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Bool
-> (RunInIO m, StreamK m a)
-> IO ()
enqueueLIFO Channel m a
sv IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
q Bool
inner (RunInIO m, StreamK m a)
m = do
    IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> (([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
    -> ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)]))
-> IO ()
forall t. IORef t -> (t -> t) -> IO ()
atomicModifyIORefCAS_ IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
q ((([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
  -> ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)]))
 -> IO ())
-> (([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
    -> ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)]))
-> IO ()
forall a b. (a -> b) -> a -> b
$ \([(RunInIO m, StreamK m a)]
xs, [(RunInIO m, StreamK m a)]
ys) ->
        if Bool
inner then ([(RunInIO m, StreamK m a)]
xs, (RunInIO m, StreamK m a)
m (RunInIO m, StreamK m a)
-> [(RunInIO m, StreamK m a)] -> [(RunInIO m, StreamK m a)]
forall a. a -> [a] -> [a]
: [(RunInIO m, StreamK m a)]
ys) else ((RunInIO m, StreamK m a)
m (RunInIO m, StreamK m a)
-> [(RunInIO m, StreamK m a)] -> [(RunInIO m, StreamK m a)]
forall a. a -> [a] -> [a]
: [(RunInIO m, StreamK m a)]
xs, [(RunInIO m, StreamK m a)]
ys)
    IORef Bool -> MVar () -> IO ()
ringDoorBell (Channel m a -> IORef Bool
forall (m :: * -> *) a. Channel m a -> IORef Bool
doorBellOnWorkQ Channel m a
sv) (Channel m a -> MVar ()
forall (m :: * -> *) a. Channel m a -> MVar ()
outputDoorBell Channel m a
sv)

data QResult a = QEmpty | QOuter a | QInner a

{-# INLINE dequeue #-}
dequeue :: MonadIO m =>
       IORef ([(RunInIO m, K.StreamK m a)], [(RunInIO m, K.StreamK m a)])
    -> m (QResult (RunInIO m, K.StreamK m a))
dequeue :: forall (m :: * -> *) a.
MonadIO m =>
IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> m (QResult (RunInIO m, StreamK m a))
dequeue IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
qref =
    IO (QResult (RunInIO m, StreamK m a))
-> m (QResult (RunInIO m, StreamK m a))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
        (IO (QResult (RunInIO m, StreamK m a))
 -> m (QResult (RunInIO m, StreamK m a)))
-> IO (QResult (RunInIO m, StreamK m a))
-> m (QResult (RunInIO m, StreamK m a))
forall a b. (a -> b) -> a -> b
$ IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> (([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
    -> (([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)]),
        QResult (RunInIO m, StreamK m a)))
-> IO (QResult (RunInIO m, StreamK m a))
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORefCAS IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
qref
        ((([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
  -> (([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)]),
      QResult (RunInIO m, StreamK m a)))
 -> IO (QResult (RunInIO m, StreamK m a)))
-> (([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
    -> (([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)]),
        QResult (RunInIO m, StreamK m a)))
-> IO (QResult (RunInIO m, StreamK m a))
forall a b. (a -> b) -> a -> b
$ \case
            ([(RunInIO m, StreamK m a)]
xs, (RunInIO m, StreamK m a)
y : [(RunInIO m, StreamK m a)]
ys) -> (([(RunInIO m, StreamK m a)]
xs, [(RunInIO m, StreamK m a)]
ys), (RunInIO m, StreamK m a) -> QResult (RunInIO m, StreamK m a)
forall a. a -> QResult a
QInner (RunInIO m, StreamK m a)
y)
            ((RunInIO m, StreamK m a)
x : [(RunInIO m, StreamK m a)]
xs, [(RunInIO m, StreamK m a)]
ys) -> (([(RunInIO m, StreamK m a)]
xs, [(RunInIO m, StreamK m a)]
ys), (RunInIO m, StreamK m a) -> QResult (RunInIO m, StreamK m a)
forall a. a -> QResult a
QOuter (RunInIO m, StreamK m a)
x)
            ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
x -> (([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
x, QResult (RunInIO m, StreamK m a)
forall a. QResult a
QEmpty)

data WorkerStatus = Continue | Suspend

{-# INLINE workLoopLIFO #-}
workLoopLIFO
    :: MonadRunInIO m
    => IORef ([(RunInIO m, K.StreamK m a)], [(RunInIO m, K.StreamK m a)])
    -> Channel m a
    -> Maybe WorkerInfo
    -> m ()
workLoopLIFO :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Channel m a -> Maybe WorkerInfo -> m ()
workLoopLIFO IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
qref Channel m a
sv Maybe WorkerInfo
winfo = m ()
run

    where

    run :: m ()
run = do
        QResult (RunInIO m, StreamK m a)
work <- IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> m (QResult (RunInIO m, StreamK m a))
forall (m :: * -> *) a.
MonadIO m =>
IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> m (QResult (RunInIO m, StreamK m a))
dequeue IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
qref
        case QResult (RunInIO m, StreamK m a)
work of
            QResult (RunInIO m, StreamK m a)
QEmpty ->
                IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a -> Maybe WorkerInfo -> IO ()
forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
            QInner (RunInIO forall b. m b -> IO (StM m b)
runin, StreamK m a
m) ->
                (m WorkerStatus -> IO (StM m WorkerStatus))
-> StreamK m a -> Bool -> m ()
process m WorkerStatus -> IO (StM m WorkerStatus)
forall b. m b -> IO (StM m b)
runin StreamK m a
m Bool
True
            QOuter (RunInIO forall b. m b -> IO (StM m b)
runin, StreamK m a
m) ->
                (m WorkerStatus -> IO (StM m WorkerStatus))
-> StreamK m a -> Bool -> m ()
process m WorkerStatus -> IO (StM m WorkerStatus)
forall b. m b -> IO (StM m b)
runin StreamK m a
m Bool
False

    process :: (m WorkerStatus -> IO (StM m WorkerStatus))
-> StreamK m a -> Bool -> m ()
process m WorkerStatus -> IO (StM m WorkerStatus)
runin StreamK m a
m Bool
inner = do
        -- XXX when we finish we need to send the monadic state back to
        -- the parent so that the state can be merged back. We capture
        -- and return the state in the stop continuation.
        --
        -- Instead of using the run function we can just restore the
        -- monad state here. That way it can work easily for
        -- distributed case as well.
        StM m WorkerStatus
r <- IO (StM m WorkerStatus) -> m (StM m WorkerStatus)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (StM m WorkerStatus) -> m (StM m WorkerStatus))
-> IO (StM m WorkerStatus) -> m (StM m WorkerStatus)
forall a b. (a -> b) -> a -> b
$ m WorkerStatus -> IO (StM m WorkerStatus)
runin (m WorkerStatus -> IO (StM m WorkerStatus))
-> m WorkerStatus -> IO (StM m WorkerStatus)
forall a b. (a -> b) -> a -> b
$
                State StreamK m a
-> (a -> StreamK m a -> m WorkerStatus)
-> (a -> m WorkerStatus)
-> m WorkerStatus
-> StreamK m a
-> m WorkerStatus
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared
                    State StreamK m a
forall a. HasCallStack => a
undefined
                    a -> StreamK m a -> m WorkerStatus
yieldk
                    a -> m WorkerStatus
forall {m :: * -> *}. MonadIO m => a -> m WorkerStatus
single
                    (WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Continue)
                    StreamK m a
m
        WorkerStatus
res <- StM m WorkerStatus -> m WorkerStatus
forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m WorkerStatus
r
        case WorkerStatus
res of
            WorkerStatus
Continue -> m ()
run
            WorkerStatus
Suspend -> IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a -> Maybe WorkerInfo -> IO ()
forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo

        where

        single :: a -> m WorkerStatus
single a
a = do
            Bool
res <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Channel m a -> Maybe WorkerInfo -> a -> IO Bool
forall (m :: * -> *) a.
Channel m a -> Maybe WorkerInfo -> a -> IO Bool
yield Channel m a
sv Maybe WorkerInfo
winfo a
a
            WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return (WorkerStatus -> m WorkerStatus) -> WorkerStatus -> m WorkerStatus
forall a b. (a -> b) -> a -> b
$ if Bool
res then WorkerStatus
Continue else WorkerStatus
Suspend

        yieldk :: a -> StreamK m a -> m WorkerStatus
yieldk a
a StreamK m a
r = do
            Bool
res <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Channel m a -> Maybe WorkerInfo -> a -> IO Bool
forall (m :: * -> *) a.
Channel m a -> Maybe WorkerInfo -> a -> IO Bool
yield Channel m a
sv Maybe WorkerInfo
winfo a
a
            if Bool
res
            then State StreamK m a
-> (a -> StreamK m a -> m WorkerStatus)
-> (a -> m WorkerStatus)
-> m WorkerStatus
-> StreamK m a
-> m WorkerStatus
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
forall a. HasCallStack => a
undefined a -> StreamK m a -> m WorkerStatus
yieldk a -> m WorkerStatus
forall {m :: * -> *}. MonadIO m => a -> m WorkerStatus
single (WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Continue) StreamK m a
r
            else do
                RunInIO m
runInIO <- m (RunInIO m)
forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
                IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a
-> IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Bool
-> (RunInIO m, StreamK m a)
-> IO ()
forall (m :: * -> *) a.
Channel m a
-> IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Bool
-> (RunInIO m, StreamK m a)
-> IO ()
enqueueLIFO Channel m a
sv IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
qref Bool
inner (RunInIO m
runInIO, StreamK m a
r)
                WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Suspend

-- We duplicate workLoop for yield limit and no limit cases because it has
-- around 40% performance overhead in the worst case.
--
-- XXX we can pass yinfo directly as an argument here so that we do not have to
-- make a check every time.
{-# INLINE workLoopLIFOLimited #-}
workLoopLIFOLimited
    :: forall m a. MonadRunInIO m
    => IORef ([(RunInIO m, K.StreamK m a)], [(RunInIO m, K.StreamK m a)])
    -> Channel m a
    -> Maybe WorkerInfo
    -> m ()
workLoopLIFOLimited :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Channel m a -> Maybe WorkerInfo -> m ()
workLoopLIFOLimited IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
qref Channel m a
sv Maybe WorkerInfo
winfo = m ()
run

    where

    incrContinue :: m WorkerStatus
incrContinue =
        IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Maybe (IORef Count) -> IO ()
incrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)) m () -> m WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Continue

    run :: m ()
run = do
        QResult (RunInIO m, StreamK m a)
work <- IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> m (QResult (RunInIO m, StreamK m a))
forall (m :: * -> *) a.
MonadIO m =>
IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> m (QResult (RunInIO m, StreamK m a))
dequeue IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
qref
        case QResult (RunInIO m, StreamK m a)
work of
            QResult (RunInIO m, StreamK m a)
QEmpty ->
                IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a -> Maybe WorkerInfo -> IO ()
forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
            QInner (RunInIO m, StreamK m a)
item ->
                (RunInIO m, StreamK m a) -> Bool -> m ()
process (RunInIO m, StreamK m a)
item Bool
True
            QOuter (RunInIO m, StreamK m a)
item ->
                (RunInIO m, StreamK m a) -> Bool -> m ()
process (RunInIO m, StreamK m a)
item Bool
False

    process :: (RunInIO m, StreamK m a) -> Bool -> m ()
process item :: (RunInIO m, StreamK m a)
item@(RunInIO forall b. m b -> IO (StM m b)
runin, StreamK m a
m) Bool
inner = do
        -- XXX This is just a best effort minimization of concurrency
        -- to the yield limit. If the stream is made of concurrent
        -- streams we do not reserve the yield limit in the constituent
        -- streams before executing the action. This can be done
        -- though, by sharing the yield limit ref with downstream
        -- actions via state passing. Just a todo.
        Bool
yieldLimitOk <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Maybe (IORef Count) -> IO Bool
decrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
        if Bool
yieldLimitOk
        then do
            StM m WorkerStatus
r <- IO (StM m WorkerStatus) -> m (StM m WorkerStatus)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (StM m WorkerStatus) -> m (StM m WorkerStatus))
-> IO (StM m WorkerStatus) -> m (StM m WorkerStatus)
forall a b. (a -> b) -> a -> b
$ m WorkerStatus -> IO (StM m WorkerStatus)
forall b. m b -> IO (StM m b)
runin (m WorkerStatus -> IO (StM m WorkerStatus))
-> m WorkerStatus -> IO (StM m WorkerStatus)
forall a b. (a -> b) -> a -> b
$
                    State StreamK m a
-> (a -> StreamK m a -> m WorkerStatus)
-> (a -> m WorkerStatus)
-> m WorkerStatus
-> StreamK m a
-> m WorkerStatus
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared
                        State StreamK m a
forall a. HasCallStack => a
undefined
                        a -> StreamK m a -> m WorkerStatus
yieldk
                        a -> m WorkerStatus
forall {m :: * -> *}. MonadIO m => a -> m WorkerStatus
single
                        m WorkerStatus
incrContinue
                        StreamK m a
m
            WorkerStatus
res <- StM m WorkerStatus -> m WorkerStatus
forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m WorkerStatus
r
            case WorkerStatus
res of
                WorkerStatus
Continue -> m ()
run
                WorkerStatus
Suspend -> IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a -> Maybe WorkerInfo -> IO ()
forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
        -- Avoid any side effects, undo the yield limit decrement if we
        -- never yielded anything.
        else IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
            Channel m a
-> IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Bool
-> (RunInIO m, StreamK m a)
-> IO ()
forall (m :: * -> *) a.
Channel m a
-> IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Bool
-> (RunInIO m, StreamK m a)
-> IO ()
enqueueLIFO Channel m a
sv IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
qref Bool
inner (RunInIO m, StreamK m a)
item
            Maybe (IORef Count) -> IO ()
incrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
            Channel m a -> Maybe WorkerInfo -> IO ()
forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo

        where

        single :: a -> m WorkerStatus
single a
a = do
            Bool
res <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Channel m a -> Maybe WorkerInfo -> a -> IO Bool
forall (m :: * -> *) a.
Channel m a -> Maybe WorkerInfo -> a -> IO Bool
yield Channel m a
sv Maybe WorkerInfo
winfo a
a
            WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return (WorkerStatus -> m WorkerStatus) -> WorkerStatus -> m WorkerStatus
forall a b. (a -> b) -> a -> b
$ if Bool
res then WorkerStatus
Continue else WorkerStatus
Suspend

        -- XXX can we pass on the yield limit downstream to limit the
        -- concurrency of constituent streams.
        yieldk :: a -> StreamK m a -> m WorkerStatus
yieldk a
a StreamK m a
r = do
            Bool
res <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Channel m a -> Maybe WorkerInfo -> a -> IO Bool
forall (m :: * -> *) a.
Channel m a -> Maybe WorkerInfo -> a -> IO Bool
yield Channel m a
sv Maybe WorkerInfo
winfo a
a
            Bool
yieldLimitOk <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Maybe (IORef Count) -> IO Bool
decrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
            if Bool
res Bool -> Bool -> Bool
&& Bool
yieldLimitOk
            then State StreamK m a
-> (a -> StreamK m a -> m WorkerStatus)
-> (a -> m WorkerStatus)
-> m WorkerStatus
-> StreamK m a
-> m WorkerStatus
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
forall a. HasCallStack => a
undefined a -> StreamK m a -> m WorkerStatus
yieldk a -> m WorkerStatus
forall {m :: * -> *}. MonadIO m => a -> m WorkerStatus
single m WorkerStatus
incrContinue StreamK m a
r
            else do
                RunInIO m
runInIO <- m (RunInIO m)
forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
                IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Maybe (IORef Count) -> IO ()
incrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
                IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a
-> IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Bool
-> (RunInIO m, StreamK m a)
-> IO ()
forall (m :: * -> *) a.
Channel m a
-> IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Bool
-> (RunInIO m, StreamK m a)
-> IO ()
enqueueLIFO Channel m a
sv IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
qref Bool
inner (RunInIO m
runInIO, StreamK m a
r)
                WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Suspend

-------------------------------------------------------------------------------
-- Ahead Channel Data Structures
-------------------------------------------------------------------------------

-- XXX Left associated ahead expressions are expensive. We start a new SVar for
-- each left associative expression. The queue is used only for right
-- associated expression, we queue the right expression and execute the left.
-- Therefore the queue never has more than one item in it. However, in case of
-- parIterateConcatMap the iteration may add more items at the end of the
-- queue.
--
-- XXX we can fix this. When we queue more than one item on the queue we can
-- mark the previously queued item as not-runnable. The not-runnable item is
-- not dequeued until the already running one has finished and at that time we
-- would also know the exact sequence number of the already queued item.
--
-- we can even run the already queued items but they will have to be sorted in
-- layers in the heap. We can use a list of heaps for that.
{-# ANN enqueueAhead "HLint: ignore" #-}
{-# INLINE enqueueAhead #-}
enqueueAhead ::
       Channel m a
    -> IORef ([K.StreamK m a], Int)
    -> (RunInIO m, K.StreamK m a)
    -> IO ()
enqueueAhead :: forall (m :: * -> *) a.
Channel m a
-> IORef ([StreamK m a], Int) -> (RunInIO m, StreamK m a) -> IO ()
enqueueAhead Channel m a
sv IORef ([StreamK m a], Int)
q (RunInIO m, StreamK m a)
m = do
    -- XXX The queue is LIFO. When parConcatIterate queues more than one items
    -- to the queue it will perform a DFS style traversal. For BFS we will have
    -- to use a FIFO data structure here. That would require another Config
    -- option.
    IORef ([StreamK m a], Int)
-> (([StreamK m a], Int) -> ([StreamK m a], Int)) -> IO ()
forall t. IORef t -> (t -> t) -> IO ()
atomicModifyIORefCAS_ IORef ([StreamK m a], Int)
q ((([StreamK m a], Int) -> ([StreamK m a], Int)) -> IO ())
-> (([StreamK m a], Int) -> ([StreamK m a], Int)) -> IO ()
forall a b. (a -> b) -> a -> b
$ \([StreamK m a]
xs, Int
n) -> ((RunInIO m, StreamK m a) -> StreamK m a
forall a b. (a, b) -> b
snd (RunInIO m, StreamK m a)
mStreamK m a -> [StreamK m a] -> [StreamK m a]
forall a. a -> [a] -> [a]
:[StreamK m a]
xs, Int
n)
    IORef Bool -> MVar () -> IO ()
ringDoorBell (Channel m a -> IORef Bool
forall (m :: * -> *) a. Channel m a -> IORef Bool
doorBellOnWorkQ Channel m a
sv) (Channel m a -> MVar ()
forall (m :: * -> *) a. Channel m a -> MVar ()
outputDoorBell Channel m a
sv)

-- Normally the thread that has the token should never go away. The token gets
-- handed over to another thread, but someone or the other has the token at any
-- point of time. But if the task that has the token finds that the outputQueue
-- is full, in that case it can go away without even handing over the token to
-- another thread. In that case it sets the nextSequence number in the heap its
-- own sequence number before going away. To handle this case, any task that
-- does not have the token tries to dequeue from the heap first before
-- dequeuing from the work queue. If it finds that the task at the top of the
-- heap is the one that owns the current sequence number then it grabs the
-- token and starts with that.
--
-- XXX instead of queueing just the head element and the remaining computation
-- on the heap, evaluate as many as we can and place them on the heap. But we
-- need to give higher priority to the lower sequence numbers so that lower
-- priority tasks do not fill up the heap making higher priority tasks block
-- due to full heap. Maybe we can have a weighted space for them in the heap.
-- The weight is inversely proportional to the sequence number.
--
-- XXX review for livelock

{-# INLINE dequeueAhead #-}
dequeueAhead :: MonadIO m
    => IORef ([t m a], Int) -> m (Maybe (t m a, Int))
dequeueAhead :: forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
IORef ([t m a], Int) -> m (Maybe (t m a, Int))
dequeueAhead IORef ([t m a], Int)
q = IO (Maybe (t m a, Int)) -> m (Maybe (t m a, Int))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe (t m a, Int)) -> m (Maybe (t m a, Int)))
-> IO (Maybe (t m a, Int)) -> m (Maybe (t m a, Int))
forall a b. (a -> b) -> a -> b
$
    IORef ([t m a], Int)
-> (([t m a], Int) -> (([t m a], Int), Maybe (t m a, Int)))
-> IO (Maybe (t m a, Int))
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORefCAS IORef ([t m a], Int)
q ((([t m a], Int) -> (([t m a], Int), Maybe (t m a, Int)))
 -> IO (Maybe (t m a, Int)))
-> (([t m a], Int) -> (([t m a], Int), Maybe (t m a, Int)))
-> IO (Maybe (t m a, Int))
forall a b. (a -> b) -> a -> b
$ \case
            ([], Int
n) -> (([], Int
n), Maybe (t m a, Int)
forall a. Maybe a
Nothing)
            (t m a
x : [t m a]
xs, Int
n) -> (([t m a]
xs, Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1), (t m a, Int) -> Maybe (t m a, Int)
forall a. a -> Maybe a
Just (t m a
x, Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1))

-- Dequeue only if the seq number matches the expected seq number.
{-# INLINE dequeueAheadSeqCheck #-}
dequeueAheadSeqCheck :: MonadIO m
    => IORef ([t m a], Int) -> Int -> m (Maybe (t m a))
dequeueAheadSeqCheck :: forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
IORef ([t m a], Int) -> Int -> m (Maybe (t m a))
dequeueAheadSeqCheck IORef ([t m a], Int)
q Int
seqNo = IO (Maybe (t m a)) -> m (Maybe (t m a))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe (t m a)) -> m (Maybe (t m a)))
-> IO (Maybe (t m a)) -> m (Maybe (t m a))
forall a b. (a -> b) -> a -> b
$
    IORef ([t m a], Int)
-> (([t m a], Int) -> (([t m a], Int), Maybe (t m a)))
-> IO (Maybe (t m a))
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORefCAS IORef ([t m a], Int)
q ((([t m a], Int) -> (([t m a], Int), Maybe (t m a)))
 -> IO (Maybe (t m a)))
-> (([t m a], Int) -> (([t m a], Int), Maybe (t m a)))
-> IO (Maybe (t m a))
forall a b. (a -> b) -> a -> b
$ \case
            ([], Int
n) -> (([], Int
n), Maybe (t m a)
forall a. Maybe a
Nothing)
            (t m a
x : [t m a]
xs, Int
n) ->
                if Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1 Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
seqNo
                then (([t m a]
xs, Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1), t m a -> Maybe (t m a)
forall a. a -> Maybe a
Just t m a
x)
                else ((t m a
x t m a -> [t m a] -> [t m a]
forall a. a -> [a] -> [a]
: [t m a]
xs, Int
n), Maybe (t m a)
forall a. Maybe a
Nothing)

-------------------------------------------------------------------------------
-- Heap manipulation
-------------------------------------------------------------------------------

withIORef :: IORef a -> (a -> IO b) -> IO b
withIORef :: forall a b. IORef a -> (a -> IO b) -> IO b
withIORef IORef a
ref a -> IO b
f = IORef a -> IO a
forall a. IORef a -> IO a
readIORef IORef a
ref IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= a -> IO b
f

atomicModifyIORef_ :: IORef a -> (a -> a) -> IO ()
atomicModifyIORef_ :: forall t. IORef t -> (t -> t) -> IO ()
atomicModifyIORef_ IORef a
ref a -> a
f =
    IORef a -> (a -> (a, ())) -> IO ()
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef IORef a
ref ((a -> (a, ())) -> IO ()) -> (a -> (a, ())) -> IO ()
forall a b. (a -> b) -> a -> b
$ \a
x -> (a -> a
f a
x, ())

data AheadHeapEntry (t :: (Type -> Type) -> Type -> Type) m a =
      AheadEntryNull
    | AheadEntryPure a
    | AheadEntryStream (RunInIO m, t m a)

data HeapDequeueResult t m a =
      Clearing
    | Waiting Int
    | Ready (Entry Int (AheadHeapEntry t m a))

{-# INLINE dequeueFromHeap #-}
dequeueFromHeap
    :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> IO (HeapDequeueResult t m a)
dequeueFromHeap :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> IO (HeapDequeueResult t m a)
dequeueFromHeap IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
hpVar =
    IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> ((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> ((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int),
        HeapDequeueResult t m a))
-> IO (HeapDequeueResult t m a)
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
hpVar (((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
  -> ((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int),
      HeapDequeueResult t m a))
 -> IO (HeapDequeueResult t m a))
-> ((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> ((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int),
        HeapDequeueResult t m a))
-> IO (HeapDequeueResult t m a)
forall a b. (a -> b) -> a -> b
$ \pair :: (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
pair@(Heap (Entry Int (AheadHeapEntry t m a))
hp, Maybe Int
snum) ->
        case Maybe Int
snum of
            Maybe Int
Nothing -> ((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
pair, HeapDequeueResult t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
HeapDequeueResult t m a
Clearing)
            Just Int
n -> do
                let r :: Maybe
  (Entry Int (AheadHeapEntry t m a),
   Heap (Entry Int (AheadHeapEntry t m a)))
r = Heap (Entry Int (AheadHeapEntry t m a))
-> Maybe
     (Entry Int (AheadHeapEntry t m a),
      Heap (Entry Int (AheadHeapEntry t m a)))
forall a. Heap a -> Maybe (a, Heap a)
H.uncons Heap (Entry Int (AheadHeapEntry t m a))
hp
                case Maybe
  (Entry Int (AheadHeapEntry t m a),
   Heap (Entry Int (AheadHeapEntry t m a)))
r of
                    Just (ent :: Entry Int (AheadHeapEntry t m a)
ent@(Entry Int
seqNo AheadHeapEntry t m a
_ev), Heap (Entry Int (AheadHeapEntry t m a))
hp') ->
                            if Int
seqNo Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
n
                            then ((Heap (Entry Int (AheadHeapEntry t m a))
hp', Maybe Int
forall a. Maybe a
Nothing), Entry Int (AheadHeapEntry t m a) -> HeapDequeueResult t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
Entry Int (AheadHeapEntry t m a) -> HeapDequeueResult t m a
Ready Entry Int (AheadHeapEntry t m a)
ent)
                            else Bool
-> ((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int),
    HeapDequeueResult t m a)
-> ((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int),
    HeapDequeueResult t m a)
forall a. HasCallStack => Bool -> a -> a
assert (Int
seqNo Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
n) ((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
pair, Int -> HeapDequeueResult t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
Int -> HeapDequeueResult t m a
Waiting Int
n)
                    Maybe
  (Entry Int (AheadHeapEntry t m a),
   Heap (Entry Int (AheadHeapEntry t m a)))
Nothing -> ((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
pair, Int -> HeapDequeueResult t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
Int -> HeapDequeueResult t m a
Waiting Int
n)

{-# INLINE dequeueFromHeapSeq #-}
dequeueFromHeapSeq
    :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> Int
    -> IO (HeapDequeueResult t m a)
dequeueFromHeapSeq :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Int -> IO (HeapDequeueResult t m a)
dequeueFromHeapSeq IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
hpVar Int
i =
    IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> ((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> ((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int),
        HeapDequeueResult t m a))
-> IO (HeapDequeueResult t m a)
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
hpVar (((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
  -> ((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int),
      HeapDequeueResult t m a))
 -> IO (HeapDequeueResult t m a))
-> ((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> ((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int),
        HeapDequeueResult t m a))
-> IO (HeapDequeueResult t m a)
forall a b. (a -> b) -> a -> b
$ \(Heap (Entry Int (AheadHeapEntry t m a))
hp, Maybe Int
snum) ->
        case Maybe Int
snum of
            Maybe Int
Nothing -> do
                let r :: Maybe
  (Entry Int (AheadHeapEntry t m a),
   Heap (Entry Int (AheadHeapEntry t m a)))
r = Heap (Entry Int (AheadHeapEntry t m a))
-> Maybe
     (Entry Int (AheadHeapEntry t m a),
      Heap (Entry Int (AheadHeapEntry t m a)))
forall a. Heap a -> Maybe (a, Heap a)
H.uncons Heap (Entry Int (AheadHeapEntry t m a))
hp
                case Maybe
  (Entry Int (AheadHeapEntry t m a),
   Heap (Entry Int (AheadHeapEntry t m a)))
r of
                    Just (ent :: Entry Int (AheadHeapEntry t m a)
ent@(Entry Int
seqNo AheadHeapEntry t m a
_ev), Heap (Entry Int (AheadHeapEntry t m a))
hp') ->
                        if Int
seqNo Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
i
                        then ((Heap (Entry Int (AheadHeapEntry t m a))
hp', Maybe Int
forall a. Maybe a
Nothing), Entry Int (AheadHeapEntry t m a) -> HeapDequeueResult t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
Entry Int (AheadHeapEntry t m a) -> HeapDequeueResult t m a
Ready Entry Int (AheadHeapEntry t m a)
ent)
                        else Bool
-> ((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int),
    HeapDequeueResult t m a)
-> ((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int),
    HeapDequeueResult t m a)
forall a. HasCallStack => Bool -> a -> a
assert (Int
seqNo Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
i) ((Heap (Entry Int (AheadHeapEntry t m a))
hp, Int -> Maybe Int
forall a. a -> Maybe a
Just Int
i), Int -> HeapDequeueResult t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
Int -> HeapDequeueResult t m a
Waiting Int
i)
                    Maybe
  (Entry Int (AheadHeapEntry t m a),
   Heap (Entry Int (AheadHeapEntry t m a)))
Nothing -> ((Heap (Entry Int (AheadHeapEntry t m a))
hp, Int -> Maybe Int
forall a. a -> Maybe a
Just Int
i), Int -> HeapDequeueResult t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
Int -> HeapDequeueResult t m a
Waiting Int
i)
            Just Int
_ -> [Char]
-> ((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int),
    HeapDequeueResult t m a)
forall a. HasCallStack => [Char] -> a
error [Char]
"dequeueFromHeapSeq: unreachable"

heapIsSane :: Maybe Int -> Int -> Bool
heapIsSane :: Maybe Int -> Int -> Bool
heapIsSane Maybe Int
snum Int
seqNo =
    case Maybe Int
snum of
        Maybe Int
Nothing -> Bool
True
        Just Int
n -> Int
seqNo Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
n

{-# INLINE requeueOnHeapTop #-}
requeueOnHeapTop
    :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> Entry Int (AheadHeapEntry t m a)
    -> Int
    -> IO ()
requeueOnHeapTop :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Entry Int (AheadHeapEntry t m a) -> Int -> IO ()
requeueOnHeapTop IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
hpVar Entry Int (AheadHeapEntry t m a)
ent Int
seqNo =
    IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> ((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int))
-> IO ()
forall t. IORef t -> (t -> t) -> IO ()
atomicModifyIORef_ IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
hpVar (((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
  -> (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int))
 -> IO ())
-> ((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int))
-> IO ()
forall a b. (a -> b) -> a -> b
$ \(Heap (Entry Int (AheadHeapEntry t m a))
hp, Maybe Int
snum) ->
        Bool
-> (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
forall a. HasCallStack => Bool -> a -> a
assert (Maybe Int -> Int -> Bool
heapIsSane Maybe Int
snum Int
seqNo) (Entry Int (AheadHeapEntry t m a)
-> Heap (Entry Int (AheadHeapEntry t m a))
-> Heap (Entry Int (AheadHeapEntry t m a))
forall a. Ord a => a -> Heap a -> Heap a
H.insert Entry Int (AheadHeapEntry t m a)
ent Heap (Entry Int (AheadHeapEntry t m a))
hp, Int -> Maybe Int
forall a. a -> Maybe a
Just Int
seqNo)

{-# INLINE updateHeapSeq #-}
updateHeapSeq
    :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> Int
    -> IO ()
updateHeapSeq :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Int -> IO ()
updateHeapSeq IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
hpVar Int
seqNo =
    IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> ((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int))
-> IO ()
forall t. IORef t -> (t -> t) -> IO ()
atomicModifyIORef_ IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
hpVar (((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
  -> (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int))
 -> IO ())
-> ((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int))
-> IO ()
forall a b. (a -> b) -> a -> b
$ \(Heap (Entry Int (AheadHeapEntry t m a))
hp, Maybe Int
snum) ->
        Bool
-> (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
forall a. HasCallStack => Bool -> a -> a
assert (Maybe Int -> Int -> Bool
heapIsSane Maybe Int
snum Int
seqNo) (Heap (Entry Int (AheadHeapEntry t m a))
hp, Int -> Maybe Int
forall a. a -> Maybe a
Just Int
seqNo)

------------------------------------------------------------------------------
-- Ahead: Concurrent streams with ordered results
------------------------------------------------------------------------------

-- Lookahead streams can execute multiple tasks concurrently, ahead of time,
-- but always serve them in the same order as they appear in the stream. To
-- implement lookahead streams efficiently we assign a sequence number to each
-- task when the task is picked up for execution. When the task finishes, the
-- output is tagged with the same sequence number and we rearrange the outputs
-- in sequence based on that number.
--
-- To explain the mechanism imagine that the current task at the head of the
-- stream has a "token" to yield to the outputQueue. The ownership of the token
-- is determined by the current sequence number is maintained in outputHeap.
-- Sequence number is assigned when a task is queued. When a thread dequeues a
-- task it picks up the sequence number as well and when the output is ready it
-- uses the sequence number to queue the output to the outputQueue.
--
-- The thread with current sequence number sends the output directly to the
-- outputQueue. Other threads push the output to the outputHeap. When the task
-- being queued on the heap is a stream of many elements we evaluate only the
-- first element and keep the rest of the unevaluated computation in the heap.
-- When such a task gets the "token" for outputQueue it evaluates and directly
-- yields all the elements to the outputQueue without checking for the
-- "token".
--
-- Note that no two outputs in the heap can have the same sequence numbers and
-- therefore we do not need a stable heap. We have also separated the buffer
-- for the current task (outputQueue) and the pending tasks (outputHeap) so
-- that the pending tasks cannot interfere with the current task. Note that for
-- a single task just the outputQueue is enough and for the case of many
-- threads just a heap is good enough. However we balance between these two
-- cases, so that both are efficient.
--
-- For bigger streams it may make sense to have separate buffers for each
-- stream. However, for singleton streams this may become inefficient. However,
-- if we do not have separate buffers, then the streams that come later in
-- sequence may hog the buffer, hindering the streams that are ahead. For this
-- reason we have a single element buffer limitation for the streams being
-- executed in advance.
--
-- This scheme works pretty efficiently with less than 40% extra overhead
-- compared to the Async streams where we do not have any kind of sequencing of
-- the outputs. It is especially devised so that we are most efficient when we
-- have short tasks and need just a single thread. Also when a thread yields
-- many items it can hold lockfree access to the outputQueue and do it
-- efficiently.
--
-- XXX Maybe we can start the ahead threads at a lower cpu and IO priority so
-- that they do not hog the resources and hinder the progress of the threads in
-- front of them.

-- Left associated ahead expressions are expensive. We start a new SVar for
-- each left associative expression. The queue is used only for right
-- associated expression, we queue the right expression and execute the left.
-- Thererefore the queue never has more than on item in it.
--
-- XXX Also note that limiting concurrency for cases like "take 10" would not
-- work well with left associative expressions, because we have no visibility
-- about how much the left side of the expression would yield.
--
-- XXX It may be a good idea to increment sequence numbers for each yield,
-- currently a stream on the left side of the expression may yield many
-- elements with the same sequene number. We can then use the seq number to
-- enforce yieldMax and yieldLImit as well.

-- Invariants:
--
-- * A worker should always ensure that it pushes all the consecutive items in
-- the heap to the outputQueue especially the items on behalf of the workers
-- that have already left when we were holding the token. This avoids deadlock
-- conditions when the later workers completion depends on the consumption of
-- earlier results. For more details see comments in the consumer pull side
-- code.

{-# INLINE underMaxHeap #-}
underMaxHeap ::
       Channel m a
    -> Heap (Entry Int (AheadHeapEntry K.StreamK m a))
    -> IO Bool
underMaxHeap :: forall (m :: * -> *) a.
Channel m a
-> Heap (Entry Int (AheadHeapEntry StreamK m a)) -> IO Bool
underMaxHeap Channel m a
sv Heap (Entry Int (AheadHeapEntry StreamK m a))
hp = do
    ([ChildEvent a]
_, Int
len) <- IORef ([ChildEvent a], Int) -> IO ([ChildEvent a], Int)
forall a. IORef a -> IO a
readIORef (Channel m a -> IORef ([ChildEvent a], Int)
forall (m :: * -> *) a. Channel m a -> IORef ([ChildEvent a], Int)
outputQueue Channel m a
sv)

    -- XXX simplify this
    let maxHeap :: Limit
maxHeap = case Channel m a -> Limit
forall (m :: * -> *) a. Channel m a -> Limit
maxBufferLimit Channel m a
sv of
            Limited Word
lim -> Word -> Limit
Limited (Word -> Limit) -> Word -> Limit
forall a b. (a -> b) -> a -> b
$
                Word -> Word -> Word
forall a. Ord a => a -> a -> a
max Word
0 (Word
lim Word -> Word -> Word
forall a. Num a => a -> a -> a
- Int -> Word
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
len)
            Limit
Unlimited -> Limit
Unlimited

    case Limit
maxHeap of
        Limited Word
lim -> do
            Int
active <- IORef Int -> IO Int
forall a. IORef a -> IO a
readIORef (Channel m a -> IORef Int
forall (m :: * -> *) a. Channel m a -> IORef Int
workerCount Channel m a
sv)
            Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ Heap (Entry Int (AheadHeapEntry StreamK m a)) -> Int
forall a. Heap a -> Int
H.size Heap (Entry Int (AheadHeapEntry StreamK m a))
hp Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
active Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Word -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Word
lim
        Limit
Unlimited -> Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True

-- Return value:
-- True => stop
-- False => continue
preStopCheck ::
       Channel m a
    -> IORef (Heap (Entry Int (AheadHeapEntry K.StreamK m a)) , Maybe Int)
    -> IO Bool
preStopCheck :: forall (m :: * -> *) a.
Channel m a
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> IO Bool
preStopCheck Channel m a
sv IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap =
    -- check the stop condition under a lock before actually
    -- stopping so that the whole herd does not stop at once.
    IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> ((Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
    -> IO Bool)
-> IO Bool
forall a b. IORef a -> (a -> IO b) -> IO b
withIORef IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap (((Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
  -> IO Bool)
 -> IO Bool)
-> ((Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
    -> IO Bool)
-> IO Bool
forall a b. (a -> b) -> a -> b
$ \(Heap (Entry Int (AheadHeapEntry StreamK m a))
hp, Maybe Int
_) -> do
        Bool
heapOk <- Channel m a
-> Heap (Entry Int (AheadHeapEntry StreamK m a)) -> IO Bool
forall (m :: * -> *) a.
Channel m a
-> Heap (Entry Int (AheadHeapEntry StreamK m a)) -> IO Bool
underMaxHeap Channel m a
sv Heap (Entry Int (AheadHeapEntry StreamK m a))
hp
        MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar (Channel m a -> MVar ()
forall (m :: * -> *) a. Channel m a -> MVar ()
workerStopMVar Channel m a
sv)
        let stopping :: IO Bool
stopping = do
                MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
putMVar (Channel m a -> MVar ()
forall (m :: * -> *) a. Channel m a -> MVar ()
workerStopMVar Channel m a
sv) ()
                Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
            continue :: IO Bool
continue = do
                MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
putMVar (Channel m a -> MVar ()
forall (m :: * -> *) a. Channel m a -> MVar ()
workerStopMVar Channel m a
sv) ()
                Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
        if Bool
heapOk
        then
            case Channel m a -> Maybe YieldRateInfo
forall (m :: * -> *) a. Channel m a -> Maybe YieldRateInfo
yieldRateInfo Channel m a
sv of
                Maybe YieldRateInfo
Nothing -> IO Bool
continue
                Just YieldRateInfo
yinfo -> do
                    Bool
rateOk <-
                        Limit -> IORef Int -> YieldRateInfo -> IO Bool
isBeyondMaxRate
                            (Channel m a -> Limit
forall (m :: * -> *) a. Channel m a -> Limit
maxWorkerLimit Channel m a
sv) (Channel m a -> IORef Int
forall (m :: * -> *) a. Channel m a -> IORef Int
workerCount Channel m a
sv) YieldRateInfo
yinfo
                    if Bool
rateOk then IO Bool
continue else IO Bool
stopping
        else IO Bool
stopping

abortExecution :: Channel m a -> Maybe WorkerInfo -> IO ()
abortExecution :: forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
abortExecution Channel m a
sv Maybe WorkerInfo
winfo = do
    Maybe (IORef Count) -> IO ()
incrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
    Channel m a -> Maybe WorkerInfo -> IO ()
forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo

-- XXX In absence of a "noyield" primitive (i.e. do not pre-empt inside a
-- critical section) from GHC RTS, we have a difficult problem. Assume we have
-- a 100,000 threads producing output and queuing it to the heap for
-- sequencing. The heap can be drained only by one thread at a time, any thread
-- that finds that heap can be drained now, takes a lock and starts draining
-- it, however the thread may get prempted in the middle of it holding the
-- lock. Since that thread is holding the lock, the other threads cannot pick
-- up the draining task, therefore they proceed to picking up the next task to
-- execute. If the draining thread could yield voluntarily at a point where it
-- has released the lock, then the next threads could pick up the draining
-- instead of executing more tasks. When there are 100,000 threads the drainer
-- gets a cpu share to run only 1:100000 of the time. This makes the heap
-- accumulate a lot of output when we the buffer size is large.
--
-- The solutions to this problem are:
-- 1) make the other threads wait in a queue until the draining finishes
-- 2) make the other threads queue and go away if draining is in progress
--
-- In both cases we give the drainer a chance to run more often.
--
processHeap
    :: MonadRunInIO m
    => IORef ([K.StreamK m a], Int)
    -> IORef (Heap (Entry Int (AheadHeapEntry K.StreamK m a)), Maybe Int)
    -> Channel m a
    -> Maybe WorkerInfo
    -> AheadHeapEntry K.StreamK m a
    -> Int
    -> Bool -- we are draining the heap before we stop
    -> m ()
processHeap :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> AheadHeapEntry StreamK m a
-> Int
-> Bool
-> m ()
processHeap IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo AheadHeapEntry StreamK m a
entry Int
sno Bool
stopping = Int -> AheadHeapEntry StreamK m a -> m ()
loopHeap Int
sno AheadHeapEntry StreamK m a
entry

    where

    stopIfNeeded :: AheadHeapEntry StreamK m a -> Int -> StreamK m a -> m ()
stopIfNeeded AheadHeapEntry StreamK m a
ent Int
seqNo StreamK m a
r = do
        Bool
stopIt <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Channel m a
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> IO Bool
forall (m :: * -> *) a.
Channel m a
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> IO Bool
preStopCheck Channel m a
sv IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap
        if Bool
stopIt
        then IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
            -- put the entry back in the heap and stop
            IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Entry Int (AheadHeapEntry StreamK m a) -> Int -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Entry Int (AheadHeapEntry t m a) -> Int -> IO ()
requeueOnHeapTop IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap (Int
-> AheadHeapEntry StreamK m a
-> Entry Int (AheadHeapEntry StreamK m a)
forall p a. p -> a -> Entry p a
Entry Int
seqNo AheadHeapEntry StreamK m a
ent) Int
seqNo
            Channel m a -> Maybe WorkerInfo -> IO ()
forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
        else Bool -> Int -> StreamK m a -> m ()
runStreamWithYieldLimit Bool
True Int
seqNo StreamK m a
r

    loopHeap :: Int -> AheadHeapEntry StreamK m a -> m ()
loopHeap Int
seqNo AheadHeapEntry StreamK m a
ent =
        case AheadHeapEntry StreamK m a
ent of
            AheadHeapEntry StreamK m a
AheadEntryNull -> Int -> m ()
nextHeap Int
seqNo
            AheadEntryPure a
a -> do
                -- Use 'send' directly so that we do not account this in worker
                -- latency as this will not be the real latency.
                -- Don't stop the worker in this case as we are just
                -- transferring available results from heap to outputQueue.
                m Int -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void
                    (m Int -> m ()) -> m Int -> m ()
forall a b. (a -> b) -> a -> b
$ IO Int -> m Int
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
                    (IO Int -> m Int) -> IO Int -> m Int
forall a b. (a -> b) -> a -> b
$ IORef ([ChildEvent a], Int) -> MVar () -> ChildEvent a -> IO Int
forall a.
IORef ([ChildEvent a], Int) -> MVar () -> ChildEvent a -> IO Int
sendWithDoorBell
                        (Channel m a -> IORef ([ChildEvent a], Int)
forall (m :: * -> *) a. Channel m a -> IORef ([ChildEvent a], Int)
outputQueue Channel m a
sv) (Channel m a -> MVar ()
forall (m :: * -> *) a. Channel m a -> MVar ()
outputDoorBell Channel m a
sv) (a -> ChildEvent a
forall a. a -> ChildEvent a
ChildYield a
a)
                Int -> m ()
nextHeap Int
seqNo
            AheadEntryStream (RunInIO forall b. m b -> IO (StM m b)
runin, StreamK m a
r) -> do
                if Bool
stopping
                then AheadHeapEntry StreamK m a -> Int -> StreamK m a -> m ()
stopIfNeeded AheadHeapEntry StreamK m a
ent Int
seqNo StreamK m a
r
                else do
                    StM m ()
res <- IO (StM m ()) -> m (StM m ())
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (StM m ()) -> m (StM m ())) -> IO (StM m ()) -> m (StM m ())
forall a b. (a -> b) -> a -> b
$ m () -> IO (StM m ())
forall b. m b -> IO (StM m b)
runin (Bool -> Int -> StreamK m a -> m ()
runStreamWithYieldLimit Bool
True Int
seqNo StreamK m a
r)
                    StM m () -> m ()
forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m ()
res

    nextHeap :: Int -> m ()
nextHeap Int
prevSeqNo = do
        HeapDequeueResult StreamK m a
res <- IO (HeapDequeueResult StreamK m a)
-> m (HeapDequeueResult StreamK m a)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (HeapDequeueResult StreamK m a)
 -> m (HeapDequeueResult StreamK m a))
-> IO (HeapDequeueResult StreamK m a)
-> m (HeapDequeueResult StreamK m a)
forall a b. (a -> b) -> a -> b
$ IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Int -> IO (HeapDequeueResult StreamK m a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Int -> IO (HeapDequeueResult t m a)
dequeueFromHeapSeq IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap (Int
prevSeqNo Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
        case HeapDequeueResult StreamK m a
res of
            Ready (Entry Int
seqNo AheadHeapEntry StreamK m a
hent) -> Int -> AheadHeapEntry StreamK m a -> m ()
loopHeap Int
seqNo AheadHeapEntry StreamK m a
hent
            HeapDequeueResult StreamK m a
Clearing -> IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a -> Maybe WorkerInfo -> IO ()
forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
            Waiting Int
_ ->
                if Bool
stopping
                then do
                    Bool
r <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Channel m a
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> IO Bool
forall (m :: * -> *) a.
Channel m a
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> IO Bool
preStopCheck Channel m a
sv IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap
                    if Bool
r
                    then IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a -> Maybe WorkerInfo -> IO ()
forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
                    else Int -> m ()
processWorkQueue Int
prevSeqNo
                else (Int -> m ()) -> Int -> m ()
forall a. a -> a
inline Int -> m ()
processWorkQueue Int
prevSeqNo

    processWorkQueue :: Int -> m ()
processWorkQueue Int
prevSeqNo = do
        Bool
yieldLimitOk <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Maybe (IORef Count) -> IO Bool
decrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
        if Bool
yieldLimitOk
        then do
            Maybe (StreamK m a, Int)
work <- IORef ([StreamK m a], Int) -> m (Maybe (StreamK m a, Int))
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
IORef ([t m a], Int) -> m (Maybe (t m a, Int))
dequeueAhead IORef ([StreamK m a], Int)
q
            case Maybe (StreamK m a, Int)
work of
                Maybe (StreamK m a, Int)
Nothing -> IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a -> Maybe WorkerInfo -> IO ()
forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
                Just (StreamK m a
m, Int
seqNo) -> do
                    if Int
seqNo Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
prevSeqNo Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1
                    then IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> StreamK m a
-> Int
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> StreamK m a
-> Int
-> m ()
processWithToken IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo StreamK m a
m Int
seqNo
                    else IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> StreamK m a
-> Int
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> StreamK m a
-> Int
-> m ()
processWithoutToken IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo StreamK m a
m Int
seqNo
        else IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a -> Maybe WorkerInfo -> IO ()
forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
abortExecution Channel m a
sv Maybe WorkerInfo
winfo

    -- We do not stop the worker on buffer full here as we want to proceed to
    -- nextHeap anyway so that we can clear any subsequent entries. We stop
    -- only in yield continuation where we may have a remaining stream to be
    -- pushed on the heap.
    singleStreamFromHeap :: Int -> a -> m ()
singleStreamFromHeap Int
seqNo a
a = do
        m Bool -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m Bool -> m ()) -> m Bool -> m ()
forall a b. (a -> b) -> a -> b
$ IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Channel m a -> Maybe WorkerInfo -> a -> IO Bool
forall (m :: * -> *) a.
Channel m a -> Maybe WorkerInfo -> a -> IO Bool
yield Channel m a
sv Maybe WorkerInfo
winfo a
a
        Int -> m ()
nextHeap Int
seqNo

    -- XXX when we have an unfinished stream on the heap we cannot account all
    -- the yields of that stream until it finishes, so if we have picked up
    -- and executed more actions beyond that in the parent stream and put them
    -- on the heap then they would eat up some yield limit which is not
    -- correct, we will think that our yield limit is over even though we have
    -- to yield items from unfinished stream before them. For this reason, if
    -- there are pending items in the heap we drain them unconditionally
    -- without considering the yield limit.
    runStreamWithYieldLimit :: Bool -> Int -> StreamK m a -> m ()
runStreamWithYieldLimit Bool
continue Int
seqNo StreamK m a
r = do
        Bool
_ <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Maybe (IORef Count) -> IO Bool
decrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
        if Bool
continue -- see comment above -- && yieldLimitOk
        then do
            let stopk :: m ()
stopk = do
                  IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Maybe (IORef Count) -> IO ()
incrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv))
                  Int -> m ()
nextHeap Int
seqNo
            State StreamK m a
-> (a -> StreamK m a -> m ())
-> (a -> m ())
-> m ()
-> StreamK m a
-> m ()
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
forall a. HasCallStack => a
undefined
                          (Int -> a -> StreamK m a -> m ()
yieldStreamFromHeap Int
seqNo)
                          (Int -> a -> m ()
singleStreamFromHeap Int
seqNo)
                          m ()
stopk
                          StreamK m a
r
        else do
            RunInIO m
runIn <- m (RunInIO m)
forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
            let ent :: Entry Int (AheadHeapEntry StreamK m a)
ent = Int
-> AheadHeapEntry StreamK m a
-> Entry Int (AheadHeapEntry StreamK m a)
forall p a. p -> a -> Entry p a
Entry Int
seqNo ((RunInIO m, StreamK m a) -> AheadHeapEntry StreamK m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(RunInIO m, t m a) -> AheadHeapEntry t m a
AheadEntryStream (RunInIO m
runIn, StreamK m a
r))
            IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
                IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Entry Int (AheadHeapEntry StreamK m a) -> Int -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Entry Int (AheadHeapEntry t m a) -> Int -> IO ()
requeueOnHeapTop IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Entry Int (AheadHeapEntry StreamK m a)
ent Int
seqNo
                Maybe (IORef Count) -> IO ()
incrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
                Channel m a -> Maybe WorkerInfo -> IO ()
forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo

    yieldStreamFromHeap :: Int -> a -> StreamK m a -> m ()
yieldStreamFromHeap Int
seqNo a
a StreamK m a
r = do
        Bool
continue <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Channel m a -> Maybe WorkerInfo -> a -> IO Bool
forall (m :: * -> *) a.
Channel m a -> Maybe WorkerInfo -> a -> IO Bool
yield Channel m a
sv Maybe WorkerInfo
winfo a
a
        Bool -> Int -> StreamK m a -> m ()
runStreamWithYieldLimit Bool
continue Int
seqNo StreamK m a
r

{-# NOINLINE drainHeap #-}
drainHeap
    :: MonadRunInIO m
    => IORef ([K.StreamK m a], Int)
    -> IORef (Heap (Entry Int (AheadHeapEntry K.StreamK m a)), Maybe Int)
    -> Channel m a
    -> Maybe WorkerInfo
    -> m ()
drainHeap :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
drainHeap IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo = do
    HeapDequeueResult StreamK m a
r <- IO (HeapDequeueResult StreamK m a)
-> m (HeapDequeueResult StreamK m a)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (HeapDequeueResult StreamK m a)
 -> m (HeapDequeueResult StreamK m a))
-> IO (HeapDequeueResult StreamK m a)
-> m (HeapDequeueResult StreamK m a)
forall a b. (a -> b) -> a -> b
$ IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> IO (HeapDequeueResult StreamK m a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> IO (HeapDequeueResult t m a)
dequeueFromHeap IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap
    case HeapDequeueResult StreamK m a
r of
        Ready (Entry Int
seqNo AheadHeapEntry StreamK m a
hent) ->
            IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> AheadHeapEntry StreamK m a
-> Int
-> Bool
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> AheadHeapEntry StreamK m a
-> Int
-> Bool
-> m ()
processHeap IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo AheadHeapEntry StreamK m a
hent Int
seqNo Bool
True
        HeapDequeueResult StreamK m a
_ -> IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a -> Maybe WorkerInfo -> IO ()
forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo

data HeapStatus = HContinue | HStop

processWithoutToken
    :: MonadRunInIO m
    => IORef ([K.StreamK m a], Int)
    -> IORef (Heap (Entry Int (AheadHeapEntry K.StreamK m a)), Maybe Int)
    -> Channel m a
    -> Maybe WorkerInfo
    -> K.StreamK m a
    -> Int
    -> m ()
processWithoutToken :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> StreamK m a
-> Int
-> m ()
processWithoutToken IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo StreamK m a
m Int
seqNo = do
    -- we have already decremented the yield limit for m
    let stopk :: m WorkerStatus
stopk = do
            IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Maybe (IORef Count) -> IO ()
incrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv))
            -- If the stream stops without yielding anything, and we do not put
            -- anything on heap, but if heap was waiting for this seq number
            -- then it will keep waiting forever, because we are never going to
            -- put it on heap. So we have to put a null entry on heap even when
            -- we stop.
            AheadHeapEntry StreamK m a -> m WorkerStatus
forall {m :: * -> *}.
MonadIO m =>
AheadHeapEntry StreamK m a -> m WorkerStatus
toHeap AheadHeapEntry StreamK m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
AheadHeapEntry t m a
AheadEntryNull
        mrun :: m b -> IO (StM m b)
mrun = RunInIO m -> m b -> IO (StM m b)
forall (m :: * -> *). RunInIO m -> forall b. m b -> IO (StM m b)
runInIO (RunInIO m -> m b -> IO (StM m b))
-> RunInIO m -> m b -> IO (StM m b)
forall a b. (a -> b) -> a -> b
$ Channel m a -> RunInIO m
forall (m :: * -> *) a. Channel m a -> RunInIO m
svarMrun Channel m a
sv

    StM m WorkerStatus
r <- IO (StM m WorkerStatus) -> m (StM m WorkerStatus)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (StM m WorkerStatus) -> m (StM m WorkerStatus))
-> IO (StM m WorkerStatus) -> m (StM m WorkerStatus)
forall a b. (a -> b) -> a -> b
$ m WorkerStatus -> IO (StM m WorkerStatus)
forall {b}. m b -> IO (StM m b)
mrun (m WorkerStatus -> IO (StM m WorkerStatus))
-> m WorkerStatus -> IO (StM m WorkerStatus)
forall a b. (a -> b) -> a -> b
$
            State StreamK m a
-> (a -> StreamK m a -> m WorkerStatus)
-> (a -> m WorkerStatus)
-> m WorkerStatus
-> StreamK m a
-> m WorkerStatus
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
forall a. HasCallStack => a
undefined
                (\a
a StreamK m a
r -> do
                    RunInIO m
runIn <- m (RunInIO m)
forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
                    AheadHeapEntry StreamK m a -> m WorkerStatus
forall {m :: * -> *}.
MonadIO m =>
AheadHeapEntry StreamK m a -> m WorkerStatus
toHeap (AheadHeapEntry StreamK m a -> m WorkerStatus)
-> AheadHeapEntry StreamK m a -> m WorkerStatus
forall a b. (a -> b) -> a -> b
$ (RunInIO m, StreamK m a) -> AheadHeapEntry StreamK m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(RunInIO m, t m a) -> AheadHeapEntry t m a
AheadEntryStream (RunInIO m
runIn, a -> StreamK m a -> StreamK m a
forall a (m :: * -> *). a -> StreamK m a -> StreamK m a
K.cons a
a StreamK m a
r))
                (AheadHeapEntry StreamK m a -> m WorkerStatus
forall {m :: * -> *}.
MonadIO m =>
AheadHeapEntry StreamK m a -> m WorkerStatus
toHeap (AheadHeapEntry StreamK m a -> m WorkerStatus)
-> (a -> AheadHeapEntry StreamK m a) -> a -> m WorkerStatus
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> AheadHeapEntry StreamK m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
a -> AheadHeapEntry t m a
AheadEntryPure)
                m WorkerStatus
stopk
                StreamK m a
m
    WorkerStatus
res <- StM m WorkerStatus -> m WorkerStatus
forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m WorkerStatus
r
    case WorkerStatus
res of
        WorkerStatus
Continue -> IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
workLoopAhead IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo
        WorkerStatus
Suspend -> IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
drainHeap IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo

    where

    -- XXX to reduce contention each CPU can have its own heap
    toHeap :: AheadHeapEntry StreamK m a -> m WorkerStatus
toHeap AheadHeapEntry StreamK m a
ent = do
        -- Heap insertion is an expensive affair so we use a non CAS based
        -- modification, otherwise contention and retries can make a thread
        -- context switch and throw it behind other threads which come later in
        -- sequence.
        Heap (Entry Int (AheadHeapEntry StreamK m a))
newHp <- IO (Heap (Entry Int (AheadHeapEntry StreamK m a)))
-> m (Heap (Entry Int (AheadHeapEntry StreamK m a)))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Heap (Entry Int (AheadHeapEntry StreamK m a)))
 -> m (Heap (Entry Int (AheadHeapEntry StreamK m a))))
-> IO (Heap (Entry Int (AheadHeapEntry StreamK m a)))
-> m (Heap (Entry Int (AheadHeapEntry StreamK m a)))
forall a b. (a -> b) -> a -> b
$ IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> ((Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
    -> ((Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int),
        Heap (Entry Int (AheadHeapEntry StreamK m a))))
-> IO (Heap (Entry Int (AheadHeapEntry StreamK m a)))
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap (((Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
  -> ((Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int),
      Heap (Entry Int (AheadHeapEntry StreamK m a))))
 -> IO (Heap (Entry Int (AheadHeapEntry StreamK m a))))
-> ((Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
    -> ((Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int),
        Heap (Entry Int (AheadHeapEntry StreamK m a))))
-> IO (Heap (Entry Int (AheadHeapEntry StreamK m a)))
forall a b. (a -> b) -> a -> b
$ \(Heap (Entry Int (AheadHeapEntry StreamK m a))
hp, Maybe Int
snum) ->
            let hp' :: Heap (Entry Int (AheadHeapEntry StreamK m a))
hp' = Entry Int (AheadHeapEntry StreamK m a)
-> Heap (Entry Int (AheadHeapEntry StreamK m a))
-> Heap (Entry Int (AheadHeapEntry StreamK m a))
forall a. Ord a => a -> Heap a -> Heap a
H.insert (Int
-> AheadHeapEntry StreamK m a
-> Entry Int (AheadHeapEntry StreamK m a)
forall p a. p -> a -> Entry p a
Entry Int
seqNo AheadHeapEntry StreamK m a
ent) Heap (Entry Int (AheadHeapEntry StreamK m a))
hp
            in Bool
-> ((Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int),
    Heap (Entry Int (AheadHeapEntry StreamK m a)))
-> ((Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int),
    Heap (Entry Int (AheadHeapEntry StreamK m a)))
forall a. HasCallStack => Bool -> a -> a
assert (Maybe Int -> Int -> Bool
heapIsSane Maybe Int
snum Int
seqNo) ((Heap (Entry Int (AheadHeapEntry StreamK m a))
hp', Maybe Int
snum), Heap (Entry Int (AheadHeapEntry StreamK m a))
hp')

        Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Channel m a -> Bool
forall (m :: * -> *) a. Channel m a -> Bool
svarInspectMode Channel m a
sv) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
            IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
                Int
maxHp <- IORef Int -> IO Int
forall a. IORef a -> IO a
readIORef (SVarStats -> IORef Int
maxHeapSize (SVarStats -> IORef Int) -> SVarStats -> IORef Int
forall a b. (a -> b) -> a -> b
$ Channel m a -> SVarStats
forall (m :: * -> *) a. Channel m a -> SVarStats
svarStats Channel m a
sv)
                Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Heap (Entry Int (AheadHeapEntry StreamK m a)) -> Int
forall a. Heap a -> Int
H.size Heap (Entry Int (AheadHeapEntry StreamK m a))
newHp Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
maxHp) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
                    IORef Int -> Int -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef (SVarStats -> IORef Int
maxHeapSize (SVarStats -> IORef Int) -> SVarStats -> IORef Int
forall a b. (a -> b) -> a -> b
$ Channel m a -> SVarStats
forall (m :: * -> *) a. Channel m a -> SVarStats
svarStats Channel m a
sv) (Heap (Entry Int (AheadHeapEntry StreamK m a)) -> Int
forall a. Heap a -> Int
H.size Heap (Entry Int (AheadHeapEntry StreamK m a))
newHp)

        Bool
heapOk <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Channel m a
-> Heap (Entry Int (AheadHeapEntry StreamK m a)) -> IO Bool
forall (m :: * -> *) a.
Channel m a
-> Heap (Entry Int (AheadHeapEntry StreamK m a)) -> IO Bool
underMaxHeap Channel m a
sv Heap (Entry Int (AheadHeapEntry StreamK m a))
newHp
        HeapStatus
status <-
            case Channel m a -> Maybe YieldRateInfo
forall (m :: * -> *) a. Channel m a -> Maybe YieldRateInfo
yieldRateInfo Channel m a
sv of
                Maybe YieldRateInfo
Nothing -> HeapStatus -> m HeapStatus
forall (m :: * -> *) a. Monad m => a -> m a
return HeapStatus
HContinue
                Just YieldRateInfo
yinfo ->
                    case Maybe WorkerInfo
winfo of
                        Just WorkerInfo
info -> do
                            Bool
rateOk <-
                                IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
                                    (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Limit -> IORef Int -> YieldRateInfo -> WorkerInfo -> IO Bool
workerRateControl
                                        (Channel m a -> Limit
forall (m :: * -> *) a. Channel m a -> Limit
maxWorkerLimit Channel m a
sv)
                                        (Channel m a -> IORef Int
forall (m :: * -> *) a. Channel m a -> IORef Int
workerCount Channel m a
sv)
                                        YieldRateInfo
yinfo
                                        WorkerInfo
info
                            if Bool
rateOk
                            then HeapStatus -> m HeapStatus
forall (m :: * -> *) a. Monad m => a -> m a
return HeapStatus
HContinue
                            else HeapStatus -> m HeapStatus
forall (m :: * -> *) a. Monad m => a -> m a
return HeapStatus
HStop
                        Maybe WorkerInfo
Nothing -> HeapStatus -> m HeapStatus
forall (m :: * -> *) a. Monad m => a -> m a
return HeapStatus
HContinue

        if Bool
heapOk
        then
            case HeapStatus
status of
                HeapStatus
HContinue -> WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Continue
                HeapStatus
HStop -> WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Suspend
        else WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Suspend

data TokenWorkerStatus = TokenContinue Int | TokenSuspend

processWithToken
    :: MonadRunInIO m
    => IORef ([K.StreamK m a], Int)
    -> IORef (Heap (Entry Int (AheadHeapEntry K.StreamK m a)), Maybe Int)
    -> Channel m a
    -> Maybe WorkerInfo
    -> K.StreamK m a
    -> Int
    -> m ()
processWithToken :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> StreamK m a
-> Int
-> m ()
processWithToken IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo StreamK m a
action Int
sno = do
    -- Note, we enter this function with yield limit already decremented
    -- XXX deduplicate stop in all invocations
    let stopk :: m TokenWorkerStatus
stopk = do
            IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Maybe (IORef Count) -> IO ()
incrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv))
            TokenWorkerStatus -> m TokenWorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return (TokenWorkerStatus -> m TokenWorkerStatus)
-> TokenWorkerStatus -> m TokenWorkerStatus
forall a b. (a -> b) -> a -> b
$ Int -> TokenWorkerStatus
TokenContinue (Int
sno Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
        mrun :: m b -> IO (StM m b)
mrun = RunInIO m -> m b -> IO (StM m b)
forall (m :: * -> *). RunInIO m -> forall b. m b -> IO (StM m b)
runInIO (RunInIO m -> m b -> IO (StM m b))
-> RunInIO m -> m b -> IO (StM m b)
forall a b. (a -> b) -> a -> b
$ Channel m a -> RunInIO m
forall (m :: * -> *) a. Channel m a -> RunInIO m
svarMrun Channel m a
sv

    StM m TokenWorkerStatus
r <-
        IO (StM m TokenWorkerStatus) -> m (StM m TokenWorkerStatus)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
            (IO (StM m TokenWorkerStatus) -> m (StM m TokenWorkerStatus))
-> IO (StM m TokenWorkerStatus) -> m (StM m TokenWorkerStatus)
forall a b. (a -> b) -> a -> b
$ m TokenWorkerStatus -> IO (StM m TokenWorkerStatus)
forall {b}. m b -> IO (StM m b)
mrun
            (m TokenWorkerStatus -> IO (StM m TokenWorkerStatus))
-> m TokenWorkerStatus -> IO (StM m TokenWorkerStatus)
forall a b. (a -> b) -> a -> b
$ State StreamK m a
-> (a -> StreamK m a -> m TokenWorkerStatus)
-> (a -> m TokenWorkerStatus)
-> m TokenWorkerStatus
-> StreamK m a
-> m TokenWorkerStatus
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared
                State StreamK m a
forall a. HasCallStack => a
undefined (Int -> a -> StreamK m a -> m TokenWorkerStatus
yieldOutput Int
sno) (Int -> a -> m TokenWorkerStatus
forall {m :: * -> *}. MonadIO m => Int -> a -> m TokenWorkerStatus
singleOutput Int
sno) m TokenWorkerStatus
stopk StreamK m a
action

    TokenWorkerStatus
res <- StM m TokenWorkerStatus -> m TokenWorkerStatus
forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m TokenWorkerStatus
r
    case TokenWorkerStatus
res of
        TokenContinue Int
seqNo -> Int -> m ()
loopWithToken Int
seqNo
        TokenWorkerStatus
TokenSuspend -> IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
drainHeap IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo

    where

    singleOutput :: Int -> a -> m TokenWorkerStatus
singleOutput Int
seqNo a
a = do
        Bool
continue <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Channel m a -> Maybe WorkerInfo -> a -> IO Bool
forall (m :: * -> *) a.
Channel m a -> Maybe WorkerInfo -> a -> IO Bool
yield Channel m a
sv Maybe WorkerInfo
winfo a
a
        if Bool
continue
        then TokenWorkerStatus -> m TokenWorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return (TokenWorkerStatus -> m TokenWorkerStatus)
-> TokenWorkerStatus -> m TokenWorkerStatus
forall a b. (a -> b) -> a -> b
$ Int -> TokenWorkerStatus
TokenContinue (Int
seqNo Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
        else do
            IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Int -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Int -> IO ()
updateHeapSeq IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap (Int
seqNo Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
            TokenWorkerStatus -> m TokenWorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return TokenWorkerStatus
TokenSuspend

    -- XXX use a wrapper function around stop so that we never miss
    -- incrementing the yield in a stop continuation. Essentiatlly all
    -- "unstream" calls in this function must increment yield limit on stop.
    yieldOutput :: Int -> a -> StreamK m a -> m TokenWorkerStatus
yieldOutput Int
seqNo a
a StreamK m a
r = do
        Bool
continue <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Channel m a -> Maybe WorkerInfo -> a -> IO Bool
forall (m :: * -> *) a.
Channel m a -> Maybe WorkerInfo -> a -> IO Bool
yield Channel m a
sv Maybe WorkerInfo
winfo a
a
        Bool
yieldLimitOk <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Maybe (IORef Count) -> IO Bool
decrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
        if Bool
continue Bool -> Bool -> Bool
&& Bool
yieldLimitOk
        then do
            let stopk :: m TokenWorkerStatus
stopk = do
                    IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Maybe (IORef Count) -> IO ()
incrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv))
                    TokenWorkerStatus -> m TokenWorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return (TokenWorkerStatus -> m TokenWorkerStatus)
-> TokenWorkerStatus -> m TokenWorkerStatus
forall a b. (a -> b) -> a -> b
$ Int -> TokenWorkerStatus
TokenContinue (Int
seqNo Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
            State StreamK m a
-> (a -> StreamK m a -> m TokenWorkerStatus)
-> (a -> m TokenWorkerStatus)
-> m TokenWorkerStatus
-> StreamK m a
-> m TokenWorkerStatus
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
forall a. HasCallStack => a
undefined
                          (Int -> a -> StreamK m a -> m TokenWorkerStatus
yieldOutput Int
seqNo)
                          (Int -> a -> m TokenWorkerStatus
forall {m :: * -> *}. MonadIO m => Int -> a -> m TokenWorkerStatus
singleOutput Int
seqNo)
                          m TokenWorkerStatus
stopk
                          StreamK m a
r
        else do
            RunInIO m
runIn <- m (RunInIO m)
forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
            let ent :: Entry Int (AheadHeapEntry StreamK m a)
ent = Int
-> AheadHeapEntry StreamK m a
-> Entry Int (AheadHeapEntry StreamK m a)
forall p a. p -> a -> Entry p a
Entry Int
seqNo ((RunInIO m, StreamK m a) -> AheadHeapEntry StreamK m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(RunInIO m, t m a) -> AheadHeapEntry t m a
AheadEntryStream (RunInIO m
runIn, StreamK m a
r))
            IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Entry Int (AheadHeapEntry StreamK m a) -> Int -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Entry Int (AheadHeapEntry t m a) -> Int -> IO ()
requeueOnHeapTop IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Entry Int (AheadHeapEntry StreamK m a)
ent Int
seqNo
            IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Maybe (IORef Count) -> IO ()
incrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
            TokenWorkerStatus -> m TokenWorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return TokenWorkerStatus
TokenSuspend

    loopWithToken :: Int -> m ()
loopWithToken Int
nextSeqNo = do
        let preExit :: m ()
preExit = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
                IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Int -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Int -> IO ()
updateHeapSeq IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Int
nextSeqNo
                Maybe (IORef Count) -> IO ()
incrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
        Bool
yieldLimitOk <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Maybe (IORef Count) -> IO Bool
decrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
        -- To avoid a race when another thread puts something
        -- on the heap and goes away, the consumer will not get
        -- a doorBell and we will not clear the heap before
        -- executing the next action. If the consumer depends
        -- on the output that is stuck in the heap then this
        -- will result in a deadlock. So we always clear the
        -- heap before executing the next action.
        if Bool
yieldLimitOk
        then do
            -- XXX Instead of checking seqno inside dequeue we can dequeue
            -- unconditionally and if the seqNo is not the same as nextSeqNo
            -- then release the token and call processWithoutToken. Need
            -- to check the performance though.
            Maybe (StreamK m a)
work <- IORef ([StreamK m a], Int) -> Int -> m (Maybe (StreamK m a))
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
IORef ([t m a], Int) -> Int -> m (Maybe (t m a))
dequeueAheadSeqCheck IORef ([StreamK m a], Int)
q Int
nextSeqNo
            case Maybe (StreamK m a)
work of
                Maybe (StreamK m a)
Nothing -> m ()
preExit m () -> m () -> m ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
workLoopAhead IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo
                Just StreamK m a
m -> do
                    let stopk :: m TokenWorkerStatus
stopk = do
                            IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Maybe (IORef Count) -> IO ()
incrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv))
                            TokenWorkerStatus -> m TokenWorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return (TokenWorkerStatus -> m TokenWorkerStatus)
-> TokenWorkerStatus -> m TokenWorkerStatus
forall a b. (a -> b) -> a -> b
$ Int -> TokenWorkerStatus
TokenContinue (Int
nextSeqNo Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
                        mrun :: m b -> IO (StM m b)
mrun = RunInIO m -> m b -> IO (StM m b)
forall (m :: * -> *). RunInIO m -> forall b. m b -> IO (StM m b)
runInIO (RunInIO m -> m b -> IO (StM m b))
-> RunInIO m -> m b -> IO (StM m b)
forall a b. (a -> b) -> a -> b
$ Channel m a -> RunInIO m
forall (m :: * -> *) a. Channel m a -> RunInIO m
svarMrun Channel m a
sv
                    StM m TokenWorkerStatus
r <- IO (StM m TokenWorkerStatus) -> m (StM m TokenWorkerStatus)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (StM m TokenWorkerStatus) -> m (StM m TokenWorkerStatus))
-> IO (StM m TokenWorkerStatus) -> m (StM m TokenWorkerStatus)
forall a b. (a -> b) -> a -> b
$ m TokenWorkerStatus -> IO (StM m TokenWorkerStatus)
forall {b}. m b -> IO (StM m b)
mrun (m TokenWorkerStatus -> IO (StM m TokenWorkerStatus))
-> m TokenWorkerStatus -> IO (StM m TokenWorkerStatus)
forall a b. (a -> b) -> a -> b
$
                        State StreamK m a
-> (a -> StreamK m a -> m TokenWorkerStatus)
-> (a -> m TokenWorkerStatus)
-> m TokenWorkerStatus
-> StreamK m a
-> m TokenWorkerStatus
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
forall a. HasCallStack => a
undefined
                                      (Int -> a -> StreamK m a -> m TokenWorkerStatus
yieldOutput Int
nextSeqNo)
                                      (Int -> a -> m TokenWorkerStatus
forall {m :: * -> *}. MonadIO m => Int -> a -> m TokenWorkerStatus
singleOutput Int
nextSeqNo)
                                      m TokenWorkerStatus
stopk
                                      StreamK m a
m
                    TokenWorkerStatus
res <- StM m TokenWorkerStatus -> m TokenWorkerStatus
forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m TokenWorkerStatus
r
                    case TokenWorkerStatus
res of
                        TokenContinue Int
seqNo -> Int -> m ()
loopWithToken Int
seqNo
                        TokenWorkerStatus
TokenSuspend -> IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
drainHeap IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo
        else m ()
preExit m () -> m () -> m ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
drainHeap IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo

-- XXX the yield limit changes increased the performance overhead by 30-40%.
-- Just like AsyncT we can use an implementation without yeidlimit and even
-- without pacing code to keep the performance higher in the unlimited and
-- unpaced case.
--
-- XXX The yieldLimit stuff is pretty invasive. We can instead do it by using
-- three hooks, a pre-execute hook, a yield hook and a stop hook. In fact these
-- hooks can be used for a more general implementation to even check predicates
-- and not just yield limit.

workLoopAhead
    :: MonadRunInIO m
    => IORef ([K.StreamK m a], Int)
    -> IORef (Heap (Entry Int (AheadHeapEntry K.StreamK m a)), Maybe Int)
    -> Channel m a
    -> Maybe WorkerInfo
    -> m ()
workLoopAhead :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
workLoopAhead IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo = do
        HeapDequeueResult StreamK m a
r <- IO (HeapDequeueResult StreamK m a)
-> m (HeapDequeueResult StreamK m a)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (HeapDequeueResult StreamK m a)
 -> m (HeapDequeueResult StreamK m a))
-> IO (HeapDequeueResult StreamK m a)
-> m (HeapDequeueResult StreamK m a)
forall a b. (a -> b) -> a -> b
$ IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> IO (HeapDequeueResult StreamK m a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> IO (HeapDequeueResult t m a)
dequeueFromHeap IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap
        case HeapDequeueResult StreamK m a
r of
            Ready (Entry Int
seqNo AheadHeapEntry StreamK m a
hent) ->
                IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> AheadHeapEntry StreamK m a
-> Int
-> Bool
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> AheadHeapEntry StreamK m a
-> Int
-> Bool
-> m ()
processHeap IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo AheadHeapEntry StreamK m a
hent Int
seqNo Bool
False
            HeapDequeueResult StreamK m a
Clearing -> IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a -> Maybe WorkerInfo -> IO ()
forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
            Waiting Int
_ -> do
                -- Before we execute the next item from the work queue we check
                -- if we are beyond the yield limit. It is better to check the
                -- yield limit before we pick up the next item. Otherwise we
                -- may have already started more tasks even though we may have
                -- reached the yield limit.  We can avoid this by taking active
                -- workers into account, but that is not as reliable, because
                -- workers may go away without picking up work and yielding a
                -- value.
                --
                -- Rate control can be done either based on actual yields in
                -- the output queue or based on any yield either to the heap or
                -- to the output queue. In both cases we may have one issue or
                -- the other. We chose to do this based on actual yields to the
                -- output queue because it makes the code common to both async
                -- and ahead streams.
                --
                Bool
yieldLimitOk <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Maybe (IORef Count) -> IO Bool
decrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
                if Bool
yieldLimitOk
                then do
                    Maybe (StreamK m a, Int)
work <- IORef ([StreamK m a], Int) -> m (Maybe (StreamK m a, Int))
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
IORef ([t m a], Int) -> m (Maybe (t m a, Int))
dequeueAhead IORef ([StreamK m a], Int)
q
                    case Maybe (StreamK m a, Int)
work of
                        Maybe (StreamK m a, Int)
Nothing -> IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a -> Maybe WorkerInfo -> IO ()
forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
                        Just (StreamK m a
m, Int
seqNo) -> do
                            if Int
seqNo Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0
                            then IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> StreamK m a
-> Int
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> StreamK m a
-> Int
-> m ()
processWithToken IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo StreamK m a
m Int
seqNo
                            else IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> StreamK m a
-> Int
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> StreamK m a
-> Int
-> m ()
processWithoutToken IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo StreamK m a
m Int
seqNo
                else IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a -> Maybe WorkerInfo -> IO ()
forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
abortExecution Channel m a
sv Maybe WorkerInfo
winfo

-------------------------------------------------------------------------------
-- SVar creation
-- This code belongs in SVar.hs but is kept here for perf reasons
-------------------------------------------------------------------------------

-- XXX we have this function in this file because passing runStreamLIFO as a
-- function argument to this function results in a perf degradation of more
-- than 10%.  Need to investigate what the root cause is.
-- Interestingly, the same thing does not make any difference for Ahead.
-- {-# INLINABLE getLifoSVar #-}
getLifoSVar :: forall m a. MonadRunInIO m =>
    RunInIO m -> Config -> IO (Channel m a)
getLifoSVar :: forall (m :: * -> *) a.
MonadRunInIO m =>
RunInIO m -> Config -> IO (Channel m a)
getLifoSVar RunInIO m
mrun Config
cfg = do
    IORef ([ChildEvent a], Int)
outQ    <- ([ChildEvent a], Int) -> IO (IORef ([ChildEvent a], Int))
forall a. a -> IO (IORef a)
newIORef ([], Int
0)
    -- the second component of the tuple is "Nothing" when heap is being
    -- cleared, "Just n" when we are expecting sequence number n to arrive
    -- before we can start clearing the heap.
    IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
outH    <- (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> IO
     (IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int))
forall a. a -> IO (IORef a)
newIORef (Heap (Entry Int (AheadHeapEntry StreamK m a))
forall a. Heap a
H.empty, Int -> Maybe Int
forall a. a -> Maybe a
Just Int
0)
    MVar ()
outQMv  <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
    IORef Int
active  <- Int -> IO (IORef Int)
forall a. a -> IO (IORef a)
newIORef Int
0
    IORef Bool
wfw     <- Bool -> IO (IORef Bool)
forall a. a -> IO (IORef a)
newIORef Bool
False
    IORef (Set ThreadId)
running <- Set ThreadId -> IO (IORef (Set ThreadId))
forall a. a -> IO (IORef a)
newIORef Set ThreadId
forall a. Set a
Set.empty
    IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
q       <- ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> IO
     (IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)]))
forall a. a -> IO (IORef a)
newIORef
                ( [] :: [(RunInIO m, K.StreamK m a)]
                , [] :: [(RunInIO m, K.StreamK m a)]
                )
    -- Sequence number is incremented whenever something is de-queued,
    -- therefore, first sequence number would be 0
    IORef ([StreamK m a], Int)
aheadQ <- ([StreamK m a], Int) -> IO (IORef ([StreamK m a], Int))
forall a. a -> IO (IORef a)
newIORef ([], -Int
1)
    MVar ()
stopMVar <- () -> IO (MVar ())
forall a. a -> IO (MVar a)
newMVar ()
    Maybe (IORef Count)
yl <-
        case Config -> Maybe Count
getYieldLimit Config
cfg of
            Maybe Count
Nothing -> Maybe (IORef Count) -> IO (Maybe (IORef Count))
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (IORef Count)
forall a. Maybe a
Nothing
            Just Count
x -> IORef Count -> Maybe (IORef Count)
forall a. a -> Maybe a
Just (IORef Count -> Maybe (IORef Count))
-> IO (IORef Count) -> IO (Maybe (IORef Count))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Count -> IO (IORef Count)
forall a. a -> IO (IORef a)
newIORef Count
x
    Maybe YieldRateInfo
rateInfo <- Config -> IO (Maybe YieldRateInfo)
newRateInfo Config
cfg

    SVarStats
stats <- IO SVarStats
newSVarStats
    ThreadId
tid <- IO ThreadId
myThreadId

    -- We are reading it without lock, the result would be reliable only if no
    -- worker is pending.
    let isWorkFinished :: p -> IO Bool
isWorkFinished p
_ = do
            ([(RunInIO m, StreamK m a)]
xs, [(RunInIO m, StreamK m a)]
ys) <- IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> IO ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
forall a. IORef a -> IO a
readIORef IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
q
            Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return ([(RunInIO m, StreamK m a)] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [(RunInIO m, StreamK m a)]
xs Bool -> Bool -> Bool
&& [(RunInIO m, StreamK m a)] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [(RunInIO m, StreamK m a)]
ys)

    let isWorkFinishedLimited :: Channel m a -> IO Bool
isWorkFinishedLimited Channel m a
sv = do
            Bool
yieldsDone <-
                    case Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv of
                        Just IORef Count
ref -> do
                            Count
n <- IORef Count -> IO Count
forall a. IORef a -> IO a
readIORef IORef Count
ref
                            Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return (Count
n Count -> Count -> Bool
forall a. Ord a => a -> a -> Bool
<= Count
0)
                        Maybe (IORef Count)
Nothing -> Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
            Bool
qEmpty <- Channel m a -> IO Bool
forall {p}. p -> IO Bool
isWorkFinished Channel m a
sv
            Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ Bool
qEmpty Bool -> Bool -> Bool
|| Bool
yieldsDone

    let eagerEval :: Bool
eagerEval = Config -> Bool
getEagerDispatch Config
cfg
        inOrder :: Bool
inOrder = Config -> Bool
getOrdered Config
cfg

    let getSVar :: Channel m a
            -> (Channel m a -> m [ChildEvent a])
            -> (Channel m a -> m Bool)
            -> (Channel m a -> IO Bool)
            -> (IORef ([(RunInIO m, K.StreamK m a)], [(RunInIO m, K.StreamK m a)])
                -> Channel m a
                -> Maybe WorkerInfo
                -> m())
            -> Channel m a
        getSVar :: Channel m a
-> (Channel m a -> m [ChildEvent a])
-> (Channel m a -> m Bool)
-> (Channel m a -> IO Bool)
-> (IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
    -> Channel m a -> Maybe WorkerInfo -> m ())
-> Channel m a
getSVar Channel m a
sv Channel m a -> m [ChildEvent a]
readOutput Channel m a -> m Bool
postProc Channel m a -> IO Bool
workDone IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Channel m a -> Maybe WorkerInfo -> m ()
wloop = Channel :: forall (m :: * -> *) a.
RunInIO m
-> IORef ([ChildEvent a], Int)
-> MVar ()
-> m [ChildEvent a]
-> m Bool
-> Limit
-> Limit
-> Maybe (IORef Count)
-> Maybe YieldRateInfo
-> (Bool -> (RunInIO m, StreamK m a) -> IO ())
-> m ()
-> IO Bool
-> IO Bool
-> IORef Bool
-> (Maybe WorkerInfo -> m ())
-> IORef (Set ThreadId)
-> IORef Int
-> (ThreadId -> m ())
-> MVar ()
-> Maybe (IORef ())
-> SVarStats
-> Bool
-> ThreadId
-> Channel m a
Channel
            { outputQueue :: IORef ([ChildEvent a], Int)
outputQueue      = IORef ([ChildEvent a], Int)
outQ
            , remainingWork :: Maybe (IORef Count)
remainingWork    = Maybe (IORef Count)
yl
            , maxBufferLimit :: Limit
maxBufferLimit   = Config -> Limit
getMaxBuffer Config
cfg
            , maxWorkerLimit :: Limit
maxWorkerLimit   = Limit -> Limit -> Limit
forall a. Ord a => a -> a -> a
min (Config -> Limit
getMaxThreads Config
cfg) (Config -> Limit
getMaxBuffer Config
cfg)
            , yieldRateInfo :: Maybe YieldRateInfo
yieldRateInfo    = Maybe YieldRateInfo
rateInfo
            , outputDoorBell :: MVar ()
outputDoorBell   = MVar ()
outQMv
            , readOutputQ :: m [ChildEvent a]
readOutputQ      = Channel m a -> m [ChildEvent a]
readOutput Channel m a
sv
            , postProcess :: m Bool
postProcess      = Channel m a -> m Bool
postProc Channel m a
sv
            , workerThreads :: IORef (Set ThreadId)
workerThreads    = IORef (Set ThreadId)
running

            , workLoop :: Maybe WorkerInfo -> m ()
workLoop =
                if Bool
inOrder
                then IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
workLoopAhead IORef ([StreamK m a], Int)
aheadQ IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
outH Channel m a
sv
                else IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Channel m a -> Maybe WorkerInfo -> m ()
wloop IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
q Channel m a
sv
            , enqueue :: Bool -> (RunInIO m, StreamK m a) -> IO ()
enqueue =
                \Bool
inner ->
                    if Bool
inOrder
                    then Channel m a
-> IORef ([StreamK m a], Int) -> (RunInIO m, StreamK m a) -> IO ()
forall (m :: * -> *) a.
Channel m a
-> IORef ([StreamK m a], Int) -> (RunInIO m, StreamK m a) -> IO ()
enqueueAhead Channel m a
sv IORef ([StreamK m a], Int)
aheadQ
                    else Channel m a
-> IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Bool
-> (RunInIO m, StreamK m a)
-> IO ()
forall (m :: * -> *) a.
Channel m a
-> IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Bool
-> (RunInIO m, StreamK m a)
-> IO ()
enqueueLIFO Channel m a
sv IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
q Bool
inner
            , eagerDispatch :: m ()
eagerDispatch = Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
eagerEval (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ m Bool -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m Bool -> m ()) -> m Bool -> m ()
forall a b. (a -> b) -> a -> b
$ Count -> Channel m a -> m Bool
forall (m :: * -> *) a.
MonadRunInIO m =>
Count -> Channel m a -> m Bool
dispatchWorker Count
0 Channel m a
sv
            , isWorkDone :: IO Bool
isWorkDone =
                if Bool
inOrder
                then Channel m a
-> IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> IO Bool
forall {t :: * -> *} {m :: * -> *} {a} {a} {b} {a} {b}.
Foldable t =>
Channel m a -> IORef (t a, b) -> IORef (Heap a, b) -> IO Bool
isWorkDoneAhead Channel m a
sv IORef ([StreamK m a], Int)
aheadQ IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
outH
                else Channel m a -> IO Bool
workDone Channel m a
sv
            , isQueueDone :: IO Bool
isQueueDone =
                if Bool
inOrder
                then Channel m a -> IORef ([StreamK m a], Int) -> IO Bool
forall {t :: * -> *} {m :: * -> *} {a} {a} {b}.
Foldable t =>
Channel m a -> IORef (t a, b) -> IO Bool
isQueueDoneAhead Channel m a
sv IORef ([StreamK m a], Int)
aheadQ
                else Channel m a -> IO Bool
workDone Channel m a
sv

            , doorBellOnWorkQ :: IORef Bool
doorBellOnWorkQ  = IORef Bool
wfw
            , svarMrun :: RunInIO m
svarMrun         = RunInIO m
mrun
            , workerCount :: IORef Int
workerCount      = IORef Int
active
            -- XXX We can use delThread or modThread based on eager flag.
            , accountThread :: ThreadId -> m ()
accountThread    = IORef (Set ThreadId) -> MVar () -> ThreadId -> m ()
forall (m :: * -> *).
MonadIO m =>
IORef (Set ThreadId) -> MVar () -> ThreadId -> m ()
modifyThread IORef (Set ThreadId)
running MVar ()
outQMv
            , workerStopMVar :: MVar ()
workerStopMVar   = MVar ()
stopMVar
            , svarRef :: Maybe (IORef ())
svarRef          = Maybe (IORef ())
forall a. Maybe a
Nothing
            , svarInspectMode :: Bool
svarInspectMode  = Config -> Bool
getInspectMode Config
cfg
            , svarCreator :: ThreadId
svarCreator      = ThreadId
tid
            , svarStats :: SVarStats
svarStats        = SVarStats
stats
            }

    let sv :: Channel m a
sv =
            case Config -> Maybe Rate
getStreamRate Config
cfg of
                Maybe Rate
Nothing ->
                    case Config -> Maybe Count
getYieldLimit Config
cfg of
                        Maybe Count
Nothing -> Channel m a
-> (Channel m a -> m [ChildEvent a])
-> (Channel m a -> m Bool)
-> (Channel m a -> IO Bool)
-> (IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
    -> Channel m a -> Maybe WorkerInfo -> m ())
-> Channel m a
getSVar Channel m a
sv (Bool -> Channel m a -> m [ChildEvent a]
forall (m :: * -> *) a.
MonadRunInIO m =>
Bool -> Channel m a -> m [ChildEvent a]
readOutputQBounded Bool
eagerEval)
                                              Channel m a -> m Bool
forall (m :: * -> *) a. MonadRunInIO m => Channel m a -> m Bool
postProcessBounded
                                              Channel m a -> IO Bool
forall {p}. p -> IO Bool
isWorkFinished
                                              IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Channel m a -> Maybe WorkerInfo -> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Channel m a -> Maybe WorkerInfo -> m ()
workLoopLIFO
                        Just Count
_  -> Channel m a
-> (Channel m a -> m [ChildEvent a])
-> (Channel m a -> m Bool)
-> (Channel m a -> IO Bool)
-> (IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
    -> Channel m a -> Maybe WorkerInfo -> m ())
-> Channel m a
getSVar Channel m a
sv (Bool -> Channel m a -> m [ChildEvent a]
forall (m :: * -> *) a.
MonadRunInIO m =>
Bool -> Channel m a -> m [ChildEvent a]
readOutputQBounded Bool
eagerEval)
                                              Channel m a -> m Bool
forall (m :: * -> *) a. MonadRunInIO m => Channel m a -> m Bool
postProcessBounded
                                              Channel m a -> IO Bool
forall {m :: * -> *} {a}. Channel m a -> IO Bool
isWorkFinishedLimited
                                              IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Channel m a -> Maybe WorkerInfo -> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Channel m a -> Maybe WorkerInfo -> m ()
workLoopLIFOLimited
                Just Rate
_  ->
                    case Config -> Maybe Count
getYieldLimit Config
cfg of
                        Maybe Count
Nothing -> Channel m a
-> (Channel m a -> m [ChildEvent a])
-> (Channel m a -> m Bool)
-> (Channel m a -> IO Bool)
-> (IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
    -> Channel m a -> Maybe WorkerInfo -> m ())
-> Channel m a
getSVar Channel m a
sv Channel m a -> m [ChildEvent a]
forall (m :: * -> *) a.
MonadRunInIO m =>
Channel m a -> m [ChildEvent a]
readOutputQPaced
                                              Channel m a -> m Bool
forall (m :: * -> *) a. MonadRunInIO m => Channel m a -> m Bool
postProcessPaced
                                              Channel m a -> IO Bool
forall {p}. p -> IO Bool
isWorkFinished
                                              IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Channel m a -> Maybe WorkerInfo -> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Channel m a -> Maybe WorkerInfo -> m ()
workLoopLIFO
                        Just Count
_  -> Channel m a
-> (Channel m a -> m [ChildEvent a])
-> (Channel m a -> m Bool)
-> (Channel m a -> IO Bool)
-> (IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
    -> Channel m a -> Maybe WorkerInfo -> m ())
-> Channel m a
getSVar Channel m a
sv Channel m a -> m [ChildEvent a]
forall (m :: * -> *) a.
MonadRunInIO m =>
Channel m a -> m [ChildEvent a]
readOutputQPaced
                                              Channel m a -> m Bool
forall (m :: * -> *) a. MonadRunInIO m => Channel m a -> m Bool
postProcessPaced
                                              Channel m a -> IO Bool
forall {m :: * -> *} {a}. Channel m a -> IO Bool
isWorkFinishedLimited
                                              IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Channel m a -> Maybe WorkerInfo -> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Channel m a -> Maybe WorkerInfo -> m ()
workLoopLIFOLimited
     in Channel m a -> IO (Channel m a)
forall (m :: * -> *) a. Monad m => a -> m a
return Channel m a
sv

    where

    {-# INLINE isQueueDoneAhead #-}
    isQueueDoneAhead :: Channel m a -> IORef (t a, b) -> IO Bool
isQueueDoneAhead Channel m a
sv IORef (t a, b)
q = do
        Bool
queueDone <- IORef (t a, b) -> IO Bool
forall {t :: * -> *} {a} {b}.
Foldable t =>
IORef (t a, b) -> IO Bool
checkEmpty IORef (t a, b)
q
        Bool
yieldsDone <-
                case Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv of
                    Just IORef Count
yref -> do
                        Count
n <- IORef Count -> IO Count
forall a. IORef a -> IO a
readIORef IORef Count
yref
                        Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return (Count
n Count -> Count -> Bool
forall a. Ord a => a -> a -> Bool
<= Count
0)
                    Maybe (IORef Count)
Nothing -> Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
        -- XXX note that yieldsDone can only be authoritative only when there
        -- are no workers running. If there are active workers they can
        -- later increment the yield count and therefore change the result.
        Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ Bool
yieldsDone Bool -> Bool -> Bool
|| Bool
queueDone

    {-# INLINE isWorkDoneAhead #-}
    isWorkDoneAhead :: Channel m a -> IORef (t a, b) -> IORef (Heap a, b) -> IO Bool
isWorkDoneAhead Channel m a
sv IORef (t a, b)
q IORef (Heap a, b)
ref = do
        Bool
heapDone <- do
                (Heap a
hp, b
_) <- IORef (Heap a, b) -> IO (Heap a, b)
forall a. IORef a -> IO a
readIORef IORef (Heap a, b)
ref
                Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return (Heap a -> Int
forall a. Heap a -> Int
H.size Heap a
hp Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0)
        Bool
queueDone <- Channel m a -> IORef (t a, b) -> IO Bool
forall {t :: * -> *} {m :: * -> *} {a} {a} {b}.
Foldable t =>
Channel m a -> IORef (t a, b) -> IO Bool
isQueueDoneAhead Channel m a
sv IORef (t a, b)
q
        Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ Bool
heapDone Bool -> Bool -> Bool
&& Bool
queueDone

    checkEmpty :: IORef (t a, b) -> IO Bool
checkEmpty IORef (t a, b)
q = do
        (t a
xs, b
_) <- IORef (t a, b) -> IO (t a, b)
forall a. IORef a -> IO a
readIORef IORef (t a, b)
q
        Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ t a -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null t a
xs

-- | Create a new async style concurrent stream evaluation channel. The monad
-- state used to run the stream actions is taken from the call site of
-- newAppendChannel.
{-# INLINABLE newAppendChannel #-}
{-# SPECIALIZE newAppendChannel :: (Config -> Config) -> IO (Channel IO a) #-}
newAppendChannel :: MonadRunInIO m => (Config -> Config) -> m (Channel m a)
newAppendChannel :: forall (m :: * -> *) a.
MonadRunInIO m =>
(Config -> Config) -> m (Channel m a)
newAppendChannel Config -> Config
modifier = do
    RunInIO m
mrun <- m (RunInIO m)
forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
    IO (Channel m a) -> m (Channel m a)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Channel m a) -> m (Channel m a))
-> IO (Channel m a) -> m (Channel m a)
forall a b. (a -> b) -> a -> b
$ RunInIO m -> Config -> IO (Channel m a)
forall (m :: * -> *) a.
MonadRunInIO m =>
RunInIO m -> Config -> IO (Channel m a)
getLifoSVar RunInIO m
mrun (Config -> Config
modifier Config
defaultConfig)