module PrioritySync.Internal.Queue
(Queue,
TaskHandle,
QueueOrder(..),
QueueConfigurationRecord(..),
fair_queue_configuration, fast_queue_configuration,
newQueue,
taskPriority,
taskQueue,
pendingTasks,
isTopOfQueue,
hasCompleted,
putTask,
pullTask,
pullFromTop,
pullSpecificTasks,
dispatchTasks,
flushQueue,
load,
PrioritySync.Internal.Queue.isEmpty)
where
import PrioritySync.Internal.Prioritized
import qualified Data.PSQueue as PSQ
import Data.PSQueue (Binding(..))
import Data.List as List (sort,sortBy,groupBy,drop,unfoldr)
import GHC.Conc
import Control.Monad
import Control.Concurrent.MVar
import Data.Unique
import Data.Ord
import Data.Maybe
import Control.Arrow (first)
-- | A prioritized queue of pending tasks, manipulated inside 'STM'.
data (Ord a) => Queue a = Queue {
    -- | The policy record consulted when tasks are put and pulled.
    queue_configuration :: !(QueueConfigurationRecord a),
    -- | Identity of this queue; the 'Eq' and 'Ord' instances compare only this.
    queue_unique :: Unique,
    -- | The tasks that have been put but not yet pulled.  Each task is bound
    -- to its user priority paired with the counter value it received in
    -- 'putTask'.
    pending_tasks :: TVar (PSQ.PSQ (TaskHandle a) (a,Integer)),
    -- | The counter value that the next 'putTask' will hand out.
    task_counter :: TVar Integer,
    -- | Mirrors whether 'pending_tasks' is empty; kept up to date by
    -- 'watchingTopOfQueue' and read by 'isEmpty'.
    queue_is_empty :: TVar Bool }
-- | Direction in which 'putTask' steps the per-queue task counter:
-- 'FIFO' adds one per task, 'FILO' subtracts one.  The counter is the second
-- component of a pending task's priority, so it decides the order among tasks
-- whose user priorities are equal.
data QueueOrder = FIFO | FILO
-- | Policy settings for a 'Queue'.
data (Ord a) => QueueConfigurationRecord a = QueueConfigurationRecord {
    -- | Run at the start of every 'pullTask'; if it retries, so does the pull.
    queue_predicate :: STM (),
    -- | Run with the priority of each candidate task.  If it retries, that
    -- candidate is passed over rather than pulled.
    priority_indexed_predicate :: (a -> STM ()),
    -- | Called with the priority at the top of the queue and the priority of
    -- a candidate task.  When it answers 'False' the pull retries instead of
    -- taking (or looking past) that candidate.
    allowed_priority_inversion :: a -> a -> Bool,
    -- | How many candidates a single pull may pass over; once the number of
    -- passed-over candidates exceeds this value the pull retries.
    allowed_ordering_inversion :: Int,
    -- | Whether 'putTask' counts upward ('FIFO') or downward ('FILO').
    queue_order :: !QueueOrder }
-- | A permissive configuration: both predicates always succeed, any priority
-- inversion is accepted, up to five candidates per capability may be passed
-- over, and the task counter runs in 'FIFO' direction.
fair_queue_configuration :: (Ord a) => QueueConfigurationRecord a
fair_queue_configuration = QueueConfigurationRecord {
    queue_predicate = return (),
    priority_indexed_predicate = \_ -> return (),
    allowed_priority_inversion = \_ _ -> True,
    allowed_ordering_inversion = 5 * numCapabilities,
    queue_order = FIFO }
-- | Like 'fair_queue_configuration', except that a candidate is only accepted
-- when its priority equals the priority at the top of the queue, at most one
-- candidate per capability may be passed over, and the task counter runs in
-- 'FILO' direction.
fast_queue_configuration :: (Ord a) => QueueConfigurationRecord a
fast_queue_configuration = fair_queue_configuration {
    allowed_priority_inversion = \top_prio candidate_prio -> top_prio == candidate_prio,
    allowed_ordering_inversion = numCapabilities,
    queue_order = FILO }
-- | Two queues are equal exactly when they carry the same 'Unique' tag.
instance (Ord a) => Eq (Queue a) where
    lhs == rhs = queue_unique lhs == queue_unique rhs
