module Streamly.Internal.Data.Stream.Concurrent.Channel.Append
(
newChannel
)
where
import Control.Concurrent (myThreadId)
import Control.Concurrent.MVar (newEmptyMVar, newMVar, putMVar, takeMVar)
import Control.Exception (assert)
import Control.Monad (when, void)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.Heap (Heap, Entry(..))
import Data.IORef (IORef, newIORef, readIORef, atomicModifyIORef, writeIORef)
import Data.Kind (Type)
import GHC.Exts (inline)
import Streamly.Internal.Control.Concurrent
(MonadRunInIO, RunInIO(..), askRunInIO, restoreM)
import Streamly.Internal.Data.Atomics
(atomicModifyIORefCAS, atomicModifyIORefCAS_)
import Streamly.Internal.Data.Stream.Channel.Dispatcher (modifyThread)
import qualified Data.Heap as H
import qualified Data.Set as Set
import qualified Streamly.Internal.Data.Stream.StreamK.Type as K
import Streamly.Internal.Data.Stream.Concurrent.Channel.Consumer
import Streamly.Internal.Data.Stream.Concurrent.Channel.Dispatcher
import Streamly.Internal.Data.Stream.Concurrent.Channel.Type
import Streamly.Internal.Data.Stream.Channel.Types
import Streamly.Internal.Data.Stream.Channel.Worker
{-# INLINE enqueueLIFO #-}
enqueueLIFO ::
Channel m a
-> IORef ([(RunInIO m, K.StreamK m a)], [(RunInIO m, K.StreamK m a)])
-> Bool
-> (RunInIO m, K.StreamK m a)
-> IO ()
enqueueLIFO :: forall (m :: * -> *) a.
Channel m a
-> IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Bool
-> (RunInIO m, StreamK m a)
-> IO ()
enqueueLIFO Channel m a
sv IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
q Bool
inner (RunInIO m, StreamK m a)
m = do
forall t. IORef t -> (t -> t) -> IO ()
atomicModifyIORefCAS_ IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
q forall a b. (a -> b) -> a -> b
$ \([(RunInIO m, StreamK m a)]
xs, [(RunInIO m, StreamK m a)]
ys) ->
if Bool
inner then ([(RunInIO m, StreamK m a)]
xs, (RunInIO m, StreamK m a)
m forall a. a -> [a] -> [a]
: [(RunInIO m, StreamK m a)]
ys) else ((RunInIO m, StreamK m a)
m forall a. a -> [a] -> [a]
: [(RunInIO m, StreamK m a)]
xs, [(RunInIO m, StreamK m a)]
ys)
IORef Bool -> MVar () -> IO ()
ringDoorBell (forall (m :: * -> *) a. Channel m a -> IORef Bool
doorBellOnWorkQ Channel m a
sv) (forall (m :: * -> *) a. Channel m a -> MVar ()
outputDoorBell Channel m a
sv)
data QResult a = QEmpty | QOuter a | QInner a
{-# INLINE dequeue #-}
dequeue :: MonadIO m =>
IORef ([(RunInIO m, K.StreamK m a)], [(RunInIO m, K.StreamK m a)])
-> m (QResult (RunInIO m, K.StreamK m a))
dequeue :: forall (m :: * -> *) a.
MonadIO m =>
IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> m (QResult (RunInIO m, StreamK m a))
dequeue IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
qref =
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
forall a b. (a -> b) -> a -> b
$ forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORefCAS IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
qref
forall a b. (a -> b) -> a -> b
$ \case
([(RunInIO m, StreamK m a)]
xs, (RunInIO m, StreamK m a)
y : [(RunInIO m, StreamK m a)]
ys) -> (([(RunInIO m, StreamK m a)]
xs, [(RunInIO m, StreamK m a)]
ys), forall a. a -> QResult a
QInner (RunInIO m, StreamK m a)
y)
((RunInIO m, StreamK m a)
x : [(RunInIO m, StreamK m a)]
xs, [(RunInIO m, StreamK m a)]
ys) -> (([(RunInIO m, StreamK m a)]
xs, [(RunInIO m, StreamK m a)]
ys), forall a. a -> QResult a
QOuter (RunInIO m, StreamK m a)
x)
([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
x -> (([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
x, forall a. QResult a
QEmpty)
data WorkerStatus = Continue | Suspend
{-# INLINE workLoopLIFO #-}
workLoopLIFO
:: MonadRunInIO m
=> IORef ([(RunInIO m, K.StreamK m a)], [(RunInIO m, K.StreamK m a)])
-> Channel m a
-> Maybe WorkerInfo
-> m ()
workLoopLIFO :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Channel m a -> Maybe WorkerInfo -> m ()
workLoopLIFO IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
qref Channel m a
sv Maybe WorkerInfo
winfo = m ()
run
where
run :: m ()
run = do
QResult (RunInIO m, StreamK m a)
work <- forall (m :: * -> *) a.
MonadIO m =>
IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> m (QResult (RunInIO m, StreamK m a))
dequeue IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
qref
case QResult (RunInIO m, StreamK m a)
work of
QResult (RunInIO m, StreamK m a)
QEmpty ->
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
QInner (RunInIO forall b. m b -> IO (StM m b)
runin, StreamK m a
m) ->
(m WorkerStatus -> IO (StM m WorkerStatus))
-> StreamK m a -> Bool -> m ()
process forall b. m b -> IO (StM m b)
runin StreamK m a
m Bool
True
QOuter (RunInIO forall b. m b -> IO (StM m b)
runin, StreamK m a
m) ->
(m WorkerStatus -> IO (StM m WorkerStatus))
-> StreamK m a -> Bool -> m ()
process forall b. m b -> IO (StM m b)
runin StreamK m a
m Bool
False
process :: (m WorkerStatus -> IO (StM m WorkerStatus))
-> StreamK m a -> Bool -> m ()
process m WorkerStatus -> IO (StM m WorkerStatus)
runin StreamK m a
m Bool
inner = do
StM m WorkerStatus
r <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ m WorkerStatus -> IO (StM m WorkerStatus)
runin forall a b. (a -> b) -> a -> b
$
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared
forall a. HasCallStack => a
undefined
a -> StreamK m a -> m WorkerStatus
yieldk
forall {m :: * -> *}. MonadIO m => a -> m WorkerStatus
single
(forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Continue)
StreamK m a
m
WorkerStatus
res <- forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m WorkerStatus
r
case WorkerStatus
res of
WorkerStatus
Continue -> m ()
run
WorkerStatus
Suspend -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
where
single :: a -> m WorkerStatus
single a
a = do
Bool
res <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
Channel m a -> Maybe WorkerInfo -> a -> IO Bool
yield Channel m a
sv Maybe WorkerInfo
winfo a
a
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ if Bool
res then WorkerStatus
Continue else WorkerStatus
Suspend
yieldk :: a -> StreamK m a -> m WorkerStatus
yieldk a
a StreamK m a
r = do
Bool
res <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
Channel m a -> Maybe WorkerInfo -> a -> IO Bool
yield Channel m a
sv Maybe WorkerInfo
winfo a
a
if Bool
res
then forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared forall a. HasCallStack => a
undefined a -> StreamK m a -> m WorkerStatus
yieldk forall {m :: * -> *}. MonadIO m => a -> m WorkerStatus
single (forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Continue) StreamK m a
r
else do
RunInIO m
runInIO <- forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
Channel m a
-> IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Bool
-> (RunInIO m, StreamK m a)
-> IO ()
enqueueLIFO Channel m a
sv IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
qref Bool
inner (RunInIO m
runInIO, StreamK m a
r)
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Suspend
{-# INLINE workLoopLIFOLimited #-}
workLoopLIFOLimited
:: forall m a. MonadRunInIO m
=> IORef ([(RunInIO m, K.StreamK m a)], [(RunInIO m, K.StreamK m a)])
-> Channel m a
-> Maybe WorkerInfo
-> m ()
workLoopLIFOLimited :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Channel m a -> Maybe WorkerInfo -> m ()
workLoopLIFOLimited IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
qref Channel m a
sv Maybe WorkerInfo
winfo = m ()
run
where
incrContinue :: m WorkerStatus
incrContinue =
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Maybe (IORef Count) -> IO ()
incrementYieldLimit (forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)) forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Continue
run :: m ()
run = do
QResult (RunInIO m, StreamK m a)
work <- forall (m :: * -> *) a.
MonadIO m =>
IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> m (QResult (RunInIO m, StreamK m a))
dequeue IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
qref
case QResult (RunInIO m, StreamK m a)
work of
QResult (RunInIO m, StreamK m a)
QEmpty ->
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
QInner (RunInIO m, StreamK m a)
item ->
(RunInIO m, StreamK m a) -> Bool -> m ()
process (RunInIO m, StreamK m a)
item Bool
True
QOuter (RunInIO m, StreamK m a)
item ->
(RunInIO m, StreamK m a) -> Bool -> m ()
process (RunInIO m, StreamK m a)
item Bool
False
process :: (RunInIO m, StreamK m a) -> Bool -> m ()
process item :: (RunInIO m, StreamK m a)
item@(RunInIO forall b. m b -> IO (StM m b)
runin, StreamK m a
m) Bool
inner = do
Bool
yieldLimitOk <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Maybe (IORef Count) -> IO Bool
decrementYieldLimit (forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
if Bool
yieldLimitOk
then do
StM m WorkerStatus
r <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall b. m b -> IO (StM m b)
runin forall a b. (a -> b) -> a -> b
$
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared
forall a. HasCallStack => a
undefined
a -> StreamK m a -> m WorkerStatus
yieldk
forall {m :: * -> *}. MonadIO m => a -> m WorkerStatus
single
m WorkerStatus
incrContinue
StreamK m a
m
WorkerStatus
res <- forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m WorkerStatus
r
case WorkerStatus
res of
WorkerStatus
Continue -> m ()
run
WorkerStatus
Suspend -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
else forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
forall (m :: * -> *) a.
Channel m a
-> IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Bool
-> (RunInIO m, StreamK m a)
-> IO ()
enqueueLIFO Channel m a
sv IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
qref Bool
inner (RunInIO m, StreamK m a)
item
Maybe (IORef Count) -> IO ()
incrementYieldLimit (forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
where
single :: a -> m WorkerStatus
single a
a = do
Bool
res <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
Channel m a -> Maybe WorkerInfo -> a -> IO Bool
yield Channel m a
sv Maybe WorkerInfo
winfo a
a
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ if Bool
res then WorkerStatus
Continue else WorkerStatus
Suspend
yieldk :: a -> StreamK m a -> m WorkerStatus
yieldk a
a StreamK m a
r = do
Bool
res <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
Channel m a -> Maybe WorkerInfo -> a -> IO Bool
yield Channel m a
sv Maybe WorkerInfo
winfo a
a
Bool
yieldLimitOk <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Maybe (IORef Count) -> IO Bool
decrementYieldLimit (forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
if Bool
res Bool -> Bool -> Bool
&& Bool
yieldLimitOk
then forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared forall a. HasCallStack => a
undefined a -> StreamK m a -> m WorkerStatus
yieldk forall {m :: * -> *}. MonadIO m => a -> m WorkerStatus
single m WorkerStatus
incrContinue StreamK m a
r
else do
RunInIO m
runInIO <- forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Maybe (IORef Count) -> IO ()
incrementYieldLimit (forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
Channel m a
-> IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Bool
-> (RunInIO m, StreamK m a)
-> IO ()
enqueueLIFO Channel m a
sv IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
qref Bool
inner (RunInIO m
runInIO, StreamK m a
r)
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Suspend
{-# ANN enqueueAhead "HLint: ignore" #-}
{-# INLINE enqueueAhead #-}
enqueueAhead ::
Channel m a
-> IORef ([K.StreamK m a], Int)
-> (RunInIO m, K.StreamK m a)
-> IO ()
enqueueAhead :: forall (m :: * -> *) a.
Channel m a
-> IORef ([StreamK m a], Int) -> (RunInIO m, StreamK m a) -> IO ()
enqueueAhead Channel m a
sv IORef ([StreamK m a], Int)
q (RunInIO m, StreamK m a)
m = do
forall t. IORef t -> (t -> t) -> IO ()
atomicModifyIORefCAS_ IORef ([StreamK m a], Int)
q forall a b. (a -> b) -> a -> b
$ \([StreamK m a]
xs, Int
n) -> (forall a b. (a, b) -> b
snd (RunInIO m, StreamK m a)
mforall a. a -> [a] -> [a]
:[StreamK m a]
xs, Int
n)
IORef Bool -> MVar () -> IO ()
ringDoorBell (forall (m :: * -> *) a. Channel m a -> IORef Bool
doorBellOnWorkQ Channel m a
sv) (forall (m :: * -> *) a. Channel m a -> MVar ()
outputDoorBell Channel m a
sv)
{-# INLINE dequeueAhead #-}
dequeueAhead :: MonadIO m
=> IORef ([t m a], Int) -> m (Maybe (t m a, Int))
dequeueAhead :: forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
IORef ([t m a], Int) -> m (Maybe (t m a, Int))
dequeueAhead IORef ([t m a], Int)
q = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORefCAS IORef ([t m a], Int)
q forall a b. (a -> b) -> a -> b
$ \case
([], Int
n) -> (([], Int
n), forall a. Maybe a
Nothing)
(t m a
x : [t m a]
xs, Int
n) -> (([t m a]
xs, Int
n forall a. Num a => a -> a -> a
+ Int
1), forall a. a -> Maybe a
Just (t m a
x, Int
n forall a. Num a => a -> a -> a
+ Int
1))
{-# INLINE dequeueAheadSeqCheck #-}
dequeueAheadSeqCheck :: MonadIO m
=> IORef ([t m a], Int) -> Int -> m (Maybe (t m a))
dequeueAheadSeqCheck :: forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
IORef ([t m a], Int) -> Int -> m (Maybe (t m a))
dequeueAheadSeqCheck IORef ([t m a], Int)
q Int
seqNo = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORefCAS IORef ([t m a], Int)
q forall a b. (a -> b) -> a -> b
$ \case
([], Int
n) -> (([], Int
n), forall a. Maybe a
Nothing)
(t m a
x : [t m a]
xs, Int
n) ->
if Int
n forall a. Num a => a -> a -> a
+ Int
1 forall a. Eq a => a -> a -> Bool
== Int
seqNo
then (([t m a]
xs, Int
n forall a. Num a => a -> a -> a
+ Int
1), forall a. a -> Maybe a
Just t m a
x)
else ((t m a
x forall a. a -> [a] -> [a]
: [t m a]
xs, Int
n), forall a. Maybe a
Nothing)
withIORef :: IORef a -> (a -> IO b) -> IO b
withIORef :: forall a b. IORef a -> (a -> IO b) -> IO b
withIORef IORef a
ref a -> IO b
f = forall a. IORef a -> IO a
readIORef IORef a
ref forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= a -> IO b
f
atomicModifyIORef_ :: IORef a -> (a -> a) -> IO ()
atomicModifyIORef_ :: forall t. IORef t -> (t -> t) -> IO ()
atomicModifyIORef_ IORef a
ref a -> a
f =
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef IORef a
ref forall a b. (a -> b) -> a -> b
$ \a
x -> (a -> a
f a
x, ())
data AheadHeapEntry (t :: (Type -> Type) -> Type -> Type) m a =
AheadEntryNull
| AheadEntryPure a
| AheadEntryStream (RunInIO m, t m a)
data HeapDequeueResult t m a =
Clearing
| Waiting Int
| Ready (Entry Int (AheadHeapEntry t m a))
{-# INLINE dequeueFromHeap #-}
dequeueFromHeap
:: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> IO (HeapDequeueResult t m a)
dequeueFromHeap :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> IO (HeapDequeueResult t m a)
dequeueFromHeap IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
hpVar =
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
hpVar forall a b. (a -> b) -> a -> b
$ \pair :: (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
pair@(Heap (Entry Int (AheadHeapEntry t m a))
hp, Maybe Int
snum) ->
case Maybe Int
snum of
Maybe Int
Nothing -> ((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
pair, forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
HeapDequeueResult t m a
Clearing)
Just Int
n -> do
let r :: Maybe
(Entry Int (AheadHeapEntry t m a),
Heap (Entry Int (AheadHeapEntry t m a)))
r = forall a. Heap a -> Maybe (a, Heap a)
H.uncons Heap (Entry Int (AheadHeapEntry t m a))
hp
case Maybe
(Entry Int (AheadHeapEntry t m a),
Heap (Entry Int (AheadHeapEntry t m a)))
r of
Just (ent :: Entry Int (AheadHeapEntry t m a)
ent@(Entry Int
seqNo AheadHeapEntry t m a
_ev), Heap (Entry Int (AheadHeapEntry t m a))
hp') ->
if Int
seqNo forall a. Eq a => a -> a -> Bool
== Int
n
then ((Heap (Entry Int (AheadHeapEntry t m a))
hp', forall a. Maybe a
Nothing), forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
Entry Int (AheadHeapEntry t m a) -> HeapDequeueResult t m a
Ready Entry Int (AheadHeapEntry t m a)
ent)
else forall a. HasCallStack => Bool -> a -> a
assert (Int
seqNo forall a. Ord a => a -> a -> Bool
>= Int
n) ((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
pair, forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
Int -> HeapDequeueResult t m a
Waiting Int
n)
Maybe
(Entry Int (AheadHeapEntry t m a),
Heap (Entry Int (AheadHeapEntry t m a)))
Nothing -> ((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
pair, forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
Int -> HeapDequeueResult t m a
Waiting Int
n)
{-# INLINE dequeueFromHeapSeq #-}
dequeueFromHeapSeq
:: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Int
-> IO (HeapDequeueResult t m a)
dequeueFromHeapSeq :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Int -> IO (HeapDequeueResult t m a)
dequeueFromHeapSeq IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
hpVar Int
i =
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
hpVar forall a b. (a -> b) -> a -> b
$ \(Heap (Entry Int (AheadHeapEntry t m a))
hp, Maybe Int
snum) ->
case Maybe Int
snum of
Maybe Int
Nothing -> do
let r :: Maybe
(Entry Int (AheadHeapEntry t m a),
Heap (Entry Int (AheadHeapEntry t m a)))
r = forall a. Heap a -> Maybe (a, Heap a)
H.uncons Heap (Entry Int (AheadHeapEntry t m a))
hp
case Maybe
(Entry Int (AheadHeapEntry t m a),
Heap (Entry Int (AheadHeapEntry t m a)))
r of
Just (ent :: Entry Int (AheadHeapEntry t m a)
ent@(Entry Int
seqNo AheadHeapEntry t m a
_ev), Heap (Entry Int (AheadHeapEntry t m a))
hp') ->
if Int
seqNo forall a. Eq a => a -> a -> Bool
== Int
i
then ((Heap (Entry Int (AheadHeapEntry t m a))
hp', forall a. Maybe a
Nothing), forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
Entry Int (AheadHeapEntry t m a) -> HeapDequeueResult t m a
Ready Entry Int (AheadHeapEntry t m a)
ent)
else forall a. HasCallStack => Bool -> a -> a
assert (Int
seqNo forall a. Ord a => a -> a -> Bool
>= Int
i) ((Heap (Entry Int (AheadHeapEntry t m a))
hp, forall a. a -> Maybe a
Just Int
i), forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
Int -> HeapDequeueResult t m a
Waiting Int
i)
Maybe
(Entry Int (AheadHeapEntry t m a),
Heap (Entry Int (AheadHeapEntry t m a)))
Nothing -> ((Heap (Entry Int (AheadHeapEntry t m a))
hp, forall a. a -> Maybe a
Just Int
i), forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
Int -> HeapDequeueResult t m a
Waiting Int
i)
Just Int
_ -> forall a. HasCallStack => [Char] -> a
error [Char]
"dequeueFromHeapSeq: unreachable"
heapIsSane :: Maybe Int -> Int -> Bool
heapIsSane :: Maybe Int -> Int -> Bool
heapIsSane Maybe Int
snum Int
seqNo =
case Maybe Int
snum of
Maybe Int
Nothing -> Bool
True
Just Int
n -> Int
seqNo forall a. Ord a => a -> a -> Bool
>= Int
n
{-# INLINE requeueOnHeapTop #-}
requeueOnHeapTop
:: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Entry Int (AheadHeapEntry t m a)
-> Int
-> IO ()
requeueOnHeapTop :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Entry Int (AheadHeapEntry t m a) -> Int -> IO ()
requeueOnHeapTop IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
hpVar Entry Int (AheadHeapEntry t m a)
ent Int
seqNo =
forall t. IORef t -> (t -> t) -> IO ()
atomicModifyIORef_ IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
hpVar forall a b. (a -> b) -> a -> b
$ \(Heap (Entry Int (AheadHeapEntry t m a))
hp, Maybe Int
snum) ->
forall a. HasCallStack => Bool -> a -> a
assert (Maybe Int -> Int -> Bool
heapIsSane Maybe Int
snum Int
seqNo) (forall a. Ord a => a -> Heap a -> Heap a
H.insert Entry Int (AheadHeapEntry t m a)
ent Heap (Entry Int (AheadHeapEntry t m a))
hp, forall a. a -> Maybe a
Just Int
seqNo)
{-# INLINE updateHeapSeq #-}
updateHeapSeq
:: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Int
-> IO ()
updateHeapSeq :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Int -> IO ()
updateHeapSeq IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
hpVar Int
seqNo =
forall t. IORef t -> (t -> t) -> IO ()
atomicModifyIORef_ IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
hpVar forall a b. (a -> b) -> a -> b
$ \(Heap (Entry Int (AheadHeapEntry t m a))
hp, Maybe Int
snum) ->
forall a. HasCallStack => Bool -> a -> a
assert (Maybe Int -> Int -> Bool
heapIsSane Maybe Int
snum Int
seqNo) (Heap (Entry Int (AheadHeapEntry t m a))
hp, forall a. a -> Maybe a
Just Int
seqNo)
{-# INLINE underMaxHeap #-}
underMaxHeap ::
Channel m a
-> Heap (Entry Int (AheadHeapEntry K.StreamK m a))
-> IO Bool
underMaxHeap :: forall (m :: * -> *) a.
Channel m a
-> Heap (Entry Int (AheadHeapEntry StreamK m a)) -> IO Bool
underMaxHeap Channel m a
sv Heap (Entry Int (AheadHeapEntry StreamK m a))
hp = do
([ChildEvent a]
_, Int
len) <- forall a. IORef a -> IO a
readIORef (forall (m :: * -> *) a. Channel m a -> IORef ([ChildEvent a], Int)
outputQueue Channel m a
sv)
let maxHeap :: Limit
maxHeap = case forall (m :: * -> *) a. Channel m a -> Limit
maxBufferLimit Channel m a
sv of
Limited Word
lim -> Word -> Limit
Limited forall a b. (a -> b) -> a -> b
$
forall a. Ord a => a -> a -> a
max Word
0 (Word
lim forall a. Num a => a -> a -> a
- forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
len)
Limit
Unlimited -> Limit
Unlimited
case Limit
maxHeap of
Limited Word
lim -> do
Int
active <- forall a. IORef a -> IO a
readIORef (forall (m :: * -> *) a. Channel m a -> IORef Int
workerCount Channel m a
sv)
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a. Heap a -> Int
H.size Heap (Entry Int (AheadHeapEntry StreamK m a))
hp forall a. Num a => a -> a -> a
+ Int
active forall a. Ord a => a -> a -> Bool
<= forall a b. (Integral a, Num b) => a -> b
fromIntegral Word
lim
Limit
Unlimited -> forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
preStopCheck ::
Channel m a
-> IORef (Heap (Entry Int (AheadHeapEntry K.StreamK m a)) , Maybe Int)
-> IO Bool
preStopCheck :: forall (m :: * -> *) a.
Channel m a
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> IO Bool
preStopCheck Channel m a
sv IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap =
forall a b. IORef a -> (a -> IO b) -> IO b
withIORef IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap forall a b. (a -> b) -> a -> b
$ \(Heap (Entry Int (AheadHeapEntry StreamK m a))
hp, Maybe Int
_) -> do
Bool
heapOk <- forall (m :: * -> *) a.
Channel m a
-> Heap (Entry Int (AheadHeapEntry StreamK m a)) -> IO Bool
underMaxHeap Channel m a
sv Heap (Entry Int (AheadHeapEntry StreamK m a))
hp
forall a. MVar a -> IO a
takeMVar (forall (m :: * -> *) a. Channel m a -> MVar ()
workerStopMVar Channel m a
sv)
let stopping :: IO Bool
stopping = do
forall a. MVar a -> a -> IO ()
putMVar (forall (m :: * -> *) a. Channel m a -> MVar ()
workerStopMVar Channel m a
sv) ()
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
continue :: IO Bool
continue = do
forall a. MVar a -> a -> IO ()
putMVar (forall (m :: * -> *) a. Channel m a -> MVar ()
workerStopMVar Channel m a
sv) ()
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
if Bool
heapOk
then
case forall (m :: * -> *) a. Channel m a -> Maybe YieldRateInfo
yieldRateInfo Channel m a
sv of
Maybe YieldRateInfo
Nothing -> IO Bool
continue
Just YieldRateInfo
yinfo -> do
Bool
rateOk <-
Limit -> IORef Int -> YieldRateInfo -> IO Bool
isBeyondMaxRate
(forall (m :: * -> *) a. Channel m a -> Limit
maxWorkerLimit Channel m a
sv) (forall (m :: * -> *) a. Channel m a -> IORef Int
workerCount Channel m a
sv) YieldRateInfo
yinfo
if Bool
rateOk then IO Bool
continue else IO Bool
stopping
else IO Bool
stopping
abortExecution :: Channel m a -> Maybe WorkerInfo -> IO ()
abortExecution :: forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
abortExecution Channel m a
sv Maybe WorkerInfo
winfo = do
Maybe (IORef Count) -> IO ()
incrementYieldLimit (forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
processHeap
:: MonadRunInIO m
=> IORef ([K.StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry K.StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> AheadHeapEntry K.StreamK m a
-> Int
-> Bool
-> m ()
processHeap :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> AheadHeapEntry StreamK m a
-> Int
-> Bool
-> m ()
processHeap IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo AheadHeapEntry StreamK m a
entry Int
sno Bool
stopping = Int -> AheadHeapEntry StreamK m a -> m ()
loopHeap Int
sno AheadHeapEntry StreamK m a
entry
where
stopIfNeeded :: AheadHeapEntry StreamK m a -> Int -> StreamK m a -> m ()
stopIfNeeded AheadHeapEntry StreamK m a
ent Int
seqNo StreamK m a
r = do
Bool
stopIt <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
Channel m a
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> IO Bool
preStopCheck Channel m a
sv IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap
if Bool
stopIt
then forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Entry Int (AheadHeapEntry t m a) -> Int -> IO ()
requeueOnHeapTop IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap (forall p a. p -> a -> Entry p a
Entry Int
seqNo AheadHeapEntry StreamK m a
ent) Int
seqNo
forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
else Bool -> Int -> StreamK m a -> m ()
runStreamWithYieldLimit Bool
True Int
seqNo StreamK m a
r
loopHeap :: Int -> AheadHeapEntry StreamK m a -> m ()
loopHeap Int
seqNo AheadHeapEntry StreamK m a
ent =
case AheadHeapEntry StreamK m a
ent of
AheadHeapEntry StreamK m a
AheadEntryNull -> Int -> m ()
nextHeap Int
seqNo
AheadEntryPure a
a -> do
forall (f :: * -> *) a. Functor f => f a -> f ()
void
forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
forall a b. (a -> b) -> a -> b
$ forall a.
IORef ([ChildEvent a], Int) -> MVar () -> ChildEvent a -> IO Int
sendWithDoorBell
(forall (m :: * -> *) a. Channel m a -> IORef ([ChildEvent a], Int)
outputQueue Channel m a
sv) (forall (m :: * -> *) a. Channel m a -> MVar ()
outputDoorBell Channel m a
sv) (forall a. a -> ChildEvent a
ChildYield a
a)
Int -> m ()
nextHeap Int
seqNo
AheadEntryStream (RunInIO forall b. m b -> IO (StM m b)
runin, StreamK m a
r) -> do
if Bool
stopping
then AheadHeapEntry StreamK m a -> Int -> StreamK m a -> m ()
stopIfNeeded AheadHeapEntry StreamK m a
ent Int
seqNo StreamK m a
r
else do
StM m ()
res <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall b. m b -> IO (StM m b)
runin (Bool -> Int -> StreamK m a -> m ()
runStreamWithYieldLimit Bool
True Int
seqNo StreamK m a
r)
forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m ()
res
nextHeap :: Int -> m ()
nextHeap Int
prevSeqNo = do
HeapDequeueResult StreamK m a
res <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Int -> IO (HeapDequeueResult t m a)
dequeueFromHeapSeq IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap (Int
prevSeqNo forall a. Num a => a -> a -> a
+ Int
1)
case HeapDequeueResult StreamK m a
res of
Ready (Entry Int
seqNo AheadHeapEntry StreamK m a
hent) -> Int -> AheadHeapEntry StreamK m a -> m ()
loopHeap Int
seqNo AheadHeapEntry StreamK m a
hent
HeapDequeueResult StreamK m a
Clearing -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
Waiting Int
_ ->
if Bool
stopping
then do
Bool
r <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
Channel m a
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> IO Bool
preStopCheck Channel m a
sv IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap
if Bool
r
then forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
else Int -> m ()
processWorkQueue Int
prevSeqNo
else forall a. a -> a
inline Int -> m ()
processWorkQueue Int
prevSeqNo
processWorkQueue :: Int -> m ()
processWorkQueue Int
prevSeqNo = do
Bool
yieldLimitOk <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Maybe (IORef Count) -> IO Bool
decrementYieldLimit (forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
if Bool
yieldLimitOk
then do
Maybe (StreamK m a, Int)
work <- forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
IORef ([t m a], Int) -> m (Maybe (t m a, Int))
dequeueAhead IORef ([StreamK m a], Int)
q
case Maybe (StreamK m a, Int)
work of
Maybe (StreamK m a, Int)
Nothing -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
Just (StreamK m a
m, Int
seqNo) -> do
if Int
seqNo forall a. Eq a => a -> a -> Bool
== Int
prevSeqNo forall a. Num a => a -> a -> a
+ Int
1
then forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> StreamK m a
-> Int
-> m ()
processWithToken IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo StreamK m a
m Int
seqNo
else forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> StreamK m a
-> Int
-> m ()
processWithoutToken IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo StreamK m a
m Int
seqNo
else forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
abortExecution Channel m a
sv Maybe WorkerInfo
winfo
singleStreamFromHeap :: Int -> a -> m ()
singleStreamFromHeap Int
seqNo a
a = do
forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
Channel m a -> Maybe WorkerInfo -> a -> IO Bool
yield Channel m a
sv Maybe WorkerInfo
winfo a
a
Int -> m ()
nextHeap Int
seqNo
runStreamWithYieldLimit :: Bool -> Int -> StreamK m a -> m ()
runStreamWithYieldLimit Bool
continue Int
seqNo StreamK m a
r = do
Bool
_ <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Maybe (IORef Count) -> IO Bool
decrementYieldLimit (forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
if Bool
continue
then do
let stopk :: m ()
stopk = do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Maybe (IORef Count) -> IO ()
incrementYieldLimit (forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv))
Int -> m ()
nextHeap Int
seqNo
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared forall a. HasCallStack => a
undefined
(Int -> a -> StreamK m a -> m ()
yieldStreamFromHeap Int
seqNo)
(Int -> a -> m ()
singleStreamFromHeap Int
seqNo)
m ()
stopk
StreamK m a
r
else do
RunInIO m
runIn <- forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
let ent :: Entry Int (AheadHeapEntry StreamK m a)
ent = forall p a. p -> a -> Entry p a
Entry Int
seqNo (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(RunInIO m, t m a) -> AheadHeapEntry t m a
AheadEntryStream (RunInIO m
runIn, StreamK m a
r))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Entry Int (AheadHeapEntry t m a) -> Int -> IO ()
requeueOnHeapTop IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Entry Int (AheadHeapEntry StreamK m a)
ent Int
seqNo
Maybe (IORef Count) -> IO ()
incrementYieldLimit (forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
yieldStreamFromHeap :: Int -> a -> StreamK m a -> m ()
yieldStreamFromHeap Int
seqNo a
a StreamK m a
r = do
Bool
continue <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
Channel m a -> Maybe WorkerInfo -> a -> IO Bool
yield Channel m a
sv Maybe WorkerInfo
winfo a
a
Bool -> Int -> StreamK m a -> m ()
runStreamWithYieldLimit Bool
continue Int
seqNo StreamK m a
r
{-# NOINLINE drainHeap #-}
drainHeap
:: MonadRunInIO m
=> IORef ([K.StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry K.StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
drainHeap :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
drainHeap IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo = do
HeapDequeueResult StreamK m a
r <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> IO (HeapDequeueResult t m a)
dequeueFromHeap IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap
case HeapDequeueResult StreamK m a
r of
Ready (Entry Int
seqNo AheadHeapEntry StreamK m a
hent) ->
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> AheadHeapEntry StreamK m a
-> Int
-> Bool
-> m ()
processHeap IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo AheadHeapEntry StreamK m a
hent Int
seqNo Bool
True
HeapDequeueResult StreamK m a
_ -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
data HeapStatus = HContinue | HStop
processWithoutToken
:: MonadRunInIO m
=> IORef ([K.StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry K.StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> K.StreamK m a
-> Int
-> m ()
processWithoutToken :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> StreamK m a
-> Int
-> m ()
processWithoutToken IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo StreamK m a
m Int
seqNo = do
let stopk :: m WorkerStatus
stopk = do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Maybe (IORef Count) -> IO ()
incrementYieldLimit (forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv))
forall {m :: * -> *}.
MonadIO m =>
AheadHeapEntry StreamK m a -> m WorkerStatus
toHeap forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
AheadHeapEntry t m a
AheadEntryNull
mrun :: m b -> IO (StM m b)
mrun = forall (m :: * -> *). RunInIO m -> forall b. m b -> IO (StM m b)
runInIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Channel m a -> RunInIO m
svarMrun Channel m a
sv
StM m WorkerStatus
r <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall b. m b -> IO (StM m b)
mrun forall a b. (a -> b) -> a -> b
$
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared forall a. HasCallStack => a
undefined
(\a
a StreamK m a
r -> do
RunInIO m
runIn <- forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
forall {m :: * -> *}.
MonadIO m =>
AheadHeapEntry StreamK m a -> m WorkerStatus
toHeap forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(RunInIO m, t m a) -> AheadHeapEntry t m a
AheadEntryStream (RunInIO m
runIn, forall a (m :: * -> *). a -> StreamK m a -> StreamK m a
K.cons a
a StreamK m a
r))
(forall {m :: * -> *}.
MonadIO m =>
AheadHeapEntry StreamK m a -> m WorkerStatus
toHeap forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
a -> AheadHeapEntry t m a
AheadEntryPure)
m WorkerStatus
stopk
StreamK m a
m
WorkerStatus
res <- forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m WorkerStatus
r
case WorkerStatus
res of
WorkerStatus
Continue -> forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
workLoopAhead IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo
WorkerStatus
Suspend -> forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
drainHeap IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo
where
toHeap :: AheadHeapEntry StreamK m a -> m WorkerStatus
toHeap AheadHeapEntry StreamK m a
ent = do
Heap (Entry Int (AheadHeapEntry StreamK m a))
newHp <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap forall a b. (a -> b) -> a -> b
$ \(Heap (Entry Int (AheadHeapEntry StreamK m a))
hp, Maybe Int
snum) ->
let hp' :: Heap (Entry Int (AheadHeapEntry StreamK m a))
hp' = forall a. Ord a => a -> Heap a -> Heap a
H.insert (forall p a. p -> a -> Entry p a
Entry Int
seqNo AheadHeapEntry StreamK m a
ent) Heap (Entry Int (AheadHeapEntry StreamK m a))
hp
in forall a. HasCallStack => Bool -> a -> a
assert (Maybe Int -> Int -> Bool
heapIsSane Maybe Int
snum Int
seqNo) ((Heap (Entry Int (AheadHeapEntry StreamK m a))
hp', Maybe Int
snum), Heap (Entry Int (AheadHeapEntry StreamK m a))
hp')
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (forall (m :: * -> *) a. Channel m a -> Bool
svarInspectMode Channel m a
sv) forall a b. (a -> b) -> a -> b
$
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
Int
maxHp <- forall a. IORef a -> IO a
readIORef (SVarStats -> IORef Int
maxHeapSize forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Channel m a -> SVarStats
svarStats Channel m a
sv)
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (forall a. Heap a -> Int
H.size Heap (Entry Int (AheadHeapEntry StreamK m a))
newHp forall a. Ord a => a -> a -> Bool
> Int
maxHp) forall a b. (a -> b) -> a -> b
$
forall a. IORef a -> a -> IO ()
writeIORef (SVarStats -> IORef Int
maxHeapSize forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Channel m a -> SVarStats
svarStats Channel m a
sv) (forall a. Heap a -> Int
H.size Heap (Entry Int (AheadHeapEntry StreamK m a))
newHp)
Bool
heapOk <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
Channel m a
-> Heap (Entry Int (AheadHeapEntry StreamK m a)) -> IO Bool
underMaxHeap Channel m a
sv Heap (Entry Int (AheadHeapEntry StreamK m a))
newHp
HeapStatus
status <-
case forall (m :: * -> *) a. Channel m a -> Maybe YieldRateInfo
yieldRateInfo Channel m a
sv of
Maybe YieldRateInfo
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return HeapStatus
HContinue
Just YieldRateInfo
yinfo ->
case Maybe WorkerInfo
winfo of
Just WorkerInfo
info -> do
Bool
rateOk <-
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
forall a b. (a -> b) -> a -> b
$ Limit -> IORef Int -> YieldRateInfo -> WorkerInfo -> IO Bool
workerRateControl
(forall (m :: * -> *) a. Channel m a -> Limit
maxWorkerLimit Channel m a
sv)
(forall (m :: * -> *) a. Channel m a -> IORef Int
workerCount Channel m a
sv)
YieldRateInfo
yinfo
WorkerInfo
info
if Bool
rateOk
then forall (m :: * -> *) a. Monad m => a -> m a
return HeapStatus
HContinue
else forall (m :: * -> *) a. Monad m => a -> m a
return HeapStatus
HStop
Maybe WorkerInfo
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return HeapStatus
HContinue
if Bool
heapOk
then
case HeapStatus
status of
HeapStatus
HContinue -> forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Continue
HeapStatus
HStop -> forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Suspend
else forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Suspend
data TokenWorkerStatus = TokenContinue Int | TokenSuspend
processWithToken
:: MonadRunInIO m
=> IORef ([K.StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry K.StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> K.StreamK m a
-> Int
-> m ()
processWithToken :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> StreamK m a
-> Int
-> m ()
processWithToken IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo StreamK m a
action Int
sno = do
let stopk :: m TokenWorkerStatus
stopk = do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Maybe (IORef Count) -> IO ()
incrementYieldLimit (forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv))
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ Int -> TokenWorkerStatus
TokenContinue (Int
sno forall a. Num a => a -> a -> a
+ Int
1)
mrun :: m b -> IO (StM m b)
mrun = forall (m :: * -> *). RunInIO m -> forall b. m b -> IO (StM m b)
runInIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Channel m a -> RunInIO m
svarMrun Channel m a
sv
StM m TokenWorkerStatus
r <-
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
forall a b. (a -> b) -> a -> b
$ forall b. m b -> IO (StM m b)
mrun
forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared
forall a. HasCallStack => a
undefined (Int -> a -> StreamK m a -> m TokenWorkerStatus
yieldOutput Int
sno) (forall {m :: * -> *}. MonadIO m => Int -> a -> m TokenWorkerStatus
singleOutput Int
sno) m TokenWorkerStatus
stopk StreamK m a
action
TokenWorkerStatus
res <- forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m TokenWorkerStatus
r
case TokenWorkerStatus
res of
TokenContinue Int
seqNo -> Int -> m ()
loopWithToken Int
seqNo
TokenWorkerStatus
TokenSuspend -> forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
drainHeap IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo
where
singleOutput :: Int -> a -> m TokenWorkerStatus
singleOutput Int
seqNo a
a = do
Bool
continue <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
Channel m a -> Maybe WorkerInfo -> a -> IO Bool
yield Channel m a
sv Maybe WorkerInfo
winfo a
a
if Bool
continue
then forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ Int -> TokenWorkerStatus
TokenContinue (Int
seqNo forall a. Num a => a -> a -> a
+ Int
1)
else do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Int -> IO ()
updateHeapSeq IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap (Int
seqNo forall a. Num a => a -> a -> a
+ Int
1)
forall (m :: * -> *) a. Monad m => a -> m a
return TokenWorkerStatus
TokenSuspend
yieldOutput :: Int -> a -> StreamK m a -> m TokenWorkerStatus
yieldOutput Int
seqNo a
a StreamK m a
r = do
Bool
continue <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
Channel m a -> Maybe WorkerInfo -> a -> IO Bool
yield Channel m a
sv Maybe WorkerInfo
winfo a
a
Bool
yieldLimitOk <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Maybe (IORef Count) -> IO Bool
decrementYieldLimit (forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
if Bool
continue Bool -> Bool -> Bool
&& Bool
yieldLimitOk
then do
let stopk :: m TokenWorkerStatus
stopk = do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Maybe (IORef Count) -> IO ()
incrementYieldLimit (forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv))
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ Int -> TokenWorkerStatus
TokenContinue (Int
seqNo forall a. Num a => a -> a -> a
+ Int
1)
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared forall a. HasCallStack => a
undefined
(Int -> a -> StreamK m a -> m TokenWorkerStatus
yieldOutput Int
seqNo)
(forall {m :: * -> *}. MonadIO m => Int -> a -> m TokenWorkerStatus
singleOutput Int
seqNo)
m TokenWorkerStatus
stopk
StreamK m a
r
else do
RunInIO m
runIn <- forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
let ent :: Entry Int (AheadHeapEntry StreamK m a)
ent = forall p a. p -> a -> Entry p a
Entry Int
seqNo (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(RunInIO m, t m a) -> AheadHeapEntry t m a
AheadEntryStream (RunInIO m
runIn, StreamK m a
r))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Entry Int (AheadHeapEntry t m a) -> Int -> IO ()
requeueOnHeapTop IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Entry Int (AheadHeapEntry StreamK m a)
ent Int
seqNo
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Maybe (IORef Count) -> IO ()
incrementYieldLimit (forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
forall (m :: * -> *) a. Monad m => a -> m a
return TokenWorkerStatus
TokenSuspend
loopWithToken :: Int -> m ()
loopWithToken Int
nextSeqNo = do
let preExit :: m ()
preExit = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Int -> IO ()
updateHeapSeq IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Int
nextSeqNo
Maybe (IORef Count) -> IO ()
incrementYieldLimit (forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
Bool
yieldLimitOk <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Maybe (IORef Count) -> IO Bool
decrementYieldLimit (forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
if Bool
yieldLimitOk
then do
Maybe (StreamK m a)
work <- forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
IORef ([t m a], Int) -> Int -> m (Maybe (t m a))
dequeueAheadSeqCheck IORef ([StreamK m a], Int)
q Int
nextSeqNo
case Maybe (StreamK m a)
work of
Maybe (StreamK m a)
Nothing -> m ()
preExit forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
workLoopAhead IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo
Just StreamK m a
m -> do
let stopk :: m TokenWorkerStatus
stopk = do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Maybe (IORef Count) -> IO ()
incrementYieldLimit (forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv))
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ Int -> TokenWorkerStatus
TokenContinue (Int
nextSeqNo forall a. Num a => a -> a -> a
+ Int
1)
mrun :: m b -> IO (StM m b)
mrun = forall (m :: * -> *). RunInIO m -> forall b. m b -> IO (StM m b)
runInIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Channel m a -> RunInIO m
svarMrun Channel m a
sv
StM m TokenWorkerStatus
r <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall b. m b -> IO (StM m b)
mrun forall a b. (a -> b) -> a -> b
$
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared forall a. HasCallStack => a
undefined
(Int -> a -> StreamK m a -> m TokenWorkerStatus
yieldOutput Int
nextSeqNo)
(forall {m :: * -> *}. MonadIO m => Int -> a -> m TokenWorkerStatus
singleOutput Int
nextSeqNo)
m TokenWorkerStatus
stopk
StreamK m a
m
TokenWorkerStatus
res <- forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m TokenWorkerStatus
r
case TokenWorkerStatus
res of
TokenContinue Int
seqNo -> Int -> m ()
loopWithToken Int
seqNo
TokenWorkerStatus
TokenSuspend -> forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
drainHeap IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo
else m ()
preExit forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
drainHeap IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo
workLoopAhead
:: MonadRunInIO m
=> IORef ([K.StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry K.StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
workLoopAhead :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
workLoopAhead IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo = do
HeapDequeueResult StreamK m a
r <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> IO (HeapDequeueResult t m a)
dequeueFromHeap IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap
case HeapDequeueResult StreamK m a
r of
Ready (Entry Int
seqNo AheadHeapEntry StreamK m a
hent) ->
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> AheadHeapEntry StreamK m a
-> Int
-> Bool
-> m ()
processHeap IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo AheadHeapEntry StreamK m a
hent Int
seqNo Bool
False
HeapDequeueResult StreamK m a
Clearing -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
Waiting Int
_ -> do
Bool
yieldLimitOk <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Maybe (IORef Count) -> IO Bool
decrementYieldLimit (forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
if Bool
yieldLimitOk
then do
Maybe (StreamK m a, Int)
work <- forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
IORef ([t m a], Int) -> m (Maybe (t m a, Int))
dequeueAhead IORef ([StreamK m a], Int)
q
case Maybe (StreamK m a, Int)
work of
Maybe (StreamK m a, Int)
Nothing -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
Just (StreamK m a
m, Int
seqNo) -> do
if Int
seqNo forall a. Eq a => a -> a -> Bool
== Int
0
then forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> StreamK m a
-> Int
-> m ()
processWithToken IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo StreamK m a
m Int
seqNo
else forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> StreamK m a
-> Int
-> m ()
processWithoutToken IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo StreamK m a
m Int
seqNo
else forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
abortExecution Channel m a
sv Maybe WorkerInfo
winfo
getLifoSVar :: forall m a. MonadRunInIO m =>
RunInIO m -> Config -> IO (Channel m a)
getLifoSVar :: forall (m :: * -> *) a.
MonadRunInIO m =>
RunInIO m -> Config -> IO (Channel m a)
getLifoSVar RunInIO m
mrun Config
cfg = do
IORef ([ChildEvent a], Int)
outQ <- forall a. a -> IO (IORef a)
newIORef ([], Int
0)
IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
outH <- forall a. a -> IO (IORef a)
newIORef (forall a. Heap a
H.empty, forall a. a -> Maybe a
Just Int
0)
MVar ()
outQMv <- forall a. IO (MVar a)
newEmptyMVar
IORef Int
active <- forall a. a -> IO (IORef a)
newIORef Int
0
IORef Bool
wfw <- forall a. a -> IO (IORef a)
newIORef Bool
False
IORef (Set ThreadId)
running <- forall a. a -> IO (IORef a)
newIORef forall a. Set a
Set.empty
IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
q <- forall a. a -> IO (IORef a)
newIORef
( [] :: [(RunInIO m, K.StreamK m a)]
, [] :: [(RunInIO m, K.StreamK m a)]
)
IORef ([StreamK m a], Int)
aheadQ <- forall a. a -> IO (IORef a)
newIORef ([], -Int
1)
MVar ()
stopMVar <- forall a. a -> IO (MVar a)
newMVar ()
Maybe (IORef Count)
yl <-
case Config -> Maybe Count
getYieldLimit Config
cfg of
Maybe Count
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a. Maybe a
Nothing
Just Count
x -> forall a. a -> Maybe a
Just forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. a -> IO (IORef a)
newIORef Count
x
Maybe YieldRateInfo
rateInfo <- Config -> IO (Maybe YieldRateInfo)
newRateInfo Config
cfg
SVarStats
stats <- IO SVarStats
newSVarStats
ThreadId
tid <- IO ThreadId
myThreadId
let isWorkFinished :: p -> IO Bool
isWorkFinished p
_ = do
([(RunInIO m, StreamK m a)]
xs, [(RunInIO m, StreamK m a)]
ys) <- forall a. IORef a -> IO a
readIORef IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
q
forall (m :: * -> *) a. Monad m => a -> m a
return (forall (t :: * -> *) a. Foldable t => t a -> Bool
null [(RunInIO m, StreamK m a)]
xs Bool -> Bool -> Bool
&& forall (t :: * -> *) a. Foldable t => t a -> Bool
null [(RunInIO m, StreamK m a)]
ys)
let isWorkFinishedLimited :: Channel m a -> IO Bool
isWorkFinishedLimited Channel m a
sv = do
Bool
yieldsDone <-
case forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv of
Just IORef Count
ref -> do
Count
n <- forall a. IORef a -> IO a
readIORef IORef Count
ref
forall (m :: * -> *) a. Monad m => a -> m a
return (Count
n forall a. Ord a => a -> a -> Bool
<= Count
0)
Maybe (IORef Count)
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
Bool
qEmpty <- forall {p}. p -> IO Bool
isWorkFinished Channel m a
sv
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ Bool
qEmpty Bool -> Bool -> Bool
|| Bool
yieldsDone
let eagerEval :: Bool
eagerEval = Config -> Bool
getEagerDispatch Config
cfg
inOrder :: Bool
inOrder = Config -> Bool
getOrdered Config
cfg
let getSVar :: Channel m a
-> (Channel m a -> m [ChildEvent a])
-> (Channel m a -> m Bool)
-> (Channel m a -> IO Bool)
-> (IORef ([(RunInIO m, K.StreamK m a)], [(RunInIO m, K.StreamK m a)])
-> Channel m a
-> Maybe WorkerInfo
-> m())
-> Channel m a
getSVar :: Channel m a
-> (Channel m a -> m [ChildEvent a])
-> (Channel m a -> m Bool)
-> (Channel m a -> IO Bool)
-> (IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Channel m a -> Maybe WorkerInfo -> m ())
-> Channel m a
getSVar Channel m a
sv Channel m a -> m [ChildEvent a]
readOutput Channel m a -> m Bool
postProc Channel m a -> IO Bool
workDone IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Channel m a -> Maybe WorkerInfo -> m ()
wloop = Channel
{ outputQueue :: IORef ([ChildEvent a], Int)
outputQueue = IORef ([ChildEvent a], Int)
outQ
, remainingWork :: Maybe (IORef Count)
remainingWork = Maybe (IORef Count)
yl
, maxBufferLimit :: Limit
maxBufferLimit = Config -> Limit
getMaxBuffer Config
cfg
, maxWorkerLimit :: Limit
maxWorkerLimit = forall a. Ord a => a -> a -> a
min (Config -> Limit
getMaxThreads Config
cfg) (Config -> Limit
getMaxBuffer Config
cfg)
, yieldRateInfo :: Maybe YieldRateInfo
yieldRateInfo = Maybe YieldRateInfo
rateInfo
, outputDoorBell :: MVar ()
outputDoorBell = MVar ()
outQMv
, readOutputQ :: m [ChildEvent a]
readOutputQ = Channel m a -> m [ChildEvent a]
readOutput Channel m a
sv
, postProcess :: m Bool
postProcess = Channel m a -> m Bool
postProc Channel m a
sv
, workerThreads :: IORef (Set ThreadId)
workerThreads = IORef (Set ThreadId)
running
, workLoop :: Maybe WorkerInfo -> m ()
workLoop =
if Bool
inOrder
then forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
workLoopAhead IORef ([StreamK m a], Int)
aheadQ IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
outH Channel m a
sv
else IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Channel m a -> Maybe WorkerInfo -> m ()
wloop IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
q Channel m a
sv
, enqueue :: Bool -> (RunInIO m, StreamK m a) -> IO ()
enqueue =
\Bool
inner ->
if Bool
inOrder
then forall (m :: * -> *) a.
Channel m a
-> IORef ([StreamK m a], Int) -> (RunInIO m, StreamK m a) -> IO ()
enqueueAhead Channel m a
sv IORef ([StreamK m a], Int)
aheadQ
else forall (m :: * -> *) a.
Channel m a
-> IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Bool
-> (RunInIO m, StreamK m a)
-> IO ()
enqueueLIFO Channel m a
sv IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
q Bool
inner
, eagerDispatch :: m ()
eagerDispatch = forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
eagerEval forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
MonadRunInIO m =>
Count -> Channel m a -> m Bool
dispatchWorker Count
0 Channel m a
sv
, isWorkDone :: IO Bool
isWorkDone =
if Bool
inOrder
then forall {t :: * -> *} {m :: * -> *} {a} {a} {b} {a} {b}.
Foldable t =>
Channel m a -> IORef (t a, b) -> IORef (Heap a, b) -> IO Bool
isWorkDoneAhead Channel m a
sv IORef ([StreamK m a], Int)
aheadQ IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
outH
else Channel m a -> IO Bool
workDone Channel m a
sv
, isQueueDone :: IO Bool
isQueueDone =
if Bool
inOrder
then forall {t :: * -> *} {m :: * -> *} {a} {a} {b}.
Foldable t =>
Channel m a -> IORef (t a, b) -> IO Bool
isQueueDoneAhead Channel m a
sv IORef ([StreamK m a], Int)
aheadQ
else Channel m a -> IO Bool
workDone Channel m a
sv
, doorBellOnWorkQ :: IORef Bool
doorBellOnWorkQ = IORef Bool
wfw
, svarMrun :: RunInIO m
svarMrun = RunInIO m
mrun
, workerCount :: IORef Int
workerCount = IORef Int
active
, accountThread :: ThreadId -> m ()
accountThread = forall (m :: * -> *).
MonadIO m =>
IORef (Set ThreadId) -> MVar () -> ThreadId -> m ()
modifyThread IORef (Set ThreadId)
running MVar ()
outQMv
, workerStopMVar :: MVar ()
workerStopMVar = MVar ()
stopMVar
, svarRef :: Maybe (IORef ())
svarRef = forall a. Maybe a
Nothing
, svarInspectMode :: Bool
svarInspectMode = Config -> Bool
getInspectMode Config
cfg
, svarCreator :: ThreadId
svarCreator = ThreadId
tid
, svarStats :: SVarStats
svarStats = SVarStats
stats
}
let sv :: Channel m a
sv =
case Config -> Maybe Rate
getStreamRate Config
cfg of
Maybe Rate
Nothing ->
case Config -> Maybe Count
getYieldLimit Config
cfg of
Maybe Count
Nothing -> Channel m a
-> (Channel m a -> m [ChildEvent a])
-> (Channel m a -> m Bool)
-> (Channel m a -> IO Bool)
-> (IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Channel m a -> Maybe WorkerInfo -> m ())
-> Channel m a
getSVar Channel m a
sv (forall (m :: * -> *) a.
MonadRunInIO m =>
Bool -> Channel m a -> m [ChildEvent a]
readOutputQBounded Bool
eagerEval)
forall (m :: * -> *) a. MonadRunInIO m => Channel m a -> m Bool
postProcessBounded
forall {p}. p -> IO Bool
isWorkFinished
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Channel m a -> Maybe WorkerInfo -> m ()
workLoopLIFO
Just Count
_ -> Channel m a
-> (Channel m a -> m [ChildEvent a])
-> (Channel m a -> m Bool)
-> (Channel m a -> IO Bool)
-> (IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Channel m a -> Maybe WorkerInfo -> m ())
-> Channel m a
getSVar Channel m a
sv (forall (m :: * -> *) a.
MonadRunInIO m =>
Bool -> Channel m a -> m [ChildEvent a]
readOutputQBounded Bool
eagerEval)
forall (m :: * -> *) a. MonadRunInIO m => Channel m a -> m Bool
postProcessBounded
forall {m :: * -> *} {a}. Channel m a -> IO Bool
isWorkFinishedLimited
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Channel m a -> Maybe WorkerInfo -> m ()
workLoopLIFOLimited
Just Rate
_ ->
case Config -> Maybe Count
getYieldLimit Config
cfg of
Maybe Count
Nothing -> Channel m a
-> (Channel m a -> m [ChildEvent a])
-> (Channel m a -> m Bool)
-> (Channel m a -> IO Bool)
-> (IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Channel m a -> Maybe WorkerInfo -> m ())
-> Channel m a
getSVar Channel m a
sv forall (m :: * -> *) a.
MonadRunInIO m =>
Channel m a -> m [ChildEvent a]
readOutputQPaced
forall (m :: * -> *) a. MonadRunInIO m => Channel m a -> m Bool
postProcessPaced
forall {p}. p -> IO Bool
isWorkFinished
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Channel m a -> Maybe WorkerInfo -> m ()
workLoopLIFO
Just Count
_ -> Channel m a
-> (Channel m a -> m [ChildEvent a])
-> (Channel m a -> m Bool)
-> (Channel m a -> IO Bool)
-> (IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Channel m a -> Maybe WorkerInfo -> m ())
-> Channel m a
getSVar Channel m a
sv forall (m :: * -> *) a.
MonadRunInIO m =>
Channel m a -> m [ChildEvent a]
readOutputQPaced
forall (m :: * -> *) a. MonadRunInIO m => Channel m a -> m Bool
postProcessPaced
forall {m :: * -> *} {a}. Channel m a -> IO Bool
isWorkFinishedLimited
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Channel m a -> Maybe WorkerInfo -> m ()
workLoopLIFOLimited
in forall (m :: * -> *) a. Monad m => a -> m a
return Channel m a
sv
where
{-# INLINE isQueueDoneAhead #-}
isQueueDoneAhead :: Channel m a -> IORef (t a, b) -> IO Bool
isQueueDoneAhead Channel m a
sv IORef (t a, b)
q = do
Bool
queueDone <- forall {t :: * -> *} {a} {b}.
Foldable t =>
IORef (t a, b) -> IO Bool
checkEmpty IORef (t a, b)
q
Bool
yieldsDone <-
case forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv of
Just IORef Count
yref -> do
Count
n <- forall a. IORef a -> IO a
readIORef IORef Count
yref
forall (m :: * -> *) a. Monad m => a -> m a
return (Count
n forall a. Ord a => a -> a -> Bool
<= Count
0)
Maybe (IORef Count)
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ Bool
yieldsDone Bool -> Bool -> Bool
|| Bool
queueDone
{-# INLINE isWorkDoneAhead #-}
isWorkDoneAhead :: Channel m a -> IORef (t a, b) -> IORef (Heap a, b) -> IO Bool
isWorkDoneAhead Channel m a
sv IORef (t a, b)
q IORef (Heap a, b)
ref = do
Bool
heapDone <- do
(Heap a
hp, b
_) <- forall a. IORef a -> IO a
readIORef IORef (Heap a, b)
ref
forall (m :: * -> *) a. Monad m => a -> m a
return (forall a. Heap a -> Int
H.size Heap a
hp forall a. Ord a => a -> a -> Bool
<= Int
0)
Bool
queueDone <- forall {t :: * -> *} {m :: * -> *} {a} {a} {b}.
Foldable t =>
Channel m a -> IORef (t a, b) -> IO Bool
isQueueDoneAhead Channel m a
sv IORef (t a, b)
q
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ Bool
heapDone Bool -> Bool -> Bool
&& Bool
queueDone
checkEmpty :: IORef (t a, b) -> IO Bool
checkEmpty IORef (t a, b)
q = do
(t a
xs, b
_) <- forall a. IORef a -> IO a
readIORef IORef (t a, b)
q
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall (t :: * -> *) a. Foldable t => t a -> Bool
null t a
xs
{-# INLINABLE newChannel #-}
{-# SPECIALIZE newChannel :: (Config -> Config) -> IO (Channel IO a) #-}
newChannel :: MonadRunInIO m => (Config -> Config) -> m (Channel m a)
newChannel :: forall (m :: * -> *) a.
MonadRunInIO m =>
(Config -> Config) -> m (Channel m a)
newChannel Config -> Config
modifier = do
RunInIO m
mrun <- forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
MonadRunInIO m =>
RunInIO m -> Config -> IO (Channel m a)
getLifoSVar RunInIO m
mrun (Config -> Config
modifier Config
defaultConfig)