{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE RecordWildCards #-} module Data.Hadoop.Configuration ( getHadoopConfig , getHadoopUser , getNameNodes ) where import Control.Applicative ((<$>), (<*>)) import Control.Exception (IOException, handle) import qualified Data.ByteString.Char8 as B import qualified Data.HashMap.Lazy as H import Data.Maybe (fromMaybe, mapMaybe) import Data.Monoid ((<>)) import Data.Text (Text) import qualified Data.Text as T import qualified Data.Text.Read as T import System.Environment (lookupEnv) import System.Posix.User (getEffectiveUserName) import Text.XmlHtml import Data.Hadoop.Types ------------------------------------------------------------------------ getHadoopConfig :: IO HadoopConfig getHadoopConfig = do hcUser <- getHadoopUser hcNameNodes <- getNameNodes let hcProxy = Nothing return HadoopConfig{..} ------------------------------------------------------------------------ getHadoopUser :: IO User getHadoopUser = maybe fromUnix return =<< fromEnv where fromEnv :: IO (Maybe User) fromEnv = fmap T.pack <$> lookupEnv "HADOOP_USER_NAME" fromUnix :: IO User fromUnix = T.pack <$> getEffectiveUserName ------------------------------------------------------------------------ type HadoopXml = H.HashMap Text Text getNameNodes :: IO [NameNode] getNameNodes = do cfg <- H.union <$> readHadoopConfig "/etc/hadoop/conf/core-site.xml" <*> readHadoopConfig "/etc/hadoop/conf/hdfs-site.xml" return $ fromMaybe [] $ resolveNameNode cfg <$> (stripProto =<< H.lookup fsDefaultNameKey cfg) where proto = "hdfs://" fsDefaultNameKey = "fs.defaultFS" nameNodesPrefix = "dfs.ha.namenodes." rpcAddressPrefix = "dfs.namenode.rpc-address." stripProto :: Text -> Maybe Text stripProto uri | proto `T.isPrefixOf` uri = Just (T.drop (T.length proto) uri) | otherwise = Nothing resolveNameNode :: HadoopXml -> Text -> [NameNode] resolveNameNode cfg name = case parseEndpoint name of Just ep -> [ep] -- contains "host:port" directly Nothing -> mapMaybe (\nn -> lookupAddress cfg $ name <> "." <> nn) (lookupNameNodes cfg name) lookupNameNodes :: HadoopXml -> Text -> [Text] lookupNameNodes cfg name = fromMaybe [] $ T.splitOn "," <$> H.lookup (nameNodesPrefix <> name) cfg lookupAddress :: HadoopXml -> Text -> Maybe Endpoint lookupAddress cfg name = parseEndpoint =<< H.lookup (rpcAddressPrefix <> name) cfg parseEndpoint :: Text -> Maybe Endpoint parseEndpoint ep = Endpoint host <$> port where host = T.takeWhile (/= ':') ep port = either (const Nothing) (Just . fst) $ T.decimal $ T.drop (T.length host + 1) ep readHadoopConfig :: FilePath -> IO HadoopXml readHadoopConfig path = do exml <- readXML path case exml of Left _ -> return H.empty Right xml -> return (toHashMap (docContent xml)) where toHashMap = H.fromList . mapMaybe fromNode . concatMap (descendantElementsTag "property") fromNode n = (,) <$> (nodeText <$> childElementTag "name" n) <*> (nodeText <$> childElementTag "value" n) readXML :: FilePath -> IO (Either String Document) readXML path = handle onError (parseXML path <$> B.readFile path) where onError :: IOException -> IO (Either String Document) onError e = return $ Left $ show e