-- |
-- Module      : Holumbus.MapReduce.Types
--
-- Shared data types of the Holumbus MapReduce layer: task and job
-- descriptions with their 'Binary' (and partly XML pickler) instances,
-- the typed action configurations for the split, map, combine and reduce
-- phases, and the helpers that turn a typed configuration into the
-- 'B.ByteString'-based actions stored in an 'ActionData'.
module Holumbus.MapReduce.Types
(
FunctionData(..)
, TaskId
, TaskType(..)
, TaskState(..)
, getNextTaskState
, TaskOutputType(..)
, TaskData(..)
, JobId
, JobState(..)
, getNextJobState
, getPrevJobState
, fromJobStatetoTaskType
, OutputMap
, JobAction(..)
, JobInfo(..)
, JobData(..)
, JobResultContainer(..)
, JobResult(..)
, ActionName
, ActionInfo
, ActionEnvironment(..)
, mkActionEnvironment
, InputReader
, OutputWriter
, OptionsDecoder
, ActionConfiguration(..)
, SplitConfiguration(..)
, MapConfiguration(..)
, ReduceConfiguration(..)
, defaultActionConfiguration
, defaultSplitConfiguration
, defaultMapConfiguration
, defaultReduceConfiguration
, readActionConfiguration
, createJobInfoFromConfiguration
, createListsFromJobResult
, ActionData(..)
, getActionForTaskType
, ActionMap
, MapAction
, MapFunction
, MapPartition
, ReduceAction
, ReduceMerge
, ReduceFunction
, ReducePartition
)
where
import Control.Concurrent
import Data.Binary
import qualified Data.ByteString.Lazy as B
import qualified Data.Map as Map
import qualified Data.HashTable as Hash
import Data.Maybe
import Data.Time
import System.Log.Logger
import Text.XML.HXT.Arrow
import qualified Holumbus.Data.AccuMap as AMap
import Holumbus.Common.FileHandling
import Holumbus.Common.Utils
import qualified Holumbus.Data.AccuMap as AMap
import qualified Holumbus.Data.KeyMap as KMap
import qualified Holumbus.FileSystem.FileSystem as FS
-- | Logger name passed to every 'debugM' / 'infoM' call in this module.
localLogger :: String
localLogger = "Holumbus.MapReduce.Types"
-- | One item of input or output data handed between tasks.
data FunctionData
  = TupleFunctionData B.ByteString
    -- ^ payload carried inline as bytes (elsewhere in this module these
    --   bytes are the 'encode'd form of a key/value tuple)
  | FileFunctionData FS.FileId
    -- ^ reference to a file in the Holumbus file system
  deriving (Show, Eq, Ord)
-- | Wire format: one tag byte (0 = inline tuple bytes, 1 = file reference)
-- followed by the payload. When decoding, every tag other than 1 is read
-- as an inline tuple payload.
instance Binary FunctionData where
  put fd = case fd of
    TupleFunctionData bytes  -> putWord8 0 >> put bytes
    FileFunctionData  fileId -> putWord8 1 >> put fileId

  get = getWord8 >>= body
    where
      body 1 = fmap FileFunctionData get
      body _ = fmap TupleFunctionData get
instance XmlPickler FunctionData where
  xpickle = xpFunctionData

-- | Pickler for a @data@ element. The element holds either a @key@ /
-- @value@ pair of text nodes (stored as the binary encoding of that string
-- pair) or a @filename@ element naming a file.
xpFunctionData :: PU FunctionData
xpFunctionData = xpElem "data" (xpAlt selector [tuplePickler, filePickler])
  where
    -- index of the alternative that handles a given constructor
    selector (TupleFunctionData _) = 0
    selector (FileFunctionData _)  = 1

    -- the unpickling lambdas are partial on purpose: 'xpAlt' only hands
    -- each alternative the constructor chosen by 'selector'
    tuplePickler
      = xpWrap ( TupleFunctionData . encodeTuple
               , \(TupleFunctionData raw) -> decodeTuple raw )
      $ xpPair (xpElem "key" xpText0) (xpElem "value" xpText0)

    filePickler
      = xpWrap (FileFunctionData, \(FileFunctionData name) -> name)
      $ xpElem "filename" xpText
-- | Serialises a key/value pair with its 'Binary' instances.
encodeTuple :: (Binary k, Binary v) => (k, v) -> B.ByteString
encodeTuple = encode
-- | Inverse of 'encodeTuple': rebuilds a key/value pair from its bytes.
decodeTuple :: (Binary k, Binary v) => B.ByteString -> (k, v)
decodeTuple = decode
-- | Identifier of a single task.
type TaskId = Integer
-- | The phase a task belongs to; 'TTError' is what an unknown tag decodes to.
data TaskType
  = TTSplit
  | TTMap
  | TTCombine
  | TTReduce
  | TTError
  deriving (Show, Eq, Ord)
