-- |
-- Module      : Streamly.Internal.Data.Fold.Concurrent.Channel.Type
-- Copyright   : (c) 2017, 2022 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC

module Streamly.Internal.Data.Fold.Concurrent.Channel.Type
    ( Channel (..)
    , newChannel
    , Config
    , sendToWorker
    , checkFoldStatus
    , dumpSVar
    )
where

#include "inline.hs"

import Control.Concurrent (ThreadId, myThreadId, tryPutMVar)
import Control.Concurrent.MVar (MVar, newEmptyMVar, takeMVar)
import Control.Exception (SomeException(..))
import Control.Monad (void)
import Control.Monad.Catch (throwM)
import Control.Monad.IO.Class (MonadIO(..))
import Data.IORef (IORef, newIORef, readIORef)
import Data.List (intersperse)
import Streamly.Internal.Control.Concurrent
    (MonadAsync, MonadRunInIO, askRunInIO)
import Streamly.Internal.Control.ForkLifted (doFork)
import Streamly.Internal.Data.Fold (Fold(..))
import Streamly.Internal.Data.Stream.Channel.Dispatcher (dumpSVarStats)
import Streamly.Internal.Data.Stream.Channel.Worker (sendWithDoorBell)

import qualified Streamly.Internal.Data.Fold as Fold
import qualified Streamly.Internal.Data.Stream.StreamD.Type as D

import Streamly.Internal.Data.Stream.Channel.Types

-- | A channel connecting a fold driver (which pushes input values of type
-- @a@) to a fold worker (which consumes them and reports results of type @b@
-- or exceptions back to the driver).
--
-- The fields are deliberately lazy: 'readOutputQ' is built from the finished
-- channel value itself when the channel is created.
data Channel m a b = Channel
    {
    -- FORWARD FLOW: Flow of data from the driver to the consuming fold

    -- XXX This is inputQueue instead.

    -- Shared output queue (events, length)
    --
    -- [LOCKING] Frequent locked access. This is updated by the driver on each
    -- yield and once in a while read by the consumer fold thread.
    --
    -- XXX Use a different type than ChildEvent. We can do with a simpler type
    -- in folds.
      outputQueue :: IORef ([ChildEvent a], Int)

    -- This is capped to maxBufferLimit if set to more than that. Otherwise
    -- potentially each worker may yield one value to the buffer in the worst
    -- case exceeding the requested buffer size.
    , maxBufferLimit :: Limit

    -- [LOCKING] Infrequent MVar. Used when the outputQ transitions from empty
    -- to non-empty.
    , outputDoorBell :: MVar ()  -- signal the consumer about output

    -- Action used by the fold worker to fetch the pending input events.
    , readOutputQ :: m [ChildEvent a]

    -- BACKWARD FLOW: receive async events from the fold consumer to the
    -- driver.
    , outputQueueFromConsumer :: IORef ([ChildEvent b], Int)
    , outputDoorBellFromConsumer :: MVar ()

    -- The driver waits on this when the buffer is full; it is rung whenever
    -- the worker reads the output queue.
    , bufferSpaceDoorBell :: MVar ()

    -- cleanup: to track garbage collection of SVar --
    , svarRef :: Maybe (IORef ())

    -- Stats --
    , svarStats :: SVarStats

    -- Diagnostics --
    , svarInspectMode :: Bool
    , svarCreator :: ThreadId
    }

