{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE TypeFamilies #-}

-- | A prioritized TaskPool.  This consists of a 'Queue', which prioritizes
-- tasks, and a 'Room' which restricts the number of tasks that may execute
-- at one time.
module Control.Concurrent.Priority.TaskPool
    (TaskPool,
     Control.Concurrent.Priority.TaskPool.schedule,
     newTaskPool,
     simpleTaskPool,
     poolRoom,
     poolQueue,
     startQueue,
     stopQueue,
     activity)
    where

import Control.Concurrent.Priority.Room
import Control.Concurrent.Priority.Queue
import Control.Concurrent.Priority.Schedule
import Control.Monad
import Data.Set as Set
import GHC.Conc

-- | A task pool: a prioritizing 'Queue' paired with a 'Room', plus an
-- on/off flag that is consulted by the queue predicate installed in
-- 'newTaskPool'.
data TaskPool p u = TaskPool {
    pool_on :: TVar Bool,
    -- ^ 'True' while the pool is started; written by 'startQueue' and 'stopQueue'.
    pool_queue :: Queue p,
    -- ^ The backing 'Queue'.
    pool_room :: Room (TaskPoolConstraint u)
    -- ^ The backing 'Room'.
    }

-- | The user data carried by the 'Room' of a 'TaskPool': an optional thread
-- limit paired with the caller-supplied user data.
type TaskPoolConstraint u = (Maybe MaxThreads, u)

type instance UserData (TaskPool p u) = TaskPoolConstraint u

-- | A 'TaskPool' has exactly one room: its backing 'Room'.
instance RoomGroup (TaskPool p u) where
    roomsOf (TaskPool _ _ m) = [m]

-- | A 'TaskPool' whose priority type is @()@ can be used directly as a
-- 'RoomContext'; its base context is 'schedule' applied to the unit priority.
instance RoomContext (TaskPool () u) where
    type Base (TaskPool () u) =
        Schedule () (DefaultRoomContext (TaskPoolConstraint u), Room (TaskPoolConstraint u))
    baseContext tp = schedule tp ()

-- | A 'RoomContext' for a task pool: a 'Schedule' over the pool's 'Queue' at
-- the given priority, wrapping the pool's 'Room' under the 'Default' context.
schedule :: TaskPool p u
         -> p
         -> Schedule p (DefaultRoomContext (TaskPoolConstraint u), Room (TaskPoolConstraint u))
schedule (TaskPool _ q m) p = Schedule q p (Default, m)

-- | Create a new 'TaskPool'.  'TaskPool's begin stopped, use 'startQueue' to start.
--
-- * A 'QueueConfigurationRecord' for the backing 'Queue'.  A typical value is
--   'simple_queue_configuration' or 'fast_queue_configuration';
--   'simpleTaskPool' uses 'fair_queue_configuration'.  Its 'queue_predicate'
--   is extended, not replaced (see below).
--
-- * The thread limit.  It is stored in the backing 'Room' as
--   @'Just' ('MaxThreads' n)@ and is also checked by the queue predicate.
--   A typical value is 'GHC.Conc.numCapabilities'.
--
-- * Additional user data for the backing 'Room'.
--
-- The predicate installed on the queue retries while the pool is stopped,
-- retries while the number of threads in the room is at or above the thread
-- limit, and only then runs the predicate from the supplied configuration.
-- Consequently a limit of zero or less makes the predicate retry
-- unconditionally.
--
-- Consider using 'simpleTaskPool' if you have no special needs.
--
newTaskPool :: (Ord p) => QueueConfigurationRecord p -> Int -> u -> IO (TaskPool p u)
newTaskPool config n u =
    do on <- newTVarIO False
       m <- newRoom (Just (MaxThreads n), u)
       let predicate =
               do running <- readTVar on
                  -- Block the queue for as long as the pool is switched off.
                  unless running retry
                  busy <- fmap Set.size (inUse m)
                  -- Block the queue while the room is at (or over) capacity.
                  when (busy >= n) retry
                  -- Finally defer to the caller's own predicate.
                  queue_predicate config
       q <- newQueue $ config { queue_predicate = predicate }
       return $ TaskPool on q m

-- | Just create a new 'TaskPool'.  The task pool is constrained by the number
-- of capabilities indicated by 'GHC.Conc.numCapabilities'.
simpleTaskPool :: (Ord p) => IO (TaskPool p ())
simpleTaskPool = newTaskPool fair_queue_configuration numCapabilities ()

-- | The backing 'Room' of a 'TaskPool'.
poolRoom :: TaskPool p u -> Room (TaskPoolConstraint u)
poolRoom = pool_room

-- | The backing 'Queue' of a 'TaskPool'.
poolQueue :: TaskPool p u -> Queue p
poolQueue = pool_queue

-- | Start the pool by setting its on/off flag to 'True', so that the queue
-- predicate installed by 'newTaskPool' no longer retries on that account.
startQueue :: TaskPool p u -> IO ()
startQueue tp = atomically $ writeTVar (pool_on tp) True

-- | Stop the pool by setting its on/off flag to 'False', so that the queue
-- predicate installed by 'newTaskPool' retries until the pool is started again.
stopQueue :: TaskPool p u -> IO ()
stopQueue tp = atomically $ writeTVar (pool_on tp) False

-- | The number of threads participating in this 'TaskPool': the 'load' of its
-- 'Queue' plus the number of threads currently in its 'Room'.
activity :: (Ord p) => TaskPool p u -> STM Int
activity tp = liftM2 (+) (load $ poolQueue tp) (liftM Set.size $ inUse $ poolRoom tp)