-- ----------------------------------------------------------------------------
{- |
  Module     : Holumbus.Distribution.DMapReduce
  Copyright  : Copyright (C) 2008 Stefan Schmidt
  License    : MIT

  Maintainer : Stefan Schmidt (stefanschmidt@web.de)
  Stability  : experimental
  Portability: portable
  Version    : 0.1


-}
-- ----------------------------------------------------------------------------

{-# OPTIONS -fglasgow-exts #-}
module Holumbus.Distribution.DMapReduce
(
-- * Datatypes
  DMapReduce
, MapReduce(..)

-- * Configuration
, DMRMasterConf(..)
, defaultMRMasterConfig
, DMRWorkerConf(..)
, defaultMRWorkerConfig
, DMRClientConf(..)
, defaultMRClientConfig


-- * Creation and Destruction
, mkMapReduceMaster
, mkMapReduceWorker
, mkMapReduceClient
)
where

import           Control.Concurrent
import           Network
import           System.Log.Logger

import           Holumbus.Common.Debug
import           Holumbus.MapReduce.Types
import           Holumbus.MapReduce.MapReduce
import qualified Holumbus.Distribution.Master as M
import qualified Holumbus.Distribution.Master.MasterData as MD
import qualified Holumbus.Distribution.Master.MasterPort as MP
import qualified Holumbus.Distribution.Worker as W
import qualified Holumbus.Distribution.Worker.WorkerData as WD
import qualified Holumbus.Distribution.Worker.WorkerPort as WP
import qualified Holumbus.FileSystem.FileSystem as FS
import           Holumbus.Network.Site
import           Holumbus.Network.Port


-- | Logger name handed to 'infoM' by the constructors in this module;
-- it is the same string as the module name.
localLogger :: String
localLogger = "Holumbus.Distribution.DMapReduce"

-- ----------------------------------------------------------------------------
-- Datatypes
-- ----------------------------------------------------------------------------


-- | Internal state of one map-reduce node.
--
-- The constructor stores, in order: the 'SiteId' obtained when the node was
-- built, the node's 'MapReduceType', a master handle and an optional worker
-- handle.  The concrete master and worker types are existentially hidden;
-- only their 'M.MasterClass' / 'W.WorkerClass' and 'Debug' instances remain
-- available after pattern matching.
data DMapReduceData
  = forall m w. (M.MasterClass m, W.WorkerClass w, Debug m, Debug w)
  => DMapReduceData SiteId MapReduceType m (Maybe w)


-- | Handle to a map-reduce node.  It wraps the node's 'DMapReduceData' in
-- an 'MVar'; every operation in this module reads it through 'withMVar'.
-- The constructor is not part of the module's export list.
data DMapReduce = DMapReduce (MVar DMapReduceData)

-- | Every handle renders as the fixed text @DMapReduce@; the wrapped
-- 'MVar' is never inspected.
instance Show DMapReduce where
  show = const "DMapReduce"



-- ---------------------------------------------------------------------------
-- Configurations
-- ---------------------------------------------------------------------------


-- | Settings consumed by 'mkMapReduceMaster'.  All three fields are passed
-- on unchanged to 'MD.newMaster'.
data DMRMasterConf
  = DMRMasterConf
    { msc_StartControlling :: Bool
      -- ^ presumably whether the master starts controlling right after
      --   creation (TODO confirm against 'MD.newMaster')
    , msc_StreamName       :: StreamName
      -- ^ stream name given to the master
    , msc_PortNumber       :: Maybe PortNumber
      -- ^ optional port number given to the master
    }

-- | Default master settings: controlling flag set, stream name
-- @\"MRMaster\"@, no explicit port number.
defaultMRMasterConfig :: DMRMasterConf
defaultMRMasterConfig
  = DMRMasterConf
    { msc_StartControlling = True
    , msc_StreamName       = "MRMaster"
    , msc_PortNumber       = Nothing
    }


-- | Settings consumed by 'mkMapReduceWorker'.  Both fields are handed to
-- 'MP.newMasterPort' as well as to 'WD.newWorker'.
data DMRWorkerConf
  = DMRWorkerConf
    { woc_StreamName :: StreamName
      -- ^ stream name; presumably that of the master to contact
      --   (TODO confirm)
    , woc_SocketId   :: Maybe SocketId
      -- ^ optional socket id; presumably the master's address
      --   (TODO confirm)
    }

-- | Default worker settings: stream name @\"MRMaster\"@ (the same name the
-- default master configuration uses) and no socket id.
defaultMRWorkerConfig :: DMRWorkerConf
defaultMRWorkerConfig
  = DMRWorkerConf
    { woc_StreamName = "MRMaster"
    , woc_SocketId   = Nothing
    }


-- | Settings consumed by 'mkMapReduceClient'.  Both fields are handed to
-- 'MP.newMasterPort'.
data DMRClientConf
  = DMRClientConf
    { clc_StreamName :: StreamName
      -- ^ stream name; presumably that of the master to contact
      --   (TODO confirm)
    , clc_SocketId   :: Maybe SocketId
      -- ^ optional socket id; presumably the master's address
      --   (TODO confirm)
    }

-- | Default client settings: stream name @\"MRMaster\"@ (the same name the
-- default master configuration uses) and no socket id.
defaultMRClientConfig :: DMRClientConf
defaultMRClientConfig
  = DMRClientConf
    { clc_StreamName = "MRMaster"
    , clc_SocketId   = Nothing
    }


-- ---------------------------------------------------------------------------
-- Creation and Destruction
-- ---------------------------------------------------------------------------


-- | Builds a node of type 'MRTMaster'.
--
-- The local site id is logged at info level, then a master is created with
-- 'MD.newMaster' from the file system and the three configuration fields.
-- The resulting node carries no worker.
mkMapReduceMaster
  :: FS.FileSystem -> DMRMasterConf
  -> IO DMapReduce
mkMapReduceMaster fs conf
  = do
    site <- getSiteId
    infoM localLogger ("initialising master on site " ++ show site)
    master <- MD.newMaster fs controlling stream port
    newDMapReduce MRTMaster master noWorker
  where
    controlling = msc_StartControlling conf
    stream      = msc_StreamName conf
    port        = msc_PortNumber conf
    -- The annotation pins the otherwise ambiguous worker type of the node.
    noWorker    = Nothing :: Maybe WP.WorkerPort


-- | Builds a node of type 'MRTWorker'.
--
-- The local site id is logged at info level.  A master port is opened
-- first, then the worker itself is created from the file system and the
-- action map; both receive the stream name and socket id from the
-- configuration.
mkMapReduceWorker
  :: FS.FileSystem -> ActionMap -> DMRWorkerConf
  -> IO DMapReduce
mkMapReduceWorker fs am conf
  = do
    site <- getSiteId
    infoM localLogger ("initialising worker on site " ++ show site)
    masterPort <- MP.newMasterPort stream socket
    worker     <- WD.newWorker fs am stream socket
    newDMapReduce MRTWorker masterPort (Just worker)
  where
    stream = woc_StreamName conf
    socket = woc_SocketId conf


-- | Builds a node of type 'MRTClient'.
--
-- The local site id is logged at info level and a master port is opened
-- with the stream name and socket id from the configuration.  The resulting
-- node carries no worker.
mkMapReduceClient
  :: DMRClientConf
  -> IO DMapReduce
mkMapReduceClient conf
  = do
    site <- getSiteId
    infoM localLogger ("initialising map-reduce-client on site " ++ show site)
    masterPort <- MP.newMasterPort (clc_StreamName conf) (clc_SocketId conf)
    newDMapReduce MRTClient masterPort noWorker
  where
    -- The annotation pins the otherwise ambiguous worker type of the node.
    noWorker = Nothing :: Maybe WP.WorkerPort
    

-- | Shared back end of the @mkMapReduce*@ constructors: looks up the local
-- site id, packs it together with the node type, the master and the
-- optional worker into a 'DMapReduceData' and stores that in a fresh 'MVar'.
newDMapReduce
  :: (M.MasterClass m, W.WorkerClass w, Debug m, Debug w)
  => MapReduceType -> m -> Maybe w -> IO DMapReduce
newDMapReduce t m w
  = getSiteId >>= \site ->
      fmap DMapReduce (newMVar (DMapReduceData site t m w))
    
{-
getMasterRequestPort :: DMapReduce -> IO MSG.MasterRequestPort
getMasterRequestPort (DMapReduce mr)
  = withMVar mr $ \(DMapReduceData _ _ m _) -> return $ M.getMasterRequestPort m
-}    



-- ---------------------------------------------------------------------------
-- public functions
-- ---------------------------------------------------------------------------

-- | Debug dump of a node: site id, node type, the master's debug output and
-- the worker's debug output (the text @NOTHING@ when the node has no
-- worker).  Both methods read the state through 'withMVar'.
instance Debug DMapReduce where

  -- Writes the dump to stdout section by section.
  printDebug (DMapReduce mr)
    = withMVar mr $
        \(DMapReduceData s t m w) -> do
          putStrLn separator
          -- The embedded newline leaves a blank line below the heading.
          putStrLn "Distribution - internal data\n"
          putStrLn separator
          putStrLn "SiteId:"
          print s
          putStrLn "Type:"
          print t
          putStrLn separator
          putStrLn "Master:"
          printDebug m
          putStrLn separator
          putStrLn "Worker:"
          maybe (putStrLn "NOTHING") printDebug w
          putStrLn separator
    where
      separator = "--------------------------------------------------------"

  -- Returns the dump as one string, every section on its own line.  The
  -- worker text now starts on the line after "Worker:", matching the
  -- "Master:" section and the layout written by printDebug.
  getDebug (DMapReduce mr)
    = withMVar mr $
        \(DMapReduceData s t m w) -> do
          masterText <- getDebug m
          workerText <- maybe (return "NOTHING") getDebug w
          return $ unlines
            [ separator
              -- The embedded newline leaves a blank line below the heading.
            , "Distribution - internal data\n"
            , separator
            , "SiteId:"
            , show s
            , "Type:"
            , show t
            , separator
            , "Master:"
            , masterText
            , separator
            , "Worker:"
            , workerText
            , separator
            ]
    where
      separator = "--------------------------------------------------------"



-- | 'MapReduce' interface of a node.  Every method reads the node state
-- through 'withMVar'; apart from the two plain getters and the shutdown,
-- each one calls the method of the same name on the stored master handle.
instance MapReduce DMapReduce where

  -- Shutdown: the worker, when there is one, is closed before the master.
  closeMapReduce (DMapReduce var)
    = withMVar var
        (\(DMapReduceData _ _ master worker) ->
           maybe (return ()) W.closeWorker worker >> M.closeMaster master)

  -- Site id recorded when the node was created.
  getMySiteId (DMapReduce var)
    = withMVar var (\(DMapReduceData site _ _ _) -> return site)

  -- Node type recorded when the node was created.
  getMapReduceType (DMapReduce var)
    = withMVar var (\(DMapReduceData _ kind _ _) -> return kind)

  -- The remaining methods are forwarded to the master handle.
  startControlling (DMapReduce var)
    = withMVar var (\(DMapReduceData _ _ master _) -> startControlling master)

  stopControlling (DMapReduce var)
    = withMVar var (\(DMapReduceData _ _ master _) -> stopControlling master)

  isControlling (DMapReduce var)
    = withMVar var (\(DMapReduceData _ _ master _) -> isControlling master)

  doSingleStep (DMapReduce var)
    = withMVar var (\(DMapReduceData _ _ master _) -> doSingleStep master)

  doMapReduceJob ji (DMapReduce var)
    = withMVar var (\(DMapReduceData _ _ master _) -> doMapReduceJob ji master)