{-|
  Defines a higher level interface to running calculations. Resolves HDFS input paths.
|-}
module Control.Distributed.Task.Distribution.RunComputation (
  MasterOptions(..),
  TaskSpec(..),
  DataSpec(..),
  ResultSpec(..),
  runMaster) where

import Data.List (isPrefixOf, sort)
import Data.List.Split (splitOn)
import System.Directory (getDirectoryContents) -- being renamed to listDirectory
import System.Environment (getExecutablePath)

import qualified Control.Distributed.Task.DataAccess.HdfsListing as HDFS
import Control.Distributed.Task.Distribution.TaskDistribution
import Control.Distributed.Task.TaskSpawning.TaskSpawning (fullBinarySerializationOnMaster, serializedThunkSerializationOnMaster, objectCodeSerializationOnMaster)
import Control.Distributed.Task.TaskSpawning.TaskDefinition
import Control.Distributed.Task.Types.HdfsConfigTypes
import Control.Distributed.Task.Types.TaskTypes
import Control.Distributed.Task.Util.Configuration
import Control.Distributed.Task.Util.Logging

-- | The definition of a distributed calculation.
data MasterOptions = MasterOptions {
  -- | the master hostname
  _host :: String,
  -- | the master port
  _port :: Int,
  -- | the task logic
  _taskSpec :: TaskSpec,
  -- | which data to process
  _dataSpecs :: DataSpec,
  -- | how to process the result
  _resultSpec :: ResultSpec
  }

-- | Task logic definition, most modes expect task mode support, see RemoteExecutionSupport.
data TaskSpec
   -- | build the given string as module remotely (restrictions apply)
 = SourceCodeSpec String
   -- | run this binary as task
 | FullBinaryDeployment
   -- | serialize the given function in the context of the given program, run both as task (restrictions apply)
 | SerializedThunk (TaskInput -> TaskResult)
   -- | only transport some of the generated object code and relink remotely (restrictions apply) - the function here is ignored, it only forces the compilation of the contained module
 | ObjectCodeModuleDeployment (TaskInput -> TaskResult)
-- | definition of input data   
data DataSpec
    -- | simple test data, the path is configured, amount of files can be limited
  = SimpleDataSpec Int
    -- | use given HDFS as starting directory, descend a number of directories from there and take all files starting with the filter prefix (if any given)
  | HdfsDataSpec HdfsPath Int (Maybe String)
-- | what to do with the result
data ResultSpec
    -- | process all results with the given method
  = CollectOnMaster ([TaskResult] -> IO ())
    -- | store the results in HDFS, in the given directory(1), with the given suffix (2), based on the input path.
  | StoreInHdfs String String
    -- | do nothing, for testing purposes only
  | Discard

-- | Run a computation.
runMaster :: MasterOptions -> IO ()
runMaster (MasterOptions masterHost masterPort taskSpec dataSpec resultSpec) = do
  taskDef <- buildTaskDef taskSpec
  dataDefs <- expandDataSpec dataSpec
  (resultDef, resultProcessor) <- return $ buildResultDef resultSpec
  executeDistributed (masterHost, masterPort) taskDef dataDefs resultDef resultProcessor
    where
      buildResultDef :: ResultSpec -> (ResultDef, [TaskResult] -> IO ())
      buildResultDef (CollectOnMaster resultProcessor) = (ReturnAsMessage, resultProcessor)
      buildResultDef (StoreInHdfs outputPrefix outputSuffix) = (HdfsResult outputPrefix outputSuffix True, \_ -> putStrLn "result stored in hdfs")
      buildResultDef Discard = (ReturnOnlyNumResults, \num -> putStrLn $ (show num) ++ " results discarded")

buildTaskDef :: TaskSpec -> IO TaskDef
buildTaskDef (SourceCodeSpec modulePath) = do
  moduleContent <- readFile modulePath
  return $ mkSourceCodeModule modulePath moduleContent
buildTaskDef FullBinaryDeployment =
  getExecutablePath >>= fullBinarySerializationOnMaster
buildTaskDef (SerializedThunk function) = do
  selfPath <- getExecutablePath
  serializedThunkSerializationOnMaster selfPath function
buildTaskDef (ObjectCodeModuleDeployment _) = objectCodeSerializationOnMaster

expandDataSpec :: DataSpec -> IO [DataDef]
expandDataSpec (HdfsDataSpec path depth filterPrefix) = do
  putStrLn $ "looking for files at " ++ path
  paths <- hdfsListFilesInSubdirsFiltering depth filterPrefix path
  putStrLn $ "found these input files: " ++ (show paths)
  return $ map HdfsData paths
expandDataSpec (SimpleDataSpec numTasks) = do
  config <- getConfiguration
  files <- getDirectoryContents (_pseudoDBPath config)
  return $ take numTasks $ map PseudoDB $ filter (not . ("." `isPrefixOf`)) $ sort files

mkSourceCodeModule :: String -> String -> TaskDef
mkSourceCodeModule modulePath moduleContent = SourceCodeModule (strippedModuleName modulePath) moduleContent
  where
    strippedModuleName = reverse . takeWhile (/= '/') . drop 1 . dropWhile (/= '.') . reverse

{-|
 Like hdfsListFiles, but descending into subdirectories and filtering the file names. Note that for now this is a rather a quick hack
 for special needs than a full fledged shell expansion
|-}
hdfsListFilesInSubdirsFiltering :: Int -> Maybe String -> String -> IO [String]
hdfsListFilesInSubdirsFiltering descendDepth fileNamePrefixFilter path = do
  initialFilePaths <- HDFS.listFiles path
  recursiveFiles <- recursiveDescent descendDepth path initialFilePaths
  logDebug $ "found: " ++ (show recursiveFiles)
  return $ map trimSlashes $ maybe recursiveFiles (\prefix -> filter ((prefix `isPrefixOf`) . getFileNamePart) recursiveFiles) fileNamePrefixFilter
  where
    getFileNamePart path' = let parts = splitOn "/" path' in if null parts then "" else parts !! (length parts -1)
    recursiveDescent :: Int -> String -> [String] -> IO [String]
    recursiveDescent 0 prefix paths = return (map (\p -> prefix++"/"++p) paths)
    recursiveDescent n prefix paths = do
      absolute <- return $ map (prefix++) paths :: IO [String]
      pathsWithChildren <- mapM (\p -> (HDFS.listFiles p >>= \cs -> return (p, cs))) absolute :: IO [(String, [String])]
      descended <- mapM (\(p, cs) -> if null cs then return [p] else recursiveDescent (n-1) p cs) pathsWithChildren :: IO [[String]]
      return $ concat descended
    trimSlashes :: String -> String
    trimSlashes [] = [] -- hadoop-rpc does not work on /paths//with/double//slashes
    trimSlashes ('/':'/':rest) = trimSlashes $ '/':rest
    trimSlashes (x:rest) = x:(trimSlashes rest)