{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE TypeFamilies #-}

-- | A prioritized TaskPool.  This consists of a 'Queue', which prioritizes
-- tasks, and a 'Room' which restricts the number of tasks that may execute
-- at one time.
module PrioritySync.Internal.TaskPool
    ( TaskPool
    , PrioritySync.Internal.TaskPool.schedule
    , newTaskPool
    , simpleTaskPool
    , poolRoom
    , poolQueue
    , startQueue
    , stopQueue
    , PrioritySync.Internal.TaskPool.isEmpty
    , waitUntilFinished
    ) where

import PrioritySync.Internal.Room as Room
import PrioritySync.Internal.Queue as Queue
import PrioritySync.Internal.Schedule
import PrioritySync.Internal.UserData
import PrioritySync.Internal.RoomGroup
import PrioritySync.Internal.RoomConstraint
import PrioritySync.Internal.Constrained
import Control.Monad
import Data.Set as Set
import GHC.Conc

-- | A task pool with priorities of type @p@ and room user data of type @u@.
data TaskPool p u = TaskPool
    { pool_on :: TVar Bool
      -- ^ 'True' while the pool is started; written by 'startQueue' and
      -- 'stopQueue', read by the admission predicate built in 'newTaskPool'.
    , pool_queue :: Queue p
      -- ^ The backing 'Queue'.
    , pool_room :: Room (TaskPoolConstraint u)
      -- ^ The backing 'Room'.
    }

-- | The user data stored in a pool's 'Room': an optional thread limit paired
-- with the caller-supplied value.
type TaskPoolConstraint u = (Maybe MaxThreads, u)

type instance UserData (TaskPool p u) = TaskPoolConstraint u

-- | A 'TaskPool' is a group containing exactly one room: its own.
instance RoomGroup (TaskPool p u) where
    roomsOf (TaskPool _ _ m) = [m]

-- | A prioritized 'ClaimContext' for a task pool.
--
-- Builds a 'Schedule' from the pool's queue, the given priority, and the
-- pair of 'Constrained' and the pool's room.
schedule :: TaskPool p u
         -> p
         -> (Schedule p (Constrained (TaskPoolConstraint u), Room (TaskPoolConstraint u)))
schedule (TaskPool _ q m) p = Schedule q p (Constrained, m)

-- | Create a new 'TaskPool'.  'TaskPool's begin stopped, use 'startQueue' to start.
--
-- * A 'QueueConfigurationRecord' for the backing 'Queue'.  A typical value is 'fair_queue_configuration'.
--
-- * The allowed number of threads that can access the 'TaskPool' simultaneously.
--
-- * The user data for the backing 'Room'.  This can be @()@.
--
-- Consider using 'simpleTaskPool' if you have no special needs.
--
-- The supplied configuration's 'queue_predicate' is kept, but it is only
-- reached after the pool's own checks pass.  Note that with a thread count
-- of zero or less the occupancy check can never pass, so the predicate
-- always retries.
newTaskPool :: (Ord p) => QueueConfigurationRecord p -> Int -> u -> IO (TaskPool p u)
newTaskPool config n u =
    do enabled <- newTVarIO False
       m <- newRoom (Just (MaxThreads n), u)
       let admit =
               do -- Block while the pool is stopped.
                  running <- readTVar enabled
                  unless running retry
                  -- Block while the room already holds n or more occupants.
                  occupants <- inUse m
                  when (Set.size occupants >= n) retry
                  -- Finally defer to the caller's own predicate.
                  queue_predicate config
       q <- newQueue $ config { queue_predicate = admit }
       return $ TaskPool enabled q m

-- | Just create a new 'TaskPool'.  The task pool is constrained by the number of capabilities
-- indicated by 'GHC.Conc.numCapabilities'.
simpleTaskPool :: (Ord p) => IO (TaskPool p ())
simpleTaskPool = newTaskPool fair_queue_configuration numCapabilities ()

-- | Get the 'Room' that primarily constrains this 'TaskPool'.
poolRoom :: TaskPool p u -> Room (TaskPoolConstraint u)
poolRoom = pool_room

-- | Get the 'Queue' that admits new tasks to this 'TaskPool'.
poolQueue :: TaskPool p u -> Queue p
poolQueue = pool_queue

-- | Start the 'TaskPool' by setting the flag that its admission predicate checks.
startQueue :: TaskPool p u -> STM ()
startQueue tp = writeTVar (pool_on tp) True

-- | Stop the 'TaskPool' by clearing the flag that its admission predicate checks,
-- so that the predicate retries until 'startQueue' is called again.
-- Nothing in this module interrupts tasks that are already inside the 'Room'.
stopQueue :: TaskPool p u -> STM ()
stopQueue tp = writeTVar (pool_on tp) False

-- | True iff this 'TaskPool' is entirely empty and inactive, i.e. both its
-- 'Queue' and its 'Room' report empty within the same transaction.
isEmpty :: (Ord p) => TaskPool p u -> STM Bool
isEmpty tp = liftM2 (&&) (Queue.isEmpty $ poolQueue tp) (Room.isEmpty $ poolRoom tp)

-- | Wait until a queue is finished: block the calling thread until 'isEmpty' holds.
waitUntilFinished :: (Ord p) => TaskPool p u -> IO ()
waitUntilFinished tp = atomically $
    do finished <- PrioritySync.Internal.TaskPool.isEmpty tp
       unless finished retry