-- | Encoded as a single tag byte; tag 0 and every unknown tag decode to
-- 'TTError'.
instance Binary TaskType where
  put taskType = putWord8 $ case taskType of
    TTError   -> 0
    TTSplit   -> 1
    TTMap     -> 2
    TTCombine -> 3
    TTReduce  -> 4

  get = do
    tag <- getWord8
    return $ case tag of
      1 -> TTSplit
      2 -> TTMap
      3 -> TTCombine
      4 -> TTReduce
      _ -> TTError
-- | Life-cycle states of a task. The constructor order matters:
-- 'getNextTaskState' advances with 'succ'.
data TaskState
  = TSIdle
  | TSSending
  | TSInProgress
  | TSCompleted
  | TSFinished
  | TSError
  deriving (Show, Eq, Ord, Enum)
-- | Encoded as a single tag byte; tag 0 and every unknown tag decode to
-- 'TSError'.
instance Binary TaskState where
  put state = putWord8 $ case state of
    TSError      -> 0
    TSIdle       -> 1
    TSSending    -> 2
    TSInProgress -> 3
    TSCompleted  -> 4
    TSFinished   -> 5

  get = do
    tag <- getWord8
    return $ case tag of
      1 -> TSIdle
      2 -> TSSending
      3 -> TSInProgress
      4 -> TSCompleted
      5 -> TSFinished
      _ -> TSError
-- | Advances a task to its following state. 'TSFinished' and 'TSError'
-- are terminal and map to themselves.
getNextTaskState :: TaskState -> TaskState
getNextTaskState state
  | state `elem` [TSError, TSFinished] = state
  | otherwise                          = succ state
-- | How a task hands over its result: as inline tuples or as files.
data TaskOutputType
  = TOTRawTuple
  | TOTFile
  deriving (Show, Read, Eq, Ord, Enum)
-- | Encoded as a single tag byte: 0 is 'TOTRawTuple', anything else decodes
-- to 'TOTFile'.
instance Binary TaskOutputType where
  put outputType = putWord8 $ case outputType of
    TOTRawTuple -> 0
    TOTFile     -> 1

  get = do
    tag <- getWord8
    return $ case tag of
      0 -> TOTRawTuple
      _ -> TOTFile
-- | Pickled as the XML attribute @output@ through 'xpPrim'.
instance XmlPickler TaskOutputType where
  xpickle = xpAttr "output" xpPrim
-- | Complete description of one task. All fields are strict.
data TaskData = TaskData
  { td_JobId      :: !JobId                     -- ^ job this task belongs to
  , td_TaskId     :: !TaskId                    -- ^ identifier of the task itself
  , td_Type       :: !TaskType                  -- ^ phase of the task
  , td_State      :: !TaskState                 -- ^ current life-cycle state
  , td_Option     :: !B.ByteString              -- ^ options as raw bytes
  , td_PartValue  :: !(Maybe Int)               -- ^ presumably the number of output partitions; confirm against the scheduler
  , td_Input      :: !(Int, [FunctionData])     -- ^ an index paired with the input items
  , td_Output     :: ![(Int, [FunctionData])]   -- ^ output items grouped under an index
  , td_OutputType :: !TaskOutputType            -- ^ inline tuples or files ('writeConnector' reads this)
  , td_Action     :: !ActionName                -- ^ name of the action to run
  } deriving (Show, Eq, Ord)
-- | Fields are written and read back in declaration order.
instance Binary TaskData where
  put task = do
    put (td_JobId task)
    put (td_TaskId task)
    put (td_Type task)
    put (td_State task)
    put (td_Option task)
    put (td_PartValue task)
    put (td_Input task)
    put (td_Output task)
    put (td_OutputType task)
    put (td_Action task)

  get = do
    jobId      <- get
    taskId     <- get
    taskType   <- get
    state      <- get
    option     <- get
    partValue  <- get
    input      <- get
    output     <- get
    outputType <- get
    action     <- get
    return TaskData
      { td_JobId      = jobId
      , td_TaskId     = taskId
      , td_Type       = taskType
      , td_State      = state
      , td_Option     = option
      , td_PartValue  = partValue
      , td_Input      = input
      , td_Output     = output
      , td_OutputType = outputType
      , td_Action     = action
      }
-- | Identifier of a job.
type JobId = Integer
-- | Life-cycle states of a job. The constructor order matters:
-- 'getNextJobState' and 'getPrevJobState' move with 'succ' and 'pred'.
data JobState
  = JSPlanned
  | JSIdle
  | JSSplit
  | JSMap
  | JSCombine
  | JSReduce
  | JSCompleted
  | JSFinished
  | JSError
  deriving (Show, Eq, Ord, Enum)
-- | Encoded as a single tag byte; tag 0 and every unknown tag decode to
-- 'JSError'.
instance Binary JobState where
  put state = putWord8 $ case state of
    JSError     -> 0
    JSPlanned   -> 1
    JSIdle      -> 2
    JSSplit     -> 3
    JSMap       -> 4
    JSCombine   -> 5
    JSReduce    -> 6
    JSCompleted -> 7
    JSFinished  -> 8

  get = do
    tag <- getWord8
    return $ case tag of
      1 -> JSPlanned
      2 -> JSIdle
      3 -> JSSplit
      4 -> JSMap
      5 -> JSCombine
      6 -> JSReduce
      7 -> JSCompleted
      8 -> JSFinished
      _ -> JSError
