module PrioritySync.Internal.TaskPool
(TaskPool,
PrioritySync.Internal.TaskPool.schedule,
newTaskPool,
simpleTaskPool,
poolRoom,
poolQueue,
startQueue,
stopQueue,
PrioritySync.Internal.TaskPool.isEmpty,
waitUntilFinished)
where
import PrioritySync.Internal.Room as Room
import PrioritySync.Internal.Queue as Queue
import PrioritySync.Internal.Schedule
import PrioritySync.Internal.UserData
import PrioritySync.Internal.RoomGroup
import PrioritySync.Internal.RoomConstraint
import PrioritySync.Internal.Constrained
import Control.Monad
import Data.Set as Set
import GHC.Conc
-- | A task pool bundles an on/off switch, a queue of tasks keyed by @p@,
-- and a single 'Room' whose user data is a 'TaskPoolConstraint' over @u@.
data TaskPool p u = TaskPool
    { pool_on :: TVar Bool
      -- ^ On/off switch: 'newTaskPool' creates it as 'False', 'startQueue'
      -- writes 'True' and 'stopQueue' writes 'False'.
    , pool_queue :: Queue p
      -- ^ The queue created by 'newTaskPool'; exposed through 'poolQueue'.
    , pool_room :: Room (TaskPoolConstraint u)
      -- ^ The room created by 'newTaskPool'; exposed through 'poolRoom'.
    }
-- | The user data attached to a pool's 'Room': an optional 'MaxThreads'
-- value paired with the caller-supplied value of type @u@.
type TaskPoolConstraint u = (Maybe MaxThreads, u)
-- | The 'UserData' associated with a 'TaskPool' is its 'TaskPoolConstraint',
-- i.e. the same type carried by the pool's 'Room'.
type instance UserData (TaskPool p u) = TaskPoolConstraint u
-- | A 'TaskPool' is a 'RoomGroup' containing exactly one room: its own.
instance RoomGroup (TaskPool p u) where
    roomsOf TaskPool { pool_room = room } = [room]
-- | Build a 'Schedule' that targets this pool: it carries the pool's queue,
-- the supplied @p@ value, and the pair of 'Constrained' with the pool's room.
schedule :: TaskPool p u -> p -> (Schedule p (Constrained (TaskPoolConstraint u),Room (TaskPoolConstraint u)))
schedule TaskPool { pool_queue = queue, pool_room = room } prio =
    Schedule queue prio (Constrained, room)
-- | Create a new 'TaskPool'.
--
-- Arguments, in order:
--
-- * the base queue configuration; its own @queue_predicate@ is kept and runs
--   after the two checks added here;
-- * @n@, used both as the room's 'MaxThreads' value and as the occupancy
--   threshold in the queue predicate;
-- * the user value stored in the room next to the 'MaxThreads' limit.
--
-- The pool is created switched off (its switch starts as 'False'), so the
-- installed predicate retries until 'startQueue' is called. Because the
-- predicate retries whenever the room's in-use set has at least @n@ elements,
-- an @n@ of zero or less makes it retry unconditionally.
newTaskPool :: (Ord p) => QueueConfigurationRecord p -> Int -> u -> IO (TaskPool p u)
newTaskPool config n u = do
    switch <- newTVarIO False
    room <- newRoom (Just (MaxThreads n), u)
    let -- Runs the three checks in sequence: pool switched on, room not yet
        -- at capacity, then whatever the caller's configuration demands.
        gate = do
            running <- readTVar switch
            unless running retry
            occupants <- inUse room
            when (Set.size occupants >= n) retry
            queue_predicate config
    queue <- newQueue (config { queue_predicate = gate })
    return (TaskPool switch queue room)
-- | A 'TaskPool' built by 'newTaskPool' from @fair_queue_configuration@,
-- with 'numCapabilities' as the thread limit and @()@ as the user data.
simpleTaskPool :: (Ord p) => IO (TaskPool p ())
simpleTaskPool = newTaskPool fair_queue_configuration threadLimit noUserData
  where
    threadLimit = numCapabilities
    noUserData = ()
-- | The 'Room' belonging to this pool.
poolRoom :: TaskPool p u -> Room (TaskPoolConstraint u)
poolRoom pool = pool_room pool
-- | The 'Queue' belonging to this pool.
poolQueue :: TaskPool p u -> Queue p
poolQueue pool = pool_queue pool
-- | Switch the pool on by writing 'True' to its on/off 'TVar'.
-- The predicate installed by 'newTaskPool' retries while this flag is 'False'.
startQueue :: TaskPool p u -> STM ()
startQueue = (`writeTVar` True) . pool_on
-- | Switch the pool off by writing 'False' to its on/off 'TVar'.
-- While the flag is 'False' the predicate installed by 'newTaskPool' retries.
stopQueue :: TaskPool p u -> STM ()
stopQueue = (`writeTVar` False) . pool_on
-- | 'True' exactly when both the pool's queue and the pool's room report
-- themselves empty. Both are always inspected, within the same transaction.
isEmpty :: (Ord p) => TaskPool p u -> STM Bool
isEmpty tp = do
    queueEmpty <- Queue.isEmpty (poolQueue tp)
    roomEmpty <- Room.isEmpty (poolRoom tp)
    return (queueEmpty && roomEmpty)
-- | Block the calling thread until the pool is empty, i.e. until
-- 'PrioritySync.Internal.TaskPool.isEmpty' yields 'True'. The check runs in
-- a single transaction that retries for as long as the pool is not empty.
waitUntilFinished :: (Ord p) => TaskPool p u -> IO ()
waitUntilFinished tp = atomically $ do
    finished <- PrioritySync.Internal.TaskPool.isEmpty tp
    unless finished retry