module Streamly.Internal.Data.Fold.Concurrent.Channel
(
Channel
, Config
, maxBuffer
, inspect
, parEval
)
where
import Control.Concurrent (takeMVar)
import Control.Monad (void, when)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.IORef (writeIORef)
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Fold (Fold(..), Step (..))
import Streamly.Internal.Data.Stream.Channel.Worker (sendWithDoorBell)
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)
import Streamly.Internal.Data.Fold.Concurrent.Channel.Type
import Streamly.Internal.Data.Stream.Channel.Types
{-# INLINABLE parEval #-}
parEval :: MonadAsync m => (Config -> Config) -> Fold m a b -> Fold m a b
parEval :: forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config) -> Fold m a b -> Fold m a b
parEval Config -> Config
modifier Fold m a b
f =
forall (m :: * -> *) a b s.
(s -> a -> m (Step s b))
-> m (Step s b) -> (s -> m b) -> Fold m a b
Fold forall {m :: * -> *} {a} {b}.
(MonadIO m, MonadBaseControl IO m, MonadThrow m) =>
Channel m a b -> a -> m (Step (Channel m a b) b)
step forall {b}. m (Step (Channel m a b) b)
initial forall {m :: * -> *} {a} {b}.
(MonadIO m, MonadBaseControl IO m, MonadThrow m) =>
Channel m a b -> m b
extract
where
initial :: m (Step (Channel m a b) b)
initial = forall s b. s -> Step s b
Partial forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall (m :: * -> *) a b.
MonadRunInIO m =>
(Config -> Config) -> Fold m a b -> m (Channel m a b)
newChannel Config -> Config
modifier Fold m a b
f
step :: Channel m a b -> a -> m (Step (Channel m a b) b)
step Channel m a b
chan a
a = do
Maybe b
status <- forall (m :: * -> *) a b.
MonadAsync m =>
Channel m a b -> a -> m (Maybe b)
sendToWorker Channel m a b
chan a
a
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ case Maybe b
status of
Maybe b
Nothing -> forall s b. s -> Step s b
Partial Channel m a b
chan
Just b
b -> forall s b. b -> Step s b
Done b
b
extract :: Channel m a b -> m b
extract Channel m a b
chan = do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) a. Functor f => f a -> f ()
void
forall a b. (a -> b) -> a -> b
$ forall a.
IORef ([ChildEvent a], Int) -> MVar () -> ChildEvent a -> IO Int
sendWithDoorBell
(forall (m :: * -> *) a b.
Channel m a b -> IORef ([ChildEvent a], Int)
outputQueue Channel m a b
chan)
(forall (m :: * -> *) a b. Channel m a b -> MVar ()
outputDoorBell Channel m a b
chan)
forall a. ChildEvent a
ChildStopChannel
Maybe b
status <- forall (m :: * -> *) a b.
MonadAsync m =>
Channel m a b -> m (Maybe b)
checkFoldStatus Channel m a b
chan
case Maybe b
status of
Maybe b
Nothing -> do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
forall a b. (a -> b) -> a -> b
$ Bool -> IO String -> String -> IO () -> IO ()
withDiagMVar
(forall (m :: * -> *) a b. Channel m a b -> Bool
svarInspectMode Channel m a b
chan)
(forall (m :: * -> *) a b. Channel m a b -> IO String
dumpSVar Channel m a b
chan)
String
"parEval: waiting to drain"
forall a b. (a -> b) -> a -> b
$ forall a. MVar a -> IO a
takeMVar (forall (m :: * -> *) a b. Channel m a b -> MVar ()
outputDoorBellFromConsumer Channel m a b
chan)
Channel m a b -> m b
extract Channel m a b
chan
Just b
b -> do
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (forall (m :: * -> *) a b. Channel m a b -> Bool
svarInspectMode Channel m a b
chan) forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
AbsTime
t <- Clock -> IO AbsTime
getTime Clock
Monotonic
forall a. IORef a -> a -> IO ()
writeIORef (SVarStats -> IORef (Maybe AbsTime)
svarStopTime (forall (m :: * -> *) a b. Channel m a b -> SVarStats
svarStats Channel m a b
chan)) (forall a. a -> Maybe a
Just AbsTime
t)
IO String -> String -> IO ()
printSVar (forall (m :: * -> *) a b. Channel m a b -> IO String
dumpSVar Channel m a b
chan) String
"SVar Done"
forall (m :: * -> *) a. Monad m => a -> m a
return b
b