taskpool-0.1.0: Manage pools of possibly interdependent tasks using STM and async

Safe Haskell: None

Data.TaskPool.Internal


Documentation

type Handle = Node

A Handle is a unique reference to a task that has been submitted to a Pool.

type TaskInfo a = (Handle, Task a)

type Task a = IO a

data Status

Constructors

Pending 
Completed 

Instances

data Pool a

A Pool manages a collection of possibly interdependent tasks, such that tasks await execution until the tasks they depend on have finished (and tasks may depend on an arbitrary number of other tasks), while independent tasks execute concurrently up to the number of available resource slots in the pool.

Each task's result remains available, and is retained in memory, until the task's status is polled or waited on. A task that is never polled or waited on therefore leaks memory.

Tasks may be cancelled, in which case all dependent tasks are unscheduled.

Constructors

Pool 

Fields

slots :: TVar Int

The total number of execution slots in the pool. If nothing is running, this is also the number of available slots. This can be changed dynamically using setPoolSlots.

avail :: TVar Int

The number of available execution slots in the pool.

procs :: TVar (IntMap (TMVar (Async a)))

The active or completed process table. For every task running in a thread, this holds the Async value governing that thread; for every completed task, it holds the Async value that records its completion value or exception status. An entry is inserted whenever a thread is started, and is cleared by pollTaskEither, which all the other polling and waiting functions ultimately call.

Note that submitting a task with submitTask_ or submitDependentTask_ will remove the thread's Async value immediately at the end of the task, causing it to be garbage collected.

tasks :: TVar (TaskGraph a)

The task graph represents a partially ordered set P with subset S such that for every x ∈ S and y ∈ P, either x ≤ y or x is unrelated to y. Stated more simply, S is the set of least elements of all maximal chains in P. In our case, ≤ relates two uncompleted tasks by dependency. Therefore, S is equal to the set of tasks which may execute concurrently, as none of them have incomplete dependencies.

We use a graph representation to make determination of S more efficient (where S is just the set of roots in P expressed as a graph). Completion status is recorded on the edges, and nodes are removed from the graph once no other incomplete node depends on them.

tokens :: TVar Int

Tokens identify tasks, and are provisioned monotonically.
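To make the "set of roots" idea concrete, here is a minimal pure sketch. It assumes a hypothetical representation in which each task id maps to the set of ids it still depends on; the real TaskGraph records completion status on graph edges, which this simplification does not reproduce. A task belongs to S exactly when its set of pending dependencies is empty.

```haskell
import qualified Data.IntMap.Strict as IM
import qualified Data.IntSet as IS

-- Hypothetical model: task id -> ids of the tasks it still waits on.
type Deps = IM.IntMap IS.IntSet

-- The ready set S: every task whose set of pending dependencies is empty.
readyTasks :: Deps -> [Int]
readyTasks deps = IM.keys (IM.filter IS.null deps)

-- Completing a task removes it and erases it from every dependency set,
-- which may turn some of its dependents into new roots.
completeTask :: Int -> Deps -> Deps
completeTask t deps = IM.map (IS.delete t) (IM.delete t deps)
```

In this model, with task 3 depending on 1 and 2, and 2 depending on 1, only 1 and any independent tasks start out ready; 2 becomes ready after 1 completes, and 3 after 2.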

getReadyNodes :: Pool a -> TaskGraph a -> STM [Node]

Return a list of unlabeled nodes ready for execution. This decreases the number of available slots, but does not remove the nodes from the graph.
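The slot accounting described here can be sketched on its own. In this hypothetical `reserveReady`, one STM transaction hands out at most as many ready node ids as there are available slots and lowers the counter by the same amount; the ids are not removed from wherever they came from. The function names and the plain list of ids are assumptions, not the library's code.

```haskell
import Control.Concurrent.STM

-- Hypothetical sketch: reserve one slot per ready node, up to the number
-- of slots currently available. Nodes beyond that limit are left alone.
reserveReady :: TVar Int -> [Int] -> STM [Int]
reserveReady avail ready = do
  n <- readTVar avail
  let chosen = take (max 0 n) ready
  writeTVar avail (n - length chosen)
  pure chosen

-- Give a slot back when a task finishes.
releaseSlot :: TVar Int -> STM ()
releaseSlot avail = modifyTVar' avail (+ 1)
```

Because both steps are STM transactions, a scheduler loop could combine `reserveReady` with `retry` to sleep until either a slot frees up or a new node becomes ready.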

getTaskInfo :: TaskGraph a -> Handle -> TaskInfo a

Given a task handle, return everything we know about that task.

getReadyTasks :: Pool a -> STM [TaskInfo a]

Return information about the list of tasks ready to execute, sufficient to start them and remove them from the graph afterwards.

runPool :: Pool a -> IO ()

Begin executing tasks in the given pool. The number of slots determines how many threads may execute concurrently. This number is adjustable dynamically, by calling setPoolSlots, though reducing it does not cause already active threads to stop.

startTask :: Pool a -> TaskInfo a -> IO (Async a)

Start a task within the given pool. This begins execution as soon as the runtime is able to.

createPool

Arguments

:: Int

Maximum number of running tasks.

-> IO (Pool a) 

Create a thread pool for executing interdependent tasks concurrently. The number of available slots governs how many tasks may run at one time.

setPoolSlots :: Pool a -> Int -> STM ()

Set the number of available execution slots in the given Pool. Increasing the number lets waiting tasks start executing immediately, while decreasing it only reduces the number of available slots; it does not cancel threads that are already executing.
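One way to get this behaviour is sketched below, assuming (hypothetically) that the available count is shifted by the difference between the new and old totals. Under that assumption the available count can drop below zero after a decrease, meaning more tasks are running than the new limit allows; it climbs back as those tasks finish and release their slots, and no running thread is interrupted. `setSlots` is an illustrative name.

```haskell
import Control.Concurrent.STM

-- Hypothetical model of setPoolSlots: move the available count by the
-- change in the total. Nothing is cancelled when the total shrinks.
setSlots :: TVar Int -> TVar Int -> Int -> STM ()
setSlots slotsVar availVar n = do
  old <- readTVar slotsVar
  writeTVar slotsVar n
  modifyTVar' availVar (+ (n - old))
```

With 4 slots and 3 tasks running (1 available), shrinking to 2 slots leaves the count at -1; growing to 6 afterwards brings it to 3.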

cancelAll :: Pool a -> IO ()

Cancel every running thread in the pool and unschedule any that had not begun yet.

cancelTask :: Pool a -> Handle -> IO ()

Cancel a task submitted to the pool. This will unschedule it if it had not begun yet, or cancel its thread if it had.

nextIdent :: Pool a -> STM Int

Return the next available thread identifier from the pool. These are monotonically increasing integers.

submitTask :: Pool a -> IO a -> STM Handle

Submit an IO action for execution within the managed thread pool. When it actually begins executing is determined by the number of available slots, whether the threaded runtime is being used, and how long the jobs ahead of it take to complete.

submitTask_ :: Pool a -> IO a -> STM Handle

Submit an 'IO ()' action, where we will never care about the result value or if an exception occurred within the task. This means its process table entry is automatically cleared immediately upon completion of the task. Use this if you are doing your own result propagation, such as writing to a TChan within the task.
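As an illustration of doing your own result propagation, the sketch below forks plain threads, standing in for tasks submitted with submitTask_ (which it does not call), that each write their result to a shared TChan. The consumer reads exactly as many values as it launched. `runAndCollect` is a hypothetical name, and since `forkIO` replaces the pool, nothing limits concurrency here.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM
import Control.Monad (forM_, replicateM)

-- Each thread pushes its own result into the channel, so no per-task
-- table entry is needed. Results arrive in completion order.
runAndCollect :: [IO a] -> IO [a]
runAndCollect actions = do
  chan <- newTChanIO
  forM_ actions $ \act -> forkIO (act >>= atomically . writeTChan chan)
  replicateM (length actions) (atomically (readTChan chan))
```

If an action throws, its value never reaches the channel and the reader waits forever, so in this style each task should catch its own exceptions and report them through the channel too.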

unsafeSequenceTasks

Arguments

:: Pool a 
-> Handle

Task doing the waiting

-> Handle

Task we must wait on (the parent)

-> STM () 

Given parent and child task handles, link them so that the child cannot execute until the parent has finished. Because this does not check for cycles, linking takes only as long as adding a node and an edge to the graph.

sequenceTasks

Arguments

:: Pool a 
-> Handle

Task doing the waiting

-> Handle

Task we must wait on (the parent)

-> STM () 
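The cycle check that unsafeSequenceTasks omits can be sketched as a reachability test. This is a hypothetical model, not the library's code: dependencies are an IntMap from each task to the tasks it waits on, and before recording that `waiter` depends on `parent`, we search from `parent` along its dependencies. If `waiter` is reachable, the new edge would close a cycle.

```haskell
import qualified Data.IntMap.Strict as IM
import qualified Data.IntSet as IS

-- Hypothetical model: task id -> ids of the tasks it waits on.
type Deps = IM.IntMap IS.IntSet

-- Depth-first search: is `target` reachable from `start`?
reachable :: Deps -> Int -> Int -> Bool
reachable deps start target = go IS.empty [start]
  where
    go _ [] = False
    go seen (x : xs)
      | x == target = True
      | x `IS.member` seen = go seen xs
      | otherwise =
          let next = IS.toList (IM.findWithDefault IS.empty x deps)
          in go (IS.insert x seen) (next ++ xs)

-- Record that `waiter` depends on `parent`, refusing any edge that would
-- create a cycle (including a task waiting on itself).
sequenceChecked :: Int -> Int -> Deps -> Either String Deps
sequenceChecked waiter parent deps
  | reachable deps parent waiter = Left "dependency cycle"
  | otherwise = Right (IM.insertWith IS.union waiter (IS.singleton parent) deps)
```

The search costs time proportional to the part of the graph reachable from the parent, which is the overhead the unchecked variant avoids.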

submitDependentTask :: Pool a -> [Handle] -> IO a -> STM Handle

Submit a task, but only allow it to begin executing once all of its parent tasks have completed. This is equivalent to submitting a new task and linking it to each of its parents using sequenceTasks, all within a single STM transaction.

submitDependentTask_ :: Pool a -> Handle -> IO a -> STM Handle

Submit a dependent task where we do not care about the result value or if an exception occurred. See submitTask_.

pollTaskEither :: Pool a -> Handle -> STM (Maybe (Either SomeException a))

Poll the given task, returning Nothing if it hasn't started yet or is currently executing, and a Just value if a final result is known.

pollTask :: Pool a -> Handle -> STM (Maybe a)

Poll the given task, as with pollTaskEither, but re-raise any exceptions that were raised in the task's thread.

waitTaskEither :: Pool a -> Handle -> STM (Either SomeException a)

Wait until the given task has completed, then return its final status.

waitTask :: Pool a -> Handle -> STM a

Wait until the given task is completed, but re-raise any exceptions that were raised in the task's thread.
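The poll, wait, and re-raise behaviour can be modelled with a table of TMVars holding each task's outcome. This is a hypothetical sketch: it stores `Either SomeException a` directly rather than an Async, and the names `Table`, `pollEither`, `waitEither` and `waitValue` are illustrative. Polling returns Nothing while the outcome is unknown and clears the entry once a result is read; waiting is just polling plus `retry`.

```haskell
import Control.Concurrent.STM
import Control.Exception (SomeException)
import qualified Data.IntMap.Strict as IM

-- Hypothetical process table: handle -> slot filled when the task ends.
type Table a = TVar (IM.IntMap (TMVar (Either SomeException a)))

-- Nothing while the outcome is unknown; the entry is removed once read.
pollEither :: Table a -> Int -> STM (Maybe (Either SomeException a))
pollEither table h = do
  procs <- readTVar table
  case IM.lookup h procs of
    Nothing -> pure Nothing
    Just var -> do
      result <- tryReadTMVar var
      case result of
        Nothing -> pure Nothing
        Just r -> do
          modifyTVar' table (IM.delete h)
          pure (Just r)

-- Block (via retry) until a final outcome is available.
waitEither :: Table a -> Int -> STM (Either SomeException a)
waitEither table h = pollEither table h >>= maybe retry pure

-- Like waitEither, but re-raise the task's exception inside STM.
waitValue :: Table a -> Int -> STM a
waitValue table h = waitEither table h >>= either throwSTM pure
```

In this model a handle with no table entry, whether not yet started or already consumed, also polls as Nothing, so waiting on an already-consumed handle blocks.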

withPool :: Int -> (Pool a -> IO b) -> IO b

Execute an IO action, passing it a running pool with N available slots.

mapTasks' :: Traversable t => Int -> t (IO a) -> (IO (t b) -> IO (t c)) -> (Pool a -> Handle -> STM b) -> IO (t c)

Run a group of tasks concurrently, up to N at a time, returning the results in order. Tasks may execute in any order, but the results are returned in the order of the input.

mapTasks :: Traversable t => Int -> t (IO a) -> IO (t a)

Run a group of tasks concurrently, up to N at a time, returning the results in order. Tasks may execute in any order, but the results are returned in the order of the input.

mapTasksE :: Traversable t => Int -> t (IO a) -> IO (t (Either SomeException a))

Run a group of tasks concurrently, up to N at a time, returning the results in order. Tasks may execute in any order, but the results are returned in the order of the input.
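The ordering guarantee of mapTasks and mapTasksE can be illustrated without a pool. The hypothetical sketch below starts one thread per action but lets at most `n` of them run their action at once, using a `QSem` from base as a stand-in for the pool's slots. Each thread writes its outcome into its own `MVar`, and `traverse` over the original container reads them back in input order, whatever order they finish in. This is a substitute technique, not how the library schedules tasks.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar
import Control.Concurrent.QSem
import Control.Exception (SomeException, bracket_, throwIO, try)

-- Run every action in its own thread, but let at most n of them hold a
-- slot at once; outcomes are read back in input order.
boundedMapE :: Traversable t => Int -> t (IO a) -> IO (t (Either SomeException a))
boundedMapE n actions = do
  sem <- newQSem (max 1 n) -- at least one slot, or nothing could ever run
  vars <- traverse (start sem) actions
  traverse takeMVar vars
  where
    start :: QSem -> IO b -> IO (MVar (Either SomeException b))
    start sem act = do
      var <- newEmptyMVar
      _ <- forkIO (bracket_ (waitQSem sem) (signalQSem sem) (try act) >>= putMVar var)
      pure var

-- As above, but re-raise the first exception in input order.
boundedMap :: Traversable t => Int -> t (IO a) -> IO (t a)
boundedMap n actions = boundedMapE n actions >>= traverse (either throwIO pure)
```

Keeping one `MVar` per input position is what decouples completion order from result order.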

mapTasks_ :: Foldable t => Int -> t (IO a) -> IO ()

Run a group of up to N tasks at a time concurrently, ignoring the results.

mapTasksE_ :: Traversable t => Int -> t (IO a) -> IO (t (Maybe SomeException))

Run a group of up to N tasks at a time concurrently, ignoring the results, but returning whether an exception occurred for each task.

mapTasksRace :: Traversable t => Int -> t (IO a) -> IO (Maybe (Either SomeException a))

Execute a group of tasks (where only N tasks at most may run, corresponding to the number of available slots in the pool), returning the first result or failure. Nothing is returned if no tasks were provided.
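A race of this kind can be sketched with a shared, initially empty TMVar. Every task tries to fill it with its outcome, only the first `tryPutTMVar` succeeds, and the caller kills the remaining threads once it has a result. This hypothetical `raceTasks` uses a `QSem` for the N-slot limit and does not involve a pool at all.

```haskell
import Control.Concurrent (ThreadId, forkIO, killThread)
import Control.Concurrent.QSem
import Control.Concurrent.STM
import Control.Exception (SomeException, bracket_, try)
import Data.Foldable (toList)

-- First outcome (value or exception) of any action, or Nothing when there
-- are no actions. At most n actions run at a time.
raceTasks :: Foldable t => Int -> t (IO a) -> IO (Maybe (Either SomeException a))
raceTasks n actions
  | null actions = pure Nothing
  | otherwise = do
      winner <- newEmptyTMVarIO
      sem <- newQSem (max 1 n)
      tids <- mapM (spawnRacer winner sem) (toList actions)
      result <- atomically (readTMVar winner) -- blocks until one finishes
      mapM_ killThread tids                    -- stop the losers
      pure (Just result)
  where
    spawnRacer :: TMVar (Either SomeException b) -> QSem -> IO b -> IO ThreadId
    spawnRacer winner sem act = forkIO $ do
      r <- bracket_ (waitQSem sem) (signalQSem sem) (try act)
      -- Only the first tryPutTMVar succeeds; later outcomes are dropped.
      _ <- atomically (tryPutTMVar winner r)
      pure ()
```

Because failures are stored as `Left` values, an exception that finishes first wins the race just like a value would, matching "the first result or failure".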

mapReduce

Arguments

:: (Foldable t, Monoid a) 
=> Pool a

Pool to execute the tasks within

-> t (IO a)

Set of Monoid-yielding IO actions

-> STM Handle

Returns a Handle to the final result task

Given a list of actions yielding Monoid results, execute the actions concurrently (up to N at a time, based on the available slots), and also mappend each pair of results concurrently as they become ready.

The immediate result from this function is a Handle representing the final task, which depends on all the others and whose value is the final, aggregate result.

This is equivalent to mconcat <$> mapTasks n actions, except that intermediate results can be garbage collected as soon as they have been merged. Also, the value returned from this function is a Handle, which may be polled until the final result is ready.

Lastly, if any Exception occurs, the result obtained from waiting on or polling the Handle will be one of those exceptions, but not necessarily the first or the last.
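The pairwise-merge idea can be sketched without a pool as a balanced reduction tree. The leaves run concurrently, and each internal node mappends its two children in its own thread as soon as both are ready, so each intermediate value can be collected once it has been merged. This hypothetical `treeReduce` preserves left-to-right order, which the description above does not promise for mapReduce; it also has no slot limit and does not handle exceptions (a failing action leaves its parent waiting).

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar

-- Start an action in its own thread; the MVar receives its result.
spawn :: IO a -> IO (MVar a)
spawn act = do
  var <- newEmptyMVar
  _ <- forkIO (act >>= putMVar var)
  pure var

-- Merge neighbouring results pairwise, each merge in its own thread,
-- level by level until a single result remains.
treeReduce :: Monoid a => [IO a] -> IO a
treeReduce actions = mapM spawn actions >>= go
  where
    go [] = pure mempty
    go [v] = takeMVar v
    go vs = pairUp vs >>= go
    pairUp (a : b : rest) = do
      m <- spawn ((<>) <$> takeMVar a <*> takeMVar b)
      (m :) <$> pairUp rest
    pairUp rest = pure rest
```

For an associative monoid, the result equals `mconcat` of the individual results.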

newtype Tasks a

Constructors

Tasks 

Fields

runTasks' :: Pool () -> IO ([Handle], IO a)
 

runTasks :: Pool () -> Tasks a -> IO a

task :: IO a -> Tasks a

scatterFoldM :: (Foldable t, Monoid b) => Pool a -> t (IO a) -> (Either SomeException a -> IO b) -> IO b

Execute a group of tasks concurrently (using up to N active threads, depending on the pool), and feed each result to the continuation as soon as it becomes available, in whatever order the tasks complete. The continuation may return a monoid value; these values are accumulated to yield the final result.
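A minimal sketch of this scatter/fold pattern, assuming plain threads instead of a pool: every action's outcome, value or exception, is pushed onto a `TQueue`, and the caller takes exactly one item per action, passing each to the continuation in completion order and combining the monoid results. `scatterFold` is a hypothetical name, and there is no slot limit here.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM
import Control.Exception (SomeException, try)
import Control.Monad (forM_, replicateM)
import Data.Foldable (toList)

-- Scatter the actions across threads, then fold their outcomes through
-- the continuation in whatever order they finish.
scatterFold :: (Foldable t, Monoid b) => t (IO a) -> (Either SomeException a -> IO b) -> IO b
scatterFold actions k = do
  queue <- newTQueueIO
  let acts = toList actions
  -- Each thread reports its outcome, value or exception alike.
  forM_ acts $ \act -> forkIO (try act >>= atomically . writeTQueue queue)
  -- One outcome per action; k sees each one as soon as it is dequeued.
  bs <- replicateM (length acts) (atomically (readTQueue queue) >>= k)
  pure (mconcat bs)
```

Since the continuation sees results in completion order, the accumulated value is only order-independent when the monoid is commutative, as with sums or counts.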