{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Hercules.Agent.Producer where

import Control.Applicative
import Control.Concurrent hiding (throwTo)
import Control.Concurrent.Async hiding (cancel)
import Control.Concurrent.STM
import Control.Monad
import Control.Monad.IO.Unlift
import Control.Monad.State
import Data.Foldable
import Data.Traversable
import Katip
import UnliftIO.Exception
import Prelude

-- | A thread producing zero or more payloads and a final value.
-- Handles exception propagation.
data Producer p r = Producer
  { forall p r. Producer p r -> STM (Msg p r)
producerQueueRead :: STM (Msg p r),
    forall p r. Producer p r -> ThreadId
producerThread :: ThreadId
  }
  deriving ((forall a b. (a -> b) -> Producer p a -> Producer p b)
-> (forall a b. a -> Producer p b -> Producer p a)
-> Functor (Producer p)
forall a b. a -> Producer p b -> Producer p a
forall a b. (a -> b) -> Producer p a -> Producer p b
forall p a b. a -> Producer p b -> Producer p a
forall p a b. (a -> b) -> Producer p a -> Producer p b
forall (f :: * -> *).
(forall a b. (a -> b) -> f a -> f b)
-> (forall a b. a -> f b -> f a) -> Functor f
<$ :: forall a b. a -> Producer p b -> Producer p a
$c<$ :: forall p a b. a -> Producer p b -> Producer p a
fmap :: forall a b. (a -> b) -> Producer p a -> Producer p b
$cfmap :: forall p a b. (a -> b) -> Producer p a -> Producer p b
Functor)

data ProducerCancelled = ProducerCancelled
  deriving (Int -> ProducerCancelled -> ShowS
[ProducerCancelled] -> ShowS
ProducerCancelled -> String
(Int -> ProducerCancelled -> ShowS)
-> (ProducerCancelled -> String)
-> ([ProducerCancelled] -> ShowS)
-> Show ProducerCancelled
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [ProducerCancelled] -> ShowS
$cshowList :: [ProducerCancelled] -> ShowS
show :: ProducerCancelled -> String
$cshow :: ProducerCancelled -> String
showsPrec :: Int -> ProducerCancelled -> ShowS
$cshowsPrec :: Int -> ProducerCancelled -> ShowS
Show, Show ProducerCancelled
Typeable ProducerCancelled
Typeable ProducerCancelled
-> Show ProducerCancelled
-> (ProducerCancelled -> SomeException)
-> (SomeException -> Maybe ProducerCancelled)
-> (ProducerCancelled -> String)
-> Exception ProducerCancelled
SomeException -> Maybe ProducerCancelled
ProducerCancelled -> String
ProducerCancelled -> SomeException
forall e.
Typeable e
-> Show e
-> (e -> SomeException)
-> (SomeException -> Maybe e)
-> (e -> String)
-> Exception e
displayException :: ProducerCancelled -> String
$cdisplayException :: ProducerCancelled -> String
fromException :: SomeException -> Maybe ProducerCancelled
$cfromException :: SomeException -> Maybe ProducerCancelled
toException :: ProducerCancelled -> SomeException
$ctoException :: ProducerCancelled -> SomeException
Exception, Typeable)

data Msg p r
  = -- | One of possibly many payloads from the producer
    Payload p
  | -- | The producer stopped due to an exception
    Exception SomeException
  | -- | The producer was done and produced a final value
    Close r
  deriving ((forall a b. (a -> b) -> Msg p a -> Msg p b)
-> (forall a b. a -> Msg p b -> Msg p a) -> Functor (Msg p)
forall a b. a -> Msg p b -> Msg p a
forall a b. (a -> b) -> Msg p a -> Msg p b
forall p a b. a -> Msg p b -> Msg p a
forall p a b. (a -> b) -> Msg p a -> Msg p b
forall (f :: * -> *).
(forall a b. (a -> b) -> f a -> f b)
-> (forall a b. a -> f b -> f a) -> Functor f
<$ :: forall a b. a -> Msg p b -> Msg p a
$c<$ :: forall p a b. a -> Msg p b -> Msg p a
fmap :: forall a b. (a -> b) -> Msg p a -> Msg p b
$cfmap :: forall p a b. (a -> b) -> Msg p a -> Msg p b
Functor)

-- | @forkProducer f@ produces a computation that forks a thread for @f@, which
-- receives a function for returning payloads @p@.
--
-- @f@ may produce a final result value @r@ when it is done.
forkProducer :: forall m p r. (MonadIO m, MonadUnliftIO m) => ((p -> m ()) -> m r) -> m (Producer p r)
forkProducer :: forall (m :: * -> *) p r.
(MonadIO m, MonadUnliftIO m) =>
((p -> m ()) -> m r) -> m (Producer p r)
forkProducer (p -> m ()) -> m r
f = do
  TQueue (Msg p r)
q <- IO (TQueue (Msg p r)) -> m (TQueue (Msg p r))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (TQueue (Msg p r))
forall a. IO (TQueue a)
newTQueueIO
  let write :: MonadIO m' => Msg p r -> m' ()
      write :: forall (m' :: * -> *). MonadIO m' => Msg p r -> m' ()
write = IO () -> m' ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m' ()) -> (Msg p r -> IO ()) -> Msg p r -> m' ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> (Msg p r -> STM ()) -> Msg p r -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TQueue (Msg p r) -> Msg p r -> STM ()
forall a. TQueue a -> a -> STM ()
writeTQueue TQueue (Msg p r)
q
  IO r
f' <- m r -> m (IO r)
forall (m :: * -> *) a. MonadUnliftIO m => m a -> m (IO a)
toIO ((p -> m ()) -> m r
f (Msg p r -> m ()
forall (m' :: * -> *). MonadIO m' => Msg p r -> m' ()
write (Msg p r -> m ()) -> (p -> Msg p r) -> p -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. p -> Msg p r
forall p r. p -> Msg p r
Payload))
  ThreadId
t <- IO ThreadId -> m ThreadId
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ThreadId -> m ThreadId) -> IO ThreadId -> m ThreadId
forall a b. (a -> b) -> a -> b
$ IO r -> (Either SomeException r -> IO ()) -> IO ThreadId
forall a. IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
forkFinally IO r
f' (Msg p r -> IO ()
forall (m' :: * -> *). MonadIO m' => Msg p r -> m' ()
write (Msg p r -> IO ())
-> (Either SomeException r -> Msg p r)
-> Either SomeException r
-> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Either SomeException r -> Msg p r
forall {r} {p}. Either SomeException r -> Msg p r
toResult)
  Producer p r -> m (Producer p r)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Producer p r -> m (Producer p r))
