{-# LANGUAGE UndecidableInstances #-}
{-# OPTIONS_GHC -fno-warn-deprecations #-}

-- |
-- Module      : Streamly.Internal.Data.Stream.Ahead
-- Copyright   : (c) 2017 Composewell Technologies
--
-- License     : BSD3
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
-- To run examples in this module:
--
-- >>> import qualified Streamly.Prelude as Stream
-- >>> import Control.Concurrent (threadDelay)
-- >>> :{
--  delay n = do
--      threadDelay (n * 1000000)   -- sleep for n seconds
--      putStrLn (show n ++ " sec") -- print "n sec"
--      return n                    -- IO Int
-- :}
--
module Streamly.Internal.Data.Stream.Ahead {-# DEPRECATED "Please use \"Streamly.Internal.Data.Stream.Concurrent\" from streamly package instead." #-}
    (
      AheadT(..)
    , Ahead
    , aheadK
    , consM
    )
where

import Control.Concurrent.MVar (putMVar, takeMVar)
import Control.Exception (assert)
import Control.Monad (void, when)
import Control.Monad.Base (MonadBase(..), liftBaseDefault)
import Control.Monad.Catch (MonadThrow, throwM)
-- import Control.Monad.Error.Class   (MonadError(..))
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Reader.Class (MonadReader(..))
import Control.Monad.State.Class (MonadState(..))
import Control.Monad.Trans.Class (MonadTrans(lift))
import Data.Heap (Heap, Entry(..))
import Data.IORef (IORef, readIORef, atomicModifyIORef, writeIORef)
import Data.Maybe (fromJust)
import GHC.Exts (inline)

import qualified Data.Heap as H

import Streamly.Internal.Control.Concurrent
    (MonadRunInIO, MonadAsync, askRunInIO, restoreM)
import Streamly.Internal.Data.Stream.StreamK.Type (Stream)

import qualified Streamly.Internal.Data.Stream.StreamK.Type as K
    (foldStreamShared, cons, mkStream, foldStream, fromEffect
    , nil, concatMapWith, fromPure, bindWith)
import qualified Streamly.Internal.Data.Stream.StreamD.Type as D
    (mapM, fromStreamK, toStreamK)
import qualified Streamly.Internal.Data.Stream.Serial as Stream (toStreamK)

import Streamly.Internal.Data.Stream.SVar.Generate
import Streamly.Internal.Data.SVar
import Prelude hiding (map)

#include "Instances.hs"

-- $setup
-- >>> :set -fno-warn-deprecations
-- >>> import qualified Streamly.Prelude as Stream
-- >>> import Control.Concurrent (threadDelay)
-- >>> :{
--  delay n = do
--      threadDelay (n * 1000000)   -- sleep for n seconds
--      putStrLn (show n ++ " sec") -- print "n sec"
--      return n                    -- IO Int
-- :}

{-# INLINABLE withLocal #-}
withLocal :: MonadReader r m => (r -> r) -> Stream m a -> Stream m a
withLocal :: forall r (m :: * -> *) a.
MonadReader r m =>
(r -> r) -> Stream m a -> Stream m a
withLocal r -> r
f Stream m a
m =
    forall (m :: * -> *) a.
(forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
K.mkStream forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp ->
        let single :: a -> m r
single = forall r (m :: * -> *) a. MonadReader r m => (r -> r) -> m a -> m a
local r -> r
f forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> m r
sng
            yieldk :: a -> Stream m a -> m r
yieldk a
a Stream m a
r = forall r (m :: * -> *) a. MonadReader r m => (r -> r) -> m a -> m a
local r -> r
f forall a b. (a -> b) -> a -> b
$ a -> Stream m a -> m r
yld a
a (forall r (m :: * -> *) a.
MonadReader r m =>
(r -> r) -> Stream m a -> Stream m a
withLocal r -> r
f Stream m a
r)
        in forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStream State StreamK m a
st a -> Stream m a -> m r
yieldk a -> m r
single (forall r (m :: * -> *) a. MonadReader r m => (r -> r) -> m a -> m a
local r -> r
f m r
stp) Stream m a
m

-------------------------------------------------------------------------------
-- Ahead
-------------------------------------------------------------------------------

-- Lookahead streams can execute multiple tasks concurrently, ahead of time,
-- but always serve them in the same order as they appear in the stream. To
-- implement lookahead streams efficiently we assign a sequence number to each
-- task when the task is picked up for execution. When the task finishes, the
-- output is tagged with the same sequence number and we rearrange the outputs
-- in sequence based on that number.
--
-- To explain the mechanism imagine that the current task at the head of the
-- stream has a "token" to yield to the outputQueue. The ownership of the token
-- is determined by the current sequence number is maintained in outputHeap.
-- Sequence number is assigned when a task is queued. When a thread dequeues a
-- task it picks up the sequence number as well and when the output is ready it
-- uses the sequence number to queue the output to the outputQueue.
--
-- The thread with current sequence number sends the output directly to the
-- outputQueue. Other threads push the output to the outputHeap. When the task
-- being queued on the heap is a stream of many elements we evaluate only the
-- first element and keep the rest of the unevaluated computation in the heap.
-- When such a task gets the "token" for outputQueue it evaluates and directly
-- yields all the elements to the outputQueue without checking for the
-- "token".
--
-- Note that no two outputs in the heap can have the same sequence numbers and
-- therefore we do not need a stable heap. We have also separated the buffer
-- for the current task (outputQueue) and the pending tasks (outputHeap) so
-- that the pending tasks cannot interfere with the current task. Note that for
-- a single task just the outputQueue is enough and for the case of many
-- threads just a heap is good enough. However we balance between these two
-- cases, so that both are efficient.
--
-- For bigger streams it may make sense to have separate buffers for each
-- stream. However, for singleton streams this may become inefficient. However,
-- if we do not have separate buffers, then the streams that come later in
-- sequence may hog the buffer, hindering the streams that are ahead. For this
-- reason we have a single element buffer limitation for the streams being
-- executed in advance.
--
-- This scheme works pretty efficiently with less than 40% extra overhead
-- compared to the Async streams where we do not have any kind of sequencing of
-- the outputs. It is especially devised so that we are most efficient when we
-- have short tasks and need just a single thread. Also when a thread yields
-- many items it can hold lockfree access to the outputQueue and do it
-- efficiently.
--
-- XXX Maybe we can start the ahead threads at a lower cpu and IO priority so
-- that they do not hog the resources and hinder the progress of the threads in
-- front of them.

-- Left associated ahead expressions are expensive. We start a new SVar for
-- each left associative expression. The queue is used only for right
-- associated expression, we queue the right expression and execute the left.
-- Thererefore the queue never has more than on item in it.
--
-- XXX Also note that limiting concurrency for cases like "take 10" would not
-- work well with left associative expressions, because we have no visibility
-- about how much the left side of the expression would yield.
--
-- XXX It may be a good idea to increment sequence numbers for each yield,
-- currently a stream on the left side of the expression may yield many
-- elements with the same sequene number. We can then use the seq number to
-- enforce yieldMax and yieldLImit as well.

-- Invariants:
--
-- * A worker should always ensure that it pushes all the consecutive items in
-- the heap to the outputQueue especially the items on behalf of the workers
-- that have already left when we were holding the token. This avoids deadlock
-- conditions when the later workers completion depends on the consumption of
-- earlier results. For more details see comments in the consumer pull side
-- code.

{-# INLINE underMaxHeap #-}
underMaxHeap ::
       SVar Stream m a
    -> Heap (Entry Int (AheadHeapEntry Stream m a))
    -> IO Bool
underMaxHeap :: forall (m :: * -> *) a.
SVar StreamK m a
-> Heap (Entry Int (AheadHeapEntry StreamK m a)) -> IO Bool
underMaxHeap SVar StreamK m a
sv Heap (Entry Int (AheadHeapEntry StreamK m a))
hp = do
    ([ChildEvent a]
_, Int
len) <- forall a. IORef a -> IO a
readIORef (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef ([ChildEvent a], Int)
outputQueue SVar StreamK m a
sv)

    -- XXX simplify this
    let maxHeap :: Limit
maxHeap = case forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Limit
maxBufferLimit SVar StreamK m a
sv of
            Limited Word
lim -> Word -> Limit
Limited forall a b. (a -> b) -> a -> b
$
                forall a. Ord a => a -> a -> a
max Word
0 (Word
lim forall a. Num a => a -> a -> a
- forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
len)
            Limit
Unlimited -> Limit
Unlimited

    case Limit
maxHeap of
        Limited Word
lim -> do
            Int
active <- forall a. IORef a -> IO a
readIORef (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef Int
workerCount SVar StreamK m a
sv)
            forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a. Heap a -> Int
H.size Heap (Entry Int (AheadHeapEntry StreamK m a))
hp forall a. Num a => a -> a -> a
+ Int
active forall a. Ord a => a -> a -> Bool
<= forall a b. (Integral a, Num b) => a -> b
fromIntegral Word
lim
        Limit
Unlimited -> forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True

-- Return value:
-- True => stop
-- False => continue
preStopCheck ::
       SVar Stream m a
    -> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)) , Maybe Int)
    -> IO Bool
preStopCheck :: forall (m :: * -> *) a.
SVar StreamK m a
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> IO Bool
preStopCheck SVar StreamK m a
sv IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap =
    -- check the stop condition under a lock before actually
    -- stopping so that the whole herd does not stop at once.
    forall a b. IORef a -> (a -> IO b) -> IO b
withIORef IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap forall a b. (a -> b) -> a -> b
$ \(Heap (Entry Int (AheadHeapEntry StreamK m a))
hp, Maybe Int
_) -> do
        Bool
heapOk <- forall (m :: * -> *) a.
SVar StreamK m a
-> Heap (Entry Int (AheadHeapEntry StreamK m a)) -> IO Bool
underMaxHeap SVar StreamK m a
sv Heap (Entry Int (AheadHeapEntry StreamK m a))
hp
        forall a. MVar a -> IO a
takeMVar (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> MVar ()
workerStopMVar SVar StreamK m a
sv)
        let stop :: IO Bool
stop = do
                forall a. MVar a -> a -> IO ()
putMVar (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> MVar ()
workerStopMVar SVar StreamK m a
sv) ()
                forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
            continue :: IO Bool
continue = do
                forall a. MVar a -> a -> IO ()
putMVar (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> MVar ()
workerStopMVar SVar StreamK m a
sv) ()
                forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
        if Bool
heapOk
        then
            case forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe YieldRateInfo
yieldRateInfo SVar StreamK m a
sv of
                Maybe YieldRateInfo
Nothing -> IO Bool
continue
                Just YieldRateInfo
yinfo -> do
                    Bool
rateOk <- forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> YieldRateInfo -> IO Bool
isBeyondMaxRate SVar StreamK m a
sv YieldRateInfo
yinfo
                    if Bool
rateOk then IO Bool
continue else IO Bool
stop
        else IO Bool
stop

abortExecution ::
       IORef ([Stream m a], Int)
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> Stream m a
    -> IO ()
abortExecution :: forall (m :: * -> *) a.
IORef ([Stream m a], Int)
-> SVar StreamK m a -> Maybe WorkerInfo -> Stream m a -> IO ()
abortExecution IORef ([Stream m a], Int)
q SVar StreamK m a
sv Maybe WorkerInfo
winfo Stream m a
m = do
    forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef ([t m a], Int) -> t m a -> IO ()
reEnqueueAhead SVar StreamK m a
sv IORef ([Stream m a], Int)
q Stream m a
m
    forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar StreamK m a
sv
    forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar StreamK m a
sv Maybe WorkerInfo
winfo

-- XXX In absence of a "noyield" primitive (i.e. do not pre-empt inside a
-- critical section) from GHC RTS, we have a difficult problem. Assume we have
-- a 100,000 threads producing output and queuing it to the heap for
-- sequencing. The heap can be drained only by one thread at a time, any thread
-- that finds that heap can be drained now, takes a lock and starts draining
-- it, however the thread may get prempted in the middle of it holding the
-- lock. Since that thread is holding the lock, the other threads cannot pick
-- up the draining task, therefore they proceed to picking up the next task to
-- execute. If the draining thread could yield voluntarily at a point where it
-- has released the lock, then the next threads could pick up the draining
-- instead of executing more tasks. When there are 100,000 threads the drainer
-- gets a cpu share to run only 1:100000 of the time. This makes the heap
-- accumulate a lot of output when we the buffer size is large.
--
-- The solutions to this problem are:
-- 1) make the other threads wait in a queue until the draining finishes
-- 2) make the other threads queue and go away if draining is in progress
--
-- In both cases we give the drainer a chance to run more often.
--
processHeap
    :: MonadRunInIO m
    => IORef ([Stream m a], Int)
    -> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
    -> State Stream m a
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> AheadHeapEntry Stream m a
    -> Int
    -> Bool -- we are draining the heap before we stop
    -> m ()
processHeap :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> AheadHeapEntry StreamK m a
-> Int
-> Bool
-> m ()
processHeap IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo AheadHeapEntry StreamK m a
entry Int
sno Bool
stopping = Int -> AheadHeapEntry StreamK m a -> m ()
loopHeap Int
sno AheadHeapEntry StreamK m a
entry

    where

    stopIfNeeded :: AheadHeapEntry StreamK m a -> Int -> Stream m a -> m ()
stopIfNeeded AheadHeapEntry StreamK m a
ent Int
seqNo Stream m a
r = do
        Bool
stopIt <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
SVar StreamK m a
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> IO Bool
preStopCheck SVar StreamK m a
sv IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap
        if Bool
stopIt
        then forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
            -- put the entry back in the heap and stop
            forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Entry Int (AheadHeapEntry t m a) -> Int -> IO ()
requeueOnHeapTop IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap (forall p a. p -> a -> Entry p a
Entry Int
seqNo AheadHeapEntry StreamK m a
ent) Int
seqNo
            forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar StreamK m a
sv Maybe WorkerInfo
winfo
        else Bool -> Int -> Stream m a -> m ()
runStreamWithYieldLimit Bool
True Int
seqNo Stream m a
r

    loopHeap :: Int -> AheadHeapEntry StreamK m a -> m ()
loopHeap Int
seqNo AheadHeapEntry StreamK m a
ent =
        case AheadHeapEntry StreamK m a
ent of
            AheadHeapEntry StreamK m a
AheadEntryNull -> Int -> m ()
nextHeap Int
seqNo
            AheadEntryPure a
a -> do
                -- Use 'send' directly so that we do not account this in worker
                -- latency as this will not be the real latency.
                -- Don't stop the worker in this case as we are just
                -- transferring available results from heap to outputQueue.
                forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ChildEvent a -> IO Int
send SVar StreamK m a
sv (forall a. a -> ChildEvent a
ChildYield a
a)
                Int -> m ()
nextHeap Int
seqNo
            AheadEntryStream (RunInIO forall b. m b -> IO (StM m b)
runin, Stream m a
r) ->
                if Bool
stopping
                then AheadHeapEntry StreamK m a -> Int -> Stream m a -> m ()
stopIfNeeded AheadHeapEntry StreamK m a
ent Int
seqNo Stream m a
r
                else do
                    StM m ()
res <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall b. m b -> IO (StM m b)
runin (Bool -> Int -> Stream m a -> m ()
runStreamWithYieldLimit Bool
True Int
seqNo Stream m a
r)
                    forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m ()
res


    nextHeap :: Int -> m ()
nextHeap Int
prevSeqNo = do
        HeapDequeueResult StreamK m a
res <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Int -> IO (HeapDequeueResult t m a)
dequeueFromHeapSeq IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap (Int
prevSeqNo forall a. Num a => a -> a -> a
+ Int
1)
        case HeapDequeueResult StreamK m a
res of
            Ready (Entry Int
seqNo AheadHeapEntry StreamK m a
hent) -> Int -> AheadHeapEntry StreamK m a -> m ()
loopHeap Int
seqNo AheadHeapEntry StreamK m a
hent
            HeapDequeueResult StreamK m a
Clearing -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar StreamK m a
sv Maybe WorkerInfo
winfo
            Waiting Int
_ ->
                if Bool
stopping
                then do
                    Bool
r <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
SVar StreamK m a
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> IO Bool
preStopCheck SVar StreamK m a
sv IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap
                    if Bool
r
                    then forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar StreamK m a
sv Maybe WorkerInfo
winfo
                    else Int -> m ()
processWorkQueue Int
prevSeqNo
                else forall a. a -> a
inline Int -> m ()
processWorkQueue Int
prevSeqNo

    processWorkQueue :: Int -> m ()
processWorkQueue Int
prevSeqNo = do
        Maybe (Stream m a, Int)
work <- forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
IORef ([t m a], Int) -> m (Maybe (t m a, Int))
dequeueAhead IORef ([Stream m a], Int)
q
        case Maybe (Stream m a, Int)
work of
            Maybe (Stream m a, Int)
Nothing -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar StreamK m a
sv Maybe WorkerInfo
winfo
            Just (Stream m a
m, Int
seqNo) -> do
                Bool
yieldLimitOk <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
decrementYieldLimit SVar StreamK m a
sv
                if Bool
yieldLimitOk
                then
                    if Int
seqNo forall a. Eq a => a -> a -> Bool
== Int
prevSeqNo forall a. Num a => a -> a -> a
+ Int
1
                    then forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> Stream m a
-> Int
-> m ()
processWithToken IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo Stream m a
m Int
seqNo
                    else forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> Stream m a
-> Int
-> m ()
processWithoutToken IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo Stream m a
m Int
seqNo
                else forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
IORef ([Stream m a], Int)
-> SVar StreamK m a -> Maybe WorkerInfo -> Stream m a -> IO ()
abortExecution IORef ([Stream m a], Int)
q SVar StreamK m a
sv Maybe WorkerInfo
winfo Stream m a
m

    -- We do not stop the worker on buffer full here as we want to proceed to
    -- nextHeap anyway so that we can clear any subsequent entries. We stop
    -- only in yield continuation where we may have a remaining stream to be
    -- pushed on the heap.
    singleStreamFromHeap :: Int -> a -> m ()
singleStreamFromHeap Int
seqNo a
a = do
        forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
sendYield SVar StreamK m a
sv Maybe WorkerInfo
winfo (forall a. a -> ChildEvent a
ChildYield a
a)
        Int -> m ()
nextHeap Int
seqNo

    -- XXX when we have an unfinished stream on the heap we cannot account all
    -- the yields of that stream until it finishes, so if we have picked up
    -- and executed more actions beyond that in the parent stream and put them
    -- on the heap then they would eat up some yield limit which is not
    -- correct, we will think that our yield limit is over even though we have
    -- to yield items from unfinished stream before them. For this reason, if
    -- there are pending items in the heap we drain them unconditionally
    -- without considering the yield limit.
    runStreamWithYieldLimit :: Bool -> Int -> Stream m a -> m ()
runStreamWithYieldLimit Bool
continue Int
seqNo Stream m a
r = do
        Bool
_ <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
decrementYieldLimit SVar StreamK m a
sv
        if Bool
continue -- see comment above -- && yieldLimitOk
        then do
            let stop :: m ()
stop = do
                  forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar StreamK m a
sv)
                  Int -> m ()
