#ifdef __HADDOCK_VERSION__
#undef INSPECTION
#endif

#ifdef INSPECTION
{-# LANGUAGE TemplateHaskell #-}
{-# OPTIONS_GHC -fplugin Test.Inspection.Plugin #-}
#endif

-- |
-- Module      : Streamly.Internal.Data.Stream.SVar
-- Copyright   : (c) 2017 Composewell Technologies
--
-- License     : BSD3
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
--
module Streamly.Internal.Data.Stream.SVar
    (
    -- * Unfold streams from SVar
    -- $concurrentEval
      fromSVar
    , fromSVarD
    -- , fromStreamVar

    -- * Fold streams to SVar
    , toSVar
    , toSVarParallel

    -- * Concurrent folds
    -- $concurrentFolds
    , fromConsumer
    , pushToFold
    , teeToSVar
    , newFoldSVar
    , newFoldSVarF
    )
where

#include "inline.hs"

import Control.Concurrent (myThreadId, takeMVar)
import Control.Exception (fromException)
import Control.Monad (when, void)
import Control.Monad.Catch (throwM)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.IORef (newIORef, readIORef, mkWeakIORef, writeIORef)
import Data.Maybe (isNothing)
import Streamly.Internal.Data.Atomics (atomicModifyIORefCAS_)
import Streamly.Internal.Data.Fold.Type (Fold(..))
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)
import System.Mem (performMajorGC)

import qualified Streamly.Internal.Data.Fold.Type as FL
import qualified Streamly.Internal.Data.Stream.StreamD.Type as D
import qualified Streamly.Internal.Data.Stream.StreamK.Type as K

import Streamly.Internal.Data.SVar

#if __GLASGOW_HASKELL__ < 810
#ifdef INSPECTION
import Control.Exception (Exception)
import Control.Monad.Catch (MonadThrow)
import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Typeable (Typeable)
import Test.Inspection (inspect, hasNoTypeClassesExcept)
#endif
#endif

------------------------------------------------------------------------------
-- Generating streams from SVar
------------------------------------------------------------------------------

-- $concurrentEval
--
-- Usually the SVar is used to concurrently evaluate multiple actions in a
-- stream using many worker threads that push the results to the SVar and a
-- single puller that pulls them from SVar generating the evaluated stream.
--
-- @
--                  input stream
--                       |
--     <-----------------|<--------worker
--     |  exceptions     |
-- output stream <------SVar<------worker
--                       |
--                       |<--------worker
--
-- @
--
-- The puller itself schedules the worker threads based on demand.
-- Exceptions are propagated from the worker threads to the puller.

-- | Pull a stream from an SVar.
--
-- Each step drains the events currently queued on the SVar with
-- 'readOutputQ' and replays them in the order in which they were produced.
-- Once a batch is exhausted 'postProcess' is consulted: if it reports that
-- the SVar is done the stream stops, otherwise the next batch is read by
-- calling 'fromStreamVar' again.
--
-- An exception reported by a worker, other than 'ThreadAbort', is rethrown
-- in the puller using 'throwM' after 'cleanupSVar' has been run.
{-# NOINLINE fromStreamVar #-}
fromStreamVar :: MonadAsync m => SVar K.Stream m a -> K.Stream m a
fromStreamVar sv = K.MkStream $ \st yld sng stp -> do
    list <- readOutputQ sv
    -- Reversing the output is important to guarantee that we process the
    -- outputs in the same order as they were generated by the constituent
    -- streams.
    K.foldStream st yld sng stp $ processEvents $ reverse list

    where

    -- Run the stop continuation. In inspect mode the stop time is recorded
    -- in the SVar stats and the SVar is printed before stopping.
    allDone stp = do
        when (svarInspectMode sv) $ do
            t <- liftIO $ getTime Monotonic
            liftIO $ writeIORef (svarStopTime (svarStats sv)) (Just t)
            liftIO $ printSVar sv "SVar Done"
        stp

    -- Turn one batch of child events into a stream.
    {-# INLINE processEvents #-}
    processEvents [] = K.MkStream $ \st yld sng stp -> do
        -- The batch is exhausted: either we are done or we go back to the
        -- SVar for more events.
        done <- postProcess sv
        if done
        then allDone stp
        else K.foldStream st yld sng stp $ fromStreamVar sv

    processEvents (ev : es) = K.MkStream $ \st yld sng stp -> do
        let rest = processEvents es
        case ev of
            ChildYield a -> yld a rest
            ChildStop tid e -> do
                accountThread sv tid
                case e of
                    -- The worker finished without an exception; whether the
                    -- whole stream stops depends on the stop style.
                    Nothing -> do
                        stop <- shouldStop tid
                        if stop
                        then liftIO (cleanupSVar sv) >> allDone stp
                        else K.foldStream st yld sng stp rest
                    Just ex ->
                        case fromException ex of
                            -- ThreadAbort is not propagated; continue with
                            -- the remaining events.
                            Just ThreadAbort ->
                                K.foldStream st yld sng stp rest
                            -- Any other exception is rethrown after cleanup.
                            Nothing -> liftIO (cleanupSVar sv) >> throwM ex

    -- Decide whether the stream must stop when the given worker thread
    -- stops: never for StopNone, always for StopAny, and for StopBy only
    -- when the thread is the one recorded in svarStopBy.
    shouldStop tid =
        case svarStopStyle sv of
            StopNone -> return False
            StopAny -> return True
            StopBy -> do
                sid <- liftIO $ readIORef (svarStopBy sv)
                return $ tid == sid

#if __GLASGOW_HASKELL__ < 810
#ifdef INSPECTION
-- Use of GHC constraint tuple (GHC.Classes.(%,,%)) in fromStreamVar leads to
-- space leak because the tuple gets allocated in every recursive call and each
-- allocation holds on to the previous allocation. This test is to make sure
-- that we do not use the constraint tuple type class.
--
-- The check below is compiled only when the inspection CPP symbol is defined
-- and the GHC version is older than 8.10 (see the surrounding guards). The
-- listed classes are the only ones allowed to appear in fromStreamVar.
--
inspect $ hasNoTypeClassesExcept 'fromStreamVar
    [ ''Monad
    , ''Applicative
    , ''MonadThrow
    , ''Exception
    , ''MonadIO
    , ''MonadBaseControl
    , ''Typeable
    , ''Functor
    ]
#endif
#endif

-- | Generate a stream from an SVar.  An unevaluated stream can be pushed to an
-- SVar using 'toSVar'.  As we pull a stream from the SVar the input stream
-- gets evaluated concurrently. The evaluation depends on the SVar style and
-- the configuration parameters e.g. using the maxBuffer/maxThreads
-- combinators.
--
-- A fresh 'IORef' with a weak-pointer finalizer is attached to the copy of
-- the SVar that is handed to the puller, so that 'cleanupSVar' is run when
-- that copy is garbage collected.
--
{-# INLINE fromSVar #-}
fromSVar :: (MonadAsync m, K.IsStream t) => SVar K.Stream m a -> t m a
fromSVar sv =
    K.mkStream $ \st yld sng stp -> do
        ref <- liftIO $ newIORef ()
        _ <- liftIO $ mkWeakIORef ref hook
        -- We pass a copy of sv to fromStreamVar, so that we know that it has
        -- no other references, when that copy gets garbage collected "ref"
        -- will get garbage collected and our hook will be called.
        K.foldStreamShared st yld sng stp $
            K.fromStream $ fromStreamVar sv{svarRef = Just ref}

    where

    -- Finalizer registered on "ref".
    hook = do
        -- In inspect mode, report the SVar if no stop time was recorded
        -- before it got garbage collected.
        when (svarInspectMode sv) $ do
            r <- liftIO $ readIORef (svarStopTime (svarStats sv))
            when (isNothing r) $
                printSVar sv "SVar Garbage Collected"
        cleanupSVar sv
        -- If there are any SVars referenced by this SVar a GC will prompt
        -- them to be cleaned up quickly.
        when (svarInspectMode sv) performMajorGC

-- | State of the stream generated by 'fromSVarD'.
data FromSVarState t m a =
      FromSVarInit
      -- ^ Initial state: the GC hook is installed and the SVar copy that is
      -- used by the remaining states is created.
    | FromSVarRead (SVar t m a)
      -- ^ Read the next batch of child events from the SVar.
    | FromSVarLoop (SVar t m a) [ChildEvent a]
      -- ^ Process the remaining child events of the current batch.
    | FromSVarDone (SVar t m a)
      -- ^ Final state: the stream stops on the next step.

-- | Like 'fromSVar' but generates a StreamD style stream instead of CPS.
--
-- The stream is a state machine over 'FromSVarState': it installs a GC hook,
-- then alternates between reading a batch of events from the SVar and
-- processing that batch, until 'postProcess' or the stop style says that the
-- stream is done.
--
{-# INLINE_NORMAL fromSVarD #-}
fromSVarD :: (MonadAsync m) => SVar t m a -> D.Stream m a
fromSVarD svar = D.Stream step FromSVarInit

    where

    {-# INLINE_LATE step #-}
    step _ FromSVarInit = do
        ref <- liftIO $ newIORef ()
        _ <- liftIO $ mkWeakIORef ref hook
        -- when this copy of svar gets garbage collected "ref" will get
        -- garbage collected and our GC hook will be called.
        let sv = svar{svarRef = Just ref}
        return $ D.Skip (FromSVarRead sv)

        where

        -- Finalizer registered on "ref".
        {-# NOINLINE hook #-}
        hook = do
            -- In inspect mode, report the SVar if no stop time was recorded
            -- before it got garbage collected.
            when (svarInspectMode svar) $ do
                r <- liftIO $ readIORef (svarStopTime (svarStats svar))
                when (isNothing r) $
                    printSVar svar "SVar Garbage Collected"
            cleanupSVar svar
            -- If there are any SVars referenced by this SVar a GC will prompt
            -- them to be cleaned up quickly.
            when (svarInspectMode svar) performMajorGC

    step _ (FromSVarRead sv) = do
        list <- readOutputQ sv
        -- Reversing the output is important to guarantee that we process the
        -- outputs in the same order as they were generated by the constituent
        -- streams.
        return $ D.Skip $ FromSVarLoop sv (Prelude.reverse list)

    step _ (FromSVarLoop sv []) = do
        -- The batch is exhausted: either we are done or we read more events.
        done <- postProcess sv
        return $ D.Skip $ if done
                      then FromSVarDone sv
                      else FromSVarRead sv

    step _ (FromSVarLoop sv (ev : es)) = do
        case ev of
            ChildYield a -> return $ D.Yield a (FromSVarLoop sv es)
            ChildStop tid e -> do
                accountThread sv tid
                case e of
                    -- The worker finished without an exception; whether the
                    -- whole stream stops depends on the stop style.
                    Nothing -> do
                        stop <- shouldStop tid
                        if stop
                        then do
                            liftIO (cleanupSVar sv)
                            return $ D.Skip (FromSVarDone sv)
                        else return $ D.Skip (FromSVarLoop sv es)
                    Just ex ->
                        case fromException ex of
                            -- ThreadAbort is not propagated; continue with
                            -- the remaining events.
                            Just ThreadAbort ->
                                return $ D.Skip (FromSVarLoop sv es)
                            -- Any other exception is rethrown after cleanup.
                            Nothing -> liftIO (cleanupSVar sv) >> throwM ex

        where

        -- Decide whether the stream must stop when the given worker thread
        -- stops: never for StopNone, always for StopAny, and for StopBy only
        -- when the thread is the one recorded in svarStopBy.
        shouldStop tid =
            case svarStopStyle sv of
                StopNone -> return False
                StopAny -> return True
                StopBy -> do
                    sid <- liftIO $ readIORef (svarStopBy sv)
                    return $ tid == sid

    step _ (FromSVarDone sv) = do
        -- In inspect mode record the stop time in the SVar stats and print
        -- the SVar before stopping.
        when (svarInspectMode sv) $ do
            t <- liftIO $ getTime Monotonic
            liftIO $ writeIORef (svarStopTime (svarStats sv)) (Just t)
            liftIO $ printSVar sv "SVar Done"
        return D.Stop

------------------------------------------------------------------------------
-- Writing streams to SVar
------------------------------------------------------------------------------

-- | Write a stream to an 'SVar' in a non-blocking manner. The stream is
-- evaluated concurrently as it is read back from the SVar using 'fromSVar'.
--
-- The stream is converted to the 'K.Stream' representation with
-- 'K.toStream' and handed to 'toStreamVar'.
--
toSVar :: (K.IsStream t, MonadAsync m) => SVar K.Stream m a -> t m a -> m ()
toSVar sv m = toStreamVar sv $ K.toStream m

-------------------------------------------------------------------------------
-- Concurrent application
-------------------------------------------------------------------------------

-- | A fold to write a stream to an SVar. Unlike 'toSVar' this does not allow
-- for concurrent evaluation of the stream, as the fold receives the input one
-- element at a time, it just forwards the elements to the SVar. However, we
-- can safely execute the fold in an independent thread, the SVar can act as a
-- buffer decoupling the sender from the receiver. Also, we can have multiple
-- folds running concurrently pushing the streams to the SVar.
--
-- The fold never terminates on its own: every step returns 'FL.Partial'.
-- When the fold is extracted a stop event is sent to the SVar using
-- 'sendStop' with the supplied worker info.
--
{-# INLINE write #-}
write :: MonadIO m => SVar t m a -> Maybe WorkerInfo -> Fold m a ()
write svar winfo = Fold step initial extract

    where

    initial = return $ FL.Partial ()

    -- XXX we can have a separate fold for unlimited buffer case to avoid a
    -- branch in the step here.
    step () x =
        liftIO $ do
            decrementBufferLimit svar
            -- The result of send is not needed here.
            void $ send svar (ChildYield x)
            return $ FL.Partial ()

    extract () = liftIO $ sendStop svar winfo

-- | Like 'write', but applies a yield limit.
--
-- The fold state is a 'Bool' that is 'True' while the fold is still accepting
-- input. Before forwarding each element @decrementYieldLimit@ is consulted;
-- when it reports that the limit is exhausted the SVar is cleaned up, a stop
-- event is sent and the fold terminates with 'FL.Done'.
--
{-# INLINE writeLimited #-}
writeLimited :: MonadIO m
    => SVar t m a -> Maybe WorkerInfo -> Fold m a ()
writeLimited svar winfo = Fold step initial extract

    where

    initial = return $ FL.Partial True

    step True x =
        liftIO $ do
            yieldLimitOk <- decrementYieldLimit svar
            if yieldLimitOk
            then do
                decrementBufferLimit svar
                -- The 'Int' returned by send is not needed here.
                void $ send svar (ChildYield x)
                return $ FL.Partial True
            else do
                -- Yield limit exhausted: the stop event is sent right here,
                -- so the fold finishes without going through extract.
                cleanupSVarFromWorker svar
                sendStop svar winfo
                return $ FL.Done ()
    -- The step above never produces a 'False' state (it either continues with
    -- 'True' or finishes with 'FL.Done'); this equation only keeps the
    -- function total.
    step False _ = return $ FL.Done ()

    -- Send the stop event only if the fold is still active.
    extract True = liftIO $ sendStop svar winfo
    extract False = return ()

-- Using StreamD the worker stream producing code can fuse with the code to
-- queue output to the SVar giving some perf boost.
--
-- Note that StreamD can only be used in limited situations, specifically, we
-- cannot implement joinStreamVarPar using this.
--
-- XXX make sure that the SVar passed is a Parallel style SVar.

-- | Fold the supplied stream to the SVar asynchronously using Parallel
-- concurrency style.
--
-- A new thread is forked (via @doFork@) that folds the stream into the SVar
-- using 'write', or 'writeLimited' when the state carries a yield limit. The
-- forked thread's id is then registered with the SVar using @modifyThread@.
-- When the SVar is in inspect mode, worker accounting is updated before
-- forking, and a 'WorkerInfo' is allocated if the SVar has yield rate info.
-- {-# INLINE_NORMAL toSVarParallel #-}
{-# INLINE toSVarParallel #-}
toSVarParallel :: MonadAsync m
    => State t m a -> SVar t m a -> D.Stream m a -> m ()
toSVarParallel st sv xs =
    if svarInspectMode sv
    then forkWithDiag
    else do
        tid <-
                case getYieldLimit st of
                    Nothing -> doFork (work Nothing)
                                      (svarMrun sv)
                                      (handleChildException sv)
                    Just _  -> doFork (workLim Nothing)
                                      (svarMrun sv)
                                      (handleChildException sv)
        modifyThread sv tid

    where

    -- Worker without a yield limit.
    {-# NOINLINE work #-}
    work info = D.fold (write sv info) xs

    -- Worker that honours the yield limit.
    {-# NOINLINE workLim #-}
    workLim info = D.fold (writeLimited sv info) xs

    -- Same as the plain fork path, but additionally maintains the worker
    -- accounting and passes a 'WorkerInfo' to the worker when needed.
    {-# NOINLINE forkWithDiag #-}
    forkWithDiag = do
        -- We do not use workerCount in case of ParallelVar but still there is
        -- no harm in maintaining it correctly.
        liftIO $ atomicModifyIORefCAS_ (workerCount sv) $ \n -> n + 1
        recordMaxWorkers sv
        -- This allocation matters when significant number of workers are being
        -- sent. We allocate it only when needed. The overhead increases by 4x.
        winfo <-
            case yieldRateInfo sv of
                Nothing -> return Nothing
                Just _ -> liftIO $ do
                    cntRef <- newIORef 0
                    t <- getTime Monotonic
                    lat <- newIORef (0, t)
                    return $ Just WorkerInfo
                        { workerYieldMax = 0
                        , workerYieldCount = cntRef
                        , workerLatencyStart = lat
                        }
        tid <-
            case getYieldLimit st of
                Nothing -> doFork (work winfo)
                                  (svarMrun sv)
                                  (handleChildException sv)
                Just _  -> doFork (workLim winfo)
                                  (svarMrun sv)
                                  (handleChildException sv)
        modifyThread sv tid

-------------------------------------------------------------------------------
-- Support for running folds concurrently
-------------------------------------------------------------------------------

-- $concurrentFolds
--
-- To run folds concurrently, we need to decouple the fold execution from the
-- stream production. We use the SVar to do that, we have a single worker
-- pushing the stream elements to the SVar and on the consumer side a fold
-- driver pulls the values and folds them.
--
-- @
--
-- Fold worker <------SVar<------input stream
--     |  exceptions  |
--     --------------->
--
-- @
--
-- We need a channel for pushing exceptions from the fold worker to the stream
-- pusher. The stream may be pushed to multiple folds at the same time. For
-- that we need one SVar per fold:
--
-- @
--
-- Fold worker <------SVar<---
--                    |       |
-- Fold worker <------SVar<------input stream
--                    |       |
-- Fold worker <------SVar<---
--
-- @
--
-- Unlike in the case of concurrent stream evaluation, the puller does not
-- drive the scheduling and concurrent execution of the stream. The stream is
-- simply pushed by the stream producer at its own rate. The fold worker just
-- pulls it and folds it.
--
-- Note: If the stream pusher terminates due to an exception, we do not
-- actively terminate the fold. It gets cleaned up by the GC.

-------------------------------------------------------------------------------
-- Process events received by a fold consumer from a stream producer
-------------------------------------------------------------------------------

-- | Pull a stream from an SVar to fold it. Like 'fromSVar' except that it does
-- not drive the evaluation of the stream. It just pulls whatever is available
-- on the SVar. Also, when the fold stops it sends a notification to the stream
-- pusher/producer. No exceptions are expected to be propagated from the stream
-- pusher to the fold puller.
--
-- Receiving a 'ChildStop' event that carries an exception is treated as a bug
-- and results in a call to 'error'.
--
{-# NOINLINE fromProducer #-}
fromProducer :: forall m a . MonadAsync m => SVar K.Stream m a -> K.Stream m a
fromProducer sv = K.mkStream $ \st yld sng stp -> do
    list <- readOutputQ sv
    -- Reversing the output is important to guarantee that we process the
    -- outputs in the same order as they were generated by the constituent
    -- streams.
    K.foldStream st yld sng stp $ processEvents $ reverse list

    where

    -- Called when the producer signals a clean stop: record stats when in
    -- inspect mode, notify the producer, then run the stop continuation.
    allDone :: m r -> m r
    allDone stp = do
        when (svarInspectMode sv) $ do
            t <- liftIO $ getTime Monotonic
            liftIO $ writeIORef (svarStopTime (svarStats sv)) (Just t)
            liftIO $ printSVar sv "SVar Done"
        sendStopToProducer sv
        stp

    {-# INLINE processEvents #-}
    processEvents :: [ChildEvent a] -> K.Stream m a
    -- The current batch is exhausted, go back and read the next batch from
    -- the SVar.
    processEvents [] = K.mkStream $ \st yld sng stp -> do
        K.foldStream st yld sng stp $ fromProducer sv

    processEvents (ev : es) = K.mkStream $ \_ yld _ stp -> do
        let rest = processEvents es
        case ev of
            ChildYield a -> yld a rest
            ChildStop tid e -> do
                accountThread sv tid
                case e of
                    Nothing -> allDone stp
                    Just _ -> error "Bug: fromProducer: received exception"

-- | Create a Fold style SVar that runs a supplied fold function as the
-- consumer.  Any elements sent to the SVar are consumed by the supplied fold
-- function.
--
-- The fold function is run in a newly forked thread on the stream produced by
-- 'fromProducer'; its result is discarded. Exceptions in that thread are
-- handed to @handleFoldException@. The calling thread is registered with the
-- SVar as the producer.
--
{-# INLINE newFoldSVar #-}
newFoldSVar :: (K.IsStream t, MonadAsync m)
    => State K.Stream m a -> (t m a -> m b) -> m (SVar K.Stream m a)
newFoldSVar stt f = do
    -- Buffer size for the SVar is derived from the current state
    sv <- newParallelVar StopAny (adaptState stt)

    -- Add the producer thread-id to the SVar.
    liftIO myThreadId >>= modifyThread sv

    -- Fork the consumer; the forked thread's id is not needed here.
    void $ doFork (void $ f $ K.fromStream $ fromProducer sv)
                  (svarMrun sv)
                  (handleFoldException sv)
    return sv

-- | Like 'fromProducer' but generates a StreamD style stream instead of
-- StreamK.
--
-- The stream is a state machine over @FromSVarState@: @FromSVarRead@ reads a
-- batch of events from the SVar, @FromSVarLoop@ processes the batch one event
-- at a time and @FromSVarDone@ ends the stream. @FromSVarInit@ is never
-- entered by this function.
--
{-# INLINE_NORMAL fromProducerD #-}
fromProducerD :: (MonadAsync m) => SVar t m a -> D.Stream m a
fromProducerD svar = D.Stream step (FromSVarRead svar)
    where

    {-# INLINE_LATE step #-}
    step _ (FromSVarRead sv) = do
        list <- readOutputQ sv
        -- Reversing the output is important to guarantee that we process the
        -- outputs in the same order as they were generated by the constituent
        -- streams.
        return $ D.Skip $ FromSVarLoop sv (Prelude.reverse list)

    -- Batch exhausted, go back to reading from the SVar.
    step _ (FromSVarLoop sv []) = return $ D.Skip $ FromSVarRead sv
    step _ (FromSVarLoop sv (ev : es)) = do
        case ev of
            ChildYield a -> return $ D.Yield a (FromSVarLoop sv es)
            ChildStop tid e -> do
                accountThread sv tid
                case e of
                    Nothing -> do
                        sendStopToProducer sv
                        return $ D.Skip (FromSVarDone sv)
                    Just _ -> error "Bug: fromProducerD: received exception"

    step _ (FromSVarDone sv) = do
        when (svarInspectMode sv) $ do
            t <- liftIO $ getTime Monotonic
            liftIO $ writeIORef (svarStopTime (svarStats sv)) (Just t)
            liftIO $ printSVar sv "SVar Done"
        return D.Stop

    -- The initial state is FromSVarRead and no transition above produces
    -- FromSVarInit, so reaching this equation indicates a bug.
    step _ FromSVarInit =
        error "Bug: fromProducerD: unexpected FromSVarInit state"

-- | Like 'newFoldSVar' except that it uses a 'Fold' instead of a fold
-- function.
--
-- The fold is run in a newly forked thread on the stream produced by
-- 'fromProducerD'; its result is discarded. Exceptions in that thread are
-- handed to @handleFoldException@. The calling thread is registered with the
-- SVar as the producer.
--
{-# INLINE newFoldSVarF #-}
newFoldSVarF :: MonadAsync m => State t m a -> Fold m a b -> m (SVar t m a)
newFoldSVarF stt f = do
    -- Buffer size for the SVar is derived from the current state
    sv <- newParallelVar StopAny (adaptState stt)
    -- Add the producer thread-id to the SVar.
    liftIO myThreadId >>= modifyThread sv
    -- Fork the consumer; the forked thread's id is not needed here.
    void $ doFork (work sv) (svarMrun sv) (handleFoldException sv)
    return sv

    where

    -- Drive the fold with whatever the producer pushes to the SVar.
    {-# NOINLINE work #-}
    work sv = void $ D.fold f $ fromProducerD sv

-------------------------------------------------------------------------------
-- Process events received by the producer thread from the consumer side
-------------------------------------------------------------------------------

-- XXX currently only one event is sent by a fold consumer to the stream
-- producer. But we can potentially have multiple events e.g. the fold step can
-- generate exception more than once and the producer can ignore those
-- exceptions or handle them and still keep driving the fold.
--
-- | Poll for events sent by the fold consumer to the stream pusher. The fold
-- consumer can send a "Stop" event or an exception. When a "Stop" is received
-- this function returns 'True'. If an exception is received then it throws the
-- exception.
--
{-# NOINLINE fromConsumer #-}
fromConsumer :: MonadAsync m => SVar K.Stream m a -> m Bool
fromConsumer sv = do
    -- Read the events queued by the fold consumer; the 'Int' component of
    -- the queue contents is not needed here.
    (list, _) <- liftIO $ readOutputQBasic (outputQueueFromConsumer sv)
    -- Reversing the output is important to guarantee that we process the
    -- outputs in the same order as they were generated by the constituent
    -- streams.
    processEvents $ reverse list

    where

    -- Only the first event (in generation order) is examined; any remaining
    -- events are ignored:
    --
    -- * no events: returns 'False'
    -- * 'ChildStop' without an exception: returns 'True'
    -- * 'ChildStop' with an exception: rethrows it using 'throwM'
    -- * 'ChildYield': treated as a bug and reported via 'error'
    {-# INLINE processEvents #-}
    processEvents [] = return False
    processEvents (ChildStop _ Nothing : _) = return True
    processEvents (ChildStop _ (Just ex) : _) = throwM ex
    processEvents (ChildYield _ : _) =
        error "Bug: fromConsumer: invalid ChildYield event"

-- | Push values from a stream to a fold worker via an SVar. Before pushing a
-- value to the SVar it polls for events received from the fold consumer.  If a
-- stop event is received then it returns 'True', otherwise 'False'.
-- Propagates exceptions received from the fold consumer.
--
{-# INLINE pushToFold #-}
pushToFold :: MonadAsync m => SVar K.Stream m a -> a -> m Bool
pushToFold sv a = do
    -- Check for exceptions before decrement so that we do not
    -- block forever if the child already exited with an exception.
    --
    -- We avoid a race between the consumer fold sending an event and we
    -- blocking on decrementBufferLimit by waking up the producer thread in
    -- sendToProducer before any event is sent by the fold to the producer
    -- stream.
    let qref = outputQueueFromConsumer sv
    -- Only call 'fromConsumer' when the count stored alongside the queue is
    -- positive, i.e. there is something queued to process.
    (_, n) <- liftIO $ readIORef qref
    done <- if n > 0 then fromConsumer sv else return False
    if done
    then return True
    else liftIO $ do
        -- The fold has not signalled a stop: push the element to it.
        decrementBufferLimit sv
        void $ send sv (ChildYield a)
        return False

------------------------------------------------------------------------------
-- Clone and distribute a stream in parallel
------------------------------------------------------------------------------

-- XXX this could be written in StreamD style for better efficiency with fusion.
--
-- | Tap a stream and send the elements to the specified SVar in addition to
-- yielding them again. The SVar runs a fold consumer. Elements are tapped and
-- sent to the SVar until the fold finishes. Any exceptions from the fold
-- evaluation are propagated in the current thread.
--
-- @
--
-- ------input stream---------output stream----->
--                    /|\\   |
--         exceptions  |    |  input
--                     |   \\|/
--                     ----SVar
--                          |
--                         Fold
--
-- @
--
{-# INLINE teeToSVar #-}
teeToSVar :: (K.IsStream t, MonadAsync m) =>
    SVar K.Stream m a -> t m a -> t m a
teeToSVar svr m = K.mkStream $ \st yld sng stp ->
    K.foldStreamShared st yld sng stp (go False m)

    where

    -- The 'Bool' argument is the "done" status returned by the most recent
    -- 'pushToFold' ('False' initially). While it is 'False' every element of
    -- the input is pushed to the SVar before being yielded downstream.
    go False m0 = K.mkStream $ \st yld _ stp ->
        let drain = do
                -- In general, a Stop event would come equipped with the result
                -- of the fold. It is not used here but it would be useful in
                -- applicative and distribute.
                done <- fromConsumer svr
                when (not done) $ do
                    -- Nothing conclusive yet: block on the consumer's
                    -- doorbell and then poll again.
                    liftIO $ withDiagMVar svr "teeToSVar: waiting to drain"
                           $ takeMVar (outputDoorBellFromConsumer svr)
                    drain

            stopFold = do
                liftIO $ sendStop svr Nothing
                -- drain/wait until a stop event arrives from the fold.
                drain

            -- Input stream ended: stop the fold, then stop downstream.
            stop = stopFold >> stp

            -- Last element of the input: push it, yield it, and continue
            -- with a stream that only runs 'stopFold' (or is returned as is
            -- by 'go True' when the fold reported done).
            single a = do
                done <- pushToFold svr a
                yld a (go done (K.nilM stopFold))

            -- Element with a remaining stream: push it, yield it, and keep
            -- tapping the rest according to the returned status.
            yieldk a r = pushToFold svr a >>= \done -> yld a (go done r)
         in K.foldStreamShared st yieldk single stop m0

    -- The fold is done: pass the remaining stream through untouched.
    go True m0 = m0