{-# LANGUAGE CPP, MagicHash, UnboxedTuples, RankNTypes, GADTs #-}
#if __GLASGOW_HASKELL__ >= 701
{-# LANGUAGE Trustworthy #-}
#endif
{-# OPTIONS -Wall #-}

-----------------------------------------------------------------------------
-- |
-- Module      :  Control.Concurrent.Async
-- Copyright   :  (c) Simon Marlow 2012
-- License     :  BSD3 (see the file LICENSE)
--
-- Maintainer  :  Simon Marlow <marlowsd@gmail.com>
-- Stability   :  provisional
-- Portability :  non-portable (requires concurrency)
--
-- This module provides a set of operations for running IO operations
-- asynchronously and waiting for their results.  It is a thin layer
-- over the basic concurrency operations provided by
-- "Control.Concurrent".  The main additional functionality it
-- provides is the ability to wait for the return value of a thread,
-- but the interface also provides some additional safety and
-- robustness over using threads and @MVar@ directly.
--
-- The basic type is @'Async' a@, which represents an asynchronous
-- @IO@ action that will return a value of type @a@, or die with an
-- exception.  An @Async@ corresponds to a thread, and its 'ThreadId'
-- can be obtained with 'asyncThreadId', although that should rarely
-- be necessary.
--
-- For example, to fetch two web pages at the same time, we could do
-- this (assuming a suitable @getURL@ function):
--
-- >    do a1 <- async (getURL url1)
-- >       a2 <- async (getURL url2)
-- >       page1 <- wait a1
-- >       page2 <- wait a2
-- >       ...
--
-- where 'async' starts the operation in a separate thread, and
-- 'wait' waits for and returns the result.  If the operation
-- throws an exception, then that exception is re-thrown by
-- 'wait'.  This is one of the ways in which this library
-- provides some additional safety: it is harder to accidentally
-- forget about exceptions thrown in child threads.
--
-- A slight improvement over the previous example is this:
--
-- >       withAsync (getURL url1) $ \a1 -> do
-- >       withAsync (getURL url2) $ \a2 -> do
-- >       page1 <- wait a1
-- >       page2 <- wait a2
-- >       ...
--
-- 'withAsync' is like 'async', except that the 'Async' is
-- automatically killed (using 'cancel') if the enclosing IO operation
-- returns before it has completed.  Consider the case when the first
-- 'wait' throws an exception; then the second 'Async' will be
-- automatically killed rather than being left to run in the
-- background, possibly indefinitely.  This is the second way that the
-- library provides additional safety: using 'withAsync' means we can
-- avoid accidentally leaving threads running.  Furthermore,
-- 'withAsync' allows a tree of threads to be built, such that
-- children are automatically killed if their parents die for any
-- reason.
--
-- The pattern of performing two IO actions concurrently and waiting
-- for their results is packaged up in a combinator 'concurrently', so
-- we can further shorten the above example to:
--
-- >       (page1, page2) <- concurrently (getURL url1) (getURL url2)
-- >       ...
--
-- The 'Functor' instance can be used to change the result of an
-- 'Async'.  For example:
--
-- > ghci> a <- async (return 3)
-- > ghci> wait a
-- > 3
-- > ghci> wait (fmap (+1) a)
-- > 4

-----------------------------------------------------------------------------

module Control.Concurrent.Async.Pool.Async
    ( module Control.Concurrent.Async.Pool.Async
    , module Gr
    ) where

import Control.Concurrent.STM
import Control.Exception
import Control.Concurrent
import Control.Applicative
import Control.Monad hiding (forM, forM_, mapM, mapM_)
import Data.Foldable
import Data.Graph.Inductive.Graph as Gr hiding ((&))
import Data.Graph.Inductive.PatriciaTree as Gr
import Data.Graph.Inductive.Query.BFS as Gr
import Data.IntMap (IntMap)
import qualified Data.IntMap as IntMap
import Data.Traversable
import Prelude hiding (mapM_, mapM, foldr, all, any, concatMap, foldl1)

import GHC.Exts
import GHC.IO hiding (finally, onException)
import GHC.Conc

-- | A 'Handle' is a unique identifier for a task submitted to a 'Pool'.
type Handle    = Node
data State     = Ready | Starting | Started ThreadId SomeTMVar
data Status    = Pending | Completed deriving (Status -> Status -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: Status -> Status -> Bool
$c/= :: Status -> Status -> Bool
== :: Status -> Status -> Bool
$c== :: Status -> Status -> Bool
Eq, Int -> Status -> ShowS
[Status] -> ShowS
Status -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [Status] -> ShowS
$cshowList :: [Status] -> ShowS
show :: Status -> String
$cshow :: Status -> String
showsPrec :: Int -> Status -> ShowS
$cshowsPrec :: Int -> Status -> ShowS
Show)
type TaskGraph = Gr (TVar State) Status

instance Eq State where
    State
Ready        == :: State -> State -> Bool
== State
Ready        = Bool
True
    State
Starting     == State
Starting     = Bool
True
    Started ThreadId
n1 SomeTMVar
_ == Started ThreadId
n2 SomeTMVar
_ = ThreadId
n1 forall a. Eq a => a -> a -> Bool
== ThreadId
n2
    State
_            == State
_            = Bool
False

instance Show State where
    show :: State -> String
show State
Ready         = String
"Ready"
    show State
Starting      = String
"Starting"
    show (Started ThreadId
n SomeTMVar
_) = String
"Started " forall a. [a] -> [a] -> [a]
++ forall a. Show a => a -> String
show ThreadId
n

-- | A 'Pool' manages a collection of possibly interdependent tasks, such that
--   tasks await execution until the tasks they depend on have finished (and
--   tasks may depend on an arbitrary number of other tasks), while
--   independent tasks execute concurrently up to the number of available
--   resource slots in the pool.
--
--   Results from each task are available until the status of the task is
--   polled or waited on.  Further, the results are kept until that occurs, so
--   failing to ever wait will result in a memory leak.
--
--   Tasks may be cancelled, in which case all dependent tasks are
--   unscheduled.
data Pool = Pool
    { Pool -> TVar TaskGraph
tasks :: TVar TaskGraph
      -- ^ The task graph represents a partially ordered set P with subset S
      --   such that for every x ∈ S and y ∈ P, either x ≤ y or x is unrelated
      --   to y.  Stated more simply, S is the set of least elements of all
      --   maximal chains in P.  In our case, ≤ relates two uncompleted tasks
      --   by dependency.  Therefore, S is equal to the set of tasks which may
      --   execute concurrently, as none of them have incomplete dependencies.
      --
      --   We use a graph representation to make determination of S more
      --   efficient (where S is just the set of roots in P expressed as a
      --   graph).  Completion status is recorded on the edges, and nodes are
      --   removed from the graph once no other incomplete node depends on
      --   them.
    , Pool -> TVar Int
tokens :: TVar Int
      -- ^ Tokens identify tasks, and are provisioned monotonically.
    }

waitTMVar :: TMVar a -> STM ()
waitTMVar :: forall a. TMVar a -> STM ()
waitTMVar TMVar a
tv = do
    a
_ <- forall a. TMVar a -> STM a
readTMVar TMVar a
tv
    forall (m :: * -> *) a. Monad m => a -> m a
return ()

syncPool :: Pool -> STM ()
syncPool :: Pool -> STM ()
syncPool Pool
p = do
    TaskGraph
g <- forall a. TVar a -> STM a
readTVar (Pool -> TVar TaskGraph
tasks Pool
p)
    forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (forall (gr :: * -> * -> *) a b. Graph gr => gr a b -> [LNode a]
labNodes TaskGraph
g) forall a b. (a -> b) -> a -> b
$ \(Int
_h, TVar State
st) -> do
        State
x <- forall a. TVar a -> STM a
readTVar TVar State
st
        case State
x of
            Started ThreadId
_tid SomeTMVar
v -> SomeTMVar -> STM ()
waitSomeTMVar SomeTMVar
v
            State
_ -> forall a. STM a
retry

data SomeTMVar where
    SomeTMVar :: forall a. TMVar a -> SomeTMVar

waitSomeTMVar :: SomeTMVar -> STM ()
waitSomeTMVar :: SomeTMVar -> STM ()
waitSomeTMVar (SomeTMVar TMVar a
mv) = forall a. TMVar a -> STM ()
waitTMVar TMVar a
mv

data TaskGroup = TaskGroup
    { TaskGroup -> Pool
pool    :: Pool
    , TaskGroup -> TVar Int
avail   :: TVar Int
      -- ^ The number of available execution slots in the pool.
    , TaskGroup -> TVar (IntMap (IO ThreadId, SomeTMVar))
pending :: TVar (IntMap (IO ThreadId, SomeTMVar))
      -- ^ Nodes in the task graph that are waiting to start.
    }

-- -----------------------------------------------------------------------------
-- STM Async API


-- | An asynchronous action spawned by 'async' or 'withAsync'.
-- Asynchronous actions are executed in a separate thread, and
-- operations are provided for waiting for asynchronous actions to
-- complete and obtaining their results (see e.g. 'wait').
--
data Async a = Async
    { forall a. Async a -> TaskGroup
taskGroup  :: TaskGroup
    , forall a. Async a -> Int
taskHandle :: {-# UNPACK #-} !Handle
    , forall a. Async a -> STM (Either SomeException a)
_asyncWait :: STM (Either SomeException a)
    }

getTaskVar :: TaskGraph -> Handle -> TVar State
getTaskVar :: TaskGraph -> Int -> TVar State
getTaskVar TaskGraph
g Int
h = let (Adj Status
_to, Int
_, TVar State
t, Adj Status
_from) = forall (gr :: * -> * -> *) a b.
Graph gr =>
gr a b -> Int -> Context a b
context TaskGraph
g Int
h in TVar State
t

getThreadId :: TaskGraph -> Node -> STM (Maybe ThreadId)
getThreadId :: TaskGraph -> Int -> STM (Maybe ThreadId)
getThreadId TaskGraph
g Int
h = do
    State
status <- forall a. TVar a -> STM a
readTVar (TaskGraph -> Int -> TVar State
getTaskVar TaskGraph
g Int
h)
    case State
status of
        State
Ready       -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a. Maybe a
Nothing
        State
Starting    -> forall a. STM a
retry
        Started ThreadId
x SomeTMVar
_ -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a. a -> Maybe a
Just ThreadId
x

instance Eq (Async a) where
  Async TaskGroup
_ Int
a STM (Either SomeException a)
_ == :: Async a -> Async a -> Bool
== Async TaskGroup
_ Int
b STM (Either SomeException a)
_  =  Int
a forall a. Eq a => a -> a -> Bool
== Int
b

instance Ord (Async a) where
  Async TaskGroup
_ Int
a STM (Either SomeException a)
_ compare :: Async a -> Async a -> Ordering
`compare` Async TaskGroup
_ Int
b STM (Either SomeException a)
_  =  Int
a forall a. Ord a => a -> a -> Ordering
`compare` Int
b

instance Functor Async where
  fmap :: forall a b. (a -> b) -> Async a -> Async b
fmap a -> b
f (Async TaskGroup
p Int
a STM (Either SomeException a)
w) = forall a.
TaskGroup -> Int -> STM (Either SomeException a) -> Async a
Async TaskGroup
p Int
a (forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap a -> b
f) STM (Either SomeException a)
w)


-- | Spawn an asynchronous action in a separate thread.
async :: TaskGroup -> IO a -> IO (Async a)
async :: forall a. TaskGroup -> IO a -> IO (Async a)
async TaskGroup
p = forall a. STM a -> IO a
atomically forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. a -> a
inline forall a.
TaskGroup -> (IO () -> IO ThreadId) -> IO a -> STM (Async a)
asyncUsing TaskGroup
p IO () -> IO ThreadId
rawForkIO

-- | Like 'async' but using 'forkOS' internally.
asyncBound :: TaskGroup -> IO a -> IO (Async a)
asyncBound :: forall a. TaskGroup -> IO a -> IO (Async a)
asyncBound TaskGroup
p = forall a. STM a -> IO a
atomically forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a.
TaskGroup -> (IO () -> IO ThreadId) -> IO a -> STM (Async a)
asyncUsing TaskGroup
p IO () -> IO ThreadId
forkOS

-- | Like 'async' but using 'forkOn' internally.
asyncOn :: TaskGroup -> Int -> IO a -> IO (Async a)
asyncOn :: forall a. TaskGroup -> Int -> IO a -> IO (Async a)
asyncOn TaskGroup
p = (forall a. STM a -> IO a
atomically forall b c a. (b -> c) -> (a -> b) -> a -> c
.) forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a.
TaskGroup -> (IO () -> IO ThreadId) -> IO a -> STM (Async a)
asyncUsing TaskGroup
p forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> IO () -> IO ThreadId
rawForkOn

-- | Like 'async' but using 'forkIOWithUnmask' internally.
-- The child thread is passed a function that can be used to unmask asynchronous exceptions.
asyncWithUnmask :: TaskGroup -> ((forall b . IO b -> IO b) -> IO a) -> IO (Async a)
asyncWithUnmask :: forall a.
TaskGroup -> ((forall b. IO b -> IO b) -> IO a) -> IO (Async a)
asyncWithUnmask TaskGroup
p (forall b. IO b -> IO b) -> IO a
actionWith =
    forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a.
TaskGroup -> (IO () -> IO ThreadId) -> IO a -> STM (Async a)
asyncUsing TaskGroup
p IO () -> IO ThreadId
rawForkIO ((forall b. IO b -> IO b) -> IO a
actionWith forall b. IO b -> IO b
unsafeUnmask)

-- | Like 'asyncOn' but using 'forkOnWithUnmask' internally.
-- The child thread is passed a function that can be used to unmask asynchronous exceptions.
asyncOnWithUnmask :: TaskGroup -> Int -> ((forall b . IO b -> IO b) -> IO a) -> IO (Async a)
asyncOnWithUnmask :: forall a.
TaskGroup
-> Int -> ((forall b. IO b -> IO b) -> IO a) -> IO (Async a)
asyncOnWithUnmask TaskGroup
p Int
cpu (forall b. IO b -> IO b) -> IO a
actionWith =
    forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a.
TaskGroup -> (IO () -> IO ThreadId) -> IO a -> STM (Async a)
asyncUsing TaskGroup
p (Int -> IO () -> IO ThreadId
rawForkOn Int
cpu) ((forall b. IO b -> IO b) -> IO a
actionWith forall b. IO b -> IO b
unsafeUnmask)

asyncUsing :: TaskGroup -> (IO () -> IO ThreadId) -> IO a -> STM (Async a)
asyncUsing :: forall a.
TaskGroup -> (IO () -> IO ThreadId) -> IO a -> STM (Async a)
asyncUsing TaskGroup
p IO () -> IO ThreadId
doFork IO a
action = do
    Int
h <- Pool -> STM Int
nextIdent (TaskGroup -> Pool
pool TaskGroup
p)

    TMVar (Either SomeException a)
var <- forall a. STM (TMVar a)
newEmptyTMVar
    let start :: IO ThreadId
start = forall b. ((forall b. IO b -> IO b) -> IO b) -> IO b
mask forall a b. (a -> b) -> a -> b
$ \forall b. IO b -> IO b
restore ->
            IO () -> IO ThreadId
doFork forall a b. (a -> b) -> a -> b
$ forall e a. Exception e => IO a -> IO (Either e a)
try (forall b. IO b -> IO b
restore (IO a
action forall a b. IO a -> IO b -> IO a
`finally` Int -> IO ()
cleanup Int
h))
                forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= forall a. STM a -> IO a
atomically forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. TMVar a -> a -> STM ()
putTMVar TMVar (Either SomeException a)
var

    forall a. TVar a -> (a -> a) -> STM ()
modifyTVar (TaskGroup -> TVar (IntMap (IO ThreadId, SomeTMVar))
pending TaskGroup
p) (forall a. Int -> a -> IntMap a -> IntMap a
IntMap.insert Int
h (IO ThreadId
start, forall a. TMVar a -> SomeTMVar
SomeTMVar TMVar (Either SomeException a)
var))
    TVar State
tv <- forall a. a -> STM (TVar a)
newTVar State
Ready
    forall a. TVar a -> (a -> a) -> STM ()
modifyTVar (Pool -> TVar TaskGraph
tasks (TaskGroup -> Pool
pool TaskGroup
p)) (forall (gr :: * -> * -> *) a b.
DynGraph gr =>
LNode a -> gr a b -> gr a b
insNode (Int
h, TVar State
tv))

    forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a.
TaskGroup -> Int -> STM (Either SomeException a) -> Async a
Async TaskGroup
p Int
h (forall a. TMVar a -> STM a
readTMVar TMVar (Either SomeException a)
var)
  where
    cleanup :: Int -> IO ()
cleanup Int
h = forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
        forall a. TVar a -> (a -> a) -> STM ()
modifyTVar (TaskGroup -> TVar Int
avail TaskGroup
p) forall a. Enum a => a -> a
succ
        Pool -> Int -> STM ()
cleanupTask (TaskGroup -> Pool
pool TaskGroup
p) Int
h

-- | Like 'asyncUsing' but waits until there are free slots in the TaskGroup
asyncUsingLazy :: TaskGroup -> (IO () -> IO ThreadId) -> IO a -> STM (Async a)
asyncUsingLazy :: forall a.
TaskGroup -> (IO () -> IO ThreadId) -> IO a -> STM (Async a)
asyncUsingLazy TaskGroup
p IO () -> IO ThreadId
doFork IO a
action = do
    Int
availSlots <- forall a. TVar a -> STM a
readTVar (TaskGroup -> TVar Int
avail TaskGroup
p)
    Bool -> STM ()
check (Int
availSlots forall a. Ord a => a -> a -> Bool
> Int
0)
    forall a.
TaskGroup -> (IO () -> IO ThreadId) -> IO a -> STM (Async a)
asyncUsing TaskGroup
p IO () -> IO ThreadId
doFork IO a
action

-- | Return the next available thread identifier from the pool.  These are
--   monotonically increasing integers.
nextIdent :: Pool -> STM Int
nextIdent :: Pool -> STM Int
nextIdent Pool
p = do
    Int
tok <- forall a. TVar a -> STM a
readTVar (Pool -> TVar Int
tokens Pool
p)
    forall a. TVar a -> a -> STM ()
writeTVar (Pool -> TVar Int
tokens Pool
p) (forall a. Enum a => a -> a
succ Int
tok)
    forall (m :: * -> *) a. Monad m => a -> m a
return Int
tok

cleanupTask :: Pool -> Handle -> STM ()
cleanupTask :: Pool -> Int -> STM ()
cleanupTask Pool
p Int
h =
    -- Once the task is done executing, we must alter the graph so any
    -- dependent children will know their parent has completed.
    forall a. TVar a -> (a -> a) -> STM ()
modifyTVar (Pool -> TVar TaskGraph
tasks Pool
p) forall a b. (a -> b) -> a -> b
$ \TaskGraph
g ->
        case forall a b. [a] -> [b] -> [(a, b)]
zip (forall a. a -> [a]
repeat Int
h) (forall (gr :: * -> * -> *) a b. Graph gr => gr a b -> Int -> [Int]
Gr.suc TaskGraph
g Int
h) of
            -- If nothing dependend on this task and if the final result value
            -- has been observed, prune it from the graph, as well as any
            -- parents which now have no dependents.  Otherwise mark the edges
            -- as Completed so dependent children can execute.
            [] -> forall {gr :: * -> * -> *} {a} {b}.
Graph gr =>
Int -> gr a b -> gr a b
dropTask Int
h TaskGraph
g
            [(Int, Int)]
es -> forall (gr :: * -> * -> *) b a.
DynGraph gr =>
[LEdge b] -> gr a b -> gr a b
insEdges (forall {a} {b}. [(a, b)] -> [(a, b, Status)]
completeEdges [(Int, Int)]
es) forall a b. (a -> b) -> a -> b
$ forall (gr :: * -> * -> *) a b.
DynGraph gr =>
[(Int, Int)] -> gr a b -> gr a b
delEdges [(Int, Int)]
es TaskGraph
g
  where
    completeEdges :: [(a, b)] -> [(a, b, Status)]
completeEdges = forall a b. (a -> b) -> [a] -> [b]
map (\(a
f, b
t) -> (a
f, b
t, Status
Completed))

    dropTask :: Int -> gr a b -> gr a b
dropTask Int
k gr a b
gr = forall (t :: * -> *) b a.
Foldable t =>
(b -> a -> b) -> b -> t a -> b
foldl' gr a b -> Int -> gr a b
f (forall {gr :: * -> * -> *} {a} {b}.
Graph gr =>
Int -> gr a b -> gr a b
delNode Int
k gr a b
gr) (forall (gr :: * -> * -> *) a b. Graph gr => gr a b -> Int -> [Int]
Gr.pre gr a b
gr Int
k)
      where
        f :: gr a b -> Int -> gr a b
f gr a b
g Int
n = if forall (gr :: * -> * -> *) a b. Graph gr => gr a b -> Int -> Int
outdeg gr a b
g Int
n forall a. Eq a => a -> a -> Bool
== Int
0 then Int -> gr a b -> gr a b
dropTask Int
n gr a b
g else gr a b
g

-- | Spawn an asynchronous action in a separate thread, and pass its
-- @Async@ handle to the supplied function.  When the function returns
-- or throws an exception, 'cancel' is called on the @Async@.
--
-- > withAsync action inner = bracket (async action) cancel inner
--
-- This is a useful variant of 'async' that ensures an @Async@ is
-- never left running unintentionally.
--
-- Since 'cancel' may block, 'withAsync' may also block; see 'cancel'
-- for details.
--
withAsync :: TaskGroup -> IO a -> (Async a -> IO b) -> IO b
withAsync :: forall a b. TaskGroup -> IO a -> (Async a -> IO b) -> IO b
withAsync TaskGroup
p = forall a. a -> a
inline forall a b.
TaskGroup
-> (IO () -> IO ThreadId) -> IO a -> (Async a -> IO b) -> IO b
withAsyncUsing TaskGroup
p IO () -> IO ThreadId
rawForkIO

-- | Like 'withAsync' but uses 'forkOS' internally.
withAsyncBound :: TaskGroup -> IO a -> (Async a -> IO b) -> IO b
withAsyncBound :: forall a b. TaskGroup -> IO a -> (Async a -> IO b) -> IO b
withAsyncBound TaskGroup
p = forall a b.
TaskGroup
-> (IO () -> IO ThreadId) -> IO a -> (Async a -> IO b) -> IO b
withAsyncUsing TaskGroup
p IO () -> IO ThreadId
forkOS

-- | Like 'withAsync' but uses 'forkOn' internally.
withAsyncOn :: TaskGroup -> Int -> IO a -> (Async a -> IO b) -> IO b
withAsyncOn :: forall a b. TaskGroup -> Int -> IO a -> (Async a -> IO b) -> IO b
withAsyncOn TaskGroup
p = forall a b.
TaskGroup
-> (IO () -> IO ThreadId) -> IO a -> (Async a -> IO b) -> IO b
withAsyncUsing TaskGroup
p forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> IO () -> IO ThreadId
rawForkOn

-- | Like 'withAsync' but uses 'forkIOWithUnmask' internally.
-- The child thread is passed a function that can be used to unmask asynchronous exceptions.
withAsyncWithUnmask :: TaskGroup -> ((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b
withAsyncWithUnmask :: forall a b.
TaskGroup
-> ((forall b. IO b -> IO b) -> IO a) -> (Async a -> IO b) -> IO b
withAsyncWithUnmask TaskGroup
p (forall b. IO b -> IO b) -> IO a
actionWith =
    forall a b.
TaskGroup
-> (IO () -> IO ThreadId) -> IO a -> (Async a -> IO b) -> IO b
withAsyncUsing TaskGroup
p IO () -> IO ThreadId
rawForkIO ((forall b. IO b -> IO b) -> IO a
actionWith forall b. IO b -> IO b
unsafeUnmask)

-- | Like 'withAsyncOn' but uses 'forkOnWithUnmask' internally.
-- The child thread is passed a function that can be used to unmask asynchronous exceptions
withAsyncOnWithUnmask :: TaskGroup -> Int -> ((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b
withAsyncOnWithUnmask :: forall a b.
TaskGroup
-> Int
-> ((forall b. IO b -> IO b) -> IO a)
-> (Async a -> IO b)
-> IO b
withAsyncOnWithUnmask TaskGroup
p Int
cpu (forall b. IO b -> IO b) -> IO a
actionWith = forall a b.
TaskGroup
-> (IO () -> IO ThreadId) -> IO a -> (Async a -> IO b) -> IO b
withAsyncUsing TaskGroup
p (Int -> IO () -> IO ThreadId
rawForkOn Int
cpu) ((forall b. IO b -> IO b) -> IO a
actionWith forall b. IO b -> IO b
unsafeUnmask)

withAsyncUsing :: TaskGroup -> (IO () -> IO ThreadId) -> IO a -> (Async a -> IO b)
               -> IO b
-- The bracket version works, but is slow.  We can do better by
-- hand-coding it:
withAsyncUsing :: forall a b.
TaskGroup
-> (IO () -> IO ThreadId) -> IO a -> (Async a -> IO b) -> IO b
withAsyncUsing TaskGroup
p IO () -> IO ThreadId
doFork = \IO a
action Async a -> IO b
inner -> do
  forall b. ((forall b. IO b -> IO b) -> IO b) -> IO b
mask forall a b. (a -> b) -> a -> b
$ \forall b. IO b -> IO b
restore -> do
    Async a
a <- forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a.
TaskGroup -> (IO () -> IO ThreadId) -> IO a -> STM (Async a)
asyncUsing TaskGroup
p IO () -> IO ThreadId
doFork forall a b. (a -> b) -> a -> b
$ forall b. IO b -> IO b
restore IO a
action
    b
r <- forall b. IO b -> IO b
restore (Async a -> IO b
inner Async a
a) forall a. IO a -> (SomeException -> IO a) -> IO a
`catchAll` \SomeException
e -> do forall a. Async a -> IO ()
cancel Async a
a; forall e a. Exception e => e -> IO a
throwIO SomeException
e
    forall a. Async a -> IO ()
cancel Async a
a
    forall (m :: * -> *) a. Monad m => a -> m a
return b
r

-- | Wait for an asynchronous action to complete, and return its
-- value.  If the asynchronous action threw an exception, then the
-- exception is re-thrown by 'wait'.
--
-- > wait = atomically . waitSTM
--
{-# INLINE wait #-}
wait :: Async a -> IO a
wait :: forall a. Async a -> IO a
wait = forall a. STM a -> IO a
atomically forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. Async a -> STM a
waitSTM

-- | Wait for an asynchronous action to complete, and return either
-- @Left e@ if the action raised an exception @e@, or @Right a@ if it
-- returned a value @a@.
--
-- > waitCatch = atomically . waitCatchSTM
--
{-# INLINE waitCatch #-}
waitCatch :: Async a -> IO (Either SomeException a)
waitCatch :: forall a. Async a -> IO (Either SomeException a)
waitCatch = forall a. STM a -> IO a
atomically forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. Async a -> STM (Either SomeException a)
waitCatchSTM

-- | Check whether an 'Async' has completed yet.  If it has not
-- completed yet, then the result is @Nothing@, otherwise the result
-- is @Just e@ where @e@ is @Left x@ if the @Async@ raised an
-- exception @x@, or @Right a@ if it returned a value @a@.
--
-- > poll = atomically . pollSTM
--
{-# INLINE poll #-}
poll :: Async a -> IO (Maybe (Either SomeException a))
poll :: forall a. Async a -> IO (Maybe (Either SomeException a))
poll = forall a. STM a -> IO a
atomically forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. Async a -> STM (Maybe (Either SomeException a))
pollSTM

-- | A version of 'wait' that can be used inside an STM transaction.
--
waitSTM :: Async a -> STM a
waitSTM :: forall a. Async a -> STM a
waitSTM Async a
a = do
   Either SomeException a
r <- forall a. Async a -> STM (Either SomeException a)
waitCatchSTM Async a
a
   forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either forall e a. Exception e => e -> STM a
throwSTM forall (m :: * -> *) a. Monad m => a -> m a
return Either SomeException a
r

-- | A version of 'waitCatch' that can be used inside an STM transaction.
--
{-# INLINE waitCatchSTM #-}
waitCatchSTM :: Async a -> STM (Either SomeException a)
waitCatchSTM :: forall a. Async a -> STM (Either SomeException a)
waitCatchSTM (Async TaskGroup
_ Int
_ STM (Either SomeException a)
w) = STM (Either SomeException a)
w

-- | A version of 'poll' that can be used inside an STM transaction.
--
{-# INLINE pollSTM #-}
pollSTM :: Async a -> STM (Maybe (Either SomeException a))
pollSTM :: forall a. Async a -> STM (Maybe (Either SomeException a))
pollSTM (Async TaskGroup
_ Int
_ STM (Either SomeException a)
w) = (forall a. a -> Maybe a
Just forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> STM (Either SomeException a)
w) forall a. STM a -> STM a -> STM a
`orElse` forall (m :: * -> *) a. Monad m => a -> m a
return forall a. Maybe a
Nothing

-- | Cancel an asynchronous action by throwing the @ThreadKilled@
-- exception to it.  Has no effect if the 'Async' has already
-- completed.
--
-- > cancel a = throwTo (asyncThreadId a) ThreadKilled
--
-- Note that 'cancel' is synchronous in the same sense as 'throwTo'.
-- It does not return until the exception has been thrown in the
-- target thread, or the target thread has completed.  In particular,
-- if the target thread is making a foreign call, the exception will
-- not be thrown until the foreign call returns, and in this case
-- 'cancel' may block indefinitely.  An asynchronous 'cancel' can
-- of course be obtained by wrapping 'cancel' itself in 'async'.
--
{-# INLINE cancel #-}
cancel :: Async a -> IO ()
cancel :: forall a. Async a -> IO ()
cancel = forall a b c. (a -> b -> c) -> b -> a -> c
flip forall e a. Exception e => Async a -> e -> IO ()
cancelWith AsyncException
ThreadKilled

-- | Cancel an asynchronous action by throwing the supplied exception
-- to it.
--
-- > cancelWith a x = throwTo (asyncThreadId a) x
--
-- The notes about the synchronous nature of 'cancel' also apply to
-- 'cancelWith'.
cancelWith' :: Exception e => Pool -> Handle -> e -> IO ()
cancelWith' :: forall e. Exception e => Pool -> Int -> e -> IO ()
cancelWith' Pool
p Int
h e
e =
    (forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (forall e. Exception e => ThreadId -> e -> IO ()
`throwTo` e
e) forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<<) forall a b. (a -> b) -> a -> b
$ forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
        TaskGraph
g <- forall a. TVar a -> STM a
readTVar (Pool -> TVar TaskGraph
tasks Pool
p)
        let hs :: [Int]
hs = if forall (gr :: * -> * -> *) a b. Graph gr => Int -> gr a b -> Bool
gelem Int
h TaskGraph
g then TaskGraph -> Int -> [Int]
nodeList TaskGraph
g Int
h else []
        [ThreadId]
xs <- forall (t :: * -> *) (m :: * -> *) b a.
(Foldable t, Monad m) =>
(b -> a -> m b) -> b -> t a -> m b
foldM (TaskGraph -> [ThreadId] -> Int -> STM [ThreadId]
go TaskGraph
g) [] [Int]
hs
        forall a. TVar a -> a -> STM ()
writeTVar (Pool -> TVar TaskGraph
tasks Pool
p) forall a b. (a -> b) -> a -> b
$ forall (t :: * -> *) b a.
Foldable t =>
(b -> a -> b) -> b -> t a -> b
foldl' (forall a b c. (a -> b -> c) -> b -> a -> c
flip forall {gr :: * -> * -> *} {a} {b}.
Graph gr =>
Int -> gr a b -> gr a b
delNode) TaskGraph
g [Int]
hs
        forall (m :: * -> *) a. Monad m => a -> m a
return [ThreadId]
xs
  where
    go :: TaskGraph -> [ThreadId] -> Int -> STM [ThreadId]
go TaskGraph
g [ThreadId]
acc Int
h' = forall b a. b -> (a -> b) -> Maybe a -> b
maybe [ThreadId]
acc (forall a. a -> [a] -> [a]
:[ThreadId]
acc) forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TaskGraph -> Int -> STM (Maybe ThreadId)
getThreadId TaskGraph
g Int
h'

    nodeList :: TaskGraph -> Node -> [Node]
    nodeList :: TaskGraph -> Int -> [Int]
nodeList TaskGraph
g Int
k = Int
k forall a. a -> [a] -> [a]
: forall (t :: * -> *) a b. Foldable t => (a -> [b]) -> t a -> [b]
concatMap (TaskGraph -> Int -> [Int]
nodeList TaskGraph
g) (forall (gr :: * -> * -> *) a b. Graph gr => gr a b -> Int -> [Int]
Gr.suc TaskGraph
g Int
k)

cancelWith :: Exception e => Async a -> e -> IO ()
cancelWith :: forall e a. Exception e => Async a -> e -> IO ()
cancelWith (Async TaskGroup
p Int
h STM (Either SomeException a)
_) = forall e. Exception e => Pool -> Int -> e -> IO ()
cancelWith' (TaskGroup -> Pool
pool TaskGroup
p) Int
h

-- | Cancel an asynchronous action by throwing the @ThreadKilled@ exception to
--   it, or unregistering it from the task pool if it had not started yet.  Has
--   no effect if the 'Async' has already completed.
--
-- Note that 'cancel' is synchronous in the same sense as 'throwTo'.  It does
-- not return until the exception has been thrown in the target thread, or the
-- target thread has completed.  In particular, if the target thread is making
-- a foreign call, the exception will not be thrown until the foreign call
-- returns, and in this case 'cancel' may block indefinitely.  An asynchronous
-- 'cancel' can of course be obtained by wrapping 'cancel' itself in 'async'.
cancelAll :: TaskGroup -> IO ()
cancelAll :: TaskGroup -> IO ()
cancelAll TaskGroup
p = do
    [Int]
hs <- forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
        forall a. TVar a -> a -> STM ()
writeTVar (TaskGroup -> TVar (IntMap (IO ThreadId, SomeTMVar))
pending TaskGroup
p) forall a. IntMap a
IntMap.empty
        TaskGraph
g <- forall a. TVar a -> STM a
readTVar (Pool -> TVar TaskGraph
tasks (TaskGroup -> Pool
pool TaskGroup
p))
        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall (gr :: * -> * -> *) a b. Graph gr => gr a b -> [Int]
nodes TaskGraph
g
    forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (\Int
h -> forall e. Exception e => Pool -> Int -> e -> IO ()
cancelWith' (TaskGroup -> Pool
pool TaskGroup
p) Int
h AsyncException
ThreadKilled) [Int]
hs

-- | Wait for any of the supplied asynchronous operations to complete.
-- The value returned is a pair of the 'Async' that completed, and the
-- result that would be returned by 'wait' on that 'Async'.
--
-- If multiple 'Async's complete or have completed, then the value
-- returned corresponds to the first completed 'Async' in the list.
--
waitAnyCatch :: [Async a] -> IO (Async a, Either SomeException a)
waitAnyCatch :: forall a. [Async a] -> IO (Async a, Either SomeException a)
waitAnyCatch [Async a]
asyncs =
  forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$
    forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr forall a. STM a -> STM a -> STM a
orElse forall a. STM a
retry forall a b. (a -> b) -> a -> b
$
      forall a b. (a -> b) -> [a] -> [b]
map (\Async a
a -> do Either SomeException a
r <- forall a. Async a -> STM (Either SomeException a)
waitCatchSTM Async a
a; forall (m :: * -> *) a. Monad m => a -> m a
return (Async a
a, Either SomeException a
r)) [Async a]
asyncs

-- | Like 'waitAnyCatch', but also cancels the other asynchronous
-- operations as soon as one has completed.
--
waitAnyCatchCancel :: [Async a] -> IO (Async a, Either SomeException a)
waitAnyCatchCancel :: forall a. [Async a] -> IO (Async a, Either SomeException a)
waitAnyCatchCancel [Async a]
asyncs =
  forall a. [Async a] -> IO (Async a, Either SomeException a)
waitAnyCatch [Async a]
asyncs forall a b. IO a -> IO b -> IO a
`finally` forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ forall a. Async a -> IO ()
cancel [Async a]
asyncs

-- | Wait for any of the supplied @Async@s to complete.  If the first
-- to complete throws an exception, then that exception is re-thrown
-- by 'waitAny'.
--
-- If multiple 'Async's complete or have completed, then the value
-- returned corresponds to the first completed 'Async' in the list.
--
waitAny :: [Async a] -> IO (Async a, a)
waitAny :: forall a. [Async a] -> IO (Async a, a)
waitAny [Async a]
asyncs =
  forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$
    forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr forall a. STM a -> STM a -> STM a
orElse forall a. STM a
retry forall a b. (a -> b) -> a -> b
$
      forall a b. (a -> b) -> [a] -> [b]
map (\Async a
a -> do a
r <- forall a. Async a -> STM a
waitSTM Async a
a; forall (m :: * -> *) a. Monad m => a -> m a
return (Async a
a, a
r)) [Async a]
asyncs

-- | Like 'waitAny', but also cancels the other asynchronous
-- operations as soon as one has completed.
--
waitAnyCancel :: [Async a] -> IO (Async a, a)
waitAnyCancel :: forall a. [Async a] -> IO (Async a, a)
waitAnyCancel [Async a]
asyncs =
  forall a. [Async a] -> IO (Async a, a)
waitAny [Async a]
asyncs forall a b. IO a -> IO b -> IO a
`finally` forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ forall a. Async a -> IO ()
cancel [Async a]
asyncs

-- | Wait for the first of two @Async@s to finish.
waitEitherCatch :: Async a -> Async b
                -> IO (Either (Either SomeException a)
                              (Either SomeException b))
waitEitherCatch :: forall a b.
Async a
-> Async b
-> IO (Either (Either SomeException a) (Either SomeException b))
waitEitherCatch Async a
left Async b
right =
  forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$
    (forall a b. a -> Either a b
Left  forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. Async a -> STM (Either SomeException a)
waitCatchSTM Async a
left)
      forall a. STM a -> STM a -> STM a
`orElse`
    (forall a b. b -> Either a b
Right forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. Async a -> STM (Either SomeException a)
waitCatchSTM Async b
right)

-- | Like 'waitEitherCatch', but also 'cancel's both @Async@s before
-- returning.
--
waitEitherCatchCancel :: Async a -> Async b
                      -> IO (Either (Either SomeException a)
                                    (Either SomeException b))
waitEitherCatchCancel :: forall a b.
Async a
-> Async b
-> IO (Either (Either SomeException a) (Either SomeException b))
waitEitherCatchCancel Async a
left Async b
right =
  forall a b.
Async a
-> Async b
-> IO (Either (Either SomeException a) (Either SomeException b))
waitEitherCatch Async a
left Async b
right forall a b. IO a -> IO b -> IO a
`finally` (forall a. Async a -> IO ()
cancel Async a
left forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall a. Async a -> IO ()
cancel Async b
right)

-- | Wait for the first of two @Async@s to finish.  If the @Async@
-- that finished first raised an exception, then the exception is
-- re-thrown by 'waitEither'.
--
waitEither :: Async a -> Async b -> IO (Either a b)
waitEither :: forall a b. Async a -> Async b -> IO (Either a b)
waitEither Async a
left Async b
right =
  forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$
    (forall a b. a -> Either a b
Left  forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. Async a -> STM a
waitSTM Async a
left)
      forall a. STM a -> STM a -> STM a
`orElse`
    (forall a b. b -> Either a b
Right forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. Async a -> STM a
waitSTM Async b
right)

-- | Like 'waitEither', but the result is ignored.
--
waitEither_ :: Async a -> Async b -> IO ()
waitEither_ :: forall a b. Async a -> Async b -> IO ()
waitEither_ Async a
left Async b
right =
  forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$
    (forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall a. Async a -> STM a
waitSTM Async a
left)
      forall a. STM a -> STM a -> STM a
`orElse`
    (forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall a. Async a -> STM a
waitSTM Async b
right)

-- | Like 'waitEither', but also 'cancel's both @Async@s before
-- returning.
--
waitEitherCancel :: Async a -> Async b -> IO (Either a b)
waitEitherCancel :: forall a b. Async a -> Async b -> IO (Either a b)
waitEitherCancel Async a
left Async b
right =
  forall a b. Async a -> Async b -> IO (Either a b)
waitEither Async a
left Async b
right forall a b. IO a -> IO b -> IO a
`finally` (forall a. Async a -> IO ()
cancel Async a
left forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall a. Async a -> IO ()
cancel Async b
right)

-- | Waits for both @Async@s to finish, but if either of them throws
-- an exception before they have both finished, then the exception is
-- re-thrown by 'waitBoth'.
--
waitBoth :: Async a -> Async b -> IO (a,b)
waitBoth :: forall a b. Async a -> Async b -> IO (a, b)
waitBoth Async a
left Async b
right =
  forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
    a
a <- forall a. Async a -> STM a
waitSTM Async a
left
           forall a. STM a -> STM a -> STM a
`orElse`
         (forall a. Async a -> STM a
waitSTM Async b
right forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall a. STM a
retry)
    b
b <- forall a. Async a -> STM a
waitSTM Async b
right
    forall (m :: * -> *) a. Monad m => a -> m a
return (a
a,b
b)


-- | Link the given @Async@ to the current thread, such that if the
-- @Async@ raises an exception, that exception will be re-thrown in
-- the current thread.
--
link :: Async a -> IO ()
link :: forall a. Async a -> IO ()
link (Async TaskGroup
_ Int
_ STM (Either SomeException a)
w) = do
  ThreadId
me <- IO ThreadId
myThreadId
  forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall a. IO a -> IO ThreadId
forkRepeat forall a b. (a -> b) -> a -> b
$ do
     Either SomeException a
r <- forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ STM (Either SomeException a)
w
     case Either SomeException a
r of
       Left SomeException
e -> forall e. Exception e => ThreadId -> e -> IO ()
throwTo ThreadId
me SomeException
e
       Either SomeException a
_ -> forall (m :: * -> *) a. Monad m => a -> m a
return ()

-- | Link two @Async@s together, such that if either raises an
-- exception, the same exception is re-thrown in the other @Async@.
--
link2 :: Async a -> Async b -> IO ()
link2 :: forall a b. Async a -> Async b -> IO ()
link2 Async a
left Async b
right =
  forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall a. IO a -> IO ThreadId
forkRepeat forall a b. (a -> b) -> a -> b
$ do
    Either (Either SomeException a) (Either SomeException b)
r <- forall a b.
Async a
-> Async b
-> IO (Either (Either SomeException a) (Either SomeException b))
waitEitherCatch Async a
left Async b
right
    case Either (Either SomeException a) (Either SomeException b)
r of
      Left  (Left SomeException
e) -> forall e a. Exception e => Async a -> e -> IO ()
cancelWith Async b
right SomeException
e
      Right (Left SomeException
e) -> forall e a. Exception e => Async a -> e -> IO ()
cancelWith Async a
left SomeException
e
      Either (Either SomeException a) (Either SomeException b)
_ -> forall (m :: * -> *) a. Monad m => a -> m a
return ()


-- -----------------------------------------------------------------------------

-- | Run two @IO@ actions concurrently, and return the first to
-- finish.  The loser of the race is 'cancel'led.
--
-- > race left right =
-- >   withAsync left $ \a ->
-- >   withAsync right $ \b ->
-- >   waitEither a b
--
race :: TaskGroup -> IO a -> IO b -> IO (Either a b)

-- | Like 'race', but the result is ignored.
--
race_ :: TaskGroup -> IO a -> IO b -> IO ()

-- | Run two @IO@ actions concurrently, and return both results.  If
-- either action throws an exception at any time, then the other
-- action is 'cancel'led, and the exception is re-thrown by
-- 'concurrently'.
--
-- > concurrently left right =
-- >   withAsync left $ \a ->
-- >   withAsync right $ \b ->
-- >   waitBoth a b
concurrently :: TaskGroup -> IO a -> IO b -> IO (a,b)

#define USE_ASYNC_VERSIONS 1

#if USE_ASYNC_VERSIONS

race :: forall a b. TaskGroup -> IO a -> IO b -> IO (Either a b)
race TaskGroup
p IO a
left IO b
right =
  forall a b. TaskGroup -> IO a -> (Async a -> IO b) -> IO b
withAsync TaskGroup
p IO a
left forall a b. (a -> b) -> a -> b
$ \Async a
a ->
  forall a b. TaskGroup -> IO a -> (Async a -> IO b) -> IO b
withAsync TaskGroup
p IO b
right forall a b. (a -> b) -> a -> b
$ \Async b
b ->
  forall a b. Async a -> Async b -> IO (Either a b)
waitEither Async a
a Async b
b

race_ :: forall a b. TaskGroup -> IO a -> IO b -> IO ()
race_ TaskGroup
p IO a
left IO b
right =
  forall a b. TaskGroup -> IO a -> (Async a -> IO b) -> IO b
withAsync TaskGroup
p IO a
left forall a b. (a -> b) -> a -> b
$ \Async a
a ->
  forall a b. TaskGroup -> IO a -> (Async a -> IO b) -> IO b
withAsync TaskGroup
p IO b
right forall a b. (a -> b) -> a -> b
$ \Async b
b ->
  forall a b. Async a -> Async b -> IO ()
waitEither_ Async a
a Async b
b

concurrently :: forall a b. TaskGroup -> IO a -> IO b -> IO (a, b)
concurrently TaskGroup
p IO a
left IO b
right =
  forall a b. TaskGroup -> IO a -> (Async a -> IO b) -> IO b
withAsync TaskGroup
p IO a
left forall a b. (a -> b) -> a -> b
$ \Async a
a ->
  forall a b. TaskGroup -> IO a -> (Async a -> IO b) -> IO b
withAsync TaskGroup
p IO b
right forall a b. (a -> b) -> a -> b
$ \Async b
b ->
  forall a b. Async a -> Async b -> IO (a, b)
waitBoth Async a
a Async b
b

#else

-- MVar versions of race/concurrently
-- More ugly than the Async versions, but quite a bit faster.

-- race :: IO a -> IO b -> IO (Either a b)
race left right = concurrently' left right collect
  where
    collect m = do
        e <- takeMVar m
        case e of
            Left ex -> throwIO ex
            Right r -> return r

-- race_ :: IO a -> IO b -> IO ()
race_ left right = void $ race left right

-- concurrently :: IO a -> IO b -> IO (a,b)
concurrently left right = concurrently' left right (collect [])
  where
    collect [Left a, Right b] _ = return (a,b)
    collect [Right b, Left a] _ = return (a,b)
    collect xs m = do
        e <- takeMVar m
        case e of
            Left ex -> throwIO ex
            Right r -> collect (r:xs) m

concurrently' :: IO a -> IO b
             -> (MVar (Either SomeException (Either a b)) -> IO r)
             -> IO r
concurrently' left right collect = do
    done <- newEmptyMVar
    mask $ \restore -> do
        lid <- forkIO $ restore (left >>= putMVar done . Right . Left)
                             `catchAll` (putMVar done . Left)
        rid <- forkIO $ restore (right >>= putMVar done . Right . Right)
                             `catchAll` (putMVar done . Left)
        let stop = killThread lid >> killThread rid
        r <- restore (collect done) `onException` stop
        stop
        return r

#endif

-- -----------------------------------------------------------------------------

-- | A value of type @Concurrently a@ is an @IO@ operation that can be
-- composed with other @Concurrently@ values, using the @Applicative@
-- and @Alternative@ instances.
--
-- Calling @runConcurrently@ on a value of type @Concurrently a@ will
-- execute the @IO@ operations it contains concurrently, before
-- delivering the result of type @a@.
--
-- For example
--
-- > (page1, page2, page3)
-- >     <- runConcurrently $ (,,)
-- >     <$> Concurrently (getURL "url1")
-- >     <*> Concurrently (getURL "url2")
-- >     <*> Concurrently (getURL "url3")
--
newtype Concurrently a = Concurrently { forall a. Concurrently a -> TaskGroup -> IO a
runConcurrently :: TaskGroup -> IO a }

instance Functor Concurrently where
  fmap :: forall a b. (a -> b) -> Concurrently a -> Concurrently b
fmap a -> b
f (Concurrently TaskGroup -> IO a
a) = forall a. (TaskGroup -> IO a) -> Concurrently a
Concurrently forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap a -> b
f forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TaskGroup -> IO a
a

instance Applicative Concurrently where
  pure :: forall a. a -> Concurrently a
pure a
x = forall a. (TaskGroup -> IO a) -> Concurrently a
Concurrently forall a b. (a -> b) -> a -> b
$ \TaskGroup
_ -> forall (m :: * -> *) a. Monad m => a -> m a
return a
x
  Concurrently TaskGroup -> IO (a -> b)
fs <*> :: forall a b.
Concurrently (a -> b) -> Concurrently a -> Concurrently b
<*> Concurrently TaskGroup -> IO a
as =
    forall a. (TaskGroup -> IO a) -> Concurrently a
Concurrently forall a b. (a -> b) -> a -> b
$ \TaskGroup
tg -> (\(a -> b
f, a
a) -> a -> b
f a
a) forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a b. TaskGroup -> IO a -> IO b -> IO (a, b)
concurrently TaskGroup
tg (TaskGroup -> IO (a -> b)
fs TaskGroup
tg) (TaskGroup -> IO a
as TaskGroup
tg)

instance Alternative Concurrently where
  empty :: forall a. Concurrently a
empty = forall a. (TaskGroup -> IO a) -> Concurrently a
Concurrently forall a b. (a -> b) -> a -> b
$ \TaskGroup
_ -> forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (Int -> IO ()
threadDelay forall a. Bounded a => a
maxBound)
  Concurrently TaskGroup -> IO a
as <|> :: forall a. Concurrently a -> Concurrently a -> Concurrently a
<|> Concurrently TaskGroup -> IO a
bs =
    forall a. (TaskGroup -> IO a) -> Concurrently a
Concurrently forall a b. (a -> b) -> a -> b
$ \TaskGroup
tg -> forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either forall a. a -> a
id forall a. a -> a
id forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a b. TaskGroup -> IO a -> IO b -> IO (Either a b)
race TaskGroup
tg (TaskGroup -> IO a
as TaskGroup
tg) (TaskGroup -> IO a
bs TaskGroup
tg)

-- ----------------------------------------------------------------------------

-- | Fork a thread that runs the supplied action, and if it raises an
-- exception, re-runs the action.  The thread terminates only when the
-- action runs to completion without raising an exception.
forkRepeat :: IO a -> IO ThreadId
forkRepeat :: forall a. IO a -> IO ThreadId
forkRepeat IO a
action =
  forall b. ((forall b. IO b -> IO b) -> IO b) -> IO b
mask forall a b. (a -> b) -> a -> b
$ \forall b. IO b -> IO b
restore ->
    let go :: IO ()
go = do Either SomeException a
r <- forall a. IO a -> IO (Either SomeException a)
tryAll (forall b. IO b -> IO b
restore IO a
action)
                case Either SomeException a
r of
                  Left SomeException
_ -> IO ()
go
                  Either SomeException a
_      -> forall (m :: * -> *) a. Monad m => a -> m a
return ()
    in IO () -> IO ThreadId
forkIO IO ()
go

catchAll :: IO a -> (SomeException -> IO a) -> IO a
catchAll :: forall a. IO a -> (SomeException -> IO a) -> IO a
catchAll = forall e a. Exception e => IO a -> (e -> IO a) -> IO a
catch

tryAll :: IO a -> IO (Either SomeException a)
tryAll :: forall a. IO a -> IO (Either SomeException a)
tryAll = forall e a. Exception e => IO a -> IO (Either e a)
try

-- A version of forkIO that does not include the outer exception
-- handler: saves a bit of time when we will be installing our own
-- exception handler.
{-# INLINE rawForkIO #-}
rawForkIO :: IO () -> IO ThreadId
#if MIN_VERSION_base(4,17,0)
rawForkIO (IO action) = IO $ \ s ->
#else
rawForkIO :: IO () -> IO ThreadId
rawForkIO IO ()
action = forall a. (State# RealWorld -> (# State# RealWorld, a #)) -> IO a
IO forall a b. (a -> b) -> a -> b
$ \ State# RealWorld
s ->
#endif
   case (forall a.
a -> State# RealWorld -> (# State# RealWorld, ThreadId# #)
fork# IO ()
action State# RealWorld
s) of (# State# RealWorld
s1, ThreadId#
tid #) -> (# State# RealWorld
s1, ThreadId# -> ThreadId
ThreadId ThreadId#
tid #)

{-# INLINE rawForkOn #-}
rawForkOn :: Int -> IO () -> IO ThreadId
#if MIN_VERSION_base(4,17,0)
rawForkOn (I# cpu) (IO action) = IO $ \ s ->
#else
rawForkOn :: Int -> IO () -> IO ThreadId
rawForkOn (I# Int#
cpu) IO ()
action = forall a. (State# RealWorld -> (# State# RealWorld, a #)) -> IO a
IO forall a b. (a -> b) -> a -> b
$ \ State# RealWorld
s ->
#endif
   case (forall a.
Int# -> a -> State# RealWorld -> (# State# RealWorld, ThreadId# #)
forkOn# Int#
cpu IO ()
action State# RealWorld
s) of (# State# RealWorld
s1, ThreadId#
tid #) -> (# State# RealWorld
s1, ThreadId# -> ThreadId
ThreadId ThreadId#
tid #)