module Holumbus.Distribution.DMapReduce
(
DMapReduce
, MapReduce(..)
, DMRMasterConf(..)
, defaultMRMasterConfig
, DMRWorkerConf(..)
, defaultMRWorkerConfig
, DMRClientConf(..)
, defaultMRClientConfig
, mkMapReduceMaster
, mkMapReduceWorker
, mkMapReduceClient
)
where
import Control.Concurrent
import Network
import System.Log.Logger
import Holumbus.Common.Debug
import Holumbus.MapReduce.Types
import Holumbus.MapReduce.MapReduce
import qualified Holumbus.Distribution.Master as M
import qualified Holumbus.Distribution.Master.MasterData as MD
import qualified Holumbus.Distribution.Master.MasterPort as MP
import qualified Holumbus.Distribution.Worker as W
import qualified Holumbus.Distribution.Worker.WorkerData as WD
import qualified Holumbus.Distribution.Worker.WorkerPort as WP
import qualified Holumbus.FileSystem.FileSystem as FS
import Holumbus.Network.Site
import Holumbus.Network.Port
localLogger :: String
localLogger = "Holumbus.Distribution.DMapReduce"
data DMapReduceData =
forall m w. (M.MasterClass m, W.WorkerClass w, Debug m, Debug w) =>
DMapReduceData SiteId MapReduceType m (Maybe w)
data DMapReduce = DMapReduce (MVar DMapReduceData)
instance Show DMapReduce where
show _ = "DMapReduce"
data DMRMasterConf = DMRMasterConf {
msc_StartControlling :: Bool
, msc_StreamName :: StreamName
, msc_PortNumber :: Maybe PortNumber
}
defaultMRMasterConfig :: DMRMasterConf
defaultMRMasterConfig = DMRMasterConf True "MRMaster" Nothing
data DMRWorkerConf = DMRWorkerConf {
woc_StreamName :: StreamName
, woc_SocketId :: Maybe SocketId
}
defaultMRWorkerConfig :: DMRWorkerConf
defaultMRWorkerConfig = DMRWorkerConf "MRMaster" Nothing
data DMRClientConf = DMRClientConf {
clc_StreamName :: StreamName
, clc_SocketId :: Maybe SocketId
}
defaultMRClientConfig :: DMRClientConf
defaultMRClientConfig = DMRClientConf "MRMaster" Nothing
mkMapReduceMaster
:: FS.FileSystem -> DMRMasterConf
-> IO DMapReduce
mkMapReduceMaster fs conf
= do
sid <- getSiteId
infoM localLogger $ "initialising master on site " ++ show sid
md <- MD.newMaster fs (msc_StartControlling conf) (msc_StreamName conf) (msc_PortNumber conf)
newDMapReduce MRTMaster md (Nothing::Maybe WP.WorkerPort)
mkMapReduceWorker
:: FS.FileSystem -> ActionMap -> DMRWorkerConf
-> IO DMapReduce
mkMapReduceWorker fs am conf
= do
sid <- getSiteId
infoM localLogger $ "initialising worker on site " ++ show sid
mp <- MP.newMasterPort (woc_StreamName conf) (woc_SocketId conf)
wd <- WD.newWorker fs am (woc_StreamName conf) (woc_SocketId conf)
newDMapReduce MRTWorker mp (Just wd)
mkMapReduceClient
:: DMRClientConf
-> IO DMapReduce
mkMapReduceClient conf
= do
sid <- getSiteId
infoM localLogger $ "initialising map-reduce-client on site " ++ show sid
mp <- MP.newMasterPort (clc_StreamName conf) (clc_SocketId conf)
newDMapReduce MRTClient mp (Nothing::Maybe WP.WorkerPort)
newDMapReduce
:: (M.MasterClass m, W.WorkerClass w, Debug m, Debug w)
=> MapReduceType -> m -> Maybe w -> IO DMapReduce
newDMapReduce t m w
= do
sid <- getSiteId
d <- newMVar (DMapReduceData sid t m w)
return $ DMapReduce d
instance Debug DMapReduce where
printDebug (DMapReduce mr)
= withMVar mr $
\(DMapReduceData s t m w) ->
do
putStrLn "--------------------------------------------------------"
putStrLn "Distribtion - internal data\n"
putStrLn "--------------------------------------------------------"
putStrLn "SiteId:"
putStrLn $ show s
putStrLn "Type:"
putStrLn $ show t
putStrLn "--------------------------------------------------------"
putStrLn "Master:"
printDebug m
putStrLn "--------------------------------------------------------"
putStrLn "Worker:"
maybe (putStrLn "NOTHING") (\w' -> printDebug w') w
putStrLn "--------------------------------------------------------"
getDebug (DMapReduce mr)
= withMVar mr $
\(DMapReduceData s t m w) ->
do
let line = "--------------------------------------------------------"
tmp <- getDebug m
mtmp <- maybe (return "NOTHING") (\w' -> getDebug w') w
return (line
++"\n"++ "Distribtion - internal data\n"
++"\n"++line
++"\n"++ "SiteId:"
++"\n"++ show s
++"\n"++ "Type:"
++"\n"++ show t
++"\n"++line
++"\n"++ "Master:"
++"\n"++tmp
++"\n"++line
++"\n"++"Worker:"
++mtmp
++"\n"++line++"\n")
instance MapReduce DMapReduce where
closeMapReduce (DMapReduce mr)
= withMVar mr $
\(DMapReduceData _ _ m w) ->
do
case w of
(Just w') -> W.closeWorker w'
(Nothing) -> return ()
M.closeMaster m
getMySiteId (DMapReduce mr)
= withMVar mr $ \(DMapReduceData s _ _ _) -> return s
getMapReduceType (DMapReduce mr)
= withMVar mr $ \(DMapReduceData _ t _ _) -> return t
startControlling (DMapReduce mr)
= withMVar mr $ \(DMapReduceData _ _ m _) -> startControlling m
stopControlling (DMapReduce mr)
= withMVar mr $ \(DMapReduceData _ _ m _) -> stopControlling m
isControlling (DMapReduce mr)
= withMVar mr $ \(DMapReduceData _ _ m _) -> isControlling m
doSingleStep (DMapReduce mr)
= withMVar mr $ \(DMapReduceData _ _ m _) -> doSingleStep m
doMapReduceJob ji (DMapReduce mr)
= withMVar mr $ \(DMapReduceData _ _ m _) -> doMapReduceJob ji m