-- | Advances a job to its following state. 'JSFinished' and 'JSError' are
-- terminal and map to themselves.
getNextJobState :: JobState -> JobState
getNextJobState state
  | state `elem` [JSError, JSFinished] = state
  | otherwise                          = succ state
-- | Moves a job back to its preceding state.
--
-- 'JSPlanned', 'JSIdle' and 'JSError' map to themselves. 'JSPlanned' has
-- to be listed explicitly: it is the first constructor of 'JobState', so
-- letting it fall through to 'pred' would raise a runtime error.
getPrevJobState :: JobState -> JobState
getPrevJobState JSPlanned = JSPlanned
getPrevJobState JSIdle    = JSIdle
getPrevJobState JSError   = JSError
getPrevJobState s         = pred s
-- | Task type that is executed while a job is in the given state;
-- 'Nothing' for states that are not a processing phase.
fromJobStatetoTaskType :: JobState -> Maybe TaskType
fromJobStatetoTaskType state = case state of
  JSSplit   -> Just TTSplit
  JSMap     -> Just TTMap
  JSCombine -> Just TTCombine
  JSReduce  -> Just TTReduce
  _         -> Nothing
-- | Per job state, the accumulated 'FunctionData' keyed by an 'Int' index.
type OutputMap = Map.Map JobState (AMap.AccuMap Int FunctionData)
-- | One phase of a job: which action to run, how it delivers its output
-- and a count (defaults to 1 when missing in the XML form).
data JobAction = JobAction
  { ja_Name   :: !ActionName
  , ja_Output :: !TaskOutputType
  , ja_Count  :: !Int
  } deriving (Show, Eq)
-- | Fields are written and read back in declaration order.
instance Binary JobAction where
  put action = do
    put (ja_Name action)
    put (ja_Output action)
    put (ja_Count action)

  get = do
    name   <- get
    output <- get
    count  <- get
    return JobAction { ja_Name = name, ja_Output = output, ja_Count = count }
instance XmlPickler JobAction where
  xpickle = xpJobAction

-- | Pickles a 'JobAction' as the attributes @name@, @output@ (through the
-- 'TaskOutputType' pickler) and an optional @count@ that is read as 1 when
-- absent and always written.
xpJobAction :: PU JobAction
xpJobAction = xpWrap (fromTriple, toTriple) (xpTriple nameAttr xpickle countAttr)
  where
    fromTriple (name, output, count)        = JobAction name output count
    toTriple   (JobAction name output count) = (name, output, count)

    nameAttr  = xpAttr "name" xpText
    countAttr = xpWrap (fromMaybe 1, Just) (xpOption (xpAttr "count" xpickle))
-- | User-facing description of a job: which phases to run, with which
-- options, on which input. All fields are strict.
data JobInfo = JobInfo
  { ji_Description   :: !String              -- ^ name / description of the job
  , ji_Option        :: !B.ByteString        -- ^ encoded options
  , ji_SplitAction   :: !(Maybe JobAction)   -- ^ split phase, if any
  , ji_MapAction     :: !(Maybe JobAction)   -- ^ map phase, if any
  , ji_CombineAction :: !(Maybe JobAction)   -- ^ combine phase, if any
  , ji_ReduceAction  :: !(Maybe JobAction)   -- ^ reduce phase, if any
  , ji_NumOfResults  :: !(Maybe Int)         -- ^ optional number of results
  , ji_Input         :: ![FunctionData]      -- ^ input data of the job
  } deriving (Show, Eq)
-- | Fields are written and read back in declaration order.
instance Binary JobInfo where
  put info = do
    put (ji_Description info)
    put (ji_Option info)
    put (ji_SplitAction info)
    put (ji_MapAction info)
    put (ji_CombineAction info)
    put (ji_ReduceAction info)
    put (ji_NumOfResults info)
    put (ji_Input info)

  get = do
    description   <- get
    option        <- get
    splitAction   <- get
    mapAction     <- get
    combineAction <- get
    reduceAction  <- get
    numOfResults  <- get
    input         <- get
    return JobInfo
      { ji_Description   = description
      , ji_Option        = option
      , ji_SplitAction   = splitAction
      , ji_MapAction     = mapAction
      , ji_CombineAction = combineAction
      , ji_ReduceAction  = reduceAction
      , ji_NumOfResults  = numOfResults
      , ji_Input         = input
      }
instance XmlPickler JobInfo where
  xpickle = xpJobInfo

