module Data.Hadoop.Configuration
( getHadoopConfig
, getHadoopUser
, getNameNodes
) where
import Control.Applicative ((<$>), (<*>))
import Control.Exception (IOException, handle)
import qualified Data.ByteString.Char8 as B
import qualified Data.HashMap.Lazy as H
import Data.Maybe (fromMaybe, mapMaybe)
import Data.Monoid ((<>))
import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Text.Read as T
import System.Environment (lookupEnv)
import System.Posix.User (getEffectiveUserName)
import Text.XmlHtml
import Data.Hadoop.Types
getHadoopConfig :: IO HadoopConfig
getHadoopConfig = do
hcUser <- getHadoopUser
hcNameNodes <- getNameNodes
let hcProxy = Nothing
return HadoopConfig{..}
getHadoopUser :: IO User
getHadoopUser = maybe fromUnix return =<< fromEnv
where
fromEnv :: IO (Maybe User)
fromEnv = fmap T.pack <$> lookupEnv "HADOOP_USER_NAME"
fromUnix :: IO User
fromUnix = T.pack <$> getEffectiveUserName
type HadoopXml = H.HashMap Text Text
getNameNodes :: IO [NameNode]
getNameNodes = do
cfg <- H.union <$> readHadoopConfig "/etc/hadoop/conf/core-site.xml"
<*> readHadoopConfig "/etc/hadoop/conf/hdfs-site.xml"
return $ fromMaybe []
$ resolveNameNode cfg <$> (stripProto =<< H.lookup fsDefaultNameKey cfg)
where
proto = "hdfs://"
fsDefaultNameKey = "fs.defaultFS"
nameNodesPrefix = "dfs.ha.namenodes."
rpcAddressPrefix = "dfs.namenode.rpc-address."
stripProto :: Text -> Maybe Text
stripProto uri | proto `T.isPrefixOf` uri = Just (T.drop (T.length proto) uri)
| otherwise = Nothing
resolveNameNode :: HadoopXml -> Text -> [NameNode]
resolveNameNode cfg name = case parseEndpoint name of
Just ep -> [ep]
Nothing -> mapMaybe (\nn -> lookupAddress cfg $ name <> "." <> nn)
(lookupNameNodes cfg name)
lookupNameNodes :: HadoopXml -> Text -> [Text]
lookupNameNodes cfg name = fromMaybe []
$ T.splitOn "," <$> H.lookup (nameNodesPrefix <> name) cfg
lookupAddress :: HadoopXml -> Text -> Maybe Endpoint
lookupAddress cfg name = parseEndpoint =<< H.lookup (rpcAddressPrefix <> name) cfg
parseEndpoint :: Text -> Maybe Endpoint
parseEndpoint ep = Endpoint host <$> port
where
host = T.takeWhile (/= ':') ep
port = either (const Nothing) (Just . fst)
$ T.decimal $ T.drop (T.length host + 1) ep
readHadoopConfig :: FilePath -> IO HadoopXml
readHadoopConfig path = do
exml <- readXML path
case exml of
Left _ -> return H.empty
Right xml -> return (toHashMap (docContent xml))
where
toHashMap = H.fromList . mapMaybe fromNode
. concatMap (descendantElementsTag "property")
fromNode n = (,) <$> (nodeText <$> childElementTag "name" n)
<*> (nodeText <$> childElementTag "value" n)
readXML :: FilePath -> IO (Either String Document)
readXML path = handle onError (parseXML path <$> B.readFile path)
where
onError :: IOException -> IO (Either String Document)
onError e = return $ Left $ show e