{-# OPTIONS_GHC -fno-warn-deprecations #-}
#ifdef __HADDOCK_VERSION__
#undef INSPECTION
#endif

#ifdef INSPECTION
{-# LANGUAGE TemplateHaskell #-}
{-# OPTIONS_GHC -fplugin Test.Inspection.Plugin #-}
#endif

-- |
-- Module      : Streamly.Internal.Data.Stream.SVar.Generate
-- Copyright   : (c) 2017 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
--
module Streamly.Internal.Data.Stream.SVar.Generate
    (
    -- * Write to SVar
      toSVar

    -- * Read from SVar
    -- $concurrentEval
    , fromSVar
    , fromSVarD
    -- , fromStreamVar
    )
where

#include "inline.hs"

import Control.Exception (fromException)
import Control.Monad (when)
import Control.Monad.Catch (throwM)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.IORef (newIORef, readIORef, mkWeakIORef, writeIORef)
import Data.Maybe (isNothing)
import Streamly.Internal.Control.Concurrent (MonadAsync, askRunInIO)
import Streamly.Internal.Data.Stream.Serial (SerialT)
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)
import System.Mem (performMajorGC)

import qualified Streamly.Internal.Data.Stream.StreamD.Type as D
    (Stream(..), Step(..))
import qualified Streamly.Internal.Data.Stream.StreamK.Type as K
    (Stream, foldStreamShared, mkStream, foldStream)
import qualified Streamly.Internal.Data.Stream.Serial as Stream (fromStreamK)

import Streamly.Internal.Data.SVar

#ifdef INSPECTION
import Control.Exception (Exception)
import Control.Monad.Catch (MonadThrow)
import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Typeable (Typeable)
import Test.Inspection (inspect, hasNoTypeClassesExcept)
#endif


------------------------------------------------------------------------------
-- Generating streams from SVar
------------------------------------------------------------------------------

-- $concurrentEval
--
-- Usually the SVar is used to concurrently evaluate multiple actions in a
-- stream using many worker threads that push the results to the SVar and a
-- single puller that pulls them from SVar generating the evaluated stream.
--
-- @
--                  input stream
--                       |
--     <-----------------|<--------worker
--     |  exceptions     |
-- output stream <------SVar<------worker
--                       |
--                       |<--------worker
--
-- @
--
-- The puller itself schedules the worker threads based on demand.
-- Exceptions are propagated from the worker threads to the puller.

-------------------------------------------------------------------------------
-- Write a stream to an SVar
-------------------------------------------------------------------------------

-- XXX this errors out for Parallel/Ahead SVars
-- | Write a stream to an 'SVar' in a non-blocking manner. The stream can then
-- be read back from the SVar using 'fromSVar'.
toSVar :: MonadAsync m => SVar SerialT m a -> SerialT m a -> m ()
toSVar :: forall (m :: * -> *) a.
MonadAsync m =>
SVar SerialT m a -> SerialT m a -> m ()
toSVar SVar SerialT m a
sv SerialT m a
m = do
    RunInIO m
runIn <- forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
    forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> (RunInIO m, t m a) -> IO ()
enqueue SVar SerialT m a
sv (RunInIO m
runIn, SerialT m a
m)
    Bool
done <- forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> m Bool
allThreadsDone SVar SerialT m a
sv
    -- XXX This is safe only when called from the consumer thread or when no
    -- consumer is present.  There may be a race if we are not running in the
    -- consumer thread.
    -- XXX do this only if the work queue is not empty. The work may have been
    -- carried out by existing workers.
    forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
done forall a b. (a -> b) -> a -> b
$
        case forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe YieldRateInfo
yieldRateInfo SVar SerialT m a
sv of
            Maybe YieldRateInfo
Nothing -> forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
Count -> SVar t m a -> m ()
pushWorker Count
0 SVar SerialT m a
sv
            Just YieldRateInfo
_  -> forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
Count -> SVar t m a -> m ()
pushWorker Count
1 SVar SerialT m a
sv

-------------------------------------------------------------------------------
-- Read a stream from an SVar
-------------------------------------------------------------------------------

-- | Pull a stream from an SVar.
{-# NOINLINE fromStreamVar #-}
fromStreamVar :: MonadAsync m => SVar K.Stream m a -> K.Stream m a
fromStreamVar :: forall (m :: * -> *) a.
MonadAsync m =>
SVar Stream m a -> Stream m a
fromStreamVar SVar Stream m a
sv = forall (m :: * -> *) a.
(forall r.
 State Stream m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
K.mkStream forall a b. (a -> b) -> a -> b
$ \State Stream m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp -> do
    [ChildEvent a]
list <- forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> m [ChildEvent a]
readOutputQ SVar Stream m a
sv
    -- Reversing the output is important to guarantee that we process the
    -- outputs in the same order as they were generated by the constituent
    -- streams.
    forall (m :: * -> *) a r.
State Stream m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStream State Stream m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp forall a b. (a -> b) -> a -> b
$ [ChildEvent a] -> StreamK m a
processEvents forall a b. (a -> b) -> a -> b
$ forall a. [a] -> [a]
reverse [ChildEvent a]
list

    where

    allDone :: m b -> m b
allDone m b
stp = do
        forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Bool
svarInspectMode SVar Stream m a
sv) forall a b. (a -> b) -> a -> b
$ do
            AbsTime
t <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Clock -> IO AbsTime
getTime Clock
Monotonic
            forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> a -> IO ()
writeIORef (SVarStats -> IORef (Maybe AbsTime)
svarStopTime (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStats
svarStats SVar Stream m a
sv)) (forall a. a -> Maybe a
Just AbsTime
t)
            forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> String -> IO ()
printSVar SVar Stream m a
sv String
"SVar Done"
        m b
stp

    {-# INLINE processEvents #-}
    processEvents :: [ChildEvent a] -> StreamK m a
processEvents [] = forall (m :: * -> *) a.
(forall r.
 State Stream m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
K.mkStream forall a b. (a -> b) -> a -> b
$ \State Stream m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp -> do
        Bool
done <- forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> m Bool
postProcess SVar Stream m a
sv
        if Bool
done
        then forall {m :: * -> *} {b}. MonadIO m => m b -> m b
allDone m r
stp
        else forall (m :: * -> *) a r.
State Stream m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStream State Stream m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
MonadAsync m =>
SVar Stream m a -> Stream m a
fromStreamVar SVar Stream m a
sv

    processEvents (ChildEvent a
ev : [ChildEvent a]
es) = forall (m :: * -> *) a.
(forall r.
 State Stream m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
K.mkStream forall a b. (a -> b) -> a -> b
$ \State Stream m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp -> do
        let rest :: StreamK m a
rest = [ChildEvent a] -> StreamK m a
processEvents [ChildEvent a]
es
        case ChildEvent a
ev of
            ChildYield a
a -> a -> StreamK m a -> m r
yld a
a StreamK m a
rest
            ChildStop ThreadId
tid Maybe SomeException
e -> do
                forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ThreadId -> m ()
accountThread SVar Stream m a
sv ThreadId
tid
                case Maybe SomeException
e of
                    Maybe SomeException
Nothing -> do
                        Bool
stop <- forall {m :: * -> *}. MonadIO m => ThreadId -> m Bool
shouldStop ThreadId
tid
                        if Bool
stop
                        then forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
cleanupSVar SVar Stream m a
sv) forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall {m :: * -> *} {b}. MonadIO m => m b -> m b
allDone m r
stp
                        else forall (m :: * -> *) a r.
State Stream m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStream State Stream m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp StreamK m a
rest
                    Just SomeException
ex ->
                        case forall e. Exception e => SomeException -> Maybe e
fromException SomeException
ex of
                            Just ThreadAbort
ThreadAbort ->
                                forall (m :: * -> *) a r.
State Stream m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStream State Stream m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp StreamK m a
rest
                            Maybe ThreadAbort
Nothing -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
cleanupSVar SVar Stream m a
sv) forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwM SomeException
ex
    shouldStop :: ThreadId -> m Bool
shouldStop ThreadId
tid =
        case forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStopStyle
svarStopStyle SVar Stream m a
sv of
            SVarStopStyle
StopNone -> forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
            SVarStopStyle
StopAny -> forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
            SVarStopStyle
StopBy -> do
                ThreadId
sid <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> IO a
readIORef (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef ThreadId
svarStopBy SVar Stream m a
sv)
                forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ ThreadId
tid forall a. Eq a => a -> a -> Bool
== ThreadId
sid

#ifdef INSPECTION
-- Use of GHC constraint tuple (GHC.Classes.(%,,%)) in fromStreamVar leads to
-- space leak because the tuple gets allocated in every recursive call and each
-- allocation holds on to the previous allocation. This test is to make sure
-- that we do not use the constraint tuple type class.
--
inspect $ hasNoTypeClassesExcept 'fromStreamVar
    [ ''Monad
    , ''Applicative
    , ''MonadThrow
    , ''Exception
    , ''MonadIO
    , ''MonadBaseControl
    , ''Typeable
    , ''Functor
    ]
#endif

-- | Generate a stream from an SVar.  An unevaluated stream can be pushed to an
-- SVar using 'toSVar'.  As we pull a stream from the SVar the input stream
-- gets evaluated concurrently. The evaluation depends on the SVar style and
-- the configuration parameters e.g. using the maxBuffer/maxThreads
-- combinators.
--
{-# INLINE fromSVar #-}
fromSVar :: MonadAsync m => SVar K.Stream m a -> SerialT m a
fromSVar :: forall (m :: * -> *) a.
MonadAsync m =>
SVar Stream m a -> SerialT m a
fromSVar SVar Stream m a
sv =
    forall (m :: * -> *) a. Stream m a -> SerialT m a
Stream.fromStreamK forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
(forall r.
 State Stream m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
K.mkStream forall a b. (a -> b) -> a -> b
$ \State Stream m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp -> do
        IORef ()
ref <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. a -> IO (IORef a)
newIORef ()
        Weak (IORef ())
_ <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> IO () -> IO (Weak (IORef a))
mkWeakIORef IORef ()
ref IO ()
hook
        -- We pass a copy of sv to fromStreamVar, so that we know that it has
        -- no other references, when that copy gets garbage collected "ref"
        -- will get garbage collected and our hook will be called.
        forall (m :: * -> *) a r.
State Stream m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State Stream m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp forall a b. (a -> b) -> a -> b
$
            forall (m :: * -> *) a.
MonadAsync m =>
SVar Stream m a -> Stream m a
fromStreamVar SVar Stream m a
sv{svarRef :: Maybe (IORef ())
svarRef = forall a. a -> Maybe a
Just IORef ()
ref}
    where

    hook :: IO ()
hook = do
        forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Bool
svarInspectMode SVar Stream m a
sv) forall a b. (a -> b) -> a -> b
$ do
            Maybe AbsTime
r <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> IO a
readIORef (SVarStats -> IORef (Maybe AbsTime)
svarStopTime (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStats
svarStats SVar Stream m a
sv))
            forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (forall a. Maybe a -> Bool
isNothing Maybe AbsTime
r) forall a b. (a -> b) -> a -> b
$
                forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> String -> IO ()
printSVar SVar Stream m a
sv String
"SVar Garbage Collected"
        forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
cleanupSVar SVar Stream m a
sv
        -- If there are any SVars referenced by this SVar a GC will prompt
        -- them to be cleaned up quickly.
        forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Bool
svarInspectMode SVar Stream m a
sv) IO ()
performMajorGC

data FromSVarState t m a =
      FromSVarInit
    | FromSVarRead (SVar t m a)
    | FromSVarLoop (SVar t m a) [ChildEvent a]
    | FromSVarDone (SVar t m a)

-- | Like 'fromSVar' but generates a StreamD style stream instead of CPS.
--
{-# INLINE_NORMAL fromSVarD #-}
fromSVarD :: (MonadAsync m) => SVar t m a -> D.Stream m a
fromSVarD :: forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> Stream m a
fromSVarD SVar t m a
svar = forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream forall {p}.
p -> FromSVarState t m a -> m (Step (FromSVarState t m a) a)
step forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
FromSVarState t m a
FromSVarInit
    where

    {-# INLINE_LATE step #-}
    step :: p -> FromSVarState t m a -> m (Step (FromSVarState t m a) a)
step p
_ FromSVarState t m a
FromSVarInit = do
        IORef ()
ref <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. a -> IO (IORef a)
newIORef ()
        Weak (IORef ())
_ <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> IO () -> IO (Weak (IORef a))
mkWeakIORef IORef ()
ref IO ()
hook
        -- when this copy of svar gets garbage collected "ref" will get
        -- garbage collected and our GC hook will be called.
        let sv :: SVar t m a
sv = SVar t m a
svar{svarRef :: Maybe (IORef ())
svarRef = forall a. a -> Maybe a
Just IORef ()
ref}
        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
D.Skip (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> FromSVarState t m a
FromSVarRead SVar t m a
sv)

        where

        {-# NOINLINE hook #-}
        hook :: IO ()
hook = do
            forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Bool
svarInspectMode SVar t m a
svar) forall a b. (a -> b) -> a -> b
$ do
                Maybe AbsTime
r <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> IO a
readIORef (SVarStats -> IORef (Maybe AbsTime)
svarStopTime (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStats
svarStats SVar t m a
svar))
                forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (forall a. Maybe a -> Bool
isNothing Maybe AbsTime
r) forall a b. (a -> b) -> a -> b
$
                    forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> String -> IO ()
printSVar SVar t m a
svar String
"SVar Garbage Collected"
            forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
cleanupSVar SVar t m a
svar
            -- If there are any SVars referenced by this SVar a GC will prompt
            -- them to be cleaned up quickly.
            forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Bool
svarInspectMode SVar t m a
svar) IO ()
performMajorGC

    step p
_ (FromSVarRead SVar t m a
sv) = do
        [ChildEvent a]
list <- forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> m [ChildEvent a]
readOutputQ SVar t m a
sv
        -- Reversing the output is important to guarantee that we process the
        -- outputs in the same order as they were generated by the constituent
        -- streams.
        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
D.Skip forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> [ChildEvent a] -> FromSVarState t m a
FromSVarLoop SVar t m a
sv (forall a. [a] -> [a]
Prelude.reverse [ChildEvent a]
list)

    step p
_ (FromSVarLoop SVar t m a
sv []) = do
        Bool
done <- forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> m Bool
postProcess SVar t m a
sv
        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
D.Skip forall a b. (a -> b) -> a -> b
$ if Bool
done
                      then forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> FromSVarState t m a
FromSVarDone SVar t m a
sv
                      else forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> FromSVarState t m a
FromSVarRead SVar t m a
sv

    step p
_ (FromSVarLoop SVar t m a
sv (ChildEvent a
ev : [ChildEvent a]
es)) = do
        case ChildEvent a
ev of
            ChildYield a
a -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
D.Yield a
a (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> [ChildEvent a] -> FromSVarState t m a
FromSVarLoop SVar t m a
sv [ChildEvent a]
es)
            ChildStop ThreadId
tid Maybe SomeException
e -> do
                forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ThreadId -> m ()
accountThread SVar t m a
sv ThreadId
tid
                case Maybe SomeException
e of
                    Maybe SomeException
Nothing -> do
                        Bool
stop <- forall {m :: * -> *}. MonadIO m => ThreadId -> m Bool
shouldStop ThreadId
tid
                        if Bool
stop
                        then do
                            forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
cleanupSVar SVar t m a
sv)
                            forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
D.Skip (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> FromSVarState t m a
FromSVarDone SVar t m a
sv)
                        else forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
D.Skip (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> [ChildEvent a] -> FromSVarState t m a
FromSVarLoop SVar t m a
sv [ChildEvent a]
es)
                    Just SomeException
ex ->
                        case forall e. Exception e => SomeException -> Maybe e
fromException SomeException
ex of
                            Just ThreadAbort
ThreadAbort ->
                                forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
D.Skip (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> [ChildEvent a] -> FromSVarState t m a
FromSVarLoop SVar t m a
sv [ChildEvent a]
es)
                            Maybe ThreadAbort
Nothing -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
cleanupSVar SVar t m a
sv) forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwM SomeException
ex
        where

        shouldStop :: ThreadId -> m Bool
shouldStop ThreadId
tid =
            case forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStopStyle
svarStopStyle SVar t m a
sv of
                SVarStopStyle
StopNone -> forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
                SVarStopStyle
StopAny -> forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
                SVarStopStyle
StopBy -> do
                    ThreadId
sid <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> IO a
readIORef (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef ThreadId
svarStopBy SVar t m a
sv)
                    forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ ThreadId
tid forall a. Eq a => a -> a -> Bool
== ThreadId
sid

    step p
_ (FromSVarDone SVar t m a
sv) = do
        forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Bool
svarInspectMode SVar t m a
sv) forall a b. (a -> b) -> a -> b
$ do
            AbsTime
t <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Clock -> IO AbsTime
getTime Clock
Monotonic
            forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> a -> IO ()
writeIORef (SVarStats -> IORef (Maybe AbsTime)
svarStopTime (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStats
svarStats SVar t m a
sv)) (forall a. a -> Maybe a
Just AbsTime
t)
            forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> String -> IO ()
printSVar SVar t m a
sv String
"SVar Done"
        forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
D.Stop