module Control.Parallel.HdpH.Internal.Scheduler
(
RTS,
run_,
liftThreadM,
liftSparkM,
liftCommM,
liftIO,
schedulerID,
mkThread,
execThread,
sendPUSH
) where
import Prelude hiding (error)
import Control.Concurrent (ThreadId, forkIO, killThread)
import Control.Monad (unless, replicateM)
import Data.Functor ((<$>))
import Control.Parallel.HdpH.Closure (unClosure)
import Control.Parallel.HdpH.Conf (RTSConf(scheds, wakeupDly))
import Control.Parallel.HdpH.Internal.Comm (CommM)
import qualified Control.Parallel.HdpH.Internal.Comm as Comm
(myNode, send, receive, run_, waitShutdown)
import qualified Control.Parallel.HdpH.Internal.Data.Deque as Deque (emptyIO)
import qualified Control.Parallel.HdpH.Internal.Data.Sem as Sem
(new, signalPeriodically)
import Control.Parallel.HdpH.Internal.Location
(NodeId, dbgNone, dbgStats, dbgMsgSend, dbgMsgRcvd, error)
import qualified Control.Parallel.HdpH.Internal.Location as Location (debug)
import Control.Parallel.HdpH.Internal.Misc
(encodeLazy, decodeLazy, ActionServer, newServer, killServer)
import Control.Parallel.HdpH.Internal.Sparkpool
(SparkM, blockSched, getSpark, Msg(PUSH), dispatch, readPoolSize,
readFishSentCtr, readSparkRcvdCtr, readSparkGenCtr, readMaxSparkCtr)
import qualified Control.Parallel.HdpH.Internal.Sparkpool as Sparkpool (run)
import Control.Parallel.HdpH.Internal.Threadpool
(ThreadM, poolID, forkThreadM, stealThread, readMaxThreadCtrs)
import qualified Control.Parallel.HdpH.Internal.Threadpool as Threadpool
(run, liftSparkM, liftCommM, liftIO)
import Control.Parallel.HdpH.Internal.Type.Par
(ParM, unPar, Thread(Atom), Spark)
newtype RTS a = RTS { unRTS :: ThreadM RTS a }
deriving (Functor, Monad)
forkRTS :: Int -> RTS () -> RTS ThreadId
forkRTS n = liftThreadM . forkThreadM n . unRTS
run_ :: RTSConf -> RTS () -> IO ()
run_ conf main = do
let n = scheds conf
unless (n > 0) $
error "HdpH.Internal.Scheduler.run_: no schedulers"
pools <- mapM (\ k -> do { pool <- Deque.emptyIO; return (k,pool) }) [0 .. n]
noWorkServer <- newServer
idleSem <- Sem.new
wakeupServerTid <- forkIO $ Sem.signalPeriodically idleSem (wakeupDly conf)
Comm.run_ conf $
Sparkpool.run conf noWorkServer idleSem $
Threadpool.run pools $
unRTS $
rts n noWorkServer wakeupServerTid
where
rts :: Int -> ActionServer -> ThreadId -> RTS ()
rts scheds noWorkServer wakeupServerTid = do
handlerTid <- forkRTS 0 handler
schedulerTids <- mapM (\ k -> forkRTS k scheduler) [1 .. scheds]
main
liftCommM $ Comm.waitShutdown
printFinalStats
liftIO $ killServer noWorkServer
liftIO $ killThread wakeupServerTid
liftIO $ killThread handlerTid
liftIO $ mapM_ killThread schedulerTids
liftThreadM :: ThreadM RTS a -> RTS a
liftThreadM = RTS
liftSparkM :: SparkM RTS a -> RTS a
liftSparkM = liftThreadM . Threadpool.liftSparkM
liftCommM :: CommM a -> RTS a
liftCommM = liftThreadM . Threadpool.liftCommM
liftIO :: IO a -> RTS a
liftIO = liftThreadM . Threadpool.liftIO
schedulerID :: RTS Int
schedulerID = liftThreadM poolID
execThread :: Thread RTS -> RTS ()
execThread (Atom m) = m >>= maybe (return ()) execThread
scheduler :: RTS ()
scheduler = getThread >>= scheduleThread
scheduleThread :: Thread RTS -> RTS ()
scheduleThread (Atom m) = m >>= maybe scheduler scheduleThread
getThread :: RTS (Thread RTS)
getThread = do
schedID <- schedulerID
maybe_thread <- liftThreadM stealThread
case maybe_thread of
Just thread -> return thread
Nothing -> do
maybe_spark <- liftSparkM $ getSpark schedID
case maybe_spark of
Just spark -> return $ mkThread $ unClosure spark
Nothing -> liftSparkM blockSched >> getThread
mkThread :: ParM RTS a -> Thread RTS
mkThread p = unPar p $ \ _c -> Atom (return Nothing)
sendPUSH :: Spark RTS -> NodeId -> RTS ()
sendPUSH spark target = do
here <- liftCommM Comm.myNode
if target == here
then do
execSpark spark
else do
let msg = PUSH spark :: Msg RTS
debug dbgMsgSend $
show msg ++ " ->> " ++ show target
liftCommM $ Comm.send target $ encodeLazy msg
handlePUSH :: Msg RTS -> RTS ()
handlePUSH (PUSH spark) = execSpark spark
execSpark :: Spark RTS -> RTS ()
execSpark spark = execThread $ mkThread $ unClosure spark
handler :: RTS ()
handler = do
msg <- decodeLazy <$> liftCommM Comm.receive
sparks <- liftSparkM readPoolSize
debug dbgMsgRcvd $
">> " ++ show msg ++ " #sparks=" ++ show sparks
case msg of
PUSH _ -> handlePUSH msg
_ -> liftSparkM $ dispatch msg
handler
printFinalStats :: RTS ()
printFinalStats = do
fishes <- liftSparkM $ readFishSentCtr
scheds <- liftSparkM $ readSparkRcvdCtr
sparks <- liftSparkM $ readSparkGenCtr
max_sparks <- liftSparkM $ readMaxSparkCtr
maxs_threads <- liftThreadM $ readMaxThreadCtrs
debug dbgStats $ "#SPARK=" ++ show sparks ++ " " ++
"max_SPARK=" ++ show max_sparks ++ " " ++
"max_THREAD=" ++ show maxs_threads
debug dbgStats $ "#FISH_sent=" ++ show fishes ++ " " ++
"#SCHED_rcvd=" ++ show scheds
debug :: Int -> String -> RTS ()
debug level message = liftIO $ Location.debug level message