nextHeap Int
seqNo
            forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
st
                          (Int -> a -> Stream m a -> m ()
yieldStreamFromHeap Int
seqNo)
                          (Int -> a -> m ()
singleStreamFromHeap Int
seqNo)
                          m ()
stop
                          Stream m a
r
        else do
            RunInIO m
runIn <- forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
            let ent :: Entry Int (AheadHeapEntry StreamK m a)
ent = forall p a. p -> a -> Entry p a
Entry Int
seqNo (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(RunInIO m, t m a) -> AheadHeapEntry t m a
AheadEntryStream (RunInIO m
runIn, Stream m a
r))
            forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
                forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Entry Int (AheadHeapEntry t m a) -> Int -> IO ()
requeueOnHeapTop IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Entry Int (AheadHeapEntry StreamK m a)
ent Int
seqNo
                forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar StreamK m a
sv
                forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar StreamK m a
sv Maybe WorkerInfo
winfo

    yieldStreamFromHeap :: Int -> a -> Stream m a -> m ()
yieldStreamFromHeap Int
seqNo a
a Stream m a
r = do
        Bool
continue <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
sendYield SVar StreamK m a
sv Maybe WorkerInfo
winfo (forall a. a -> ChildEvent a
ChildYield a
a)
        Bool -> Int -> Stream m a -> m ()
