-- Hoogle documentation, generated by Haddock
-- See Hoogle, http://www.haskell.org/hoogle/
-- | Cooperative task prioritization.
--
-- In a simple use case, we want to run some expensive tasks in
-- prioritized order, so that only one task is running on each CPU (or
-- hardware thread) at any time. For this simple case, four operations
-- are needed: simpleTaskPool, schedule, claim, and
-- startQueue.
--
--
-- let expensiveTask = threadDelay 1000000
-- pool <- simpleTaskPool
-- forkIO $ claim Acquire (schedule pool 1) $ putStrLn "Task 1 started . . ." >> expensiveTask >> putStrLn "Task 1 completed."
-- forkIO $ claim Acquire (schedule pool 3) $ putStrLn "Task 3 started . . ." >> expensiveTask >> putStrLn "Task 3 completed."
-- forkIO $ claim Acquire (schedule pool 2) $ putStrLn "Task 2 started . . ." >> expensiveTask >> putStrLn "Task 2 completed."
-- threadDelay 100000 -- contrive to wait for all tasks to become enqueued
-- putStrLn "Starting pool: "
-- startQueue pool
-- threadDelay 4000000 -- contrive to wait for all tasks to become dequeued
--
--
-- A TaskPool combines Rooms and Queues in an
-- efficient, easy-to-use interface.
--
-- Rooms provide fully reentrant synchronization to any number of
-- threads based on arbitrary resource constraints. For example, the
-- Room from a simpleTaskPool is constrained by
-- GHC.Conc.numCapabilities.
--
-- Queues provide task prioritization. A Queue
-- systematically examines (to a configurable depth) all waiting threads
-- with their priorities and resource constraints and wakes the most
-- eagerly prioritized thread whose constraints can be satisfied.
--
-- TaskPools are not thread pools. The concept is similar to IO
-- Completion Ports. There are no worker threads. If a number of threads
-- are waiting, the thread that is most likely to be processed next is
-- woken and temporarily serves as a working thread.
--
-- Rooms, Queues, and TaskPools are backed by
-- carefully written STM (software transactional memory) transactions.
--
-- A salient feature is that, because any thread can participate, a
-- TaskPool supports both bound threads and threads created with
-- forkOnIO.
--
-- The git repository is available at
-- http://www.downstairspeople.org/git/priority-sync.git.
@package priority-sync
@version 0.1.0.1
module Control.Concurrent.Priority.RoomConstraint
class RoomConstraint u
approveConstraint :: (RoomConstraint u) => Claim a -> u -> STM ()
-- | A maximum limit on the number of threads allowed to claim a room.
newtype MaxThreads
MaxThreads :: Int -> MaxThreads
-- | Approve some claims according to their constraints.
approveClaims :: (RoomConstraint u) => [Claim u] -> STM ()
instance (RoomConstraint a) => RoomConstraint (Maybe a)
instance (RoomConstraint a, RoomConstraint b) => RoomConstraint (Either a b)
instance (RoomConstraint a, RoomConstraint b) => RoomConstraint (a, b)
instance (RoomConstraint u) => RoomConstraint (STM u)
instance RoomConstraint MaxThreads
instance RoomConstraint Bool
instance RoomConstraint ()
module Control.Concurrent.Priority.Queue
-- | A prioritized Queue. Prioritization is least-first, i.e. larger
-- values are nicer.
--
-- A Queue is not associated with any working thread; therefore,
-- it is the client's responsibility to make sure that every pushed task
-- is also pulled, or the Queue will stall. There are several ways
-- to accomplish this, such as pulling each task explicitly with
-- pullTask or pullSpecificTasks, or processing the whole
-- Queue with flushQueue.
--
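-- A minimal hedged sketch (assuming atomically from
-- Control.Concurrent.STM is in scope): push two trivial tasks, then
-- drain the Queue with flushQueue.
--
--
-- q <- newQueue fair_queue_configuration
-- _ <- atomically $ putTask q 2 (return ()) -- atomically is from Control.Concurrent.STM
-- _ <- atomically $ putTask q 1 (return ())
-- flushQueue q -- priority 1 runs before priority 2, since prioritization is least-first
--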
data (Ord a) => Queue a
data TaskHandle a
data QueueOrder
FIFO :: QueueOrder
FILO :: QueueOrder
-- | Configuration options for a Queue. A Queue blocks on a
-- number of predicates when dispatching a job. Generally,
-- fair_queue_configuration should work well for long-running
-- batch jobs and fast_queue_configuration should work for
-- fast-paced jobs.
--
--
-- - A single STM predicate for the entire Queue. This blocks
-- the entire Queue until the predicate is satisfied.
-- - An STM predicate parameterized by priority. This blocks a single
-- priority level, and the Queue will skip all tasks at that
-- priority.
-- - Each task is itself an STM transaction, and can block itself.
-- - Pure constraints on priority and ordering inversion.
--
--
-- If a task is blocked for any reason, the task is skipped and the next
-- task attempted, in priority order.
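--
-- A hedged sketch of building a configuration by hand, following the
-- argument order of the constructor below: no blocking predicates,
-- unlimited priority inversion, a small ordering inversion window, and
-- FIFO order.
--
--
-- myConfig :: QueueConfigurationRecord Int
-- myConfig = QueueConfigurationRecord (return ()) (\_ -> return ()) (\_ _ -> True) 4 FIFO
--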
data (Ord a) => QueueConfigurationRecord a
QueueConfigurationRecord :: STM () -> (a -> STM ()) -> (a -> a -> Bool) -> Int -> !QueueOrder -> QueueConfigurationRecord a
-- | A predicate that must hold before any task may be pulled from a
-- Queue.
queue_predicate :: QueueConfigurationRecord a -> STM ()
-- | A predicate that must hold before any priority level may be pulled
-- from a Queue.
priority_indexed_predicate :: QueueConfigurationRecord a -> (a -> STM ())
-- | Constrains the greatest allowed difference between the priority of the
-- top-of-queue task and the priority of a task to be pulled.
allowed_priority_inversion :: QueueConfigurationRecord a -> a -> a -> Bool
-- | The greatest allowed difference between the ideal prioritized
-- FILO/FIFO ordering of tasks and the actual ordering of tasks. Setting
-- this too high can introduce significant overhead in the presence of
-- many short-running tasks. Setting this to zero turns off the predicate
-- failover feature, i.e. only the top-of-queue task will ever be pulled.
allowed_ordering_inversion :: QueueConfigurationRecord a -> Int
-- | Should the Queue run in FILO or FIFO order? Ordering takes
-- place after prioritization, and won't have much effect if priorities
-- are very fine-grained.
queue_order :: QueueConfigurationRecord a -> QueueOrder
-- | A queue tuned for high throughput and fairness when processing
-- moderate to long running tasks.
fair_queue_configuration :: (Ord a) => QueueConfigurationRecord a
-- | A queue tuned for high responsiveness and low priority inversion;
-- compared to fair_queue_configuration, it may have poorer long-term
-- throughput and a greater potential to starve some tasks.
fast_queue_configuration :: (Ord a) => QueueConfigurationRecord a
-- | Create a new Queue.
newQueue :: (Ord a) => QueueConfigurationRecord a -> IO (Queue a)
taskPriority :: TaskHandle a -> a
taskQueue :: TaskHandle a -> Queue a
pendingTasks :: (Ord a) => Queue a -> STM [TaskHandle a]
-- | True iff this task is poised at the top of its Queue.
isTopOfQueue :: TaskHandle a -> STM Bool
hasCompleted :: TaskHandle a -> STM Bool
-- | Put a task with its priority value onto this queue. Returns a handle
-- to the task.
putTask :: (Ord a) => Queue a -> a -> STM () -> STM (TaskHandle a)
-- | Pull and commit a task from this Queue.
pullTask :: (Ord a) => Queue a -> STM (TaskHandle a)
-- | Pull this task from the top of a Queue, if it is already there.
-- If this task is top-of-queue, but its predicates fail, then
-- pullFromTop may instead pull a lower-priority
-- TaskHandle.
pullFromTop :: (Ord a) => TaskHandle a -> STM (TaskHandle a)
-- | Don't return until the given TaskHandles have been pulled from
-- their associated Queues. This doesn't guarantee that the
-- TaskHandle will ever be pulled, even when the TaskHandle
-- and Queue are both viable. You must concurrently arrange for
-- every other TaskHandle associated with the same Queue to
-- be pulled, or the Queue will stall. pullSpecificTasks
-- can handle lists of TaskHandles that are distributed among several
-- Queues, as well as TaskHandles that have already
-- completed or that complete concurrently from another thread.
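--
-- A hedged sketch (assuming atomically and forkIO are in scope):
-- enqueue three trivial tasks, drain the Queue from another
-- thread, and wait for all three handles to be pulled.
--
--
-- q <- newQueue fair_queue_configuration
-- handles <- atomically $ mapM (\p -> putTask q p (return ())) [1, 2, 3]
-- forkIO $ flushQueue q -- something must keep pulling, or pullSpecificTasks will stall
-- pullSpecificTasks handles
--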
pullSpecificTasks :: (Ord a) => [TaskHandle a] -> IO ()
-- | "Fire and forget" some tasks on a separate thread.
dispatchTasks :: (Ord a) => [(Queue a, a, STM ())] -> IO [TaskHandle a]
-- | Process a Queue until it is empty.
flushQueue :: (Ord a) => Queue a -> IO ()
-- | The number of tasks pending on this Queue.
load :: (Ord a) => Queue a -> STM Int
instance (Ord a) => Ord (TaskHandle a)
instance (Ord a, Eq a) => Eq (TaskHandle a)
instance (Ord a) => Ord (Queue a)
instance (Ord a) => Eq (Queue a)
module Control.Concurrent.Priority.Room
-- | A resource pool, parameterized against arbitrary user data.
data Room u
-- | Create a new Room with some arbitrary user data.
newRoom :: u -> IO (Room u)
-- | Get all ThreadIds that are currently claiming this Room.
inUse :: Room u -> STM (Set ThreadId)
-- | A Claim, or attempt to acquire or release a Room.
data Claim u
-- | Get the Room target of a Claim.
claimedRoom :: Claim u -> Room u
-- | Get the thread attempting a Claim.
claimedThread :: Claim u -> ThreadId
-- | Get the user data associated with a Room.
userData :: Room u -> u
class RoomGroup m
roomsOf :: (RoomGroup m) => m -> [Room (UserData m)]
class RoomConstraint u
approveConstraint :: (RoomConstraint u) => Claim a -> u -> STM ()
-- | Rules for calling claim_. The two major contexts are
-- DefaultRoomContext, which uses RoomConstraints to
-- determine which Rooms are available, and
-- UnconstrainedRoomContext, which does not place any constraints
-- on any Room.
class BaseRoomContext c where { type family BaseRoomContextData c :: *; }
approveClaimsEntering :: (BaseRoomContext c) => c -> [Claim (UserData c)] -> STM (BaseRoomContextData c)
approveClaimsExiting :: (BaseRoomContext c) => c -> [Claim (UserData c)] -> STM (BaseRoomContextData c)
waitingAction :: (BaseRoomContext c) => c -> (BaseRoomContextData c) -> STM ()
-- | An indirect reference to a BaseRoomContext.
class RoomContext c where { type family Base c :: *; }
baseContext :: (RoomContext c) => c -> Base c
-- | A maximum limit on the number of threads allowed to claim a room.
newtype MaxThreads
MaxThreads :: Int -> MaxThreads
data ClaimMode
Acquire :: ClaimMode
Release :: ClaimMode
-- | Require that all RoomConstraints be satisfied when acquiring a
-- Room. This is the default.
data DefaultRoomContext u
Default :: DefaultRoomContext u
-- | Don't check any RoomConstraints when acquiring a Room.
data UnconstrainedRoomContext u
Unconstrained :: UnconstrainedRoomContext u
-- | Temporarily Acquire, and then release, or Release, and
-- then acquire, some Rooms for the duration of a critical
-- section. A simple example where a room might be used to prevent
-- interleaving of stdout:
--
--
-- room <- newRoom (MaxThreads 1)
-- forkIO $ claim Acquire room $ putStrLn "Hello World!"
-- forkIO $ claim Acquire room $ putStrLn "Foo! Bar!"
--
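-- Rooms can also be claimed as a group. A hedged sketch, assuming the
-- list instances below resolve with DefaultRoomContext as their
-- base context:
--
--
-- stdout_room <- newRoom (MaxThreads 1)
-- network_room <- newRoom (MaxThreads 4)
-- claim Acquire [stdout_room, network_room] $ putStrLn "claimed both rooms"
--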
claim :: (RoomGroup c, RoomContext c, BaseRoomContext (Base c), (UserData c) ~ (UserData (Base c))) => ClaimMode -> c -> IO a -> IO a
-- | Approve some claims according to their constraints.
approveClaims :: (RoomConstraint u) => [Claim u] -> STM ()
instance (Base m ~ DefaultRoomContext (UserData m), BaseRoomContext c) => RoomContext (c, m)
instance (RoomConstraint u) => RoomContext [Room u]
instance (RoomConstraint u) => RoomContext (Room u)
instance (Base m ~ DefaultRoomContext (UserData m), BaseRoomContext c) => BaseRoomContext (c, m)
instance BaseRoomContext (UnconstrainedRoomContext u)
instance (RoomConstraint u) => BaseRoomContext (DefaultRoomContext u)
instance (UserData c ~ UserData m, RoomGroup c, RoomGroup m) => RoomGroup (c, m)
instance RoomGroup (UnconstrainedRoomContext u)
instance RoomGroup (DefaultRoomContext u)
instance RoomGroup [Room u]
instance RoomGroup (Room u)
module Control.Concurrent.Priority.Schedule
-- | Schedule a task to run from a prioritized Queue.
--
-- Tasks that do not actually make claims against any of the
-- Schedule's internal Rooms will skip scheduling, and the
-- Rooms will be claimed immediately using
-- DefaultRoomContext. This is usually what you want, in
-- particular in the case where no rooms are actually being claimed, e.g.
-- reentrant scheduling.
--
-- In other words:
--
-- Always wrong:
--
--
-- (Schedule q 2 Default,[room1,room2])
--
--
-- Right:
--
--
-- Schedule q 2 (Default,[room1,room2])
--
--
-- Alternatively, if you only want to schedule access to room1,
-- you can place room1 internally and room2 externally.
-- Schedule will be smart about when to schedule and when not to
-- schedule:
--
--
-- (Schedule q 2 (Default,room1), room2)
--
--
-- The Default applies internally and externally to the
-- Schedule. In the following example, Unconstrained
-- applies to both room1 and room2:
--
--
-- (Schedule q 2 (Unconstrained,room1), room2)
--
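-- A hedged end-to-end sketch; the priorities, the single room, and
-- draining the queue with flushQueue are illustrative assumptions:
--
--
-- q <- newQueue fair_queue_configuration
-- room1 <- newRoom (MaxThreads 1)
-- forkIO $ claim Acquire (Schedule q 1 (Default,room1)) $ putStrLn "urgent task"
-- forkIO $ claim Acquire (Schedule q 9 (Default,room1)) $ putStrLn "task that can wait"
-- threadDelay 100000 -- contrive to wait for both claims to become enqueued
-- flushQueue q -- some thread must drain the queue, or the claims never run
--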
data Schedule p c
Schedule :: (Queue p) -> p -> c -> Schedule p c
instance (BaseRoomContext (Schedule p c)) => RoomContext (Schedule p c)
instance (BaseRoomContextData c ~ (), Ord p, RoomGroup c, BaseRoomContext c) => BaseRoomContext (Schedule p c)
instance (RoomGroup c) => RoomGroup (Schedule p c)
-- | A prioritized TaskPool. This consists of a Queue, which
-- prioritizes tasks, and a Room which restricts the number of
-- tasks that may execute at one time.
module Control.Concurrent.Priority.TaskPool
data TaskPool p u
-- | A RoomContext for a task pool.
schedule :: TaskPool p u -> p -> (Schedule p (DefaultRoomContext (TaskPoolConstraint u), Room (TaskPoolConstraint u)))
-- | Create a new TaskPool. TaskPools begin stopped; use
-- startQueue to start them.
--
-- Consider using simpleTaskPool if you have no special needs.
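--
-- A hedged sketch, assuming the Int argument bounds the number of
-- concurrently running tasks (as numCapabilities does for
-- simpleTaskPool):
--
--
-- pool <- newTaskPool fast_queue_configuration 2 () -- 2 is an assumed thread limit
-- startQueue pool
--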
newTaskPool :: (Ord p) => QueueConfigurationRecord p -> Int -> u -> IO (TaskPool p u)
-- | Simply create a new TaskPool. The task pool is constrained by the
-- number of capabilities indicated by numCapabilities.
simpleTaskPool :: (Ord p) => IO (TaskPool p ())
poolRoom :: TaskPool p u -> Room (TaskPoolConstraint u)
poolQueue :: TaskPool p u -> Queue p
startQueue :: TaskPool p u -> IO ()
stopQueue :: TaskPool p u -> IO ()
-- | The number of threads participating in this TaskPool.
activity :: (Ord p) => TaskPool p u -> STM Int
instance RoomContext (TaskPool () u)
instance RoomGroup (TaskPool p u)