module Holumbus.MapReduce.Types
(
Hash(..)
, hashedPartition
, FunctionData(..)
, TaskId
, TaskType(..)
, TaskState(..)
, getNextTaskState
, TaskOutputType(..)
, TaskData(..)
, JobId
, JobState(..)
, getNextJobState
, getPrevJobState
, fromJobStatetoTaskType
, OutputMap
, JobAction(..)
, JobInfo(..)
, JobData(..)
, JobResultContainer(..)
, JobResult(..)
, ActionName
, ActionInfo
, ActionEnvironment(..)
, mkActionEnvironment
, InputReader
, OutputWriter
, OptionsDecoder
, defaultInputReader
, defaultOutputWriter
, defaultSplit
, defaultPartition
, defaultMerge
, readConnector
, writeConnector
, ActionConfiguration(..)
, SplitConfiguration(..)
, MapConfiguration(..)
, ReduceConfiguration(..)
, defaultActionConfiguration
, defaultSplitConfiguration
, defaultMapConfiguration
, defaultReduceConfiguration
, readActionConfiguration
, createJobInfoFromConfiguration
, createListsFromJobResult
, ActionData(..)
, getActionForTaskType
, ActionMap
, MapAction
, MapFunction
, MapPartition
, ReduceAction
, ReduceMerge
, ReduceFunction
, ReducePartition
, SplitFunction
, SplitAction
)
where
import Control.Concurrent
import Data.Binary
import qualified Data.ByteString.Lazy as B
import qualified Data.Map as Map
import qualified Data.HashTable as Hash
import Data.Maybe
import Data.Time
import System.Log.Logger
import Data.Time.Clock.POSIX
import Text.XML.HXT.Arrow
import qualified Holumbus.Data.AccuMap as AMap
import Holumbus.Common.FileHandling
import Holumbus.Common.Utils
import qualified Holumbus.Data.KeyMap as KMap
import qualified Holumbus.FileSystem.FileSystem as FS
import Control.DeepSeq
-- | Logger name passed to 'infoM' / 'debugM' by every function in this module.
localLogger :: String
localLogger = "Holumbus.MapReduce.Types"
-- | Keys that can be mapped onto a partition index.
class Hash a where
  -- | @hash n k@ maps the key @k@ to an 'Int'. 'hashedPartition' passes the
  -- requested number of partitions as @n@.
  hash :: Int -> a -> Int

-- | 'Int' keys are reduced modulo the first argument.
-- NOTE(review): a first argument of 0 raises a divide-by-zero error.
instance Hash Int where
  hash buckets key = key `mod` buckets
-- | One item of task input or output: either a binary payload carried inline
-- or a reference to a file in the filesystem.
data FunctionData
  = TupleFunctionData B.ByteString -- ^ inline payload (an encoded tuple, see 'readConnector')
  | FileFunctionData FS.FileId -- ^ identifier of a file holding the data
  deriving (Show, Eq, Ord)
-- | Wire format: a one-byte constructor tag (0 = 'TupleFunctionData',
-- 1 = 'FileFunctionData') followed by the payload.
--
-- Decoding rejects every tag that 'put' never writes. Treating unknown tags
-- as tuples (as was done before) hides corrupted or desynchronised streams
-- and yields garbage payloads.
instance Binary FunctionData where
  put (TupleFunctionData t) = putWord8 0 >> put t
  put (FileFunctionData f) = putWord8 1 >> put f

  get = do
    tag <- getWord8
    case tag of
      0 -> fmap TupleFunctionData get
      1 -> fmap FileFunctionData get
      _ -> fail ("Binary FunctionData: unknown constructor tag " ++ show tag)
-- | XML pickling of 'FunctionData' is delegated to 'xpFunctionData'.
instance XmlPickler FunctionData where
  xpickle = xpFunctionData
-- | Pickler for a @\<data\>@ element. Inline tuples are represented by a
-- @\<key\>@ / @\<value\>@ pair of text elements that is binary-encoded into the
-- 'TupleFunctionData' payload; file references are a single @\<filename\>@
-- element.
xpFunctionData :: PU FunctionData
xpFunctionData = xpElem "data" (xpAlt selector [tuplePickler, filePickler])
  where
    -- Index into the list of alternatives handed to 'xpAlt'.
    selector fd = case fd of
      TupleFunctionData _ -> 0
      FileFunctionData _ -> 1

    tuplePickler = xpWrap (TupleFunctionData . encodeTuple, unwrapTuple) keyValue
    filePickler = xpWrap (FileFunctionData, unwrapFile) fileName

    -- Each unwrapper is only ever reached through the matching alternative
    -- chosen by 'selector', hence the single-constructor patterns.
    unwrapTuple (TupleFunctionData bytes) = decodeTuple bytes
    unwrapFile (FileFunctionData fileId) = fileId

    keyValue = xpPair (xpElem "key" xpText0) (xpElem "value" xpText0)
    fileName = xpElem "filename" xpText
-- | Serialise a key/value pair with its 'Binary' instances.
encodeTuple :: (Binary k, Binary v) => (k, v) -> B.ByteString
encodeTuple = encode

-- | Inverse of 'encodeTuple': rebuild a key/value pair from its binary form.
decodeTuple :: (Binary k, Binary v) => B.ByteString -> (k, v)
decodeTuple = decode
-- | Identifier of a task.
type TaskId = Integer
-- | The kind of work a task performs. 'TTError' is what the 'Binary'
-- instance yields for an unknown tag.
data TaskType = TTSplit | TTMap | TTCombine | TTReduce | TTError
  deriving (Show, Eq, Ord)
