-- | Reads task input from HDFS (optionally gzip- or bzip2-compressed) and
-- copies HDFS files to the local file system.
module Control.Distributed.Task.DataAccess.HdfsDataSource (supplyPathWithConfig, loadEntries, copyToLocal) where

import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as BC
import qualified Data.ByteString.Lazy as BL
import qualified Data.ByteString.Lazy.Char8 as BLC
import qualified Codec.Compression.BZip as BZip
import qualified Codec.Compression.GZip as GZip
import qualified Data.Hadoop.Configuration as HDFS
import qualified Data.Hadoop.Types as HDFS
import Data.List.Split (splitOn)
import qualified Data.Text as T
import Network.Hadoop.Hdfs
import Network.Hadoop.Read
import System.Directory (doesFileExist)
import System.IO (IOMode(..), withBinaryFile)

import Control.Distributed.Task.Types.TaskTypes (TaskInput)
import Control.Distributed.Task.Types.HdfsConfigTypes
import Control.Distributed.Task.Util.Configuration
import Control.Distributed.Task.Util.ErrorHandling
import Control.Distributed.Task.Util.Logging

supplyPathWithConfig :: HdfsPath -> IO HdfsLocation
supplyPathWithConfig p = do
  config <- getConfiguration
  return (_hdfsConfig config, p)
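
-- A usage sketch: resolve a bare HDFS path against the configured cluster
-- before loading it (the path here is a placeholder):
--
-- > loc <- supplyPathWithConfig "/data/input.txt"
-- > entries <- loadEntries loc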

loadEntries :: HdfsLocation -> IO TaskInput
loadEntries hdfsLocation = do
  logInfo $ "loading: " ++ targetDescription
  withErrorPrefix ("Error accessing "++ targetDescription) doLoad >>= return . BLC.lines . unzipIfNecessary
  where
    targetDescription = show hdfsLocation
    doLoad = readHdfsFile hdfsLocation
    unzipIfNecessary =
      let parts = splitOn "." (snd hdfsLocation)
          suffix = if null parts then "" else last parts
      in case suffix of
          "gz" -> GZip.decompress
          "bz2" -> BZip.decompress
          _ -> id
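
-- A usage sketch, assuming a reachable namenode at the given (placeholder)
-- endpoint; the ".gz" suffix makes loadEntries gunzip the payload before
-- splitting it into lines:
--
-- > entries <- loadEntries (("namenode.example.com", 8020), "/data/input.gz")
-- > mapM_ BLC.putStrLn entries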

readHdfsFile :: HdfsLocation -> IO BL.ByteString
readHdfsFile (hdfsConfig, path) = do
  logInfo $ "now loading" ++ show (hdfsConfig, path)
  config <- buildConfig hdfsConfig
  chunks <- readAllChunks readerAction config path
  logInfo "loading chunks finished"
  return $ BLC.fromChunks $ reverse chunks
  where
    readerAction :: [B.ByteString] -> BC.ByteString -> IO [B.ByteString]
    readerAction collected nextChunk = return $ nextChunk:collected
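
-- A usage sketch (endpoint and path are placeholders):
--
-- > bytes <- readHdfsFile (("namenode.example.com", 8020), "/data/part-00000")
-- > print $ BL.length bytes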

buildConfig :: HdfsConfig -> IO HDFS.HadoopConfig
buildConfig (host, port) = do
  user <- HDFS.getHadoopUser
  return $ HDFS.HadoopConfig user [HDFS.Endpoint (T.pack host) port] Nothing
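
-- The user comes from the local Hadoop environment via getHadoopUser, the
-- single endpoint is the configured namenode, and the final Nothing leaves
-- hadoop-rpc's optional SOCKS proxy unset (that reading of the third
-- HadoopConfig field is an assumption about the library, not spelled out
-- here).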

readAllChunks :: ([B.ByteString] -> B.ByteString -> IO [B.ByteString]) -> HDFS.HadoopConfig -> String -> IO [B.ByteString]
readAllChunks action config path = do
  readHandle_ <- runHdfs' config $ openRead $ BC.pack path
  case readHandle_ of
    Just readHandle -> hdfsFoldM action [] readHandle
    Nothing -> error $ "no read handle for: " ++ path
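
-- A usage sketch: count the bytes of a remote file chunk by chunk, using the
-- same cons-then-reverse accumulator convention as readHdfsFile (config as
-- built by buildConfig, path a placeholder):
--
-- > chunks <- readAllChunks (\acc c -> return (c:acc)) config "/data/part-00000"
-- > print $ sum $ map B.length chunks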

copyToLocal :: HdfsLocation -> FilePath -> IO ()
copyToLocal (hdfsConfig, path) destFile = do
  logInfo $ "now copying from " ++ show hdfsConfig
  config <- buildConfig hdfsConfig
  copyHdfsFileToLocal destFile (Just config) path
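
-- A usage sketch (endpoint, source, and destination paths are placeholders);
-- this fails if /tmp/model.bin already exists:
--
-- > copyToLocal (("namenode.example.com", 8020), "/data/model.bin") "/tmp/model.bin"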

copyHdfsFileToLocal :: FilePath -> Maybe HDFS.HadoopConfig -> String -> IO ()
copyHdfsFileToLocal destFile config path = do
  destFileExists <- doesFileExist destFile
  if destFileExists
    then error $ "destination file already exists: " ++ destFile
    else withBinaryFile destFile WriteMode $ \h -> withHdfsReader (B.hPut h) config path

withHdfsReader :: (B.ByteString -> IO ()) -> Maybe HDFS.HadoopConfig -> String -> IO ()
withHdfsReader action config path = do
  -- With an explicit config use it; otherwise fall back to the ambient
  -- Hadoop configuration via runHdfs.
  readHandle_ <- maybe runHdfs runHdfs' config $ openRead $ BC.pack path
  case readHandle_ of
    Just readHandle -> hdfsMapM_ action readHandle
    Nothing -> error $ "no read handle for: " ++ path
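
-- A usage sketch: stream a remote file to stdout chunk by chunk without
-- buffering it whole; passing Nothing uses the ambient Hadoop configuration
-- via runHdfs (the path is a placeholder):
--
-- > withHdfsReader B.putStr Nothing "/data/log.txt"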