-- | Pickler for a @jobInfo@ element: a @name@ attribute, the optional
-- @split@ / @map@ / @combine@ / @reduce@ child elements, an optional
-- @numOfResults@ attribute and an optional @inputList@ of @data@ elements.
--
-- The options are not part of the XML form: unpickling always stores
-- @encode ()@ and pickling drops the field.
xpJobInfo :: PU JobInfo
xpJobInfo = xpElem "jobInfo" (xpWrap (build, unbuild) jobPickler)
  where
    build (name, (sa, ma, ca, ra), nor, input)
      = JobInfo name (encode ()) sa ma ca ra nor input
    unbuild (JobInfo name _ sa ma ca ra nor input)
      = (name, (sa, ma, ca, ra), nor, input)

    jobPickler
      = xp4Tuple (xpAttr "name" xpText0) actionsPickler resultCountAttr inputListPickler

    actionsPickler
      = xp4Tuple
          (optionalAction "split")
          (optionalAction "map")
          (optionalAction "combine")
          (optionalAction "reduce")

    optionalAction tagName = xpOption (xpElem tagName xpJobAction)

    inputListPickler
      = xpWrap (filterEmptyList, setEmptyList)
      $ xpOption
      $ xpElem "inputList" (xpList xpFunctionData)

    resultCountAttr = xpOption (xpAttr "numOfResults" xpickle)
-- | Runtime record of a job: identity, state, collected outputs, the
-- original 'JobInfo', timestamps and the container for the final result.
data JobData = JobData
  { jd_JobId     :: JobId
  , jd_State     :: JobState
  , jd_OutputMap :: OutputMap
  , jd_Info      :: JobInfo
  , jd_startTime :: UTCTime
  , jd_endTime   :: UTCTime
  , jd_Result    :: JobResultContainer
  }
-- | Multi-line rendering. The 'JobInfo' is only shown as the literal text
-- @JobInfo@ and the result container is left out.
instance Show JobData where
  show (JobData jobId state outputMap _ started ended _) = unlines
    [ "JobId:\t" ++ show jobId
    , "State:\t" ++ show state
    , "Info:\tJobInfo"
    , "StartTime:\t" ++ show started
    , "EndTime:\t" ++ show ended
    , "OutputMap:"
    , show outputMap
    ]
-- | Wraps the 'MVar' that holds a job's 'JobResult'.
data JobResultContainer = JobResultContainer (MVar JobResult)
-- | Shows a fixed placeholder; the 'MVar' content is never inspected.
instance Show JobResultContainer where
  show = const "JobResultContainer"
-- | Final output of a job.
data JobResult = JobResult
  { jr_Output :: [FunctionData]
  } deriving (Show)
-- | Serialised as just its output list.
instance Binary JobResult where
  put = put . jr_Output
  get = fmap JobResult get
-- | What an action gets to see while it runs: the task it executes and a
-- handle to the file system.
data ActionEnvironment = ActionEnvironment
  { ae_TaskData   :: TaskData
  , ae_FileSystem :: FS.FileSystem
  }
-- | Builds an 'ActionEnvironment' from a task and a file system handle.
mkActionEnvironment :: TaskData -> FS.FileSystem -> ActionEnvironment
mkActionEnvironment = ActionEnvironment
-- | Turns the content of an input file into key/value tuples.
type InputReader k1 v1 = B.ByteString -> IO [(k1,v1)]
-- | Turns key/value tuples into the bytes that are written to an output file.
type OutputWriter k2 v2 = [(k2,v2)] -> IO B.ByteString
-- | Loads the typed input of a task.
--
-- Inline items are 'decode'd directly into one tuple each. File items are
-- fetched from the environment's file system and parsed with the given
-- 'InputReader'; a file without content is logged and contributes nothing.
-- The results are concatenated in input order.
readConnector
  :: (Binary k1, Binary v1)
  => InputReader k1 v1
  -> ActionEnvironment
  -> [FunctionData]
  -> IO [(k1,v1)]
readConnector reader env inputs = do
  debugM localLogger $ "readConnector: " ++ show inputs
  chunks <- mapM loadOne inputs
  return (concat (catMaybes chunks))
  where
    fileSystem = ae_FileSystem env

    loadOne (TupleFunctionData raw) = return (Just [decode raw])
    loadOne (FileFunctionData fileId) = do
      debugM localLogger $ "loadInputList: getting content for: " ++ fileId
      content <- FS.getFileContent fileId fileSystem
      debugM localLogger $ "loadInputList: content is: " ++ show content
      case content of
        Nothing -> do
          debugM localLogger $ "loadInputList: no content found"
          return Nothing
        Just bytes -> fmap Just (reader bytes)
-- | Stores the typed output of a task, one entry per index.
--
-- With 'TOTRawTuple' every tuple is 'encode'd into its own inline item.
-- Otherwise the tuples of an index are serialised with the given
-- 'OutputWriter' and appended to a file named after the job id, task id
-- and index; the result then holds a single reference to that file.
writeConnector
  :: (Binary k2, Binary v2)
  => OutputWriter k2 v2
  -> ActionEnvironment
  -> [(Int,[(k2,v2)])]
  -> IO [(Int,[FunctionData])]
writeConnector writer env parts = do
  debugM localLogger $ "writeConnector: "
  mapM emit parts
  where
    task       = ae_TaskData env
    fileSystem = ae_FileSystem env

    emit (index, tuples) = case td_OutputType task of
      TOTRawTuple ->
        return (index, map (TupleFunctionData . encode) tuples)
      _ -> do
        bytes <- writer tuples
        FS.appendFile fileName bytes fileSystem
        return (index, [FileFunctionData fileName])
      where
        fileName
          = "j" ++ show (td_JobId task)
         ++ "_t" ++ show (td_TaskId task)
         ++ "_i" ++ show index