runStreamWithYieldLimit Bool
continue Int
seqNo Stream m a
r

{-# NOINLINE drainHeap #-}
drainHeap
    :: MonadRunInIO m
    => IORef ([Stream m a], Int)
    -> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
    -> State Stream m a
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> m ()
drainHeap :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
drainHeap IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo = do
    HeapDequeueResult StreamK m a
r <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> IO (HeapDequeueResult t m a)
dequeueFromHeap IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap
    case HeapDequeueResult StreamK m a
r of
        Ready (Entry Int
seqNo AheadHeapEntry StreamK m a
hent) ->
            forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> AheadHeapEntry StreamK m a
-> Int
-> Bool
-> m ()
processHeap IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo AheadHeapEntry StreamK m a
hent Int
seqNo Bool
True
        HeapDequeueResult StreamK m a
_ -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar StreamK m a
sv Maybe WorkerInfo
winfo

data HeapStatus = HContinue | HStop
data WorkerStatus = Continue | Suspend

processWithoutToken
    :: MonadRunInIO m
    => IORef ([Stream m a], Int)
    -> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
    -> State Stream m a
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> Stream m a
    -> Int
    -> m ()
processWithoutToken :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> Stream m a
-> Int
-> m ()
processWithoutToken IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo Stream m a
m Int
seqNo = do
    -- we have already decremented the yield limit for m
    let stop :: m WorkerStatus
stop = do
            forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar StreamK m a
sv)
            -- If the stream stops without yielding anything, and we do not put
            -- anything on heap, but if heap was waiting for this seq number
            -- then it will keep waiting forever, because we are never going to
            -- put it on heap. So we have to put a null entry on heap even when
            -- we stop.
            forall {m :: * -> *}.
