-- | A task pool bundles an on/off switch, a priority 'Queue' and a 'Room'
-- whose constraint carries an optional 'MaxThreads' limit plus user data.
--
-- A pool is created switched off (see 'newTaskPool'); use 'startQueue' and
-- 'stopQueue' to flip the switch that the pool's queue predicate consults.
module Control.Concurrent.Priority.TaskPool
(TaskPool,
Control.Concurrent.Priority.TaskPool.schedule,
newTaskPool,
simpleTaskPool,
poolRoom,
poolQueue,
startQueue,
stopQueue,
activity)
where
import Control.Concurrent.Priority.Room
import Control.Concurrent.Priority.Queue
import Control.Concurrent.Priority.Schedule
import Control.Monad
import Data.Set as Set
import GHC.Conc
-- | A pool of tasks prioritised by @p@ and carrying user data of type @u@
-- in its room constraint.
data TaskPool p u = TaskPool
    { pool_on    :: TVar Bool
      -- ^ On/off switch; written by 'startQueue' and 'stopQueue'.
    , pool_queue :: Queue p
      -- ^ The queue tasks are scheduled on.
    , pool_room  :: Room (TaskPoolConstraint u)
      -- ^ The room associated with this pool.
    }
-- | The value stored in a pool's 'Room': an optional 'MaxThreads' limit
-- paired with the caller-supplied user data @u@.
type TaskPoolConstraint u = (Maybe MaxThreads, u)
-- | The 'UserData' family maps a 'TaskPool' to its 'TaskPoolConstraint'.
type instance UserData (TaskPool p u) = TaskPoolConstraint u
-- | A task pool is a room group consisting of exactly one room: its own.
instance RoomGroup (TaskPool p u) where
    roomsOf TaskPool { pool_room = r } = [r]
-- | A pool with unit priority can be used directly as a room context; its
-- base context is the pool's 'schedule' at priority @()@.
instance RoomContext (TaskPool () u) where
    type Base (TaskPool () u) =
        Schedule () (DefaultRoomContext (TaskPoolConstraint u), Room (TaskPoolConstraint u))
    baseContext pool = pool `schedule` ()
-- | Build a 'Schedule' at the given priority that uses the pool's queue and
-- pairs the 'Default' room context with the pool's room.
schedule :: TaskPool p u -> p -> Schedule p (DefaultRoomContext (TaskPoolConstraint u), Room (TaskPoolConstraint u))
schedule TaskPool { pool_queue = q, pool_room = r } prio =
    Schedule q prio (Default, r)
-- | Create a task pool, initially switched off.
--
-- The pool's room is created with the constraint @(Just (MaxThreads n), u)@.
-- The queue is created from @config@ with its 'queue_predicate' extended:
-- the new predicate retries while the pool is switched off, retries while
-- the room's in-use set has @n@ or more members, and only then runs the
-- predicate originally present in @config@.
newTaskPool :: (Ord p) => QueueConfigurationRecord p -> Int -> u -> IO (TaskPool p u)
newTaskPool config n u = do
    switch <- newTVarIO False
    poolRm <- newRoom (Just (MaxThreads n), u)
    let gate = do
            -- Block until the pool has been started.
            isOn <- readTVar switch
            unless isOn retry
            -- Block while the room is at or above its limit.
            occupied <- inUse poolRm
            when (Set.size occupied >= n) retry
            -- Finally defer to the caller's own predicate.
            queue_predicate config
    poolQ <- newQueue $ config { queue_predicate = gate }
    return (TaskPool switch poolQ poolRm)
-- | Create a task pool using 'fair_queue_configuration', unit user data and
-- a limit equal to the current number of capabilities.
--
-- The capability count is read with 'getNumCapabilities' when the pool is
-- created rather than taken from the pure 'numCapabilities' constant, which
-- only reflects the value at program start and ignores later calls to
-- 'setNumCapabilities'.
simpleTaskPool :: (Ord p) => IO (TaskPool p ())
simpleTaskPool = do
    caps <- getNumCapabilities
    newTaskPool fair_queue_configuration caps ()
-- | The 'Room' belonging to a task pool.
poolRoom :: TaskPool p u -> Room (TaskPoolConstraint u)
poolRoom tp = pool_room tp
-- | The 'Queue' belonging to a task pool.
poolQueue :: TaskPool p u -> Queue p
poolQueue tp = pool_queue tp
-- | Switch the pool on by atomically writing 'True' to its on/off flag.
-- The queue predicate installed by 'newTaskPool' retries while the flag is
-- 'False'.
startQueue :: TaskPool p u -> IO ()
startQueue tp = atomically (writeTVar switch True)
  where
    switch = pool_on tp
-- | Switch the pool off by atomically writing 'False' to its on/off flag.
-- The queue predicate installed by 'newTaskPool' retries while the flag is
-- 'False'.
stopQueue :: TaskPool p u -> IO ()
stopQueue tp = atomically (writeTVar switch False)
  where
    switch = pool_on tp
-- | The value reported by 'load' for the pool's queue plus the number of
-- members in the in-use set of the pool's room, read in one transaction.
activity :: (Ord p) => TaskPool p u -> STM Int
activity tp = do
    queued <- load (poolQueue tp)
    occupied <- inUse (poolRoom tp)
    return (queued + Set.size occupied)