module Control.Distributed.Task.Distribution.RunComputation (
MasterOptions(..),
TaskSpec(..),
DataSpec(..),
ResultSpec(..),
runMaster) where
import Data.List (isPrefixOf, sort)
import Data.List.Split (splitOn)
import System.Directory (getDirectoryContents)
import System.Environment (getExecutablePath)
import qualified Control.Distributed.Task.DataAccess.HdfsListing as HDFS
import Control.Distributed.Task.Distribution.TaskDistribution
import Control.Distributed.Task.TaskSpawning.TaskSpawning (fullBinarySerializationOnMaster, serializedThunkSerializationOnMaster, objectCodeSerializationOnMaster)
import Control.Distributed.Task.TaskSpawning.TaskDefinition
import Control.Distributed.Task.Types.HdfsConfigTypes
import Control.Distributed.Task.Types.TaskTypes
import Control.Distributed.Task.Util.Configuration
import Control.Distributed.Task.Util.Logging
-- | Everything 'runMaster' needs to run one distributed computation.
data MasterOptions = MasterOptions
  { _host       :: String      -- ^ Master host, passed to 'executeDistributed' together with '_port'.
  , _port       :: Int         -- ^ Master port.
  , _taskSpec   :: TaskSpec    -- ^ How the task logic is shipped to the workers.
  , _dataSpecs  :: DataSpec    -- ^ Input data specification (a single 'DataSpec' despite the plural name).
  , _resultSpec :: ResultSpec  -- ^ What to do with the task results.
  }
-- | Selects how the task definition is built on the master.
data TaskSpec
  = SourceCodeSpec String
    -- ^ Path to a Haskell source module. 'runMaster' reads the file and uses its
    -- file name (without directory and extension) as the module name.
  | FullBinaryDeployment
    -- ^ Serialize the running executable (found via 'getExecutablePath')
    -- with 'fullBinarySerializationOnMaster'.
  | SerializedThunk (TaskInput -> TaskResult)
    -- ^ Serialize this function with 'serializedThunkSerializationOnMaster',
    -- together with the path of the running executable.
  | ObjectCodeModuleDeployment (TaskInput -> TaskResult)
    -- ^ Use 'objectCodeSerializationOnMaster'. NOTE(review): the wrapped
    -- function is ignored by 'runMaster'.
-- | Describes where the task input comes from.
data DataSpec
  = SimpleDataSpec Int
    -- ^ Use at most this many entries of the configured pseudo-DB directory
    -- (sorted by name, entries starting with @.@ skipped).
  | HdfsDataSpec HdfsPath Int (Maybe String)
    -- ^ Root path on HDFS, maximum number of directory levels to descend,
    -- and an optional prefix the file names must start with.
-- | Describes how task results are handled.
data ResultSpec
  = CollectOnMaster ([TaskResult] -> IO ())
    -- ^ Results are returned to the master as messages and passed to this callback.
  | StoreInHdfs String String
    -- ^ Output prefix and suffix for 'HdfsResult'. The master only prints a
    -- confirmation (results presumably written to HDFS elsewhere; confirm).
  | Discard
    -- ^ Request 'ReturnOnlyNumResults'; the master just prints a message.
-- | Runs a computation from the master's side.
--
-- Builds the task definition from the 'TaskSpec' and expands the 'DataSpec'
-- into concrete data definitions. It then maps the 'ResultSpec' to a result
-- definition plus a result-processing action, and hands everything to
-- 'executeDistributed'.
runMaster :: MasterOptions -> IO ()
runMaster (MasterOptions masterHost masterPort taskSpec dataSpec resultSpec) = do
  taskDef <- buildTaskDef taskSpec
  dataDefs <- expandDataSpec dataSpec
  -- buildResultDef is pure, so bind it with let instead of round-tripping through return.
  let (resultDef, resultProcessor) = buildResultDef resultSpec
  executeDistributed (masterHost, masterPort) taskDef dataDefs resultDef resultProcessor
  where
    -- Maps the user-facing result specification to the distribution layer's
    -- ResultDef and the action run on the results that come back.
    buildResultDef :: ResultSpec -> (ResultDef, [TaskResult] -> IO ())
    buildResultDef (CollectOnMaster resultProcessor) = (ReturnAsMessage, resultProcessor)
    buildResultDef (StoreInHdfs outputPrefix outputSuffix) =
      (HdfsResult outputPrefix outputSuffix True, \_ -> putStrLn "result stored in hdfs")
    -- NOTE(review): `num` is the result list handed over by executeDistributed;
    -- with ReturnOnlyNumResults it presumably carries only the count - confirm.
    buildResultDef Discard =
      (ReturnOnlyNumResults, \num -> putStrLn $ show num ++ " results discarded")

    -- Builds the TaskDef for the chosen deployment strategy.
    buildTaskDef :: TaskSpec -> IO TaskDef
    buildTaskDef (SourceCodeSpec modulePath) = do
      moduleContent <- readFile modulePath
      -- readFile is lazy: force the whole content now so the file handle is
      -- released and read errors surface here, not later during serialization.
      length moduleContent `seq` return (mkSourceCodeModule modulePath moduleContent)
    buildTaskDef FullBinaryDeployment =
      getExecutablePath >>= fullBinarySerializationOnMaster
    buildTaskDef (SerializedThunk function) = do
      selfPath <- getExecutablePath
      serializedThunkSerializationOnMaster selfPath function
    buildTaskDef (ObjectCodeModuleDeployment _) = objectCodeSerializationOnMaster

    -- Turns the data specification into one DataDef per input.
    expandDataSpec :: DataSpec -> IO [DataDef]
    expandDataSpec (HdfsDataSpec path depth filterPrefix) = do
      putStrLn $ "looking for files at " ++ path
      paths <- hdfsListFilesInSubdirsFiltering depth filterPrefix path
      putStrLn $ "found these input files: " ++ show paths
      return $ map HdfsData paths
    expandDataSpec (SimpleDataSpec numTasks) = do
      config <- getConfiguration
      files <- getDirectoryContents (_pseudoDBPath config)
      -- Drop ".", ".." and hidden entries; sorting makes the selection
      -- independent of the directory listing order.
      return $ take numTasks $ map PseudoDB $ filter (not . ("." `isPrefixOf`)) $ sort files