MonadIO m =>
AheadHeapEntry StreamK m a -> m WorkerStatus
toHeap forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
AheadHeapEntry t m a
AheadEntryNull
        mrun :: m b -> IO (StM m b)
mrun = forall (m :: * -> *). RunInIO m -> forall b. m b -> IO (StM m b)
runInIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> RunInIO m
svarMrun SVar StreamK m a
sv

    StM m WorkerStatus
r <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall b. m b -> IO (StM m b)
mrun forall a b. (a -> b) -> a -> b
$
            forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
st
                (\a
a Stream m a
r -> do
                    RunInIO m
runIn <- forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
                    forall {m :: * -> *}.
MonadIO m =>
AheadHeapEntry StreamK m a -> m WorkerStatus
toHeap forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(RunInIO m, t m a) -> AheadHeapEntry t m a
AheadEntryStream (RunInIO m
runIn, forall a (m :: * -> *). a -> StreamK m a -> StreamK m a
K.cons a
a Stream m a
r))
                (forall {m :: * -> *}.
MonadIO m =>
AheadHeapEntry StreamK m a -> m WorkerStatus
toHeap forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
a -> AheadHeapEntry t m a
AheadEntryPure)
                m WorkerStatus
stop
                Stream m a
m
    WorkerStatus
res <- forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m WorkerStatus
r
    case WorkerStatus
res of
        WorkerStatus
Continue -> forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
workLoopAhead IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo
        WorkerStatus
Suspend -> forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
drainHeap IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo

    where

    -- XXX to reduce contention each CPU can have its own heap
    toHeap :: AheadHeapEntry StreamK m a -> m WorkerStatus
toHeap AheadHeapEntry StreamK m a
ent = do
        -- Heap insertion is an expensive affair so we use a non CAS based
        -- modification, otherwise contention and retries can make a thread
        -- context switch and throw it behind other threads which come later in
        -- sequence.
        Heap (Entry Int (AheadHeapEntry StreamK m a))
newHp <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap forall a b. (a -> b) -> a -> b
$ \(Heap (Entry Int (AheadHeapEntry StreamK m a))
hp, Maybe Int
snum) ->
            let hp' :: Heap (Entry Int (AheadHeapEntry StreamK m a))
