{-# LANGUAGE TypeFamilies #-}

module PrioritySync.Internal.Queue
    (Queue,
     TaskHandle,
     QueueOrder(..),
     QueueConfigurationRecord(..),
     fair_queue_configuration, fast_queue_configuration,
     newQueue,
     taskPriority,
     taskQueue,
     pendingTasks,
     isTopOfQueue,
     hasCompleted,
     putTask,
     pullTask,
     pullFromTop,
     pullSpecificTasks,
     dispatchTasks,
     flushQueue,
     load,
     PrioritySync.Internal.Queue.isEmpty)
    where

import PrioritySync.Internal.Prioritized
import qualified Data.PSQueue as PSQ
import Data.PSQueue (Binding(..)) 
import Data.List as List (sort,sortBy,groupBy,drop,unfoldr)
import GHC.Conc
import Control.Monad
import Control.Concurrent.MVar
import Data.Unique
import Data.Ord
import Data.Maybe
import Control.Arrow (first)

-- | A prioritized 'Queue'.  Prioritization is least-first, i.e. larger values are nicer.
--
-- A 'Queue' is not associated with any working thread, therefore, it is the client\'s responsibility to make sure that every pushed
-- task is also pulled, or the 'Queue' will stall.  There are several ways to accomplish this:
--
-- * Call 'pullTask' at least once for every call to 'putTask'.
--
-- * Use 'dispatchTasks' to push every task.
--
-- * Use 'flushQueue' whenever the 'Queue' is not empty.
data (Ord a) => Queue a = Queue {
    queue_configuration :: !(QueueConfigurationRecord a),
    queue_unique :: Unique,
    pending_tasks :: TVar (PSQ.PSQ (TaskHandle a) (a,Integer)),
    task_counter :: TVar Integer,
    queue_is_empty :: TVar Bool }

data QueueOrder = FIFO | FILO

-- | Configuration options for a 'Queue'.  A 'Queue' blocks on a number of predicates when dispatching a job.  Generally, 'fair_queue_configuration'
-- should work well.
--
-- * A single STM predicate for the entire 'Queue'.  This blocks the entire 'Queue' until the predicate is satisfied.
--
-- * A STM predicate parameterized by priority.  This blocks a single priority level, and the 'Queue' will skip all tasks at that priority.
--
-- * Each task is itself an STM transaction, and can block itself.
--
-- * Pure constraints on priority and ordering inversion.
--
-- If a task is blocked for any reason, the task is skipped and the next task attempted, in priority order.
data (Ord a) => QueueConfigurationRecord a = QueueConfigurationRecord {
    -- | A predicate that must hold before any task may be pulled from a 'Queue'.
    queue_predicate :: STM (),
    -- | A predicate that must hold before any priority level may be pulled from a 'Queue'.
    priority_indexed_predicate :: (a -> STM ()),
    -- | Constrains the greatest allowed difference between the priority of the top-of-queue task and the priority of a task to be pulled.
    allowed_priority_inversion :: a -> a -> Bool,
    -- | The greatest allowed difference between the ideal prioritized FILO/FIFO ordering of tasks and the actual ordering of tasks.
    -- Setting this too high can introduce a lot of overhead in the presence of a lot of short-running tasks.
    -- Setting this to zero turns off the predicate failover feature, i.e. only the top of queue task will ever be pulled.
    allowed_ordering_inversion :: Int,
    -- | Should the 'Queue' run in FILO or FIFO order.  Ordering takes place after prioritization, and won't have much effect if priorities are very fine-grained.
    queue_order :: !QueueOrder }

-- | A queue tuned for high throughput and fairness when processing moderate to long running tasks.
fair_queue_configuration :: (Ord a) => QueueConfigurationRecord a
fair_queue_configuration = QueueConfigurationRecord {
    queue_predicate = return (),
    priority_indexed_predicate = const $ return (),
    allowed_priority_inversion = const $ const $ True,
    allowed_ordering_inversion = numCapabilities*5,
    queue_order = FIFO }

-- | A queue tuned for high responsiveness and low priority inversion, but may have poorer long-term throughput and potential to starve some tasks compared to 'fair_queue_configuration'.
fast_queue_configuration :: (Ord a) => QueueConfigurationRecord a
fast_queue_configuration = fair_queue_configuration {
    allowed_priority_inversion = (==),
    allowed_ordering_inversion = numCapabilities,
    queue_order = FILO }

instance (Ord a) => Eq (Queue a) where
    (==) l r = queue_unique l == queue_unique r

instance (Ord a) => Ord (Queue a) where
    compare l r = compare (queue_unique l) (queue_unique r)

data TaskHandle a = TaskHandle {
    task_action :: STM (),
    is_top_of_queue :: TVar Bool,
    has_completed :: TVar Bool,
    task_counter_index :: !Integer,
    task_queue :: Queue a }

instance (Ord a,Eq a) => Eq (TaskHandle a) where
    (==) l r = (==) (taskOrd l) (taskOrd r)

instance (Ord a) => Ord (TaskHandle a) where
    compare l r = compare (taskOrd l) (taskOrd r)

taskOrd :: TaskHandle a -> (Integer,Queue a)
taskOrd t = (task_counter_index t,task_queue t)

-- | True iff this task is poised at the top of it's 'Queue'.
isTopOfQueue :: TaskHandle a -> STM Bool
isTopOfQueue task = readTVar (is_top_of_queue task)

hasCompleted :: TaskHandle a -> STM Bool
hasCompleted task = readTVar (has_completed task)

-- | Get the priority of this task, which only exists if the task is still enqueued.
taskPriority :: (Ord a) => TaskHandle a -> STM (Maybe a)
taskPriority task = liftM (fmap fst . PSQ.lookup task) $ readTVar $ pending_tasks $ taskQueue task

instance (Ord a) => Prioritized (TaskHandle a) where
    type Priority (TaskHandle a) = a
    reprioritize task f = watchingTopOfQueue (taskQueue task) $
        do let pending_tvar = pending_tasks $ taskQueue task
           writeTVar pending_tvar =<< liftM (PSQ.adjust (first f) task) (readTVar pending_tvar)

-- | Get the 'Queue' associated with this task.
taskQueue :: TaskHandle a -> Queue a
taskQueue = task_queue

pendingTasks :: (Ord a) => Queue a -> STM [TaskHandle a]
pendingTasks = liftM PSQ.keys . readTVar . pending_tasks

-- | Create a new 'Queue'.  
newQueue :: (Ord a) => QueueConfigurationRecord a -> IO (Queue a)
newQueue config = 
    do pending_tasks_var <- newTVarIO PSQ.empty
       counter <- newTVarIO 0
       uniq <- newUnique
       is_empty <- newTVarIO True
       return Queue {
           queue_configuration = config,
           queue_unique = uniq,
           pending_tasks = pending_tasks_var,
           task_counter = counter,
           queue_is_empty = is_empty }

-- | Put a task with it's priority value onto this queue.  Returns a handle to the task.
putTask :: (Ord a) => Queue a -> a -> STM () -> STM (TaskHandle a)
putTask q prio actionSTM = 
    do count <- readTVar (task_counter q)
       writeTVar (task_counter q) $ (case (queue_order $ queue_configuration q) of FIFO -> (+ 1); FILO -> (subtract 1)) count
       false_top_of_queue <- newTVar False
       false_has_completed <- newTVar False
       let task = TaskHandle {
               task_action = actionSTM,
               is_top_of_queue = false_top_of_queue,
               has_completed = false_has_completed,
               task_counter_index = count,
               task_queue = q }
       watchingTopOfQueue q $ writeTVar (pending_tasks q) . PSQ.insert task (prio,task_counter_index task) =<< readTVar (pending_tasks q)
       return task

-- | The number of tasks pending on this Queue.
load :: (Ord a) => Queue a -> STM Int 
load = liftM PSQ.size . readTVar . pending_tasks

-- | True iff this Queue is empty.
isEmpty :: (Ord a) => Queue a -> STM Bool
isEmpty = readTVar . queue_is_empty

-- | Pull and commit a task from this 'Queue'.
pullTask :: (Ord a) => Queue a -> STM (TaskHandle a)
pullTask q = watchingTopOfQueue q $ 
    do queue_predicate $ queue_configuration q
       task_asc_list <- liftM (List.unfoldr PSQ.minView) (readTVar $ pending_tasks q)
       task <- pullTask_ (queue_configuration q) (fst $ PSQ.prio $ head task_asc_list) 0 task_asc_list
       writeTVar (pending_tasks q) =<< liftM (PSQ.delete task) (readTVar $ pending_tasks q)
       writeTVar (has_completed task) True
       return task

pullTask_ :: (Ord a) => QueueConfigurationRecord a ->
                        a ->
                        Int ->
                        [PSQ.Binding (TaskHandle a) (a,Integer)] ->
                        STM (TaskHandle a)
pullTask_ _ _ _ [] = retry
pullTask_ config top_prio faltered_tasks (task:untried_tasks) =
    do when (faltered_tasks > allowed_ordering_inversion config) retry
       unless (allowed_priority_inversion config top_prio $ fst $ PSQ.prio task) retry
       prio_ok <- ((priority_indexed_predicate config $ fst $ PSQ.prio task) >> return True) `orElse` (return False)
       case prio_ok of
           False -> pullTask_ config top_prio (succ faltered_tasks) $ dropWhile ((== PSQ.prio task) . PSQ.prio) untried_tasks
           True -> (task_action (PSQ.key task) >> return (PSQ.key task)) `orElse` pullTask_ config top_prio (succ faltered_tasks) untried_tasks

-- | Pull this task from the top of a 'Queue', if it is already there.
-- If this task is top-of-queue, but it's predicates fail, then 'pullFromTop' may instead pull a lower-priority 'TaskHandle'.
pullFromTop :: (Ord a) => TaskHandle a -> STM (TaskHandle a)
pullFromTop task = 
    do b <- hasCompleted task
       if b then return task else
           do flip unless retry =<< isTopOfQueue task
              pullTask (taskQueue task)

-- | Don't return until the given 'TaskHandle' has been pulled from its associated 'Queue'.
-- This doesn't guarantee that the 'TaskHandle' will ever be pulled, even when the 'TaskHandle' and 'Queue' are both viable.
-- You must concurrently arrange for every other 'TaskHandle' associated with the same 'Queue' to be pulled, or the 'Queue' will stall.
pullSpecificTask :: (Ord a) => TaskHandle a -> IO ()
pullSpecificTask task =
    do actual_task <- atomically $ pullFromTop task
       unless (actual_task == task) $ pullSpecificTask task

-- | Don't return until the given 'TaskHandle's have been pulled from their associated 'Queue's.
-- This doesn't guarantee that the 'TaskHandle' will ever be pulled, even when the 'TaskHandle' and 'Queue' are both viable.
-- You must concurrently arrange for every other 'TaskHandle' associated with the same 'Queue' to be pulled, or the 'Queue' will stall.
-- 'pullSpecificTasks' can handle lists 'TaskHandle's that are distributed among several 'Queue's, as well as a 'TaskHandle's that have
-- already completed or complete concurrently from another thread.
pullSpecificTasks :: (Ord a) => [TaskHandle a] -> IO ()
pullSpecificTasks tasks =
    do queue_groups <- mapM (\g -> liftM ((,) g) newEmptyMVar) $ map sort $ groupBy (\x y -> taskQueue x == taskQueue y) $ sortBy (comparing taskQueue) tasks
       let pullTaskGroup (g,m) = mapM pullSpecificTask g >> putMVar m ()
       mapM_ (forkIO . pullTaskGroup) (List.drop 1 queue_groups)
       maybe (return ()) pullTaskGroup $ listToMaybe queue_groups
       mapM_ (takeMVar . snd) queue_groups

-- | \"Fire and forget\" some tasks on a separate thread.
dispatchTasks :: (Ord a) => [(Queue a,a,STM ())] -> IO [TaskHandle a]
dispatchTasks task_records = 
    do tasks <- mapM (\(q,a,actionSTM) -> atomically $ putTask q a actionSTM) task_records
       _ <- forkIO $ pullSpecificTasks tasks
       return tasks

-- | Process a 'Queue' until it is empty.
flushQueue :: (Ord a) => Queue a -> IO ()
flushQueue q =
    do want_zero <- atomically $ 
           do l <- load q
              when (l > 0) $ pullTask q >> return ()
              return l
       unless (want_zero == 0) $ flushQueue q

setTopOfQueue :: (Ord a) => Queue a -> Bool -> STM Bool
setTopOfQueue q t =
    do m_min <- liftM PSQ.findMin $ readTVar (pending_tasks q)
       case m_min of
           Nothing -> return True
           Just (task :-> _) -> 
               do previous_t <- readTVar (is_top_of_queue task)
                  writeTVar (is_top_of_queue task) t
                  return previous_t

watchingTopOfQueue :: (Ord a) => Queue a -> STM b -> STM b
watchingTopOfQueue q actionSTM =
    do should_be_true <- setTopOfQueue q False
       unless should_be_true $ error "watchingTopOfQueue: not reentrant"
       result <- actionSTM
       _ <- setTopOfQueue q True
       pending <- readTVar (pending_tasks q)
       is_empty <- readTVar (queue_is_empty q)
       when (PSQ.null pending && not is_empty) $ writeTVar (queue_is_empty q) True
       when ((not $ PSQ.null pending) && is_empty) $ writeTVar (queue_is_empty q) False
       return result