-> Producer p r -> m (Producer p r)
forall a b. (a -> b) -> a -> b
$ Producer :: forall p r. STM (Msg p r) -> ThreadId -> Producer p r
Producer {producerQueueRead :: STM (Msg p r)
producerQueueRead = TQueue (Msg p r) -> STM (Msg p r)
forall a. TQueue a -> STM a
readTQueue TQueue (Msg p r)
q, producerThread :: ThreadId
producerThread = ThreadId
t}
  where
    toResult :: Either SomeException r -> Msg p r
toResult (Left SomeException
e) = SomeException -> Msg p r
forall p r. SomeException -> Msg p r
Exception SomeException
e
    toResult (Right r
r) = r -> Msg p r
forall p r. r -> Msg p r
Close r
r

-- | Throws 'ProducerCancelled' as an async exception to the producer thread.
-- Blocks until exception is raised. See 'throwTo'.
cancel :: MonadIO m => Producer p r -> m ()
cancel :: forall (m :: * -> *) p r. MonadIO m => Producer p r -> m ()
cancel Producer p r
p = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ ThreadId -> ProducerCancelled -> IO ()
forall e (m :: * -> *).
(Exception e, MonadIO m) =>
ThreadId -> e -> m ()
throwTo (Producer p r -> ThreadId
forall p r. Producer p r -> ThreadId
producerThread Producer p r
p) ProducerCancelled
ProducerCancelled

-- | Perform an computation while @withProducer@ takes care of forking and cleaning up.
--
-- @withProducer (\write -> write "a" >> write "b") $ \producer -> consume producer@
withProducer ::
  (MonadIO m, MonadUnliftIO m) =>
  ((p -> m ()) -> m r) ->
  (Producer p r -> m a) ->
  m a
