Copyright | (c) Simon Marlow 2012 John Wiegley 2014 |
---|---|
License | BSD3 (see the file LICENSE) |
Maintainer | John Wiegley <johnw@newartisans.com> |
Stability | provisional |
Portability | non-portable (requires concurrency) |
Safe Haskell | Safe-Inferred |
Language | Haskell98 |
This module provides a set of operations for running IO operations
asynchronously and waiting for their results. It is a thin layer over the
basic concurrency operations provided by Control.Concurrent. The main
additional functionality it provides is the ability to wait for the return
value of a thread, plus functions for managing task pools, work groups, and
many-to-many dependencies between tasks. The interface also provides some
additional safety and robustness over using threads and MVar
directly.
The basic type is
, which represents an asynchronous Async
aIO
action
that will return a value of type a
, or die with an exception. An Async
corresponds to either a thread, or a Handle
to an action waiting to be
spawned. This makes it possible to submit very large numbers of tasks,
with only N threads active at one time.
For example, to fetch two web pages at the same time, we could do
this (assuming a suitable getURL
function):
withTaskGroup 4 $ \g -> do a1 <- async g (getURL url1) a2 <- async g (getURL url2) page1 <- wait a1 page2 <- wait a2 ...
where async
submits the operation to the worker group (and from which it
is spawned in a separate thread), and wait
waits for and returns the
result. The number 4 indicates the maximum number of threads which may be
spawned at one time. If the operation throws an exception, then that
exception is re-thrown by wait
. This is one of the ways in which this
library provides some additional safety: it is harder to accidentally
forget about exceptions thrown in child threads.
A slight improvement over the previous example is this:
withTaskGroup 4 $ \g -> do withAsync g (getURL url1) $ \a1 -> do withAsync g (getURL url2) $ \a2 -> do page1 <- wait a1 page2 <- wait a2 ...
withAsync
is like async
, except that the Async
is automatically
killed (or unscheduled, using cancel
) if the enclosing IO operation
returns before it has completed. Consider the case when the first wait
throws an exception; then the second Async
will be automatically killed
rather than being left to run in the background, possibly indefinitely.
This is the second way that the library provides additional safety: using
withAsync
means we can avoid accidentally leaving threads running.
Furthermore, withAsync
allows a tree of threads to be built, such that
children are automatically killed if their parents die for any reason.
The pattern of performing two IO actions concurrently and waiting for their
results is packaged up in a combinator concurrently
, so we can further
shorten the above example to:
withTaskGroup 4 $ \g -> do (page1, page2) <- concurrently g (getURL url1) (getURL url2) ...
The Functor
instance can be used to change the result of an Async
. For
example:
ghci> a <- async g (return 3) ghci> wait a 3 ghci> wait (fmap (+1) a) 4
Synopsis
- data Async a
- withTaskGroup :: Int -> (TaskGroup -> IO b) -> IO b
- withTaskGroupIn :: Pool -> Int -> (TaskGroup -> IO b) -> IO b
- data Pool
- createPool :: IO Pool
- data TaskGroup
- createTaskGroup :: Pool -> Int -> IO TaskGroup
- runTaskGroup :: TaskGroup -> IO ()
- async :: TaskGroup -> IO a -> IO (Async a)
- asyncBound :: TaskGroup -> IO a -> IO (Async a)
- asyncOn :: TaskGroup -> Int -> IO a -> IO (Async a)
- asyncWithUnmask :: TaskGroup -> ((forall b. IO b -> IO b) -> IO a) -> IO (Async a)
- asyncOnWithUnmask :: TaskGroup -> Int -> ((forall b. IO b -> IO b) -> IO a) -> IO (Async a)
- asyncSTM :: TaskGroup -> IO a -> STM (Async a)
- taskHandle :: Async a -> Handle
- asyncAfter :: TaskGroup -> Async b -> IO a -> IO (Async a)
- asyncAfterAll :: TaskGroup -> [Handle] -> IO a -> IO (Async a)
- makeDependent :: Pool -> Handle -> Handle -> STM ()
- unsafeMakeDependent :: Pool -> Handle -> Handle -> STM ()
- withAsync :: TaskGroup -> IO a -> (Async a -> IO b) -> IO b
- withAsyncBound :: TaskGroup -> IO a -> (Async a -> IO b) -> IO b
- withAsyncOn :: TaskGroup -> Int -> IO a -> (Async a -> IO b) -> IO b
- withAsyncWithUnmask :: TaskGroup -> ((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b
- withAsyncOnWithUnmask :: TaskGroup -> Int -> ((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b
- wait :: Async a -> IO a
- poll :: Async a -> IO (Maybe (Either SomeException a))
- waitCatch :: Async a -> IO (Either SomeException a)
- cancel :: Async a -> IO ()
- cancelWith :: Exception e => Async a -> e -> IO ()
- cancelAll :: TaskGroup -> IO ()
- waitSTM :: Async a -> STM a
- pollSTM :: Async a -> STM (Maybe (Either SomeException a))
- waitCatchSTM :: Async a -> STM (Either SomeException a)
- waitAny :: [Async a] -> IO (Async a, a)
- waitAnyCatch :: [Async a] -> IO (Async a, Either SomeException a)
- waitAnyCancel :: [Async a] -> IO (Async a, a)
- waitAnyCatchCancel :: [Async a] -> IO (Async a, Either SomeException a)
- waitEither :: Async a -> Async b -> IO (Either a b)
- waitEitherCatch :: Async a -> Async b -> IO (Either (Either SomeException a) (Either SomeException b))
- waitEitherCancel :: Async a -> Async b -> IO (Either a b)
- waitEitherCatchCancel :: Async a -> Async b -> IO (Either (Either SomeException a) (Either SomeException b))
- waitEither_ :: Async a -> Async b -> IO ()
- waitBoth :: Async a -> Async b -> IO (a, b)
- link :: Async a -> IO ()
- link2 :: Async a -> Async b -> IO ()
- mapTasks :: Traversable t => TaskGroup -> t (IO a) -> IO (t a)
- mapTasks_ :: Foldable t => TaskGroup -> t (IO a) -> IO ()
- mapTasksE :: Traversable t => TaskGroup -> t (IO a) -> IO (t (Either SomeException a))
- mapTasksE_ :: Traversable t => TaskGroup -> t (IO a) -> IO (t (Maybe SomeException))
- mapRace :: Foldable t => TaskGroup -> t (IO a) -> IO (Async a, Either SomeException a)
- mapReduce :: (Foldable t, Monoid a) => TaskGroup -> t (IO a) -> STM (Async a)
- scatterFoldMapM :: (Foldable t, Monoid b, MonadBaseControl IO m) => TaskGroup -> t (IO a) -> (Either SomeException a -> m b) -> m b
- data Task a
- runTask :: TaskGroup -> Task a -> IO a
- task :: IO a -> Task a
- race :: TaskGroup -> IO a -> IO b -> IO (Either a b)
- race_ :: TaskGroup -> IO a -> IO b -> IO ()
- concurrently :: TaskGroup -> IO a -> IO b -> IO (a, b)
- mapConcurrently :: Traversable t => TaskGroup -> (a -> IO b) -> t a -> IO (t b)
- newtype Concurrently a = Concurrently {
- runConcurrently :: TaskGroup -> IO a
Asynchronous actions
An asynchronous action spawned by async
or withAsync
.
Asynchronous actions are executed in a separate thread, and
operations are provided for waiting for asynchronous actions to
complete and obtaining their results (see e.g. wait
).
Task pools and groups
withTaskGroup :: Int -> (TaskGroup -> IO b) -> IO b Source #
Create both a pool, and a task group with a given number of execution slots, but with a bounded lifetime. Once the given function exits, all tasks (that are still running) in the TaskGroup will be cancelled.
withTaskGroupIn :: Pool -> Int -> (TaskGroup -> IO b) -> IO b Source #
Create a task group within the given pool having a specified number of execution slots, but with a bounded lifetime. Leaving the block cancels every task still executing in the group.
A Pool
manages a collection of possibly interdependent tasks, such that
tasks await execution until the tasks they depend on have finished (and
tasks may depend on an arbitrary number of other tasks), while
independent tasks execute concurrently up to the number of available
resource slots in the pool.
Results from each task are available until the status of the task is polled or waited on. Further, the results are kept until that occurs, so failing to ever wait will result in a memory leak.
Tasks may be cancelled, in which case all dependent tasks are unscheduled.
createPool :: IO Pool Source #
Create a task pool for managing many-to-many acyclic dependencies among tasks.
createTaskGroup :: Pool -> Int -> IO TaskGroup Source #
Create a task group for executing interdependent tasks concurrently. The number of available slots governs how many tasks may run at one time.
runTaskGroup :: TaskGroup -> IO () Source #
Execute tasks in a given task group. The number of slots determines how many threads may execute concurrently.
Spawning tasks
async :: TaskGroup -> IO a -> IO (Async a) Source #
Spawn an asynchronous action in a separate thread.
asyncWithUnmask :: TaskGroup -> ((forall b. IO b -> IO b) -> IO a) -> IO (Async a) Source #
Like async
but using forkIOWithUnmask
internally.
The child thread is passed a function that can be used to unmask asynchronous exceptions.
asyncOnWithUnmask :: TaskGroup -> Int -> ((forall b. IO b -> IO b) -> IO a) -> IO (Async a) Source #
Like asyncOn
but using forkOnWithUnmask
internally.
The child thread is passed a function that can be used to unmask asynchronous exceptions.
asyncSTM :: TaskGroup -> IO a -> STM (Async a) Source #
Equivalent to async
, but acts in STM so that makeDependent
may be
called after the task is created, but before it begins executing.
Dependent tasks
taskHandle :: Async a -> Handle Source #
asyncAfter :: TaskGroup -> Async b -> IO a -> IO (Async a) Source #
Submit a task that begins execution only after its parent has completed.
This is equivalent to submitting a new task with asyncSTM
and linking
it to its parent using makeDependent
.
asyncAfterAll :: TaskGroup -> [Handle] -> IO a -> IO (Async a) Source #
Submit a task which begins execution after all its parents have completed.
This is equivalent to submitting a new task with asyncSTM
and linking
it to its parents using 'mapM makeDependent'.
:: Pool | |
-> Handle | Handle of task doing the waiting |
-> Handle | Handle of task we must wait on (the parent) |
-> STM () |
Given parent and child tasks, link them so the child cannot execute until the parent has finished.
:: Pool | |
-> Handle | Handle of task doing the waiting |
-> Handle | Handle of task we must wait on (the parent) |
-> STM () |
Given parent and child tasks, link them so the child cannot execute until the parent has finished. This function does not check for introduction of cycles into the dependency graph, which would prevent the child from ever running.
Spawning with automatic cancel
ation
withAsync :: TaskGroup -> IO a -> (Async a -> IO b) -> IO b Source #
Spawn an asynchronous action in a separate thread, and pass its
Async
handle to the supplied function. When the function returns
or throws an exception, cancel
is called on the Async
.
withAsync action inner = bracket (async action) cancel inner
This is a useful variant of async
that ensures an Async
is
never left running unintentionally.
Since cancel
may block, withAsync
may also block; see cancel
for details.
withAsyncWithUnmask :: TaskGroup -> ((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b Source #
Like withAsync
but uses forkIOWithUnmask
internally.
The child thread is passed a function that can be used to unmask asynchronous exceptions.
withAsyncOnWithUnmask :: TaskGroup -> Int -> ((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b Source #
Like withAsyncOn
but uses forkOnWithUnmask
internally.
The child thread is passed a function that can be used to unmask asynchronous exceptions
Quering Async
s
wait :: Async a -> IO a Source #
Wait for an asynchronous action to complete, and return its
value. If the asynchronous action threw an exception, then the
exception is re-thrown by wait
.
wait = atomically . waitSTM
poll :: Async a -> IO (Maybe (Either SomeException a)) Source #
Check whether an Async
has completed yet. If it has not
completed yet, then the result is Nothing
, otherwise the result
is Just e
where e
is Left x
if the Async
raised an
exception x
, or Right a
if it returned a value a
.
poll = atomically . pollSTM
waitCatch :: Async a -> IO (Either SomeException a) Source #
Wait for an asynchronous action to complete, and return either
Left e
if the action raised an exception e
, or Right a
if it
returned a value a
.
waitCatch = atomically . waitCatchSTM
cancel :: Async a -> IO () Source #
Cancel an asynchronous action by throwing the ThreadKilled
exception to it. Has no effect if the Async
has already
completed.
cancel a = throwTo (asyncThreadId a) ThreadKilled
Note that cancel
is synchronous in the same sense as throwTo
.
It does not return until the exception has been thrown in the
target thread, or the target thread has completed. In particular,
if the target thread is making a foreign call, the exception will
not be thrown until the foreign call returns, and in this case
cancel
may block indefinitely. An asynchronous cancel
can
of course be obtained by wrapping cancel
itself in async
.
cancelAll :: TaskGroup -> IO () Source #
Cancel an asynchronous action by throwing the ThreadKilled
exception to
it, or unregistering it from the task pool if it had not started yet. Has
no effect if the Async
has already completed.
Note that cancel
is synchronous in the same sense as throwTo
. It does
not return until the exception has been thrown in the target thread, or the
target thread has completed. In particular, if the target thread is making
a foreign call, the exception will not be thrown until the foreign call
returns, and in this case cancel
may block indefinitely. An asynchronous
cancel
can of course be obtained by wrapping cancel
itself in async
.
STM operations
pollSTM :: Async a -> STM (Maybe (Either SomeException a)) Source #
A version of poll
that can be used inside an STM transaction.
waitCatchSTM :: Async a -> STM (Either SomeException a) Source #
A version of waitCatch
that can be used inside an STM transaction.
Waiting for multiple Async
s
waitAnyCatch :: [Async a] -> IO (Async a, Either SomeException a) Source #
Wait for any of the supplied asynchronous operations to complete.
The value returned is a pair of the Async
that completed, and the
result that would be returned by wait
on that Async
.
If multiple Async
s complete or have completed, then the value
returned corresponds to the first completed Async
in the list.
waitAnyCancel :: [Async a] -> IO (Async a, a) Source #
Like waitAny
, but also cancels the other asynchronous
operations as soon as one has completed.
waitAnyCatchCancel :: [Async a] -> IO (Async a, Either SomeException a) Source #
Like waitAnyCatch
, but also cancels the other asynchronous
operations as soon as one has completed.
waitEither :: Async a -> Async b -> IO (Either a b) Source #
Wait for the first of two Async
s to finish. If the Async
that finished first raised an exception, then the exception is
re-thrown by waitEither
.
waitEitherCatch :: Async a -> Async b -> IO (Either (Either SomeException a) (Either SomeException b)) Source #
Wait for the first of two Async
s to finish.
waitEitherCancel :: Async a -> Async b -> IO (Either a b) Source #
Like waitEither
, but also cancel
s both Async
s before
returning.
waitEitherCatchCancel :: Async a -> Async b -> IO (Either (Either SomeException a) (Either SomeException b)) Source #
Like waitEitherCatch
, but also cancel
s both Async
s before
returning.
waitEither_ :: Async a -> Async b -> IO () Source #
Like waitEither
, but the result is ignored.
waitBoth :: Async a -> Async b -> IO (a, b) Source #
Waits for both Async
s to finish, but if either of them throws
an exception before they have both finished, then the exception is
re-thrown by waitBoth
.
Linking
link :: Async a -> IO () Source #
Link the given Async
to the current thread, such that if the
Async
raises an exception, that exception will be re-thrown in
the current thread.
link2 :: Async a -> Async b -> IO () Source #
Link two Async
s together, such that if either raises an
exception, the same exception is re-thrown in the other Async
.
Lists of actions
mapTasks :: Traversable t => TaskGroup -> t (IO a) -> IO (t a) Source #
Execute a group of tasks within the given task group, returning the results in order. The order of execution is random, but the results are returned in order.
mapTasks_ :: Foldable t => TaskGroup -> t (IO a) -> IO () Source #
Execute a group of tasks within the given task group, ignoring results.
mapTasksE :: Traversable t => TaskGroup -> t (IO a) -> IO (t (Either SomeException a)) Source #
Execute a group of tasks within the given task group, returning the results in order as an Either type to represent exceptions from actions. The order of execution is random, but the results are returned in order.
mapTasksE_ :: Traversable t => TaskGroup -> t (IO a) -> IO (t (Maybe SomeException)) Source #
Execute a group of tasks within the given task group, ignoring results, but returning a list of all exceptions.
mapRace :: Foldable t => TaskGroup -> t (IO a) -> IO (Async a, Either SomeException a) Source #
Execute a group of tasks, but return the first result or failure and cancel the remaining tasks.
:: (Foldable t, Monoid a) | |
=> TaskGroup | Task group to execute the tasks within |
-> t (IO a) | Set of Monoid-yielding IO actions |
-> STM (Async a) | Returns the final result task |
Given a list of actions yielding Monoid
results, execute the actions
concurrently (up to N at a time, based on available slots), and mappend
each pair of results concurrently as they become ready. The immediate
result of this function is an Async
representing the final value.
This is similar to the following: mconcat $ mapTasks n actions
,
except that intermediate results can be garbage collected as soon as
they've been merged. Also, the value returned from this function is an
Async
which may be polled for the final result.
Lastly, if an Exception
occurs in any subtask, the final result will
also yield an exception -- but not necessarily the first or last that was
caught.
scatterFoldMapM :: (Foldable t, Monoid b, MonadBaseControl IO m) => TaskGroup -> t (IO a) -> (Either SomeException a -> m b) -> m b Source #
Execute a group of tasks concurrently (using up to N active threads,
depending on the task group), and feed results to a continuation as soon
as they become available, in random order. The continuation function may
return a monoid value which is accumulated to yield a final result. If
no such value is needed, simply provide ()
.
The Task Monad and Applicative
The Task
Applicative and Monad allow for task dependencies to be built
using Applicative and do notation. Monadic evaluation is sequenced,
while applicative Evaluation is concurrent for each argument. In this
way, mixing the two builds a dependency graph via ordinary Haskell code.
runTask :: TaskGroup -> Task a -> IO a Source #
Run a value in the Task
monad and block until the final result is
computed.
Other utilities
race :: TaskGroup -> IO a -> IO b -> IO (Either a b) Source #
Run two IO
actions concurrently, and return the first to
finish. The loser of the race is cancel
led.
race left right = withAsync left $ \a -> withAsync right $ \b -> waitEither a b
concurrently :: TaskGroup -> IO a -> IO b -> IO (a, b) Source #
Run two IO
actions concurrently, and return both results. If
either action throws an exception at any time, then the other
action is cancel
led, and the exception is re-thrown by
concurrently
.
concurrently left right = withAsync left $ \a -> withAsync right $ \b -> waitBoth a b
mapConcurrently :: Traversable t => TaskGroup -> (a -> IO b) -> t a -> IO (t b) Source #
maps an IO
-performing function over any Traversable
data
type, performing all the IO
actions concurrently, and returning
the original data structure with the arguments replaced by the
results.
For example, mapConcurrently
works with lists:
pages <- mapConcurrently getURL ["url1", "url2", "url3"]
newtype Concurrently a Source #
A value of type Concurrently a
is an IO
operation that can be
composed with other Concurrently
values, using the Applicative
and Alternative
instances.
Calling runConcurrently
on a value of type Concurrently a
will
execute the IO
operations it contains concurrently, before
delivering the result of type a
.
For example
(page1, page2, page3) <- runConcurrently $ (,,) <$> Concurrently (getURL "url1") <*> Concurrently (getURL "url2") <*> Concurrently (getURL "url3")
Concurrently | |
|
Instances
Alternative Concurrently Source # | |
Defined in Control.Concurrent.Async.Pool.Async empty :: Concurrently a # (<|>) :: Concurrently a -> Concurrently a -> Concurrently a # some :: Concurrently a -> Concurrently [a] # many :: Concurrently a -> Concurrently [a] # | |
Applicative Concurrently Source # | |
Defined in Control.Concurrent.Async.Pool.Async pure :: a -> Concurrently a # (<*>) :: Concurrently (a -> b) -> Concurrently a -> Concurrently b # liftA2 :: (a -> b -> c) -> Concurrently a -> Concurrently b -> Concurrently c # (*>) :: Concurrently a -> Concurrently b -> Concurrently b # (<*) :: Concurrently a -> Concurrently b -> Concurrently a # | |
Functor Concurrently Source # | |
Defined in Control.Concurrent.Async.Pool.Async fmap :: (a -> b) -> Concurrently a -> Concurrently b # (<$) :: a -> Concurrently b -> Concurrently a # |