-- | A 'TaskType' travels as a single tag byte; every byte that is not a
-- known tag decodes to 'TTError'.
instance Binary TaskType where
  put taskType = putWord8 (tagOf taskType)
    where
      tagOf TTError = 0
      tagOf TTSplit = 1
      tagOf TTMap = 2
      tagOf TTCombine = 3
      tagOf TTReduce = 4

  get = fmap fromTag getWord8
    where
      fromTag 1 = TTSplit
      fromTag 2 = TTMap
      fromTag 3 = TTCombine
      fromTag 4 = TTReduce
      fromTag _ = TTError
-- | Life-cycle states of a task, in the order 'getNextTaskState' steps
-- through them. 'TSError' is last and is never reached by stepping.
data TaskState = TSIdle | TSSending | TSInProgress | TSCompleted | TSFinished | TSError
  deriving (Show, Eq, Ord, Enum)
-- | A 'TaskState' travels as a single tag byte; every byte that is not a
-- known tag decodes to 'TSError'.
instance Binary TaskState where
  put state = putWord8 (tagOf state)
    where
      tagOf TSError = 0
      tagOf TSIdle = 1
      tagOf TSSending = 2
      tagOf TSInProgress = 3
      tagOf TSCompleted = 4
      tagOf TSFinished = 5

  get = fmap fromTag getWord8
    where
      fromTag 1 = TSIdle
      fromTag 2 = TSSending
      fromTag 3 = TSInProgress
      fromTag 4 = TSCompleted
      fromTag 5 = TSFinished
      fromTag _ = TSError
-- | Advance a task to its successor state. 'TSFinished' and 'TSError' are
-- terminal and map to themselves.
getNextTaskState :: TaskState -> TaskState
getNextTaskState state
  | state `elem` [TSFinished, TSError] = state
  | otherwise = succ state
-- | How a task hands over its result: as inline tuples ('TOTRawTuple') or as
-- files ('TOTFile'); see 'writeConnector'.
data TaskOutputType = TOTRawTuple | TOTFile
  deriving (Show, Read, Eq, Ord, Enum)
-- | Wire format: one tag byte, 0 = 'TOTRawTuple', 1 = 'TOTFile'.
--
-- Tags that 'put' never writes are rejected instead of being read as
-- 'TOTFile', so that a corrupted stream is reported rather than silently
-- switching the output mode.
instance Binary TaskOutputType where
  put TOTRawTuple = putWord8 0
  put TOTFile = putWord8 1

  get = do
    tag <- getWord8
    case tag of
      0 -> return TOTRawTuple
      1 -> return TOTFile
      _ -> fail ("Binary TaskOutputType: unknown constructor tag " ++ show tag)
-- | The output type is pickled as the XML attribute @output@, using the
-- type's 'Show' / 'Read' text form via 'xpPrim'.
instance XmlPickler TaskOutputType where
  xpickle = xpAttr "output" xpPrim
-- | Everything that describes a single task. All fields are strict.
data TaskData = TaskData {
    td_JobId :: ! JobId -- ^ job this task belongs to (part of generated output file names)
  , td_TaskId :: ! TaskId -- ^ identifier of the task (part of generated output file names)
  , td_Type :: ! TaskType -- ^ kind of work to perform
  , td_State :: ! TaskState -- ^ current life-cycle state
  , td_Option :: ! B.ByteString -- ^ opaque bytes; presumably the encoded action options - TODO confirm
  , td_PartValue :: ! (Maybe Int) -- ^ presumably the requested number of partitions - TODO confirm
  , td_Input :: ! (Int, [FunctionData]) -- ^ an index paired with the input data
  , td_Output :: ! [(Int,[FunctionData])] -- ^ output data grouped by index
  , td_OutputType :: ! TaskOutputType -- ^ read by 'writeConnector' to choose inline tuples or files
  , td_Action :: ! ActionName -- ^ name of the action
  } deriving (Show, Eq, Ord)
-- | The fields are written, and read back, in declaration order.
instance Binary TaskData where
  put td = do
    put (td_JobId td)
    put (td_TaskId td)
    put (td_Type td)
    put (td_State td)
    put (td_Option td)
    put (td_PartValue td)
    put (td_Input td)
    put (td_Output td)
    put (td_OutputType td)
    put (td_Action td)

  get = do
    jobId <- get
    taskId <- get
    taskType <- get
    taskState <- get
    option <- get
    partValue <- get
    input <- get
    output <- get
    outputType <- get
    action <- get
    return
      ( TaskData
          { td_JobId = jobId
          , td_TaskId = taskId
          , td_Type = taskType
          , td_State = taskState
          , td_Option = option
          , td_PartValue = partValue
          , td_Input = input
          , td_Output = output
          , td_OutputType = outputType
          , td_Action = action
          }
      )
-- | Identifier of a job.
type JobId = Integer
-- | Life-cycle states of a job, in the order 'getNextJobState' steps through
-- them. 'JSError' is last and is never reached by stepping.
data JobState = JSPlanned | JSIdle | JSSplit | JSMap | JSCombine | JSReduce | JSCompleted | JSFinished | JSError
  deriving(Show, Eq, Ord, Enum)
-- | A 'JobState' travels as a single tag byte; every byte that is not a
-- known tag decodes to 'JSError'.
instance Binary JobState where
  put state = putWord8 (tagOf state)
    where
      tagOf JSError = 0
      tagOf JSPlanned = 1
      tagOf JSIdle = 2
      tagOf JSSplit = 3
      tagOf JSMap = 4
      tagOf JSCombine = 5
      tagOf JSReduce = 6
      tagOf JSCompleted = 7
      tagOf JSFinished = 8

  get = fmap fromTag getWord8
    where
      fromTag 1 = JSPlanned
      fromTag 2 = JSIdle
      fromTag 3 = JSSplit
      fromTag 4 = JSMap
      fromTag 5 = JSCombine
      fromTag 6 = JSReduce
      fromTag 7 = JSCompleted
      fromTag 8 = JSFinished
      fromTag _ = JSError
