-- Hoogle documentation, generated by Haddock
-- See Hoogle, http://www.haskell.org/hoogle/
-- | Manage pools of possibly interdependent tasks using STM and async
--
-- A taskpool is a network of tasks (where a connection indicates a
-- dependency), in which up to N independent tasks at a time may execute
-- concurrently.
@package taskpool
@version 0.1.0
module Data.TaskPool.Internal
-- | A Handle is a unique reference to a task that has been submitted to
-- a Pool.
type Handle = Node
type TaskInfo a = (Handle, Task a)
type TaskGraph a = Gr (Task a) Status
type Task a = IO a
data Status
Pending :: Status
Completed :: Status
-- | A Pool manages a collection of possibly interdependent tasks,
-- such that tasks await execution until the tasks they depend on have
-- finished (and tasks may depend on an arbitrary number of other tasks),
-- while independent tasks execute concurrently up to the number of
-- available resource slots in the pool.
--
-- Results from each task are available until the status of the task is
-- polled or waited on. Further, the results are kept until that occurs,
-- so failing to ever wait will result in a memory leak.
--
-- Tasks may be cancelled, in which case all dependent tasks are
-- unscheduled.
data Pool a
Pool :: TVar Int -> TVar Int -> TVar (IntMap (TMVar (Async a))) -> TVar (TaskGraph a) -> TVar Int -> Pool a
-- | The total number of execution slots in the pool. If nothing is
-- running, this is also the number of available slots. This can be
-- changed dynamically using setPoolSlots.
slots :: Pool a -> TVar Int
-- | The number of available execution slots in the pool.
avail :: Pool a -> TVar Int
-- | The active or completed process table. For every task running in a
-- thread, this holds the Async value governing that thread; for every
-- completed task, it holds the Async value that records its completion
-- value or exception status. These entries are inserted whenever a
-- thread is started, and are cleared by ultimately calling
-- pollTaskEither (which all the other polling and waiting
-- functions also call).
--
-- Note that submitting a task with submitTask_ or
-- submitDependentTask_ will remove the thread's Async value
-- immediately at the end of the task, causing it to be garbage
-- collected.
procs :: Pool a -> TVar (IntMap (TMVar (Async a)))
-- | The task graph represents a partially ordered set P with subset S such
-- that for every x ∈ S and y ∈ P, either x ≤ y or x is unrelated to y.
-- Stated more simply, S is the set of least elements of all maximal
-- chains in P. In our case, ≤ relates two uncompleted tasks by
-- dependency. Therefore, S is equal to the set of tasks which may
-- execute concurrently, as none of them have incomplete dependencies.
--
-- We use a graph representation to make determination of S more
-- efficient (where S is just the set of roots in P expressed as a
-- graph). Completion status is recorded on the edges, and nodes are
-- removed from the graph once no other incomplete node depends on them.
tasks :: Pool a -> TVar (TaskGraph a)
-- | Tokens identify tasks, and are provisioned monotonically.
tokens :: Pool a -> TVar Int
-- | Return a list of unlabeled nodes ready for execution. This decreases
-- the number of available slots, but does not remove the nodes from the
-- graph.
getReadyNodes :: Pool a -> TaskGraph a -> STM [Node]
-- | Given a task handle, return everything we know about that task.
getTaskInfo :: TaskGraph a -> Handle -> TaskInfo a
-- | Return information about the list of tasks ready to execute,
-- sufficient to start them and remove them from the graph afterwards.
getReadyTasks :: Pool a -> STM [TaskInfo a]
-- | Begin executing tasks in the given pool. The number of slots
-- determines how many threads may execute concurrently. This number is
-- adjustable dynamically, by calling setPoolSlots, though
-- reducing it does not cause already active threads to stop.
runPool :: Pool a -> IO ()
-- | Start a task within the given pool. This begins execution as soon as
-- the runtime is able to.
startTask :: Pool a -> TaskInfo a -> IO (Async a)
-- | Create a thread pool for executing interdependent tasks concurrently.
-- The number of available slots governs how many tasks may run at one
-- time.
createPool :: Int -> IO (Pool a)
-- | Set the number of available execution slots in the given Pool.
-- Increasing the number will cause waiting threads to start executing
-- immediately, while decreasing the number only decreases any available
-- slots -- it does not cancel already executing threads.
setPoolSlots :: Pool a -> Int -> STM ()
-- | Cancel every running thread in the pool and unschedule any that had
-- not begun yet.
cancelAll :: Pool a -> IO ()
-- | Cancel a task submitted to the pool. This will unschedule it if it had
-- not begun yet, or cancel its thread if it had.
cancelTask :: Pool a -> Handle -> IO ()
-- | Return the next available thread identifier from the pool. These are
-- monotonically increasing integers.
nextIdent :: Pool a -> STM Int
-- | Submit an IO action for execution within the managed thread
-- pool. When it actually begins executing is determined by the number of
-- available slots, whether the threaded runtime is being used, and how
-- long the jobs submitted before it take to complete.
submitTask :: Pool a -> IO a -> STM Handle
-- | Submit an 'IO ()' action, where we will never care about the result
-- value or if an exception occurred within the task. This means its
-- process table entry is automatically cleared immediately upon
-- completion of the task. Use this if you are doing your own result
-- propagation, such as writing to a TChan within the task.
submitTask_ :: Pool a -> IO a -> STM Handle
-- | Given parent and child task handles, link them so that the child
-- cannot execute until the parent has finished. This does not check for
-- cycles, so linking takes only the time needed to add a node and an
-- edge to the graph.
unsafeSequenceTasks :: Pool a -> Handle -> Handle -> STM ()
sequenceTasks :: Pool a -> Handle -> Handle -> STM ()
-- | Submit a task, but only allow it to begin executing once its parent
-- tasks have completed. This is equivalent to submitting a new task and
-- linking it to its parents using sequenceTasks within a single STM
-- transaction.
submitDependentTask :: Pool a -> [Handle] -> IO a -> STM Handle
-- | Submit a dependent task where we do not care about the result value or
-- if an exception occurred. See submitTask_.
submitDependentTask_ :: Pool a -> Handle -> IO a -> STM Handle
-- | Poll the given task, returning Nothing if it hasn't started yet
-- or is currently executing, and a Just value if a final result
-- is known.
pollTaskEither :: Pool a -> Handle -> STM (Maybe (Either SomeException a))
-- | Poll the given task, as with pollTaskEither, but re-raise any
-- exceptions that were raised in the task's thread.
pollTask :: Pool a -> Handle -> STM (Maybe a)
-- | Wait until the given task has completed, then return its final status.
waitTaskEither :: Pool a -> Handle -> STM (Either SomeException a)
-- | Wait until the given task is completed, but re-raise any exceptions
-- that were raised in the task's thread.
waitTask :: Pool a -> Handle -> STM a
-- | Execute an IO action, passing it a running pool with N available
-- slots.
withPool :: Int -> (Pool a -> IO b) -> IO b
-- | Run a group of up to N tasks at a time concurrently, returning the
-- results in order. The order of execution is random, but the results
-- are returned in order.
mapTasks' :: Traversable t => Int -> t (IO a) -> (IO (t b) -> IO (t c)) -> (Pool a -> Handle -> STM b) -> IO (t c)
-- | Run a group of up to N tasks at a time concurrently, returning the
-- results in order. The order of execution is random, but the results
-- are returned in order.
mapTasks :: Traversable t => Int -> t (IO a) -> IO (t a)
-- | Run a group of up to N tasks at a time concurrently, returning the
-- results in order. The order of execution is random, but the results
-- are returned in order.
mapTasksE :: Traversable t => Int -> t (IO a) -> IO (t (Either SomeException a))
-- | Run a group of up to N tasks at a time concurrently, ignoring the
-- results.
mapTasks_ :: Foldable t => Int -> t (IO a) -> IO ()
-- | Run a group of up to N tasks at a time concurrently, ignoring the
-- results, but returning whether an exception occurred for each task.
mapTasksE_ :: Traversable t => Int -> t (IO a) -> IO (t (Maybe SomeException))
-- | Execute a group of tasks (where only N tasks at most may run,
-- corresponding to the number of available slots in the pool), returning
-- the first result or failure. Nothing is returned if no tasks
-- were provided.
mapTasksRace :: Traversable t => Int -> t (IO a) -> IO (Maybe (Either SomeException a))
-- | Given a list of actions yielding Monoid results, execute the
-- actions concurrently (up to N at a time, based on available slots), and
-- also mappend each pair of results concurrently as they become ready.
--
-- The immediate result from this function is a Handle representing the
-- final task -- dependent on all the rest -- whose value is the final,
-- aggregate result.
--
-- This is equivalent to the following: mconcat $ mapTasks n
-- actions, except that intermediate results can be garbage
-- collected as soon as they've merged. Also, the value returned from
-- this function is a Handle which may be polled until that final
-- result is ready.
--
-- Lastly, if any Exception occurs, the result obtained from waiting on
-- or polling the Handle will be one of those exceptions, but not
-- necessarily the first or the last.
mapReduce :: (Foldable t, Monoid a) => Pool a -> t (IO a) -> STM Handle
newtype Tasks a
Tasks :: (Pool () -> IO ([Handle], IO a)) -> Tasks a
runTasks' :: Tasks a -> Pool () -> IO ([Handle], IO a)
runTasks :: Pool () -> Tasks a -> IO a
task :: IO a -> Tasks a
-- | Execute a group of tasks concurrently (using up to N active threads,
-- depending on the pool), and feed results to the continuation
-- immediately as they become available, in whatever order they
-- complete. That function may return a monoid value which is accumulated
-- to yield the final result.
scatterFoldM :: (Foldable t, Monoid b) => Pool a -> t (IO a) -> (Either SomeException a -> IO b) -> IO b
instance Eq Status
instance Show Status
instance MonadIO Tasks
instance Monad Tasks
instance Applicative Tasks
instance Functor Tasks
module Data.TaskPool
-- | A Pool manages a collection of possibly interdependent tasks,
-- such that tasks await execution until the tasks they depend on have
-- finished (and tasks may depend on an arbitrary number of other tasks),
-- while independent tasks execute concurrently up to the number of
-- available resource slots in the pool.
--
-- Results from each task are available until the status of the task is
-- polled or waited on. Further, the results are kept until that occurs,
-- so failing to ever wait will result in a memory leak.
--
-- Tasks may be cancelled, in which case all dependent tasks are
-- unscheduled.
data Pool a
-- | A Handle is a unique reference to a task that has been submitted to
-- a Pool.
type Handle = Node
-- | Create a thread pool for executing interdependent tasks concurrently.
-- The number of available slots governs how many tasks may run at one
-- time.
createPool :: Int -> IO (Pool a)
-- | Begin executing tasks in the given pool. The number of slots
-- determines how many threads may execute concurrently. This number is
-- adjustable dynamically, by calling setPoolSlots, though
-- reducing it does not cause already active threads to stop.
runPool :: Pool a -> IO ()
-- | Set the number of available execution slots in the given Pool.
-- Increasing the number will cause waiting threads to start executing
-- immediately, while decreasing the number only decreases any available
-- slots -- it does not cancel already executing threads.
setPoolSlots :: Pool a -> Int -> STM ()
-- | Cancel every running thread in the pool and unschedule any that had
-- not begun yet.
cancelAll :: Pool a -> IO ()
-- | Submit an IO action for execution within the managed thread
-- pool. When it actually begins executing is determined by the number of
-- available slots, whether the threaded runtime is being used, and how
-- long the jobs submitted before it take to complete.
submitTask :: Pool a -> IO a -> STM Handle
-- | Submit an 'IO ()' action, where we will never care about the result
-- value or if an exception occurred within the task. This means its
-- process table entry is automatically cleared immediately upon
-- completion of the task. Use this if you are doing your own result
-- propagation, such as writing to a TChan within the task.
submitTask_ :: Pool a -> IO a -> STM Handle
-- | Submit a task, but only allow it to begin executing once its parent
-- tasks have completed. This is equivalent to submitting a new task and
-- linking it to its parents using sequenceTasks within a single STM
-- transaction.
submitDependentTask :: Pool a -> [Handle] -> IO a -> STM Handle
-- | Submit a dependent task where we do not care about the result value or
-- if an exception occurred. See submitTask_.
submitDependentTask_ :: Pool a -> Handle -> IO a -> STM Handle
-- | Cancel a task submitted to the pool. This will unschedule it if it had
-- not begun yet, or cancel its thread if it had.
cancelTask :: Pool a -> Handle -> IO ()
sequenceTasks :: Pool a -> Handle -> Handle -> STM ()
-- | Given parent and child task handles, link them so that the child
-- cannot execute until the parent has finished. This does not check for
-- cycles, so linking takes only the time needed to add a node and an
-- edge to the graph.
unsafeSequenceTasks :: Pool a -> Handle -> Handle -> STM ()
-- | Wait until the given task is completed, but re-raise any exceptions
-- that were raised in the task's thread.
waitTask :: Pool a -> Handle -> STM a
-- | Wait until the given task has completed, then return its final status.
waitTaskEither :: Pool a -> Handle -> STM (Either SomeException a)
-- | Poll the given task, as with pollTaskEither, but re-raise any
-- exceptions that were raised in the task's thread.
pollTask :: Pool a -> Handle -> STM (Maybe a)
-- | Poll the given task, returning Nothing if it hasn't started yet
-- or is currently executing, and a Just value if a final result
-- is known.
pollTaskEither :: Pool a -> Handle -> STM (Maybe (Either SomeException a))
-- | Run a group of up to N tasks at a time concurrently, returning the
-- results in order. The order of execution is random, but the results
-- are returned in order.
mapTasks :: Traversable t => Int -> t (IO a) -> IO (t a)
-- | Run a group of up to N tasks at a time concurrently, returning the
-- results in order. The order of execution is random, but the results
-- are returned in order.
mapTasksE :: Traversable t => Int -> t (IO a) -> IO (t (Either SomeException a))
-- | Run a group of up to N tasks at a time concurrently, ignoring the
-- results.
mapTasks_ :: Foldable t => Int -> t (IO a) -> IO ()
-- | Run a group of up to N tasks at a time concurrently, ignoring the
-- results, but returning whether an exception occurred for each task.
mapTasksE_ :: Traversable t => Int -> t (IO a) -> IO (t (Maybe SomeException))
-- | Execute a group of tasks (where only N tasks at most may run,
-- corresponding to the number of available slots in the pool), returning
-- the first result or failure. Nothing is returned if no tasks
-- were provided.
mapTasksRace :: Traversable t => Int -> t (IO a) -> IO (Maybe (Either SomeException a))
-- | Given a list of actions yielding Monoid results, execute the
-- actions concurrently (up to N at a time, based on available slots), and
-- also mappend each pair of results concurrently as they become ready.
--
-- The immediate result from this function is a Handle representing the
-- final task -- dependent on all the rest -- whose value is the final,
-- aggregate result.
--
-- This is equivalent to the following: mconcat $ mapTasks n
-- actions, except that intermediate results can be garbage
-- collected as soon as they've merged. Also, the value returned from
-- this function is a Handle which may be polled until that final
-- result is ready.
--
-- Lastly, if any Exception occurs, the result obtained from waiting on
-- or polling the Handle will be one of those exceptions, but not
-- necessarily the first or the last.
mapReduce :: (Foldable t, Monoid a) => Pool a -> t (IO a) -> STM Handle
-- | Execute a group of tasks concurrently (using up to N active threads,
-- depending on the pool), and feed results to the continuation
-- immediately as they become available, in whatever order they
-- complete. That function may return a monoid value which is accumulated
-- to yield the final result.
scatterFoldM :: (Foldable t, Monoid b) => Pool a -> t (IO a) -> (Either SomeException a -> IO b) -> IO b
data Tasks a
runTasks :: Pool () -> Tasks a -> IO a
task :: IO a -> Tasks a