hp' = forall a. Ord a => a -> Heap a -> Heap a
H.insert (forall p a. p -> a -> Entry p a
Entry Int
seqNo AheadHeapEntry StreamK m a
ent) Heap (Entry Int (AheadHeapEntry StreamK m a))
hp
            in forall a. (?callStack::CallStack) => Bool -> a -> a
assert (Maybe Int -> Int -> Bool
heapIsSane Maybe Int
snum Int
seqNo) ((Heap (Entry Int (AheadHeapEntry StreamK m a))
hp', Maybe Int
snum), Heap (Entry Int (AheadHeapEntry StreamK m a))
hp')

        forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Bool
svarInspectMode SVar StreamK m a
sv) forall a b. (a -> b) -> a -> b
$
            forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
                Int
maxHp <- forall a. IORef a -> IO a
readIORef (SVarStats -> IORef Int
maxHeapSize forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStats
svarStats SVar StreamK m a
sv)
                forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (forall a. Heap a -> Int
H.size Heap (Entry Int (AheadHeapEntry StreamK m a))
newHp forall a. Ord a => a -> a -> Bool
> Int
maxHp) forall a b. (a -> b) -> a -> b
$
                    forall a. IORef a -> a -> IO ()
writeIORef (SVarStats -> IORef Int
maxHeapSize forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStats
svarStats SVar StreamK m a
sv) (forall a. Heap a -> Int
H.size Heap (Entry Int (AheadHeapEntry StreamK m a))
newHp)

        Bool
heapOk <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
SVar StreamK m a
-> Heap (Entry Int (AheadHeapEntry StreamK m a)) -> IO Bool
underMaxHeap SVar StreamK m a
sv Heap (Entry Int (AheadHeapEntry StreamK m a))
newHp
        HeapStatus
status <-
            case forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe YieldRateInfo
yieldRateInfo SVar StreamK m a
sv of
                Maybe YieldRateInfo
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return HeapStatus
HContinue
                Just YieldRateInfo
yinfo ->
                    case Maybe WorkerInfo
winfo of
                        Just WorkerInfo
info -> do
                            Bool
rateOk <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> YieldRateInfo -> WorkerInfo -> IO Bool
workerRateControl SVar StreamK m a
sv YieldRateInfo
yinfo WorkerInfo
info
                            if Bool
rateOk
                            then forall (m :: * -> *) a. Monad m => a -> m a
return HeapStatus
HContinue
                            else forall (m :: * -> *) a. Monad m => a -> m a
return HeapStatus
HStop
                        Maybe WorkerInfo
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return HeapStatus
HContinue

        if Bool
heapOk
        then
            case HeapStatus
status of
                HeapStatus
HContinue -> forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Continue
                HeapStatus
HStop -> forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Suspend
        else forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Suspend

data TokenWorkerStatus = TokenContinue Int | TokenSuspend

processWithToken
    :: MonadRunInIO m
    => IORef ([Stream m a], Int)
    -> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
    -> State Stream m a
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> Stream m a
    -> Int
    -> m ()
processWithToken :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> Stream m a
-> Int
-> m ()
processWithToken IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo Stream m a
action Int
sno = do
    -- Note, we enter this function with yield limit already decremented
    -- XXX deduplicate stop in all invocations
    let stop :: m TokenWorkerStatus
stop = do
            forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar StreamK m a
sv)
            forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ Int -> TokenWorkerStatus
TokenContinue (Int
sno forall a. Num a => a -> a -> a
+ Int
1)
        mrun :: m b -> IO (StM m b)
mrun = forall (m :: * -> *). RunInIO m -> forall b. m b -> IO (StM m b)
runInIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> RunInIO m
svarMrun SVar StreamK m a
sv

    StM m TokenWorkerStatus
r <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall b. m b -> IO (StM m b)
mrun forall a b. (a -> b) -> a -> b
$
        forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
st (Int -> a -> Stream m a -> m TokenWorkerStatus
yieldOutput Int
sno) (forall {m :: * -> *}. MonadIO m => Int -> a -> m TokenWorkerStatus
singleOutput Int
sno) m TokenWorkerStatus
stop Stream m a
action

    TokenWorkerStatus
res <- forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m TokenWorkerStatus
r
    case TokenWorkerStatus
res of
        TokenContinue Int
seqNo -> Int -> m ()
loopWithToken Int
seqNo
        TokenWorkerStatus
TokenSuspend -> forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
drainHeap IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo

    where

    singleOutput :: Int -> a -> m TokenWorkerStatus
singleOutput Int
seqNo a
a = do
        Bool
continue <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
sendYield SVar StreamK m a
sv Maybe WorkerInfo
winfo (forall a. a -> ChildEvent a
ChildYield a
a)
        if Bool
continue
        then forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ Int -> TokenWorkerStatus
TokenContinue (Int
seqNo forall a. Num a => a -> a -> a
+ Int
1)
        else do
            forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Int -> IO ()
updateHeapSeq IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap (Int
seqNo forall a. Num a => a -> a -> a
+ Int
1)
            forall (m :: * -> *) a. Monad m => a -> m a
return TokenWorkerStatus
TokenSuspend

    -- XXX use a wrapper function around stop so that we never miss
    -- incrementing the yield in a stop continuation. Essentiatlly all
    -- "unstream" calls in this function must increment yield limit on stop.
    yieldOutput :: Int -> a -> Stream m a -> m TokenWorkerStatus
yieldOutput Int
seqNo a
a Stream m a
r = do
        Bool
continue <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
sendYield SVar StreamK m a
sv Maybe WorkerInfo
winfo (forall a. a -> ChildEvent a
ChildYield a
a)
        Bool
yieldLimitOk <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
decrementYieldLimit SVar StreamK m a
sv
        if Bool
continue Bool -> Bool -> Bool
&& Bool
yieldLimitOk
        then do
            let stop :: m TokenWorkerStatus
stop = do
                    forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar StreamK m a
sv)
                    forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ Int -> TokenWorkerStatus
TokenContinue (Int
seqNo forall a. Num a => a -> a -> a
+ Int
1)
            forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
st
                          (Int -> a -> Stream m a -> m TokenWorkerStatus
yieldOutput Int
seqNo)
                          (forall {m :: * -> *}. MonadIO m => Int -> a -> m TokenWorkerStatus
singleOutput Int
seqNo)
                          m TokenWorkerStatus