-- | Advance a job to its successor state. 'JSFinished' and 'JSError' are
-- terminal and map to themselves.
getNextJobState :: JobState -> JobState
getNextJobState state
  | state `elem` [JSFinished, JSError] = state
  | otherwise = succ state
-- | Step a job back to its predecessor state.
--
-- 'JSIdle' and 'JSError' map to themselves. 'JSPlanned' is the first
-- constructor of the derived 'Enum' instance, so it has no predecessor and
-- also maps to itself; without this case 'pred' would raise a runtime error.
getPrevJobState :: JobState -> JobState
getPrevJobState JSPlanned = JSPlanned
getPrevJobState JSIdle = JSIdle
getPrevJobState JSError = JSError
getPrevJobState s = pred s
-- | The task type executed while a job is in the given state, or 'Nothing'
-- for states in which no tasks run.
fromJobStatetoTaskType :: JobState -> Maybe TaskType
fromJobStatetoTaskType state = lookup state phases
  where
    phases =
      [ (JSSplit, TTSplit)
      , (JSMap, TTMap)
      , (JSCombine, TTCombine)
      , (JSReduce, TTReduce)
      ]
-- | For each job state, accumulated 'FunctionData' keyed by an 'Int' index.
type OutputMap = Map.Map JobState (AMap.AccuMap Int FunctionData)
-- | Description of one phase of a job. All fields are strict.
data JobAction = JobAction {
    ja_Name :: ! ActionName -- ^ name of the action
  , ja_Output :: ! TaskOutputType -- ^ how the phase hands over its output
  , ja_Count :: ! Int -- ^ a count; 'xpJobAction' defaults it to 1 when the XML attribute is missing
  } deriving (Show, Eq)
-- | Name, output type and count are written in that order.
instance Binary JobAction where
  put action = do
    put (ja_Name action)
    put (ja_Output action)
    put (ja_Count action)

  get = do
    name <- get
    output <- get
    count <- get
    return (JobAction {ja_Name = name, ja_Output = output, ja_Count = count})
-- | XML pickling of 'JobAction' is delegated to 'xpJobAction'.
instance XmlPickler JobAction where
  xpickle = xpJobAction
-- | Pickler for the attributes of a job action: the mandatory @name@, the
-- output type (through its own 'xpickle') and the optional @count@, which
-- reads as 1 when absent and is always written.
xpJobAction :: PU JobAction
xpJobAction = xpWrap (fromTriple, toTriple) (xpTriple nameAttr xpickle countAttr)
  where
    fromTriple (name, output, count) = JobAction name output count
    toTriple action = (ja_Name action, ja_Output action, ja_Count action)

    nameAttr = xpAttr "name" xpText
    countAttr = xpWrap (fromMaybe 1, Just) (xpOption (xpAttr "count" xpickle))
-- | Static description of a job as submitted. All fields are strict.
data JobInfo = JobInfo {
    ji_Description :: ! String -- ^ free-text description (the XML @name@ attribute)
  , ji_Option :: ! B.ByteString -- ^ encoded options; not part of the XML form
  , ji_SplitAction :: ! (Maybe JobAction) -- ^ split phase, if any
  , ji_MapAction :: ! (Maybe JobAction) -- ^ map phase, if any
  , ji_CombineAction :: ! (Maybe JobAction) -- ^ combine phase, if any
  , ji_ReduceAction :: ! (Maybe JobAction) -- ^ reduce phase, if any
  , ji_NumOfResults :: ! (Maybe Int) -- ^ optional number of results (XML attribute @numOfResults@)
  , ji_Input :: ! [FunctionData] -- ^ input data of the job
  } deriving (Show, Eq)
-- | The fields are written, and read back, in declaration order.
instance Binary JobInfo where
  put info = do
    put (ji_Description info)
    put (ji_Option info)
    put (ji_SplitAction info)
    put (ji_MapAction info)
    put (ji_CombineAction info)
    put (ji_ReduceAction info)
    put (ji_NumOfResults info)
    put (ji_Input info)

  get = do
    description <- get
    option <- get
    splitAction <- get
    mapAction <- get
    combineAction <- get
    reduceAction <- get
    numOfResults <- get
    input <- get
    return
      ( JobInfo
          { ji_Description = description
          , ji_Option = option
          , ji_SplitAction = splitAction
          , ji_MapAction = mapAction
          , ji_CombineAction = combineAction
          , ji_ReduceAction = reduceAction
          , ji_NumOfResults = numOfResults
          , ji_Input = input
          }
      )
-- | XML pickling of 'JobInfo' is delegated to 'xpJobInfo'.
instance XmlPickler JobInfo where
  xpickle = xpJobInfo
-- | Pickler for a @\<jobInfo\>@ element.
--
-- The options field has no XML representation: it is dropped when pickling
-- and set to @encode ()@ when unpickling.
xpJobInfo :: PU JobInfo
xpJobInfo = xpElem "jobInfo" (xpWrap (build, flatten) jobPickler)
  where
    build (description, (sa, ma, ca, ra), results, input) =
      JobInfo description (encode ()) sa ma ca ra results input

    flatten (JobInfo description _ sa ma ca ra results input) =
      (description, (sa, ma, ca, ra), results, input)

    jobPickler =
      xp4Tuple (xpAttr "name" xpText0) actionsPickler resultCount inputList

    -- One optional child element per phase, each holding a job action.
    actionsPickler =
      xp4Tuple (phase "split") (phase "map") (phase "combine") (phase "reduce")
    phase tagName = xpOption (xpElem tagName xpJobAction)

    resultCount = xpOption (xpAttr "numOfResults" xpickle)

    -- The optional @\<inputList\>@ element is converted to and from a plain
    -- list by the helpers 'filterEmptyList' and 'setEmptyList'.
    inputList =
      xpWrap (filterEmptyList, setEmptyList)
        (xpOption (xpElem "inputList" (xpList xpFunctionData)))
