{-# LANGUAGE CPP, MagicHash, UnboxedTuples, RankNTypes,
    ExistentialQuantification #-}
#if __GLASGOW_HASKELL__ >= 701
{-# LANGUAGE Trustworthy #-}
#endif
#if __GLASGOW_HASKELL__ < 710
{-# LANGUAGE DeriveDataTypeable #-}
#endif
{-# OPTIONS -Wall -fno-warn-implicit-prelude -fno-warn-unused-imports #-}

-----------------------------------------------------------------------------
-- |
-- Module      :  Control.Concurrent.Async
-- Copyright   :  (c) Simon Marlow 2012
-- License     :  BSD3 (see the file LICENSE)
--
-- Maintainer  :  Simon Marlow <marlowsd@gmail.com>
-- Stability   :  provisional
-- Portability :  non-portable (requires concurrency)
--
-- This module provides a set of operations for running IO operations
-- asynchronously and waiting for their results.  It is a thin layer
-- over the basic concurrency operations provided by
-- "Control.Concurrent".  The main additional functionality it
-- provides is the ability to wait for the return value of a thread,
-- but the interface also provides some additional safety and
-- robustness over using 'forkIO' threads and @MVar@ directly.
--
-- == High-level API
--
-- @async@'s high-level API spawns /lexically scoped/ threads,
-- ensuring the following key properties that make it safer to use
-- than using plain 'forkIO':
--
-- 1. No exception is swallowed (waiting for results propagates exceptions).
-- 2. No thread is leaked (left running unintentionally).
--
-- (This is done using the 'Control.Exception.bracket' pattern to work in presence
-- of synchronous and asynchronous exceptions.)
--
-- __Most practical/production code should only use the high-level API__.
--
-- The basic type is @'Async' a@, which represents an asynchronous
-- @IO@ action that will return a value of type @a@, or die with an
-- exception.  An 'Async' is a wrapper around a low-level 'forkIO' thread.
--
-- The fundamental function to spawn threads with the high-level API is
-- 'withAsync'.
--
-- For example, to fetch two web pages at the same time, we could do
-- this (assuming a suitable @getURL@ function):
--
-- > withAsync (getURL url1) $ \a1 -> do
-- >   withAsync (getURL url2) $ \a2 -> do
-- >     page1 <- wait a1
-- >     page2 <- wait a2
-- >     ...
--
-- where 'withAsync' starts the operation in a separate thread, and
-- 'wait' waits for and returns the result.
--
-- * If the operation throws an exception, then that exception is re-thrown
--   by 'wait'. This ensures property (1): No exception is swallowed.
-- * If an exception bubbles up through a 'withAsync', then the 'Async'
--   it spawned is 'cancel'ed. This ensures property (2): No thread is leaked.
--
-- Often we do not care to work manually with 'Async' handles like
-- @a1@ and @a2@. Instead, we want to express high-level objectives like
-- performing two or more tasks concurrently, and waiting for one or all
-- of them to finish.
--
-- For example, the pattern of performing two IO actions concurrently and
-- waiting for both their results is packaged up in a combinator 'concurrently',
-- so we can further shorten the above example to:
--
-- > (page1, page2) <- concurrently (getURL url1) (getURL url2)
-- > ...
--
-- The section __/High-level utilities/__ covers the most
-- common high-level objectives, including:
--
-- * Waiting for 2 results ('concurrently').
-- * Waiting for many results ('mapConcurrently' / 'forConcurrently').
-- * Waiting for the first of 2 results ('race').
-- * Waiting for arbitrary nestings of "all of /N/" and "the first of /N/"
--   results with the 'Concurrently' newtype and its 'Applicative' and
--   'Alternative' instances.
--
-- Click here to scroll to that section:
-- "Control.Concurrent.Async#high-level-utilities".
--
-- == Low-level API
--
-- Some use cases require parallelism that is not lexically scoped.
--
-- For those, the low-level function 'async' can be used as a direct
-- equivalent of 'forkIO':
--
-- > -- Do NOT use this code in production, it has a flaw (explained below).
-- > do
-- >   a1 <- async (getURL url1)
-- >   a2 <- async (getURL url2)
-- >   page1 <- wait a1
-- >   page2 <- wait a2
-- >   ...
--
-- In contrast to 'withAsync', this code has a problem.
--
-- It still fulfills property (1) in that an exception arising from
-- @getURL@ will be re-thrown by 'wait', but it does not fulfill
-- property (2).
-- Consider the case when the first 'wait' throws an exception; then the
-- second 'wait' will not happen, and the second 'async' may be left
-- running in the background, possibly indefinitely.
--
-- 'withAsync' is like 'async', except that the 'Async' is
-- automatically killed (using 'uninterruptibleCancel') if the
-- enclosing IO operation returns before it has completed.
-- Furthermore, 'withAsync' allows a tree of threads to be built, such
-- that children are automatically killed if their parents die for any
-- reason.
--
-- If you need to use the low-level API, ensure that you guarantee
-- property (2) by other means, such as 'link'ing asyncs that need
-- to die together, and protecting against asynchronous exceptions
-- using 'Control.Exception.bracket', 'Control.Exception.mask',
-- or other functions from "Control.Exception".
--
-- == Miscellaneous
--
-- The 'Functor' instance can be used to change the result of an
-- 'Async'.  For example:
--
-- > ghci> withAsync (return 3) (\a -> wait (fmap (+1) a))
-- > 4
--
-- === Resource exhaustion
--
-- As with all concurrent programming, keep in mind that while
-- Haskell's cooperative ("green") multithreading carries low overhead,
-- spawning too many threads at the same time may lead to resource exhaustion
-- (of memory, file descriptors, or other limited resources), given that the
-- actions running in the threads consume these resources.

-----------------------------------------------------------------------------

module Control.Concurrent.Async (

    -- * Asynchronous actions
    Async,

    -- * High-level API

    -- ** Spawning with automatic 'cancel'ation
    withAsync, withAsyncBound, withAsyncOn, withAsyncWithUnmask,
    withAsyncOnWithUnmask,

    -- ** Querying 'Async's
    wait, poll, waitCatch, asyncThreadId,
    cancel, uninterruptibleCancel, cancelWith, AsyncCancelled(..),

    -- ** #high-level-utilities# High-level utilities
    race, race_,
    concurrently, concurrently_,
    mapConcurrently, forConcurrently,
    mapConcurrently_, forConcurrently_,
    replicateConcurrently, replicateConcurrently_,
    Concurrently(..),
    compareAsyncs,

    -- ** Specialised operations

    -- *** STM operations
    waitSTM, pollSTM, waitCatchSTM,

    -- *** Waiting for multiple 'Async's
    waitAny, waitAnyCatch, waitAnyCancel, waitAnyCatchCancel,
    waitEither, waitEitherCatch, waitEitherCancel, waitEitherCatchCancel,
    waitEither_,
    waitBoth,

    -- *** Waiting for multiple 'Async's in STM
    waitAnySTM, waitAnyCatchSTM,
    waitEitherSTM, waitEitherCatchSTM,
    waitEitherSTM_,
    waitBothSTM,

    -- * Low-level API

    -- ** Spawning (low-level API)
    async, asyncBound, asyncOn, asyncWithUnmask, asyncOnWithUnmask,

    -- ** Linking
    link, linkOnly, link2, link2Only, ExceptionInLinkedThread(..),

  ) where

import Control.Concurrent.STM.TMVar
import Control.Exception
import Control.Concurrent
import qualified Data.Foldable as F
#if !MIN_VERSION_base(4,6,0)
import Prelude hiding (catch)
#endif
import Control.Monad
import Control.Applicative
#if !MIN_VERSION_base(4,8,0)
import Data.Monoid (Monoid(mempty,mappend))
import Data.Traversable
#endif
#if __GLASGOW_HASKELL__ < 710
import Data.Typeable
#endif
#if MIN_VERSION_base(4,9,0)
import Data.Semigroup (Semigroup((<>)))
#endif

import Data.IORef

import GHC.Exts
import GHC.IO hiding (finally, onException)
import GHC.Conc

-- -----------------------------------------------------------------------------
-- STM Async API


-- | An asynchronous action spawned by 'async' or 'withAsync'.
-- Asynchronous actions are executed in a separate thread, and
-- operations are provided for waiting for asynchronous actions to
-- complete and obtaining their results (see e.g. 'wait').
--
data Async a = Async
  { asyncThreadId :: {-# UNPACK #-} !ThreadId
                  -- ^ Returns the 'ThreadId' of the thread running
                  -- the given 'Async'.
  , _asyncWait    :: STM (Either SomeException a)
                  -- STM action that yields the outcome of the thread:
                  -- @Left e@ for an exception, @Right a@ for a result.
  }

-- Two handles are equal exactly when their 'ThreadId's are equal; the
-- result-fetching STM action is not inspected.
instance Eq (Async a) where
  Async a _ == Async b _  =  a == b

-- Handles are ordered by their 'ThreadId's; the result-fetching STM
-- action is not inspected.
instance Ord (Async a) where
  Async a _ `compare` Async b _  =  a `compare` b

-- Maps over the eventual result.  The thread id is kept as is; the
-- function is applied inside the 'Right' of the outcome, so a 'Left'
-- (exception) passes through unchanged.
instance Functor Async where
  fmap f (Async a w) = Async a (fmap (fmap f) w)

-- | Compare two Asyncs that may have different types by their 'ThreadId'.
compareAsyncs :: Async a -> Async b -> Ordering
compareAsyncs (Async t1 _) (Async t2 _) = compare t1 t2

-- | Spawn an asynchronous action in a separate thread.
--
-- Like for 'forkIO', the action may be left running unintentionally
-- (see module-level documentation for details).
--
-- __Use 'withAsync' style functions wherever you can instead!__
async :: IO a -> IO (Async a)
async = inline asyncUsing rawForkIO

-- | Like 'async' but using 'forkOS' internally.
asyncBound :: IO a -> IO (Async a)
asyncBound = asyncUsing forkOS

-- | Like 'async' but using 'forkOn' internally.
asyncOn :: Int -> IO a -> IO (Async a)
asyncOn = asyncUsing . rawForkOn

-- | Like 'async' but using 'forkIOWithUnmask' internally.  The child
-- thread is passed a function that can be used to unmask asynchronous
-- exceptions.
asyncWithUnmask :: ((forall b . IO b -> IO b) -> IO a) -> IO (Async a)
-- The unmasking function handed to the action is 'unsafeUnmask'.
asyncWithUnmask actionWith = asyncUsing rawForkIO (actionWith unsafeUnmask)

-- | Like 'asyncOn' but using 'forkOnWithUnmask' internally.  The
-- child thread is passed a function that can be used to unmask
-- asynchronous exceptions.
asyncOnWithUnmask :: Int -> ((forall b . IO b -> IO b) -> IO a) -> IO (Async a)
-- The unmasking function handed to the action is 'unsafeUnmask'.
asyncOnWithUnmask cpu actionWith =
  asyncUsing (rawForkOn cpu) (actionWith unsafeUnmask)

-- Shared implementation of the 'async' family.  @doFork@ is the
-- primitive used to create the thread; the action's outcome (exception
-- or value, as captured by 'try') is stored in a fresh 'TMVar', and the
-- returned 'Async' reads that 'TMVar' without emptying it.
asyncUsing :: (IO () -> IO ThreadId)
           -> IO a -> IO (Async a)
asyncUsing doFork = \action -> do
   var <- newEmptyTMVarIO
   -- t <- forkFinally action (\r -> atomically $ putTMVar var r)
   -- slightly faster:
   -- The fork happens under 'mask'; only the user action itself runs
   -- with the caller's masking state restored.
   t <- mask $ \restore ->
          doFork $ try (restore action) >>= atomically . putTMVar var
   return (Async t (readTMVar var))

-- | Spawn an asynchronous action in a separate thread, and pass its
-- @Async@ handle to the supplied function.  When the function returns
-- or throws an exception, 'uninterruptibleCancel' is called on the @Async@.
--
-- > withAsync action inner = mask $ \restore -> do
-- >   a <- async (restore action)
-- >   restore (inner a) `finally` uninterruptibleCancel a
--
-- This is a useful variant of 'async' that ensures an @Async@ is
-- never left running unintentionally.
--
-- Note: a reference to the child thread is kept alive until the call
-- to `withAsync` returns, so nesting many `withAsync` calls requires
-- linear memory.
--
withAsync :: IO a -> (Async a -> IO b) -> IO b
withAsync = inline withAsyncUsing rawForkIO

-- | Like 'withAsync' but uses 'forkOS' internally.
withAsyncBound :: IO a -> (Async a -> IO b) -> IO b
withAsyncBound = withAsyncUsing forkOS

-- | Like 'withAsync' but uses 'forkOn' internally.
withAsyncOn :: Int -> IO a -> (Async a -> IO b) -> IO b
withAsyncOn = withAsyncUsing . rawForkOn

-- | Like 'withAsync' but uses 'forkIOWithUnmask' internally.  The
-- child thread is passed a function that can be used to unmask
-- asynchronous exceptions.
withAsyncWithUnmask
  :: ((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b
-- The unmasking function handed to the action is 'unsafeUnmask'.
withAsyncWithUnmask actionWith =
  withAsyncUsing rawForkIO (actionWith unsafeUnmask)

-- | Like 'withAsyncOn' but uses 'forkOnWithUnmask' internally.  The
-- child thread is passed a function that can be used to unmask
-- asynchronous exceptions.
withAsyncOnWithUnmask
  :: Int -> ((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b
-- The unmasking function handed to the action is 'unsafeUnmask'.
withAsyncOnWithUnmask cpu actionWith =
  withAsyncUsing (rawForkOn cpu) (actionWith unsafeUnmask)

-- Shared implementation of the 'withAsync' family.  @doFork@ is the
-- primitive used to create the thread.  The child is cancelled with
-- 'uninterruptibleCancel' both when @inner@ throws and when it returns.
withAsyncUsing :: (IO () -> IO ThreadId)
               -> IO a -> (Async a -> IO b) -> IO b
-- The bracket version works, but is slow.  We can do better by
-- hand-coding it:
withAsyncUsing doFork = \action inner -> do
  var <- newEmptyTMVarIO
  mask $ \restore -> do
    -- The child stores its outcome (exception or value) in @var@.
    t <- doFork $ try (restore action) >>= atomically . putTMVar var
    let a = Async t (readTMVar var)
    -- Exception path: cancel the child, then re-throw the exception.
    r <- restore (inner a) `catchAll` \e -> do
      uninterruptibleCancel a
      throwIO e
    -- Normal path: cancel the child before handing back the result.
    uninterruptibleCancel a
    return r

-- | Wait for an asynchronous action to complete, and return its
-- value.  If the asynchronous action threw an exception, then the
-- exception is re-thrown by 'wait'.
--
-- > wait = atomically . waitSTM
--
{-# INLINE wait #-}
wait :: Async a -> IO a
wait = tryAgain . atomically . waitSTM
  where
    -- See: https://github.com/simonmar/async/issues/14
    -- If the transaction is aborted with 'BlockedIndefinitelyOnSTM',
    -- it is run one more time.
    tryAgain f = f `catch` \BlockedIndefinitelyOnSTM -> f

-- | Wait for an asynchronous action to complete, and return either
-- @Left e@ if the action raised an exception @e@, or @Right a@ if it
-- returned a value @a@.
--
-- > waitCatch = atomically . waitCatchSTM
--
{-# INLINE waitCatch #-}
waitCatch :: Async a -> IO (Either SomeException a)
waitCatch = tryAgain . atomically . waitCatchSTM
  where
    -- See: https://github.com/simonmar/async/issues/14
    -- If the transaction is aborted with 'BlockedIndefinitelyOnSTM',
    -- it is run one more time.
    tryAgain f = f `catch` \BlockedIndefinitelyOnSTM -> f

-- | Check whether an 'Async' has completed yet.  If it has not
-- completed yet, then the result is @Nothing@, otherwise the result
-- is @Just e@ where @e@ is @Left x@ if the @Async@ raised an
-- exception @x@, or @Right a@ if it returned a value @a@.
--
-- > poll = atomically . pollSTM
--
{-# INLINE poll #-}
poll :: Async a -> IO (Maybe (Either SomeException a))
poll = atomically . pollSTM

-- | A version of 'wait' that can be used inside an STM transaction.
--
-- A @Left@ outcome is re-thrown with 'throwSTM'; a @Right@ outcome is
-- returned.
waitSTM :: Async a -> STM a
waitSTM a = do
   r <- waitCatchSTM a
   either throwSTM return r

-- | A version of 'waitCatch' that can be used inside an STM transaction.
--
{-# INLINE waitCatchSTM #-}
waitCatchSTM :: Async a -> STM (Either SomeException a)
waitCatchSTM (Async _ w) = w

-- | A version of 'poll' that can be used inside an STM transaction.
--
{-# INLINE pollSTM #-}
pollSTM :: Async a -> STM (Maybe (Either SomeException a))
-- If reading the outcome would 'retry', fall back to @Nothing@ instead
-- of blocking.
pollSTM (Async _ w) = (Just <$> w) `orElse` return Nothing

-- | Cancel an asynchronous action by throwing the @AsyncCancelled@
-- exception to it, and waiting for the `Async` thread to quit.
-- Has no effect if the 'Async' has already completed.
--
-- > cancel a = throwTo (asyncThreadId a) AsyncCancelled <* waitCatch a
--
-- Note that 'cancel' will not terminate until the thread the 'Async'
-- refers to has terminated. This means that 'cancel' will block for
-- as long as said thread blocks when receiving an asynchronous exception.
--
-- For example, it could block if:
--
-- * It's executing a foreign call, and thus cannot receive the asynchronous
--   exception;
-- * It's executing some cleanup handler after having received the exception,
--   and the handler is blocking.
{-# INLINE cancel #-}
cancel :: Async a -> IO ()
cancel a@(Async t _) = throwTo t AsyncCancelled <* waitCatch a

-- | The exception thrown by `cancel` to terminate a thread.
data AsyncCancelled = AsyncCancelled
  deriving (Show, Eq
#if __GLASGOW_HASKELL__ < 710
    ,Typeable
#endif
    )

-- On GHC >= 7.8 the conversions go through 'asyncExceptionToException' and
-- 'asyncExceptionFromException'; older compilers use the class defaults.
instance Exception AsyncCancelled where
#if __GLASGOW_HASKELL__ >= 708
  fromException = asyncExceptionFromException
  toException = asyncExceptionToException
#endif

-- | Cancel an asynchronous action
--
-- This is a variant of `cancel`, but it is not interruptible: the
-- whole of 'cancel' runs under 'uninterruptibleMask_'.
{-# INLINE uninterruptibleCancel #-}
uninterruptibleCancel :: Async a -> IO ()
uninterruptibleCancel = uninterruptibleMask_ . cancel

-- | Cancel an asynchronous action by throwing the supplied exception
-- to it, and waiting for the 'Async' thread to quit.
--
-- > cancelWith a x = throwTo (asyncThreadId a) x <* waitCatch a
--
-- The notes about the synchronous nature of 'cancel' also apply to
-- 'cancelWith'.
cancelWith :: Exception e => Async a -> e -> IO ()
cancelWith a@(Async t _) e = throwTo t e <* waitCatch a

-- | Wait for any of the supplied asynchronous operations to complete.
-- The value returned is a pair of the 'Async' that completed, and the
-- result that would be returned by 'waitCatch' on that 'Async'.
--
-- If multiple 'Async's complete or have completed, then the value
-- returned corresponds to the first completed 'Async' in the list.
--
-- With an empty list the underlying transaction is just 'retry', so
-- there is nothing that could ever complete the wait.
--
{-# INLINE waitAnyCatch #-}
waitAnyCatch :: [Async a] -> IO (Async a, Either SomeException a)
waitAnyCatch = atomically . waitAnyCatchSTM

-- | A version of 'waitAnyCatch' that can be used inside an STM transaction.
--
-- @since 2.1.0
waitAnyCatchSTM :: [Async a] -> STM (Async a, Either SomeException a)
-- The per-'Async' transactions are chained with 'orElse' in list order,
-- ending in 'retry'; an empty list therefore reduces to 'retry'.
waitAnyCatchSTM asyncs =
    foldr orElse retry $
      map (\a -> do r <- waitCatchSTM a; return (a, r)) asyncs

-- | Like 'waitAnyCatch', but also cancels the other asynchronous
-- operations as soon as one has completed.
--
-- 'cancel' is applied to every element of the list, in order, also
-- when the wait itself is aborted by an exception.
--
waitAnyCatchCancel :: [Async a] -> IO (Async a, Either SomeException a)
waitAnyCatchCancel asyncs =
  waitAnyCatch asyncs `finally` mapM_ cancel asyncs

-- | Wait for any of the supplied @Async@s to complete.  If the first
-- to complete throws an exception, then that exception is re-thrown
-- by 'waitAny'.
--
-- If multiple 'Async's complete or have completed, then the value
-- returned corresponds to the first completed 'Async' in the list.
--
-- With an empty list the underlying transaction is just 'retry', so
-- there is nothing that could ever complete the wait.
--
{-# INLINE waitAny #-}
waitAny :: [Async a] -> IO (Async a, a)
waitAny = atomically . waitAnySTM

-- | A version of 'waitAny' that can be used inside an STM transaction.
--
-- @since 2.1.0
waitAnySTM :: [Async a] -> STM (Async a, a)
-- The per-'Async' transactions are chained with 'orElse' in list order,
-- ending in 'retry'; an empty list therefore reduces to 'retry'.
waitAnySTM asyncs =
    foldr orElse retry $
      map (\a -> do r <- waitSTM a; return (a, r)) asyncs

-- | Like 'waitAny', but also cancels the other asynchronous
-- operations as soon as one has completed.
--
-- 'cancel' is applied to every element of the list, in order, also
-- when 'waitAny' throws.
--
waitAnyCancel :: [Async a] -> IO (Async a, a)
waitAnyCancel asyncs =
  waitAny asyncs `finally` mapM_ cancel asyncs

-- | Wait for the first of two @Async@s to finish.
--
-- The outer 'Either' says which @Async@ finished; the inner one is its
-- outcome as returned by 'waitCatch'.
{-# INLINE waitEitherCatch #-}
waitEitherCatch :: Async a -> Async b
                -> IO (Either (Either SomeException a)
                              (Either SomeException b))
waitEitherCatch left right =
  tryAgain $ atomically (waitEitherCatchSTM left right)
  where
    -- See: https://github.com/simonmar/async/issues/14
    -- If the transaction is aborted with 'BlockedIndefinitelyOnSTM',
    -- it is run one more time.
    tryAgain f = f `catch` \BlockedIndefinitelyOnSTM -> f

-- | A version of 'waitEitherCatch' that can be used inside an STM transaction.
--
-- The left @Async@ is tried first; the right one is consulted only if
-- the left one would 'retry'.
--
-- @since 2.1.0
waitEitherCatchSTM :: Async a -> Async b
                -> STM (Either (Either SomeException a)
                               (Either SomeException b))
waitEitherCatchSTM left right =
    (Left  <$> waitCatchSTM left)
      `orElse`
    (Right <$> waitCatchSTM right)

-- | Like 'waitEitherCatch', but also 'cancel's both @Async@s before
-- returning.
--
-- The cancellation runs in a 'finally' handler, left first, then right.
--
waitEitherCatchCancel :: Async a -> Async b
                      -> IO (Either (Either SomeException a)
                                    (Either SomeException b))
waitEitherCatchCancel left right =
  waitEitherCatch left right `finally` (cancel left >> cancel right)

-- | Wait for the first of two @Async@s to finish.  If the @Async@
-- that finished first raised an exception, then the exception is
-- re-thrown by 'waitEither'.
--
{-# INLINE waitEither #-}
waitEither :: Async a -> Async b -> IO (Either a b)
waitEither left right = atomically (waitEitherSTM left right)

-- | A version of 'waitEither' that can be used inside an STM transaction.
--
-- The left @Async@ is tried first; the right one is consulted only if
-- the left one would 'retry'.
--
-- @since 2.1.0
waitEitherSTM :: Async a -> Async b -> STM (Either a b)
waitEitherSTM left right =
    (Left  <$> waitSTM left)
      `orElse`
    (Right <$> waitSTM right)

-- | Like 'waitEither', but the result is ignored.
--
{-# INLINE waitEither_ #-}
waitEither_ :: Async a -> Async b -> IO ()
waitEither_ left right = atomically (waitEitherSTM_ left right)

-- | A version of 'waitEither_' that can be used inside an STM transaction.
--
-- The left @Async@ is tried first; the right one is consulted only if
-- the left one would 'retry'.  Either result is discarded.
--
-- @since 2.1.0
waitEitherSTM_ :: Async a -> Async b -> STM ()
waitEitherSTM_ left right =
    (void $ waitSTM left)
      `orElse`
    (void $ waitSTM right)

-- | Like 'waitEither', but also 'cancel's both @Async@s before
-- returning.
--
waitEitherCancel :: Async a -> Async b -> IO (Either a b)
-- 'finally' runs the cancellation of both threads whether
-- 'waitEither' returns normally or throws.
waitEitherCancel left right =
  waitEither left right `finally` (cancel left >> cancel right)

-- | Waits for both @Async@s to finish, but if either of them throws
-- an exception before they have both finished, then the exception is
-- re-thrown by 'waitBoth'.
--
{-# INLINE waitBoth #-}
waitBoth :: Async a -> Async b -> IO (a,b)
waitBoth left right = tryAgain $ atomically (waitBothSTM left right)
  where
    -- Re-run the transaction once if the runtime reports it as
    -- blocked indefinitely.
    -- See: https://github.com/simonmar/async/issues/14
    tryAgain f = f `catch` \BlockedIndefinitelyOnSTM -> f

-- | A version of 'waitBoth' that can be used inside an STM transaction.
--
-- @since 2.1.0
waitBothSTM :: Async a -> Async b -> STM (a,b)
waitBothSTM left right = do
    -- While @left@ is still running, peek at @right@: 'waitSTM right'
    -- re-throws if @right@ has failed; if it succeeded (or is also
    -- still running) the transaction retries until @left@ is done.
    a <- waitSTM left
           `orElse`
         (waitSTM right >> retry)
    b <- waitSTM right
    return (a,b)


-- -----------------------------------------------------------------------------
-- Linking threads

-- | The exception delivered by the linking functions ('linkOnly',
-- 'link2Only'): it pairs the 'Async' whose action failed with the
-- exception that action raised.  The result type of the 'Async' is
-- hidden existentially.
data ExceptionInLinkedThread =
  forall a . ExceptionInLinkedThread (Async a) SomeException
#if __GLASGOW_HASKELL__ < 710
  deriving Typeable
#endif

instance Show ExceptionInLinkedThread where
  -- Rendered like a constructor application: the name followed by
  -- the thread id and the wrapped exception, parenthesised when the
  -- surrounding precedence is 11 or higher.  Only the thread id of
  -- the 'Async' is shown; its wait action is ignored.
  showsPrec p (ExceptionInLinkedThread (Async t _) e) =
    showParen (p >= 11) $
      showString "ExceptionInLinkedThread " .
      showsPrec 11 t .
      showString " " .
      showsPrec 11 e

instance Exception ExceptionInLinkedThread where
#if __GLASGOW_HASKELL__ >= 708
  -- Route conversions through the asynchronous-exception helpers
  -- provided by "Control.Exception"; on older compilers the class
  -- defaults are used instead.
  fromException = asyncExceptionFromException
  toException = asyncExceptionToException
#endif

-- | Link the given @Async@ to the current thread, such that if the
-- @Async@ raises an exception, that exception will be re-thrown in
-- the current thread, wrapped in 'ExceptionInLinkedThread'.
--
-- 'link' ignores 'AsyncCancelled' exceptions thrown in the other thread,
-- so that it's safe to 'cancel' a thread you're linked to.  If you want
-- different behaviour, use 'linkOnly'.
--
link :: Async a -> IO ()
-- Propagate every exception except cancellation.
link = linkOnly (not . isCancel)

-- | Link the given @Async@ to the current thread, such that if the
-- @Async@ raises an exception, that exception will be re-thrown in
-- the current thread, wrapped in 'ExceptionInLinkedThread'.
--
-- The supplied predicate determines which exceptions in the target
-- thread should be propagated to the source thread.
--
linkOnly
  :: (SomeException -> Bool)  -- ^ return 'True' if the exception
                              -- should be propagated, 'False'
                              -- otherwise.
  -> Async a
  -> IO ()
linkOnly shouldThrow a = do
  -- Capture the caller's thread id now; the watcher below runs in a
  -- different thread.
  me <- myThreadId
  -- 'forkRepeat' restarts the watcher if it is itself hit by an
  -- exception before it finishes.
  void $ forkRepeat $ do
    r <- waitCatch a
    case r of
      Left e | shouldThrow e -> throwTo me (ExceptionInLinkedThread a e)
      _otherwise -> return ()

-- | Link two @Async@s together, such that if either raises an
-- exception, the same exception is re-thrown in the other @Async@,
-- wrapped in 'ExceptionInLinkedThread'.
--
-- 'link2' ignores 'AsyncCancelled' exceptions, so that it's possible
-- to 'cancel' either thread without cancelling the other.  If you
-- want different behaviour, use 'link2Only'.
--
link2 :: Async a -> Async b -> IO ()
-- Propagate every exception except cancellation, in both directions.
link2 = link2Only (not . isCancel)

-- | Link two @Async@s together, such that if either raises an
-- exception, the same exception is re-thrown in the other @Async@,
-- wrapped in 'ExceptionInLinkedThread'.
--
-- The supplied predicate determines which exceptions raised by either
-- @Async@ should be propagated to the other one.
--
link2Only :: (SomeException -> Bool) -> Async a -> Async b -> IO ()
link2Only shouldThrow left@(Async tl _)  right@(Async tr _) =
  -- A single watcher thread waits for whichever side finishes first.
  -- If that side failed with an exception accepted by the predicate,
  -- the exception is thrown to the *other* side's thread.
  void $ forkRepeat $ do
    r <- waitEitherCatch left right
    case r of
      Left  (Left e) | shouldThrow e ->
        throwTo tr (ExceptionInLinkedThread left e)
      Right (Left e) | shouldThrow e ->
        throwTo tl (ExceptionInLinkedThread right e)
      _ -> return ()

-- True exactly when the exception is an 'AsyncCancelled'.
isCancel :: SomeException -> Bool
isCancel e
  | Just AsyncCancelled <- fromException e = True
  | otherwise = False


-- -----------------------------------------------------------------------------

-- | Run two @IO@ actions concurrently, and return the first to
-- finish.  The loser of the race is 'cancel'led.  A 'Left' result
-- comes from the first action and a 'Right' result from the second.
--
-- > race left right =
-- >   withAsync left $ \a ->
-- >   withAsync right $ \b ->
-- >   waitEither a b
--
race :: IO a -> IO b -> IO (Either a b)

-- | Like 'race', but the result is ignored.
--
race_ :: IO a -> IO b -> IO ()

-- | Run two @IO@ actions concurrently, and return both results.  If
-- either action throws an exception at any time, then the other
-- action is 'cancel'led, and the exception is re-thrown by
-- 'concurrently'.
--
-- > concurrently left right =
-- >   withAsync left $ \a ->
-- >   withAsync right $ \b ->
-- >   waitBoth a b
concurrently :: IO a -> IO b -> IO (a,b)

-- | Like 'concurrently', but the result values are ignored.
--
-- @since 2.1.1
concurrently_ :: IO a -> IO b -> IO ()

#define USE_ASYNC_VERSIONS 0

#if USE_ASYNC_VERSIONS

-- Async-based 'race': both actions run under 'withAsync' and the
-- first one to finish decides the result.
race left right =
  withAsync left (\leftAsync ->
    withAsync right (\rightAsync ->
      waitEither leftAsync rightAsync))

-- Async-based 'race_': run 'race' and drop its result.
race_ left right = void (race left right)

-- Async-based 'concurrently': both actions run under 'withAsync' and
-- the results are collected with 'waitBoth'.
concurrently left right =
  withAsync left (\leftAsync ->
    withAsync right (\rightAsync ->
      waitBoth leftAsync rightAsync))

-- Async-based 'concurrently_': run 'concurrently' and drop the pair.
concurrently_ left right = void (concurrently left right)

#else

-- MVar versions of race/concurrently
-- More ugly than the Async versions, but quite a bit faster.

-- race :: IO a -> IO b -> IO (Either a b)
race left right = concurrently' left right collect
  where
    -- Take the first message posted by either child: re-throw it if
    -- it is an exception, otherwise it is the winner's result.
    collect m = do
        e <- m
        case e of
            Left ex -> throwIO ex
            Right r -> return r

-- race_ :: IO a -> IO b -> IO ()
-- Run 'race' and discard which side won.
race_ left right = void $ race left right

-- concurrently :: IO a -> IO b -> IO (a,b)
concurrently left right = concurrently' left right (collect [])
  where
    -- Results are accumulated most-recent-first, so the two
    -- two-element patterns cover both possible completion orders.
    collect [Left a, Right b] _ = return (a,b)
    collect [Right b, Left a] _ = return (a,b)
    -- Otherwise take the next message: an exception from either
    -- child is re-thrown, a result is recorded and we keep going.
    collect xs m = do
        e <- m
        case e of
            Left ex -> throwIO ex
            Right r -> collect (r:xs) m

-- Shared engine for the MVar-based 'race' / 'concurrently' family.
-- Forks @left@ and @right@, each of which posts either its result
-- (tagged 'Left' / 'Right') or the exception it died with to a common
-- 'MVar'.  The @collect@ callback is given an action that takes the
-- next posted message; when @collect@ returns or throws, any child
-- that has not yet reported is cancelled and waited for.
concurrently' :: IO a -> IO b
             -> (IO (Either SomeException (Either a b)) -> IO r)
             -> IO r
concurrently' left right collect = do
    done <- newEmptyMVar
    mask $ \restore -> do
        -- Note: uninterruptibleMask here is because we must not allow
        -- the putMVar in the exception handler to be interrupted,
        -- otherwise the parent thread will deadlock when it waits for
        -- the thread to terminate.
        lid <- forkIO $ uninterruptibleMask_ $
          restore (left >>= putMVar done . Right . Left)
            `catchAll` (putMVar done . Left)
        rid <- forkIO $ uninterruptibleMask_ $
          restore (right >>= putMVar done . Right . Right)
            `catchAll` (putMVar done . Left)

        -- Number of children that have not yet been taken from 'done'.
        count <- newIORef (2 :: Int)
        let takeDone = do
                r <- takeMVar done      -- interruptible
                -- Decrement the counter so we know how many takes are left.
                -- Since only the parent thread is calling this, we can
                -- use non-atomic modifications.
                -- NB. do this *after* takeMVar, because takeMVar might be
                -- interrupted.
                modifyIORef count (subtract 1)
                return r

        -- Retry once if the runtime reports the take as blocked
        -- indefinitely.
        let tryAgain f = f `catch` \BlockedIndefinitelyOnMVar -> f

            stop = do
                -- kill right before left, to match the semantics of
                -- the version using withAsync. (#27)
                uninterruptibleMask_ $ do
                  count' <- readIORef count
                  -- we only need to use killThread if there are still
                  -- children alive.  Note: forkIO here is because the
                  -- child thread could be in an uninterruptible
                  -- putMVar.
                  when (count' > 0) $
                    void $ forkIO $ do
                      throwTo rid AsyncCancelled
                      throwTo lid AsyncCancelled
                  -- ensure the children are really dead
                  replicateM_ count' (tryAgain $ takeMVar done)

        r <- collect (tryAgain $ takeDone) `onException` stop
        stop
        return r

-- concurrently_ :: IO a -> IO b -> IO ()
concurrently_ left right = concurrently' left right (collect 0)
  where
    -- Count successful completions; finish once both children have
    -- reported, and re-throw the first exception either one posts.
    collect 2 _ = return ()
    collect i m = do
        e <- m
        case e of
            Left ex -> throwIO ex
            Right _ -> collect (i + 1 :: Int) m


#endif

-- | Maps an 'IO'-performing function over any 'Traversable' data
-- type, performing all the @IO@ actions concurrently, and returning
-- the original data structure with the arguments replaced by the
-- results.
--
-- If any of the actions throw an exception, then all other actions are
-- cancelled and the exception is re-thrown.
--
-- For example, @mapConcurrently@ works with lists:
--
-- > pages <- mapConcurrently getURL ["url1", "url2", "url3"]
--
-- Take into account that @async@ will try to immediately spawn a thread
-- for each element of the @Traversable@, so running this on large
-- inputs without care may lead to resource exhaustion (of memory,
-- file descriptors, or other limited resources).
mapConcurrently :: Traversable t => (a -> IO b) -> t a -> IO (t b)
-- Traverse in the 'Concurrently' applicative, whose '<*>' is built
-- on 'concurrently', then unwrap the combined action.
mapConcurrently f = runConcurrently . traverse (Concurrently . f)

-- | `forConcurrently` is `mapConcurrently` with its arguments flipped
--
-- > pages <- forConcurrently ["url1", "url2", "url3"] $ \url -> getURL url
--
-- @since 2.1.0
forConcurrently :: Traversable t => t a -> (a -> IO b) -> IO (t b)
forConcurrently = flip mapConcurrently

-- | `mapConcurrently_` is `mapConcurrently` with the return value discarded;
-- a concurrent equivalent of 'mapM_'.
mapConcurrently_ :: F.Foldable f => (a -> IO b) -> f a -> IO ()
-- Each element becomes a @Concurrently ()@ (its result dropped with
-- 'void'); 'F.foldMap' combines them through the 'Monoid' instance of
-- 'Concurrently'.
mapConcurrently_ f = runConcurrently . F.foldMap (Concurrently . void . f)

-- | `forConcurrently_` is `forConcurrently` with the return value discarded;
-- a concurrent equivalent of 'forM_'.
forConcurrently_ :: F.Foldable f => f a -> (a -> IO b) -> IO ()
forConcurrently_ = flip mapConcurrently_

-- | Perform the action in the given number of threads.
--
-- @since 2.1.1
replicateConcurrently :: Int -> IO a -> IO [a]
-- Build a list of @cnt@ copies of the wrapped action and sequence it
-- in the 'Concurrently' applicative.
replicateConcurrently cnt = runConcurrently . sequenceA . replicate cnt . Concurrently

-- | Same as 'replicateConcurrently', but ignore the results.
--
-- @since 2.1.1
replicateConcurrently_ :: Int -> IO a -> IO ()
-- Build a list of @cnt@ copies of the result-less wrapped action and
-- combine it through the 'Monoid' instance of 'Concurrently'.
replicateConcurrently_ cnt = runConcurrently . F.fold . replicate cnt . Concurrently . void

-- -----------------------------------------------------------------------------

-- | A value of type @Concurrently a@ is an @IO@ operation that can be
-- composed with other @Concurrently@ values, using the @Applicative@
-- and @Alternative@ instances.
--
-- Calling @runConcurrently@ on a value of type @Concurrently a@ will
-- execute the @IO@ operations it contains concurrently, before
-- delivering the result of type @a@.
--
-- For example
--
-- > (page1, page2, page3)
-- >     <- runConcurrently $ (,,)
-- >     <$> Concurrently (getURL "url1")
-- >     <*> Concurrently (getURL "url2")
-- >     <*> Concurrently (getURL "url3")
--
newtype Concurrently a = Concurrently
  { runConcurrently :: IO a
    -- ^ Unwrap the underlying @IO@ action.
  }

instance Functor Concurrently where
  -- Map over the result of the wrapped action.
  fmap f (Concurrently a) = Concurrently $ f <$> a

instance Applicative Concurrently where
  pure = Concurrently . return
  -- Run the function-producing and argument-producing actions with
  -- 'concurrently', then apply one result to the other.
  Concurrently fs <*> Concurrently as =
    Concurrently $ (\(f, a) -> f a) <$> concurrently fs as

instance Alternative Concurrently where
  -- An action that never produces a result: it sleeps in a loop
  -- forever.
  empty = Concurrently $ forever (threadDelay maxBound)
  -- Run both sides with 'race' and keep whichever result arrives
  -- first.
  Concurrently as <|> Concurrently bs =
    Concurrently $ either id id <$> race as bs

#if MIN_VERSION_base(4,9,0)
-- | Only defined by @async@ for @base >= 4.9@
--
-- @since 2.1.0
instance Semigroup a => Semigroup (Concurrently a) where
  -- Combine two actions through the 'Applicative' instance and join
  -- their results with the underlying '<>'.
  (<>) = liftA2 (<>)

-- | @since 2.1.0
instance (Semigroup a, Monoid a) => Monoid (Concurrently a) where
  mempty = pure mempty
  -- Delegates to the 'Semigroup' instance.
  mappend = (<>)
#else
-- | @since 2.1.0
instance Monoid a => Monoid (Concurrently a) where
  -- Identity: an action that immediately yields the underlying 'mempty'.
  mempty = Concurrently (return mempty)
  -- Combine two actions through the 'Applicative' instance and merge
  -- their results with the underlying 'mappend'.
  mappend x y = mappend <$> x <*> y
#endif

-- ----------------------------------------------------------------------------

-- | Fork a thread that runs the supplied action, and if it raises an
-- exception, re-runs the action.  The thread terminates only when the
-- action runs to completion without raising an exception.
forkRepeat :: IO a -> IO ThreadId
forkRepeat action =
  -- The loop itself runs masked; only @action@ is run with the
  -- caller's masking state restored, so an exception arriving during
  -- @action@ is caught by 'tryAll' and triggers another iteration.
  mask $ \restore ->
    let go = do r <- tryAll (restore action)
                case r of
                  Left _ -> go
                  _      -> return ()
    in forkIO go

-- 'catch' specialised to 'SomeException', so no type annotation is
-- needed on the handler at use sites.
catchAll :: IO a -> (SomeException -> IO a) -> IO a
catchAll = catch

-- 'try' specialised to 'SomeException'.
tryAll :: IO a -> IO (Either SomeException a)
tryAll = try

-- A version of forkIO that does not include the outer exception
-- handler: saves a bit of time when we will be installing our own
-- exception handler.
{-# INLINE rawForkIO #-}
rawForkIO :: IO () -> IO ThreadId
-- Unwrap the 'IO' newtype, hand the underlying state-token function
-- to the @fork#@ primop, and box the resulting @ThreadId#@.
rawForkIO (IO action) = IO $ \ s ->
   case (fork# action s) of (# s1, tid #) -> (# s1, ThreadId tid #)

{-# INLINE rawForkOn #-}
rawForkOn :: Int -> IO () -> IO ThreadId
-- Same as 'rawForkIO', but uses the @forkOn#@ primop and passes it
-- the unboxed @cpu@ argument.
rawForkOn (I# cpu) (IO action) = IO $ \ s ->
   case (forkOn# cpu action s) of (# s1, tid #) -> (# s1, ThreadId tid #)