module Control.Concurrent.Priority.Queue
(Queue,
TaskHandle,
QueueOrder(..),
QueueConfigurationRecord(..),
fair_queue_configuration, fast_queue_configuration,
newQueue,
taskPriority,
taskQueue,
pendingTasks,
isTopOfQueue,
hasCompleted,
putTask,
pullTask,
pullFromTop,
pullSpecificTasks,
dispatchTasks,
flushQueue,
load)
where
import Data.Heap as Heap
import Data.List as List (sort,sortBy,groupBy,drop)
import GHC.Conc
import Control.Monad
import Data.Unique
import Data.Ord
import Data.Maybe
data (Ord a) => Queue a = Queue {
queue_configuration :: !(QueueConfigurationRecord a),
queue_unique :: Unique,
pending_tasks :: TVar (MinHeap (TaskHandle a)),
task_counter :: TVar Integer }
data QueueOrder = FIFO | FILO
data (Ord a) => QueueConfigurationRecord a = QueueConfigurationRecord {
queue_predicate :: STM (),
priority_indexed_predicate :: (a -> STM ()),
allowed_priority_inversion :: a -> a -> Bool,
allowed_ordering_inversion :: Int,
queue_order :: !QueueOrder }
fair_queue_configuration :: (Ord a) => QueueConfigurationRecord a
fair_queue_configuration = QueueConfigurationRecord {
queue_predicate = return (),
priority_indexed_predicate = const $ return (),
allowed_priority_inversion = const $ const $ True,
allowed_ordering_inversion = numCapabilities*5,
queue_order = FIFO }
fast_queue_configuration :: (Ord a) => QueueConfigurationRecord a
fast_queue_configuration = fair_queue_configuration {
allowed_priority_inversion = (==),
allowed_ordering_inversion = numCapabilities,
queue_order = FILO }
instance (Ord a) => Eq (Queue a) where
(==) l r = queue_unique l == queue_unique r
instance (Ord a) => Ord (Queue a) where
compare l r = compare (queue_unique l) (queue_unique r)
data TaskHandle a = TaskHandle {
task_action :: STM (),
is_top_of_queue :: TVar Bool,
has_completed :: TVar Bool,
task_counter_index :: !Integer,
task_priority :: !a,
task_queue :: Queue a }
instance (Ord a,Eq a) => Eq (TaskHandle a) where
(==) l r = (==) (taskOrd l) (taskOrd r)
instance (Ord a) => Ord (TaskHandle a) where
compare l r = compare (taskOrd l) (taskOrd r)
taskOrd :: TaskHandle a -> (a,Integer,Queue a)
taskOrd t = (task_priority t,task_counter_index t,task_queue t)
isTopOfQueue :: TaskHandle a -> STM Bool
isTopOfQueue task = readTVar (is_top_of_queue task)
hasCompleted :: TaskHandle a -> STM Bool
hasCompleted task = readTVar (has_completed task)
taskPriority :: TaskHandle a -> a
taskPriority = task_priority
taskQueue :: TaskHandle a -> Queue a
taskQueue = task_queue
pendingTasks :: (Ord a) => Queue a -> STM [TaskHandle a]
pendingTasks = liftM Heap.toList . readTVar . pending_tasks
newQueue :: (Ord a) => QueueConfigurationRecord a -> IO (Queue a)
newQueue config =
do pending_tasks_var <- newTVarIO empty
counter <- newTVarIO 0
uniq <- newUnique
return Queue {
queue_configuration = config,
queue_unique = uniq,
pending_tasks = pending_tasks_var,
task_counter = counter }
putTask :: (Ord a) => Queue a -> a -> STM () -> STM (TaskHandle a)
putTask q prio actionSTM =
do count <- readTVar (task_counter q)
writeTVar (task_counter q) $ (case (queue_order $ queue_configuration q) of FIFO -> (+ 1); FILO -> (subtract 1)) count
false_top_of_queue <- newTVar False
false_has_completed <- newTVar False
let task = TaskHandle {
task_action = actionSTM,
is_top_of_queue = false_top_of_queue,
has_completed = false_has_completed,
task_counter_index = count,
task_priority = prio,
task_queue = q }
watchingTopOfQueue q $ writeTVar (pending_tasks q) . insert task =<< readTVar (pending_tasks q)
return task
load :: (Ord a) => Queue a -> STM Int
load q = liftM size $ readTVar (pending_tasks q)
pullTask :: (Ord a) => Queue a -> STM (TaskHandle a)
pullTask q = watchingTopOfQueue q $
do queue_predicate $ queue_configuration q
(task,rest) <- pullTask_ (queue_configuration q) empty =<< readTVar (pending_tasks q)
writeTVar (pending_tasks q) rest
writeTVar (has_completed task) True
return task
pullTask_ :: (Ord a) => QueueConfigurationRecord a -> MinHeap (TaskHandle a) -> MinHeap (TaskHandle a) -> STM (TaskHandle a,MinHeap (TaskHandle a))
pullTask_ config faltered_tasks untried_tasks =
do when (Heap.size faltered_tasks > allowed_ordering_inversion config) retry
(task,rest) <- maybe retry return $ view untried_tasks
let top_prio = taskPriority $ maybe task fst $ view $ faltered_tasks
unless (allowed_priority_inversion config top_prio (taskPriority task)) retry
let predicateFailed = do let (same_prios,remaining_prios) = Heap.span ((== (task_priority task)) . task_priority) rest
pullTask_ config (insert task faltered_tasks `union` fromList same_prios) remaining_prios
let taskFailed = do pullTask_ config (insert task faltered_tasks) rest
prio_ok <- ((priority_indexed_predicate config $ task_priority task) >> return True) `orElse` (return False)
case prio_ok of
False -> predicateFailed
True -> (task_action task >> return (task,faltered_tasks `union` rest)) `orElse` taskFailed
pullFromTop :: (Ord a) => TaskHandle a -> STM (TaskHandle a)
pullFromTop task =
do b <- hasCompleted task
if b then return task else
do flip unless retry =<< isTopOfQueue task
pullTask (taskQueue task)
pullSpecificTask :: (Ord a) => TaskHandle a -> IO ()
pullSpecificTask task =
do actual_task <- atomically $ pullFromTop task
unless (actual_task == task) $ pullSpecificTask task
pullSpecificTasks :: (Ord a) => [TaskHandle a] -> IO ()
pullSpecificTasks tasks =
do queue_groups <- mapM (\g -> liftM ((,) g) newEmptyMVar) $ map sort $ groupBy (\x y -> taskQueue x == taskQueue y) $ sortBy (comparing taskQueue) tasks
let pullTaskGroup (g,m) = mapM pullSpecificTask g >> putMVar m ()
mapM (forkIO . pullTaskGroup) (List.drop 1 queue_groups)
maybe (return ()) pullTaskGroup $ listToMaybe queue_groups
mapM_ (takeMVar . snd) queue_groups
dispatchTasks :: (Ord a) => [(Queue a,a,STM ())] -> IO [TaskHandle a]
dispatchTasks task_records =
do tasks <- mapM (\(q,a,actionSTM) -> atomically $ putTask q a actionSTM) task_records
forkIO $ pullSpecificTasks tasks
return tasks
flushQueue :: (Ord a) => Queue a -> IO ()
flushQueue q =
do want_zero <- atomically $
do l <- load q
when (l > 0) $ pullTask q >> return ()
return l
unless (want_zero == 0) $ flushQueue q
setTopOfQueue :: (Ord a) => Queue a -> Bool -> STM Bool
setTopOfQueue q t =
do m_view <- liftM view $ readTVar (pending_tasks q)
case m_view of
Nothing -> return True
Just (task,_) ->
do previous_t <- readTVar (is_top_of_queue task)
writeTVar (is_top_of_queue task) t
return previous_t
watchingTopOfQueue :: (Ord a) => Queue a -> STM b -> STM b
watchingTopOfQueue q actionSTM =
do should_be_true <- setTopOfQueue q False
unless should_be_true $ error "watchingTopOfQueue: not reentrant"
result <- actionSTM
setTopOfQueue q True
return result