-- | Runtime state kept for a job.
data JobData = JobData {
    jd_JobId :: JobId -- ^ identifier of the job
  , jd_State :: JobState -- ^ current life-cycle state
  , jd_OutputMap :: OutputMap -- ^ data accumulated per job state
  , jd_Info :: JobInfo -- ^ the static job description
  , jd_startTime :: UTCTime -- ^ start time stamp
  , jd_endTime :: UTCTime -- ^ end time stamp
  , jd_Result :: JobResultContainer -- ^ holder of the job result
  }
-- | Multi-line, human-readable rendering. The job info is shown only as the
-- placeholder text @JobInfo@ and the result container is omitted.
instance Show JobData where
  show (JobData jobId state outputMap _ started ended _) =
    concat
      [ "JobId:\t", show jobId, "\n"
      , "State:\t", show state, "\n"
      , "Info:\tJobInfo\n"
      , "StartTime:\t", show started, "\n"
      , "EndTime:\t", show ended, "\n"
      , "OutputMap:\n", show outputMap, "\n"
      ]
-- | Wrapper around the 'MVar' that holds a job's result.
data JobResultContainer = JobResultContainer (MVar JobResult)

-- | The 'MVar' cannot be shown, so a fixed placeholder text is produced.
instance Show JobResultContainer where
  show = const "JobResultContainer"
-- | The result of a job.
data JobResult = JobResult {
    jr_Output :: [FunctionData] -- ^ the output data of the job
  } deriving (Show)
-- | A 'JobResult' is serialised as just its output list.
instance Binary JobResult where
  put = put . jr_Output
  get = fmap JobResult get
-- | Context handed to every action while it runs.
data ActionEnvironment = ActionEnvironment {
    ae_TaskData :: TaskData -- ^ the task being executed
  , ae_FileSystem :: FS.FileSystem -- ^ filesystem handle used to read and write files
  }
-- | Build an 'ActionEnvironment' from a task and a filesystem handle.
mkActionEnvironment :: TaskData -> FS.FileSystem -> ActionEnvironment
mkActionEnvironment td fs =
  ActionEnvironment
    { ae_TaskData = td
    , ae_FileSystem = fs
    }
-- | Turns the raw content of a file into a list of key/value tuples.
type InputReader k1 v1 = B.ByteString -> IO [(k1,v1)]
-- | Turns a list of key/value tuples into the raw content of a file.
type OutputWriter k2 v2 = [(k2,v2)] -> IO B.ByteString
-- | Materialise the input of a task as a list of key/value tuples.
--
-- Inline items are decoded directly with 'decode'; file items are fetched
-- from the filesystem in one batch and parsed with the supplied
-- 'InputReader'. The result lists all inline tuples first (in input order),
-- followed by the tuples of each fetched file.
readConnector
  :: (NFData k1, NFData v1, Binary k1, Binary v1)
  => InputReader k1 v1
  -> ActionEnvironment
  -> [FunctionData]
  -> IO [(k1,v1)]
readConnector ic ae ls = do
    -- The filesystem is queried even when no file items are present.
    fileChunks <- FS.getMultiFileContent fileIds (ae_FileSystem ae) >>= mapM (ic . snd)
    return (map decode inlineBlobs ++ concat fileChunks)
  where
    inlineBlobs = [blob | TupleFunctionData blob <- ls]
    fileIds = [fileId | FileFunctionData fileId <- ls]
-- | Turn the partitioned result of a task into 'FunctionData'.
--
-- Depending on the task's 'td_OutputType' the tuples of every partition are
-- either encoded inline ('TOTRawTuple') or serialised with the supplied
-- 'OutputWriter' and stored as one file per partition. File names have the
-- form @j\<jobId\>_t\<taskId\>_i\<partition\>@ and all files are created in a
-- single 'FS.createFiles' call.
--
-- The earlier version carried a second, unreachable per-partition file
-- writer and wrapped every result in 'Just' only to strip it again with
-- 'catMaybes'; both have been removed without changing what is produced.
writeConnector
  :: (Binary k2, Binary v2)
  => OutputWriter k2 v2
  -> ActionEnvironment
  -> [(Int,[(k2,v2)])]
  -> IO [(Int,[FunctionData])]
writeConnector oc ae ls = do
    infoM localLogger $ "writeConnector: " ++ (show . length $ ls)
    case td_OutputType td of
      TOTRawTuple -> return (map inlineTuples ls)
      _ -> writeFiles
  where
    td = ae_TaskData ae

    -- Keep the partition index, encode each tuple on its own.
    inlineTuples (i, ts) = (i, map (TupleFunctionData . encode) ts)

    fileNameFor i =
      "j" ++ show (td_JobId td) ++ "_t" ++ show (td_TaskId td) ++ "_i" ++ show i

    writeFiles = do
      contents <- mapM (oc . snd) ls
      let names = map (fileNameFor . fst) ls
      FS.createFiles (zip names contents) (ae_FileSystem ae)
      return (zipWith (\(i, _) name -> (i, [FileFunctionData name])) ls names)
