module Holumbus.Distribution.Master.MasterData
(
MasterData
, newMaster
)
where
import Control.Concurrent
import Data.List
import qualified Data.Set as Set
import System.Log.Logger
import Holumbus.Common.Debug
import Holumbus.Common.Utils
import Holumbus.Network.Site
import Holumbus.Network.Communication
import Holumbus.Distribution.Master.MasterState
import qualified Holumbus.MapReduce.MapReduce as MR
import Holumbus.MapReduce.JobController
import Holumbus.MapReduce.Types
import qualified Holumbus.Data.MultiMap as MMap
import qualified Holumbus.FileSystem.FileSystem as FS
import qualified Holumbus.Distribution.Messages as M
import qualified Holumbus.Distribution.Master as MC
import qualified Holumbus.Distribution.Worker as WC
import qualified Holumbus.Distribution.Worker.WorkerPort as WP
import Holumbus.Distribution.Master
localLogger :: String
localLogger = "Holumbus.Distribution.Master.MasterData"
type TaskToWorkerMap = MMap.MultiMap TaskId M.WorkerId
type WorkerToTaskMap = MMap.MultiMap M.WorkerId TaskId
type ActionToWorkerMap = MMap.MultiMap ActionName M.WorkerId
data WorkerControllerData = WorkerControllerData {
wcd_TaskToWorkerMap :: ! TaskToWorkerMap
, wcd_WorkerToTaskMap :: ! WorkerToTaskMap
, wcd_ActionToWorkerMap :: ! ActionToWorkerMap
}
type WorkerController = MVar WorkerControllerData
data MasterData = MasterData {
md_Server :: Server
, md_WorkerController :: WorkerController
, md_JobController :: JobController
, md_FileSystem :: FS.FileSystem
}
newWorkerController :: IO WorkerController
newWorkerController
= do
let wcd = WorkerControllerData
MMap.empty
MMap.empty
MMap.empty
newMVar wcd
newMaster
:: FS.FileSystem
-> Bool -> StreamName -> Maybe PortNumber
-> IO MasterData
newMaster fs start sn pn
= do
m <- newEmptyMVar
server <- newServer sn pn (dispatch m) (Just $ registerWorker m) (Just $ unregisterWorker m)
wc <- newWorkerController
jc <- newJobController
setTaskSendHook (sendStartTask server wc) jc
if (start)
then do startJobController jc
else do return ()
let md = MasterData server wc jc fs
putMVar m md
return md
dispatch
:: MVar MasterData
-> M.MasterRequestMessage
-> IO (Maybe M.MasterResponseMessage)
dispatch m msg
= do
md <- readMVar m
case msg of
(M.MReqTaskCompleted td) ->
do
_ <- MC.receiveTaskCompleted td md
return $ Just $ M.MRspSuccess
(M.MReqTaskError td) ->
do
_ <- MC.receiveTaskError td md
return $ Just $ M.MRspSuccess
(M.MReqStartControlling) ->
do
MR.startControlling md
return $ Just $ M.MRspSuccess
(M.MReqStopControlling) ->
do
MR.stopControlling md
return $ Just $ M.MRspSuccess
(M.MReqIsControlling) ->
do
_ <- MR.isControlling md
return $ Just $ M.MRspSuccess
(M.MReqSingleStep) ->
do
MR.doSingleStep md
return $ Just $ M.MRspSuccess
(M.MReqPerformJob ji) ->
do
r <- MR.doMapReduceJob ji md
return $ Just $ M.MRspResult r
_ -> return Nothing
registerWorker :: MVar MasterData -> IdType -> ClientPort -> IO ()
registerWorker m i cp
= do
let wp = WP.newWorkerPort cp
as <- WC.getActionNames wp
md <- readMVar m
addWorker i (as,wp)
modifyMVar (md_WorkerController md) $
\wcd ->
do
let wcd' = addWorkerToMaster i as wcd
return (wcd',())
unregisterWorker :: MVar MasterData -> IdType -> ClientPort -> IO ()
unregisterWorker m i _
= do
md <- readMVar m
delWorker i
modifyMVar (md_WorkerController md) $
\wcd ->
do
let wcd' = deleteWorkerFromMaster i wcd
return (wcd',())
return ()
addWorkerToMaster
:: M.WorkerId -> [ActionName]
-> WorkerControllerData -> WorkerControllerData
addWorkerToMaster wid as wcd
= wcd { wcd_ActionToWorkerMap = awm' }
where
awm' = MMap.insertKeys as (Set.singleton wid) $ wcd_ActionToWorkerMap wcd
deleteWorkerFromMaster :: M.WorkerId -> WorkerControllerData -> WorkerControllerData
deleteWorkerFromMaster wid wcd
= wcd { wcd_ActionToWorkerMap = awm' }
where
awm' = MMap.deleteAllElems wid $ wcd_ActionToWorkerMap wcd
getTasksPerWorkerWithAction :: ActionName -> WorkerControllerData -> [(Int, M.WorkerId)]
getTasksPerWorkerWithAction a wcd = nullList ++ sortedList
where
wtm = wcd_WorkerToTaskMap wcd
allWids = MMap.lookup a $ wcd_ActionToWorkerMap wcd
taskWids = Set.intersection allWids $ MMap.keys wtm
noTaskWids = Set.difference allWids taskWids
nullList = map (\wid -> (0,wid)) (Set.toList noTaskWids)
ls2 = map (\(wid,s) -> (Set.size s, wid)) (MMap.toList wtm)
sortedList = sortBy (\(n1,_) (n2,_) -> compare n1 n2) ls2
addTaskToWorker :: TaskId -> M.WorkerId -> WorkerControllerData -> WorkerControllerData
addTaskToWorker tid wid wcd = wcd { wcd_TaskToWorkerMap = twm', wcd_WorkerToTaskMap = wtm'}
where
twm' = MMap.insert tid wid (wcd_TaskToWorkerMap wcd)
wtm' = MMap.insert wid tid (wcd_WorkerToTaskMap wcd)
deleteTaskFromWorker :: TaskId -> M.WorkerId -> WorkerControllerData -> WorkerControllerData
deleteTaskFromWorker tid wid wcd = wcd { wcd_TaskToWorkerMap = twm', wcd_WorkerToTaskMap = wtm'}
where
twm' = MMap.deleteElem tid wid (wcd_TaskToWorkerMap wcd)
wtm' = MMap.deleteElem wid tid (wcd_WorkerToTaskMap wcd)
deleteTaskFromWorkers :: TaskId -> WorkerControllerData -> WorkerControllerData
deleteTaskFromWorkers tid wcd = wcd''
where
wids = Set.toList $ MMap.lookup tid (wcd_TaskToWorkerMap wcd)
wcd'' = foldl (\wcd' wid -> deleteTaskFromWorker tid wid wcd') wcd wids
sendStartTask :: Server -> WorkerController -> TaskData -> IO (TaskSendResult)
sendStartTask s wc td
= handleAll ( \ e -> do
errorM localLogger $ "sendStartTask: " ++ show e
return TSRError
) $
do
debugM localLogger $ "sendStartTask: waiting for wc"
modifyMVar wc $
\wcd ->
do
debugM localLogger $ "sendStartTask: waiting for wc -> done"
let wls = getTasksPerWorkerWithAction (td_Action td) wcd
if (null wls)
then do
warningM localLogger $ "sendStartTask: no worker with action \"" ++ (td_Action td) ++ "\"found"
return (wcd, TSRNotSend)
else do
infoM localLogger $ "starting Task: " ++ show (td_TaskId td)
debugM localLogger $ "sendStartTask: wls:" ++ show wls
let (_,wid) = head wls
info <- getClientInfo wid s
case info of
(Just ci) ->
do
_ <- WC.startTask td $ WP.newWorkerPort (ci_Port ci)
let wcd' = addTaskToWorker (td_TaskId td) wid wcd
return (wcd', TSRSend)
(Nothing) ->
return (wcd, TSRNotSend)
instance MR.MapReduce MasterData where
closeMapReduce md
= closeMaster md
getMySiteId _
= getSiteId
getMapReduceType _
= return MR.MRTMaster
startControlling md
= do
debugM localLogger "startControlling"
startJobController (md_JobController md)
stopControlling md
= do
debugM localLogger "stopControlling"
stopJobController (md_JobController md)
isControlling md
= do
debugM localLogger "isControlling"
isJobControllerRunning (md_JobController md)
doSingleStep md
= do
debugM localLogger "doSingleStep"
singleStepJobControlling (md_JobController md)
doMapReduceJob ji md
= do
debugM localLogger "doMapReduceJob"
performJob ji (md_JobController md)
instance MasterClass MasterData where
closeMaster md
= do
closeServer (md_Server md)
return ()
receiveTaskCompleted td md
= do
debugM localLogger $ "completed Task: " ++ show (td_TaskId td)
setTaskCompleted (md_JobController md) td
return md
receiveTaskError td md
= do
debugM localLogger $ "error Task: " ++ show (td_TaskId td)
setTaskError (md_JobController md) td
return md
instance Debug MasterData where
printDebug md
= do
putStrLn "Master-Object (full)"
putStrLn "--------------------------------------------------------"
putStrLn "Server"
printDebug (md_Server md)
withMVar (md_WorkerController md) $
\wcd ->
do
putStrLn $ prettyRecordLine gap "TaskToWorkerMap: " (wcd_TaskToWorkerMap wcd)
putStrLn $ prettyRecordLine gap "WorkerToTaskMap" (wcd_WorkerToTaskMap wcd)
putStrLn $ prettyRecordLine gap "ActionToWorkerMap:" (wcd_ActionToWorkerMap wcd)
putStrLn $ "JobController:"
jc <- printJobController (md_JobController md)
putStrLn jc
where
gap = 20
getDebug md
= do
mddebug <- getDebug (md_Server md)
tmp <- (withMVar (md_WorkerController md) $
\wcd ->
do
jc <- printJobController (md_JobController md)
return (
prettyRecordLine gap "TaskToWorkerMap: " (wcd_TaskToWorkerMap wcd)
++"\n"++ prettyRecordLine gap "WorkerToTaskMap" (wcd_WorkerToTaskMap wcd)
++"\n"++ prettyRecordLine gap "ActionToWorkerMap:" (wcd_ActionToWorkerMap wcd)
++"\n"++ "JobController:"
++"\n"++ jc ++"\n"))
return (
"Master-Object (full)"
++"\n"++ "--------------------------------------------------------"
++"\n"++ "Server"
++ mddebug
++ tmp)
where
gap = 20