-- | Operations for running IO operations asynchronously.

-- These are the same as in the 'async' package. We do not use
-- 'async' to avoid its dependencies.

{- License for the 'async' package
Copyright (c) 2012, Simon Marlow

All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:

    * Redistributions of source code must retain the above copyright
      notice, this list of conditions and the following disclaimer.

    * Redistributions in binary form must reproduce the above
      copyright notice, this list of conditions and the following
      disclaimer in the documentation and/or other materials provided
      with the distribution.

    * Neither the name of Simon Marlow nor the names of other
      contributors may be used to endorse or promote products derived
      from this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-}

{-# LANGUAGE DeriveDataTypeable, MagicHash, UnboxedTuples #-}

module Control.Concurrent.Async (
  async, withAsync, wait, asyncThreadId, cancel, concurrently
  ) where

import Control.Concurrent.STM
import Control.Exception
import Control.Concurrent
import Control.Monad
import Data.IORef
import Data.Typeable
import GHC.Conc
import GHC.Exts
import GHC.IO hiding (onException)

-- | An asynchronous action spawned by 'async' or 'withAsync'.
-- Asynchronous actions are executed in a separate thread, and
-- operations are provided for waiting for asynchronous actions to
-- complete and obtaining their results (see e.g. 'wait').
--
data Async a = Async
  { asyncThreadId :: {-# UNPACK #-} !ThreadId
                  -- ^ Returns the 'ThreadId' of the thread running
                  -- the given 'Async'.
  , _asyncWait    :: STM (Either SomeException a)
                  -- ^ STM action yielding the outcome of the action:
                  -- @Left@ with the exception it raised, or @Right@
                  -- with its result.
  }

-- | Spawn an asynchronous action in a separate thread.
--
-- The thread is forked with 'rawForkIO'; the returned 'Async' can be
-- passed to 'wait' or 'cancel'.
async :: IO a -> IO (Async a)
async = inline asyncUsing rawForkIO

-- | Spawn an asynchronous action using the supplied fork function.
--
-- The outcome of the action (its result, or the exception it raised)
-- is written to a fresh 'TMVar'; the wait action stored in the
-- returned 'Async' reads that 'TMVar' without emptying it.
asyncUsing :: (IO () -> IO ThreadId)
           -> IO a -> IO (Async a)
asyncUsing doFork = \action -> do
   var <- newEmptyTMVarIO
   -- t <- forkFinally action (\r -> atomically $ putTMVar var r)
   -- slightly faster:
   -- The fork happens under 'mask'; only the user action itself runs
   -- with the caller's masking state restored, so the result is always
   -- recorded once the action has finished or thrown.
   t <- mask $ \restore ->
          doFork $ try (restore action) >>= atomically . putTMVar var
   return (Async t (readTMVar var))

-- | Spawn an asynchronous action in a separate thread, and pass its
-- @Async@ handle to the supplied function.  When the function returns
-- or throws an exception, 'uninterruptibleCancel' is called on the @Async@.
--
-- > withAsync action inner = mask $ \restore -> do
-- >   a <- async (restore action)
-- >   restore (inner a) `finally` uninterruptibleCancel a
--
-- This is a useful variant of 'async' that ensures an @Async@ is
-- never left running unintentionally.
--
-- Note: a reference to the child thread is kept alive until the call
-- to 'withAsync' returns, so nesting many 'withAsync' calls requires
-- linear memory.
--
withAsync :: IO a -> (Async a -> IO b) -> IO b
withAsync = inline withAsyncUsing rawForkIO

-- | Like 'withAsync', but forks the child with the supplied fork
-- function.  The child is cancelled with 'uninterruptibleCancel' both
-- when the inner function returns and when it throws; in the latter
-- case the exception is re-thrown after the cancellation.
withAsyncUsing :: (IO () -> IO ThreadId)
               -> IO a -> (Async a -> IO b) -> IO b
-- The bracket version works, but is slow.  We can do better by
-- hand-coding it:
withAsyncUsing doFork = \action inner -> do
  var <- newEmptyTMVarIO
  mask $ \restore -> do
    -- The child records its outcome (result or exception) in 'var'.
    t <- doFork $ try (restore action) >>= atomically . putTMVar var
    let a = Async t (readTMVar var)
    r <- restore (inner a) `catchAll` \e -> do
      uninterruptibleCancel a
      throwIO e
    uninterruptibleCancel a
    return r

-- | Wait for an asynchronous action to complete, and return its
-- value.  If the asynchronous action threw an exception, then the
-- exception is re-thrown by 'wait'.
--
-- > wait = atomically . waitSTM
--
{-# INLINE wait #-}
wait :: Async a -> IO a
wait = tryAgain . atomically . waitSTM
  where
    -- See: https://github.com/simonmar/async/issues/14
    -- If the wait is interrupted by 'BlockedIndefinitelyOnSTM', run it
    -- one more time (the retry itself is not guarded).
    tryAgain f = f `Control.Exception.catch` \BlockedIndefinitelyOnSTM -> f

-- | Wait for an asynchronous action to complete, and return either
-- @Left e@ if the action raised an exception @e@, or @Right a@ if it
-- returned a value @a@.
--
-- > waitCatch = atomically . waitCatchSTM
--
{-# INLINE waitCatch #-}
waitCatch :: Async a -> IO (Either SomeException a)
waitCatch = tryAgain . atomically . waitCatchSTM
  where
    -- See: https://github.com/simonmar/async/issues/14
    -- If the wait is interrupted by 'BlockedIndefinitelyOnSTM', run it
    -- one more time (the retry itself is not guarded).
    tryAgain f = f `Control.Exception.catch` \BlockedIndefinitelyOnSTM -> f

-- | A version of 'wait' that can be used inside an STM transaction.
--
-- An exception recorded for the action is re-thrown with 'throwSTM'.
--
waitSTM :: Async a -> STM a
waitSTM a = do
   r <- waitCatchSTM a
   either throwSTM return r

-- | A version of 'waitCatch' that can be used inside an STM transaction.
--
-- This simply returns the wait action stored in the 'Async'.
--
{-# INLINE waitCatchSTM #-}
waitCatchSTM :: Async a -> STM (Either SomeException a)
waitCatchSTM (Async _ w) = w

-- | Cancel an asynchronous action by throwing the @AsyncCancelled@
-- exception to it, and waiting for the 'Async' thread to quit.
-- Has no effect if the 'Async' has already completed.
--
-- > cancel a = throwTo (asyncThreadId a) AsyncCancelled <* waitCatch a
--
-- Note that 'cancel' will not terminate until the thread the 'Async'
-- refers to has terminated. This means that 'cancel' will block for
-- as long as said thread blocks when receiving an asynchronous exception.
--
-- For example, it could block if:
--
-- * It's executing a foreign call, and thus cannot receive the asynchronous
-- exception;
-- * It's executing some cleanup handler after having received the exception,
-- and the handler is blocking.
{-# INLINE cancel #-}
cancel :: Async a -> IO ()
cancel a@(Async t _) = throwTo t AsyncCancelled >> void (waitCatch a)

-- | The exception thrown by 'cancel' to terminate a thread.
data AsyncCancelled = AsyncCancelled
  deriving (Show, Eq, Typeable)

-- On GHC >= 7.8 the conversions go through 'asyncExceptionToException'
-- and 'asyncExceptionFromException', i.e. the exception is wrapped the
-- way asynchronous exceptions are; on older compilers the class
-- defaults are used.
instance Exception AsyncCancelled where
#if __GLASGOW_HASKELL__ >= 708
  fromException = asyncExceptionFromException
  toException = asyncExceptionToException
#endif

-- | Cancel an asynchronous action
--
-- This is a variant of 'cancel', but it is not interruptible: the
-- whole 'cancel' call runs under 'uninterruptibleMask_'.
{-# INLINE uninterruptibleCancel #-}
uninterruptibleCancel :: Async a -> IO ()
uninterruptibleCancel = uninterruptibleMask_ . cancel

-- | Run two @IO@ actions concurrently, and return both results.  If
-- either action throws an exception at any time, then the other
-- action is 'cancel'led, and the exception is re-thrown by
-- 'concurrently'.
--
-- > concurrently left right =
-- >   withAsync left $ \a ->
-- >   withAsync right $ \b ->
-- >   waitBoth a b
concurrently :: IO a -> IO b -> IO (a,b)
concurrently left right = concurrently' left right (collect [])
  where
    -- Accumulate results (most recent first) until one from each side
    -- has arrived; the two list patterns cover both arrival orders.
    collect [Left a, Right b] _ = return (a,b)
    collect [Right b, Left a] _ = return (a,b)
    collect xs m = do
        e <- m
        case e of
            -- A child failed: propagate its exception.
            Left ex -> throwIO ex
            Right r -> collect (r:xs) m

-- | Worker for 'concurrently'.
--
-- Forks @left@ and @right@ with 'forkIO'.  Each child posts its outcome
-- to a shared 'MVar': @Right (Left a)@ for the left result,
-- @Right (Right b)@ for the right result, or @Left e@ for an exception.
-- The @collect@ callback receives an action that takes the next outcome
-- from that 'MVar'.  Whether @collect@ returns or throws, any children
-- that have not yet reported are sent 'AsyncCancelled' and their
-- outcomes are drained before this function returns.
concurrently' :: IO a -> IO b
             -> (IO (Either SomeException (Either a b)) -> IO r)
             -> IO r
concurrently' left right collect = do
    done <- newEmptyMVar
    mask $ \restore -> do
        -- Note: uninterruptibleMask here is because we must not allow
        -- the putMVar in the exception handler to be interrupted,
        -- otherwise the parent thread will deadlock when it waits for
        -- the thread to terminate.
        lid <- forkIO $ uninterruptibleMask_ $
          restore (left >>= putMVar done . Right . Left)
            `catchAll` (putMVar done . Left)
        rid <- forkIO $ uninterruptibleMask_ $
          restore (right >>= putMVar done . Right . Right)
            `catchAll` (putMVar done . Left)

        -- Number of child outcomes not yet taken from 'done'.
        count <- newIORef (2 :: Int)
        let takeDone = do
                r <- takeMVar done      -- interruptible
                -- Decrement the counter so we know how many takes are left.
                -- Since only the parent thread is calling this, we can
                -- use non-atomic modifications.
                -- NB. do this *after* takeMVar, because takeMVar might be
                -- interrupted.
                modifyIORef count (subtract 1)
                return r

        let -- Run the action once more if it is interrupted by
            -- 'BlockedIndefinitelyOnMVar'.
            tryAgain f = f `Control.Exception.catch` \BlockedIndefinitelyOnMVar -> f

            stop = do
                -- kill right before left, to match the semantics of
                -- the version using withAsync. (#27)
                uninterruptibleMask_ $ do
                  count' <- readIORef count
                  -- we only need to use killThread if there are still
                  -- children alive.  Note: forkIO here is because the
                  -- child thread could be in an uninterruptible
                  -- putMVar.
                  when (count' > 0) $
                    void $ forkIO $ do
                      throwTo rid AsyncCancelled
                      throwTo lid AsyncCancelled
                  -- ensure the children are really dead
                  replicateM_ count' (tryAgain $ takeMVar done)

        r <- collect (tryAgain $ takeDone) `onException` stop
        stop
        return r

-- | 'Control.Exception.catch' specialised to 'SomeException', i.e. a
-- handler that catches every exception.
catchAll :: IO a -> (SomeException -> IO a) -> IO a
catchAll = Control.Exception.catch

-- A version of forkIO that does not include the outer exception
-- handler: saves a bit of time when we will be installing our own
-- exception handler.
--
-- Implemented directly on the 'fork#' primop: the action is unwrapped
-- with 'unIO', and the resulting 'ThreadId#' is boxed into a 'ThreadId'.
{-# INLINE rawForkIO #-}
rawForkIO :: IO () -> IO ThreadId
rawForkIO action = IO $ \ s ->
   case (fork# (unIO action) s) of (# s1, tid #) -> (# s1, ThreadId tid #)