module Streamly.Internal.Data.Fold.Concurrent.Channel.Type
( Channel (..)
, newChannel
, Config
, sendToWorker
, checkFoldStatus
, dumpSVar
)
where
#include "inline.hs"
import Control.Concurrent (ThreadId, myThreadId, tryPutMVar)
import Control.Concurrent.MVar (MVar, newEmptyMVar, takeMVar)
import Control.Exception (SomeException(..))
import Control.Monad (void)
import Control.Monad.Catch (throwM)
import Control.Monad.IO.Class (MonadIO(..))
import Data.IORef (IORef, newIORef, readIORef)
import Data.List (intersperse)
import Streamly.Internal.Control.Concurrent
(MonadAsync, MonadRunInIO, askRunInIO)
import Streamly.Internal.Control.ForkLifted (doFork)
import Streamly.Internal.Data.Fold (Fold(..))
import Streamly.Internal.Data.Stream.Channel.Dispatcher (dumpSVarStats)
import Streamly.Internal.Data.Stream.Channel.Worker (sendWithDoorBell)
import qualified Streamly.Internal.Data.Fold as Fold
import qualified Streamly.Internal.Data.Stream.StreamD.Type as D
import Streamly.Internal.Data.Stream.Channel.Types
-- | State shared between a driver thread, which feeds inputs of type @a@, and
-- a fold running in a forked worker thread, which reports results of type @b@
-- (or an exception) back to the driver.
data Channel m a b = Channel
    {
    -- FORWARD FLOW: events travelling from the driver to the fold worker.

    -- Queue of input events together with a counter that this module treats
    -- as the number of queued events (it starts at 0, the reader blocks when
    -- it is <= 0, and it is compared against 'maxBufferLimit').
    -- 'sendToWorker' pushes to it; the worker drains it via 'readOutputQ'.
      outputQueue :: IORef ([ChildEvent a], Int)

    -- Upper bound on 'outputQueue' consulted before the driver queues more
    -- input. Initialised from the 'Config' when the channel is created.
    , maxBufferLimit :: Limit

    -- The worker waits on this MVar when it finds 'outputQueue' empty; it is
    -- handed to 'sendWithDoorBell' whenever the driver queues an input.
    , outputDoorBell :: MVar ()

    -- Action the worker runs to fetch the next batch of input events.
    , readOutputQ :: m [ChildEvent a]

    -- BACKWARD FLOW: events travelling from the fold worker to the driver.

    -- Queue (with counter) of results/exceptions reported by the worker.
    -- Written by the worker via 'sendWithDoorBell', read by the driver.
    , outputQueueFromConsumer :: IORef ([ChildEvent b], Int)

    -- Doorbell paired with 'outputQueueFromConsumer' when the worker sends
    -- an event. Nothing in this module waits on it.
    , outputDoorBellFromConsumer :: MVar ()

    -- The driver waits on this MVar when the input buffer is full; the worker
    -- fills it after each read of 'outputQueue'.
    , bufferSpaceDoorBell :: MVar ()

    -- Always 'Nothing' for channels created by this module.
    -- NOTE(review): presumably reserved for tracking garbage collection of
    -- the channel - confirm.
    , svarRef :: Maybe (IORef ())

    -- Statistics record, passed to the queue reader only in inspect mode and
    -- printed by 'dumpSVar'.
    , svarStats :: SVarStats

    -- Whether diagnostics are enabled; initialised from the 'Config'.
    , svarInspectMode :: Bool

    -- Thread that created the channel; reported by 'dumpSVar'.
    , svarCreator :: ThreadId
    }
{-# NOINLINE dumpSVar #-}
-- | Build a human readable diagnostic report for a channel.
--
-- The report consists of the creator thread, the state of the input queue
-- and its doorbell, and the collected statistics. The sections are separated
-- by newlines and concatenated into a single 'String'.
dumpSVar :: Channel m a b -> IO String
dumpSVar sv = concat <$> sequence (intersperse (return "\n") sections)

    where

    -- Each section is an IO action so that the mutable parts of the channel
    -- are only read when the report is actually produced.
    sections =
        [ return (dumpCreator (svarCreator sv))
        , return "---------CURRENT STATE-----------"
        , dumpOutputQ (outputQueue sv)
        , dumpDoorBell (outputDoorBell sv)
        , return "---------STATS-----------\n"
        -- This channel has no yield rate information, hence 'Nothing'.
        , dumpSVarStats (svarInspectMode sv) Nothing (svarStats sv)
        ]
-- | Send an event from the fold worker to the driver.
--
-- The event is handed to 'sendWithDoorBell' together with the
-- worker-to-driver queue ('outputQueueFromConsumer') and its doorbell
-- ('outputDoorBellFromConsumer'). The 'Int' result is whatever
-- 'sendWithDoorBell' returns.
sendToDriver :: Channel m a b -> ChildEvent b -> IO Int
sendToDriver sv msg =
    sendWithDoorBell
        (outputQueueFromConsumer sv)
        (outputDoorBellFromConsumer sv)
        msg
-- | Report a fold result to the driver by sending it as a 'ChildYield'
-- event. The value returned by 'sendToDriver' is discarded.
sendYieldToDriver :: MonadIO m => Channel m a b -> b -> m ()
sendYieldToDriver sv res = liftIO $ void $ sendToDriver sv (ChildYield res)
{-# NOINLINE sendExceptionToDriver #-}
-- | Report an exception to the driver.
--
-- The exception is wrapped in a 'ChildStop' event tagged with the 'ThreadId'
-- of the calling thread. The value returned by 'sendToDriver' is discarded.
sendExceptionToDriver :: Channel m a b -> SomeException -> IO ()
sendExceptionToDriver sv e = do
    tid <- myThreadId
    void $ sendToDriver sv (ChildStop tid (Just e))
-- | State of the stream produced by 'fromProducerD'.
data FromSVarState m a b =
      -- | No buffered events: the next step reads a batch from the channel.
      FromSVarRead (Channel m a b)
      -- | A batch has been read: the listed events are still to be processed.
    | FromSVarLoop (Channel m a b) [ChildEvent a]
{-# INLINE_NORMAL fromProducerD #-}
-- | Turn the input side of a channel into a stream.
--
-- The stream alternates between reading a batch of events with
-- 'readOutputQ' and emitting the values carried by the 'ChildYield' events
-- of that batch. It stops as soon as a 'ChildStopChannel' event is seen.
-- Any other event is a programming error and aborts with 'error'.
fromProducerD :: MonadIO m => Channel m a b -> D.Stream m a
fromProducerD svar = D.Stream step (FromSVarRead svar)

    where

    {-# INLINE_LATE step #-}
    step _ (FromSVarRead sv) = do
        list <- readOutputQ sv
        -- The batch is reversed before being consumed.
        -- NOTE(review): presumably the queue stores the newest event first,
        -- so reversing restores the order in which inputs were sent -
        -- confirm against 'sendWithDoorBell'.
        return $ D.Skip $ FromSVarLoop sv (Prelude.reverse list)

    -- Batch exhausted: go back to reading.
    step _ (FromSVarLoop sv []) = return $ D.Skip $ FromSVarRead sv

    step _ (FromSVarLoop sv (ev : es)) =
        case ev of
            ChildYield a -> return $ D.Yield a (FromSVarLoop sv es)
            ChildStopChannel -> return D.Stop
            -- 'sendToWorker' only ever queues 'ChildYield' events, so no
            -- other event is expected on the input queue.
            _ -> error
                    "fromProducerD: unexpected event on the fold input queue"
{-# INLINE readOutputQChan #-}
-- | Read the pending input events of a channel, blocking if there are none.
--
-- The queue is read once with 'readOutputQRaw'. If the returned count is not
-- positive, the caller waits on 'outputDoorBell' (wrapped in 'withDiagMVar'
-- so that diagnostics can be produced in inspect mode) and then reads the
-- queue a second time, returning that second result as is.
readOutputQChan :: Channel m a b -> IO ([ChildEvent a], Int)
readOutputQChan chan = do
    r@(_, n) <- readOutputQRaw (outputQueue chan) ss
    if n <= 0
        then do
            withDiagMVar
                (svarInspectMode chan)
                (dumpSVar chan)
                "readOutputQChan: nothing to do"
                (takeMVar (outputDoorBell chan))
            readOutputQRaw (outputQueue chan) ss
        else return r

    where

    -- Statistics are only passed to the reader when inspect mode is on.
    ss = if svarInspectMode chan then Just (svarStats chan) else Nothing
{-# INLINE readOutputQDB #-}
-- | Like 'readOutputQChan', but rings 'bufferSpaceDoorBell' after reading.
--
-- A driver blocked in 'sendToWorker' waits on that doorbell when the input
-- buffer is full, so ringing it after a read lets the driver retry.
readOutputQDB :: Channel m a b -> IO ([ChildEvent a], Int)
readOutputQDB chan = do
    r <- readOutputQChan chan
    -- 'tryPutMVar' never blocks; its result is irrelevant because a doorbell
    -- that is already full needs no further ringing.
    _ <- tryPutMVar (bufferSpaceDoorBell chan) ()
    return r
-- | Allocate the mutable state of a new channel.
--
-- Both queues start empty with a zero count and all doorbells start empty.
-- The buffer limit and the inspect mode are taken from the supplied 'Config'.
-- The calling thread is recorded as the creator. No worker thread is
-- started here.
mkNewChannel :: forall m a b. MonadIO m => Config -> IO (Channel m a b)
mkNewChannel cfg = do
    outQ <- newIORef ([], 0)
    outQMv <- newEmptyMVar
    outQRev <- newIORef ([], 0)
    outQMvRev <- newEmptyMVar
    bufferMv <- newEmptyMVar

    stats <- newSVarStats
    tid <- myThreadId

    -- The record is built as a function of itself because 'readOutputQ'
    -- needs to refer to the finished channel.
    let getSVar :: Channel m a b -> Channel m a b
        getSVar sv = Channel
            { outputQueue = outQ
            , outputDoorBell = outQMv
            , outputQueueFromConsumer = outQRev
            , outputDoorBellFromConsumer = outQMvRev
            , bufferSpaceDoorBell = bufferMv
            , maxBufferLimit = getMaxBuffer cfg
            , readOutputQ = liftIO $ fmap fst (readOutputQDB sv)
            , svarRef = Nothing
            , svarInspectMode = getInspectMode cfg
            , svarCreator = tid
            , svarStats = stats
            }

    -- Tie the knot: the lazily evaluated 'sv' is passed to its own builder.
    let sv = getSVar sv in return sv
{-# INLINABLE newChannel #-}
{-# SPECIALIZE newChannel ::
    (Config -> Config) -> Fold IO a b -> IO (Channel IO a b) #-}
-- | Create a channel and fork a worker thread that runs the given fold on
-- the inputs sent through the channel.
--
-- The configuration is obtained by applying the modifier to 'defaultConfig'.
-- When the fold finishes, its result is sent back to the driver; an
-- exception in the worker is reported through 'sendExceptionToDriver'.
newChannel :: (MonadRunInIO m) =>
    (Config -> Config) -> Fold m a b -> m (Channel m a b)
newChannel modifier f = do
    sv <- liftIO $ mkNewChannel (modifier defaultConfig)
    mrun <- askRunInIO
    void $ doFork (work sv) mrun (sendExceptionToDriver sv)
    return sv

    where

    -- Worker body: drive the fold with the channel's input stream and send
    -- the final result to the driver instead of returning it.
    {-# NOINLINE work #-}
    work chan =
        let f1 = Fold.rmapM (sendYieldToDriver chan) f
         in D.fold f1 $ fromProducerD chan
{-# NOINLINE checkFoldStatus #-}
-- | Poll the events the fold worker has sent to the driver.
--
-- Returns 'Nothing' when no event is pending and @Just b@ when the fold has
-- reported its result. If the worker reported an exception, it is rethrown
-- in the calling thread with 'throwM'.
--
-- Only the first event (after reversing the list that was read) is examined;
-- any later ones are ignored.
checkFoldStatus :: MonadAsync m => Channel m a b -> m (Maybe b)
checkFoldStatus sv = do
    (list, _) <- liftIO $ readOutputQBasic (outputQueueFromConsumer sv)
    -- NOTE(review): presumably the queue stores the newest event first, so
    -- reversing puts the earliest event at the head - confirm.
    processEvents $ reverse list

    where

    {-# INLINE processEvents #-}
    processEvents [] = return Nothing
    processEvents (ev : _) =
        case ev of
            -- The worker only sends 'ChildStop' with an exception attached
            -- (see 'sendExceptionToDriver').
            ChildStop _ e ->
                maybe
                    (error "checkFoldStatus: ChildStop without an exception")
                    throwM
                    e
            -- The worker never sends this event to the driver.
            ChildStopChannel ->
                error "checkFoldStatus: unexpected ChildStopChannel event"
            ChildYield b -> return (Just b)
{-# INLINE isBufferAvailable #-}
-- | Check whether the driver may queue another input event.
--
-- Always 'True' for an 'Unlimited' buffer. For a 'Limited' buffer the
-- current count of 'outputQueue' must be strictly below the limit.
isBufferAvailable :: MonadIO m => Channel m a b -> m Bool
isBufferAvailable sv =
    case maxBufferLimit sv of
        Unlimited -> return True
        Limited lim -> do
            (_, n) <- liftIO $ readIORef (outputQueue sv)
            -- The 'Word' limit is converted to 'Int' for the comparison.
            return $ fromIntegral lim > n
{-# INLINE sendToWorker #-}
-- | Push one input to the fold worker.
--
-- Before sending, any event reported by the worker is checked: if the fold
-- has already produced its result, that result is returned as @Just b@ and
-- the input is not queued; if the worker reported an exception, it is
-- rethrown (see 'checkFoldStatus').
--
-- Otherwise the input is queued and 'Nothing' is returned. When the input
-- buffer is full, the call blocks on 'bufferSpaceDoorBell' and then starts
-- over, including the status check.
sendToWorker :: MonadAsync m => Channel m a b -> a -> m (Maybe b)
sendToWorker chan a = go

    where

    go = do
        let qref = outputQueueFromConsumer chan
        status <- do
            -- Peek at the count first so that the queue is only processed
            -- when the worker has actually sent something.
            (_, n) <- liftIO $ readIORef qref
            if n > 0
                then checkFoldStatus chan
                else return Nothing
        case status of
            Just _ -> return status
            Nothing -> do
                r <- isBufferAvailable chan
                if r
                    then do
                        liftIO
                            $ void
                            $ sendWithDoorBell
                                (outputQueue chan)
                                (outputDoorBell chan)
                                (ChildYield a)
                        return Nothing
                    else do
                        -- Wait until the worker has read from the queue,
                        -- then retry from the top.
                        () <- liftIO $ takeMVar (bufferSpaceDoorBell chan)
                        go