-- | Wraps Haskell module source code as a 'TaskDef'.
--
-- The module name is derived from @modulePath@: the last path component with
-- everything from its final @.@ removed ("src/Foo.hs" becomes "Foo"; a file
-- name without an extension is used as-is).
mkSourceCodeModule :: String -> String -> TaskDef
mkSourceCodeModule modulePath moduleContent = SourceCodeModule (strippedModuleName modulePath) moduleContent
  where
    -- Isolate the file name first, so dots in directory names ("./src/Foo",
    -- "my.dir/Foo") are never mistaken for the extension separator.
    strippedModuleName :: String -> String
    strippedModuleName path =
      let fileName = reverse (takeWhile (/= '/') (reverse path))
      in case break (== '.') (reverse fileName) of
           (_, '.' : reversedBase) -> reverse reversedBase
           _ -> fileName -- no extension present
-- | Lists the entries below @path@ on HDFS.
--
-- Descends at most @descendDepth@ directory levels. An entry is kept as a
-- result once it reports no children or the depth is exhausted. When a prefix
-- is given, only entries whose last path component starts with it are kept.
-- Runs of slashes in the returned paths are collapsed with 'trimSlashes'.
--
-- NOTE(review): a negative depth never reaches the 0 case, so descent then
-- continues until entries without children. Empty directories also count as
-- results because they report no children.
hdfsListFilesInSubdirsFiltering :: Int -> Maybe String -> String -> IO [String]
hdfsListFilesInSubdirsFiltering descendDepth fileNamePrefixFilter path = do
  initialFilePaths <- HDFS.listFiles path
  recursiveFiles <- recursiveDescent descendDepth path initialFilePaths
  logDebug $ "found: " ++ show recursiveFiles
  let filtered = maybe recursiveFiles (\prefix -> filter ((prefix `isPrefixOf`) . getFileNamePart) recursiveFiles) fileNamePrefixFilter
  return $ map trimSlashes filtered
  where
    -- Last '/'-separated component ("" after a trailing slash). splitOn never
    -- yields an empty list, but match totally instead of using (!!).
    getFileNamePart :: String -> String
    getFileNamePart path' = case reverse (splitOn "/" path') of
      (lastPart : _) -> lastPart
      [] -> ""

    -- Joins a directory and a listed entry. A separator is inserted unless the
    -- entry already starts with '/'. Previously the deeper levels concatenated
    -- without any separator, while the last level always added one.
    joinPath :: String -> String -> String
    joinPath dir entry
      | "/" `isPrefixOf` entry = dir ++ entry
      | otherwise = dir ++ "/" ++ entry

    recursiveDescent :: Int -> String -> [String] -> IO [String]
    recursiveDescent 0 prefix entries = return (map (joinPath prefix) entries)
    recursiveDescent n prefix entries = do
      let absolute = map (joinPath prefix) entries
      pathsWithChildren <- mapM (\p -> fmap (\cs -> (p, cs)) (HDFS.listFiles p)) absolute
      descended <- mapM (\(p, cs) -> if null cs then return [p] else recursiveDescent (n - 1) p cs) pathsWithChildren
      return (concat descended)
-- | Collapses every run of consecutive @\/@ characters into a single @\/@.
-- All other characters, including a lone leading or trailing slash, are kept.
trimSlashes :: String -> String
trimSlashes = foldr keep []
  where
    -- The folded remainder always starts with the next input character, so a
    -- slash is dropped exactly when another slash follows it.
    keep '/' rest@('/' : _) = rest
    keep c rest = c : rest