module Control.Distributed.Task.DataAccess.HdfsListing (
  listFiles, listBlockDistribution
  ) where

import Control.Exception (catch, SomeException)
import qualified Data.ByteString.Char8 as BC
import qualified Data.Hadoop.Configuration as HRPC
import qualified Data.Hadoop.Types as HRPC
import qualified Network.Hadoop.Hdfs as HRPC
import qualified Data.Text as T
import qualified Data.Vector as V

import Control.Distributed.Task.Util.Configuration
import Control.Distributed.Task.Util.Logging

-- | List the entries HDFS reports for the given path.
--
-- Logs the request, runs @getListing'@ through 'runHdfsWithConfig' and
-- returns the @fsPath@ of every file status in the listing, in listing order.
--
-- NOTE: the path and the results are converted with "Data.ByteString.Char8",
-- i.e. one 'Char' per byte; characters above U+00FF are truncated by @pack@.
listFiles :: String -> IO [String]
listFiles path = do
  logInfo ("listing files at: " ++ path)
  statuses <- runHdfsWithConfig (HRPC.getListing' (BC.pack path))
  return [BC.unpack (HRPC.fsPath status) | status <- V.toList statuses]

-- | Count, per host, how many block locations of the file at the given path
-- refer to that host.
--
-- The host is taken from the first component of each block location.
-- The order of the result is the one produced by 'groupCount'.
--
-- Calls 'error' when the lookup yields no file status ("no such file"), more
-- than one file status ("is not a regular file"), or a status whose locations
-- were not requested.
listBlockDistribution :: String -> IO [(String, Int)]
listBlockDistribution path = do
  logInfo ("listing file blocks for: " ++ path)
  statuses <- runHdfsWithConfig (HRPC.getBlockLocations (BC.pack path))
  -- Only the first two list cells are inspected, so this is equivalent to
  -- dispatching on the vector length being 0, 1 or more.
  case V.toList statuses of
    [] -> error ("no such file: " ++ path)
    [status] -> return (hostCounts (HRPC.fsLocations status))
    _ -> error ("is not a regular file: " ++ path)
  where
    -- Flatten all block locations into host names and tally them.
    hostCounts :: HRPC.FsLocations -> [(String, Int)]
    hostCounts HRPC.NotRequested = error "block locations not requested?"
    hostCounts (HRPC.FsLocations blocks) =
      groupCount
        [ T.unpack (fst location)
        | block <- V.toList blocks
        , location <- V.toList (snd block)
        ]

-- | Tally how often each distinct key occurs in the input.
--
-- The input is folded from the right, so keys appear in the result in the
-- order of their last occurrence, starting with the right-most element.
-- Only 'Eq' is required, hence the tally is searched linearly for each element.
groupCount :: (Eq key) => [key] -> [(key, Int)]
groupCount = foldr bump []
  where
    -- Increment the counter of an already known key, or append a new entry.
    bump :: (Eq k) => k -> [(k, Int)] -> [(k, Int)]
    bump k [] = [(k, 1)]
    bump k (entry@(seen, n) : rest)
      | seen == k = (seen, n + 1) : rest
      | otherwise = entry : bump k rest

-- | Run an HDFS action, preferring the endpoint from the task configuration.
--
-- The Hadoop user and the @(host, port)@ pair from '_hdfsConfig' are combined
-- into a 'HRPC.HadoopConfig' and the action is run with @runHdfs'@. If
-- building that config throws any exception, a warning is logged and the
-- action is run with plain @runHdfs@ instead.
runHdfsWithConfig :: HRPC.Hdfs a -> IO a
runHdfsWithConfig action = do
  loaded <- loadConfig `catch` fallBack
  case loaded of
    Nothing -> HRPC.runHdfs action
    Just config -> HRPC.runHdfs' config action
  where
    -- Build the explicit config from the Hadoop user and the configured endpoint.
    loadConfig :: IO (Maybe HRPC.HadoopConfig)
    loadConfig = do
      user <- HRPC.getHadoopUser
      (host, port) <- fmap _hdfsConfig getConfiguration
      return (Just (HRPC.HadoopConfig user [HRPC.Endpoint (T.pack host) port] Nothing))
    -- Best effort: report the failure and signal that no explicit config is available.
    fallBack :: SomeException -> IO (Maybe HRPC.HadoopConfig)
    fallBack err = do
      logWarn ("failed to load config, trying system defaults: " ++ show err)
      return Nothing