-- ----------------------------------------------------------------------------

{- |
  Module     : Holumbus.MapReduce.Types
  Copyright  : Copyright (C) 2008 Stefan Schmidt
  License    : MIT

  Maintainer : Stefan Schmidt (stefanschmidt@web.de)
  Stability  : experimental
  Portability: portable
  Version    : 0.1
-}

-- ----------------------------------------------------------------------------

{-# OPTIONS -fglasgow-exts #-}
{-# LANGUAGE Arrows, NoMonomorphismRestriction #-}
module Holumbus.MapReduce.Types
(
  Hash(..)
, hashedPartition
, FunctionData(..)
{-
, encodeTuple
, decodeTuple
, decodeTupleList
, encodeTupleList
-}

-- * TaskData
, TaskId
, TaskType(..)
, TaskState(..)
, getNextTaskState
, TaskOutputType(..)
, TaskData(..)

-- * JobData
, JobId
, JobState(..)
, getNextJobState
, getPrevJobState
, fromJobStatetoTaskType
, OutputMap
, JobAction(..)
, JobInfo(..)
, JobData(..)
, JobResultContainer(..)
, JobResult(..)

-- * TaskAction
, ActionName
, ActionInfo
, ActionEnvironment(..)
, mkActionEnvironment

, InputReader
, OutputWriter
, OptionsDecoder

-- ----------------------------------------------------------------------------
-- remove this, when work done!
-- defaultFunctions (only for profiling)
, defaultInputReader
, defaultOutputWriter
, defaultSplit
, defaultPartition
, defaultMerge
, readConnector
, writeConnector
-- ----------------------------------------------------------------------------

, ActionConfiguration(..)
, SplitConfiguration(..)
, MapConfiguration(..)
, ReduceConfiguration(..)
, defaultActionConfiguration
, defaultSplitConfiguration
, defaultMapConfiguration
, defaultReduceConfiguration
, readActionConfiguration
, createJobInfoFromConfiguration
, createListsFromJobResult

, ActionData(..)
, getActionForTaskType
, ActionMap

-- * MapAction
, MapAction
-- , BinaryMapAction
, MapFunction
, MapPartition

-- * Combine-Reduce-Action
, ReduceAction
-- , BinaryReduceAction
, ReduceMerge
, ReduceFunction
, ReducePartition

, SplitFunction
, SplitAction
)
where

import           Control.Concurrent
import           Data.Binary
--import           Holumbus.Common.MRBinary
import qualified Data.ByteString.Lazy as B
import qualified Data.Map as Map
import qualified Data.HashTable as Hash
import           Data.Maybe
import           Data.Time
import           System.Log.Logger
import           Data.Time.Clock.POSIX

import           Text.XML.HXT.Arrow

import qualified Holumbus.Data.AccuMap as AMap
import           Holumbus.Common.FileHandling
import           Holumbus.Common.Utils
import qualified Holumbus.Data.KeyMap as KMap
import qualified Holumbus.FileSystem.FileSystem as FS
import           Control.DeepSeq


-- | Name of the hslogger logger used by this module.
localLogger :: String
localLogger = "Holumbus.MapReduce.Types"


-- ----------------------------------------------------------------------------
-- general datatypes
-- ----------------------------------------------------------------------------

-- | Values that can be distributed over @n@ buckets (see 'hashedPartition').
class Hash a where
  -- | @hash n x@ computes the bucket number of @x@ for @n@ buckets.
  hash :: Int -> a -> Int

-- | Buckets an 'Int' by @mod k n@, i.e. into @0 .. n-1@ for positive @n@.
instance Hash Int where
  hash n k = mod k n


-- | One unit of task input or output: either a binary encoded key/value
--   tuple or the id of a file stored in the distributed filesystem.
data FunctionData
  = TupleFunctionData B.ByteString
  | FileFunctionData FS.FileId
  deriving (Show, Eq, Ord)

-- | Serialised with a tag byte: 0 = tuple, 1 = file.
--   Any unknown tag is decoded as a tuple.
instance Binary FunctionData where
  put (TupleFunctionData t) = putWord8 0 >> put t
  put (FileFunctionData f)  = putWord8 1 >> put f
  get = do
    t <- getWord8
    case t of
      1 -> get >>= \f -> return (FileFunctionData f)
      _ -> get >>= \b -> return (TupleFunctionData b)

instance XmlPickler FunctionData where
  xpickle = xpFunctionData

-- | XML form: a @\<data\>@ element holding either a @\<key\>@/@\<value\>@
--   pair of strings (stored as an encoded @(String, String)@ tuple) or a
--   @\<filename\>@ element.
xpFunctionData :: PU FunctionData
xpFunctionData
  = xpElem "data" $
    xpAlt tag ps
  where
    tag (TupleFunctionData _) = 0
    tag (FileFunctionData _)  = 1
    ps = [ xpWrap ( \p -> TupleFunctionData (encodeTuple p)
                  , \(TupleFunctionData p) -> decodeTuple p
                  ) xpRawFunctionData
         , xpWrap ( \f -> FileFunctionData f
                  , \(FileFunctionData f) -> f
                  ) xpFileFunctionData
         ]
    xpRawFunctionData  = xpPair (xpElem "key" xpText0) (xpElem "value" xpText0)
    xpFileFunctionData = xpElem "filename" xpText


-- ----------------------------------------------------------------------------
-- general encoding / decoding
-- ----------------------------------------------------------------------------

-- | Binary encoding of a key/value tuple.
encodeTuple :: (Binary k, Binary v) => (k, v) -> B.ByteString
encodeTuple t = encode t

-- | Inverse of 'encodeTuple'.
decodeTuple :: (Binary k, Binary v) => B.ByteString -> (k, v)
decodeTuple t = decode t

-- encodeTupleList :: (Binary k, Binary v) => [(k, v)] -> [B.ByteString]
-- encodeTupleList ls = map encodeTuple ls

-- decodeTupleList :: (Binary k, Binary v) => [B.ByteString] -> [(k, v)]
-- decodeTupleList ls = map decodeTuple ls


-- ----------------------------------------------------------------------------
-- Task DataTypes
-- ----------------------------------------------------------------------------

-- | the task id (should be unique in the system)
type TaskId = Integer


-- | The phase a task belongs to (split, map, combine, reduce), or an error.
data TaskType = TTSplit | TTMap | TTCombine | TTReduce | TTError
  deriving (Show, Eq, Ord)

-- | Tags 1-4 for the phases, 0 for 'TTError'; unknown tags decode to 'TTError'.
instance Binary TaskType where
  put TTSplit   = putWord8 1
  put TTMap     = putWord8 2
  put TTCombine = putWord8 3
  put TTReduce  = putWord8 4
  put TTError   = putWord8 0
  get = do
    t <- getWord8
    case t of
      1 -> return TTSplit
      2 -> return TTMap
      3 -> return TTCombine
      4 -> return TTReduce
      _ -> return TTError


-- | the task state (constructor order is the progression order used by
--   'getNextTaskState')
data TaskState = TSIdle | TSSending | TSInProgress | TSCompleted | TSFinished | TSError
  deriving (Show, Eq, Ord, Enum)

-- | Tags 1-5 for the regular states, 0 for 'TSError'; unknown tags decode to
--   'TSError'.
instance Binary TaskState where
  put TSIdle       = putWord8 1
  put TSSending    = putWord8 2
  put TSInProgress = putWord8 3
  put TSCompleted  = putWord8 4
  put TSFinished   = putWord8 5
  put TSError      = putWord8 0
  get = do
    t <- getWord8
    case t of
      1 -> return TSIdle
      2 -> return TSSending
      3 -> return TSInProgress
      4 -> return TSCompleted
      5 -> return TSFinished
      _ -> return TSError

-- | The following task state. 'TSFinished' and 'TSError' are final and map
--   to themselves (this also keeps 'succ' away from the last constructor).
getNextTaskState :: TaskState -> TaskState
getNextTaskState TSError    = TSError
getNextTaskState TSFinished = TSFinished
getNextTaskState s          = succ s


-- | Whether task output is kept as raw encoded tuples or written to files.
data TaskOutputType = TOTRawTuple | TOTFile
  deriving (Show, Read, Eq, Ord, Enum)

-- | Tag 0 = raw tuples; every other tag decodes to 'TOTFile'.
instance Binary TaskOutputType where
  put TOTRawTuple = putWord8 0
  put TOTFile     = putWord8 1
  get = do
    t <- getWord8
    case t of
      0 -> return TOTRawTuple
      _ -> return TOTFile

-- | Pickled as the @output@ attribute, using the derived 'Show'/'Read' text.
instance XmlPickler TaskOutputType where
  xpickle = xpAttr "output" $ xpPrim


-- | the TaskData, contains all information to do the task
data TaskData = TaskData {
    td_JobId      :: ! JobId
  , td_TaskId     :: ! TaskId
  , td_Type       :: ! TaskType
  , td_State      :: ! TaskState
  , td_Option     :: ! B.ByteString             -- ^ encoded job options
  , td_PartValue  :: ! (Maybe Int)              -- ^ number of partitions, if any
  , td_Input      :: ! (Int, [FunctionData])
  , td_Output     :: ! [(Int,[FunctionData])]
  , td_OutputType :: ! TaskOutputType
  , td_Action     :: ! ActionName
  } deriving (Show, Eq, Ord)

instance Binary TaskData where
  put (TaskData jid tid tt ts opt pv i o ot a)
    = put jid >> put tid >> put tt >> put ts >> put opt >> put pv >> put i >> put o >> put ot >> put a
  get = do
    jid <- get
    tid <- get
    tt  <- get
    ts  <- get
    opt <- get
    pv  <- get
    i   <- get
    o   <- get
    ot  <- get
    a   <- get
    return (TaskData jid tid tt ts opt pv i o ot a)


-- ----------------------------------------------------------------------------
-- Job Datatypes
-- ----------------------------------------------------------------------------

-- | the job id (should be unique in the system)
type JobId = Integer


-- | the job state (constructor order is the progression order used by
--   'getNextJobState' / 'getPrevJobState')
data JobState = JSPlanned | JSIdle | JSSplit | JSMap | JSCombine | JSReduce | JSCompleted | JSFinished | JSError
  deriving (Show, Eq, Ord, Enum)

-- | Tags 1-8 for the regular states, 0 for 'JSError'; unknown tags decode to
--   'JSError'.
instance Binary JobState where
  put JSPlanned   = putWord8 1
  put JSIdle      = putWord8 2
  put JSSplit     = putWord8 3
  put JSMap       = putWord8 4
  put JSCombine   = putWord8 5
  put JSReduce    = putWord8 6
  put JSCompleted = putWord8 7
  put JSFinished  = putWord8 8
  put JSError     = putWord8 0
  get = do
    t <- getWord8
    case t of
      1 -> return JSPlanned
      2 -> return JSIdle
      3 -> return JSSplit
      4 -> return JSMap
      5 -> return JSCombine
      6 -> return JSReduce
      7 -> return JSCompleted
      8 -> return JSFinished
      _ -> return JSError

-- | The following job state. 'JSFinished' and 'JSError' are final and map to
--   themselves.
getNextJobState :: JobState -> JobState
getNextJobState JSError    = JSError
getNextJobState JSFinished = JSFinished
getNextJobState s          = succ s

-- | The preceding job state. 'JSIdle' and 'JSError' map to themselves.
--   'JSPlanned' is the first constructor and has no predecessor; it used to
--   fall through to @pred JSPlanned@, which throws, so it is now a fixed point
--   as well.
getPrevJobState :: JobState -> JobState
getPrevJobState JSPlanned = JSPlanned
getPrevJobState JSIdle    = JSIdle
getPrevJobState JSError   = JSError
getPrevJobState s         = pred s

-- | The task type executed while a job is in the given state, if any.
fromJobStatetoTaskType :: JobState -> Maybe TaskType
fromJobStatetoTaskType JSSplit   = Just TTSplit
fromJobStatetoTaskType JSMap     = Just TTMap
fromJobStatetoTaskType JSCombine = Just TTCombine
fromJobStatetoTaskType JSReduce  = Just TTReduce
fromJobStatetoTaskType _         = Nothing


-- | Collected task outputs per job state, keyed by partition number.
type OutputMap = Map.Map JobState (AMap.AccuMap Int FunctionData)


-- | One phase of a job: the action to run, its output type and how many
--   tasks to use.
data JobAction = JobAction {
    ja_Name   :: ! ActionName
  , ja_Output :: ! TaskOutputType
  , ja_Count  :: ! Int
  } deriving (Show, Eq)

instance Binary JobAction where
  put (JobAction n o c) = put n >> put o >> put c
  get = do
    n <- get
    o <- get
    c <- get
    return (JobAction n o c)

instance XmlPickler JobAction where
  xpickle = xpJobAction

-- | XML form: @name@ attribute, @output@ attribute and an optional @count@
--   attribute that defaults to 1 when absent.
xpJobAction :: PU JobAction
xpJobAction
  = xpWrap ( \(n, o, c) -> JobAction n o c
           , \(JobAction n o c) -> (n, o, c)
           ) xpAction
  where
    xpAction   = xpTriple xpFunction xpickle xpCount
    xpFunction = xpAttr "name" xpText
    xpCount    = xpWrap (fromMaybe 1, Just) $ xpOption $ xpAttr "count" xpickle


-- | defines a job, this is all data the user has to give to run a job
data JobInfo = JobInfo {
    ji_Description    :: ! String
  , ji_Option         :: ! B.ByteString
  , ji_SplitAction    :: ! (Maybe JobAction)
  , ji_MapAction      :: ! (Maybe JobAction)
  , ji_CombineAction  :: ! (Maybe JobAction)
  , ji_ReduceAction   :: ! (Maybe JobAction)
  , ji_NumOfResults   :: ! (Maybe Int)
  , ji_Input          :: ! [FunctionData]
  } deriving (Show, Eq)

instance Binary JobInfo where
  put (JobInfo d opt sa ma ca ra nor i)
    = put d >> put opt >> put sa >> put ma >> put ca >> put ra >> put nor >> put i
  get = do
    d   <- get
    opt <- get
    sa  <- get
    ma  <- get
    ca  <- get
    ra  <- get
    nor <- get
    i   <- get
    return (JobInfo d opt sa ma ca ra nor i)

instance XmlPickler JobInfo where
  xpickle = xpJobInfo

-- | XML form of a 'JobInfo'. The options are not part of the XML: on
--   unpickling they are set to @encode ()@ and on pickling they are dropped.
xpJobInfo :: PU JobInfo
xpJobInfo
  = xpElem "jobInfo" $
    xpWrap
      ( \(n, (sa,ma,ca,ra), nor, i) -> JobInfo n (encode ()) sa ma ca ra nor i
      , \(JobInfo n _ sa ma ca ra nor i) -> (n, (sa,ma,ca,ra), nor, i)
      ) xpJob
  where
    xpJob              = xp4Tuple (xpAttr "name" xpText0) xpActions xpCount xpFunctionDataList
    xpActions          = xp4Tuple xpSplitAction xpMapAction xpCombineAction xpReduceAction
    xpSplitAction      = xpOption $ xpElem "split"   $ xpJobAction
    xpMapAction        = xpOption $ xpElem "map"     $ xpJobAction
    xpCombineAction    = xpOption $ xpElem "combine" $ xpJobAction
    xpReduceAction     = xpOption $ xpElem "reduce"  $ xpJobAction
    xpFunctionDataList = xpWrap (filterEmptyList,setEmptyList) $ xpOption $ xpElem "inputList" $ xpList xpFunctionData
    xpCount            = xpOption $ xpAttr "numOfResults" xpickle


-- | the job data, include the user-input and some additional control-data
data JobData = JobData {
    jd_JobId      :: JobId
  , jd_State      :: JobState
  , jd_OutputMap  :: OutputMap
  , jd_Info       :: JobInfo
  , jd_startTime  :: UTCTime
  , jd_endTime    :: UTCTime
  , jd_Result     :: JobResultContainer
  }

-- | Multi-line human readable summary; the 'JobInfo' and result are omitted.
instance Show JobData where
  show (JobData jid state om _ t1 t2 _) =
    "JobId:\t" ++ show jid ++ "\n" ++
    "State:\t" ++ show state ++ "\n" ++
    "Info:\tJobInfo\n" ++
    "StartTime:\t" ++ show t1 ++ "\n" ++
    "EndTime:\t" ++ show t2 ++ "\n" ++
    "OutputMap:\n" ++ show om ++ "\n"

-- | Mutable slot for a job's result.
data JobResultContainer = JobResultContainer (MVar JobResult)

instance Show JobResultContainer where
  show _ = "JobResultContainer"

-- | the result of the job, given by the master
data JobResult = JobResult {
    jr_Output :: [FunctionData]
  } deriving (Show)

instance Binary JobResult where
  put (JobResult o) = put o
  get = do
    o <- get
    return (JobResult o)


-- ----------------------------------------------------------------------------
-- Reader and Writer
-- ----------------------------------------------------------------------------

-- | the ActionEnvironment contains all data that might be needed
--   during an action. So far, it only keeps the current task data and
--   a reference to the global filesystem and the options.
--   This is a good place to implement counters for the map-reduce-system
--   or other stuff...
data ActionEnvironment = ActionEnvironment {
    ae_TaskData   :: TaskData
  , ae_FileSystem :: FS.FileSystem
  }

-- | Builds an 'ActionEnvironment' from the task data and filesystem handle.
mkActionEnvironment :: TaskData -> FS.FileSystem -> ActionEnvironment
mkActionEnvironment td fs = ActionEnvironment td fs


-- | Parses the raw content of one input file into key/value pairs.
type InputReader k1 v1 = B.ByteString -> IO [(k1,v1)]

-- | Serialises key/value pairs into the content of one output file.
type OutputWriter k2 v2 = [(k2,v2)] -> IO B.ByteString


-- | Turns the task input into key/value pairs.
--
--   Tuple data is decoded directly with 'decode'. All files are fetched with
--   one 'FS.getMultiFileContent' call and each file's content is parsed with
--   the given 'InputReader'.
--
--   The result contains all decoded tuples (in input order), followed by the
--   file contents (in the order returned by the filesystem).
readConnector
  :: (NFData k1, NFData v1, Binary k1, Binary v1)
  => InputReader k1 v1
  -> ActionEnvironment
  -> [FunctionData]
  -> IO [(k1,v1)]
readConnector ic ae ls
  = do
    let (tuples, filenames) = splitList ls
    fileContent <- readFiles filenames
    return $ map decode tuples ++ concat fileContent
  where
    -- split the list into tuple data and file names (order kept per group)
    splitList :: [FunctionData] -> ([B.ByteString],[FS.FileId])
    splitList [] = ([],[])
    splitList ((TupleFunctionData t):xs) = (t:ts, fs) where (ts,fs) = splitList xs
    splitList ((FileFunctionData f):xs)  = (ts, f:fs) where (ts,fs) = splitList xs
    -- read all files at once, then parse every file content with the reader
    readFiles files
      = FS.getMultiFileContent files (ae_FileSystem ae) >>= mapM (ic . snd)


-- | Converts partitioned key/value pairs into task output.
--
--   The strategy depends on 'td_OutputType' of the current task:
--
--   * 'TOTRawTuple': every pair becomes its own 'TupleFunctionData'.
--
--   * 'TOTFile': each partition is serialised with the 'OutputWriter' and
--     stored in a file named @j\<jobId\>_t\<taskId\>_i\<partition\>@. All files
--     are created with a single 'FS.createFiles' call.
writeConnector
  :: (Binary k2, Binary v2)
  => OutputWriter k2 v2
  -> ActionEnvironment
  -> [(Int,[(k2,v2)])]
  -> IO [(Int,[FunctionData])]
writeConnector oc ae ls
  = do
    infoM localLogger $ "writeConnector: " ++ (show . length $ ls)
    case td_OutputType td of
      TOTRawTuple -> return $ map encodeRaw ls
      _           -> writeFiles
  where
    td = ae_TaskData ae
    fs = ae_FileSystem ae
    -- raw output: encode every pair separately, keep the partition number
    encodeRaw (i, ts) = (i, map (\t -> TupleFunctionData $ encode t) ts)
    -- file output: one file per partition, all created in one call
    writeFiles = do
      bincontents <- mapM (\(_,c) -> oc c) ls
      let filenames = map (fileName . fst) ls
      FS.createFiles (zip filenames bincontents) fs
      return $ zipWith (\(i,_) fn -> (i,[FileFunctionData fn])) ls filenames
    fileName i = "j" ++ show (td_JobId td) ++ "_t" ++ show (td_TaskId td) ++ "_i" ++ show i
-- | Default 'InputReader': delegates to 'parseByteStringToList'.
defaultInputReader :: (NFData v1, NFData k1, Binary k1, Binary v1) => InputReader k1 v1
defaultInputReader = return . parseByteStringToList

-- | Default 'OutputWriter': delegates to 'listToByteString'.
defaultOutputWriter :: (NFData v2, NFData k2, Binary k2, Binary v2) => OutputWriter k2 v2
defaultOutputWriter = return . listToByteString
-- = return $ B.concat $ map encode ls


-- | Name under which an action is registered.
type ActionName = String

-- | Free-text description of an action.
type ActionInfo = String


-- | User-facing description of a complete map-reduce action: how options,
--   input and output are encoded, plus the optional configuration of every
--   phase.
data ActionConfiguration a k1 v1 k2 v2 v3 v4 = ActionConfiguration {
    ac_Name          :: ActionName
  , ac_Info          :: ActionInfo
  , ac_OptEncoder    :: OptionsEncoder a
  , ac_OptDecoder    :: OptionsDecoder a
  , ac_InputEncoder  :: InputEncoder k1 v1
  , ac_OutputDecoder :: OutputDecoder k2 v4
  , ac_Split         :: Maybe (SplitConfiguration a k1 v1)
  , ac_Map           :: Maybe (MapConfiguration a k1 v1 k2 v2)
  , ac_Combine       :: Maybe (ReduceConfiguration a k2 v2 v3)
  , ac_Reduce        :: Maybe (ReduceConfiguration a k2 v3 v4)
  }

-- | Configuration of the split phase.
data SplitConfiguration a k1 v1 = SplitConfiguration {
    sc_Function  :: SplitFunction a k1 v1
  , sc_Reader    :: InputReader k1 v1
  , sc_Writer    :: OutputWriter k1 v1
  }

-- | Configuration of the map phase.
data MapConfiguration a k1 v1 k2 v2 = MapConfiguration {
    mc_Function  :: MapFunction a k1 v1 k2 v2
  , mc_Partition :: MapPartition a k2 v2
  , mc_Reader    :: InputReader k1 v1
  , mc_Writer    :: OutputWriter k2 v2
  }

-- | Configuration of a combine or reduce phase.
data ReduceConfiguration a k2 v3 v4 = ReduceConfiguration {
    rc_Merge     :: ReduceMerge a k2 v3
  , rc_Function  :: ReduceFunction a k2 v3 v4
  , rc_Partition :: ReducePartition a k2 v4
  , rc_Reader    :: InputReader k2 v3
  , rc_Writer    :: OutputWriter k2 v4
  }


-- | Configuration with the default encoders/decoders and only a (default)
--   split phase; map, combine and reduce are left empty.
defaultActionConfiguration
  :: (NFData v1, NFData k1, Binary a, Binary k1, Binary v1, Binary k2, Binary v4)
  => ActionName
  -> ActionConfiguration a k1 v1 k2 v2 v3 v4
defaultActionConfiguration name
  = ActionConfiguration
      name
      ""
      defaultOptionsEncoder
      defaultOptionsDecoder
      defaultInputEncoder
      defaultOutputDecoder
      (Just defaultSplitConfiguration)
      Nothing
      Nothing
      Nothing

-- | Split phase built from 'defaultSplit' and the default reader/writer.
defaultSplitConfiguration
  :: (NFData v1, NFData k1, Binary a, Binary k1, Binary v1)
  => SplitConfiguration a k1 v1
defaultSplitConfiguration
  = SplitConfiguration
      defaultSplit
      defaultInputReader
      defaultOutputWriter

-- | Map phase around the given function, using 'defaultPartition' and the
--   default reader/writer.
defaultMapConfiguration
  :: (NFData v1, NFData k1, NFData v2, NFData k2, Ord k2, Binary a, Binary k1, Binary v1, Binary k2, Binary v2)
  => MapFunction a k1 v1 k2 v2
  -> MapConfiguration a k1 v1 k2 v2
defaultMapConfiguration fct
  = MapConfiguration
      fct
      defaultPartition
      defaultInputReader
      defaultOutputWriter

-- | Reduce (or combine) phase around the given function, using
--   'defaultMerge', 'defaultPartition' and the default reader/writer.
defaultReduceConfiguration
  :: (NFData v2, NFData k2, NFData v3, Ord k2, Binary a, Binary k2, Binary v2, Binary v3)
  => ReduceFunction a k2 v2 v3
  -> ReduceConfiguration a k2 v2 v3
defaultReduceConfiguration fct
  = ReduceConfiguration
      defaultMerge
      fct
      defaultPartition
      defaultInputReader
      defaultOutputWriter


-- | Round-robin split: the i-th pair (0-based) goes to partition
--   @(i \`mod\` n) + 1@, so partitions are numbered @1 .. n@.
defaultSplit :: (NFData k1, NFData v1) => SplitFunction a k1 v1
defaultSplit _ _ n ls
  = do
    infoM localLogger "defaultSplit"
    return partedList
  where
    partedList = AMap.toList $ AMap.fromList ps
    ns = [(x `mod` n) + 1 | x <- [0..]]
    is = map (\a -> [a]) ls
    ps = zip ns is


-- | Partitions pairs by the 'Hash' instance of their keys.
--
--   NOTE(review): for @n == 1@ everything goes to partition 1. Otherwise the
--   partition number is @hash n key@, which for the 'Int' instance lies in
--   @0 .. n-1@. That differs from 'defaultPartition' and 'defaultSplit'
--   (@1 .. n@). Confirm downstream consumers do not depend on the numbering.
hashedPartition :: (Hash k2, Binary k2, Binary v2, NFData k2, NFData v2) => MapPartition a k2 v2
hashedPartition _ _ 1 l = return . (:[]) . (,) 1 $ l
hashedPartition _ _ n l
  = do
    infoM localLogger "hashedPartition: map"
    let a = map (\t -> (hash n (fst t),[t])) l
    infoM localLogger "hashedPartition: fromList"
    let b = AMap.fromList a
    infoM localLogger "hashedPartition: toList"
    let c = AMap.toList b
    infoM localLogger "hashedPartition: return"
    return $ c


-- | Partitions pairs into @1 .. n@ by hashing the 'show'n binary encoding of
--   the key. Only a 'Binary' instance is required of the key.
defaultPartition :: (Binary k2, Binary v2) => MapPartition a k2 v2
defaultPartition _ _ 1 ls
  -- To make it faster, wenn no partition is used
  = return [(1,ls)]
defaultPartition _ _ n ls
  = do
    -- calculate partition-Values
    let markedList = map (\t@(k,_) -> (hash' k,t)) ls
    -- merge them
    -- TODO this might change (revert) the order of the Elements...
    let resultList = AMap.toList $ AMap.fromTupleList markedList
    return resultList
  where
    -- calculate a hash-value, because we only have the Binary-Instance, we
    -- can only use the Bytestring of the Value
    hash' k = ((fromIntegral $ Hash.hashString (show $ encode k)) `mod` n) + 1


-- | Groups all values by key, using 'AMap.fromTupleList'.
defaultMerge :: (Ord k2, Binary k2, Binary v2) => ReduceMerge a k2 v2
defaultMerge _ _ ls
  = return $ AMap.toList $ AMap.fromTupleList ls


-- | Registered, type-erased form of an action: every phase works on encoded
--   options and 'FunctionData'.
data ActionData = ActionData {
    ad_Name    :: ActionName
  , ad_Info    :: ActionInfo
  , ad_Split   :: Maybe BinarySplitAction
  , ad_Map     :: Maybe BinaryMapAction
  , ad_Combine :: Maybe BinaryReduceAction
  , ad_Reduce  :: Maybe BinaryReduceAction
  }

instance KMap.Key ActionData where
  getKey = ad_Name

instance Show ActionData where
  show (ActionData n i _ _ _ _)
    = "{ActionData name:\"" ++ n ++ "\" info:\"" ++ i ++ "\"}"

-- | Registry of actions, keyed by 'ad_Name'.
type ActionMap = KMap.KeyMap ActionData


-- | Serialises the user options.
type OptionsEncoder a = a -> B.ByteString

-- | Deserialises the user options.
type OptionsDecoder a = B.ByteString -> a

-- | Options encoding via 'encode'.
defaultOptionsEncoder :: (Binary a) => OptionsEncoder a
defaultOptionsEncoder = encode

-- | Options decoding via 'decode'.
defaultOptionsDecoder :: (Binary a) => OptionsDecoder a
defaultOptionsDecoder = decode


-- | Builds the job input from tuples and file ids.
type InputEncoder k1 v1 = [(k1,v1)] -> [FS.FileId] -> [FunctionData]

-- | Splits a job result into decoded tuples and file ids.
type OutputDecoder k2 v4 = [FunctionData] -> ([(k2,v4)],[FS.FileId])

-- | Encodes every tuple as 'TupleFunctionData' (first) and every file id as
--   'FileFunctionData' (after the tuples).
defaultInputEncoder :: (Binary k1, Binary v1) => InputEncoder k1 v1
defaultInputEncoder ls1 ls2
  = ls1' ++ ls2'
  where
    ls1' = map (\t -> TupleFunctionData (encode t)) ls1
    ls2' = map (\f -> FileFunctionData f) ls2

-- | Inverse of 'defaultInputEncoder': decodes the tuples and collects the
--   file ids, each in input order.
defaultOutputDecoder :: (Binary k2, Binary v4) => OutputDecoder k2 v4
defaultOutputDecoder ls
  = (ls1, ls2)
  where
    ls1 = map (\t -> decode t) $ catMaybes ls1'
    ls2 = catMaybes ls2'
    (ls1',ls2') = unzip $ map (splitter) ls
    splitter (TupleFunctionData t) = (Just t, Nothing)
    splitter (FileFunctionData f)  = (Nothing, Just f)


-- | Selects the binary action implementing the given phase, if configured.
getActionForTaskType :: TaskType -> ActionData -> Maybe BinaryReduceAction
getActionForTaskType TTSplit   ad = ad_Split ad
getActionForTaskType TTMap     ad = ad_Map ad
getActionForTaskType TTCombine ad = ad_Combine ad
getActionForTaskType TTReduce  ad = ad_Reduce ad
getActionForTaskType _ _          = Nothing


-- | Converts a typed 'ActionConfiguration' into its binary 'ActionData' form.
--   Combine and reduce both use 'performReduceAction'.
readActionConfiguration
  :: ( Ord k2, Binary a
     , Show k1, Show v1
     , Show k2, Show v2, Show v3, Show v4
     , NFData k1, NFData v1
     , NFData k2, NFData v2, NFData v3
     , Binary k1, Binary v1
     , Binary k2, Binary v2
     , Binary v3, Binary v4)
  => ActionConfiguration a k1 v1 k2 v2 v3 v4
  -> ActionData
readActionConfiguration (ActionConfiguration n i _ optDec _ _ sc mc cc rc)
  = ActionData n i sf mf cf rf
  where
    sf = fmap (\(SplitConfiguration f ir ow) -> performSplitAction optDec f ir ow) sc
    mf = fmap (\(MapConfiguration f p ir ow) -> performMapAction optDec f p ir ow) mc
    cf = fmap (\(ReduceConfiguration m f p ir ow) -> performReduceAction optDec m f p ir ow) cc
    rf = fmap (\(ReduceConfiguration m f p ir ow) -> performReduceAction optDec m f p ir ow) rc


-- | Builds the 'JobInfo' for running an action.
--
--   Only phases that are configured get a 'JobAction'. Split, map and combine
--   always write files; the reduce phase uses the requested result type.
--   NOTE(review): the combine phase reuses the mapper count.
createJobInfoFromConfiguration
  :: ActionConfiguration a k1 v1 k2 v2 v3 v4
  -> a                -- ^ options
  -> [(k1,v1)]        -- ^ input (Tuples)
  -> [FS.FileId]      -- ^ input (Files)
  -> Int              -- ^ number of splitters
  -> Int              -- ^ number of mappers
  -> Int              -- ^ number of reducers
  -> Int              -- ^ number of results
  -> TaskOutputType   -- ^ type of the result (file of raw)
  -> JobInfo
createJobInfoFromConfiguration
  (ActionConfiguration n _ optEnc _ inEnc _ sConf mConf cConf rConf)
  opts' ls1 ls2 sCnt mCnt rCnt nor' rt
  = JobInfo n opts sa ma ca ra nor i
  where
    opts = optEnc opts'
    sa   = fmap (const (JobAction n TOTFile sCnt)) sConf
    ma   = fmap (const (JobAction n TOTFile mCnt)) mConf
    ca   = fmap (const (JobAction n TOTFile mCnt)) cConf
    ra   = fmap (const (JobAction n rt rCnt)) rConf
    nor  = Just nor'
    i    = inEnc ls1 ls2

-- | Decodes a 'JobResult' with the configuration's 'ac_OutputDecoder'.
createListsFromJobResult
  :: ActionConfiguration a k1 v1 k2 v2 v3 v4
  -> JobResult
  -> ([(k2,v4)],[FS.FileId])
createListsFromJobResult ac jr
  = (ac_OutputDecoder ac) (jr_Output jr)


-- ----------------------------------------------------------------------------
-- SplitAction
-- ----------------------------------------------------------------------------

-- | general SplitAction
type SplitAction a k1 v1
  = ActionEnvironment -> a -> Int -> [(k1,v1)] -> IO [(Int, [(k1,v1)])]

-- | SplitAction on ByteStrings
type BinarySplitAction
  = ActionEnvironment -> B.ByteString -> Maybe Int -> (Int,[FunctionData]) -> IO [(Int, [FunctionData])]

-- | User-supplied split function (same shape as 'SplitAction').
type SplitFunction a k1 v1 = SplitAction a k1 v1

-- | Runs a split task: decode options, read input, split (only if a
--   partition count is given; otherwise the input keeps partition @i@) and
--   write the output.
performSplitAction
  :: (NFData k1, NFData v1, Binary a, Binary k1, Binary v1, Show k1, Show v1)
  => OptionsDecoder a
  -> SplitFunction a k1 v1
  -> InputReader k1 v1
  -> OutputWriter k1 v1
  -> ActionEnvironment
  -> B.ByteString
  -> Maybe Int
  -> (Int,[FunctionData])
  -> IO [(Int, [FunctionData])]
performSplitAction optDec fct reader writer env opts n (i,ls)
  = do
    let a = optDec opts
    infoM localLogger "performSplitAction"
    putTimeStamp "Begin performSplitAction"

    infoM localLogger "reading inputList"
    inputList <- readConnector reader env ls

    debugM localLogger $ ">>>>>>>>>>>>>>>>>> input is: " ++ show inputList ++ "\n\n"

    infoM localLogger "doing split"
    partedList <- case n of
      (Just n') -> fct env a n' inputList
      (Nothing) -> return [(i,inputList)]

    debugM localLogger $ ">>>>>>>>>>>>>>>>>> splittet list is: " ++ show partedList ++ "\n\n"

    infoM localLogger "writing outputlist"
    outputList <- writeConnector writer env partedList

    putTimeStamp "End performSplitAction"
    return outputList


-- ----------------------------------------------------------------------------
-- MapAction
-- ----------------------------------------------------------------------------

-- | general MapAction
type MapAction a k1 v1 k2 v2
  = ActionEnvironment -> a -> Int -> [(k1,v1)] -> IO [(Int, [(k2,v2)])]

-- | MapAction on ByteStrings
type BinaryMapAction
  = ActionEnvironment -> B.ByteString -> Maybe Int -> (Int,[FunctionData]) -> IO [(Int, [FunctionData])]

-- | User-supplied map function, applied to a single key/value pair.
type MapFunction a k1 v1 k2 v2
  = ActionEnvironment -> a -> k1 -> v1 -> IO [(k2, v2)]

-- | Distributes map output over the given number of partitions.
type MapPartition a k2 v2
  = ActionEnvironment -> a -> Int -> [(k2,v2)] -> IO [(Int, [(k2,v2)])]

-- | Runs a map task: decode options, read input, apply the map function to
--   every pair, partition the concatenated result (only if a partition count
--   is given; otherwise everything keeps partition @i@) and write the output.
performMapAction
  :: (Ord k2, Show k1, Show k2, Show v1, Show v2, Binary a, Binary k1, Binary v1, Binary k2, Binary v2, NFData k2, NFData v2, NFData v1, NFData k1)
  => OptionsDecoder a
  -> MapFunction a k1 v1 k2 v2
  -> MapPartition a k2 v2
  -> InputReader k1 v1
  -> OutputWriter k2 v2
  -> ActionEnvironment
  -> B.ByteString
  -> Maybe Int
  -> (Int,[FunctionData])
  -> IO [(Int, [FunctionData])]
performMapAction optDec fct part reader writer env opts n (i,ls)
  = do
    -- decode the options
    let a = optDec opts
    infoM localLogger "performMapAction"
    putTimeStamp "Begin performMapAction"

    -- NOTE(review): unlike the split/reduce actions this dumps the whole
    -- input descriptor at info (not debug) level
    infoM localLogger $ "reading inputList: " ++ show (i,ls)
    inputList <- readConnector reader env ls

    debugM localLogger $ ">>>>>>>>>>>>>>>>>> input is: " ++ show inputList ++ "\n\n"

    infoM localLogger ("doing map: " ++ (show . length $ inputList))
    mappedList <- mapM (\(k1, v1) -> fct env a k1 v1) inputList
    let tupleList = concat mappedList
    infoM localLogger $ "map result: " ++ (show . length $ tupleList)

    debugM localLogger $ ">>>>>>>>>>>>>>>>>> mapped list is: " ++ show tupleList ++ "\n\n"

    infoM localLogger "doing partition"
    partedList <- case n of
      (Just n') -> part env a n' tupleList
      (Nothing) -> return [(i,tupleList)]
    infoM localLogger $ "map parted result: " ++ (show . length $ partedList)

    debugM localLogger $ ">>>>>>>>>>>>>>>>>> partitioned list is: " ++ show partedList ++ "\n\n"

    infoM localLogger "writing outputlist: begin"
    outputList <- writeConnector writer env partedList
    infoM localLogger "writing outputlist: done"

    putTimeStamp "End performMapAction"
    return outputList


-- ----------------------------------------------------------------------------
-- Combine- / ReduceTask
-- ----------------------------------------------------------------------------

-- | general ReduceAction (shared by the combine and the reduce phase)
type ReduceAction a k2 v2 v3
  = ActionEnvironment -> a -> Int -> [(k2,v2)] -> IO [(Int, [(k2,v3)])]

-- | ReduceAction on ByteStrings
type BinaryReduceAction
  = ActionEnvironment -> B.ByteString -> Maybe Int -> (Int,[FunctionData]) -> IO [(Int, [FunctionData])]

-- | Groups the input pairs by key before reducing.
type ReduceMerge a k2 v2
  = ActionEnvironment -> a -> [(k2,v2)] -> IO [(k2,[v2])]

-- | User-supplied reduce function; 'Nothing' drops the key from the output.
type ReduceFunction a k2 v2 v3
  = ActionEnvironment -> a -> k2 -> [v2] -> IO (Maybe v3)

-- | Distributes reduce output over the given number of partitions.
type ReducePartition a k2 v3
  = ActionEnvironment -> a -> Int -> [(k2,v3)] -> IO [(Int, [(k2,v3)])]

-- | Runs a combine or reduce task: decode options, read input, merge by key,
--   reduce every group (dropping keys whose reduce result is 'Nothing'),
--   partition (only if a partition count is given; otherwise everything
--   keeps partition @i@) and write the output.
performReduceAction
  :: (Ord k2, Show k2, Show v2, Show v3, NFData k2, NFData v2, Binary a, Binary k2, Binary v2, Binary v3)
  => OptionsDecoder a
  -> ReduceMerge a k2 v2
  -> ReduceFunction a k2 v2 v3
  -> ReducePartition a k2 v3
  -> InputReader k2 v2
  -> OutputWriter k2 v3
  -> ActionEnvironment
  -> B.ByteString
  -> Maybe Int
  -> (Int,[FunctionData])
  -> IO [(Int, [FunctionData])]
performReduceAction optDec merge fct part reader writer env opts n (i,ls)
  = do
    -- decode the options
    let a = optDec opts
    infoM localLogger "performReduceAction"
    putTimeStamp "Begin performReduceAction"

    infoM localLogger "reading inputList"
    inputList <- readConnector reader env ls

    infoM localLogger $ ">>>>>>>>>>>>>>>>>> input is: " ++ (show . length $ inputList) ++ "\n\n"
    debugM localLogger $ ">>>>>>>>>>>>>>>>>> input is: " ++ show inputList ++ "\n\n"

    infoM localLogger "doing merge"
    mergedList <- merge env a inputList

    debugM localLogger $ ">>>>>>>>>>>>>>>>>> mergedList is: " ++ show mergedList ++ "\n\n"

    infoM localLogger "doing reduce"
    maybesList <- mapM (\(k2,v2s) -> performReduceFunction a k2 v2s) mergedList
    let tupleList = catMaybes maybesList

    debugM localLogger $ ">>>>>>>>>>>>>>>>>> tupleList is: " ++ show tupleList ++ "\n\n"

    infoM localLogger "doing partition"
    partedList <- case n of
      (Just n') -> part env a n' tupleList
      (Nothing) -> return [(i,tupleList)]

    debugM localLogger $ ">>>>>>>>>>>>>>>>>> partedList is: " ++ show partedList ++ "\n\n"

    infoM localLogger "writing outputlist: begin"
    outputList <- writeConnector writer env partedList
    infoM localLogger "writing outputlist: done"

    putTimeStamp "End performReduceAction"
    return outputList
  where
    -- reduce one key group and pair a present result with its key
    performReduceFunction a' k2 v2s = do
      mbV3 <- fct env a' k2 v2s
      return $ fmap (\v3 -> (k2, v3)) mbV3


-- | Logs the message together with the current POSIX time at info level.
putTimeStamp :: String -> IO ()
putTimeStamp s = do
  t1 <- getPOSIXTime
  infoM localLogger (s++" : "++ show t1)