-- | Default reader: parses the bytes with 'parseByteStringToList'.
defaultInputReader :: (Binary k1, Binary v1) => InputReader k1 v1
defaultInputReader = return . parseByteStringToList
-- | Default writer: 'encode's every tuple and concatenates the results.
defaultOutputWriter :: (Binary k2, Binary v2) => OutputWriter k2 v2
defaultOutputWriter = return . B.concat . map encode
-- | Name under which an action is registered and referenced.
type ActionName = String
-- | Free-text description of an action.
type ActionInfo = String
-- | Typed description of a complete MapReduce action. @a@ is the option
-- type, @(k1,v1)@ the input tuples, @(k2,v2)@ the map output, @v3@ the
-- combine output and @v4@ the reduce output. A phase set to 'Nothing' is
-- not provided by the action.
data ActionConfiguration a k1 v1 k2 v2 v3 v4 = ActionConfiguration
  { ac_Name          :: ActionName
  , ac_Info          :: ActionInfo
  , ac_OptEncoder    :: OptionsEncoder a
  , ac_OptDecoder    :: OptionsDecoder a
  , ac_InputEncoder  :: InputEncoder k1 v1
  , ac_OutputDecoder :: OutputDecoder k2 v4
  , ac_Split         :: Maybe (SplitConfiguration a k1 v1)
  , ac_Map           :: Maybe (MapConfiguration a k1 v1 k2 v2)
  , ac_Combine       :: Maybe (ReduceConfiguration a k2 v2 v3)
  , ac_Reduce        :: Maybe (ReduceConfiguration a k2 v3 v4)
  }
-- | Split phase: the splitting function plus how to read its input files
-- and write its output files.
data SplitConfiguration a k1 v1 = SplitConfiguration
  { sc_Function :: SplitFunction a k1 v1
  , sc_Reader   :: InputReader k1 v1
  , sc_Writer   :: OutputWriter k1 v1
  }
-- | Map phase: the map function, the partitioner applied to its output and
-- the file reader / writer.
data MapConfiguration a k1 v1 k2 v2 = MapConfiguration
  { mc_Function  :: MapFunction a k1 v1 k2 v2
  , mc_Partition :: MapPartition a k2 v2
  , mc_Reader    :: InputReader k1 v1
  , mc_Writer    :: OutputWriter k2 v2
  }
-- | Reduce (or combine) phase: merge step, reduce function, partitioner
-- for the reduced output and the file reader / writer.
data ReduceConfiguration a k2 v2 v3 = ReduceConfiguration
  { rc_Merge     :: ReduceMerge a k2 v2
  , rc_Function  :: ReduceFunction a k2 v2 v3
  , rc_Partition :: ReducePartition a k2 v3
  , rc_Reader    :: InputReader k2 v2
  , rc_Writer    :: OutputWriter k2 v3
  }
-- | Configuration with the given name, an empty info text, the default
-- option / input / output codecs and only the default split phase; map,
-- combine and reduce are left unset.
defaultActionConfiguration
  :: (Binary a, Binary k1, Binary v1, Binary k2, Binary v4)
  => ActionName
  -> ActionConfiguration a k1 v1 k2 v2 v3 v4
defaultActionConfiguration name = ActionConfiguration
  { ac_Name          = name
  , ac_Info          = ""
  , ac_OptEncoder    = defaultOptionsEncoder
  , ac_OptDecoder    = defaultOptionsDecoder
  , ac_InputEncoder  = defaultInputEncoder
  , ac_OutputDecoder = defaultOutputDecoder
  , ac_Split         = Just defaultSplitConfiguration
  , ac_Map           = Nothing
  , ac_Combine       = Nothing
  , ac_Reduce        = Nothing
  }
-- | Split phase built from 'defaultSplit' and the default reader / writer.
defaultSplitConfiguration
  :: (Binary a, Binary k1, Binary v1)
  => SplitConfiguration a k1 v1
defaultSplitConfiguration = SplitConfiguration
  { sc_Function = defaultSplit
  , sc_Reader   = defaultInputReader
  , sc_Writer   = defaultOutputWriter
  }
-- | Map phase around the given function, using 'defaultPartition' and the
-- default reader / writer.
defaultMapConfiguration
  :: (Ord k2, Binary a, Binary k1, Binary v1, Binary k2, Binary v2)
  => MapFunction a k1 v1 k2 v2
  -> MapConfiguration a k1 v1 k2 v2
defaultMapConfiguration fct = MapConfiguration
  { mc_Function  = fct
  , mc_Partition = defaultPartition
  , mc_Reader    = defaultInputReader
  , mc_Writer    = defaultOutputWriter
  }
-- | Reduce phase around the given function, using 'defaultMerge',
-- 'defaultPartition' and the default reader / writer.
defaultReduceConfiguration
  :: (Ord k2, Binary a, Binary k2, Binary v2, Binary v3)
  => ReduceFunction a k2 v2 v3
  -> ReduceConfiguration a k2 v2 v3
