module Control.Distributed.Task.TaskSpawning.DeployFullBinary (
  ExternalExecutionResult,
  deployAndRunFullBinary, deployAndRunExternalBinary,
  fullBinaryExecution, runExternalBinary,
  packIOHandling, unpackIOHandling, IOHandling(..)
  ) where

import qualified Codec.Compression.GZip as GZip
import Control.Concurrent.Async (async, wait)
import qualified Data.ByteString.Lazy.Char8 as BLC
import qualified Data.ByteString.Lazy as BL
import Data.List (intersperse)
import Data.Time.Clock (NominalDiffTime)
import System.Process (readProcessWithExitCode)

import Control.Distributed.Task.DataAccess.DataSource
import Control.Distributed.Task.TaskSpawning.ExecutionUtil
import Control.Distributed.Task.TaskSpawning.TaskDefinition
import Control.Distributed.Task.TaskSpawning.TaskDescription
import Control.Distributed.Task.TaskSpawning.TaskSpawningTypes
import Control.Distributed.Task.Types.TaskTypes
import Control.Distributed.Task.Util.FileUtil
import Control.Distributed.Task.Util.Logging

type ExternalExecutionResult = ([TaskResult], NominalDiffTime)

deployAndRunFullBinary :: String -> IOHandling -> BL.ByteString -> IO ExternalExecutionResult
deployAndRunFullBinary mainArg = deployAndRunExternalBinary [mainArg]

deployAndRunExternalBinary :: [String] -> IOHandling -> BL.ByteString -> IO ExternalExecutionResult
deployAndRunExternalBinary programBaseArgs ioHandling program =
  withTempBLFile "distributed-program" program $
    runExternalBinary programBaseArgs ioHandling

{-|
 Runs the given executable in an external system process. The argument order
 must match what is read back in RemoteExecutionSupport.
-}
runExternalBinary :: [String] -> IOHandling -> FilePath -> IO ExternalExecutionResult
runExternalBinary programBaseArgs ioHandling executablePath = measureDuration $ do
  readProcessWithExitCode "chmod" ["+x", executablePath] "" >>= expectSilentSuccess
  logInfo $ "worker: spawning external process: " ++ executablePath ++ " with baseArgs: " ++ show programBaseArgs
  processOutput <- executeExternal executablePath (programBaseArgs ++ [packIOHandling ioHandling])
  logInfo $ "worker: external process finished: " ++ executablePath
  return $ consumeResults processOutput

{-|
 Runs the task inside the spawned process. The results are written to and
 parsed back from stdout, so any logging to stdout (currently only ERROR
 messages) breaks result parsing.
-}
fullBinaryExecution :: IOHandling -> Task -> IO ()
fullBinaryExecution (IOHandling dataDefs resultDef) task = do
  logInfo $ "external binary: processing " ++ (concat $ intersperse ", " $ map describe dataDefs)
  tasks <- mapM (async . runSingle task resultDef) dataDefs
  -- reminder: collecting the results here and submitting them together would
  -- make it harder to force evaluation inside the parallel part
  mapM_ wait tasks
  logInfo "external binary: tasks completed"

runSingle :: Task -> ResultDef -> DataDef -> IO ()
runSingle task resultDef dataDef = do
  taskInput <- loadData dataDef
  processedResult <- processResult $ task taskInput
  emitResult processedResult
  where
    processResult :: TaskResult -> IO TaskResult
    processResult taskResult =
      case resultDef of
        ReturnAsMessage -> return taskResult
        ReturnOnlyNumResults -> return [BLC.pack $ show $ length taskResult]
        (HdfsResult pre suf z) -> writeResultsToHdfs dataDef pre suf z taskResult

-- | Writes the results to stdout, fields separated by '|'; the inverse of
-- consumeResults.
emitResult :: TaskResult -> IO ()
emitResult = BLC.putStrLn . BLC.concat . intersperse (BLC.pack "|")

-- | Parses the stdout of the external process back into results: one task
-- result per line, fields separated by '|'.
consumeResults :: String -> [TaskResult]
consumeResults = map (BLC.split '|') . BLC.lines . BLC.pack
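-- 'IOHandling' and its pack/unpack helpers are exported above, but their
-- definitions are not part of this excerpt. What follows is a minimal sketch
-- only: the constructor shape is reconstructed from the pattern match in
-- fullBinaryExecution, while the string encoding is an assumption (it relies
-- on Show/Read instances for DataDef and ResultDef; the original
-- serialization may differ).
data IOHandling = IOHandling [DataDef] ResultDef

packIOHandling :: IOHandling -> String
packIOHandling (IOHandling dataDefs resultDef) = show (dataDefs, resultDef)

unpackIOHandling :: String -> IOHandling
unpackIOHandling = uncurry IOHandling . read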
writeResultsToHdfs :: DataDef -> String -> String -> Bool -> TaskResult -> IO TaskResult
writeResultsToHdfs dataDef pre suf z taskResult = writeToHdfs >> return []
  where
    hdfsPath (HdfsData p) = pre ++ "/" ++ p ++ "/" ++ suf
    hdfsPath _ = error "implemented only for hdfs output"
    writeToHdfs =
      withTempBLCFile "move-to-hdfs" fileContent $ \tempFileName ->
        let (dirPart, filePart) = splitBasePath $ hdfsPath dataDef
        in copyToHdfs tempFileName dirPart filePart
      where
        fileContent = (if z then GZip.compress else id) $ BLC.concat $ intersperse (BLC.pack "\n") taskResult
        copyToHdfs localFile destPath destFilename = do
          logInfo $ "external binary: copying " ++ localFile ++ " to " ++ destPath ++ "/" ++ destFilename
          -- hadoop-rpc does not yet support writing, so the external system
          -- call is an acceptable workaround
          _ <- executeExternal "hdfs" ["dfs", "-mkdir", "-p", destPath]
          executeExternal "hdfs" ["dfs", "-copyFromLocal", localFile, destPath ++ "/" ++ destFilename]
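{-
 A hedged sketch of the receiving side of this protocol: the counterpart in
 RemoteExecutionSupport (not shown in this module) is expected to read the
 packed IOHandling back off the command line, in the argument position
 produced by runExternalBinary above. 'remoteMain' and the pass-through task
 below are illustrative assumptions, not the library's actual entry point;
 the sketch also assumes TaskInput and TaskResult are the same list type, so
 that passing the input through unchanged is a valid Task.

 import System.Environment (getArgs)

 remoteMain :: IO ()
 remoteMain = do
   args <- getArgs
   -- runExternalBinary appends the packed IOHandling as the last argument
   fullBinaryExecution (unpackIOHandling $ last args) passThrough
   where passThrough input = input  -- trivial task: emit the input unchanged
-}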