module Control.Distributed.Task.Distribution.RunComputation (
  MasterOptions(..),
  TaskSpec(..),
  DataSpec(..),
  ResultSpec(..),
  runMaster) where
import Data.List (isPrefixOf, sort)
import Data.List.Split (splitOn)
import System.Directory (getDirectoryContents) 
import System.Environment (getExecutablePath)
import qualified Control.Distributed.Task.DataAccess.HdfsListing as HDFS
import Control.Distributed.Task.Distribution.TaskDistribution
import Control.Distributed.Task.TaskSpawning.TaskSpawning (fullBinarySerializationOnMaster, serializedThunkSerializationOnMaster, objectCodeSerializationOnMaster)
import Control.Distributed.Task.TaskSpawning.TaskDefinition
import Control.Distributed.Task.Types.HdfsConfigTypes
import Control.Distributed.Task.Types.TaskTypes
import Control.Distributed.Task.Util.Configuration
import Control.Distributed.Task.Util.Logging
data MasterOptions = MasterOptions {
  
  _host :: String,
  
  _port :: Int,
  
  _taskSpec :: TaskSpec,
  
  _dataSpecs :: DataSpec,
  
  _resultSpec :: ResultSpec
  }
data TaskSpec
   
 = SourceCodeSpec String
   
 | FullBinaryDeployment
   
 | SerializedThunk (TaskInput -> TaskResult)
   
 | ObjectCodeModuleDeployment (TaskInput -> TaskResult)
data DataSpec
    
  = SimpleDataSpec Int
    
  | HdfsDataSpec HdfsPath Int (Maybe String)
data ResultSpec
    
  = CollectOnMaster ([TaskResult] -> IO ())
    
  | StoreInHdfs String String
    
  | Discard
runMaster :: MasterOptions -> IO ()
runMaster (MasterOptions masterHost masterPort taskSpec dataSpec resultSpec) = do
  taskDef <- buildTaskDef taskSpec
  dataDefs <- expandDataSpec dataSpec
  (resultDef, resultProcessor) <- return $ buildResultDef resultSpec
  executeDistributed (masterHost, masterPort) taskDef dataDefs resultDef resultProcessor
    where
      buildResultDef :: ResultSpec -> (ResultDef, [TaskResult] -> IO ())
      buildResultDef (CollectOnMaster resultProcessor) = (ReturnAsMessage, resultProcessor)
      buildResultDef (StoreInHdfs outputPrefix outputSuffix) = (HdfsResult outputPrefix outputSuffix True, \_ -> putStrLn "result stored in hdfs")
      buildResultDef Discard = (ReturnOnlyNumResults, \num -> putStrLn $ (show num) ++ " results discarded")
buildTaskDef :: TaskSpec -> IO TaskDef
buildTaskDef (SourceCodeSpec modulePath) = do
  moduleContent <- readFile modulePath
  return $ mkSourceCodeModule modulePath moduleContent
buildTaskDef FullBinaryDeployment =
  getExecutablePath >>= fullBinarySerializationOnMaster
buildTaskDef (SerializedThunk function) = do
  selfPath <- getExecutablePath
  serializedThunkSerializationOnMaster selfPath function
buildTaskDef (ObjectCodeModuleDeployment _) = objectCodeSerializationOnMaster
expandDataSpec :: DataSpec -> IO [DataDef]
expandDataSpec (HdfsDataSpec path depth filterPrefix) = do
  putStrLn $ "looking for files at " ++ path
  paths <- hdfsListFilesInSubdirsFiltering depth filterPrefix path
  putStrLn $ "found these input files: " ++ (show paths)
  return $ map HdfsData paths
expandDataSpec (SimpleDataSpec numTasks) = do
  config <- getConfiguration
  files <- getDirectoryContents (_pseudoDBPath config)
  return $ take numTasks $ map PseudoDB $ filter (not . ("." `isPrefixOf`)) $ sort files
mkSourceCodeModule :: String -> String -> TaskDef
mkSourceCodeModule modulePath moduleContent = SourceCodeModule (strippedModuleName modulePath) moduleContent
  where
    strippedModuleName = reverse . takeWhile (/= '/') . drop 1 . dropWhile (/= '.') . reverse
hdfsListFilesInSubdirsFiltering :: Int -> Maybe String -> String -> IO [String]
hdfsListFilesInSubdirsFiltering descendDepth fileNamePrefixFilter path = do
  initialFilePaths <- HDFS.listFiles path
  recursiveFiles <- recursiveDescent descendDepth path initialFilePaths
  logDebug $ "found: " ++ (show recursiveFiles)
  return $ map trimSlashes $ maybe recursiveFiles (\prefix -> filter ((prefix `isPrefixOf`) . getFileNamePart) recursiveFiles) fileNamePrefixFilter
  where
    getFileNamePart path' = let parts = splitOn "/" path' in if null parts then "" else parts !! (length parts 1)
    recursiveDescent :: Int -> String -> [String] -> IO [String]
    recursiveDescent 0 prefix paths = return (map (\p -> prefix++"/"++p) paths)
    recursiveDescent n prefix paths = do
      absolute <- return $ map (prefix++) paths :: IO [String]
      pathsWithChildren <- mapM (\p -> (HDFS.listFiles p >>= \cs -> return (p, cs))) absolute :: IO [(String, [String])]
      descended <- mapM (\(p, cs) -> if null cs then return [p] else recursiveDescent (n1) p cs) pathsWithChildren :: IO [[String]]
      return $ concat descended
    trimSlashes :: String -> String
    trimSlashes [] = [] 
    trimSlashes ('/':'/':rest) = trimSlashes $ '/':rest
    trimSlashes (x:rest) = x:(trimSlashes rest)