-- |
-- Module      : Streamly.Internal.Data.Stream.Concurrent.Channel.Interleave
-- Copyright   : (c) 2017 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC

module Streamly.Internal.Data.Stream.Concurrent.Channel.Interleave
    (
      newChannel
    )
where

#include "inline.hs"

import Control.Concurrent (myThreadId)
import Control.Concurrent.MVar (newEmptyMVar)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.Concurrent.Queue.MichaelScott (LinkedQueue, newQ, nullQ, tryPopR, pushL)
import Data.IORef (newIORef, readIORef)
import Streamly.Internal.Control.Concurrent
    (MonadRunInIO, MonadAsync, RunInIO(..), askRunInIO, restoreM)
import Streamly.Internal.Data.Stream.Channel.Dispatcher (delThread)

import qualified Data.Set as Set
import qualified Streamly.Internal.Data.Stream.StreamK.Type as K

import Streamly.Internal.Data.Stream.Concurrent.Channel.Consumer
import Streamly.Internal.Data.Stream.Concurrent.Channel.Type
import Streamly.Internal.Data.Stream.Channel.Types

------------------------------------------------------------------------------
-- Creating a channel
------------------------------------------------------------------------------

-- | Result of evaluating one step of a stream inside a worker loop.
--
-- * 'Continue': the worker loop goes back to the work queue for the next item.
-- * 'Suspend': the worker loop calls @stop@ and returns.
data WorkerStatus = Continue | Suspend

-- XXX This is not strictly round-robin, as the streams that are faster may
-- yield more elements than the ones that are slower. Also, when streams
-- suspend because the buffer is full, they get added to the queue in a random
-- order.