-- | Default 'InputReader': parses the bytes with 'parseByteStringToList'.
defaultInputReader :: (NFData v1, NFData k1, Binary k1, Binary v1) => InputReader k1 v1
defaultInputReader bytes = return (parseByteStringToList bytes)
-- | Default 'OutputWriter': serialises the tuples with 'listToByteString'.
defaultOutputWriter :: (NFData v2, NFData k2, Binary k2, Binary v2) => OutputWriter k2 v2
defaultOutputWriter tuples = return (listToByteString tuples)
-- | Name under which an action is registered and referenced.
type ActionName = String
-- | Free-text information about an action.
type ActionInfo = String
-- | Typed description of a complete MapReduce action.
--
-- Type parameters: @a@ is the options type, @(k1,v1)@ the input tuples,
-- @(k2,v2)@ the map output, @v3@ the value type after combine and @v4@ the
-- value type after reduce.
data ActionConfiguration a k1 v1 k2 v2 v3 v4
  = ActionConfiguration {
    ac_Name :: ActionName -- ^ name of the action
  , ac_Info :: ActionInfo -- ^ free-text information
  , ac_OptEncoder :: OptionsEncoder a -- ^ options to bytes
  , ac_OptDecoder :: OptionsDecoder a -- ^ bytes to options
  , ac_InputEncoder :: InputEncoder k1 v1 -- ^ job input to 'FunctionData'
  , ac_OutputDecoder :: OutputDecoder k2 v4 -- ^ job result back to tuples and files
  , ac_Split :: Maybe (SplitConfiguration a k1 v1) -- ^ split phase, if any
  , ac_Map :: Maybe (MapConfiguration a k1 v1 k2 v2) -- ^ map phase, if any
  , ac_Combine :: Maybe (ReduceConfiguration a k2 v2 v3) -- ^ combine phase, if any
  , ac_Reduce :: Maybe (ReduceConfiguration a k2 v3 v4) -- ^ reduce phase, if any
  }
-- | Configuration of the split phase.
data SplitConfiguration a k1 v1
  = SplitConfiguration {
    sc_Function :: SplitFunction a k1 v1 -- ^ distributes the input tuples over partitions
  , sc_Reader :: InputReader k1 v1 -- ^ parses file input
  , sc_Writer :: OutputWriter k1 v1 -- ^ serialises file output
  }
-- | Configuration of the map phase.
data MapConfiguration a k1 v1 k2 v2
  = MapConfiguration {
    mc_Function :: MapFunction a k1 v1 k2 v2 -- ^ applied to every input tuple
  , mc_Partition :: MapPartition a k2 v2 -- ^ distributes the mapped tuples over partitions
  , mc_Reader :: InputReader k1 v1 -- ^ parses file input
  , mc_Writer :: OutputWriter k2 v2 -- ^ serialises file output
  }
-- | Configuration of a reduce-style phase (used for both combine and reduce).
data ReduceConfiguration a k2 v3 v4
  = ReduceConfiguration {
    rc_Merge :: ReduceMerge a k2 v3 -- ^ groups the input values by key
  , rc_Function :: ReduceFunction a k2 v3 v4 -- ^ applied to every key with its values
  , rc_Partition :: ReducePartition a k2 v4 -- ^ distributes the reduced tuples over partitions
  , rc_Reader :: InputReader k2 v3 -- ^ parses file input
  , rc_Writer :: OutputWriter k2 v4 -- ^ serialises file output
  }
-- | An action configuration with the given name, empty info text, the
-- default encoders and decoders, the default split phase and no map,
-- combine or reduce phase.
defaultActionConfiguration
  :: (NFData v1, NFData k1, Binary a, Binary k1, Binary v1, Binary k2, Binary v4)
  => ActionName
  -> ActionConfiguration a k1 v1 k2 v2 v3 v4
defaultActionConfiguration name =
  ActionConfiguration
    { ac_Name = name
    , ac_Info = ""
    , ac_OptEncoder = defaultOptionsEncoder
    , ac_OptDecoder = defaultOptionsDecoder
    , ac_InputEncoder = defaultInputEncoder
    , ac_OutputDecoder = defaultOutputDecoder
    , ac_Split = Just defaultSplitConfiguration
    , ac_Map = Nothing
    , ac_Combine = Nothing
    , ac_Reduce = Nothing
    }
-- | Split phase built from 'defaultSplit', 'defaultInputReader' and
-- 'defaultOutputWriter'.
defaultSplitConfiguration
  :: (NFData v1, NFData k1, Binary a, Binary k1, Binary v1)
  => SplitConfiguration a k1 v1
defaultSplitConfiguration =
  SplitConfiguration
    { sc_Function = defaultSplit
    , sc_Reader = defaultInputReader
    , sc_Writer = defaultOutputWriter
    }
-- | Map phase for the given map function, using 'defaultPartition',
-- 'defaultInputReader' and 'defaultOutputWriter'.
defaultMapConfiguration
  :: (NFData v1, NFData k1, NFData v2, NFData k2, Ord k2, Binary a, Binary k1, Binary v1, Binary k2, Binary v2)
  => MapFunction a k1 v1 k2 v2
  -> MapConfiguration a k1 v1 k2 v2
defaultMapConfiguration fct =
  MapConfiguration
    { mc_Function = fct
    , mc_Partition = defaultPartition
    , mc_Reader = defaultInputReader
    , mc_Writer = defaultOutputWriter
    }
-- | Reduce phase for the given reduce function, using 'defaultMerge',
-- 'defaultPartition', 'defaultInputReader' and 'defaultOutputWriter'.
defaultReduceConfiguration
  :: (NFData v2, NFData k2, NFData v3, Ord k2, Binary a, Binary k2, Binary v2, Binary v3)
  => ReduceFunction a k2 v2 v3
  -> ReduceConfiguration a k2 v2 v3
defaultReduceConfiguration fct =
  ReduceConfiguration
    { rc_Merge = defaultMerge
    , rc_Function = fct
    , rc_Partition = defaultPartition
    , rc_Reader = defaultInputReader
    , rc_Writer = defaultOutputWriter
    }
-- | Default split: deal the input tuples round-robin onto the partitions
-- @1 .. n@ (first tuple to partition 1, second to partition 2, ...).
-- Environment and options are ignored.
--
-- NOTE(review): @n == 0@ raises a divide-by-zero error as soon as the result
-- is evaluated - confirm that callers never pass it.
defaultSplit
  :: (NFData k1, NFData v1) => SplitFunction a k1 v1
defaultSplit _ _ n ls = do
    infoM localLogger "defaultSplit"
    return (AMap.toList (AMap.fromList tagged))
  where
    -- Pair every tuple (as a singleton list) with its 1-based partition.
    tagged = zipWith (\position item -> ((position `mod` n) + 1, [item])) [0 ..] ls