withProducer :: forall (m :: * -> *) p r a.
(MonadIO m, MonadUnliftIO m) =>
((p -> m ()) -> m r) -> (Producer p r -> m a) -> m a
withProducer (p -> m ()) -> m r
f = m (Producer p r)
-> (Producer p r -> m ()) -> (Producer p r -> m a) -> m a
forall (m :: * -> *) a b c.
MonadUnliftIO m =>
m a -> (a -> m b) -> (a -> m c) -> m c
bracket (((p -> m ()) -> m r) -> m (Producer p r)
forall (m :: * -> *) p r.
(MonadIO m, MonadUnliftIO m) =>
((p -> m ()) -> m r) -> m (Producer p r)
forkProducer (p -> m ()) -> m r
f) Producer p r -> m ()
forall (m :: * -> *) p r. MonadIO m => Producer p r -> m ()
cancel

listen ::
  MonadIO m =>
  Producer p r ->
  (p -> m a) ->
  (r -> m a) ->
  STM (m a)
listen :: forall (m :: * -> *) p r a.
MonadIO m =>
Producer p r -> (p -> m a) -> (r -> m a) -> STM (m a)
listen Producer p r
p p -> m a
fPayload r -> m a
fResult =
  (Msg p r -> m a) -> STM (Msg p r) -> STM (m a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Msg p r -> m a
f (Producer p r -> STM (Msg p r)
forall p r. Producer p r -> STM (Msg p r)
producerQueueRead Producer p r
p)
  where
    f :: Msg p r -> m a
f (Payload p
payload) = p -> m a
fPayload p
payload
    f (Exception SomeException
e) = SomeException -> m a
forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO SomeException
e
    f (Close r
r) = r -> m a
fResult r
r

joinSTM :: MonadIO m => STM (m a) -> m a
joinSTM :: forall (m :: * -> *) a. MonadIO m => STM (m a) -> m a
joinSTM = m (m a) -> m a
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (m (m a) -> m a) -> (STM (m a) -> m (m a)) -> STM (m a) -> m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO (m a) -> m (m a)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (m a) -> m (m a))
-> (STM (m a) -> IO (m a)) -> STM (m a) -> m (m a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM (m a) -> IO (m a)
forall a. STM a -> IO a
atomically

data Syncing a = Syncable a | Syncer (Maybe SomeException -> STM ())

-- | Sends sync notifications after the whole computation succeeds (or fails)
-- Note: not exception safe in the presence of pure exceptions.
withSync ::
  (MonadIO m, MonadUnliftIO m, Traversable t) =>
  t (Syncing a) ->
  (t (Maybe a) -> m b) ->
  m b
withSync :: forall (m :: * -> *) (t :: * -> *) a b.
(MonadIO m, MonadUnliftIO m, Traversable t) =>
t (Syncing a) -> (t (Maybe a) -> m b) -> m b
withSync t (Syncing a)
t t (Maybe a) -> m b
f = do
  let (t (Maybe a)
t', Maybe SomeException -> STM ()
syncs) =
        State (Maybe SomeException -> STM ()) (t (Maybe a))
-> (Maybe SomeException -> STM ())
-> (t (Maybe a), Maybe SomeException -> STM ())
forall s a. State s a -> s -> (a, s)
runState
          ( t (Syncing a)
-> (Syncing a
    -> StateT (Maybe SomeException -> STM ()) Identity (Maybe a))
-> State (Maybe SomeException -> STM ()) (t (Maybe a))
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
t a -> (a -> f b) -> f (t b)
for t (Syncing a)
t ((Syncing a
  -> StateT (Maybe SomeException -> STM ()) Identity (Maybe a))
 -> State (Maybe SomeException -> STM ()) (t (Maybe a)))
-> (Syncing a
    -> StateT (Maybe SomeException -> STM ()) Identity (Maybe a))
-> State (Maybe SomeException -> STM ()) (t (Maybe a))
forall a b. (a -> b) -> a -> b
$ \case
              Syncable a
a -> Maybe a
-> StateT (Maybe SomeException -> STM ()) Identity (Maybe a)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (a -> Maybe a
forall a. a -> Maybe a
Just a
a)
              Syncer Maybe SomeException -> STM ()
s -> Maybe a
forall a. Maybe a
Nothing Maybe a
-> StateT (Maybe SomeException -> STM ()) Identity ()
-> StateT (Maybe SomeException -> STM ()) Identity (Maybe a)
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ ((Maybe SomeException -> STM ()) -> Maybe SomeException -> STM ())
-> StateT (Maybe SomeException -> STM ()) Identity ()
forall s (m :: * -> *). MonadState s m => (s -> s) -> m ()
modify ((Maybe SomeException -> STM ())
-> (Maybe SomeException -> STM ()) -> Maybe SomeException -> STM ()
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> Maybe SomeException -> STM ()
s)
          )
          (\Maybe SomeException
_ -> () -> STM ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ())
  b
b <- t (Maybe a) -> m b
f t (Maybe a)
t' m b -> (SomeException -> m ()) -> m b
forall (m :: * -> *) e a b.
(MonadUnliftIO m, Exception e) =>
m a -> (e -> m b) -> m a
`withException` (IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ())
-> (SomeException -> IO ()) -> SomeException -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ())
-> (SomeException -> STM ()) -> SomeException -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Maybe SomeException -> STM ()
syncs (Maybe SomeException -> STM ())
-> (SomeException -> Maybe SomeException)
-> SomeException
-> STM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SomeException -> Maybe SomeException
forall a. a -> Maybe a
Just)
  IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ Maybe SomeException -> STM ()
