{-# LANGUAGE TypeFamilies, FlexibleInstances #-}

-- | A prioritized 'TaskPool'.  A 'TaskPool' consists of a 'Queue', which prioritizes tasks, and a 'Room', which restricts the number of tasks that may execute at one time.
module PrioritySync.Internal.TaskPool
    (TaskPool,
     PrioritySync.Internal.TaskPool.schedule,
     newTaskPool,
     simpleTaskPool,
     poolRoom,
     poolQueue,
     startQueue,
     stopQueue,
     PrioritySync.Internal.TaskPool.isEmpty,
     waitUntilFinished)
    where

import PrioritySync.Internal.Room as Room
import PrioritySync.Internal.Queue as Queue
import PrioritySync.Internal.Schedule
import PrioritySync.Internal.UserData
import PrioritySync.Internal.RoomGroup
import PrioritySync.Internal.RoomConstraint
import PrioritySync.Internal.Constrained
import Control.Monad
import Data.Set as Set
import GHC.Conc

data TaskPool p u = TaskPool {
    pool_on :: TVar Bool,
    pool_queue :: Queue p,
    pool_room :: Room (TaskPoolConstraint u) }

type TaskPoolConstraint u = (Maybe MaxThreads, u)

type instance UserData (TaskPool p u) = TaskPoolConstraint u

instance RoomGroup (TaskPool p u) where
    roomsOf (TaskPool _ _ m) = [m]

-- | A prioritized 'ClaimContext' for a task pool.
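--
-- A minimal usage sketch (the binding @pool@ and the priority value @3@ are
-- illustrative assumptions, not part of this module); the resulting
-- 'Schedule' is then handed to whatever 'ClaimContext' consumer the rest of
-- the library provides:
--
-- > let ctx = schedule pool 3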
schedule :: TaskPool p u -> p -> Schedule p (Constrained (TaskPoolConstraint u), Room (TaskPoolConstraint u))
schedule (TaskPool _ q m) p = Schedule q p (Constrained, m)

-- | Create a new 'TaskPool'.  'TaskPool's begin stopped; use 'startQueue' to start them.
--
-- * A 'QueueConfigurationRecord' for the backing 'Queue'.  A typical value is 'fair_queue_configuration'.
--
-- * The allowed number of threads that can access the 'TaskPool' simultaneously.
--
-- * The user data for the backing 'Room'.  This can be @()@.
--
-- Consider using 'simpleTaskPool' if you have no special needs.
--
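-- A minimal sketch, assuming an 'Int' priority type, a limit of 4 threads,
-- and @()@ user data (all illustrative choices):
--
-- > do pool <- newTaskPool fair_queue_configuration 4 () :: IO (TaskPool Int ())
-- >    atomically $ startQueue pool
--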
newTaskPool :: (Ord p) => QueueConfigurationRecord p -> Int -> u -> IO (TaskPool p u)
newTaskPool config n u =
    do on <- newTVarIO False
       m <- newRoom (Just $ MaxThreads n, u)
       -- Wrap the caller's queue predicate: a task may only be pulled from
       -- the 'Queue' once the pool has been started and fewer than n threads
       -- currently occupy the backing 'Room'.
       q <- newQueue $ config { queue_predicate = (flip when retry . not =<< readTVar on) >>
                                                  (flip when retry . (>= n) . Set.size =<< inUse m) >>
                                                  queue_predicate config }
       return $ TaskPool on q m

-- | Create a new 'TaskPool' with default settings.  The pool uses 'fair_queue_configuration' and is constrained to the number of capabilities indicated by 'GHC.Conc.numCapabilities'.
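--
-- For example (the 'Int' priority type here is only an illustrative choice):
--
-- > pool <- simpleTaskPool :: IO (TaskPool Int ())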
simpleTaskPool :: (Ord p) => IO (TaskPool p ())
simpleTaskPool = newTaskPool fair_queue_configuration numCapabilities ()

-- | Get the 'Room' that primarily constrains this 'TaskPool'.
poolRoom :: TaskPool p u -> Room (TaskPoolConstraint u)
poolRoom = pool_room

-- | Get the 'Queue' that admits new tasks to this 'TaskPool'.
poolQueue :: TaskPool p u -> Queue p
poolQueue = pool_queue

-- | Start the 'TaskPool'.  Queued tasks are not dispatched until the pool has been started.
startQueue :: TaskPool p u -> STM ()
startQueue tp = writeTVar (pool_on tp) True

-- | Stop this 'TaskPool'.  No further tasks will be dispatched, but tasks that are already running are not interrupted.
stopQueue :: TaskPool p u -> STM ()
stopQueue tp = writeTVar (pool_on tp) False

-- | True iff this 'TaskPool' is entirely empty and inactive.
isEmpty :: (Ord p) => TaskPool p u -> STM Bool
isEmpty tp = liftM2 (&&) (Queue.isEmpty $ poolQueue tp) (Room.isEmpty $ poolRoom tp)

-- | Block until this 'TaskPool' is finished, i.e. until it has no pending or running tasks.
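--
-- A typical shutdown sketch (assuming @pool@ was created and started as in
-- the 'newTaskPool' example): drain all pending and running tasks, then
-- switch the pool off so nothing further is admitted:
--
-- > do waitUntilFinished pool
-- >    atomically $ stopQueue pool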
waitUntilFinished :: (Ord p) => TaskPool p u -> IO ()
waitUntilFinished tp = atomically $ flip unless retry =<< PrioritySync.Internal.TaskPool.isEmpty tp