-- Hoogle documentation, generated by Haddock
-- See Hoogle, http://www.haskell.org/hoogle/

-- | Manage pools of possibly interdependent tasks using STM and async
--
-- A taskpool is a network of tasks (where connection indicates
-- dependency), where up to N independent tasks at a time may execute
-- concurrently.
@package taskpool
@version 0.1.0

module Data.TaskPool.Internal

-- | A Handle is a unique reference to a task that has been submitted to
-- a Pool.
type Handle = Node
type TaskInfo a = (Handle, Task a)
type TaskGraph a = Gr (Task a) Status
type Task a = IO a
data Status
Pending :: Status
Completed :: Status

-- | A Pool manages a collection of possibly interdependent tasks,
-- such that tasks await execution until the tasks they depend on have
-- finished (and tasks may depend on an arbitrary number of other tasks),
-- while independent tasks execute concurrently up to the number of
-- available resource slots in the pool.
--
-- Results from each task remain available until the status of the task
-- is polled or waited on. Because they are kept until that occurs,
-- failing to ever wait will result in a memory leak.
--
-- Tasks may be cancelled, in which case all dependent tasks are
-- unscheduled.
data Pool a
Pool :: TVar Int -> TVar Int -> TVar (IntMap (TMVar (Async a))) -> TVar (TaskGraph a) -> TVar Int -> Pool a

-- | The total number of execution slots in the pool. If nothing is
-- running, this is also the number of available slots. This can be
-- changed dynamically using setPoolSlots.
slots :: Pool a -> TVar Int

-- | The number of available execution slots in the pool.
avail :: Pool a -> TVar Int

-- | The active or completed process table. For every task running in a
-- thread, this holds the Async value governing that thread; for every
-- completed task, it holds the Async value that records its completion
-- value or exception status.
-- These entries are inserted whenever a thread is started, and are
-- cleared by ultimately calling pollTaskEither (which all the other
-- polling and waiting functions also call).
--
-- Note that submitting a task with submitTask_ or
-- submitDependentTask_ will remove the thread's Async value
-- immediately at the end of the task, causing it to be garbage
-- collected.
procs :: Pool a -> TVar (IntMap (TMVar (Async a)))

-- | The task graph represents a partially ordered set P with subset S such
-- that for every x ∈ S and y ∈ P, either x ≤ y or x is unrelated to y.
-- Stated more simply, S is the set of least elements of all maximal
-- chains in P. In our case, ≤ relates two uncompleted tasks by
-- dependency. Therefore, S is equal to the set of tasks which may
-- execute concurrently, as none of them have incomplete dependencies.
--
-- We use a graph representation to make determination of S more
-- efficient (where S is just the set of roots in P expressed as a
-- graph). Completion status is recorded on the edges, and nodes are
-- removed from the graph once no other incomplete node depends on them.
tasks :: Pool a -> TVar (TaskGraph a)

-- | Tokens identify tasks, and are provisioned monotonically.
tokens :: Pool a -> TVar Int

-- | Return a list of unlabeled nodes ready for execution. This decreases
-- the number of available slots, but does not remove the nodes from the
-- graph.
getReadyNodes :: Pool a -> TaskGraph a -> STM [Node]

-- | Given a task handle, return everything we know about that task.
getTaskInfo :: TaskGraph a -> Handle -> TaskInfo a

-- | Return information about the list of tasks ready to execute,
-- sufficient to start them and remove them from the graph afterwards.
getReadyTasks :: Pool a -> STM [TaskInfo a]

-- | Begin executing tasks in the given pool. The number of slots
-- determines how many threads may execute concurrently.
-- This number is adjustable dynamically by calling setPoolSlots,
-- though reducing it does not cause already active threads to stop.
runPool :: Pool a -> IO ()

-- | Start a task within the given pool. This begins execution as soon as
-- the runtime is able to.
startTask :: Pool a -> TaskInfo a -> IO (Async a)

-- | Create a thread pool for executing interdependent tasks concurrently.
-- The number of available slots governs how many tasks may run at one
-- time.
createPool :: Int -> IO (Pool a)

-- | Set the number of available execution slots in the given Pool.
-- Increasing the number will cause waiting tasks to start executing
-- immediately, while decreasing the number only decreases any available
-- slots; it does not cancel already executing threads.
setPoolSlots :: Pool a -> Int -> STM ()

-- | Cancel every running thread in the pool and unschedule any that had
-- not begun yet.
cancelAll :: Pool a -> IO ()

-- | Cancel a task submitted to the pool. This will unschedule it if it had
-- not begun yet, or cancel its thread if it had.
cancelTask :: Pool a -> Handle -> IO ()

-- | Return the next available thread identifier from the pool. These are
-- monotonically increasing integers.
nextIdent :: Pool a -> STM Int

-- | Submit an IO action for execution within the managed thread
-- pool. When it actually begins executing is determined by the number of
-- available slots, whether the threaded runtime is being used, and how
-- long the jobs submitted before it take to complete.
submitTask :: Pool a -> IO a -> STM Handle

-- | Submit an IO action where we will never care about the result
-- value or whether an exception occurred within the task. This means its
-- process table entry is automatically cleared immediately upon
-- completion of the task. Use this if you are doing your own result
-- propagation, such as writing to a TChan within the task.
submitTask_ :: Pool a -> IO a -> STM Handle

-- | Given parent and child task handles, link them so that the child
-- cannot execute until the parent has finished. This does not check for
-- cycles, so it can link tasks in the time it takes to add a node and an
-- edge to the graph.
unsafeSequenceTasks :: Pool a -> Handle -> Handle -> STM ()
sequenceTasks :: Pool a -> Handle -> Handle -> STM ()

-- | Submit a task that may begin executing only once all of its parent
-- tasks have completed. This is equivalent to submitting a new task and
-- linking it to each parent using sequenceTasks within a single STM
-- transaction.
submitDependentTask :: Pool a -> [Handle] -> IO a -> STM Handle

-- | Submit a dependent task where we do not care about the result value or
-- whether an exception occurred. See submitTask_.
submitDependentTask_ :: Pool a -> Handle -> IO a -> STM Handle

-- | Poll the given task, returning Nothing if it hasn't started yet
-- or is currently executing, and a Just value if a final result
-- is known.
pollTaskEither :: Pool a -> Handle -> STM (Maybe (Either SomeException a))

-- | Poll the given task, as with pollTaskEither, but re-raise any
-- exceptions that were raised in the task's thread.
pollTask :: Pool a -> Handle -> STM (Maybe a)

-- | Wait until the given task has completed, then return its final status.
waitTaskEither :: Pool a -> Handle -> STM (Either SomeException a)

-- | Wait until the given task is completed, but re-raise any exceptions
-- that were raised in the task's thread.
waitTask :: Pool a -> Handle -> STM a

-- | Execute an IO action, passing it a running pool with N available
-- slots.
withPool :: Int -> (Pool a -> IO b) -> IO b

-- | Run a group of up to N tasks at a time concurrently. Tasks may
-- execute in any order, but the results are returned in the order of
-- the input.
mapTasks' :: Traversable t => Int -> t (IO a) -> (IO (t b) -> IO (t c)) -> (Pool a -> Handle -> STM b) -> IO (t c)

-- | Run a group of up to N tasks at a time concurrently. Tasks may
-- execute in any order, but the results are returned in the order of
-- the input.
mapTasks :: Traversable t => Int -> t (IO a) -> IO (t a)

-- | Run a group of up to N tasks at a time concurrently, returning each
-- task's result or the exception it raised. Tasks may execute in any
-- order, but the results are returned in the order of the input.
mapTasksE :: Traversable t => Int -> t (IO a) -> IO (t (Either SomeException a))

-- | Run a group of up to N tasks at a time concurrently, ignoring the
-- results.
mapTasks_ :: Foldable t => Int -> t (IO a) -> IO ()

-- | Run a group of up to N tasks at a time concurrently, ignoring the
-- results, but returning whether an exception occurred for each task.
mapTasksE_ :: Traversable t => Int -> t (IO a) -> IO (t (Maybe SomeException))

-- | Execute a group of tasks (where only N tasks at most may run,
-- corresponding to the number of available slots in the pool), returning
-- the first result or failure. Nothing is returned if no tasks
-- were provided.
mapTasksRace :: Traversable t => Int -> t (IO a) -> IO (Maybe (Either SomeException a))

-- | Given a list of actions yielding Monoid results, execute the
-- actions concurrently (up to N at a time, based on available slots), and
-- also mappend each pair of results concurrently as they become ready.
--
-- The immediate result from this function is a Handle representing the
-- final task (dependent on all the rest) whose value is the final,
-- aggregate result.
--
-- This is equivalent to mconcat <$> mapTasks n actions, except that
-- intermediate results can be garbage collected as soon as they have
-- been merged. Also, the value returned from this function is a Handle
-- which may be polled until that final result is ready.
--
-- Lastly, if any exception occurs, the result obtained from waiting on
-- or polling the Handle will be one of those exceptions, but not
-- necessarily the first or the last.
mapReduce :: (Foldable t, Monoid a) => Pool a -> t (IO a) -> STM Handle
newtype Tasks a
Tasks :: (Pool () -> IO ([Handle], IO a)) -> Tasks a
runTasks' :: Tasks a -> Pool () -> IO ([Handle], IO a)
runTasks :: Pool () -> Tasks a -> IO a
task :: IO a -> Tasks a

-- | Execute a group of tasks concurrently (using up to N active threads,
-- depending on the pool), and feed results to the continuation
-- immediately as they become available, in whatever order they
-- complete. The continuation may return a Monoid value, and these values
-- are accumulated to yield the final result.
scatterFoldM :: (Foldable t, Monoid b) => Pool a -> t (IO a) -> (Either SomeException a -> IO b) -> IO b
instance Eq Status
instance Show Status
instance MonadIO Tasks
instance Monad Tasks
instance Applicative Tasks
instance Functor Tasks

module Data.TaskPool

-- | A Pool manages a collection of possibly interdependent tasks,
-- such that tasks await execution until the tasks they depend on have
-- finished (and tasks may depend on an arbitrary number of other tasks),
-- while independent tasks execute concurrently up to the number of
-- available resource slots in the pool.
--
-- Results from each task remain available until the status of the task
-- is polled or waited on. Because they are kept until that occurs,
-- failing to ever wait will result in a memory leak.
--
-- Tasks may be cancelled, in which case all dependent tasks are
-- unscheduled.
data Pool a

-- | A Handle is a unique reference to a task that has been submitted to
-- a Pool.
type Handle = Node

-- | Create a thread pool for executing interdependent tasks concurrently.
-- The number of available slots governs how many tasks may run at one
-- time.
createPool :: Int -> IO (Pool a)

-- | Begin executing tasks in the given pool.
-- The number of slots determines how many threads may execute
-- concurrently. This number is adjustable dynamically by calling
-- setPoolSlots, though reducing it does not cause already active
-- threads to stop.
runPool :: Pool a -> IO ()

-- | Set the number of available execution slots in the given Pool.
-- Increasing the number will cause waiting tasks to start executing
-- immediately, while decreasing the number only decreases any available
-- slots; it does not cancel already executing threads.
setPoolSlots :: Pool a -> Int -> STM ()

-- | Cancel every running thread in the pool and unschedule any that had
-- not begun yet.
cancelAll :: Pool a -> IO ()

-- | Submit an IO action for execution within the managed thread
-- pool. When it actually begins executing is determined by the number of
-- available slots, whether the threaded runtime is being used, and how
-- long the jobs submitted before it take to complete.
submitTask :: Pool a -> IO a -> STM Handle

-- | Submit an IO action where we will never care about the result
-- value or whether an exception occurred within the task. This means its
-- process table entry is automatically cleared immediately upon
-- completion of the task. Use this if you are doing your own result
-- propagation, such as writing to a TChan within the task.
submitTask_ :: Pool a -> IO a -> STM Handle

-- | Submit a task that may begin executing only once all of its parent
-- tasks have completed. This is equivalent to submitting a new task and
-- linking it to each parent using sequenceTasks within a single STM
-- transaction.
submitDependentTask :: Pool a -> [Handle] -> IO a -> STM Handle

-- | Submit a dependent task where we do not care about the result value or
-- whether an exception occurred. See submitTask_.
submitDependentTask_ :: Pool a -> Handle -> IO a -> STM Handle

-- | Cancel a task submitted to the pool. This will unschedule it if it had
-- not begun yet, or cancel its thread if it had.
cancelTask :: Pool a -> Handle -> IO ()
sequenceTasks :: Pool a -> Handle -> Handle -> STM ()

-- | Given parent and child task handles, link them so that the child
-- cannot execute until the parent has finished. This does not check for
-- cycles, so it can link tasks in the time it takes to add a node and an
-- edge to the graph.
unsafeSequenceTasks :: Pool a -> Handle -> Handle -> STM ()

-- | Wait until the given task is completed, but re-raise any exceptions
-- that were raised in the task's thread.
waitTask :: Pool a -> Handle -> STM a

-- | Wait until the given task has completed, then return its final status.
waitTaskEither :: Pool a -> Handle -> STM (Either SomeException a)

-- | Poll the given task, as with pollTaskEither, but re-raise any
-- exceptions that were raised in the task's thread.
pollTask :: Pool a -> Handle -> STM (Maybe a)

-- | Poll the given task, returning Nothing if it hasn't started yet
-- or is currently executing, and a Just value if a final result
-- is known.
pollTaskEither :: Pool a -> Handle -> STM (Maybe (Either SomeException a))

-- | Run a group of up to N tasks at a time concurrently. Tasks may
-- execute in any order, but the results are returned in the order of
-- the input.
mapTasks :: Traversable t => Int -> t (IO a) -> IO (t a)

-- | Run a group of up to N tasks at a time concurrently, returning each
-- task's result or the exception it raised. Tasks may execute in any
-- order, but the results are returned in the order of the input.
mapTasksE :: Traversable t => Int -> t (IO a) -> IO (t (Either SomeException a))

-- | Run a group of up to N tasks at a time concurrently, ignoring the
-- results.
mapTasks_ :: Foldable t => Int -> t (IO a) -> IO ()

-- | Run a group of up to N tasks at a time concurrently, ignoring the
-- results, but returning whether an exception occurred for each task.
mapTasksE_ :: Traversable t => Int -> t (IO a) -> IO (t (Maybe SomeException))

-- | Execute a group of tasks (where only N tasks at most may run,
-- corresponding to the number of available slots in the pool), returning
-- the first result or failure. Nothing is returned if no tasks
-- were provided.
mapTasksRace :: Traversable t => Int -> t (IO a) -> IO (Maybe (Either SomeException a))

-- | Given a list of actions yielding Monoid results, execute the
-- actions concurrently (up to N at a time, based on available slots), and
-- also mappend each pair of results concurrently as they become ready.
--
-- The immediate result from this function is a Handle representing the
-- final task (dependent on all the rest) whose value is the final,
-- aggregate result.
--
-- This is equivalent to mconcat <$> mapTasks n actions, except that
-- intermediate results can be garbage collected as soon as they have
-- been merged. Also, the value returned from this function is a Handle
-- which may be polled until that final result is ready.
--
-- Lastly, if any exception occurs, the result obtained from waiting on
-- or polling the Handle will be one of those exceptions, but not
-- necessarily the first or the last.
mapReduce :: (Foldable t, Monoid a) => Pool a -> t (IO a) -> STM Handle

-- | Execute a group of tasks concurrently (using up to N active threads,
-- depending on the pool), and feed results to the continuation
-- immediately as they become available, in whatever order they
-- complete. The continuation may return a Monoid value, and these values
-- are accumulated to yield the final result.
scatterFoldM :: (Foldable t, Monoid b) => Pool a -> t (IO a) -> (Either SomeException a -> IO b) -> IO b
data Tasks a
runTasks :: Pool () -> Tasks a -> IO a
task :: IO a -> Tasks a
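The slot accounting described above for runPool, setPoolSlots and mapTasks (at most N tasks running at once, results returned in input order, exceptions re-raised as waitTask does) can be illustrated with a minimal sketch that uses only base and stm rather than the taskpool package itself. The names mapTasksSketch, startBounded and waitResult are hypothetical, and the sketch deliberately omits the task graph, dependencies and cancellation that Pool provides; it is not the library's implementation.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM
import Control.Exception (SomeException, throwIO, try)

-- Hypothetical stand-in for mapTasks (not the taskpool implementation):
-- run at most n actions at once and return results in input order.
mapTasksSketch :: Traversable t => Int -> t (IO a) -> IO (t a)
mapTasksSketch n actions = do
  -- Free execution slots, playing the role of the pool's avail counter.
  avail <- newTVarIO (max 1 n)
  results <- traverse (startBounded avail) actions
  -- Waiting in input order is what keeps the results ordered, even
  -- though the tasks may finish in any order.
  traverse waitResult results

-- Claim a slot (retrying the transaction while none are free), then fork
-- the action. The slot is handed back in the same transaction that
-- publishes the task's result or exception.
startBounded :: TVar Int -> IO b -> IO (TMVar (Either SomeException b))
startBounded avail act = do
  atomically $ do
    k <- readTVar avail
    check (k > 0)
    writeTVar avail (k - 1)
  result <- newEmptyTMVarIO
  _ <- forkIO $ do
    r <- try act
    atomically $ do
      putTMVar result r
      modifyTVar' avail (+ 1)
  return result

-- Block until the task finishes, re-raising its exception as waitTask does.
waitResult :: TMVar (Either SomeException b) -> IO b
waitResult var = atomically (readTMVar var) >>= either throwIO return
```

Loaded in GHCi, mapTasksSketch 2 [return (i * i) | i <- [1 .. 10]] returns the squares of 1 through 10 in input order. Unlike the pool described here, this sketch starts tasks strictly in input order and has no notion of dependent tasks.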