syncs Maybe SomeException
forall a. Maybe a
Nothing
  b -> m b
forall (f :: * -> *) a. Applicative f => a -> f a
pure b
b

--  where trav =
--  deriving (Functor)
--instance Applicative Syncing where
--  pure = Synced Nothing
--  Synced sf f <*> Synced af a = Synced (sf <> af) (f a)

-- Potential improvements:
--  - Performance: Get rid of the producer thread by doing the batching in STM.
--                 (pinging thread still required)
--  - Performance: Multiple elements as input
--  - Idle footprint: The pinger can be made to wait for the queue to be non-empty before starting the delay.
--     - Add a tryPeek function to Producer
--     - Make sure it is not woken up after the queue has become non-empty
--     - Alternatively, maybe use stm-delay (which uses GHC.Event for efficiency)
--       https://hackage.haskell.org/package/stm-delay-0.1.1.1/docs/Control-Concurrent-STM-Delay.html
withBoundedDelayBatchProducer ::
  (MonadIO m, MonadUnliftIO m, KatipContext m) =>
  -- | Max time before flushing in microseconds
  Int ->
  -- | Max number of items in batch
  Int ->
  Producer p r ->
  (Producer [p] r -> m a) ->
  m a
withBoundedDelayBatchProducer :: forall (m :: * -> *) p r a.
(MonadIO m, MonadUnliftIO m, KatipContext m) =>
Int -> Int -> Producer p r -> (Producer [p] r -> m a) -> m a
withBoundedDelayBatchProducer Int
maxDelay Int
maxItems Producer p r
sourceP Producer [p] r -> m a
f = do
  UnliftIO {unliftIO :: forall (m :: * -> *). UnliftIO m -> forall a. m a -> IO a
unliftIO = forall a. m a -> IO a
unlift} <- m (UnliftIO m)
forall (m :: * -> *). MonadUnliftIO m => m (UnliftIO m)
askUnliftIO
  TQueue ()
flushes <- IO (TQueue ()) -> m (TQueue ())
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (TQueue ())
forall a. IO (TQueue a)
newTQueueIO
  let producer :: ([p] -> f ()) -> f r
producer [p] -> f ()
writeBatch =
        let beginReading :: f r
beginReading = Int -> [p] -> f r
readItems (Int -> Int -> Int
forall a. Ord a => a -> a -> a
max Int
1 Int
maxItems) []
            doPerformBatch :: [p] -> f ()
doPerformBatch [] = () -> f ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
            doPerformBatch [p]
buf = [p] -> f ()
writeBatch ([p] -> [p]
forall a. [a] -> [a]
reverse [p]
buf)
            readItems :: Int -> [p] -> f r
readItems Int
0 [p]
buf = do
              --logLocM DebugS "batch on full"
              [p] -> f ()
doPerformBatch [p]
buf
              f r
beginReading
            readItems Int
bufferRemaining [p]
buf =
              STM (f r) -> f r
forall (m :: * -> *) a. MonadIO m => STM (m a) -> m a
joinSTM
                ( Msg p r -> f r
onQueueRead (Msg p r -> f r) -> STM (Msg p r) -> STM (f r)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Producer p r -> STM (Msg p r)
forall p r. Producer p r -> STM (Msg p r)
producerQueueRead Producer p r
sourceP
                    STM (f r) -> STM (f r) -> STM (f r)
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> f r
onFlush
                    f r -> STM () -> STM (f r)
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ TQueue () -> STM ()
forall a. TQueue a -> STM a
readTQueue TQueue ()
flushes
                )
              where
                onQueueRead :: Msg p r -> f r
onQueueRead (Payload p
a) =
                  Int -> [p] -> f r