stop
                          Stream m a
r
        else do
            RunInIO m
runIn <- forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
            let ent :: Entry Int (AheadHeapEntry StreamK m a)
ent = forall p a. p -> a -> Entry p a
Entry Int
seqNo (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(RunInIO m, t m a) -> AheadHeapEntry t m a
AheadEntryStream (RunInIO m
runIn, Stream m a
r))
            forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Entry Int (AheadHeapEntry t m a) -> Int -> IO ()
requeueOnHeapTop IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Entry Int (AheadHeapEntry StreamK m a)
ent Int
seqNo
            forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar StreamK m a
sv
            forall (m :: * -> *) a. Monad m => a -> m a
return TokenWorkerStatus
TokenSuspend

    loopWithToken :: Int -> m ()
loopWithToken Int
nextSeqNo = do
        Maybe (Stream m a, Int)
work <- forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
IORef ([t m a], Int) -> m (Maybe (t m a, Int))
dequeueAhead IORef ([Stream m a], Int)
q
        case Maybe (Stream m a, Int)
work of
            Maybe (Stream m a, Int)
Nothing -> do
                forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Int -> IO ()
updateHeapSeq IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Int
nextSeqNo
                forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
workLoopAhead IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo

            Just (Stream m a
m, Int
seqNo) -> do
                Bool
yieldLimitOk <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
decrementYieldLimit SVar StreamK m a
sv
                let undo :: m ()
undo = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
                        forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Int -> IO ()
updateHeapSeq IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Int
nextSeqNo
                        forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef ([t m a], Int) -> t m a -> IO ()
reEnqueueAhead SVar StreamK m a
sv IORef ([Stream m a], Int)
q Stream m a
m
                        forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar StreamK m a
sv
                if Bool
yieldLimitOk
                then
                    if Int
seqNo forall a. Eq a => a -> a -> Bool
== Int
nextSeqNo
                    then do
                        let stop :: m TokenWorkerStatus
stop = do
                                forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar StreamK m a
sv)
                                forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ Int -> TokenWorkerStatus
TokenContinue (Int
seqNo forall a. Num a => a -> a -> a
+ Int
1)
                            mrun :: m b -> IO (StM m b)
mrun = forall (m :: * -> *). RunInIO m -> forall b. m b -> IO (StM m b)
runInIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> RunInIO m
svarMrun SVar StreamK m a
sv
                        StM m TokenWorkerStatus
r <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall b. m b -> IO (StM m b)
mrun forall a b. (a -> b) -> a -> b
$
                            forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
st
                                          (Int -> a -> Stream m a -> m TokenWorkerStatus
yieldOutput Int
seqNo)
                                          (forall {m :: * -> *}. MonadIO m => Int -> a -> m TokenWorkerStatus
singleOutput Int
seqNo)
                                          m TokenWorkerStatus
stop
                                          Stream m a
m
                        TokenWorkerStatus
res <- forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m TokenWorkerStatus
r
                        case TokenWorkerStatus
res of
                            TokenContinue Int
seqNo1 -> Int -> m ()
loopWithToken Int
seqNo1
                            TokenWorkerStatus
TokenSuspend -> forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
drainHeap IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo

                    else
                        -- To avoid a race when another thread puts something
                        -- on the heap and goes away, the consumer will not get
                        -- a doorBell and we will not clear the heap before
                        -- executing the next action. If the consumer depends
                        -- on the output that is stuck in the heap then this
                        -- will result in a deadlock. So we always clear the
                        -- heap before executing the next action.
                        m ()
undo forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
workLoopAhead IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo
                else m ()
undo forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
drainHeap IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo

-- XXX the yield limit changes increased the performance overhead by 30-40%.
-- Just like AsyncT we can use an implementation without yeidlimit and even
-- without pacing code to keep the performance higher in the unlimited and
-- unpaced case.
--
-- XXX The yieldLimit stuff is pretty invasive. We can instead do it by using
-- three hooks, a pre-execute hook, a yield hook and a stop hook. In fact these
-- hooks can be used for a more general implementation to even check predicates
-- and not just yield limit.

-- XXX we can remove the sv parameter as it can be derived from st

workLoopAhead
    :: MonadRunInIO m
    => IORef ([Stream m a], Int)
    -> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
    -> State Stream m a
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> m ()
workLoopAhead :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
workLoopAhead IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo = do
        HeapDequeueResult StreamK m a
r <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> IO (HeapDequeueResult t m a)
dequeueFromHeap IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap
        case HeapDequeueResult StreamK m a
r of
            Ready (Entry Int
seqNo AheadHeapEntry StreamK m a
hent) ->
                forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> AheadHeapEntry StreamK m a
-> Int
-> Bool
-> m ()
processHeap IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo AheadHeapEntry StreamK m a
hent Int
seqNo Bool
False
            HeapDequeueResult StreamK m a
Clearing -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar StreamK m a
sv Maybe WorkerInfo
winfo
            Waiting Int
_ -> do
                -- Before we execute the next item from the work queue we check
                -- if we are beyond the yield limit. It is better to check the
                -- yield limit before we pick up the next item. Otherwise we
                -- may have already started more tasks even though we may have
                -- reached the yield limit.  We can avoid this by taking active
                -- workers into account, but that is not as reliable, because
                -- workers may go away without picking up work and yielding a
                -- value.
                --
                -- Rate control can be done either based on actual yields in
                -- the output queue or based on any yield either to the heap or
                -- to the output queue. In both cases we may have one issue or
                -- the other. We chose to do this based on actual yields to the
                -- output queue because it makes the code common to both async
                -- and ahead streams.
                --
                Maybe (Stream m a, Int)
work <- forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
IORef ([t m a], Int) -> m (Maybe (t m a, Int))
dequeueAhead IORef ([Stream m a], Int)
q
                case Maybe (Stream m a, Int)
work of
                    Maybe (Stream m a, Int)
Nothing -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar StreamK m a
sv Maybe WorkerInfo
winfo
                    Just (Stream m a
m, Int
seqNo) -> do
                        Bool
yieldLimitOk <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
decrementYieldLimit SVar StreamK m a
sv
                        if Bool
