-- Hoogle documentation, generated by Haddock -- See Hoogle, http://www.haskell.org/hoogle/ -- | A modified version of async that supports worker groups and many-to-many task dependencies -- -- This library modifies the async package to allow for task -- pooling and many-to-many dependencies between tasks. @package async-pool @version 0.9.1 -- | This module provides a set of operations for running IO operations -- asynchronously and waiting for their results. It is a thin layer over -- the basic concurrency operations provided by -- Control.Concurrent. The main additional functionality it -- provides is the ability to wait for the return value of a thread, plus -- functions for managing task pools, work groups, and many-to-many -- dependencies between tasks. The interface also provides some -- additional safety and robustness over using threads and MVar -- directly. -- -- The basic type is Async a, which represents an -- asynchronous IO action that will return a value of type -- a, or die with an exception. An Async corresponds to -- either a thread, or a Handle to an action waiting to be -- spawned. This makes it possible to submit very large numbers of tasks, -- with only N threads active at one time. -- -- For example, to fetch two web pages at the same time, we could do this -- (assuming a suitable getURL function): -- --
-- withTaskGroup 4 $ \g -> do -- a1 <- async g (getURL url1) -- a2 <- async g (getURL url2) -- page1 <- wait a1 -- page2 <- wait a2 -- ... ---- -- where async submits the operation to the worker group (and from -- which it is spawned in a separate thread), and wait waits for -- and returns the result. The number 4 indicates the maximum number of -- threads which may be spawned at one time. If the operation throws an -- exception, then that exception is re-thrown by wait. This is -- one of the ways in which this library provides some additional safety: -- it is harder to accidentally forget about exceptions thrown in child -- threads. -- -- A slight improvement over the previous example is this: -- --
-- withTaskGroup 4 $ \g -> do -- withAsync g (getURL url1) $ \a1 -> do -- withAsync g (getURL url2) $ \a2 -> do -- page1 <- wait a1 -- page2 <- wait a2 -- ... ---- -- withAsync is like async, except that the Async is -- automatically killed (or unscheduled, using cancel) if the -- enclosing IO operation returns before it has completed. Consider the -- case when the first wait throws an exception; then the second -- Async will be automatically killed rather than being left to -- run in the background, possibly indefinitely. This is the second way -- that the library provides additional safety: using withAsync -- means we can avoid accidentally leaving threads running. Furthermore, -- withAsync allows a tree of threads to be built, such that -- children are automatically killed if their parents die for any reason. -- -- The pattern of performing two IO actions concurrently and waiting for -- their results is packaged up in a combinator concurrently, so -- we can further shorten the above example to: -- --
-- withTaskGroup 4 $ \g -> do -- (page1, page2) <- concurrently g (getURL url1) (getURL url2) -- ... ---- -- The Functor instance can be used to change the result of an -- Async. For example: -- --
-- ghci> a <- async g (return 3) -- ghci> wait a -- 3 -- ghci> wait (fmap (+1) a) -- 4 --module Control.Concurrent.Async.Pool -- | An asynchronous action spawned by async or withAsync. -- Asynchronous actions are executed in a separate thread, and operations -- are provided for waiting for asynchronous actions to complete and -- obtaining their results (see e.g. wait). data Async a -- | Create both a pool, and a task group with a given number of execution -- slots. withTaskGroup :: Int -> (TaskGroup -> IO b) -> IO b -- | Create a task group within the given pool having a specified number of -- execution slots, but with a bounded lifetime. Leaving the block -- cancels every task still executing in the group. withTaskGroupIn :: Pool -> Int -> (TaskGroup -> IO b) -> IO b -- | A Pool manages a collection of possibly interdependent tasks, -- such that tasks await execution until the tasks they depend on have -- finished (and tasks may depend on an arbitrary number of other tasks), -- while independent tasks execute concurrently up to the number of -- available resource slots in the pool. -- -- Results from each task are available until the status of the task is -- polled or waited on. Further, the results are kept until that occurs, -- so failing to ever wait will result in a memory leak. -- -- Tasks may be cancelled, in which case all dependent tasks are -- unscheduled. data Pool -- | Create a task pool for managing many-to-many acyclic dependencies -- among tasks. createPool :: IO Pool data TaskGroup -- | Create a task group for executing interdependent tasks concurrently. -- The number of available slots governs how many tasks may run at one -- time. createTaskGroup :: Pool -> Int -> IO TaskGroup -- | Execute tasks in a given task group. The number of slots determines -- how many threads may execute concurrently. runTaskGroup :: TaskGroup -> IO () -- | Spawn an asynchronous action in a separate thread. 
async :: TaskGroup -> IO a -> IO (Async a) -- | Like async but using forkOS internally. asyncBound :: TaskGroup -> IO a -> IO (Async a) -- | Like async but using forkOn internally. asyncOn :: TaskGroup -> Int -> IO a -> IO (Async a) -- | Like async but using forkIOWithUnmask internally. The -- child thread is passed a function that can be used to unmask -- asynchronous exceptions. asyncWithUnmask :: TaskGroup -> ((forall b. IO b -> IO b) -> IO a) -> IO (Async a) -- | Like asyncOn but using forkOnWithUnmask internally. The -- child thread is passed a function that can be used to unmask -- asynchronous exceptions. asyncOnWithUnmask :: TaskGroup -> Int -> ((forall b. IO b -> IO b) -> IO a) -> IO (Async a) -- | Equivalent to async, but acts in STM so that -- makeDependent may be called after the task is created, but -- before it begins executing. asyncSTM :: TaskGroup -> IO a -> STM (Async a) taskHandle :: Async a -> Handle -- | Submit a task that begins execution only after its parent has -- completed. This is equivalent to submitting a new task with -- asyncSTM and linking it to its parent using -- makeDependent. asyncAfter :: TaskGroup -> Async b -> IO a -> IO (Async a) -- | Submit a task which begins execution after all its parents have -- completed. This is equivalent to submitting a new task with -- asyncSTM and linking it to its parents using 'mapM -- makeDependent'. asyncAfterAll :: TaskGroup -> [Handle] -> IO a -> IO (Async a) -- | Given parent and child tasks, link them so the child cannot execute -- until the parent has finished. makeDependent :: Pool -> Handle -> Handle -> STM () -- | Given parent and child tasks, link them so the child cannot execute -- until the parent has finished. This function does not check for -- introduction of cycles into the dependency graph, which would prevent -- the child from ever running. 
unsafeMakeDependent :: Pool -> Handle -> Handle -> STM () -- | Spawn an asynchronous action in a separate thread, and pass its -- Async handle to the supplied function. When the function -- returns or throws an exception, cancel is called on the -- Async. -- --
-- withAsync g action inner = bracket (async g action) cancel inner ---- -- This is a useful variant of async that ensures an -- Async is never left running unintentionally. -- -- Since cancel may block, withAsync may also block; see -- cancel for details. withAsync :: TaskGroup -> IO a -> (Async a -> IO b) -> IO b -- | Like withAsync but uses forkOS internally. withAsyncBound :: TaskGroup -> IO a -> (Async a -> IO b) -> IO b -- | Like withAsync but uses forkOn internally. withAsyncOn :: TaskGroup -> Int -> IO a -> (Async a -> IO b) -> IO b -- | Like withAsync but uses forkIOWithUnmask internally. The -- child thread is passed a function that can be used to unmask -- asynchronous exceptions. withAsyncWithUnmask :: TaskGroup -> ((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b -- | Like withAsyncOn but uses forkOnWithUnmask internally. -- The child thread is passed a function that can be used to unmask -- asynchronous exceptions. withAsyncOnWithUnmask :: TaskGroup -> Int -> ((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b -- | Wait for an asynchronous action to complete, and return its value. If -- the asynchronous action threw an exception, then the exception is -- re-thrown by wait. -- --
-- wait = atomically . waitSTM --wait :: Async a -> IO a -- | Check whether an Async has completed yet. If it has not -- completed yet, then the result is Nothing, otherwise the -- result is Just e where e is Left x if the -- Async raised an exception x, or Right a if -- it returned a value a. -- --
-- poll = atomically . pollSTM --poll :: Async a -> IO (Maybe (Either SomeException a)) -- | Wait for an asynchronous action to complete, and return either -- Left e if the action raised an exception e, or -- Right a if it returned a value a. -- --
-- waitCatch = atomically . waitCatchSTM --waitCatch :: Async a -> IO (Either SomeException a) -- | Cancel an asynchronous action by throwing the ThreadKilled -- exception to it. Has no effect if the Async has already -- completed. -- --
-- cancel a = throwTo (asyncThreadId a) ThreadKilled ---- -- Note that cancel is synchronous in the same sense as -- throwTo. It does not return until the exception has been thrown -- in the target thread, or the target thread has completed. In -- particular, if the target thread is making a foreign call, the -- exception will not be thrown until the foreign call returns, and in -- this case cancel may block indefinitely. An asynchronous -- cancel can of course be obtained by wrapping cancel -- itself in async. cancel :: Async a -> IO () cancelWith :: Exception e => Async a -> e -> IO () -- | A version of wait that can be used inside an STM transaction. waitSTM :: Async a -> STM a -- | A version of poll that can be used inside an STM transaction. pollSTM :: Async a -> STM (Maybe (Either SomeException a)) -- | A version of waitCatch that can be used inside an STM -- transaction. waitCatchSTM :: Async a -> STM (Either SomeException a) -- | Wait for any of the supplied Asyncs to complete. If the first -- to complete throws an exception, then that exception is re-thrown by -- waitAny. -- -- If multiple Asyncs complete or have completed, then the value -- returned corresponds to the first completed Async in the list. waitAny :: [Async a] -> IO (Async a, a) -- | Wait for any of the supplied asynchronous operations to complete. The -- value returned is a pair of the Async that completed, and the -- result that would be returned by wait on that Async. -- -- If multiple Asyncs complete or have completed, then the value -- returned corresponds to the first completed Async in the list. waitAnyCatch :: [Async a] -> IO (Async a, Either SomeException a) -- | Like waitAny, but also cancels the other asynchronous -- operations as soon as one has completed. waitAnyCancel :: [Async a] -> IO (Async a, a) -- | Like waitAnyCatch, but also cancels the other asynchronous -- operations as soon as one has completed. 
waitAnyCatchCancel :: [Async a] -> IO (Async a, Either SomeException a) -- | Wait for the first of two Asyncs to finish. If the -- Async that finished first raised an exception, then the -- exception is re-thrown by waitEither. waitEither :: Async a -> Async b -> IO (Either a b) -- | Wait for the first of two Asyncs to finish. waitEitherCatch :: Async a -> Async b -> IO (Either (Either SomeException a) (Either SomeException b)) -- | Like waitEither, but also cancels both Asyncs -- before returning. waitEitherCancel :: Async a -> Async b -> IO (Either a b) -- | Like waitEitherCatch, but also cancels both -- Asyncs before returning. waitEitherCatchCancel :: Async a -> Async b -> IO (Either (Either SomeException a) (Either SomeException b)) -- | Like waitEither, but the result is ignored. waitEither_ :: Async a -> Async b -> IO () -- | Waits for both Asyncs to finish, but if either of them throws -- an exception before they have both finished, then the exception is -- re-thrown by waitBoth. waitBoth :: Async a -> Async b -> IO (a, b) -- | Link the given Async to the current thread, such that if the -- Async raises an exception, that exception will be re-thrown -- in the current thread. link :: Async a -> IO () -- | Link two Asyncs together, such that if either raises an -- exception, the same exception is re-thrown in the other -- Async. link2 :: Async a -> Async b -> IO () -- | Execute a group of tasks within the given task group, returning the -- results in order. The order of execution is random, but the results -- are returned in order. mapTasks :: Traversable t => TaskGroup -> t (IO a) -> IO (t a) -- | Execute a group of tasks within the given task group, ignoring -- results. mapTasks_ :: Foldable t => TaskGroup -> t (IO a) -> IO () -- | Execute a group of tasks within the given task group, returning the -- results in order as an Either type to represent exceptions from -- actions. The order of execution is random, but the results are -- returned in order. 
mapTasksE :: Traversable t => TaskGroup -> t (IO a) -> IO (t (Either SomeException a)) -- | Execute a group of tasks within the given task group, ignoring -- results, but returning a list of all exceptions. mapTasksE_ :: Traversable t => TaskGroup -> t (IO a) -> IO (t (Maybe SomeException)) -- | Execute a group of tasks, but return the first result or failure and -- cancel the remaining tasks. mapRace :: Foldable t => TaskGroup -> t (IO a) -> IO (Async a, Either SomeException a) -- | Given a list of actions yielding Monoid results, execute the -- actions concurrently (up to N at a time, based on available slots), -- and mappend each pair of results concurrently as they become -- ready. The immediate result of this function is an Async -- representing the final value. -- -- This is similar to the following: mconcat $ mapTasks n -- actions, except that intermediate results can be garbage -- collected as soon as they've been merged. Also, the value returned -- from this function is an Async which may be polled for the -- final result. -- -- Lastly, if an Exception occurs in any subtask, the final -- result will also yield an exception -- but not necessarily the first -- or last that was caught. mapReduce :: (Foldable t, Monoid a) => TaskGroup -> t (IO a) -> STM (Async a) -- | Execute a group of tasks concurrently (using up to N active threads, -- depending on the task group), and feed results to a continuation as -- soon as they become available, in random order. The continuation -- function may return a monoid value which is accumulated to yield a -- final result. If no such value is needed, simply provide `()`. scatterFoldMapM :: (Foldable t, Monoid b, MonadBaseControl IO m) => TaskGroup -> t (IO a) -> (Either SomeException a -> m b) -> m b -- | The Task Applicative and Monad allow for task dependencies to -- be built using Applicative and do notation. Monadic evaluation is -- sequenced, while applicative Evaluation is concurrent for each -- argument. 
-- In this way, mixing the two builds a dependency graph via -- ordinary Haskell code. data Task a -- | Run a value in the Task monad and block until the final result -- is computed. runTask :: TaskGroup -> Task a -> IO a -- | Lift any IO action into a Task. This is a synonym for -- liftIO. task :: IO a -> Task a -- | Run two IO actions concurrently, and return the first to -- finish. The loser of the race is cancelled. -- --
-- race g left right = -- withAsync g left $ \a -> -- withAsync g right $ \b -> -- waitEither a b --race :: TaskGroup -> IO a -> IO b -> IO (Either a b) -- | Like race, but the result is ignored. race_ :: TaskGroup -> IO a -> IO b -> IO () -- | Run two IO actions concurrently, and return both results. If -- either action throws an exception at any time, then the other action -- is cancelled, and the exception is re-thrown by -- concurrently. -- --
-- concurrently g left right = -- withAsync g left $ \a -> -- withAsync g right $ \b -> -- waitBoth a b --concurrently :: TaskGroup -> IO a -> IO b -> IO (a, b) -- | Maps an IO-performing function over any Traversable -- data type, performing all the IO actions concurrently, and -- returning the original data structure with the arguments replaced by -- the results. -- -- For example, mapConcurrently works with lists: -- --
-- pages <- mapConcurrently g getURL ["url1", "url2", "url3"] --mapConcurrently :: Traversable t => TaskGroup -> (a -> IO b) -> t a -> IO (t b) -- | A value of type Concurrently a is an IO operation -- that can be composed with other Concurrently values, using -- the Applicative and Alternative instances. -- -- Calling runConcurrently on a value of type Concurrently -- a will execute the IO operations it contains -- concurrently, before delivering the result of type a. -- -- For example -- --
-- (page1, page2, page3) -- <- flip runConcurrently g $ (,,) -- <$> Concurrently (\_ -> getURL "url1") -- <*> Concurrently (\_ -> getURL "url2") -- <*> Concurrently (\_ -> getURL "url3") --newtype Concurrently a Concurrently :: (TaskGroup -> IO a) -> Concurrently a [runConcurrently] :: Concurrently a -> TaskGroup -> IO a