-- |
-- Module      : Streamly.Internal.Data.Fold.Concurrent.Channel
-- Copyright   : (c) 2022 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC

module Streamly.Internal.Data.Fold.Concurrent.Channel
    (
    -- * Channel
      Channel

    -- * Configuration
    , Config
    , maxBuffer
    , inspect

    -- * Fold operations
    , parEval
    )
where

import Control.Concurrent (takeMVar)
import Control.Monad (void, when)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.IORef (writeIORef)
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Fold (Fold(..), Step (..))
import Streamly.Internal.Data.Stream.Channel.Worker (sendWithDoorBell)
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)

import Streamly.Internal.Data.Fold.Concurrent.Channel.Type
import Streamly.Internal.Data.Stream.Channel.Types

-------------------------------------------------------------------------------
-- Evaluating a Fold
-------------------------------------------------------------------------------

-- XXX Cleanup the fold if the stream is interrupted. Add a GC hook.

-- | Evaluate the fold asynchronously in a worker thread separate from the
-- driver thread.
--
-- The first argument modifies the channel 'Config' that is passed on to
-- 'newChannel'; the second argument is the fold to be evaluated. The
-- resulting fold uses the 'Channel' returned by 'newChannel' as its state:
--
-- * Each input is handed over to the channel using 'sendToWorker'. If that
--   returns a result the fold terminates with that result, otherwise it
--   continues with the same channel.
-- * On extraction a 'ChildStopChannel' event is queued on the channel and
--   the driver waits until 'checkFoldStatus' reports the final result.
--
{-# INLINABLE parEval #-}
parEval :: MonadAsync m => (Config -> Config) -> Fold m a b -> Fold m a b
parEval modifier f =
    Fold step initial extract

    where

    -- The fold always starts in the Partial state, carrying a channel freshly
    -- created for the supplied fold.
    initial = Partial <$> newChannel modifier f

    -- XXX This is not truly asynchronous. If the fold is done we only get to
    -- know when we send the next input unless the stream ends. We could
    -- potentially throw an async exception to the driver to inform it
    -- asynchronously. Alternatively, the stream should not block forever, it
    -- should keep polling the fold status. We can insert a timer tick in the
    -- input stream to do that.
    --
    -- A polled stream abstraction may be useful, it would consist of normal
    -- events and tick events, the latter are guaranteed to arrive.
    step chan a = do
        status <- sendToWorker chan a
        return $ case status of
            Nothing -> Partial chan
            Just b -> Done b

    extract chan = do
        -- Queue a stop event for the channel and ring its doorbell. The Int
        -- returned by sendWithDoorBell is not needed here.
        liftIO $ void
            $ sendWithDoorBell
                (outputQueue chan)
                (outputDoorBell chan)
                ChildStopChannel
        status <- checkFoldStatus chan
        case status of
            Nothing -> do
                -- No result yet: block on the doorbell coming back from the
                -- consumer side of the channel before checking again.
                liftIO
                    $ withDiagMVar
                        (svarInspectMode chan)
                        (dumpSVar chan)
                        "parEval: waiting to drain"
                    $ takeMVar (outputDoorBellFromConsumer chan)
                -- XXX remove recursion
                -- NOTE(review): every retry goes through the whole of
                -- 'extract' again and therefore queues another
                -- 'ChildStopChannel' event - confirm that this is intended.
                extract chan
            Just b -> do
                -- In inspect mode record the stop time in the channel stats
                -- and print a dump of the channel before returning.
                when (svarInspectMode chan) $ liftIO $ do
                    t <- getTime Monotonic
                    writeIORef (svarStopTime (svarStats chan)) (Just t)
                    printSVar (dumpSVar chan) "SVar Done"
                return b