-- | Add a work item (a stream together with the 'RunInIO' runner to evaluate
-- it with) to the channel's work queue and then ring the channel's doorbell.
--
-- Items are added with 'pushL'; the worker loops in this module remove them
-- with 'tryPopR', i.e. from the opposite end of the queue.
--
-- NOTE(review): 'ringDoorBell' is given the channel's 'doorBellOnWorkQ' flag
-- and its 'outputDoorBell' MVar; presumably it wakes up a consumer that is
-- waiting for work to arrive. Confirm against its definition.
{-# INLINE enqueueFIFO #-}
enqueueFIFO ::
       Channel m a
    -> LinkedQueue (RunInIO m, K.StreamK m a)
    -> (RunInIO m, K.StreamK m a)
    -> IO ()
enqueueFIFO sv q m = do
    pushL q m
    ringDoorBell (doorBellOnWorkQ sv) (outputDoorBell sv)

-- | Worker loop that serves the streams in the work queue one element at a
-- time. This is the loop installed by 'getFifoSVar' when the configuration
-- has no yield limit.
--
-- Each iteration pops one @(runner, stream)@ pair from the queue:
--
-- * If the queue is empty the worker calls 'stop' and returns.
-- * Otherwise the stream is evaluated by one step using the runner that was
--   captured when the stream was enqueued, and the resulting monadic state
--   is restored in the worker with 'restoreM'. The loop then repeats on
--   'Continue' and calls 'stop' on 'Suspend'.
{-# INLINE workLoopFIFO #-}
workLoopFIFO
    :: MonadRunInIO m
    => LinkedQueue (RunInIO m, K.StreamK m a)
    -> Channel m a
    -> Maybe WorkerInfo
    -> m ()
workLoopFIFO q sv winfo = run

    where

    run = do
        work <- liftIO $ tryPopR q
        case work of
            -- Nothing left in the queue: this worker is done.
            Nothing -> liftIO $ stop sv winfo
            Just (RunInIO runin, m) -> do
                -- NOTE(review): the State argument of foldStreamShared is
                -- passed as 'undefined'; this relies on it never being
                -- forced for the streams handled here. Confirm.
                -- An exhausted stream (the stop continuation) simply lets
                -- the worker carry on with the next queue entry.
                r <- liftIO
                        $ runin
                        $ K.foldStreamShared
                            undefined yieldk single (return Continue) m
                res <- restoreM r
                case res of
                    Continue -> run
                    Suspend -> liftIO $ stop sv winfo

    -- The stream had exactly one element left: send it to the channel. The
    -- Bool returned by 'yield' decides whether the worker keeps going.
    single a = do
        res <- liftIO $ yield sv winfo a
        return $ if res then Continue else Suspend

    -- XXX in general we would like to yield "n" elements from a single stream
    -- before moving on to the next. Single element granularity could be too
    -- expensive in certain cases. Similarly, we can use time limit for
    -- yielding.
    --
    -- The stream produced an element and has a remainder: send the element to
    -- the channel and put the remainder back on the queue, paired with the
    -- current runner, so that the other queued streams are served before
    -- this one is resumed.
    yieldk a r = do
        res <- liftIO $ yield sv winfo a
        runInIO <- askRunInIO
        liftIO $ enqueueFIFO sv q (runInIO, r)
        return $ if res then Continue else Suspend

-- | Variant of the FIFO worker loop that also maintains the channel's
-- 'remainingWork' counter. This is the loop installed by 'getFifoSVar' when
-- the configuration has a yield limit.
--
-- Before evaluating a stream the worker calls 'decrementYieldLimit'; when
-- that returns 'False' the stream is put back on the queue, the counter is
-- incremented again and the worker stops. Every path that took a unit from
-- the counter without using it gives the unit back with
-- 'incrementYieldLimit'.
--
-- NOTE(review): the precise semantics of 'decrementYieldLimit' and
-- 'incrementYieldLimit' are defined elsewhere; the description above only
-- reflects how their results are used here.
{-# INLINE workLoopFIFOLimited #-}
workLoopFIFOLimited
    :: forall m a. MonadRunInIO m
    => LinkedQueue (RunInIO m, K.StreamK m a)
    -> Channel m a
    -> Maybe WorkerInfo
    -> m ()
workLoopFIFOLimited q sv winfo = run

    where

    -- Stop continuation: the stream ended without yielding, so the unit
    -- taken from the counter before evaluating it is returned.
    incrContinue =
        liftIO (incrementYieldLimit (remainingWork sv)) >> return Continue

    run = do
        work <- liftIO $ tryPopR q
        case work of
            -- Nothing left in the queue: this worker is done.
            Nothing -> liftIO $ stop sv winfo
            Just (RunInIO runin, m) -> do
                yieldLimitOk <- liftIO $ decrementYieldLimit (remainingWork sv)
                if yieldLimitOk
                then do
                    -- NOTE(review): the State argument of foldStreamShared
                    -- is passed as 'undefined'; this relies on it never
                    -- being forced for the streams handled here. Confirm.
                    r <- liftIO
                            $ runin
                            $ K.foldStreamShared
                                undefined yieldk single incrContinue m
                    res <- restoreM r
                    case res of
                        Continue -> run
                        Suspend -> liftIO $ stop sv winfo
                else liftIO $ do
                    -- The limit check failed: put the untouched stream back
                    -- with its original runner, undo the decrement and stop.
                    enqueueFIFO sv q (RunInIO runin, m)
                    incrementYieldLimit (remainingWork sv)
                    stop sv winfo

    -- The stream had exactly one element left: send it to the channel. The
    -- Bool returned by 'yield' decides whether the worker keeps going.
    single a = do
        res <- liftIO $ yield sv winfo a
        return $ if res then Continue else Suspend

    -- The stream produced an element and has a remainder: send the element,
    -- re-enqueue the remainder with the current runner, then take another
    -- unit from the counter. The worker continues only if both 'yield' and
    -- the limit check succeeded; otherwise that last decrement is undone
    -- and the worker suspends.
    yieldk a r = do
        res <- liftIO $ yield sv winfo a
        runInIO <- askRunInIO
        liftIO $ enqueueFIFO sv q (runInIO, r)
        yieldLimitOk <- liftIO $ decrementYieldLimit (remainingWork sv)
        if res && yieldLimitOk
        then return Continue
        else liftIO $ do
            incrementYieldLimit (remainingWork sv)
            return Suspend

-------------------------------------------------------------------------------
-- SVar creation
-------------------------------------------------------------------------------

-- XXX we have this function in this file because passing runStreamLIFO as a
-- function argument to this function results in a perf degradation of more
-- than 10%.  Need to investigate what the root cause is.
-- Interestingly, the same thing does not make any difference for Ahead.
--
-- | Allocate the mutable state of a channel (output queue, doorbell MVar,
-- worker bookkeeping, work queue, optional yield-limit counter, rate info and
-- stats) and assemble the 'Channel' record from it.
--
-- The record is self-referential: several fields are closures over the
-- channel itself, so @sv@ is defined in terms of @getSVar sv ...@.
--
-- Two configuration settings select the behaviour:
--
-- * 'getStreamRate': without a rate the bounded output-queue reader and
--   post-processor are used, with a rate the paced ones.
-- * 'getYieldLimit': without a limit 'workLoopFIFO' is used and work is
--   finished when the queue is empty; with a limit 'workLoopFIFOLimited' is
--   used and work is also finished once the remaining-yield counter is @<= 0@.
getFifoSVar :: forall m a. MonadRunInIO m =>
    RunInIO m -> Config -> IO (Channel m a)
getFifoSVar mrun cfg = do
    outQ    <- newIORef ([], 0)
    outQMv  <- newEmptyMVar
    active  <- newIORef 0
    wfw     <- newIORef False
    running <- newIORef Set.empty
    q       <- newQ
    -- The yield-limit counter exists only when a limit is configured.
    yl      <- case getYieldLimit cfg of
                Nothing -> return Nothing
                Just x -> Just <$> newIORef x
    rateInfo <- newRateInfo cfg

    stats <- newSVarStats
    tid <- myThreadId

    -- Without a yield limit, work is finished exactly when the queue is empty.
    let isWorkFinished _ = nullQ q
    -- With a yield limit, work is finished when the queue is empty or the
    -- remaining-yield counter has dropped to zero or below.
    let isWorkFinishedLimited sv = do
            yieldsDone <-
                    case remainingWork sv of
                        Just ref -> do
                            n <- readIORef ref
                            return (n <= 0)
                        Nothing -> return False
            qEmpty <- nullQ q
            return $ qEmpty || yieldsDone

    let getSVar :: Channel m a
            -> (Channel m a -> m [ChildEvent a])
            -> (Channel m a -> m Bool)
            -> (Channel m a -> IO Bool)
            -> (LinkedQueue (RunInIO m, K.StreamK m a)
                -> Channel m a
                -> Maybe WorkerInfo
                -> m())
            -> Channel m a
        getSVar sv readOutput postProc workDone wloop = Channel
            { outputQueue      = outQ
            , remainingWork    = yl
            , maxBufferLimit   = getMaxBuffer cfg
            -- Never allow more workers than the buffer limit.
            , maxWorkerLimit   = min (getMaxThreads cfg) (getMaxBuffer cfg)
            , yieldRateInfo    = rateInfo
            , outputDoorBell   = outQMv
            , readOutputQ      = readOutput sv
            , postProcess      = postProc sv
            , workerThreads    = running
            , workLoop         = wloop q sv
            -- The Bool argument of 'enqueue' is ignored by this channel.
            , enqueue          = \_ -> enqueueFIFO sv q
            , eagerDispatch    = return ()
            -- The same predicate serves both the work and the queue check.
            , isWorkDone       = workDone sv
            , isQueueDone      = workDone sv
            , doorBellOnWorkQ  = wfw
            , svarMrun         = mrun
            , workerCount      = active
            , accountThread    = delThread running
            -- NOTE(review): left as 'undefined'; presumably this field is
            -- never read for this kind of channel. Confirm.
            , workerStopMVar   = undefined
            , svarRef          = Nothing
            , svarInspectMode  = getInspectMode cfg
            , svarCreator      = tid
            , svarStats        = stats
            }

    let sv =
            case getStreamRate cfg of
                Nothing ->
                    case getYieldLimit cfg of
                        Nothing -> getSVar sv (readOutputQBounded False)
                                              postProcessBounded
                                              isWorkFinished
                                              workLoopFIFO
                        Just _  -> getSVar sv (readOutputQBounded False)
                                              postProcessBounded
                                              isWorkFinishedLimited
                                              workLoopFIFOLimited
                Just _  ->
                    case getYieldLimit cfg of
                        Nothing -> getSVar sv readOutputQPaced
                                              postProcessPaced
                                              isWorkFinished
                                              workLoopFIFO
                        Just _  -> getSVar sv readOutputQPaced
                                              postProcessPaced
                                              isWorkFinishedLimited
                                              workLoopFIFOLimited
     in return sv

-- XXX GHC: If instead of MonadAsync we use (MonadIO m, MonadBaseControl IO m)
-- constraint we get a 2x perf regression. Need to look into that.
--
-- | Create a new async style concurrent stream evaluation channel. The monad
-- state used to run the stream actions is taken from the call site of
-- newChannel.
--
-- The argument is a configuration modifier; it is applied to 'defaultConfig'
-- and the result is the configuration the channel is built with.
{-# INLINABLE newChannel #-}
{-# SPECIALIZE newChannel :: (Config -> Config) -> IO (Channel IO a) #-}
newChannel :: MonadAsync m =>
    (Config -> Config) -> m (Channel m a)
newChannel modifier = do
    -- Capture the runner of the calling monad so that the channel can
    -- evaluate stream actions from IO.
    mrun <- askRunInIO
    liftIO $ getFifoSVar mrun (modifier defaultConfig)