-- | Render a diagnostic snapshot of a channel as a single string: the
-- creator thread, the current output queue and output doorbell, and the
-- collected statistics. Sections are separated by newlines.
{-# NOINLINE dumpSVar #-}
dumpSVar :: Channel m a b -> IO String
dumpSVar sv =
    concat <$> sequence (intersperse (return "\n") sections)

    where

    sections =
        [ return (dumpCreator (svarCreator sv))
        , return "---------CURRENT STATE-----------"
        , dumpOutputQ (outputQueue sv)
        -- XXX print the types of events in the outputQueue, first 5
        , dumpDoorBell (outputDoorBell sv)
        , return "---------STATS-----------\n"
        -- No yield-rate info is tracked for fold channels, hence Nothing.
        , dumpSVarStats (svarInspectMode sv) Nothing (svarStats sv)
        ]

-------------------------------------------------------------------------------
-- Support for running folds concurrently
-------------------------------------------------------------------------------

-- $concurrentFolds
--
-- To run folds concurrently, we need to decouple the fold execution from the
-- stream production. We use a Channel to do that: a single fold driver pushes
-- the stream elements to the Channel, and on the consumer side a fold worker
-- thread pulls the values and folds them.
--
-- @
--
-- Fold worker <------Channel<------Fold driver
--     |  exceptions  |
--     --------------->
--
-- @
--
-- We need a channel for pushing exceptions from the fold worker to the fold
-- driver. The stream may be pushed to multiple folds at the same time. For
-- that we need one Channel per fold:
--
-- @
--
-- Fold worker <------Channel--
--                    |        |
-- Fold worker <------Channel------Driver
--                    |        |
-- Fold worker <------Channel--
--
-- @
--
-- Note: If the stream pusher terminates due to an exception, we do not
-- actively terminate the fold. It gets cleaned up by the GC.

-------------------------------------------------------------------------------
-- Process events received by a fold worker from a fold driver
-------------------------------------------------------------------------------

-- | Send an event from the fold worker to the fold driver by enqueueing it on
-- 'outputQueueFromConsumer' and ringing 'outputDoorBellFromConsumer'. The
-- 'Int' result is whatever 'sendWithDoorBell' returns.
sendToDriver :: Channel m a b -> ChildEvent b -> IO Int
sendToDriver sv msg =
    -- In case the producer stream is blocked on pushing to the fold buffer
    -- then wake it up so that it can check for the stop event or exception
    -- being sent to it otherwise we will be deadlocked.
    -- void $ tryPutMVar (pushBufferMVar sv) ()
    --
    -- NOTE(review): the wake-up above is disabled, so a driver waiting on
    -- 'bufferSpaceDoorBell' is not woken by this send - confirm that this
    -- cannot deadlock.
    sendWithDoorBell
        (outputQueueFromConsumer sv)
        (outputDoorBellFromConsumer sv)
        msg

-- | Report a fold result to the driver as a 'ChildYield' event. The count
-- returned by 'sendToDriver' is discarded.
sendYieldToDriver :: MonadIO m => Channel m a b -> b -> m ()
sendYieldToDriver sv res =
    liftIO $ void $ sendToDriver sv (ChildYield res)

-- | Report an exception to the driver as a 'ChildStop' event tagged with the
-- calling thread's id. The count returned by 'sendToDriver' is discarded.
{-# NOINLINE sendExceptionToDriver #-}
sendExceptionToDriver :: Channel m a b -> SomeException -> IO ()
sendExceptionToDriver sv e = do
    tid <- myThreadId
    void $ sendToDriver sv (ChildStop tid (Just e))

-- State of the stream built by 'fromProducerD'.
data FromSVarState m a b =
      -- Fetch the next batch of events from the channel.
      FromSVarRead (Channel m a b)
      -- Process the remaining events of the batch fetched earlier.
    | FromSVarLoop (Channel m a b) [ChildEvent a]

-- | Turn the input side of a channel into a stream. Batches of events are
-- fetched with 'readOutputQ'; every 'ChildYield' becomes a stream element and
-- 'ChildStopChannel' ends the stream. Any other event is a protocol violation
-- and raises an error.
{-# INLINE_NORMAL fromProducerD #-}
fromProducerD :: MonadIO m => Channel m a b -> D.Stream m a
fromProducerD svar = D.Stream step (FromSVarRead svar)

    where

    {-# INLINE_LATE step #-}
    step _ (FromSVarRead sv) = do
        list <- readOutputQ sv
        -- Reversing the output is important to guarantee that we process the
        -- outputs in the same order as they were generated by the constituent
        -- streams.
        return $ D.Skip $ FromSVarLoop sv (Prelude.reverse list)

    -- Current batch exhausted, go back to reading the queue.
    step _ (FromSVarLoop sv []) = return $ D.Skip $ FromSVarRead sv
    step _ (FromSVarLoop sv (ev : es)) =
        case ev of
            ChildYield a -> return $ D.Yield a (FromSVarLoop sv es)
            ChildStopChannel -> return D.Stop
            -- Only yields and channel-stop are expected from the driver.
            _ -> error "fromProducerD: unexpected event received from the driver"

-- | Read the channel's output queue. If the reported length is not positive,
-- block on 'outputDoorBell' and then read the queue once more.
--
-- Stats are passed to 'readOutputQRaw' only in inspect mode.
{-# INLINE readOutputQChan #-}
readOutputQChan :: Channel m a b -> IO ([ChildEvent a], Int)
readOutputQChan chan = do
    r@(_, n) <- readQ
    if n <= 0
    then do
        -- NOTE(review): withDiagMVar presumably adds diagnostics (using
        -- dumpSVar) around the blocking wait in inspect mode - confirm.
        withDiagMVar
            (svarInspectMode chan)
            (dumpSVar chan)
            "readOutputQChan: nothing to do"
            $ takeMVar (outputDoorBell chan)
        readQ
    else return r

    where

    ss = if svarInspectMode chan then Just (svarStats chan) else Nothing

    readQ = readOutputQRaw (outputQueue chan) ss

-- | Like 'readOutputQChan' but additionally rings 'bufferSpaceDoorBell' after
-- reading, so that a driver waiting for buffer space gets a chance to retry.
{-# INLINE readOutputQDB #-}
readOutputQDB :: Channel m a b -> IO ([ChildEvent a], Int)
readOutputQDB chan = do
    r <- readOutputQChan chan
    -- XXX We can do this only if needed, if someone sleeps because of buffer
    -- then they can set a flag and we ring the doorbell only if the flag is
    -- set. Like we do in sendWorkerWait for streams.
    void $ tryPutMVar (bufferSpaceDoorBell chan) ()
    return r

-- | Allocate a fresh channel from the given configuration: empty queues and
-- doorbells in both directions, fresh stats, and the calling thread recorded
-- as the creator. No worker thread is started here.
mkNewChannel :: forall m a b. MonadIO m => Config -> IO (Channel m a b)
mkNewChannel cfg = do
    outQ <- newIORef ([], 0)
    outQMv <- newEmptyMVar
    outQRev <- newIORef ([], 0)
    outQMvRev <- newEmptyMVar
    bufferMv <- newEmptyMVar

    stats <- newSVarStats
    tid <- myThreadId

    -- The readOutputQ field needs the finished channel, so the record is
    -- built as a function of itself and the knot is tied below.
    let getSVar :: Channel m a b -> Channel m a b
        getSVar sv = Channel
            { outputQueue      = outQ
            , outputDoorBell   = outQMv
            , outputQueueFromConsumer = outQRev
            , outputDoorBellFromConsumer = outQMvRev
            , bufferSpaceDoorBell = bufferMv
            , maxBufferLimit   = getMaxBuffer cfg
            , readOutputQ      = liftIO $ fmap fst (readOutputQDB sv)
            , svarRef          = Nothing
            , svarInspectMode  = getInspectMode cfg
            , svarCreator      = tid
            , svarStats        = stats
            }

    let sv = getSVar sv
    return sv

-- | Create a channel for the given fold and fork a worker thread that runs
-- the fold over the values pushed into the channel. The fold's result is sent
-- back to the driver as a yield event; an exception in the worker is sent
-- back via 'sendExceptionToDriver'.
--
-- The first argument modifies the default channel configuration.
{-# INLINABLE newChannel #-}
{-# SPECIALIZE newChannel ::
    (Config -> Config) -> Fold IO a b -> IO (Channel IO a b) #-}
newChannel :: (MonadRunInIO m) =>
    (Config -> Config) -> Fold m a b -> m (Channel m a b)
newChannel modifier f = do
    sv <- liftIO $ mkNewChannel (modifier defaultConfig)
    mrun <- askRunInIO
    void $ doFork (work sv) mrun (sendExceptionToDriver sv)
    return sv

    where

    {-# NOINLINE work #-}
    work chan =
        -- Post-process the fold so that its result is reported to the driver
        -- instead of being returned.
        let f1 = Fold.rmapM (sendYieldToDriver chan) f
         in D.fold f1 $ fromProducerD chan

-------------------------------------------------------------------------------
-- Process events received by the driver thread from the fold worker side
-------------------------------------------------------------------------------

-- XXX currently only one event is sent by a fold consumer to the stream
-- producer. But we can potentially have multiple events e.g. the fold step can
-- generate exception more than once and the producer can ignore those
-- exceptions or handle them and still keep driving the fold.

-- XXX In case of scan this could be a stream.

-- | Poll for events sent by the fold worker to the fold driver. The fold
-- worker can send its result or an exception. If a result has been received
-- this function returns it wrapped in 'Just'; if no event is pending it
-- returns 'Nothing'. If an exception is received then it is rethrown.
--
-- Only the oldest pending event is examined; any further events are
-- discarded.
--
{-# NOINLINE checkFoldStatus #-}
checkFoldStatus :: MonadAsync m => Channel m a b -> m (Maybe b)
checkFoldStatus sv = do
    (list, _) <- liftIO $ readOutputQBasic (outputQueueFromConsumer sv)
    -- Reversing the output is important to guarantee that we process the
    -- outputs in the same order as they were generated by the constituent
    -- streams.
    processEvents $ reverse list

    where

    {-# INLINE processEvents #-}
    processEvents [] = return Nothing
    processEvents (ev : _) =
        case ev of
            ChildYield b -> return (Just b)
            ChildStop _ (Just e) -> throwM e
            -- The worker only sends ChildStop to report an exception.
            ChildStop _ Nothing ->
                error "checkFoldStatus: ChildStop received without an exception"
            ChildStopChannel ->
                error "checkFoldStatus: unexpected ChildStopChannel from the fold worker"

-- | Check whether the driver may push another value: always 'True' for an
-- unlimited buffer, otherwise 'True' only while the current length of the
-- output queue is below the configured limit.
{-# INLINE isBufferAvailable #-}
isBufferAvailable :: MonadIO m => Channel m a b -> m Bool
isBufferAvailable sv =
    case maxBufferLimit sv of
        Unlimited -> return True
        Limited lim -> do
            (_, n) <- liftIO $ readIORef (outputQueue sv)
            return $ fromIntegral lim > n

-- | Push a value from a driver to a fold worker via a Channel. Before pushing
-- the value to the Channel it polls for events received from the fold worker.
-- If the worker has already produced a result, that result is returned as
-- 'Just' and the value is not pushed; otherwise the value is pushed and
-- 'Nothing' is returned. Propagates exceptions received from the fold worker.
--
-- If the buffer is full this blocks until the worker signals
-- 'bufferSpaceDoorBell' and then retries from the start.
--
{-# INLINE sendToWorker #-}
sendToWorker :: MonadAsync m => Channel m a b -> a -> m (Maybe b)
sendToWorker chan a = go

    where

    -- Recursive function, should we use SPEC?
    go = do
        let qref = outputQueueFromConsumer chan
        -- Read the queue length first and only call checkFoldStatus when
        -- something is pending.
        status <- do
            (_, n) <- liftIO $ readIORef qref
            if n > 0
            then checkFoldStatus chan
            else return Nothing
        case status of
            Just _ -> return status
            Nothing -> do
                r <- isBufferAvailable chan
                if r
                then do
                    liftIO
                        $ void
                        $ sendWithDoorBell
                            (outputQueue chan)
                            (outputDoorBell chan)
                            (ChildYield a)
                    return Nothing
                else do
                    -- Buffer full: wait for the worker to drain it, then
                    -- re-check the worker's status before pushing.
                    () <- liftIO $ takeMVar (bufferSpaceDoorBell chan)
                    go