-- | Queues are ordered by their 'Unique' tags.
instance (Ord a) => Ord (Queue a) where
    compare = comparing queue_unique
-- | A handle on one task that was placed on a 'Queue' by 'putTask'.
data TaskHandle a = TaskHandle {
    -- | The action run when this task is chosen by a pull.  If it retries,
    -- the pull moves on to the next candidate.
    task_action :: STM (),
    -- | Flag written by 'setTopOfQueue' on whichever task is currently the
    -- minimum of the pending queue.
    is_top_of_queue :: TVar Bool,
    -- | Set to 'True' by 'pullTask' once this task has been pulled.
    has_completed :: TVar Bool,
    -- | The queue counter value assigned in 'putTask'; together with the
    -- owning queue it defines equality and ordering of handles.
    task_counter_index :: !Integer,
    -- | The queue this task was put on.
    task_queue :: Queue a }
-- | Handles are equal when they have the same counter index on the same queue.
instance (Ord a,Eq a) => Eq (TaskHandle a) where
    lhs == rhs = taskOrd lhs == taskOrd rhs
-- | Handles are ordered by counter index first, then by owning queue.
instance (Ord a) => Ord (TaskHandle a) where
    compare = comparing taskOrd
-- | The key used to compare task handles: the counter index paired with the
-- owning queue.
taskOrd :: TaskHandle a -> (Integer,Queue a)
taskOrd handle = (index, owner)
  where
    index = task_counter_index handle
    owner = task_queue handle
-- | Read the task's top-of-queue flag.
isTopOfQueue :: TaskHandle a -> STM Bool
isTopOfQueue = readTVar . is_top_of_queue
-- | Read the task's completion flag, which 'pullTask' sets once the task has
-- been pulled.
hasCompleted :: TaskHandle a -> STM Bool
hasCompleted = readTVar . has_completed
-- | The current user priority of a task, or 'Nothing' if the task is not
-- among the pending tasks of its queue.
taskPriority :: (Ord a) => TaskHandle a -> STM (Maybe a)
taskPriority task =
    do pending <- readTVar (pending_tasks (taskQueue task))
       return (fmap fst (PSQ.lookup task pending))
-- | A task's priority is the user priority it carries on its queue.
-- Reprioritizing applies the function to that user priority (the counter
-- component is left alone) while the top-of-queue flags are kept in step.
instance (Ord a) => Prioritized (TaskHandle a) where
    type Priority (TaskHandle a) = a
    reprioritize task f = watchingTopOfQueue owner $
        do pending <- readTVar pending_var
           writeTVar pending_var (PSQ.adjust (first f) task pending)
      where
        owner = taskQueue task
        pending_var = pending_tasks owner
-- | The queue a task was put on.
taskQueue :: TaskHandle a -> Queue a
taskQueue handle = task_queue handle
-- | All tasks currently pending on the queue.
pendingTasks :: (Ord a) => Queue a -> STM [TaskHandle a]
pendingTasks q = fmap PSQ.keys (readTVar (pending_tasks q))
-- | Create an empty queue governed by the given configuration.  The task
-- counter starts at zero and the queue is flagged as empty.
newQueue :: (Ord a) => QueueConfigurationRecord a -> IO (Queue a)
newQueue config =
    do pending_var <- newTVarIO PSQ.empty
       counter_var <- newTVarIO 0
       identity <- newUnique
       empty_var <- newTVarIO True
       return $ Queue {
           queue_configuration = config,
           queue_unique = identity,
           pending_tasks = pending_var,
           task_counter = counter_var,
           queue_is_empty = empty_var }