-- | Partition tuples by the 'Hash' of their key. With a single partition
-- everything is placed in partition 1 without hashing.
--
-- NOTE(review): for @n > 1@ the partition index is whatever 'hash' returns;
-- the 'Hash' 'Int' instance yields @0 .. n-1@, whereas 'defaultPartition' and
-- 'defaultSplit' number partitions @1 .. n@ - confirm this is intended.
hashedPartition :: (Hash k2, Binary k2, Binary v2, NFData k2, NFData v2) => MapPartition a k2 v2
hashedPartition _ _ 1 l = return [(1, l)]
hashedPartition _ _ n l = do
    mapM_ (infoM localLogger)
      [ "hashedPartition: map"
      , "hashedPartition: fromList"
      , "hashedPartition: toList"
      , "hashedPartition: return"
      ]
    return (AMap.toList (AMap.fromList [(hash n (fst t), [t]) | t <- l]))
-- | Default partitioner: with a single partition everything goes to
-- partition 1; otherwise each tuple goes to partition
-- @(hashString (show (encode key)) mod n) + 1@. Environment and options are
-- ignored.
defaultPartition
  :: (Binary k2, Binary v2)
  => MapPartition a k2 v2
defaultPartition _ _ n ls
  | n == 1 = return [(1, ls)]
  | otherwise = return (AMap.toList (AMap.fromTupleList tagged))
  where
    tagged = [(bucketOf k, t) | t@(k, _) <- ls]
    bucketOf k = (fromIntegral (Hash.hashString (show (encode k))) `mod` n) + 1
-- | Default merge: collect the values of equal keys by passing the tuples
-- through an 'AMap.AccuMap'. Environment and options are ignored.
defaultMerge
  :: (Ord k2, Binary k2, Binary v2)
  => ReduceMerge a k2 v2
defaultMerge _ _ ls = do
    let grouped = AMap.fromTupleList ls
    return (AMap.toList grouped)
-- | An action in its untyped form: every phase works on binary
-- 'FunctionData' only. Built from an 'ActionConfiguration' by
-- 'readActionConfiguration'.
data ActionData
  = ActionData {
    ad_Name :: ActionName -- ^ name of the action (also its key in an 'ActionMap')
  , ad_Info :: ActionInfo -- ^ free-text information
  , ad_Split :: Maybe BinarySplitAction -- ^ split phase, if any
  , ad_Map :: Maybe BinaryMapAction -- ^ map phase, if any
  , ad_Combine :: Maybe BinaryReduceAction -- ^ combine phase, if any
  , ad_Reduce :: Maybe BinaryReduceAction -- ^ reduce phase, if any
  }
-- | An action is keyed by its name.
instance KMap.Key ActionData where
  getKey = ad_Name
-- | Only name and info are shown; the phase functions cannot be rendered.
instance Show ActionData where
  show (ActionData name info _ _ _ _) =
    concat ["{ActionData name:\"", name, "\" info:\"", info, "\"}"]
-- | Collection of actions, keyed by action name.
type ActionMap = KMap.KeyMap ActionData
-- | Serialises the options of an action.
type OptionsEncoder a = a -> B.ByteString

-- | Restores the options of an action from their serialised form.
type OptionsDecoder a = B.ByteString -> a

-- | Default options encoder: the 'Binary' encoding of the value.
defaultOptionsEncoder :: (Binary a) => OptionsEncoder a
defaultOptionsEncoder opts = encode opts

-- | Default options decoder: the 'Binary' decoding of the bytes.
defaultOptionsDecoder :: (Binary a) => OptionsDecoder a
defaultOptionsDecoder bytes = decode bytes
-- | Turns the input of a job (tuples and files) into 'FunctionData'.
type InputEncoder k1 v1 = [(k1,v1)] -> [FS.FileId] -> [FunctionData]
-- | Splits the 'FunctionData' of a job result into tuples and files.
type OutputDecoder k2 v4 = [FunctionData] -> ([(k2,v4)],[FS.FileId])
-- | Default input encoder: every tuple becomes a binary-encoded
-- 'TupleFunctionData', followed by one 'FileFunctionData' per file.
defaultInputEncoder :: (Binary k1, Binary v1) => InputEncoder k1 v1
defaultInputEncoder ls1 ls2 =
  map (TupleFunctionData . encode) ls1 ++ map FileFunctionData ls2
-- | Default output decoder: inline items are decoded into tuples, file items
-- are returned as file identifiers; relative order is kept in both lists.
defaultOutputDecoder :: (Binary k2, Binary v4) => OutputDecoder k2 v4
defaultOutputDecoder ls =
  ( [decode bytes | TupleFunctionData bytes <- ls]
  , [fileId | FileFunctionData fileId <- ls]
  )
-- | Select the phase of an action that handles the given task type.
-- 'TTError' has no phase. All binary action aliases denote the same function
-- type, which is why a single result type covers every phase.
getActionForTaskType :: TaskType -> ActionData -> Maybe BinaryReduceAction
getActionForTaskType taskType ad = case taskType of
  TTSplit -> ad_Split ad
  TTMap -> ad_Map ad
  TTCombine -> ad_Combine ad
  TTReduce -> ad_Reduce ad
  _ -> Nothing
-- | Convert a typed 'ActionConfiguration' into its untyped 'ActionData':
-- every configured phase is wrapped by the matching @perform...Action@
-- function, which takes care of decoding options and (de)serialising data.
readActionConfiguration
  :: ( Ord k2, Binary a
     , Show k1, Show v1
     , Show k2, Show v2, Show v3, Show v4
     , NFData k1, NFData v1
     , NFData k2, NFData v2, NFData v3
     , Binary k1, Binary v1
     , Binary k2, Binary v2
     , Binary v3, Binary v4)
  => ActionConfiguration a k1 v1 k2 v2 v3 v4
  -> ActionData