readItems (Int
bufferRemaining Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1) (p
a p -> [p] -> [p]
forall a. a -> [a] -> [a]
: [p]
buf)
                onQueueRead (Close r
r) = do
                  --logLocM DebugS $ "batch on close: " <> logStr (show (length buf))
                  [p] -> f ()
doPerformBatch [p]
buf
                  r -> f r
forall (f :: * -> *) a. Applicative f => a -> f a
pure r
r
                onQueueRead (Exception SomeException
e) = do
                  --logLocM DebugS $ "batch on exception: " <> logStr (show (length buf))
                  [p] -> f ()
doPerformBatch [p]
buf
                  IO r -> f r
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO r -> f r) -> IO r -> f r
forall a b. (a -> b) -> a -> b
$ SomeException -> IO r
forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO SomeException
e
                onFlush :: f r
onFlush = do
                  --logLocM DebugS $ "batch on flush: " <> logStr (show (length buf))
                  [p] -> f ()
doPerformBatch [p]
buf
                  f r
beginReading
         in f r
beginReading
  IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO a -> m a) -> IO a -> m a
forall a b. (a -> b) -> a -> b
$
    IO Any -> (Async Any -> IO a) -> IO a
forall a b. IO a -> (Async a -> IO b) -> IO b
withAsync
      ( IO () -> IO Any
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO Any) -> IO () -> IO Any
forall a b. (a -> b) -> a -> b
$ do
          Int -> IO ()
threadDelay Int
maxDelay
          STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TQueue () -> () -> STM ()
forall a. TQueue a -> a -> STM ()
writeTQueue TQueue ()
flushes ()
      )
      ((Async Any -> IO a) -> IO a) -> (Async Any -> IO a) -> IO a
forall a b. (a -> b) -> a -> b
$ \Async Any
_flusher -> m a -> IO a
forall a. m a -> IO a
unlift (m a -> IO a) -> m a -> IO a
forall a b. (a -> b) -> a -> b
$ (([p] -> m ()) -> m r) -> (Producer [p] r -> m a) -> m a
forall (m :: * -> *) p r a.
(MonadIO m, MonadUnliftIO m) =>
((p -> m ()) -> m r) -> (Producer p r -> m a) -> m a
withProducer ([p] -> m ()) -> m r
forall {f :: * -> *}. MonadIO f => ([p] -> f ()) -> f r
producer Producer [p] r -> m a
f

syncer :: MonadIO m => (Syncing a -> m ()) -> m ()
syncer :: forall (m :: * -> *) a. MonadIO m => (Syncing a -> m ()) -> m ()
syncer Syncing a -> m ()
writer = do
  TMVar (Maybe SomeException)
v <- IO (TMVar (Maybe SomeException)) -> m (TMVar (Maybe SomeException))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (TMVar (Maybe SomeException))
forall a. IO (TMVar a)
newEmptyTMVarIO
  Syncing a -> m ()
writer ((Maybe SomeException -> STM ()) -> Syncing a
forall a. (Maybe SomeException -> STM ()) -> Syncing a
Syncer ((Maybe SomeException -> STM ()) -> Syncing a)
-> (Maybe SomeException -> STM ()) -> Syncing a
forall a b. (a -> b) -> a -> b
$ TMVar (Maybe SomeException) -> Maybe SomeException -> STM ()
forall a. TMVar a -> a -> STM ()
putTMVar TMVar (Maybe SomeException)
v)
  Maybe SomeException
mexc <- IO (Maybe SomeException) -> m (Maybe SomeException)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe SomeException) -> m (Maybe SomeException))
-> IO (Maybe SomeException) -> m (Maybe SomeException)
forall a b. (a -> b) -> a -> b
$ STM (Maybe SomeException) -> IO (Maybe SomeException)
forall a. STM a -> IO a
atomically (STM (Maybe SomeException) -> IO (Maybe SomeException))
-> STM (Maybe SomeException) -> IO (Maybe SomeException)
forall a b. (a -> b) -> a -> b
$ TMVar (Maybe SomeException) -> STM (Maybe SomeException)
forall a. TMVar a -> STM a
readTMVar TMVar (Maybe SomeException)
v
  Maybe SomeException -> (SomeException -> m Any) -> m ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ Maybe SomeException
mexc (IO Any -> m Any
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Any -> m Any)
-> (SomeException -> IO Any) -> SomeException -> m Any
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SomeException -> IO Any
forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO)