module Holumbus.Distribution.SimpleDMapReduceIO
(
MapFunction
, ReduceFunction
, client
, worker
, partition'
, Priority(..)
, putTimeStamp
)
where
import Control.Exception ( bracket, onException )
import qualified Data.ByteString.Lazy as B
import Data.Maybe
import Data.Time.Clock.POSIX
import System.Environment
import System.Exit

import Control.Parallel.Strategies
import Data.Binary
import System.Log.Logger

import Holumbus.Common.FileHandling
import Holumbus.Common.Logging
import Holumbus.Common.Utils ( handleAll )
import qualified Holumbus.Data.KeyMap as KMap
import qualified Holumbus.Distribution.DMapReduce as MR
import qualified Holumbus.FileSystem.FileSystem as FS
import qualified Holumbus.MapReduce.DaemonInterface as DI
import Holumbus.MapReduce.Types
import Holumbus.Network.PortRegistry.PortRegistryPort
-- | Wrap a map function in a 'MapConfiguration'.
--
-- The map output is partitioned with 'hashedPartition'; input and output
-- use 'defaultInputReader' and 'defaultOutputWriter'.
mapConfiguration
  :: (Hash k2, NFData v1, NFData k1, NFData v2, NFData k2, Ord k2, Binary a, Binary k1, Binary v1, Binary k2, Binary v2)
  => MapFunction a k1 v1 k2 v2
  -> MapConfiguration a k1 v1 k2 v2
mapConfiguration mapFct =
  MapConfiguration mapFct hashedPartition defaultInputReader defaultOutputWriter
-- | Wrap a reduce function in a 'ReduceConfiguration'.
--
-- Intermediate values are combined with 'defaultMerge' before the reduce
-- function runs. The output is partitioned with 'hashedPartition', and the
-- default input reader and output writer are used.
reduceConfiguration
  :: (Hash k2, NFData v2, NFData k2, NFData v3, Ord k2, Binary a, Binary k2, Binary v2, Binary v3)
  => ReduceFunction a k2 v2 v3
  -> ReduceConfiguration a k2 v2 v3
reduceConfiguration reduceFct =
  ReduceConfiguration defaultMerge reduceFct hashedPartition defaultInputReader defaultOutputWriter