readActionConfiguration (ActionConfiguration name info _ optDec _ _ splitCfg mapCfg combineCfg reduceCfg) =
  ActionData
    { ad_Name = name
    , ad_Info = info
    , ad_Split =
        fmap (\(SplitConfiguration f ir ow) -> performSplitAction optDec f ir ow) splitCfg
    , ad_Map =
        fmap (\(MapConfiguration f p ir ow) -> performMapAction optDec f p ir ow) mapCfg
    , ad_Combine =
        fmap (\(ReduceConfiguration m f p ir ow) -> performReduceAction optDec m f p ir ow) combineCfg
    , ad_Reduce =
        fmap (\(ReduceConfiguration m f p ir ow) -> performReduceAction optDec m f p ir ow) reduceCfg
    }
-- | Build the 'JobInfo' for running an action.
--
-- Arguments after the configuration: the options, the input tuples, the
-- input files, the counts for split, map and reduce, the number of results
-- and the output type of the reduce phase. A phase appears in the result
-- only if it is configured. Split, map and combine always use 'TOTFile';
-- the combine phase reuses the map count.
createJobInfoFromConfiguration
  :: ActionConfiguration a k1 v1 k2 v2 v3 v4
  -> a
  -> [(k1,v1)]
  -> [FS.FileId]
  -> Int
  -> Int
  -> Int
  -> Int
  -> TaskOutputType
  -> JobInfo
createJobInfoFromConfiguration (ActionConfiguration n _ optEnc _ inEnc _ sConf mConf cConf rConf) opts' ls1 ls2 sCnt mCnt rCnt nor' rt =
  JobInfo
    { ji_Description = n
    , ji_Option = optEnc opts'
    , ji_SplitAction = fmap (const (JobAction n TOTFile sCnt)) sConf
    , ji_MapAction = fmap (const (JobAction n TOTFile mCnt)) mConf
    , ji_CombineAction = fmap (const (JobAction n TOTFile mCnt)) cConf
    , ji_ReduceAction = fmap (const (JobAction n rt rCnt)) rConf
    , ji_NumOfResults = Just nor'
    , ji_Input = inEnc ls1 ls2
    }
-- | Decode a job result into tuples and files with the configuration's
-- output decoder.
createListsFromJobResult
  :: ActionConfiguration a k1 v1 k2 v2 v3 v4
  -> JobResult
  -> ([(k2,v4)],[FS.FileId])
createListsFromJobResult ac jr = decodeOutput (jr_Output jr)
  where
    decodeOutput = ac_OutputDecoder ac
-- | Typed split: environment, options, number of partitions and input tuples
-- to the tuples of every partition.
type SplitAction a k1 v1 = ActionEnvironment -> a -> Int -> [(k1,v1)] -> IO [(Int, [(k1,v1)])]
-- | Untyped split as stored in 'ActionData': environment, encoded options,
-- optional number of partitions and indexed input to indexed output.
type BinarySplitAction = ActionEnvironment -> B.ByteString -> Maybe Int -> (Int,[FunctionData]) -> IO [(Int, [FunctionData])]
-- | The user-supplied split function has the shape of a 'SplitAction'.
type SplitFunction a k1 v1 = SplitAction a k1 v1
-- | Run a typed split function on untyped task data.
--
-- Steps: decode the options, read the input through 'readConnector', split
-- it into the requested number of partitions (without a partition count the
-- whole input stays under the incoming index) and write the partitions
-- through 'writeConnector'. Progress is logged and time-stamped.
performSplitAction
  :: (NFData k1, NFData v1, Binary a, Binary k1, Binary v1, Show k1, Show v1)
  => OptionsDecoder a
  -> SplitFunction a k1 v1
  -> InputReader k1 v1
  -> OutputWriter k1 v1
  -> ActionEnvironment
  -> B.ByteString
  -> Maybe Int
  -> (Int,[FunctionData])
  -> IO [(Int, [FunctionData])]
performSplitAction optDec fct reader writer env opts n (i,ls) = do
    let options = optDec opts
    infoM localLogger "performSplitAction"
    putTimeStamp "Begin performSplitAction"
    infoM localLogger "reading inputList"
    tuples <- readConnector reader env ls
    debugM localLogger (">>>>>>>>>>>>>>>>>> input is: " ++ show tuples ++ "\n\n")
    infoM localLogger "doing split"
    parts <- maybe (return [(i, tuples)]) (\count -> fct env options count tuples) n
    debugM localLogger (">>>>>>>>>>>>>>>>>> splittet list is: " ++ show parts ++ "\n\n")
    infoM localLogger "writing outputlist"
    written <- writeConnector writer env parts
    putTimeStamp "End performSplitAction"
    return written
-- | Typed map phase: environment, options, number of partitions and input
-- tuples to the mapped tuples of every partition.
type MapAction a k1 v1 k2 v2 = ActionEnvironment -> a -> Int -> [(k1,v1)] -> IO [(Int, [(k2,v2)])]
-- | Untyped map phase as stored in 'ActionData'.
type BinaryMapAction = ActionEnvironment -> B.ByteString -> Maybe Int -> (Int,[FunctionData]) -> IO [(Int, [FunctionData])]
-- | User-supplied map function: one input tuple to any number of output tuples.
type MapFunction a k1 v1 k2 v2 = ActionEnvironment -> a -> k1 -> v1 -> IO [(k2, v2)]
-- | Distributes mapped tuples over the given number of partitions.
type MapPartition a k2 v2 = ActionEnvironment -> a -> Int -> [(k2,v2)] -> IO [(Int, [(k2,v2)])]
-- | Run a typed map function on untyped task data.
--
-- Steps: decode the options, read the input through 'readConnector', apply
-- the map function to every tuple in order and concatenate the results,
-- partition them (without a partition count everything stays under the
-- incoming index) and write the partitions through 'writeConnector'.
-- Progress is logged and time-stamped.
performMapAction
  :: (Ord k2, Show k1, Show k2, Show v1, Show v2,
      Binary a, Binary k1, Binary v1, Binary k2, Binary v2, NFData k2, NFData v2, NFData v1, NFData k1)
  => OptionsDecoder a
  -> MapFunction a k1 v1 k2 v2
  -> MapPartition a k2 v2
  -> InputReader k1 v1
  -> OutputWriter k2 v2
  -> ActionEnvironment
  -> B.ByteString
  -> Maybe Int
  -> (Int,[FunctionData])
  -> IO [(Int, [FunctionData])]