defaultReduceConfiguration fct = ReduceConfiguration
  { rc_Merge     = defaultMerge
  , rc_Function  = fct
  , rc_Partition = defaultPartition
  , rc_Reader    = defaultInputReader
  , rc_Writer    = defaultOutputWriter
  }
-- | Default split: deals the input tuples round-robin onto the partitions
-- @1 .. n@ (first tuple to 1, second to 2, ... wrapping around after @n@).
--
-- A requested count below 1 is treated as 1. The former @x `mod` n@
-- formulation raised a divide-by-zero for @n == 0@ and produced
-- non-positive partition numbers for negative @n@.
defaultSplit
  :: SplitFunction a k1 v1
defaultSplit _ _ n ls
  = return partedList
  where
    -- never deal onto fewer than one partition
    partCount  = max 1 n
    -- 1,2,..,partCount,1,2,.. ; identical to the old sequence for n >= 1
    indices    = cycle [1 .. partCount]
    singletons = map (: []) ls
    partedList = AMap.toList $ AMap.fromList $ zip indices singletons
-- | Default partitioner: assigns every tuple to partition
-- @(hash key `mod` n) + 1@, where the hash is 'Hash.hashString' applied to
-- the shown binary encoding of the key.
--
-- For @n <= 1@ everything goes into partition 1. Previously only @n == 1@
-- was special-cased, so @n == 0@ ran into a division by zero and negative
-- @n@ yielded non-positive partition numbers.
defaultPartition
  :: (Binary k2, Binary v2)
  => MapPartition a k2 v2
defaultPartition _ _ n ls
  | n <= 1    = return [(1, ls)]
  | otherwise = return $ AMap.toList $ AMap.fromTupleList markedList
  where
    markedList = [ (bucket k, t) | t@(k, _) <- ls ]
    bucket k   = ((fromIntegral $ Hash.hashString (show $ encode k)) `mod` n) + 1
-- | Default merge: collects the tuples in an 'AMap.AccuMap' via
-- 'AMap.fromTupleList' and returns its 'AMap.toList' form.
defaultMerge
  :: (Ord k2, Binary k2, Binary v2)
  => ReduceMerge a k2 v2
defaultMerge _ _ = return . AMap.toList . AMap.fromTupleList
-- | An action after type erasure: name, info text and, per phase, the
-- optional function that works on encoded data.
data ActionData = ActionData
  { ad_Name    :: ActionName
  , ad_Info    :: ActionInfo
  , ad_Split   :: Maybe BinarySplitAction
  , ad_Map     :: Maybe BinaryMapAction
  , ad_Combine :: Maybe BinaryReduceAction
  , ad_Reduce  :: Maybe BinaryReduceAction
  }
-- | Actions are keyed by their name.
instance KMap.Key ActionData where
  getKey action = ad_Name action
-- | Shows only name and info; the phase functions cannot be shown.
instance Show ActionData where
  show action = concat
    [ "{ActionData name:\""
    , ad_Name action
    , "\" info:\""
    , ad_Info action
    , "\"}"
    ]
-- | Collection of actions, keyed through the 'KMap.Key' instance of 'ActionData'.
type ActionMap = KMap.KeyMap ActionData
-- | Serialises the options of an action.
type OptionsEncoder a = a -> B.ByteString
-- | Restores the options of an action from their serialised form.
type OptionsDecoder a = B.ByteString -> a
-- | Default option encoder: plain 'encode'.
defaultOptionsEncoder :: (Binary a) => OptionsEncoder a
defaultOptionsEncoder options = encode options
-- | Default option decoder: plain 'decode'.
defaultOptionsDecoder :: (Binary a) => OptionsDecoder a
defaultOptionsDecoder bytes = decode bytes
-- | Combines inline tuples and file references into the input list of a job.
type InputEncoder k1 v1 = [(k1,v1)] -> [FS.FileId] -> [FunctionData]
-- | Separates a result list into decoded tuples and file references.
type OutputDecoder k2 v4 = [FunctionData] -> ([(k2,v4)],[FS.FileId])
-- | Default input encoder: every tuple becomes an 'encode'd inline item,
-- every file id a file item; all tuples come before all files.
defaultInputEncoder :: (Binary k1, Binary v1) => InputEncoder k1 v1
defaultInputEncoder tuples files
  = map (TupleFunctionData . encode) tuples ++ map FileFunctionData files
-- | Default output decoder: 'decode's all inline items into tuples and
-- collects the ids of all file items, each in their original order.
defaultOutputDecoder :: (Binary k2, Binary v4) => OutputDecoder k2 v4
defaultOutputDecoder items = (tuples, files)
  where
    tuples = [ decode raw | TupleFunctionData raw <- items ]
    files  = [ fileId | FileFunctionData fileId <- items ]
