{-# LANGUAGE TypeFamilies, FlexibleInstances #-}

-- | A prioritized TaskPool.  This consists of a 'Queue', which prioritizes tasks, and a 'Room' which restricts the number of tasks that may execute at one time.
module Control.Concurrent.Priority.TaskPool
    (TaskPool,
     Control.Concurrent.Priority.TaskPool.schedule,
     newTaskPool,
     simpleTaskPool,
     poolRoom,
     poolQueue,
     startQueue,
     stopQueue,
     activity)
    where

import Control.Concurrent.Priority.Room
import Control.Concurrent.Priority.Queue
import Control.Concurrent.Priority.Schedule
import Control.Monad
import Data.Set as Set
import GHC.Conc

-- | A task pool: an on/off switch, a prioritizing 'Queue' and a 'Room'.
--
-- The constructor is positional elsewhere in this module (switch, queue,
-- room), so the field order matters.
data TaskPool p u = TaskPool {
    -- | On/off switch.  Written by 'startQueue' ('True') and 'stopQueue'
    -- ('False'); the queue predicate installed by 'newTaskPool' retries
    -- while it is 'False'.
    pool_on :: TVar Bool,
    -- | The backing queue, keyed by priorities of type @p@.
    pool_queue :: Queue p,
    -- | The backing room, whose user data is a 'TaskPoolConstraint'.
    pool_room :: Room (TaskPoolConstraint u) }

-- | The user data carried by a task pool's 'Room': an optional thread limit
-- paired with a caller-supplied value of type @u@.  'newTaskPool' always
-- fills the first component with 'Just'.
type TaskPoolConstraint u = (Maybe MaxThreads, u)

-- The 'UserData' associated with a 'TaskPool' is its 'TaskPoolConstraint',
-- i.e. the same type that parameterizes the pool's backing 'Room'.
-- NOTE(review): 'UserData' is declared in another module; presumably it is
-- the family used by 'RoomGroup' -- confirm against its definition.
type instance UserData (TaskPool p u) = TaskPoolConstraint u

-- | A task pool's room group is exactly its single backing 'Room'.
instance RoomGroup (TaskPool p u) where
    roomsOf (TaskPool { pool_room = room }) = [room]

-- | A task pool whose priority type is @()@ can be used directly as a
-- 'RoomContext': its base context is the pool's 'schedule' at priority @()@.
instance RoomContext (TaskPool () u) where
    type Base (TaskPool () u) =
        Schedule () ( DefaultRoomContext (TaskPoolConstraint u)
                    , Room (TaskPoolConstraint u) )
    baseContext pool = pool `schedule` ()

-- | A 'RoomContext' for a task pool at a given priority.
--
-- Builds a 'Schedule' from the pool's backing queue, the supplied priority,
-- and the pair of 'Default' with the pool's backing room.
schedule
    :: TaskPool p u
    -> p
    -> Schedule p (DefaultRoomContext (TaskPoolConstraint u), Room (TaskPoolConstraint u))
schedule (TaskPool { pool_queue = queue, pool_room = room }) priority =
    Schedule queue priority (Default, room)

-- | Create a new 'TaskPool'.  'TaskPool's begin stopped, use 'startQueue' to start.
--
-- The supplied queue configuration is used for the backing 'Queue', except
-- that its 'queue_predicate' is extended.  The installed predicate, in order:
--
-- 1. retries while the pool is stopped;
--
-- 2. retries while the set reported by 'inUse' for the backing 'Room' has
--    at least as many members as the thread limit;
--
-- 3. runs the predicate from the supplied configuration.
--
-- Consider using 'simpleTaskPool' if you have no special needs.
--
newTaskPool
    :: (Ord p)
    => QueueConfigurationRecord p  -- ^ Configuration for the backing 'Queue', e.g. 'fair_queue_configuration'.
    -> Int                         -- ^ Thread limit; wrapped in 'MaxThreads' for the backing 'Room'.  A typical value is 'GHC.Conc.numCapabilities'.
    -> u                           -- ^ Extra user data stored alongside the thread limit in the backing 'Room'.
    -> IO (TaskPool p u)
newTaskPool config n u = do
    switch <- newTVarIO False
    room <- newRoom (Just (MaxThreads n), u)
    -- The pool's own checks run before the caller's predicate.
    let gate = do
            enabled <- readTVar switch
            unless enabled retry
            occupants <- inUse room
            when (Set.size occupants >= n) retry
            queue_predicate config
    queue <- newQueue (config { queue_predicate = gate })
    return (TaskPool switch queue room)

-- | Just create a new 'TaskPool' with default settings: the
-- 'fair_queue_configuration', a thread limit of 'GHC.Conc.numCapabilities',
-- and @()@ as user data.  Like any pool made by 'newTaskPool', it begins
-- stopped; use 'startQueue' to start it.
simpleTaskPool :: (Ord p) => IO (TaskPool p ())
simpleTaskPool = newTaskPool queueConfig threadLimit ()
  where
    queueConfig = fair_queue_configuration
    threadLimit = numCapabilities

-- | The 'Room' backing this task pool.
poolRoom :: TaskPool p u -> Room (TaskPoolConstraint u)
poolRoom (TaskPool { pool_room = room }) = room

-- | The 'Queue' backing this task pool.
poolQueue :: TaskPool p u -> Queue p
poolQueue (TaskPool { pool_queue = queue }) = queue

-- | Switch the pool on by atomically setting its on/off flag to 'True'.
-- The queue predicate installed by 'newTaskPool' retries while the flag is
-- 'False'.
startQueue :: TaskPool p u -> IO ()
startQueue tp = atomically (writeTVar switch True)
  where
    switch = pool_on tp

-- | Switch the pool off by atomically setting its on/off flag to 'False'.
-- The queue predicate installed by 'newTaskPool' retries while the flag is
-- 'False'.
stopQueue :: TaskPool p u -> IO ()
stopQueue tp = atomically (writeTVar switch False)
  where
    switch = pool_on tp

-- | The number of threads participating in this 'TaskPool'.
--
-- Computed as the 'load' of the backing queue plus the size of the set
-- reported by 'inUse' for the backing room (presumably tasks still waiting
-- plus tasks currently running).
activity :: (Ord p) => TaskPool p u -> STM Int
activity tp = do
    queued <- load (poolQueue tp)
    occupants <- inUse (poolRoom tp)
    return (queued + Set.size occupants)