module Control.Distributed.Task.Distribution.DataLocality (
findNodesWithData,
nodeMatcher
) where
import Control.Distributed.Process (NodeId)
import Data.List (sortBy)
import Data.List.Split (splitOn)
import Data.Ord (comparing)
import Prelude hiding (log)
import qualified Control.Distributed.Task.DataAccess.HdfsListing as HDFS
import Control.Distributed.Task.Util.Logging
-- | Order the given nodes by how much of an HDFS file's data they hold locally.
--
-- Asks HDFS for the block distribution of @hdfsFilePath@. Each node is paired
-- with the first HDFS host entry that 'nodeMatcher' considers the same machine;
-- host names are normalised with the alias table from 'readHostNames'. The
-- matched nodes are returned sorted by block count, highest first.
--
-- Nodes that match no HDFS host are omitted from the result. An error is
-- logged when HDFS reports no hosts at all for the path.
findNodesWithData :: String -> [NodeId] -> IO [NodeId]
findNodesWithData hdfsFilePath nodes = do
  logInfo ("All nodes for : " ++ hdfsFilePath ++ ": " ++ show nodes)
  hostsWithData <- HDFS.listBlockDistribution hdfsFilePath
  (if null hostsWithData then logError else logInfo)
    ("Hdfs hosts with data: " ++ show hostsWithData)
  hosts <- readHostNames
  logDebug (show hosts)
  -- Pure computation: bind it with let instead of wrapping it in return.
  -- sortOn is an ascending stable sort. Reversing it puts the highest block
  -- count first, so equal counts come out in reverse input order.
  let mergedNodeIds =
        map fst . reverse . sortOn snd $
          merge (matcher hosts) merger nodes hostsWithData
  logInfo ("Merged nodes: " ++ show mergedNodeIds)
  return mergedNodeIds
  where
    -- Compare the node's shown form against the host part of the HDFS entry.
    matcher hosts node (hdfsName, _) = nodeMatcher hosts (show node) hdfsName
    -- Keep the node and attach the block count of the matching HDFS entry.
    merger :: NodeId -> HdfsHit -> (NodeId, Int)
    merger nid (_, n) = (nid, n)
-- | A count of blocks attributed to one host. Presumably this is the number
-- of HDFS blocks of the queried file held by that host; confirm against
-- HDFS.listBlockDistribution.
type BlockCount = Int

-- | One entry of an HDFS block distribution: a host string and its
-- 'BlockCount'. Only the text before the first ':' of the host string is
-- compared by 'nodeMatcher'. NOTE(review): presumably of the form
-- "host:port"; confirm.
type HdfsHit = (String, BlockCount)
-- | Read host aliases from @/etc/hosts@, followed by @etc/hostconfig@. The
-- latter path is relative to the current working directory.
--
-- Every name on a line is paired with that line's first field (the address).
-- For example, @10.0.0.1 node1 n1@ yields
-- @[("10.0.0.1", "node1"), ("10.0.0.1", "n1")]@.
--
-- Parsing rules:
--
-- * Everything from a @#@ to the end of a line is a comment.
-- * Fields are separated by any run of whitespace.
-- * Lines with fewer than two fields contribute nothing.
--
-- There is no exception handling: if either file cannot be read, the
-- exception from 'readFile' propagates.
readHostNames :: IO [(String, String)]
readHostNames = do
  allHosts <- fmap parseHostFile (readFile "/etc/hosts")
  extraHosts <- fmap parseHostFile (readFile "etc/hostconfig")
  return (allHosts ++ extraHosts)
  where
    parseHostFile :: String -> [(String, String)]
    parseHostFile = concatMap (parseHosts . stripComment) . lines

    -- Remove an inline or whole-line comment. Blank and comment-only lines
    -- then have no fields and are dropped by parseHosts.
    stripComment :: String -> String
    stripComment = takeWhile (/= '#')

    -- Split on any whitespace (spaces, tabs, '\r'). 'words' never yields
    -- empty fields, unlike splitting on single spaces after a partial
    -- whitespace collapse.
    parseHosts :: String -> [(String, String)]
    parseHosts line = case words line of
      (address : names) -> map (\name -> (address, name)) names
      [] -> []
-- | Decide whether a node (given in its 'show'n form) and an HDFS host entry
-- refer to the same machine.
--
-- * Node host: the second ':'-separated field of @node@, with leading '/'
--   characters removed. NOTE(review): this presumes a shape like
--   "nid://host:port:0".
-- * HDFS host: everything in @hdfsName@ before its first ':'.
--
-- Both hosts are normalised through @hosts@: a name found as a key is
-- replaced by the first value paired with it, otherwise it is kept unchanged.
-- A node string that contains no ':' has no host field and never matches.
nodeMatcher :: [(String, String)] -> String -> String -> Bool
nodeMatcher hosts node hdfsName =
  maybe False (== extractHdfsHost hdfsName) (extractNodeIdHost node)
  where
    -- Nothing when there is no ':' (the old head-based version crashed here).
    extractNodeIdHost :: String -> Maybe String
    extractNodeIdHost s = case break (== ':') s of
      (_, ':' : rest) ->
        Just (lookupHostname (dropWhile (== '/') (takeWhile (/= ':') rest)))
      _ -> Nothing

    extractHdfsHost :: String -> String
    extractHdfsHost = lookupHostname . takeWhile (/= ':')

    lookupHostname :: String -> String
    lookupHostname k = maybe k id (lookup k hosts)
-- | Pair each element of the first list with the first element of the second
-- list it matches, combining the pair with the given function.
--
-- Elements of the first list that match nothing are dropped, and output order
-- follows the first list. An empty second list short-circuits to @[]@
-- without inspecting the first list.
merge :: (a -> b -> Bool) -> (a -> b -> c) -> [a] -> [b] -> [c]
merge matches combine lefts rights
  | null rights = []
  | otherwise =
      [ combine l r
      | l <- lefts
      , r <- take 1 (filter (matches l) rights)
      ]
-- | Stable sort by a key, computing each key only once (decorate, sort,
-- undecorate). Each key is forced to WHNF when its element is decorated.
sortOn :: Ord b => (a -> b) -> [a] -> [a]
sortOn key xs = map snd (sortBy byKey decorated)
  where
    byKey = comparing fst
    decorated = map tag xs
    tag x = let k = key x in k `seq` (k, x)