yieldLimitOk
                        then
                            if Int
seqNo forall a. Eq a => a -> a -> Bool
== Int
0
                            then forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> Stream m a
-> Int
-> m ()
processWithToken IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo Stream m a
m Int
seqNo
                            else forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> Stream m a
-> Int
-> m ()
processWithoutToken IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo Stream m a
m Int
seqNo
                        -- If some worker decremented the yield limit but then
                        -- did not yield anything and therefore incremented it
                        -- later, then if we did not requeue m here we may find
                        -- the work queue empty and therefore miss executing
                        -- the remaining action.
                        else forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
IORef ([Stream m a], Int)
-> SVar StreamK m a -> Maybe WorkerInfo -> Stream m a -> IO ()
abortExecution IORef ([Stream m a], Int)
q SVar StreamK m a
sv Maybe WorkerInfo
winfo Stream m a
m

-------------------------------------------------------------------------------
-- WAhead
-------------------------------------------------------------------------------

-- XXX To be implemented. Use a linked queue like WAsync and put back the
-- remaining computation at the back of the queue instead of the heap, and
-- increment the sequence number.

-- The only difference between forkSVarAsync and this is that we run the left
-- computation without a shared SVar.
forkSVarAhead :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
forkSVarAhead :: forall (m :: * -> *) a.
MonadAsync m =>
Stream m a -> Stream m a -> Stream m a
forkSVarAhead Stream m a
m1 Stream m a
m2 = forall (m :: * -> *) a.
(forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
K.mkStream forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp -> do
        SVar StreamK m a
sv <- forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
State t m a
-> t m a
-> (IORef ([t m a], Int)
    -> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> State t m a
    -> SVar t m a
    -> Maybe WorkerInfo
    -> m ())
-> m (SVar t m a)
newAheadVar State StreamK m a
st (forall {m :: * -> *} {a}.
(MonadIO m, MonadBaseControl IO m) =>
StreamK m a -> StreamK m a -> StreamK m a
concurrently Stream m a
m1 Stream m a
m2)
                          forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
workLoopAhead
        forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStream State StreamK m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. SerialT m a -> Stream m a
Stream.toStreamK (forall (m :: * -> *) a.
MonadAsync m =>
SVar StreamK m a -> SerialT m a
fromSVar SVar StreamK m a
sv)
    where
    concurrently :: StreamK m a -> StreamK m a -> StreamK m a
concurrently StreamK m a
ma StreamK m a
mb = forall (m :: * -> *) a.
(forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
K.mkStream forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp -> do
        RunInIO m
runInIO <- forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
        forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> (RunInIO m, t m a) -> IO ()
enqueue (forall a. (?callStack::CallStack) => Maybe a -> a
fromJust forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe (SVar t m a)
streamVar State StreamK m a
st) (RunInIO m
runInIO, StreamK m a
mb)
        forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStream State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp StreamK m a
ma

{-# INLINE aheadK #-}
aheadK :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
aheadK :: forall (m :: * -> *) a.
MonadAsync m =>
Stream m a -> Stream m a -> Stream m a
aheadK Stream m a
m1 Stream m a
m2 = forall (m :: * -> *) a.
(forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
K.mkStream forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp ->
    case forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe (SVar t m a)
streamVar State StreamK m a
st of
        Just SVar StreamK m a
sv | forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStyle
svarStyle SVar StreamK m a
sv forall a. Eq a => a -> a -> Bool
== SVarStyle
AheadVar -> do
            RunInIO m
runInIO <- forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
            forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> (RunInIO m, t m a) -> IO ()
enqueue SVar StreamK m a
sv (RunInIO m
runInIO, Stream m a
m2)
            -- Always run the left side on a new SVar to avoid complexity in
            -- sequencing results. This means the left side cannot further
            -- split into more ahead computations on the same SVar.
            forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStream State StreamK m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp Stream m a
m1
        Maybe (SVar StreamK m a)
_ -> forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp (forall (m :: * -> *) a.
MonadAsync m =>
Stream m a -> Stream m a -> Stream m a
forkSVarAhead Stream m a
m1 Stream m a
m2)

-- | XXX we can implement it more efficienty by directly implementing instead
-- of combining streams using ahead.
{-# INLINE consM #-}
{-# SPECIALIZE consM :: IO a -> AheadT IO a -> AheadT IO a #-}
consM :: MonadAsync m => m a -> AheadT m a -> AheadT m a
consM :: forall (m :: * -> *) a.
MonadAsync m =>
m a -> AheadT m a -> AheadT m a
consM m a
m (AheadT Stream m a
r) = forall (m :: * -> *) a. Stream m a -> AheadT m a
AheadT forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
MonadAsync m =>
Stream m a -> Stream m a -> Stream m a
aheadK (forall (m :: * -> *) a. Monad m => m a -> StreamK m a
K.fromEffect m a
m) Stream m a
r

------------------------------------------------------------------------------
-- AheadT
------------------------------------------------------------------------------

-- | For 'AheadT' streams:
--
-- @
-- (<>) = 'Streamly.Prelude.ahead'
-- (>>=) = flip . 'Streamly.Prelude.concatMapWith' 'Streamly.Prelude.ahead'
-- @
--
-- A single 'Monad' bind behaves like a @for@ loop with iterations executed
-- concurrently, ahead of time, producing side effects of iterations out of
-- order, but results in order:
--
-- >>> :{
-- Stream.toList $ Stream.fromAhead $ do
--      x <- Stream.fromList [2,1] -- foreach x in stream
--      Stream.fromEffect $ delay x
-- :}
-- 1 sec
-- 2 sec
-- [2,1]
--
-- Nested monad binds behave like nested @for@ loops with nested iterations
-- executed concurrently, ahead of time:
--
-- >>> :{
-- Stream.toList $ Stream.fromAhead $ do
--     x <- Stream.fromList [1,2] -- foreach x in stream
--     y <- Stream.fromList [2,4] -- foreach y in stream
--     Stream.fromEffect $ delay (x + y)
-- :}
-- 3 sec
-- 4 sec
-- 5 sec
-- 6 sec
-- [3,5,4,6]
--
-- The behavior can be explained as follows. All the iterations corresponding
-- to the element @1@ in the first stream constitute one output stream and all
-- the iterations corresponding to @2@ constitute another output stream and
-- these two output streams are merged using 'ahead'.
--
-- /Since: 0.3.0 ("Streamly")/
--
-- @since 0.8.0
newtype AheadT m a = AheadT {forall (m :: * -> *) a. AheadT m a -> Stream m a
getAheadT :: Stream m a}

instance MonadTrans AheadT where
    {-# INLINE lift #-}
    lift :: forall (m :: * -> *) a. Monad m => m a -> AheadT m a
lift = forall (m :: * -> *) a. Stream m a -> AheadT m a
AheadT forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a. Monad m => m a -> StreamK m a
K.fromEffect

-- | A serial IO stream of elements of type @a@ with concurrent lookahead.  See
-- 'AheadT' documentation for more details.
--
-- /Since: 0.3.0 ("Streamly")/
--
-- @since 0.8.0
type Ahead = AheadT IO

------------------------------------------------------------------------------
-- Semigroup
------------------------------------------------------------------------------

{-# INLINE append #-}
{-# SPECIALIZE append :: AheadT IO a -> AheadT IO a -> AheadT IO a #-}
append :: MonadAsync m => AheadT m a -> AheadT m a -> AheadT m a
append :: forall (m :: * -> *) a.
MonadAsync m =>
AheadT m a -> AheadT m a -> AheadT m a
append (AheadT Stream m a
m1) (AheadT Stream m a
m2) = forall (m :: * -> *) a. Stream m a -> AheadT m a
AheadT forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
MonadAsync m =>
Stream m a -> Stream m a -> Stream m a
aheadK Stream m a
m1 Stream m a
m2

instance MonadAsync m => Semigroup (AheadT m a) where
    <> :: AheadT m a -> AheadT m a -> AheadT m a
(<>) = forall (m :: * -> *) a.
MonadAsync m =>
AheadT m a -> AheadT m a -> AheadT m a
append

------------------------------------------------------------------------------
-- Monoid
------------------------------------------------------------------------------

instance MonadAsync m => Monoid (AheadT m a) where
    mempty :: AheadT m a
mempty = forall (m :: * -> *) a. Stream m a -> AheadT m a
AheadT forall (m :: * -> *) a. StreamK m a
K.nil
    mappend :: AheadT m a -> AheadT m a -> AheadT m a
mappend = forall a. Semigroup a => a -> a -> a
(<>)

------------------------------------------------------------------------------
-- Applicative
------------------------------------------------------------------------------

{-# INLINE apAhead #-}
apAhead :: MonadAsync m => AheadT m (a -> b) -> AheadT m a -> AheadT m b
apAhead :: forall (m :: * -> *) a b.
MonadAsync m =>
AheadT m (a -> b) -> AheadT m a -> AheadT m b
apAhead (AheadT Stream m (a -> b)
m1) (AheadT Stream m a
m2) =
    let f :: (a -> b) -> StreamK m b
f a -> b
x1 = forall (m :: * -> *) b a.
(StreamK m b -> StreamK m b -> StreamK m b)
-> (a -> StreamK m b) -> StreamK m a -> StreamK m b
K.concatMapWith forall (m :: * -> *) a.
MonadAsync m =>
Stream m a -> Stream m a -> Stream m a
aheadK (forall a (m :: * -> *). a -> StreamK m a
K.fromPure forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> b
x1) Stream m a
m2
    in forall (m :: * -> *) a. Stream m a -> AheadT m a
AheadT forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) b a.
(StreamK m b -> StreamK m b -> StreamK m b)
-> (a -> StreamK m b) -> StreamK m a -> StreamK m b
K.concatMapWith forall (m :: * -> *) a.
MonadAsync m =>
Stream m a -> Stream m a -> Stream m a
aheadK forall {b}. (a -> b) -> StreamK m b
f Stream m (a -> b)
m1

instance (Monad m, MonadAsync m) => Applicative (AheadT m) where
    {-# INLINE pure #-}
    pure :: forall a. a -> AheadT m a
pure = forall (m :: * -> *) a. Stream m a -> AheadT m a
AheadT forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a (m :: * -> *). a -> StreamK m a
K.fromPure

    {-# INLINE (<*>) #-}
    <*> :: forall a b. AheadT m (a -> b) -> AheadT m a -> AheadT m b
(<*>) = forall (m :: * -> *) a b.
MonadAsync m =>
AheadT m (a -> b) -> AheadT m a -> AheadT m b
apAhead

------------------------------------------------------------------------------
-- Monad
------------------------------------------------------------------------------

{-# INLINE bindAhead #-}
{-# SPECIALIZE bindAhead ::
    AheadT IO a -> (a -> AheadT IO b) -> AheadT IO b #-}
bindAhead :: MonadAsync m => AheadT m a -> (a -> AheadT m b) -> AheadT m b
bindAhead :: forall (m :: * -> *) a b.
MonadAsync m =>
AheadT m a -> (a -> AheadT m b) -> AheadT m b
bindAhead (AheadT Stream m a
m) a -> AheadT m b
f = forall (m :: * -> *) a. Stream m a -> AheadT m a
AheadT forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) b a.
(StreamK m b -> StreamK m b -> StreamK m b)
-> StreamK m a -> (a -> StreamK m b) -> StreamK m b
K.bindWith forall (m :: * -> *) a.
MonadAsync m =>
Stream m a -> Stream m a -> Stream m a
aheadK Stream m a
m (forall (m :: * -> *) a. AheadT m a -> Stream m a
getAheadT forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> AheadT m b
f)

instance MonadAsync m => Monad (AheadT m) where
    return :: forall a. a -> AheadT m a
return = forall (f :: * -> *) a. Applicative f => a -> f a
pure

    {-# INLINE (>>=) #-}
    >>= :: forall a b. AheadT m a -> (a -> AheadT m b) -> AheadT m b
(>>=) = forall (m :: * -> *) a b.
MonadAsync m =>
AheadT m a -> (a -> AheadT m b) -> AheadT m b
bindAhead

------------------------------------------------------------------------------
-- Other instances
------------------------------------------------------------------------------

MONAD_COMMON_INSTANCES(AheadT, MONADPARALLEL)