performMapAction optDec fct part reader writer env opts n (i,ls) = do
    let options = optDec opts
    infoM localLogger "performMapAction"
    putTimeStamp "Begin performMapAction"
    infoM localLogger $ "reading inputList: " ++ show (i,ls)
    tuples <- readConnector reader env ls
    debugM localLogger (">>>>>>>>>>>>>>>>>> input is: " ++ show tuples ++ "\n\n")
    infoM localLogger ("doing map: " ++ show (length tuples))
    mapped <- fmap concat (mapM (\(key, value) -> fct env options key value) tuples)
    infoM localLogger ("map result: " ++ show (length mapped))
    debugM localLogger (">>>>>>>>>>>>>>>>>> mapped list is: " ++ show mapped ++ "\n\n")
    infoM localLogger "doing partition"
    parts <- maybe (return [(i, mapped)]) (\count -> part env options count mapped) n
    infoM localLogger ("map parted result: " ++ show (length parts))
    debugM localLogger (">>>>>>>>>>>>>>>>>> partitioned list is: " ++ show parts ++ "\n\n")
    infoM localLogger "writing outputlist: begin"
    written <- writeConnector writer env parts
    infoM localLogger "writing outputlist: done"
    putTimeStamp "End performMapAction"
    return written
-- | Typed reduce phase: environment, options, number of partitions and input
-- tuples to the reduced tuples of every partition.
type ReduceAction a k2 v2 v3 = ActionEnvironment -> a -> Int -> [(k2,v2)] -> IO [(Int, [(k2,v3)])]
-- | Untyped reduce phase as stored in 'ActionData'.
type BinaryReduceAction = ActionEnvironment -> B.ByteString -> Maybe Int -> (Int,[FunctionData]) -> IO [(Int, [FunctionData])]
-- | Groups the input tuples into one list of values per key.
type ReduceMerge a k2 v2 = ActionEnvironment -> a -> [(k2,v2)] -> IO [(k2,[v2])]
-- | User-supplied reduce function: a key and its values to an optional result.
type ReduceFunction a k2 v2 v3 = ActionEnvironment -> a -> k2 -> [v2] -> IO (Maybe v3)
-- | Distributes reduced tuples over the given number of partitions.
type ReducePartition a k2 v3 = ActionEnvironment -> a -> Int -> [(k2,v3)] -> IO [(Int, [(k2,v3)])]
-- | Run a typed merge + reduce on untyped task data.
--
-- Steps: decode the options, read the input through 'readConnector', merge
-- the values per key, apply the reduce function to every key (keys for which
-- it returns 'Nothing' are dropped), partition the results (without a
-- partition count everything stays under the incoming index) and write the
-- partitions through 'writeConnector'. Progress is logged and time-stamped.
performReduceAction
  :: (Ord k2, Show k2, Show v2, Show v3,
      NFData k2, NFData v2,
      Binary a, Binary k2, Binary v2, Binary v3)
  => OptionsDecoder a
  -> ReduceMerge a k2 v2
  -> ReduceFunction a k2 v2 v3
  -> ReducePartition a k2 v3
  -> InputReader k2 v2
  -> OutputWriter k2 v3
  -> ActionEnvironment
  -> B.ByteString
  -> Maybe Int
  -> (Int,[FunctionData])
  -> IO [(Int, [FunctionData])]
performReduceAction optDec merge fct part reader writer env opts n (i,ls) = do
    let options = optDec opts
    infoM localLogger "performReduceAction"
    putTimeStamp "Begin performReduceAction"
    infoM localLogger "reading inputList"
    tuples <- readConnector reader env ls
    infoM localLogger (">>>>>>>>>>>>>>>>>> input is: " ++ show (length tuples) ++ "\n\n")
    debugM localLogger (">>>>>>>>>>>>>>>>>> input is: " ++ show tuples ++ "\n\n")
    infoM localLogger "doing merge"
    merged <- merge env options tuples
    debugM localLogger (">>>>>>>>>>>>>>>>>> mergedList is: " ++ show merged ++ "\n\n")
    infoM localLogger "doing reduce"
    reduced <- fmap catMaybes (mapM (reduceKey options) merged)
    debugM localLogger (">>>>>>>>>>>>>>>>>> tupleList is: " ++ show reduced ++ "\n\n")
    infoM localLogger "doing partition"
    parts <- maybe (return [(i, reduced)]) (\count -> part env options count reduced) n
    debugM localLogger (">>>>>>>>>>>>>>>>>> partedList is: " ++ show parts ++ "\n\n")
    infoM localLogger "writing outputlist: begin"
    written <- writeConnector writer env parts
    infoM localLogger "writing outputlist: done"
    putTimeStamp "End performReduceAction"
    return written
  where
    -- Reduce one key and pair a produced value with that key again.
    reduceKey options (key, values) =
      fmap (fmap ((,) key)) (fct env options key values)
-- | Log the given label together with the current POSIX time at info level.
putTimeStamp :: String -> IO ()
putTimeStamp s =
  getPOSIXTime >>= \now -> infoM localLogger (s ++ " : " ++ show now)