{- |
  Standalone MapReduce system.

  This module builds a 'MapReduce' instance ('SMapReduce') out of one
  job controller and one task processor created by 'newSMapReduce'. The
  two components are connected directly through hooks: the job controller
  hands tasks to the task processor, and the task processor reports
  completed or failed tasks back to the job controller.
-}
module Holumbus.Standalone.SMapReduce
(
-- the standalone MapReduce handle and the class it implements
SMapReduce
, MapReduce(..)
-- configuration
, SMRConf
, defaultStandaloneConfig
-- construction
, newSMapReduce
)
where
import Control.Concurrent
import System.Log.Logger
import Holumbus.Common.Debug
import qualified Holumbus.FileSystem.FileSystem as FS
import Holumbus.MapReduce.Types
import Holumbus.MapReduce.JobController
import Holumbus.MapReduce.TaskProcessor
import Holumbus.MapReduce.MapReduce
import Holumbus.Network.Site
-- | Name of the logger that all debug messages of this module are sent to.
localLogger :: String
localLogger = "Holumbus.Standalone.SMapReduce"
-- | Internal state of the standalone system: the two components that
-- 'newSMapReduce' creates and connects to each other.
data SMapReduceData = SMapReduceData
  { sad_JobController :: JobController
    -- ^ the job controller of this system
  , sad_TaskProcessor :: TaskProcessor
    -- ^ the task processor of this system
  }
-- | Handle of the standalone MapReduce system. The internal state is kept
-- in an 'MVar'; the functions in this module access it through
-- 'withMVar' / 'modifyMVar'.
data SMapReduce = SMapReduce (MVar SMapReduceData)
-- | The handle is shown as the fixed string @SMapReduce@; its state is
-- never inspected.
instance Show SMapReduce where
  show = const "SMapReduce"
-- | Configuration of the standalone MapReduce system.
data SMRConf = SMRConf
  { stc_StartControlling :: Bool
    -- ^ when 'True', 'newSMapReduce' starts the job controller right
    --   after wiring up the hooks
  }
-- | Default configuration: the job controller is started when the system
-- is created.
defaultStandaloneConfig :: SMRConf
defaultStandaloneConfig = SMRConf { stc_StartControlling = True }
-- | Hook used by the job controller to dispatch a task: the task is logged
-- at debug level and then started on the given task processor. The result
-- is always 'TSRSend' once 'startTask' has returned.
sendStartTask :: TaskProcessor -> TaskData -> IO (TaskSendResult)
sendStartTask tp td = do
  mapM_ (debugM localLogger) ["sendStartTask", "TaskData:", show td]
  startTask td tp
  return TSRSend
-- | Hook used by the task processor to report a finished task: the task is
-- logged at debug level and marked as completed in the job controller.
-- Always yields 'True'.
sendTaskCompleted :: JobController -> TaskData -> IO Bool
sendTaskCompleted jc td = do
  mapM_ (debugM localLogger) ["Task completed", "TaskData:", show td]
  setTaskCompleted jc td
  return True
-- | Hook used by the task processor to report a failed task: the task is
-- logged at debug level and marked as erroneous in the job controller.
-- Always yields 'True'.
sendTaskError :: JobController -> TaskData -> IO Bool
sendTaskError jc td = do
  mapM_ (debugM localLogger) ["Task error", "TaskData:", show td]
  setTaskError jc td
  return True
-- | Creates a standalone MapReduce system.
--
-- A fresh job controller and a fresh task processor are created and
-- connected through hooks. The task processor is always started; the job
-- controller is only started when 'stc_StartControlling' is set in the
-- configuration.
newSMapReduce
  :: FS.FileSystem  -- ^ file system handed to the task processor
  -> ActionMap      -- ^ actions registered at the task processor
  -> SMRConf        -- ^ configuration of the system
  -> IO SMapReduce
newSMapReduce fs am conf = do
  jc <- newJobController
  tp <- newTaskProcessor

  -- job controller -> task processor: tasks are started directly
  setTaskSendHook (sendStartTask tp) jc

  -- task processor setup and its callbacks into the job controller
  setFileSystemToTaskProcessor fs tp
  setActionMap am tp
  setTaskCompletedHook (sendTaskCompleted jc) tp
  setTaskErrorHook (sendTaskError jc) tp

  -- the job controller runs only if the configuration asks for it
  if stc_StartControlling conf
    then startJobController jc
    else return ()
  startTaskProcessor tp

  fmap SMapReduce . newMVar $
    SMapReduceData
      { sad_JobController = jc
      , sad_TaskProcessor = tp
      }
-- | Prints the job controller, the task processor and the task processor's
-- actions to stdout, each section introduced by a separator line and a
-- heading. The state 'MVar' is held for the duration of the dump.
instance Debug SMapReduce where
  printDebug (SMapReduce sa) = withMVar sa dump
    where
      -- separator line printed before every section
      rule = putStrLn "--------------------------------------------------------"

      dump sad = do
        let jc = sad_JobController sad
            tp = sad_TaskProcessor sad
        rule
        putStrLn "Job-Controller"
        printJobController jc >>= putStrLn
        rule
        putStrLn "Task-Processor"
        printTaskProcessor tp >>= putStrLn
        rule
        putStrLn "Actions"
        getActions tp >>= putStrLn . show
-- | 'MapReduce' interface of the standalone system. Every operation that
-- touches the components takes the state 'MVar' for its duration and
-- delegates to the job controller (or, on shutdown, to both components).
instance MapReduce SMapReduce where

  -- Closes the task processor first, then the job controller; the state
  -- itself is put back unchanged.
  closeMapReduce (SMapReduce sa) = modifyMVar_ sa shutdown
    where
      shutdown sad = do
        closeTaskProcessor (sad_TaskProcessor sad)
        closeJobController (sad_JobController sad)
        return sad

  -- The site id does not depend on the handle.
  getMySiteId _ = getSiteId

  getMapReduceType _ = return MRTStandalone

  startControlling (SMapReduce sa) =
    withMVar sa (startJobController . sad_JobController)

  stopControlling (SMapReduce sa) =
    withMVar sa (stopJobController . sad_JobController)

  isControlling (SMapReduce sa) =
    withMVar sa (isJobControllerRunning . sad_JobController)

  doSingleStep (SMapReduce sa) =
    withMVar sa (singleStepJobControlling . sad_JobController)

  -- The job is performed by the job controller while the state is held.
  doMapReduceJob ji (SMapReduce sa) =
    withMVar sa (performJob ji . sad_JobController)