Safe Haskell: None
- type Handle = Node
- type TaskInfo a = (Handle, Task a)
- type TaskGraph a = Gr (Task a) Status
- type Task a = IO a
- data Status
- data Pool a = Pool {}
- getReadyNodes :: Pool a -> TaskGraph a -> STM [Node]
- getTaskInfo :: TaskGraph a -> Handle -> TaskInfo a
- getReadyTasks :: Pool a -> STM [TaskInfo a]
- runPool :: Pool a -> IO ()
- startTask :: Pool a -> TaskInfo a -> IO (Async a)
- createPool :: Int -> IO (Pool a)
- setPoolSlots :: Pool a -> Int -> STM ()
- cancelAll :: Pool a -> IO ()
- cancelTask :: Pool a -> Handle -> IO ()
- nextIdent :: Pool a -> STM Int
- submitTask :: Pool a -> IO a -> STM Handle
- submitTask_ :: Pool a -> IO a -> STM Handle
- unsafeSequenceTasks :: Pool a -> Handle -> Handle -> STM ()
- sequenceTasks :: Pool a -> Handle -> Handle -> STM ()
- submitDependentTask :: Pool a -> [Handle] -> IO a -> STM Handle
- submitDependentTask_ :: Pool a -> Handle -> IO a -> STM Handle
- pollTaskEither :: Pool a -> Handle -> STM (Maybe (Either SomeException a))
- pollTask :: Pool a -> Handle -> STM (Maybe a)
- waitTaskEither :: Pool a -> Handle -> STM (Either SomeException a)
- waitTask :: Pool a -> Handle -> STM a
- withPool :: Int -> (Pool a -> IO b) -> IO b
- mapTasks' :: Traversable t => Int -> t (IO a) -> (IO (t b) -> IO (t c)) -> (Pool a -> Handle -> STM b) -> IO (t c)
- mapTasks :: Traversable t => Int -> t (IO a) -> IO (t a)
- mapTasksE :: Traversable t => Int -> t (IO a) -> IO (t (Either SomeException a))
- mapTasks_ :: Foldable t => Int -> t (IO a) -> IO ()
- mapTasksE_ :: Traversable t => Int -> t (IO a) -> IO (t (Maybe SomeException))
- mapTasksRace :: Traversable t => Int -> t (IO a) -> IO (Maybe (Either SomeException a))
- mapReduce :: (Foldable t, Monoid a) => Pool a -> t (IO a) -> STM Handle
- newtype Tasks a = Tasks {}
- runTasks :: Pool () -> Tasks a -> IO a
- task :: IO a -> Tasks a
- scatterFoldM :: (Foldable t, Monoid b) => Pool a -> t (IO a) -> (Either SomeException a -> IO b) -> IO b
Documentation
A Pool manages a collection of possibly interdependent tasks, such that tasks await execution until the tasks they depend on have finished (and tasks may depend on an arbitrary number of other tasks), while independent tasks execute concurrently up to the number of available resource slots in the pool.
Results from each task are retained until the task is polled or waited on, so failing to ever poll or wait on a task will result in a memory leak.
Tasks may be cancelled, in which case all dependent tasks are unscheduled.
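The basic flow described above (create a running pool, submit tasks, then wait on their handles) can be sketched as follows. This is only an illustration built from the signatures listed in the synopsis; the module name `Control.Concurrent.Async.Pool` is an assumption, and `sumTwo` is a hypothetical helper.

```haskell
import Control.Concurrent.STM (atomically)
-- Assumed module name; adjust to wherever this API is exported in your version.
import Control.Concurrent.Async.Pool

-- Hypothetical helper: submit two independent tasks and add their results.
sumTwo :: IO Int
sumTwo = withPool 2 $ \pool -> do
  -- Each submission commits in its own transaction, so the pool can see
  -- the task and start it before we begin waiting.
  h1 <- atomically $ submitTask pool (return 20)
  h2 <- atomically $ submitTask pool (return 22)
  -- Waiting also collects the result, which releases it from the pool.
  x <- atomically $ waitTask pool h1
  y <- atomically $ waitTask pool h2
  return (x + y)

main :: IO ()
main = sumTwo >>= print
```

Submitting and waiting are kept in separate transactions here because a task presumably cannot start until the transaction that submitted it has committed.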
getReadyNodes :: Pool a -> TaskGraph a -> STM [Node]
Return a list of unlabeled nodes ready for execution. This decreases the number of available slots, but does not remove the nodes from the graph.
getTaskInfo :: TaskGraph a -> Handle -> TaskInfo a
Given a task handle, return everything we know about that task.
getReadyTasks :: Pool a -> STM [TaskInfo a]
Return information about the list of tasks ready to execute, sufficient to start them and remove them from the graph afterwards.
runPool :: Pool a -> IO ()
Begin executing tasks in the given pool. The number of slots determines how many threads may execute concurrently. This number is adjustable dynamically by calling setPoolSlots, though reducing it does not cause already active threads to stop.
startTask :: Pool a -> TaskInfo a -> IO (Async a)
Start a task within the given pool. This begins execution as soon as the runtime is able to.
createPool :: Int -> IO (Pool a)
Create a thread pool for executing interdependent tasks concurrently. The number of available slots governs how many tasks may run at one time.
setPoolSlots :: Pool a -> Int -> STM ()
Set the number of available execution slots in the given Pool. Increasing the number will cause waiting threads to start executing immediately, while decreasing the number only decreases any available slots; it does not cancel already executing threads.
cancelAll :: Pool a -> IO ()
Cancel every running thread in the pool and unschedule any that had not begun yet.
cancelTask :: Pool a -> Handle -> IO ()
Cancel a task submitted to the pool. This will unschedule it if it had not begun yet, or cancel its thread if it had.
nextIdent :: Pool a -> STM Int
Return the next available thread identifier from the pool. These are monotonically increasing integers.
submitTask :: Pool a -> IO a -> STM Handle
Submit an IO action for execution within the managed thread pool. When it actually begins executing is determined by the number of available slots, whether the threaded runtime is being used, and how long it takes the jobs before it to complete.
submitTask_ :: Pool a -> IO a -> STM Handle
Submit an IO action where we will never care about the result value or whether an exception occurred within the task. This means its process table entry is automatically cleared immediately upon completion of the task. Use this if you are doing your own result propagation, such as writing to a TChan within the task.
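A minimal sketch of the "own result propagation" pattern, assuming the API is exported from a module named `Control.Concurrent.Async.Pool` (an assumption) and using a hypothetical helper `collectSquares`. Each task writes its result to a TChan instead of returning it through the pool.

```haskell
import Control.Concurrent.STM (atomically, newTChanIO, readTChan, writeTChan)
import Control.Monad (forM_, replicateM)
import Data.List (sort)
-- Assumed module name; adjust to wherever this API is exported in your version.
import Control.Concurrent.Async.Pool

-- Hypothetical helper: compute squares in the pool and collect them from a
-- channel rather than from the task handles.
collectSquares :: IO [Int]
collectSquares = do
  chan <- newTChanIO
  withPool 2 $ \pool -> do
    forM_ [1, 2, 3] $ \n ->
      -- The handle is ignored; the task reports its own result.
      atomically $ submitTask_ pool (atomically (writeTChan chan (n * n)))
    -- Read exactly as many results as tasks were submitted. Completion
    -- order is not specified, so sort for a stable answer.
    sort <$> replicateM 3 (atomically (readTChan chan))

main :: IO ()
main = collectSquares >>= print
```

Because nothing is ever polled or waited on, this style only avoids retaining results if the pool clears the entries itself, which is what submitTask_ is documented to do.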
unsafeSequenceTasks :: Pool a -> Handle -> Handle -> STM ()
Given parent and child task handles, link them so that the child cannot execute until the parent has finished. This does not check for cycles, so it can add new tasks in the time it takes to add a node and an edge to the graph.
submitDependentTask :: Pool a -> [Handle] -> IO a -> STM Handle
Submit a task, but only allow it to begin executing once its parent tasks have completed. This is equivalent to submitting a new task and linking it to its parents using sequenceTasks within a single STM transaction.
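As an illustration of dependent submission, the sketch below submits two parent tasks and one child in a single transaction; the child reads a counter that both parents increment. The module name `Control.Concurrent.Async.Pool` is an assumption, and `countParents` and `bump` are hypothetical names.

```haskell
import Control.Concurrent.STM (atomically)
import Data.IORef (atomicModifyIORef', newIORef, readIORef)
-- Assumed module name; adjust to wherever this API is exported in your version.
import Control.Concurrent.Async.Pool

-- Hypothetical helper: the child task observes how many parents have run.
countParents :: IO Int
countParents = do
  counter <- newIORef (0 :: Int)
  let bump = atomicModifyIORef' counter (\n -> (n + 1, n + 1))
  withPool 2 $ \pool -> do
    (p1, p2, child) <- atomically $ do
      a <- submitTask pool bump
      b <- submitTask pool bump
      -- The child may not start until both a and b have finished.
      c <- submitDependentTask pool [a, b] (readIORef counter)
      return (a, b, c)
    seen <- atomically $ waitTask pool child
    -- Collect the parents' results as well so the pool does not retain them.
    _ <- atomically $ waitTask pool p1
    _ <- atomically $ waitTask pool p2
    return seen

main :: IO ()
main = countParents >>= print
```

If the dependency is honoured as documented, the child should observe both increments.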
submitDependentTask_ :: Pool a -> Handle -> IO a -> STM Handle
Submit a dependent task where we do not care about the result value or whether an exception occurred. See submitTask_.
pollTaskEither :: Pool a -> Handle -> STM (Maybe (Either SomeException a))
pollTask :: Pool a -> Handle -> STM (Maybe a)
Poll the given task, as with pollTaskEither, but re-raise any exceptions that were raised in the task's thread.
waitTaskEither :: Pool a -> Handle -> STM (Either SomeException a)
Wait until the given task has completed, then return its final status.
waitTask :: Pool a -> Handle -> STM a
Wait until the given task is completed, but re-raise any exceptions that were raised in the task's thread.
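The difference between the two waiting functions matters mostly when a task fails. The sketch below uses waitTaskEither to inspect a failure as a value instead of having it re-raised; the module name `Control.Concurrent.Async.Pool` is an assumption, and `runFailing` and `describe` are hypothetical helpers.

```haskell
import Control.Concurrent.STM (atomically)
import Control.Exception (SomeException, throwIO)
-- Assumed module name; adjust to wherever this API is exported in your version.
import Control.Concurrent.Async.Pool

-- Hypothetical helper: run a task that always throws and return its outcome.
runFailing :: IO (Either SomeException Int)
runFailing = withPool 1 $ \pool -> do
  h <- atomically $ submitTask pool (throwIO (userError "boom"))
  -- waitTaskEither hands the exception back as a Left value;
  -- waitTask would re-raise it in the calling thread instead.
  atomically $ waitTaskEither pool h

-- Hypothetical helper: render the outcome for display.
describe :: Either SomeException Int -> String
describe (Left e)  = "task failed: " ++ show e
describe (Right v) = "task returned: " ++ show v

main :: IO ()
main = runFailing >>= putStrLn . describe
```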
withPool :: Int -> (Pool a -> IO b) -> IO b
Execute an IO action, passing it a running pool with N available slots.
mapTasks' :: Traversable t => Int -> t (IO a) -> (IO (t b) -> IO (t c)) -> (Pool a -> Handle -> STM b) -> IO (t c)
Run a group of tasks concurrently, up to N at a time. The order of execution is unspecified, but the results are returned in the order of the inputs.
mapTasks :: Traversable t => Int -> t (IO a) -> IO (t a)
Run a group of tasks concurrently, up to N at a time. The order of execution is unspecified, but the results are returned in the order of the inputs.
mapTasksE :: Traversable t => Int -> t (IO a) -> IO (t (Either SomeException a))
Run a group of tasks concurrently, up to N at a time. The order of execution is unspecified, but the results are returned in the order of the inputs.
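A short sketch of the two ordered mapping functions, assuming the module name `Control.Concurrent.Async.Pool` (an assumption); `squares` and `outcomes` are hypothetical names. The second example relies on the Either in the signature of mapTasksE to carry a per-task failure.

```haskell
import Control.Exception (SomeException, throwIO)
-- Assumed module name; adjust to wherever this API is exported in your version.
import Control.Concurrent.Async.Pool

-- Run five small tasks, at most four at a time; results follow input order.
squares :: IO [Int]
squares = mapTasks 4 [return (n * n) | n <- [1 .. 5]]

-- Same idea, but each position reports either an exception or a value.
outcomes :: IO [Either SomeException Int]
outcomes = mapTasksE 2 [return 1, throwIO (userError "bad input"), return 3]

main :: IO ()
main = do
  squares >>= print
  rs <- outcomes
  mapM_ (putStrLn . either (const "failed") show) rs
```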
mapTasks_ :: Foldable t => Int -> t (IO a) -> IO ()
Run a group of up to N tasks at a time concurrently, ignoring the results.
mapTasksE_ :: Traversable t => Int -> t (IO a) -> IO (t (Maybe SomeException))
Run a group of up to N tasks at a time concurrently, ignoring the results, but returning whether an exception occurred for each task.
mapTasksRace :: Traversable t => Int -> t (IO a) -> IO (Maybe (Either SomeException a))
Execute a group of tasks (where at most N tasks may run at once, corresponding to the number of available slots in the pool), returning the first result or failure. Nothing is returned if no tasks were provided.
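The two cases described here (at least one task, and no tasks at all) might be exercised as in the sketch below. The module name `Control.Concurrent.Async.Pool` is an assumption, and `firstDone` and `noTasks` are hypothetical names. Which task wins the race depends on scheduling, so no particular winner is claimed.

```haskell
import Control.Concurrent (threadDelay)
import Control.Exception (SomeException)
-- Assumed module name; adjust to wherever this API is exported in your version.
import Control.Concurrent.Async.Pool

-- Race a deliberately slow task against a quick one.
firstDone :: IO (Maybe (Either SomeException String))
firstDone = mapTasksRace 2
  [ threadDelay 200000 >> return "slow"
  , return "fast"
  ]

-- With no tasks there is nothing to race, so the documented result is Nothing.
noTasks :: IO (Maybe (Either SomeException Int))
noTasks = mapTasksRace 2 []

main :: IO ()
main = do
  r <- firstDone
  putStrLn $ case r of
    Nothing        -> "no tasks were provided"
    Just (Left e)  -> "first to finish failed: " ++ show e
    Just (Right v) -> "first to finish returned: " ++ v
```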
mapReduce
:: (Foldable t, Monoid a)
=> Pool a -- Pool to execute the tasks within
-> t (IO a) -- Set of Monoid-yielding IO actions
-> STM Handle -- Returns a Handle to the final result task
Given a list of actions yielding Monoid results, execute the actions concurrently (up to N at a time, based on available slots), and also mappend each pair of results concurrently as they become ready.
The immediate result from this function is a Handle representing the final task, which depends on all the rest and whose value is the final, aggregate result.
This is equivalent to mconcat <$> mapTasks n actions, except that intermediate results can be garbage collected as soon as they have been merged. Also, the value returned from this function is a Handle which may be polled until that final result is ready.
Lastly, if any Exception occurs, the result obtained from waiting on or polling the Handle will be one of those exceptions, but not necessarily the first or the last.
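A minimal sketch of mapReduce using the Sum monoid, assuming the module name `Control.Concurrent.Async.Pool` (an assumption); `sumTo` is a hypothetical helper. The Handle returned by mapReduce is waited on in a second transaction to obtain the aggregate.

```haskell
import Control.Concurrent.STM (atomically)
import Data.Monoid (Sum (..))
-- Assumed module name; adjust to wherever this API is exported in your version.
import Control.Concurrent.Async.Pool

-- Hypothetical helper: add up 1..n, one task per number, merged with mappend.
sumTo :: Int -> IO Int
sumTo n = withPool 4 $ \pool -> do
  -- mapReduce only schedules the work and returns a handle to the final task.
  h <- atomically $ mapReduce pool [return (Sum k) | k <- [1 .. n]]
  -- Waiting on that handle yields the combined result (or re-raises a failure).
  getSum <$> atomically (waitTask pool h)

main :: IO ()
main = sumTo 10 >>= print
```

Note that every task in a pool shares the pool's result type, so the pool here is a `Pool (Sum Int)`.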
scatterFoldM :: (Foldable t, Monoid b) => Pool a -> t (IO a) -> (Either SomeException a -> IO b) -> IO b
Execute a group of tasks concurrently (using up to N active threads, depending on the pool), and feed results to the continuation immediately as they become available, in whatever order they complete. The continuation returns a monoid value, and these values are accumulated to yield the final result.
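As a sketch of how the continuation and the accumulated monoid fit together, the example below sums the results of the tasks that succeed and ignores failures. The module name `Control.Concurrent.Async.Pool` is an assumption, and `sumSuccesses` and `step` are hypothetical names.

```haskell
import Control.Exception (SomeException)
import Data.Monoid (Sum (..))
-- Assumed module name; adjust to wherever this API is exported in your version.
import Control.Concurrent.Async.Pool

-- Hypothetical helper: fold task results into a running sum as they arrive.
sumSuccesses :: IO Int
sumSuccesses = withPool 4 $ \pool ->
  getSum <$> scatterFoldM pool [return k | k <- [1 .. 10]] step
  where
    -- Called once per finished task, in completion order.
    step :: Either SomeException Int -> IO (Sum Int)
    step (Left _)  = return mempty   -- a failed task contributes nothing
    step (Right k) = return (Sum k)

main :: IO ()
main = sumSuccesses >>= print
```

A commutative monoid such as Sum is a natural fit here, since the order in which results reach the continuation is not fixed.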