-- | Picks the phase function of an action that matches the task type;
-- 'Nothing' if the phase is not set or the type is not a processing phase.
getActionForTaskType :: TaskType -> ActionData -> Maybe BinaryReduceAction
getActionForTaskType taskType action = case taskType of
  TTSplit   -> ad_Split action
  TTMap     -> ad_Map action
  TTCombine -> ad_Combine action
  TTReduce  -> ad_Reduce action
  _         -> Nothing
-- | Erases the types of an 'ActionConfiguration': every configured phase
-- is wrapped with the matching @perform...Action@ so that it works on
-- encoded options and 'FunctionData'. The options encoder, the input
-- encoder and the output decoder are not used here.
readActionConfiguration
  :: ( Ord k2, Binary a
     , Binary k1, Binary v1
     , Binary k2, Binary v2
     , Binary v3, Binary v4)
  => ActionConfiguration a k1 v1 k2 v2 v3 v4
  -> ActionData
readActionConfiguration (ActionConfiguration name info _ optDec _ _ splitCfg mapCfg combineCfg reduceCfg)
  = ActionData
      { ad_Name    = name
      , ad_Info    = info
      , ad_Split   = fmap (\(SplitConfiguration f ir ow) -> performSplitAction optDec f ir ow) splitCfg
      , ad_Map     = fmap (\(MapConfiguration f p ir ow) -> performMapAction optDec f p ir ow) mapCfg
      , ad_Combine = fmap (\(ReduceConfiguration m f p ir ow) -> performReduceAction optDec m f p ir ow) combineCfg
      , ad_Reduce  = fmap (\(ReduceConfiguration m f p ir ow) -> performReduceAction optDec m f p ir ow) reduceCfg
      }
-- | Builds the 'JobInfo' for running a configured action.
--
-- The arguments after the configuration are: the options, the inline input
-- tuples, the input files, the split count, the map count, the reduce
-- count, the number of results and the output type of the reduce phase.
-- A phase is only entered into the job when the configuration provides it.
-- Split, map and combine always write files, and the combine phase is
-- given the map count.
createJobInfoFromConfiguration
  :: ActionConfiguration a k1 v1 k2 v2 v3 v4
  -> a
  -> [(k1,v1)]
  -> [FS.FileId]
  -> Int
  -> Int
  -> Int
  -> Int
  -> TaskOutputType
  -> JobInfo
createJobInfoFromConfiguration cfg opts' ls1 ls2 sCnt mCnt rCnt nor' rt
  = JobInfo
      { ji_Description   = name
      , ji_Option        = ac_OptEncoder cfg opts'
      , ji_SplitAction   = fmap (const (action TOTFile sCnt)) (ac_Split cfg)
      , ji_MapAction     = fmap (const (action TOTFile mCnt)) (ac_Map cfg)
      , ji_CombineAction = fmap (const (action TOTFile mCnt)) (ac_Combine cfg)
      , ji_ReduceAction  = fmap (const (action rt rCnt)) (ac_Reduce cfg)
      , ji_NumOfResults  = Just nor'
      , ji_Input         = ac_InputEncoder cfg ls1 ls2
      }
  where
    name = ac_Name cfg
    action outputType count = JobAction name outputType count
-- | Decodes a job result with the output decoder of the configuration
-- into result tuples and result files.
createListsFromJobResult
  :: ActionConfiguration a k1 v1 k2 v2 v3 v4
  -> JobResult
  -> ([(k2,v4)],[FS.FileId])
createListsFromJobResult ac = ac_OutputDecoder ac . jr_Output
-- | Typed split step: environment, options, an 'Int' count and the input
-- tuples, giving tuples grouped under an 'Int' index.
type SplitAction a k1 v1 = ActionEnvironment -> a -> Int -> [(k1,v1)] -> IO [(Int, [(k1,v1)])]
-- | Split step on encoded data: environment, encoded options, optional
-- count and indexed input, giving indexed output.
type BinarySplitAction = ActionEnvironment -> B.ByteString -> Maybe Int -> (Int,[FunctionData]) -> IO [(Int, [FunctionData])]
-- | The user-supplied split function has the same shape as a 'SplitAction'.
type SplitFunction a k1 v1 = SplitAction a k1 v1
-- | Runs a typed split function on encoded data: decodes the options,
-- reads the input through 'readConnector', splits it and writes the parts
-- through 'writeConnector'.
--
-- Without a count ('Nothing') the split function is skipped and the whole
-- input is passed on under the incoming index.
performSplitAction
  :: (Binary a, Binary k1, Binary v1)
  => OptionsDecoder a
  -> SplitFunction a k1 v1
  -> InputReader k1 v1
  -> OutputWriter k1 v1
  -> ActionEnvironment
  -> B.ByteString
  -> Maybe Int
  -> (Int,[FunctionData])
  -> IO [(Int, [FunctionData])]
performSplitAction optDec fct reader writer env opts n (i,ls) = do
  let options = optDec opts
  infoM localLogger "performSplitAction"
  infoM localLogger "reading inputList"
  tuples <- readConnector reader env ls
  infoM localLogger "doing split"
  parts <- maybe (return [(i, tuples)]) (\count -> fct env options count tuples) n
  infoM localLogger "writing outputlist"
  writeConnector writer env parts
