module Streamly.Internal.Data.Fold.Concurrent.Channel
(
module Streamly.Internal.Data.Fold.Concurrent.Channel.Type
, maxBuffer
, boundThreads
, inspect
, parEval
)
where
import Control.Concurrent (takeMVar)
import Control.Monad (void, when)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.IORef (writeIORef)
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Fold (Fold(..), Step (..))
import Streamly.Internal.Data.Channel.Worker (sendWithDoorBell)
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)
import Streamly.Internal.Data.Fold.Concurrent.Channel.Type
import Streamly.Internal.Data.Channel.Types
{-# INLINABLE parEval #-}
parEval :: MonadAsync m => (Config -> Config) -> Fold m a b -> Fold m a b
parEval :: forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config) -> Fold m a b -> Fold m a b
parEval Config -> Config
modifier Fold m a b
f =
forall (m :: * -> *) a b s.
(s -> a -> m (Step s b))
-> m (Step s b) -> (s -> m b) -> (s -> m b) -> Fold m a b
Fold forall {m :: * -> *} {a} {b}.
(MonadIO m, MonadBaseControl IO m, MonadThrow m) =>
Channel m a b -> a -> m (Step (Channel m a b) b)
step forall {b}. m (Step (Channel m a b) b)
initial forall {p} {a}. p -> a
extract forall {m :: * -> *} {a} {b}.
(MonadIO m, MonadBaseControl IO m, MonadThrow m) =>
Channel m a b -> m b
final
where
initial :: m (Step (Channel m a b) b)
initial = forall s b. s -> Step s b
Partial forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall (m :: * -> *) a b.
MonadRunInIO m =>
(Config -> Config) -> Fold m a b -> m (Channel m a b)
newChannel Config -> Config
modifier Fold m a b
f
step :: Channel m a b -> a -> m (Step (Channel m a b) b)
step Channel m a b
chan a
a = do
Maybe b
status <- forall (m :: * -> *) a b.
MonadAsync m =>
Channel m a b -> a -> m (Maybe b)
sendToWorker Channel m a b
chan a
a
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ case Maybe b
status of
Maybe b
Nothing -> forall s b. s -> Step s b
Partial Channel m a b
chan
Just b
b -> forall s b. b -> Step s b
Done b
b
extract :: p -> a
extract p
_ = forall a. HasCallStack => [Char] -> a
error [Char]
"Concurrent folds do not support scanning"
final :: Channel m a b -> m b
final Channel m a b
chan = do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) a. Functor f => f a -> f ()
void
forall a b. (a -> b) -> a -> b
$ forall a.
IORef ([ChildEvent a], Int) -> MVar () -> ChildEvent a -> IO Int
sendWithDoorBell
(forall (m :: * -> *) a b.
Channel m a b -> IORef ([ChildEvent a], Int)
outputQueue Channel m a b
chan)
(forall (m :: * -> *) a b. Channel m a b -> MVar ()
outputDoorBell Channel m a b
chan)
forall a. ChildEvent a
ChildStopChannel
Maybe b
status <- forall (m :: * -> *) a b.
MonadAsync m =>
Channel m a b -> m (Maybe b)
checkFoldStatus Channel m a b
chan
case Maybe b
status of
Maybe b
Nothing -> do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
forall a b. (a -> b) -> a -> b
$ Bool -> IO [Char] -> [Char] -> IO () -> IO ()
withDiagMVar
(forall (m :: * -> *) a b. Channel m a b -> Bool
svarInspectMode Channel m a b
chan)
(forall (m :: * -> *) a b. Channel m a b -> IO [Char]
dumpSVar Channel m a b
chan)
[Char]
"parEval: waiting to drain"
forall a b. (a -> b) -> a -> b
$ forall a. MVar a -> IO a
takeMVar (forall (m :: * -> *) a b. Channel m a b -> MVar ()
outputDoorBellFromConsumer Channel m a b
chan)
Channel m a b -> m b
final Channel m a b
chan
Just b
b -> do
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (forall (m :: * -> *) a b. Channel m a b -> Bool
svarInspectMode Channel m a b
chan) forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
AbsTime
t <- Clock -> IO AbsTime
getTime Clock
Monotonic
forall a. IORef a -> a -> IO ()
writeIORef (SVarStats -> IORef (Maybe AbsTime)
svarStopTime (forall (m :: * -> *) a b. Channel m a b -> SVarStats
svarStats Channel m a b
chan)) (forall a. a -> Maybe a
Just AbsTime
t)
IO [Char] -> [Char] -> IO ()
printSVar (forall (m :: * -> *) a b. Channel m a b -> IO [Char]
dumpSVar Channel m a b
chan) [Char]
"SVar Done"
forall (m :: * -> *) a. Monad m => a -> m a
return b
b