-- | Build the complete action configuration (action name @\"ID\"@) from a map
-- and a reduce function.
--
-- No split phase is configured (@ac_Split = Nothing@). The map and reduce
-- phases come from 'mapConfiguration' and 'reduceConfiguration'.
actionConfig :: (Hash k1, Hash k2, Binary a, NFData k1, NFData k2, Ord k2, Binary k1, Binary k2, NFData v1, NFData v4, NFData v2, NFData v3, Binary v1, Binary v3, Binary v2, Binary v4) => MapFunction a k1 v1 k2 v2 -> ReduceFunction a k2 v3 v4 -> ActionConfiguration a k1 v1 k2 v2 v3 v4
actionConfig mapFct reduceFct =
  (defaultActionConfiguration "ID")
    { ac_Split  = Nothing
    , ac_Map    = Just (mapConfiguration mapFct)
    , ac_Reduce = Just (reduceConfiguration reduceFct)
    }
-- | Run one distributed map-reduce job as a client and return the merged
-- result.
--
-- Steps:
--
-- 1. Load the port registry from @\/tmp\/registry.xml@.
-- 2. Open a filesystem client and a map-reduce client.
-- 3. Upload each inner list of @lss@ as a file named @Initial_input_\<n\>@.
-- 4. Run the job and read back and decode every result file.
--
-- The triple gives the splitter, mapper and reducer counts passed to
-- 'MR.doMapReduce'.
--
-- Both clients are released with 'bracket', so they are closed even if the
-- job or the result merge throws. Teardown order is the same as before: the
-- map-reduce client first, then the filesystem.
client :: ( Show k1, Show k2, Show v1, Show v2, Show v3, Show v4
          , Binary v1, Binary v3, Binary v2, Binary v4
          , Binary a, Binary k1, Binary k2
          , NFData k1, NFData k2, NFData v1, NFData v4, NFData v2, NFData v3
          , Ord k2, Hash k1, Hash k2) =>
          MapFunction a k1 v1 k2 v2 -> ReduceFunction a k2 v3 v4
       -> a -> (Int,Int,Int) -> [[(k1,v1)]] -> IO [(k2,v4)]
client m r a (splitters,mappers,reducers) lss = do
  p <- newPortRegistryFromXmlFile "/tmp/registry.xml"
  setPortRegistry p
  bracket (FS.mkFileSystemClient FS.defaultFSClientConfig) FS.closeFileSystem $ \fs -> do
    siteid <- FS.getMySiteId fs
    putStrLn $ "My FS Siteid is: " ++ show siteid
    bracket initializeData deinitializeData $ \mr -> do
      let (files,filenames) = prepareFiles lss 0
      FS.createFiles files fs
      putTimeStamp "SimpleDMR Begin MR"
      (_,fids) <- MR.doMapReduce (actionConfig m r) a [] filenames splitters mappers reducers 1 TOTFile mr
      putTimeStamp "SimpleDMR End MR"
      -- Merge while both clients are still open; the result files are read
      -- through the filesystem handle.
      merge fids fs
-- | Turn a list of input chunks into uploadable files.
--
-- Chunk number @k@ (counting from the given start index) is named
-- @Initial_input_\<k\>@ and serialised with 'listToByteString'.
--
-- Returns the @(name, content)@ pairs and, separately, just the names in
-- the same order. The result is produced lazily, element by element.
prepareFiles :: Binary a => [[a]] -> Int -> ([(String,B.ByteString)],[String])
prepareFiles chunks start =
  unzip [ ((name, listToByteString chunk), name)
        | (idx, chunk) <- zip [start ..] chunks
        , let name = "Initial_input_" ++ show idx
        ]
-- | Fetch the given result files and decode and concatenate their contents.
--
-- Files for which 'FS.getFileContent' yields 'Nothing' are skipped.
-- Each remaining file is decoded with 'parseByteStringToList'.
merge :: (Show k2, Show v4, Hash k2, Binary k2, Binary v4, NFData k2, NFData v4) => [FS.FileId] -> FS.FileSystem -> IO [(k2,v4)]
merge fids fs = do
  contents <- mapM (\fid -> FS.getFileContent fid fs) fids
  return (concatMap parseByteStringToList (catMaybes contents))
-- | Create a map-reduce client using 'MR.defaultMRClientConfig'.
initializeData :: IO (MR.DMapReduce)
initializeData = MR.mkMapReduceClient MR.defaultMRClientConfig
-- | Close a map-reduce handle obtained from 'initializeData'.
deinitializeData :: MR.DMapReduce -> IO ()
deinitializeData = MR.closeMapReduce
-- | Version banner of the worker.
--
-- It is embedded in 'prompt' and passed to 'DI.runDaemon' by 'worker'.
version :: String
version = "SimpleWorker v 0.1"
-- | Console prompt shown by the worker daemon: the 'version' banner framed
-- by @\"# \"@ and @\" > \"@.
prompt :: String
prompt = concat ["# ", version, " > "]
-- | Logger name used for this module's error messages.
--
-- 'worker' registers it at INFO level when it sets up file logging.
localLogger :: String
localLogger = "Holumbus.Distribution.SimpleDMapReduceIO.worker"
-- | Print the command-line usage line of the worker executable.
pUsage :: IO ()
pUsage = putStrLn "Usage: SimpleWorker ConsolePort Logfile"
-- | Return the command-line arguments, which must be exactly two
-- (console port and log file).
--
-- On any other count it logs an error, prints the usage line and calls
-- 'exitFailure'.
params :: IO [String]
params = getArgs >>= checkArgs
  where
    checkArgs args@[_, _] = return args
    checkArgs _ = do
      errorM localLogger "Wrong argument count"
      pUsage
      exitFailure
-- | Entry point of a map-reduce worker process.
--
-- Expects two command-line arguments (see 'params'): the console port and
-- the log file. It then:
--
-- 1. Sets up file logging (this module's logger and
--    @Holumbus.Network.DoWithServer@ at INFO, plus the given @loggers@).
-- 2. Loads the port registry from @\/tmp\/registry.xml@.
-- 3. Starts a worker for the given map and reduce functions.
-- 4. Runs the daemon console on the given port.
--
-- The worker is shut down via 'bracket', so 'deinitWorker' also runs when
-- the daemon throws. Any exception escaping the body is logged by
-- 'handleAll'.
--
-- NOTE(review): 'exitFailure' raises an exception inside the 'handleAll'
-- scope. If 'handleAll' catches all exceptions, an argument or port error
-- is logged rather than terminating the process with a failure code.
-- Confirm against 'handleAll'.
worker :: (Show k1, Show k2, Show v1, Show v2, Hash k1, Hash k2, Binary a, NFData k1, NFData k2, Ord k2, Binary k1, Binary k2, NFData v1,NFData v4, NFData v2, NFData v3, Binary v1, Binary v3, Binary v2, Binary v4, Show v4, Show v3) => MapFunction a k1 v1 k2 v2 -> ReduceFunction a k2 v3 v4 -> [(String,Priority)] -> IO ()
worker m r loggers =
  handleAll (\e -> errorM localLogger $ "EXCEPTION: " ++ show e) $ do
    (s_cport:logfile:[]) <- params
    initializeFileLogging logfile ([(localLogger, INFO),("Holumbus.Network.DoWithServer",INFO)]++loggers)
    -- Validate the port up front. The former lazy 'read' would only fail
    -- once the daemon forced the value. The 'lex' guard accepts the same
    -- trailing whitespace that 'read' accepts.
    port <- case [ n | (n, rest) <- reads s_cport, ("", "") <- lex rest ] of
      [n] -> return n
      _   -> do
        errorM localLogger $ "Invalid console port: " ++ s_cport
        pUsage
        exitFailure
    p <- newPortRegistryFromXmlFile "/tmp/registry.xml"
    setPortRegistry p
    _ <- bracket (initWorker m r) deinitWorker $ \(mr, _fs) ->
           DI.runDaemon mr version port prompt
    return ()
-- | Start a filesystem node and a map-reduce worker on top of it.
--
-- The worker knows exactly one action, built from the given map and reduce
-- functions via 'actionConfig'. Returns both handles so 'deinitWorker' can
-- close them.
--
-- If creating the worker fails, the already-opened filesystem node is closed
-- before the exception propagates, so it does not leak.
initWorker :: (Hash k1, Show k1, Show k2, Show v1, Show v2, Show v3, Show v4, Hash k2, Binary a, NFData k1, NFData k2, Ord k2, Binary k1, Binary k2, NFData v1, NFData v4, NFData v2, NFData v3, Binary v1, Binary v3, Binary v2, Binary v4) => MapFunction a k1 v1 k2 v2 -> ReduceFunction a k2 v3 v4 -> IO (MR.DMapReduce, FS.FileSystem)
initWorker m r = do
  fs <- FS.mkFileSystemNode FS.defaultFSNodeConfig
  mr <- MR.mkMapReduceWorker fs actionMap MR.defaultMRWorkerConfig
          `onException` FS.closeFileSystem fs
  return (mr,fs)
  where
    -- Single-entry action map containing this job's action.
    actionMap :: ActionMap
    actionMap = KMap.insert (readActionConfiguration (actionConfig m r)) KMap.empty
-- | Shut down a worker started by 'initWorker'.
--
-- The map-reduce worker is closed first, then the filesystem node.
deinitWorker :: (MR.DMapReduce, FS.FileSystem) -> IO ()
deinitWorker (mrHandle, fsHandle) =
  MR.closeMapReduce mrHandle >> FS.closeFileSystem fsHandle
-- | Distribute the elements of the first list round-robin over the buckets
-- in the second list.
--
-- With two or more buckets, each element is consed onto the head bucket,
-- which is then moved to the back. As a result:
--
-- * elements appear in reverse insertion order within a bucket, in front of
--   that bucket's original contents;
-- * the bucket list ends up rotated by (number of elements mod number of
--   buckets).
--
-- With exactly one bucket, the remaining elements are put in front of that
-- bucket's contents in their original order.
--
-- An empty bucket list yields @[]@, and the elements are discarded.
--
-- >>> partition' [1..5] [[],[]]
-- [[4,2],[5,3,1]]
partition' :: [a] -> [[a]] -> [[a]]
partition' _ [] = []
partition' [] xss = xss
-- Single bucket: nothing to rotate, so add all remaining elements at once.
-- The bucket's existing contents must be kept, as they are in the
-- multi-bucket case.
partition' us [xs] = [us ++ xs]
partition' (u:us) (xs:xss) = partition' us (xss ++ [u:xs])
-- | Print a label followed by the current POSIX time, as @\<label\> : \<time\>@.
putTimeStamp :: String -> IO ()
putTimeStamp label =
  getPOSIXTime >>= \now -> putStrLn (label ++ " : " ++ show now)