module Control.Distributed.Task.TaskSpawning.DeployFullBinary (
ExternalExecutionResult,
deployAndRunFullBinary, deployAndRunExternalBinary, fullBinaryExecution, runExternalBinary,
packIOHandling, unpackIOHandling, IOHandling(..)
) where
import qualified Codec.Compression.GZip as GZip
import Control.Concurrent.Async (async, wait)
import qualified Data.ByteString.Lazy.Char8 as BLC
import qualified Data.ByteString.Lazy as BL
import Data.List (intersperse)
import System.Process (readProcessWithExitCode)
import Control.Distributed.Task.DataAccess.DataSource
import Control.Distributed.Task.TaskSpawning.ExecutionUtil
import Control.Distributed.Task.TaskSpawning.TaskDefinition
import Control.Distributed.Task.TaskSpawning.TaskDescription
import Control.Distributed.Task.TaskSpawning.TaskSpawningTypes
import Control.Distributed.Task.Types.TaskTypes
import Control.Distributed.Task.Util.FileUtil
import Control.Distributed.Task.Util.Logging
type ExternalExecutionResult = ([TaskResult], NominalDiffTime)
deployAndRunFullBinary :: String -> IOHandling -> BL.ByteString -> IO ExternalExecutionResult
deployAndRunFullBinary mainArg = deployAndRunExternalBinary [mainArg]
deployAndRunExternalBinary :: [String] -> IOHandling -> BL.ByteString -> IO ExternalExecutionResult
deployAndRunExternalBinary programBaseArgs ioHandling program =
withTempBLFile "distributed-program" program $ runExternalBinary programBaseArgs ioHandling
runExternalBinary :: [String] -> IOHandling -> FilePath -> IO ExternalExecutionResult
runExternalBinary programBaseArgs ioHandling executablePath =
measureDuration $ do
readProcessWithExitCode "chmod" ["+x", executablePath] "" >>= expectSilentSuccess
logInfo $ "worker: spawning external process: "++executablePath++" with baseArgs: "++show programBaseArgs
processOutput <- executeExternal executablePath (programBaseArgs ++ [packIOHandling ioHandling])
logInfo $ "worker: external process finished: "++executablePath
return $ consumeResults processOutput
fullBinaryExecution :: IOHandling -> Task -> IO ()
fullBinaryExecution (IOHandling dataDefs resultDef) task = do
logInfo $ "external binary: processing "++(concat $ intersperse ", " $ map describe dataDefs)
tasks <- mapM (async . runSingle task resultDef) dataDefs
mapM_ wait tasks
logInfo $ "external binary: tasks completed"
runSingle :: Task -> ResultDef -> DataDef -> IO ()
runSingle task resultDef dataDef = do
taskInput <- loadData dataDef
let result = task taskInput in do
processedResult <- processResult result
emitResult processedResult
where
processResult :: TaskResult -> IO TaskResult
processResult taskResult =
case resultDef of
ReturnAsMessage -> return taskResult
ReturnOnlyNumResults -> return [BLC.pack $ show $ length taskResult]
(HdfsResult pre suf z) -> writeResultsToHdfs dataDef pre suf z taskResult
emitResult :: TaskResult -> IO ()
emitResult = BLC.putStrLn . BLC.concat . intersperse (BLC.pack "|")
consumeResults :: String -> [TaskResult]
consumeResults = map (BLC.split '|') . BLC.lines . BLC.pack
writeResultsToHdfs :: DataDef -> String -> String -> Bool -> TaskResult -> IO TaskResult
writeResultsToHdfs dataDef pre suf z taskResult = writeToHdfs >> return []
where
hdfsPath (HdfsData p) = pre++"/"++p++"/"++suf
hdfsPath _ = error "implemented only for hdfs output"
writeToHdfs =
withTempBLCFile "move-to-hdfs" fileContent $ \tempFileName ->
let (dirPart, filePart) = splitBasePath $ hdfsPath dataDef
in copyToHdfs tempFileName dirPart filePart
where
fileContent = (if z then GZip.compress else id) $ BLC.concat $ intersperse (BLC.pack "\n") taskResult
copyToHdfs localFile destPath destFilename = do
logInfo $ "external binary: copying "++localFile++" to "++destPath++"/"++destFilename
_ <- executeExternal "hdfs" ["dfs", "-mkdir", "-p", destPath]
executeExternal "hdfs" ["dfs", "-copyFromLocal", localFile, destPath++"/"++destFilename]