-- | Place a task on the queue with the given priority and return its handle.
--
-- The task receives the queue's current counter value, and the counter is then
-- stepped up ('FIFO') or down ('FILO').  The task is inserted with its user
-- priority paired with that counter value, under 'watchingTopOfQueue' so the
-- top-of-queue and emptiness flags stay current.
putTask :: (Ord a) => Queue a -> a -> STM () -> STM (TaskHandle a)
putTask q prio actionSTM =
    do index <- readTVar counter_var
       writeTVar counter_var (step index)
       top_flag <- newTVar False
       done_flag <- newTVar False
       let handle = TaskHandle {
               task_action = actionSTM,
               is_top_of_queue = top_flag,
               has_completed = done_flag,
               task_counter_index = index,
               task_queue = q }
       watchingTopOfQueue q $
           do pending <- readTVar pending_var
              writeTVar pending_var (PSQ.insert handle (prio, task_counter_index handle) pending)
       return handle
  where
    counter_var = task_counter q
    pending_var = pending_tasks q
    -- Direction of the counter depends on the configured queue order.
    step = case queue_order (queue_configuration q) of
        FIFO -> (+ 1)
        FILO -> subtract 1
-- | The number of tasks currently pending on the queue.
load :: (Ord a) => Queue a -> STM Int
load q = fmap PSQ.size (readTVar (pending_tasks q))
-- | Read the queue's emptiness flag, as maintained by 'watchingTopOfQueue'.
isEmpty :: (Ord a) => Queue a -> STM Bool
isEmpty q = readTVar (queue_is_empty q)
-- | Pull one eligible task from the queue, run its action, and return it.
--
-- The queue predicate is run first.  The pending tasks are then considered in
-- ascending priority order by 'pullTask_', which may pass over tasks as far as
-- the configuration allows, so the task returned is not necessarily the one
-- at the top of the queue.  The chosen task is removed from the pending tasks
-- and flagged as completed.  The transaction retries when the queue is empty
-- or no task is eligible.
pullTask :: (Ord a) => Queue a -> STM (TaskHandle a)
pullTask q = watchingTopOfQueue q $
    do queue_predicate config
       pending <- readTVar pending_var
       let task_asc_list = List.unfoldr PSQ.minView pending
       -- Match on the list instead of calling 'head': the empty queue is
       -- handled explicitly rather than relying on the top priority never
       -- being forced.
       task <- case task_asc_list of
           [] -> retry
           (top_binding : _) ->
               pullTask_ config (fst (PSQ.prio top_binding)) 0 task_asc_list
       -- Re-read the pending tasks: the task action has run in between.
       remaining <- readTVar pending_var
       writeTVar pending_var (PSQ.delete task remaining)
       writeTVar (has_completed task) True
       return task
  where
    config = queue_configuration q
    pending_var = pending_tasks q
-- | Walk the candidate bindings (in the order given) and run the action of
-- the first one that is eligible, returning its handle.
--
-- Arguments: the queue configuration, the priority at the top of the queue,
-- the number of candidates passed over so far, and the remaining candidates.
--
-- The transaction retries when the candidates are exhausted, when more
-- candidates have been passed over than 'allowed_ordering_inversion' permits,
-- or when 'allowed_priority_inversion' rejects the candidate's priority.
--
-- If 'priority_indexed_predicate' retries for a candidate's priority, every
-- following candidate with the same user priority is skipped together and the
-- whole group counts as a single pass-over.  The comparison is on the user
-- priority only: the full binding priority also contains the per-task
-- counter, which differs for every task, so comparing the full pair would
-- never skip anything.
--
-- If the candidate's own action retries, only that candidate is passed over.
pullTask_ :: (Ord a) => QueueConfigurationRecord a ->
             a ->
             Int ->
             [PSQ.Binding (TaskHandle a) (a,Integer)] ->
             STM (TaskHandle a)
pullTask_ _ _ _ [] = retry
pullTask_ config top_prio faltered_tasks (task:untried_tasks) =
    do when (faltered_tasks > allowed_ordering_inversion config) retry
       unless (allowed_priority_inversion config top_prio task_prio) retry
       prio_ok <- (priority_indexed_predicate config task_prio >> return True) `orElse` return False
       if prio_ok
           then (task_action candidate >> return candidate) `orElse` try_next untried_tasks
           else try_next (dropWhile ((== task_prio) . fst . PSQ.prio) untried_tasks)
  where
    candidate = PSQ.key task
    -- User priority of this candidate, without the counter component.
    task_prio = fst (PSQ.prio task)
    -- Continue with the given candidates, recording one more pass-over.
    try_next = pullTask_ config top_prio (succ faltered_tasks)
