{-# LANGUAGE UndecidableInstances #-}
{-# OPTIONS_GHC -fno-warn-deprecations #-}
module Streamly.Internal.Data.Stream.Ahead {-# DEPRECATED "Please use \"Streamly.Internal.Data.Stream.Concurrent\" from streamly package instead." #-}
(
AheadT(..)
, Ahead
, aheadK
, consM
)
where
import Control.Concurrent.MVar (putMVar, takeMVar)
import Control.Exception (assert)
import Control.Monad (void, when)
import Control.Monad.Base (MonadBase(..), liftBaseDefault)
import Control.Monad.Catch (MonadThrow, throwM)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Reader.Class (MonadReader(..))
import Control.Monad.State.Class (MonadState(..))
import Control.Monad.Trans.Class (MonadTrans(lift))
import Data.Heap (Heap, Entry(..))
import Data.IORef (IORef, readIORef, atomicModifyIORef, writeIORef)
import Data.Maybe (fromJust)
import GHC.Exts (inline)
import qualified Data.Heap as H
import Streamly.Internal.Control.Concurrent
(MonadRunInIO, MonadAsync, askRunInIO, restoreM)
import Streamly.Internal.Data.Stream.StreamK.Type (Stream)
import qualified Streamly.Internal.Data.Stream.StreamK.Type as K
(foldStreamShared, cons, mkStream, foldStream, fromEffect
, nil, concatMapWith, fromPure, bindWith)
import qualified Streamly.Internal.Data.Stream.StreamD.Type as D
(mapM, fromStreamK, toStreamK)
import qualified Streamly.Internal.Data.Stream.Serial as Stream (toStreamK)
import Streamly.Internal.Data.Stream.SVar.Generate
import Streamly.Internal.Data.SVar
import Prelude hiding (map)
#include "Instances.hs"
{-# INLINABLE withLocal #-}
withLocal :: MonadReader r m => (r -> r) -> Stream m a -> Stream m a
withLocal :: forall r (m :: * -> *) a.
MonadReader r m =>
(r -> r) -> Stream m a -> Stream m a
withLocal r -> r
f Stream m a
m =
forall (m :: * -> *) a.
(forall r.
State StreamK m a
-> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
K.mkStream forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp ->
let single :: a -> m r
single = forall r (m :: * -> *) a. MonadReader r m => (r -> r) -> m a -> m a
local r -> r
f forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> m r
sng
yieldk :: a -> Stream m a -> m r
yieldk a
a Stream m a
r = forall r (m :: * -> *) a. MonadReader r m => (r -> r) -> m a -> m a
local r -> r
f forall a b. (a -> b) -> a -> b
$ a -> Stream m a -> m r
yld a
a (forall r (m :: * -> *) a.
MonadReader r m =>
(r -> r) -> Stream m a -> Stream m a
withLocal r -> r
f Stream m a
r)
in forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStream State StreamK m a
st a -> Stream m a -> m r
yieldk a -> m r
single (forall r (m :: * -> *) a. MonadReader r m => (r -> r) -> m a -> m a
local r -> r
f m r
stp) Stream m a
m
{-# INLINE underMaxHeap #-}
underMaxHeap ::
SVar Stream m a
-> Heap (Entry Int (AheadHeapEntry Stream m a))
-> IO Bool
underMaxHeap :: forall (m :: * -> *) a.
SVar StreamK m a
-> Heap (Entry Int (AheadHeapEntry StreamK m a)) -> IO Bool
underMaxHeap SVar StreamK m a
sv Heap (Entry Int (AheadHeapEntry StreamK m a))
hp = do
([ChildEvent a]
_, Int
len) <- forall a. IORef a -> IO a
readIORef (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef ([ChildEvent a], Int)
outputQueue SVar StreamK m a
sv)
let maxHeap :: Limit
maxHeap = case forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Limit
maxBufferLimit SVar StreamK m a
sv of
Limited Word
lim -> Word -> Limit
Limited forall a b. (a -> b) -> a -> b
$
forall a. Ord a => a -> a -> a
max Word
0 (Word
lim forall a. Num a => a -> a -> a
- forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
len)
Limit
Unlimited -> Limit
Unlimited
case Limit
maxHeap of
Limited Word
lim -> do
Int
active <- forall a. IORef a -> IO a
readIORef (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef Int
workerCount SVar StreamK m a
sv)
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a. Heap a -> Int
H.size Heap (Entry Int (AheadHeapEntry StreamK m a))
hp forall a. Num a => a -> a -> a
+ Int
active forall a. Ord a => a -> a -> Bool
<= forall a b. (Integral a, Num b) => a -> b
fromIntegral Word
lim
Limit
Unlimited -> forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
preStopCheck ::
SVar Stream m a
-> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)) , Maybe Int)
-> IO Bool
preStopCheck :: forall (m :: * -> *) a.
SVar StreamK m a
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> IO Bool
preStopCheck SVar StreamK m a
sv IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap =
forall a b. IORef a -> (a -> IO b) -> IO b
withIORef IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap forall a b. (a -> b) -> a -> b
$ \(Heap (Entry Int (AheadHeapEntry StreamK m a))
hp, Maybe Int
_) -> do
Bool
heapOk <- forall (m :: * -> *) a.
SVar StreamK m a
-> Heap (Entry Int (AheadHeapEntry StreamK m a)) -> IO Bool
underMaxHeap SVar StreamK m a
sv Heap (Entry Int (AheadHeapEntry StreamK m a))
hp
forall a. MVar a -> IO a
takeMVar (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> MVar ()
workerStopMVar SVar StreamK m a
sv)
let stop :: IO Bool
stop = do
forall a. MVar a -> a -> IO ()
putMVar (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> MVar ()
workerStopMVar SVar StreamK m a
sv) ()
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
continue :: IO Bool
continue = do
forall a. MVar a -> a -> IO ()
putMVar (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> MVar ()
workerStopMVar SVar StreamK m a
sv) ()
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
if Bool
heapOk
then
case forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe YieldRateInfo
yieldRateInfo SVar StreamK m a
sv of
Maybe YieldRateInfo
Nothing -> IO Bool
continue
Just YieldRateInfo
yinfo -> do
Bool
rateOk <- forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> YieldRateInfo -> IO Bool
isBeyondMaxRate SVar StreamK m a
sv YieldRateInfo
yinfo
if Bool
rateOk then IO Bool
continue else IO Bool
stop
else IO Bool
stop
abortExecution ::
IORef ([Stream m a], Int)
-> SVar Stream m a
-> Maybe WorkerInfo
-> Stream m a
-> IO ()
abortExecution :: forall (m :: * -> *) a.
IORef ([Stream m a], Int)
-> SVar StreamK m a -> Maybe WorkerInfo -> Stream m a -> IO ()
abortExecution IORef ([Stream m a], Int)
q SVar StreamK m a
sv Maybe WorkerInfo
winfo Stream m a
m = do
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef ([t m a], Int) -> t m a -> IO ()
reEnqueueAhead SVar StreamK m a
sv IORef ([Stream m a], Int)
q Stream m a
m
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar StreamK m a
sv
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar StreamK m a
sv Maybe WorkerInfo
winfo
processHeap
:: MonadRunInIO m
=> IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
-> State Stream m a
-> SVar Stream m a
-> Maybe WorkerInfo
-> AheadHeapEntry Stream m a
-> Int
-> Bool
-> m ()
processHeap :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> AheadHeapEntry StreamK m a
-> Int
-> Bool
-> m ()
processHeap IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo AheadHeapEntry StreamK m a
entry Int
sno Bool
stopping = Int -> AheadHeapEntry StreamK m a -> m ()
loopHeap Int
sno AheadHeapEntry StreamK m a
entry
where
stopIfNeeded :: AheadHeapEntry StreamK m a -> Int -> Stream m a -> m ()
stopIfNeeded AheadHeapEntry StreamK m a
ent Int
seqNo Stream m a
r = do
Bool
stopIt <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
SVar StreamK m a
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> IO Bool
preStopCheck SVar StreamK m a
sv IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap
if Bool
stopIt
then forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Entry Int (AheadHeapEntry t m a) -> Int -> IO ()
requeueOnHeapTop IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap (forall p a. p -> a -> Entry p a
Entry Int
seqNo AheadHeapEntry StreamK m a
ent) Int
seqNo
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar StreamK m a
sv Maybe WorkerInfo
winfo
else Bool -> Int -> Stream m a -> m ()
runStreamWithYieldLimit Bool
True Int
seqNo Stream m a
r
loopHeap :: Int -> AheadHeapEntry StreamK m a -> m ()
loopHeap Int
seqNo AheadHeapEntry StreamK m a
ent =
case AheadHeapEntry StreamK m a
ent of
AheadHeapEntry StreamK m a
AheadEntryNull -> Int -> m ()
nextHeap Int
seqNo
AheadEntryPure a
a -> do
forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ChildEvent a -> IO Int
send SVar StreamK m a
sv (forall a. a -> ChildEvent a
ChildYield a
a)
Int -> m ()
nextHeap Int
seqNo
AheadEntryStream (RunInIO forall b. m b -> IO (StM m b)
runin, Stream m a
r) ->
if Bool
stopping
then AheadHeapEntry StreamK m a -> Int -> Stream m a -> m ()
stopIfNeeded AheadHeapEntry StreamK m a
ent Int
seqNo Stream m a
r
else do
StM m ()
res <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall b. m b -> IO (StM m b)
runin (Bool -> Int -> Stream m a -> m ()
runStreamWithYieldLimit Bool
True Int
seqNo Stream m a
r)
forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m ()
res
nextHeap :: Int -> m ()
nextHeap Int
prevSeqNo = do
HeapDequeueResult StreamK m a
res <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Int -> IO (HeapDequeueResult t m a)
dequeueFromHeapSeq IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap (Int
prevSeqNo forall a. Num a => a -> a -> a
+ Int
1)
case HeapDequeueResult StreamK m a
res of
Ready (Entry Int
seqNo AheadHeapEntry StreamK m a
hent) -> Int -> AheadHeapEntry StreamK m a -> m ()
loopHeap Int
seqNo AheadHeapEntry StreamK m a
hent
HeapDequeueResult StreamK m a
Clearing -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar StreamK m a
sv Maybe WorkerInfo
winfo
Waiting Int
_ ->
if Bool
stopping
then do
Bool
r <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
SVar StreamK m a
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> IO Bool
preStopCheck SVar StreamK m a
sv IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap
if Bool
r
then forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar StreamK m a
sv Maybe WorkerInfo
winfo
else Int -> m ()
processWorkQueue Int
prevSeqNo
else forall a. a -> a
inline Int -> m ()
processWorkQueue Int
prevSeqNo
processWorkQueue :: Int -> m ()
processWorkQueue Int
prevSeqNo = do
Maybe (Stream m a, Int)
work <- forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
IORef ([t m a], Int) -> m (Maybe (t m a, Int))
dequeueAhead IORef ([Stream m a], Int)
q
case Maybe (Stream m a, Int)
work of
Maybe (Stream m a, Int)
Nothing -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar StreamK m a
sv Maybe WorkerInfo
winfo
Just (Stream m a
m, Int
seqNo) -> do
Bool
yieldLimitOk <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
decrementYieldLimit SVar StreamK m a
sv
if Bool
yieldLimitOk
then
if Int
seqNo forall a. Eq a => a -> a -> Bool
== Int
prevSeqNo forall a. Num a => a -> a -> a
+ Int
1
then forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> Stream m a
-> Int
-> m ()
processWithToken IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo Stream m a
m Int
seqNo
else forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> Stream m a
-> Int
-> m ()
processWithoutToken IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo Stream m a
m Int
seqNo
else forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
IORef ([Stream m a], Int)
-> SVar StreamK m a -> Maybe WorkerInfo -> Stream m a -> IO ()
abortExecution IORef ([Stream m a], Int)
q SVar StreamK m a
sv Maybe WorkerInfo
winfo Stream m a
m
singleStreamFromHeap :: Int -> a -> m ()
singleStreamFromHeap Int
seqNo a
a = do
forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
sendYield SVar StreamK m a
sv Maybe WorkerInfo
winfo (forall a. a -> ChildEvent a
ChildYield a
a)
Int -> m ()
nextHeap Int
seqNo
runStreamWithYieldLimit :: Bool -> Int -> Stream m a -> m ()
runStreamWithYieldLimit Bool
continue Int
seqNo Stream m a
r = do
Bool
_ <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
decrementYieldLimit SVar StreamK m a
sv
if Bool
continue
then do
let stop :: m ()
stop = do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar StreamK m a
sv)
Int -> m ()
nextHeap Int
seqNo
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
st
(Int -> a -> Stream m a -> m ()
yieldStreamFromHeap Int
seqNo)
(Int -> a -> m ()
singleStreamFromHeap Int
seqNo)
m ()
stop
Stream m a
r
else do
RunInIO m
runIn <- forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
let ent :: Entry Int (AheadHeapEntry StreamK m a)
ent = forall p a. p -> a -> Entry p a
Entry Int
seqNo (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(RunInIO m, t m a) -> AheadHeapEntry t m a
AheadEntryStream (RunInIO m
runIn, Stream m a
r))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Entry Int (AheadHeapEntry t m a) -> Int -> IO ()
requeueOnHeapTop IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Entry Int (AheadHeapEntry StreamK m a)
ent Int
seqNo
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar StreamK m a
sv
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar StreamK m a
sv Maybe WorkerInfo
winfo
yieldStreamFromHeap :: Int -> a -> Stream m a -> m ()
yieldStreamFromHeap Int
seqNo a
a Stream m a
r = do
Bool
continue <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
sendYield SVar StreamK m a
sv Maybe WorkerInfo
winfo (forall a. a -> ChildEvent a
ChildYield a
a)
Bool -> Int -> Stream m a -> m ()
runStreamWithYieldLimit Bool
continue Int
seqNo Stream m a
r
{-# NOINLINE drainHeap #-}
drainHeap
:: MonadRunInIO m
=> IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
-> State Stream m a
-> SVar Stream m a
-> Maybe WorkerInfo
-> m ()
drainHeap :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
drainHeap IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo = do
HeapDequeueResult StreamK m a
r <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> IO (HeapDequeueResult t m a)
dequeueFromHeap IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap
case HeapDequeueResult StreamK m a
r of
Ready (Entry Int
seqNo AheadHeapEntry StreamK m a
hent) ->
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> AheadHeapEntry StreamK m a
-> Int
-> Bool
-> m ()
processHeap IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo AheadHeapEntry StreamK m a
hent Int
seqNo Bool
True
HeapDequeueResult StreamK m a
_ -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar StreamK m a
sv Maybe WorkerInfo
winfo
data HeapStatus = HContinue | HStop
data WorkerStatus = Continue | Suspend
processWithoutToken
:: MonadRunInIO m
=> IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
-> State Stream m a
-> SVar Stream m a
-> Maybe WorkerInfo
-> Stream m a
-> Int
-> m ()
processWithoutToken :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> Stream m a
-> Int
-> m ()
processWithoutToken IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo Stream m a
m Int
seqNo = do
let stop :: m WorkerStatus
stop = do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar StreamK m a
sv)
forall {m :: * -> *}.
MonadIO m =>
AheadHeapEntry StreamK m a -> m WorkerStatus
toHeap forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
AheadHeapEntry t m a
AheadEntryNull
mrun :: m b -> IO (StM m b)
mrun = forall (m :: * -> *). RunInIO m -> forall b. m b -> IO (StM m b)
runInIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> RunInIO m
svarMrun SVar StreamK m a
sv
StM m WorkerStatus
r <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall b. m b -> IO (StM m b)
mrun forall a b. (a -> b) -> a -> b
$
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
st
(\a
a Stream m a
r -> do
RunInIO m
runIn <- forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
forall {m :: * -> *}.
MonadIO m =>
AheadHeapEntry StreamK m a -> m WorkerStatus
toHeap forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(RunInIO m, t m a) -> AheadHeapEntry t m a
AheadEntryStream (RunInIO m
runIn, forall a (m :: * -> *). a -> StreamK m a -> StreamK m a
K.cons a
a Stream m a
r))
(forall {m :: * -> *}.
MonadIO m =>
AheadHeapEntry StreamK m a -> m WorkerStatus
toHeap forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
a -> AheadHeapEntry t m a
AheadEntryPure)
m WorkerStatus
stop
Stream m a
m
WorkerStatus
res <- forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m WorkerStatus
r
case WorkerStatus
res of
WorkerStatus
Continue -> forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
workLoopAhead IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo
WorkerStatus
Suspend -> forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
drainHeap IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo
where
toHeap :: AheadHeapEntry StreamK m a -> m WorkerStatus
toHeap AheadHeapEntry StreamK m a
ent = do
Heap (Entry Int (AheadHeapEntry StreamK m a))
newHp <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap forall a b. (a -> b) -> a -> b
$ \(Heap (Entry Int (AheadHeapEntry StreamK m a))
hp, Maybe Int
snum) ->
let hp' :: Heap (Entry Int (AheadHeapEntry StreamK m a))
hp' = forall a. Ord a => a -> Heap a -> Heap a
H.insert (forall p a. p -> a -> Entry p a
Entry Int
seqNo AheadHeapEntry StreamK m a
ent) Heap (Entry Int (AheadHeapEntry StreamK m a))
hp
in forall a. (?callStack::CallStack) => Bool -> a -> a
assert (Maybe Int -> Int -> Bool
heapIsSane Maybe Int
snum Int
seqNo) ((Heap (Entry Int (AheadHeapEntry StreamK m a))
hp', Maybe Int
snum), Heap (Entry Int (AheadHeapEntry StreamK m a))
hp')
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Bool
svarInspectMode SVar StreamK m a
sv) forall a b. (a -> b) -> a -> b
$
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
Int
maxHp <- forall a. IORef a -> IO a
readIORef (SVarStats -> IORef Int
maxHeapSize forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStats
svarStats SVar StreamK m a
sv)
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (forall a. Heap a -> Int
H.size Heap (Entry Int (AheadHeapEntry StreamK m a))
newHp forall a. Ord a => a -> a -> Bool
> Int
maxHp) forall a b. (a -> b) -> a -> b
$
forall a. IORef a -> a -> IO ()
writeIORef (SVarStats -> IORef Int
maxHeapSize forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStats
svarStats SVar StreamK m a
sv) (forall a. Heap a -> Int
H.size Heap (Entry Int (AheadHeapEntry StreamK m a))
newHp)
Bool
heapOk <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
SVar StreamK m a
-> Heap (Entry Int (AheadHeapEntry StreamK m a)) -> IO Bool
underMaxHeap SVar StreamK m a
sv Heap (Entry Int (AheadHeapEntry StreamK m a))
newHp
HeapStatus
status <-
case forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe YieldRateInfo
yieldRateInfo SVar StreamK m a
sv of
Maybe YieldRateInfo
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return HeapStatus
HContinue
Just YieldRateInfo
yinfo ->
case Maybe WorkerInfo
winfo of
Just WorkerInfo
info -> do
Bool
rateOk <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> YieldRateInfo -> WorkerInfo -> IO Bool
workerRateControl SVar StreamK m a
sv YieldRateInfo
yinfo WorkerInfo
info
if Bool
rateOk
then forall (m :: * -> *) a. Monad m => a -> m a
return HeapStatus
HContinue
else forall (m :: * -> *) a. Monad m => a -> m a
return HeapStatus
HStop
Maybe WorkerInfo
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return HeapStatus
HContinue
if Bool
heapOk
then
case HeapStatus
status of
HeapStatus
HContinue -> forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Continue
HeapStatus
HStop -> forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Suspend
else forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Suspend
data TokenWorkerStatus = TokenContinue Int | TokenSuspend
processWithToken
:: MonadRunInIO m
=> IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
-> State Stream m a
-> SVar Stream m a
-> Maybe WorkerInfo
-> Stream m a
-> Int
-> m ()
processWithToken :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> Stream m a
-> Int
-> m ()
processWithToken IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo Stream m a
action Int
sno = do
let stop :: m TokenWorkerStatus
stop = do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar StreamK m a
sv)
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ Int -> TokenWorkerStatus
TokenContinue (Int
sno forall a. Num a => a -> a -> a
+ Int
1)
mrun :: m b -> IO (StM m b)
mrun = forall (m :: * -> *). RunInIO m -> forall b. m b -> IO (StM m b)
runInIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> RunInIO m
svarMrun SVar StreamK m a
sv
StM m TokenWorkerStatus
r <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall b. m b -> IO (StM m b)
mrun forall a b. (a -> b) -> a -> b
$
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
st (Int -> a -> Stream m a -> m TokenWorkerStatus
yieldOutput Int
sno) (forall {m :: * -> *}. MonadIO m => Int -> a -> m TokenWorkerStatus
singleOutput Int
sno) m TokenWorkerStatus
stop Stream m a
action
TokenWorkerStatus
res <- forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m TokenWorkerStatus
r
case TokenWorkerStatus
res of
TokenContinue Int
seqNo -> Int -> m ()
loopWithToken Int
seqNo
TokenWorkerStatus
TokenSuspend -> forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
drainHeap IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo
where
singleOutput :: Int -> a -> m TokenWorkerStatus
singleOutput Int
seqNo a
a = do
Bool
continue <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
sendYield SVar StreamK m a
sv Maybe WorkerInfo
winfo (forall a. a -> ChildEvent a
ChildYield a
a)
if Bool
continue
then forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ Int -> TokenWorkerStatus
TokenContinue (Int
seqNo forall a. Num a => a -> a -> a
+ Int
1)
else do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Int -> IO ()
updateHeapSeq IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap (Int
seqNo forall a. Num a => a -> a -> a
+ Int
1)
forall (m :: * -> *) a. Monad m => a -> m a
return TokenWorkerStatus
TokenSuspend
yieldOutput :: Int -> a -> Stream m a -> m TokenWorkerStatus
yieldOutput Int
seqNo a
a Stream m a
r = do
Bool
continue <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
sendYield SVar StreamK m a
sv Maybe WorkerInfo
winfo (forall a. a -> ChildEvent a
ChildYield a
a)
Bool
yieldLimitOk <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
decrementYieldLimit SVar StreamK m a
sv
if Bool
continue Bool -> Bool -> Bool
&& Bool
yieldLimitOk
then do
let stop :: m TokenWorkerStatus
stop = do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar StreamK m a
sv)
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ Int -> TokenWorkerStatus
TokenContinue (Int
seqNo forall a. Num a => a -> a -> a
+ Int
1)
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
st
(Int -> a -> Stream m a -> m TokenWorkerStatus
yieldOutput Int
seqNo)
(forall {m :: * -> *}. MonadIO m => Int -> a -> m TokenWorkerStatus
singleOutput Int
seqNo)
m TokenWorkerStatus
stop
Stream m a
r
else do
RunInIO m
runIn <- forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
let ent :: Entry Int (AheadHeapEntry StreamK m a)
ent = forall p a. p -> a -> Entry p a
Entry Int
seqNo (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(RunInIO m, t m a) -> AheadHeapEntry t m a
AheadEntryStream (RunInIO m
runIn, Stream m a
r))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Entry Int (AheadHeapEntry t m a) -> Int -> IO ()
requeueOnHeapTop IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Entry Int (AheadHeapEntry StreamK m a)
ent Int
seqNo
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar StreamK m a
sv
forall (m :: * -> *) a. Monad m => a -> m a
return TokenWorkerStatus
TokenSuspend
loopWithToken :: Int -> m ()
loopWithToken Int
nextSeqNo = do
Maybe (Stream m a, Int)
work <- forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
IORef ([t m a], Int) -> m (Maybe (t m a, Int))
dequeueAhead IORef ([Stream m a], Int)
q
case Maybe (Stream m a, Int)
work of
Maybe (Stream m a, Int)
Nothing -> do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Int -> IO ()
updateHeapSeq IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Int
nextSeqNo
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
workLoopAhead IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo
Just (Stream m a
m, Int
seqNo) -> do
Bool
yieldLimitOk <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
decrementYieldLimit SVar StreamK m a
sv
let undo :: m ()
undo = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Int -> IO ()
updateHeapSeq IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Int
nextSeqNo
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef ([t m a], Int) -> t m a -> IO ()
reEnqueueAhead SVar StreamK m a
sv IORef ([Stream m a], Int)
q Stream m a
m
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar StreamK m a
sv
if Bool
yieldLimitOk
then
if Int
seqNo forall a. Eq a => a -> a -> Bool
== Int
nextSeqNo
then do
let stop :: m TokenWorkerStatus
stop = do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar StreamK m a
sv)
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ Int -> TokenWorkerStatus
TokenContinue (Int
seqNo forall a. Num a => a -> a -> a
+ Int
1)
mrun :: m b -> IO (StM m b)
mrun = forall (m :: * -> *). RunInIO m -> forall b. m b -> IO (StM m b)
runInIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> RunInIO m
svarMrun SVar StreamK m a
sv
StM m TokenWorkerStatus
r <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall b. m b -> IO (StM m b)
mrun forall a b. (a -> b) -> a -> b
$
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
st
(Int -> a -> Stream m a -> m TokenWorkerStatus
yieldOutput Int
seqNo)
(forall {m :: * -> *}. MonadIO m => Int -> a -> m TokenWorkerStatus
singleOutput Int
seqNo)
m TokenWorkerStatus
stop
Stream m a
m
TokenWorkerStatus
res <- forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m TokenWorkerStatus
r
case TokenWorkerStatus
res of
TokenContinue Int
seqNo1 -> Int -> m ()
loopWithToken Int
seqNo1
TokenWorkerStatus
TokenSuspend -> forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
drainHeap IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo
else
m ()
undo forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
workLoopAhead IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo
else m ()
undo forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
drainHeap IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo
workLoopAhead
:: MonadRunInIO m
=> IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
-> State Stream m a
-> SVar Stream m a
-> Maybe WorkerInfo
-> m ()
workLoopAhead :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
workLoopAhead IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo = do
HeapDequeueResult StreamK m a
r <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> IO (HeapDequeueResult t m a)
dequeueFromHeap IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap
case HeapDequeueResult StreamK m a
r of
Ready (Entry Int
seqNo AheadHeapEntry StreamK m a
hent) ->
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> AheadHeapEntry StreamK m a
-> Int
-> Bool
-> m ()
processHeap IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo AheadHeapEntry StreamK m a
hent Int
seqNo Bool
False
HeapDequeueResult StreamK m a
Clearing -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar StreamK m a
sv Maybe WorkerInfo
winfo
Waiting Int
_ -> do
Maybe (Stream m a, Int)
work <- forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
IORef ([t m a], Int) -> m (Maybe (t m a, Int))
dequeueAhead IORef ([Stream m a], Int)
q
case Maybe (Stream m a, Int)
work of
Maybe (Stream m a, Int)
Nothing -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar StreamK m a
sv Maybe WorkerInfo
winfo
Just (Stream m a
m, Int
seqNo) -> do
Bool
yieldLimitOk <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
decrementYieldLimit SVar StreamK m a
sv
if Bool
yieldLimitOk
then
if Int
seqNo forall a. Eq a => a -> a -> Bool
== Int
0
then forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> Stream m a
-> Int
-> m ()
processWithToken IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo Stream m a
m Int
seqNo
else forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> Stream m a
-> Int
-> m ()
processWithoutToken IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo Stream m a
m Int
seqNo
else forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
IORef ([Stream m a], Int)
-> SVar StreamK m a -> Maybe WorkerInfo -> Stream m a -> IO ()
abortExecution IORef ([Stream m a], Int)
q SVar StreamK m a
sv Maybe WorkerInfo
winfo Stream m a
m
forkSVarAhead :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
forkSVarAhead :: forall (m :: * -> *) a.
MonadAsync m =>
Stream m a -> Stream m a -> Stream m a
forkSVarAhead Stream m a
m1 Stream m a
m2 = forall (m :: * -> *) a.
(forall r.
State StreamK m a
-> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
K.mkStream forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp -> do
SVar StreamK m a
sv <- forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
State t m a
-> t m a
-> (IORef ([t m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> State t m a
-> SVar t m a
-> Maybe WorkerInfo
-> m ())
-> m (SVar t m a)
newAheadVar State StreamK m a
st (forall {m :: * -> *} {a}.
(MonadIO m, MonadBaseControl IO m) =>
StreamK m a -> StreamK m a -> StreamK m a
concurrently Stream m a
m1 Stream m a
m2)
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
workLoopAhead
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStream State StreamK m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. SerialT m a -> Stream m a
Stream.toStreamK (forall (m :: * -> *) a.
MonadAsync m =>
SVar StreamK m a -> SerialT m a
fromSVar SVar StreamK m a
sv)
where
concurrently :: StreamK m a -> StreamK m a -> StreamK m a
concurrently StreamK m a
ma StreamK m a
mb = forall (m :: * -> *) a.
(forall r.
State StreamK m a
-> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
K.mkStream forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp -> do
RunInIO m
runInIO <- forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> (RunInIO m, t m a) -> IO ()
enqueue (forall a. (?callStack::CallStack) => Maybe a -> a
fromJust forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe (SVar t m a)
streamVar State StreamK m a
st) (RunInIO m
runInIO, StreamK m a
mb)
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStream State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp StreamK m a
ma
{-# INLINE aheadK #-}
aheadK :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
aheadK :: forall (m :: * -> *) a.
MonadAsync m =>
Stream m a -> Stream m a -> Stream m a
aheadK Stream m a
m1 Stream m a
m2 = forall (m :: * -> *) a.
(forall r.
State StreamK m a
-> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
K.mkStream forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp ->
case forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe (SVar t m a)
streamVar State StreamK m a
st of
Just SVar StreamK m a
sv | forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStyle
svarStyle SVar StreamK m a
sv forall a. Eq a => a -> a -> Bool
== SVarStyle
AheadVar -> do
RunInIO m
runInIO <- forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> (RunInIO m, t m a) -> IO ()
enqueue SVar StreamK m a
sv (RunInIO m
runInIO, Stream m a
m2)
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStream State StreamK m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp Stream m a
m1
Maybe (SVar StreamK m a)
_ -> forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp (forall (m :: * -> *) a.
MonadAsync m =>
Stream m a -> Stream m a -> Stream m a
forkSVarAhead Stream m a
m1 Stream m a
m2)
{-# INLINE consM #-}
{-# SPECIALIZE consM :: IO a -> AheadT IO a -> AheadT IO a #-}
consM :: MonadAsync m => m a -> AheadT m a -> AheadT m a
consM :: forall (m :: * -> *) a.
MonadAsync m =>
m a -> AheadT m a -> AheadT m a
consM m a
m (AheadT Stream m a
r) = forall (m :: * -> *) a. Stream m a -> AheadT m a
AheadT forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
MonadAsync m =>
Stream m a -> Stream m a -> Stream m a
aheadK (forall (m :: * -> *) a. Monad m => m a -> StreamK m a
K.fromEffect m a
m) Stream m a
r
newtype AheadT m a = AheadT {forall (m :: * -> *) a. AheadT m a -> Stream m a
getAheadT :: Stream m a}
instance MonadTrans AheadT where
{-# INLINE lift #-}
lift :: forall (m :: * -> *) a. Monad m => m a -> AheadT m a
lift = forall (m :: * -> *) a. Stream m a -> AheadT m a
AheadT forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a. Monad m => m a -> StreamK m a
K.fromEffect
type Ahead = AheadT IO
{-# INLINE append #-}
{-# SPECIALIZE append :: AheadT IO a -> AheadT IO a -> AheadT IO a #-}
append :: MonadAsync m => AheadT m a -> AheadT m a -> AheadT m a
append :: forall (m :: * -> *) a.
MonadAsync m =>
AheadT m a -> AheadT m a -> AheadT m a
append (AheadT Stream m a
m1) (AheadT Stream m a
m2) = forall (m :: * -> *) a. Stream m a -> AheadT m a
AheadT forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
MonadAsync m =>
Stream m a -> Stream m a -> Stream m a
aheadK Stream m a
m1 Stream m a
m2
instance MonadAsync m => Semigroup (AheadT m a) where
<> :: AheadT m a -> AheadT m a -> AheadT m a
(<>) = forall (m :: * -> *) a.
MonadAsync m =>
AheadT m a -> AheadT m a -> AheadT m a
append
instance MonadAsync m => Monoid (AheadT m a) where
mempty :: AheadT m a
mempty = forall (m :: * -> *) a. Stream m a -> AheadT m a
AheadT forall (m :: * -> *) a. StreamK m a
K.nil
mappend :: AheadT m a -> AheadT m a -> AheadT m a
mappend = forall a. Semigroup a => a -> a -> a
(<>)
{-# INLINE apAhead #-}
apAhead :: MonadAsync m => AheadT m (a -> b) -> AheadT m a -> AheadT m b
apAhead :: forall (m :: * -> *) a b.
MonadAsync m =>
AheadT m (a -> b) -> AheadT m a -> AheadT m b
apAhead (AheadT Stream m (a -> b)
m1) (AheadT Stream m a
m2) =
let f :: (a -> b) -> StreamK m b
f a -> b
x1 = forall (m :: * -> *) b a.
(StreamK m b -> StreamK m b -> StreamK m b)
-> (a -> StreamK m b) -> StreamK m a -> StreamK m b
K.concatMapWith forall (m :: * -> *) a.
MonadAsync m =>
Stream m a -> Stream m a -> Stream m a
aheadK (forall a (m :: * -> *). a -> StreamK m a
K.fromPure forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> b
x1) Stream m a
m2
in forall (m :: * -> *) a. Stream m a -> AheadT m a
AheadT forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) b a.
(StreamK m b -> StreamK m b -> StreamK m b)
-> (a -> StreamK m b) -> StreamK m a -> StreamK m b
K.concatMapWith forall (m :: * -> *) a.
MonadAsync m =>
Stream m a -> Stream m a -> Stream m a
aheadK forall {b}. (a -> b) -> StreamK m b
f Stream m (a -> b)
m1
instance (Monad m, MonadAsync m) => Applicative (AheadT m) where
{-# INLINE pure #-}
pure :: forall a. a -> AheadT m a
pure = forall (m :: * -> *) a. Stream m a -> AheadT m a
AheadT forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a (m :: * -> *). a -> StreamK m a
K.fromPure
{-# INLINE (<*>) #-}
<*> :: forall a b. AheadT m (a -> b) -> AheadT m a -> AheadT m b
(<*>) = forall (m :: * -> *) a b.
MonadAsync m =>
AheadT m (a -> b) -> AheadT m a -> AheadT m b
apAhead
{-# INLINE bindAhead #-}
{-# SPECIALIZE bindAhead ::
AheadT IO a -> (a -> AheadT IO b) -> AheadT IO b #-}
bindAhead :: MonadAsync m => AheadT m a -> (a -> AheadT m b) -> AheadT m b
bindAhead :: forall (m :: * -> *) a b.
MonadAsync m =>
AheadT m a -> (a -> AheadT m b) -> AheadT m b
bindAhead (AheadT Stream m a
m) a -> AheadT m b
f = forall (m :: * -> *) a. Stream m a -> AheadT m a
AheadT forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) b a.
(StreamK m b -> StreamK m b -> StreamK m b)
-> StreamK m a -> (a -> StreamK m b) -> StreamK m b
K.bindWith forall (m :: * -> *) a.
MonadAsync m =>
Stream m a -> Stream m a -> Stream m a
aheadK Stream m a
m (forall (m :: * -> *) a. AheadT m a -> Stream m a
getAheadT forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> AheadT m b
f)
instance MonadAsync m => Monad (AheadT m) where
return :: forall a. a -> AheadT m a
return = forall (f :: * -> *) a. Applicative f => a -> f a
pure
{-# INLINE (>>=) #-}
>>= :: forall a b. AheadT m a -> (a -> AheadT m b) -> AheadT m b
(>>=) = forall (m :: * -> *) a b.
MonadAsync m =>
AheadT m a -> (a -> AheadT m b) -> AheadT m b
bindAhead
MONAD_COMMON_INSTANCES(AheadT, MONADPARALLEL)