module Streamly.Internal.Data.Stream.Concurrent.Channel.Type
(
Channel(..)
, yield
, stop
, stopChannel
, dumpSVar
)
where
import Control.Concurrent (ThreadId)
import Control.Concurrent.MVar (MVar)
import Control.Monad (void)
import Control.Monad.IO.Class (MonadIO(..))
import Data.IORef (IORef)
import Data.List (intersperse)
import Data.Set (Set)
import Streamly.Internal.Control.Concurrent (RunInIO)
import Streamly.Internal.Data.Atomics (atomicModifyIORefCAS_)
import Streamly.Internal.Data.Stream.Channel.Dispatcher (dumpSVarStats)
import Streamly.Internal.Data.Stream.Channel.Worker
(sendYield, sendStop, sendWithDoorBell)
import Streamly.Internal.Data.Stream.StreamK.Type (StreamK)
import Streamly.Internal.Data.Stream.Channel.Types
data Channel m a = Channel
{
forall (m :: * -> *) a. Channel m a -> RunInIO m
svarMrun :: RunInIO m
, forall (m :: * -> *) a. Channel m a -> IORef ([ChildEvent a], Int)
outputQueue :: IORef ([ChildEvent a], Int)
, forall (m :: * -> *) a. Channel m a -> MVar ()
outputDoorBell :: MVar ()
, forall (m :: * -> *) a. Channel m a -> m [ChildEvent a]
readOutputQ :: m [ChildEvent a]
, forall (m :: * -> *) a. Channel m a -> m Bool
postProcess :: m Bool
, forall (m :: * -> *) a. Channel m a -> Limit
maxWorkerLimit :: Limit
, forall (m :: * -> *) a. Channel m a -> Limit
maxBufferLimit :: Limit
, forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork :: Maybe (IORef Count)
, forall (m :: * -> *) a. Channel m a -> Maybe YieldRateInfo
yieldRateInfo :: Maybe YieldRateInfo
, forall (m :: * -> *) a.
Channel m a -> Bool -> (RunInIO m, StreamK m a) -> IO ()
enqueue :: Bool -> (RunInIO m, StreamK m a) -> IO ()
, forall (m :: * -> *) a. Channel m a -> m ()
eagerDispatch :: m ()
, forall (m :: * -> *) a. Channel m a -> IO Bool
isWorkDone :: IO Bool
, forall (m :: * -> *) a. Channel m a -> IO Bool
isQueueDone :: IO Bool
, forall (m :: * -> *) a. Channel m a -> IORef Bool
doorBellOnWorkQ :: IORef Bool
, forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> m ()
workLoop :: Maybe WorkerInfo -> m ()
, forall (m :: * -> *) a. Channel m a -> IORef (Set ThreadId)
workerThreads :: IORef (Set ThreadId)
, forall (m :: * -> *) a. Channel m a -> IORef Int
workerCount :: IORef Int
, forall (m :: * -> *) a. Channel m a -> ThreadId -> m ()
accountThread :: ThreadId -> m ()
, forall (m :: * -> *) a. Channel m a -> MVar ()
workerStopMVar :: MVar ()
, forall (m :: * -> *) a. Channel m a -> Maybe (IORef ())
svarRef :: Maybe (IORef ())
, forall (m :: * -> *) a. Channel m a -> SVarStats
svarStats :: SVarStats
, forall (m :: * -> *) a. Channel m a -> Bool
svarInspectMode :: Bool
, forall (m :: * -> *) a. Channel m a -> ThreadId
svarCreator :: ThreadId
}
{-# INLINE yield #-}
yield :: Channel m a -> Maybe WorkerInfo -> a -> IO Bool
yield :: forall (m :: * -> *) a.
Channel m a -> Maybe WorkerInfo -> a -> IO Bool
yield Channel m a
sv Maybe WorkerInfo
winfo a
x =
forall a.
Limit
-> Limit
-> IORef Int
-> Maybe WorkerInfo
-> Maybe YieldRateInfo
-> IORef ([ChildEvent a], Int)
-> MVar ()
-> ChildEvent a
-> IO Bool
sendYield
(forall (m :: * -> *) a. Channel m a -> Limit
maxBufferLimit Channel m a
sv)
(forall (m :: * -> *) a. Channel m a -> Limit
maxWorkerLimit Channel m a
sv)
(forall (m :: * -> *) a. Channel m a -> IORef Int
workerCount Channel m a
sv)
Maybe WorkerInfo
winfo
(forall (m :: * -> *) a. Channel m a -> Maybe YieldRateInfo
yieldRateInfo Channel m a
sv)
(forall (m :: * -> *) a. Channel m a -> IORef ([ChildEvent a], Int)
outputQueue Channel m a
sv)
(forall (m :: * -> *) a. Channel m a -> MVar ()
outputDoorBell Channel m a
sv)
(forall a. a -> ChildEvent a
ChildYield a
x)
{-# INLINE stop #-}
stop :: Channel m a -> Maybe WorkerInfo -> IO ()
stop :: forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo =
forall a.
IORef Int
-> Maybe WorkerInfo
-> Maybe YieldRateInfo
-> IORef ([ChildEvent a], Int)
-> MVar ()
-> IO ()
sendStop
(forall (m :: * -> *) a. Channel m a -> IORef Int
workerCount Channel m a
sv)
Maybe WorkerInfo
winfo
(forall (m :: * -> *) a. Channel m a -> Maybe YieldRateInfo
yieldRateInfo Channel m a
sv)
(forall (m :: * -> *) a. Channel m a -> IORef ([ChildEvent a], Int)
outputQueue Channel m a
sv)
(forall (m :: * -> *) a. Channel m a -> MVar ()
outputDoorBell Channel m a
sv)
{-# INLINABLE stopChannel #-}
stopChannel :: MonadIO m => Channel m a -> m ()
stopChannel :: forall (m :: * -> *) a. MonadIO m => Channel m a -> m ()
stopChannel Channel m a
chan = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
forall t. IORef t -> (t -> t) -> IO ()
atomicModifyIORefCAS_ (forall (m :: * -> *) a. Channel m a -> IORef Int
workerCount Channel m a
chan) forall a b. (a -> b) -> a -> b
$ \Int
n -> Int
n forall a. Num a => a -> a -> a
- Int
1
forall (f :: * -> *) a. Functor f => f a -> f ()
void
forall a b. (a -> b) -> a -> b
$ forall a.
IORef ([ChildEvent a], Int) -> MVar () -> ChildEvent a -> IO Int
sendWithDoorBell
(forall (m :: * -> *) a. Channel m a -> IORef ([ChildEvent a], Int)
outputQueue Channel m a
chan)
(forall (m :: * -> *) a. Channel m a -> MVar ()
outputDoorBell Channel m a
chan)
forall a. ChildEvent a
ChildStopChannel
{-# NOINLINE dumpSVar #-}
dumpSVar :: Channel m a -> IO String
dumpSVar :: forall (m :: * -> *) a. Channel m a -> IO String
dumpSVar Channel m a
sv = do
[String]
xs <- forall (t :: * -> *) (m :: * -> *) a.
(Traversable t, Monad m) =>
t (m a) -> m (t a)
sequence forall a b. (a -> b) -> a -> b
$ forall a. a -> [a] -> [a]
intersperse (forall (m :: * -> *) a. Monad m => a -> m a
return String
"\n")
[ forall (m :: * -> *) a. Monad m => a -> m a
return (forall a. Show a => a -> String
dumpCreator (forall (m :: * -> *) a. Channel m a -> ThreadId
svarCreator Channel m a
sv))
, forall (m :: * -> *) a. Monad m => a -> m a
return String
"---------CURRENT STATE-----------"
, forall (t :: * -> *) a1 a2.
(Foldable t, Show a1) =>
IORef (t a2, a1) -> IO String
dumpOutputQ (forall (m :: * -> *) a. Channel m a -> IORef ([ChildEvent a], Int)
outputQueue Channel m a
sv)
, forall a. Show a => MVar a -> IO String
dumpDoorBell (forall (m :: * -> *) a. Channel m a -> MVar ()
outputDoorBell Channel m a
sv)
, forall a. Show a => IORef a -> IO String
dumpNeedDoorBell (forall (m :: * -> *) a. Channel m a -> IORef Bool
doorBellOnWorkQ Channel m a
sv)
, forall a. Show a => IORef a -> IO String
dumpRunningThreads (forall (m :: * -> *) a. Channel m a -> IORef (Set ThreadId)
workerThreads Channel m a
sv)
, forall a. Show a => IORef a -> IO String
dumpWorkerCount (forall (m :: * -> *) a. Channel m a -> IORef Int
workerCount Channel m a
sv)
, forall (m :: * -> *) a. Monad m => a -> m a
return String
"---------STATS-----------\n"
, Bool -> Maybe YieldRateInfo -> SVarStats -> IO String
dumpSVarStats (forall (m :: * -> *) a. Channel m a -> Bool
svarInspectMode Channel m a
sv) (forall (m :: * -> *) a. Channel m a -> Maybe YieldRateInfo
yieldRateInfo Channel m a
sv) (forall (m :: * -> *) a. Channel m a -> SVarStats
svarStats Channel m a
sv)
]
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall (t :: * -> *) a. Foldable t => t [a] -> [a]
concat [String]
xs