-- | If the given task has already completed, return it.  Otherwise wait
-- (by retrying) until it is flagged as top of its queue, then pull a task from
-- that queue.  The task returned by the pull may differ from the one given.
pullFromTop :: (Ord a) => TaskHandle a -> STM (TaskHandle a)
pullFromTop task =
    do done <- hasCompleted task
       if done
           then return task
           else do at_top <- isTopOfQueue task
                   unless at_top retry
                   pullTask (taskQueue task)
-- | Keep pulling via 'pullFromTop', one transaction at a time, until the
-- handle that comes back is the given task.
pullSpecificTask :: (Ord a) => TaskHandle a -> IO ()
pullSpecificTask task = loop
  where
    loop =
        do pulled <- atomically (pullFromTop task)
           when (pulled /= task) loop
-- | Pull every one of the given tasks, returning once all have been pulled.
--
-- The tasks are grouped by queue and each group is sorted.  The first group
-- is worked through on the calling thread; every other group gets its own
-- forked thread.  Each group signals an 'MVar' when it is finished, and the
-- call waits for all of those signals before returning.
pullSpecificTasks :: (Ord a) => [TaskHandle a] -> IO ()
pullSpecificTasks tasks =
    do queue_groups <- forM per_queue $ \grp ->
           do done <- newEmptyMVar
              return (grp, done)
       case queue_groups of
           [] -> return ()
           (own_group : other_groups) ->
               do mapM_ (forkIO . run_group) other_groups
                  run_group own_group
       mapM_ (takeMVar . snd) queue_groups
  where
    per_queue = map sort $ groupBy same_queue $ sortBy (comparing taskQueue) tasks
    same_queue x y = taskQueue x == taskQueue y
    -- Pull each task of the group in turn, then signal completion.
    run_group (grp, done) = mapM_ pullSpecificTask grp >> putMVar done ()
-- | Put each (queue, priority, action) record on its queue, one transaction
-- per record, then fork a thread that pulls all of the resulting tasks.
-- Returns the handles without waiting for that thread.
dispatchTasks :: (Ord a) => [(Queue a,a,STM ())] -> IO [TaskHandle a]
dispatchTasks task_records =
    do tasks <- forM task_records $ \(q, prio, actionSTM) ->
           atomically (putTask q prio actionSTM)
       _ <- forkIO (pullSpecificTasks tasks)
       return tasks
-- | Pull tasks from the queue, one per transaction, until a transaction
-- observes a load of zero.
flushQueue :: (Ord a) => Queue a -> IO ()
flushQueue q =
    do load_before <- atomically $
           do n <- load q
              unless (n <= 0) (pullTask q >> return ())
              return n
       when (load_before /= 0) (flushQueue q)
-- | Write the given value to the top-of-queue flag of the queue's minimum
-- task and return the value the flag held before.  Returns 'True' without
-- writing anything when the queue has no pending tasks.
setTopOfQueue :: (Ord a) => Queue a -> Bool -> STM Bool
setTopOfQueue q t =
    do pending <- readTVar (pending_tasks q)
       maybe (return True) swap_flag (PSQ.findMin pending)
  where
    swap_flag (task :-> _) =
        do let flag = is_top_of_queue task
           previous_t <- readTVar flag
           writeTVar flag t
           return previous_t
-- | Run an action that may change the queue's pending tasks, keeping the
-- bookkeeping flags consistent around it.
--
-- Before the action, the current minimum task's top-of-queue flag is cleared;
-- finding it already cleared is treated as a nested use on the same queue and
-- raises an error.  After the action, the (possibly new) minimum task is
-- flagged, and the queue's emptiness flag is rewritten only if it no longer
-- matches the pending tasks.
watchingTopOfQueue :: (Ord a) => Queue a -> STM b -> STM b
watchingTopOfQueue q actionSTM =
    do was_flagged <- setTopOfQueue q False
       when (not was_flagged) $ error "watchingTopOfQueue: not reentrant"
       result <- actionSTM
       _ <- setTopOfQueue q True
       now_empty <- fmap PSQ.null (readTVar (pending_tasks q))
       recorded_empty <- readTVar (queue_is_empty q)
       -- Only write on a change, so an unchanged flag is left untouched.
       when (now_empty /= recorded_empty) $ writeTVar (queue_is_empty q) now_empty
       return result