module Control.Distributed.Task.Distribution.DataLocality ( findNodesWithData, -- visible for testing: nodeMatcher ) where import Control.Distributed.Process (NodeId) import Data.List (sortBy) import Data.List.Split (splitOn) import Data.Ord (comparing) import Prelude hiding (log) import qualified Control.Distributed.Task.DataAccess.HdfsListing as HDFS import Control.Distributed.Task.Util.Logging {- Filters the given nodes to those with any of the file blocks, ordered by the number of file blocks (not regarding individual file block length). -} findNodesWithData :: String -> [NodeId] -> IO [NodeId] findNodesWithData hdfsFilePath nodes = do logInfo ("All nodes for : " ++ hdfsFilePath ++": "++ (show nodes)) hostsWithData <- HDFS.listBlockDistribution hdfsFilePath (if null hostsWithData then logError else logInfo) ("Hdfs hosts with data: " ++ (show hostsWithData)) hosts <- readHostNames logDebug (show hosts) mergedNodeIds <- return $ map fst $ reverse $ sortOn snd $ merge (matcher hosts) merger nodes hostsWithData logInfo ("Merged nodes: " ++ (show mergedNodeIds)) return mergedNodeIds where matcher hosts node (hdfsName, _) = nodeMatcher hosts (show node) hdfsName -- HACK uses show to access nodeId data merger :: NodeId -> HdfsHit -> (NodeId, Int) merger nid (_, n) = (nid, n) type HdfsHit = (String, BlockCount) type BlockCount = Int readHostNames :: IO [(String, String)] readHostNames = do allHosts <- readFile "/etc/hosts" >>= return . parseHostFile extraHosts <- readFile "etc/hostconfig" >>= return . parseHostFile return $ allHosts ++ extraHosts where parseHostFile :: String -> [(String, String)] parseHostFile = concat . map parseHosts . filter comments . lines where comments [] = False comments ('#':_) = False comments _ = True parseHosts :: String -> [(String, String)] parseHosts = parseHosts' . splitOn " " . collapseWhites . map replaceTabs where replaceTabs :: Char -> Char replaceTabs '\t' = ' ' replaceTabs c = c collapseWhites :: String -> String collapseWhites (' ':' ':rest) = ' ':(collapseWhites rest) collapseWhites (c:rest) = c:(collapseWhites rest) collapseWhites r = r parseHosts' :: [String] -> [(String, String)] parseHosts' es = if length es < 2 then [] else map (\v -> (head es,v)) (tail es) nodeMatcher ::[(String, String)] -> String -> String -> Bool nodeMatcher hosts node hdfsName = (extractHdfsHost hdfsName) == (extractNodeIdHost node) where -- HACK: extracts the host name from "nid://localhost:44441:0" extractNodeIdHost = lookupHostname . dropWhile (=='/') . head . drop 1 . splitOn ":" -- HACK: extracts the host name from "127.0.0.1:50010" extractHdfsHost = lookupHostname . head . splitOn ":" lookupHostname :: String -> String lookupHostname k = maybe k id (lookup k hosts) {- Merges left with right: for each left, take the first match in right, ignoring other possible matches. -} merge :: (a -> b -> Bool) -> (a -> b -> c) -> [a] -> [b] -> [c] merge matcher merger = merge' where merge' _ [] = [] merge' [] _ = [] merge' (a:as) bs = maybe restMerge (:restMerge) (merge'' bs) where restMerge = merge' as bs merge'' [] = Nothing merge'' (b:bs') = if matcher a b then Just (merger a b) else merge'' bs' {- since 4.8 in Data.List -} sortOn :: Ord b => (a -> b) -> [a] -> [a] sortOn f = map snd . sortBy (comparing fst) . map (\x -> let y = f x in y `seq` (y, x))