-- | Typed map step over a whole input list, giving tuples grouped under an
-- 'Int' index.
type MapAction a k1 v1 k2 v2 = ActionEnvironment -> a -> Int -> [(k1,v1)] -> IO [(Int, [(k2,v2)])]
-- | Map step on encoded data: environment, encoded options, optional count
-- and indexed input, giving indexed output.
type BinaryMapAction = ActionEnvironment -> B.ByteString -> Maybe Int -> (Int,[FunctionData]) -> IO [(Int, [FunctionData])]
-- | User-supplied map function: one input key/value pair gives a list of
-- output pairs.
type MapFunction a k1 v1 k2 v2 = ActionEnvironment -> a -> k1 -> v1 -> IO [(k2, v2)]
-- | Distributes the mapped tuples over the given number of partitions.
type MapPartition a k2 v2 = ActionEnvironment -> a -> Int -> [(k2,v2)] -> IO [(Int, [(k2,v2)])]
-- | Runs a typed map function on encoded data: decodes the options, reads
-- the input through 'readConnector', applies the map function to every
-- tuple, partitions the concatenated results and writes them through
-- 'writeConnector'.
--
-- Without a count ('Nothing') the partitioner is skipped and all results
-- are passed on under the incoming index.
performMapAction
  :: (Ord k2,
      Binary a, Binary k1, Binary v1, Binary k2, Binary v2)
  => OptionsDecoder a
  -> MapFunction a k1 v1 k2 v2
  -> MapPartition a k2 v2
  -> InputReader k1 v1
  -> OutputWriter k2 v2
  -> ActionEnvironment
  -> B.ByteString
  -> Maybe Int
  -> (Int,[FunctionData])
  -> IO [(Int, [FunctionData])]
performMapAction optDec fct part reader writer env opts n (i,ls) = do
  let options = optDec opts
  infoM localLogger "performMapAction"
  infoM localLogger "reading inputList"
  tuples <- readConnector reader env ls
  infoM localLogger "doing map"
  mapped <- fmap concat (mapM (\(key, value) -> fct env options key value) tuples)
  infoM localLogger "doing partition"
  parts <- maybe (return [(i, mapped)]) (\count -> part env options count mapped) n
  infoM localLogger "writing outputlist"
  writeConnector writer env parts
-- | Typed reduce step over a whole input list, giving tuples grouped under
-- an 'Int' index.
type ReduceAction a k2 v2 v3 = ActionEnvironment -> a -> Int -> [(k2,v2)] -> IO [(Int, [(k2,v3)])]
-- | Reduce step on encoded data: environment, encoded options, optional
-- count and indexed input, giving indexed output.
type BinaryReduceAction = ActionEnvironment -> B.ByteString -> Maybe Int -> (Int,[FunctionData]) -> IO [(Int, [FunctionData])]
-- | Groups the incoming tuples so that every key carries a list of values.
type ReduceMerge a k2 v2 = ActionEnvironment -> a -> [(k2,v2)] -> IO [(k2,[v2])]
-- | User-supplied reduce function: a key with its values gives an optional
-- result value.
type ReduceFunction a k2 v2 v3 = ActionEnvironment -> a -> k2 -> [v2] -> IO (Maybe v3)
-- | Distributes the reduced tuples over the given number of partitions.
type ReducePartition a k2 v3 = ActionEnvironment -> a -> Int -> [(k2,v3)] -> IO [(Int, [(k2,v3)])]
-- | Runs a typed reduce (or combine) step on encoded data: decodes the
-- options, reads the input through 'readConnector', merges it per key,
-- reduces every key, drops the keys whose reduce function gave 'Nothing',
-- partitions the rest and writes it through 'writeConnector'.
--
-- Without a count ('Nothing') the partitioner is skipped and all results
-- are passed on under the incoming index.
performReduceAction
  :: (Ord k2,
      Binary a, Binary k2, Binary v2, Binary v3)
  => OptionsDecoder a
  -> ReduceMerge a k2 v2
  -> ReduceFunction a k2 v2 v3
  -> ReducePartition a k2 v3
  -> InputReader k2 v2
  -> OutputWriter k2 v3
  -> ActionEnvironment
  -> B.ByteString
  -> Maybe Int
  -> (Int,[FunctionData])
  -> IO [(Int, [FunctionData])]
performReduceAction optDec merge fct part reader writer env opts n (i,ls) = do
  let options = optDec opts
  infoM localLogger "performReduceAction"
  infoM localLogger "reading inputList"
  tuples <- readConnector reader env ls
  infoM localLogger "doing merge"
  grouped <- merge env options tuples
  infoM localLogger "doing reduce"
  reduced <- fmap catMaybes (mapM (reduceGroup options) grouped)
  infoM localLogger "doing partition"
  parts <- maybe (return [(i, reduced)]) (\count -> part env options count reduced) n
  infoM localLogger "writing outputlist"
  writeConnector writer env parts
  where
    -- reduce one key and re-attach the key to a successful result
    reduceGroup options (key, values)
      = fmap (fmap ((,) key)) (fct env options key values)