module Holumbus.MapReduce.TaskProcessor
(
TaskResultFunction
, TaskProcessor
, printTaskProcessor
, newTaskProcessor
, closeTaskProcessor
, setFileSystemToTaskProcessor
, setActionMap
, setTaskCompletedHook
, setTaskErrorHook
, startTaskProcessor
, stopTaskProcessor
, listTaskIds
, getActions
, getActionNames
, startTask
, stopTask
, stopAllTasks
)
where
import Prelude hiding ( catch )
import Control.Exception ( Exception
                         , throw
                         , throwIO
                         , throwTo
                         , catch
                         )
import Control.Concurrent
import qualified Data.Map as Map
import qualified Data.Set as Set
import Data.Maybe
import Data.Typeable
import System.Log.Logger
import Holumbus.Common.Utils ( handleAll )
import qualified Holumbus.Data.KeyMap as KMap
import Holumbus.MapReduce.Types
import qualified Holumbus.FileSystem.FileSystem as FS
-- | Logger name used for messages about the task processor itself
-- (queueing and stopping of tasks).
localLogger :: String
localLogger = "Holumbus.MapReduce.TaskProcessor"
-- | Logger name used for messages written while an individual task runs.
taskLogger :: String
taskLogger = "Holumbus.MapReduce.TaskProcessor.task"
-- | Callback that receives the data of a finished task. When it yields
-- 'False' the task stays in the processor's result set and is handed to the
-- callback again on a later pass of the processing loop.
type TaskResultFunction = TaskData -> IO Bool
-- | Placeholder hook installed by default: ignores the task and always
-- yields 'True'.
dummyTaskResultFunction :: TaskData -> IO Bool
dummyTaskResultFunction = const (return True)
-- | The hooks through which the processor hands finished tasks to its owner.
data TaskProcessorFunctions = TaskProcessorFunctions
  { tpf_TaskCompleted :: TaskResultFunction
    -- ^ applied to every task in the completed set
  , tpf_TaskError     :: TaskResultFunction
    -- ^ applied to every task in the error set
  }

-- | Functions cannot be rendered, so a fixed placeholder is shown instead.
instance Show TaskProcessorFunctions where
  show = const "{TaskProcessorFunctions}"
-- | Exception thrown to the server thread by 'stopTaskProcessor'; the
-- processing loop in 'doProcessing' catches it and terminates.
data TaskProcessorException = KillServerException
  deriving (Show, Typeable)

instance Exception TaskProcessorException
-- | Exceptions that concern a single task thread.
data TaskException
  = KillTaskException
    -- ^ thrown to a running task's thread by 'stopTask'
  | UnkownTaskException ActionName
    -- ^ thrown by 'performTask' when the named action provides no function
    --   for the task's type
  deriving (Show, Typeable)

instance Exception TaskException
-- | The complete mutable state of a task processor.
data TaskProcessorData = TaskProcessorData
  { tpd_ServerThreadId  :: Maybe ThreadId
    -- ^ thread running the processing loop, 'Nothing' while stopped
  , tpd_ServerDelay     :: Int
    -- ^ pause between two passes of the loop, handed to 'threadDelay'
  , tpd_FileSystem      :: FS.FileSystem
    -- ^ file system passed into the environment of every task
  , tpd_MaxTasks        :: Int
    -- ^ upper bound on the number of concurrently running tasks
  , tpd_Functions       :: TaskProcessorFunctions
    -- ^ hooks for completed and failed tasks
  , tpd_ActionMap       :: ActionMap
    -- ^ the actions tasks are resolved against
  , tpd_TaskQueue       :: [TaskData]
    -- ^ tasks waiting to be started, oldest first
  , tpd_CompletedTasks  :: Set.Set TaskData
    -- ^ completed tasks not yet accepted by the completed hook
  , tpd_ErrorTasks      :: Set.Set TaskData
    -- ^ failed tasks not yet accepted by the error hook
  , tpd_TaskIdThreadMap :: Map.Map TaskId ThreadId
    -- ^ the running tasks together with their threads
  } deriving (Show)
-- | Handle to a task processor: its state, shared through an 'MVar'.
type TaskProcessor = MVar TaskProcessorData
-- | Render the current processor state with its 'Show' instance.
printTaskProcessor :: TaskProcessor -> IO String
printTaskProcessor tp = withMVar tp (return . show)
-- | Initial processor state: no server thread, a loop delay of 1000, at most
-- one running task, hooks that accept everything, and no actions or tasks.
--
-- The file system has no sensible default. The field is a placeholder that
-- fails with a descriptive message if it is forced before
-- 'setFileSystemToTaskProcessor' has replaced it (previously a bare
-- 'undefined', which gave no hint about the cause).
defaultTaskProcessorData :: TaskProcessorData
defaultTaskProcessorData = TaskProcessorData
  { tpd_ServerThreadId  = Nothing
  , tpd_ServerDelay     = 1000
  , tpd_FileSystem      = error
      "Holumbus.MapReduce.TaskProcessor: file system used before setFileSystemToTaskProcessor was called"
  , tpd_MaxTasks        = 1
  , tpd_Functions       = TaskProcessorFunctions
      { tpf_TaskCompleted = dummyTaskResultFunction
      , tpf_TaskError     = dummyTaskResultFunction
      }
  , tpd_ActionMap       = KMap.empty
  , tpd_TaskQueue       = []
  , tpd_CompletedTasks  = Set.empty
  , tpd_ErrorTasks      = Set.empty
  , tpd_TaskIdThreadMap = Map.empty
  }
-- | Create a processor holding 'defaultTaskProcessorData'. The processing
-- loop is not started here; see 'startTaskProcessor'.
newTaskProcessor :: IO TaskProcessor
newTaskProcessor = newMVar defaultTaskProcessorData
-- | Shut the processor down; currently the same as 'stopTaskProcessor'.
closeTaskProcessor :: TaskProcessor -> IO ()
closeTaskProcessor = stopTaskProcessor
-- | Install the file system that is handed to the tasks' environments.
setFileSystemToTaskProcessor :: FS.FileSystem -> TaskProcessor -> IO ()
setFileSystemToTaskProcessor fs tp = modifyMVar_ tp (return . withFileSystem)
  where
    withFileSystem tpd = tpd { tpd_FileSystem = fs }
-- | Replace the map of actions that tasks are resolved against.
setActionMap :: KMap.KeyMap ActionData -> TaskProcessor -> IO ()
setActionMap m tp =
  modifyMVar_ tp (\state -> return (state { tpd_ActionMap = m }))
-- | Install the hook that receives completed tasks; the error hook is left
-- untouched.
setTaskCompletedHook :: TaskResultFunction -> TaskProcessor -> IO ()
setTaskCompletedHook f tp = modifyMVar_ tp (return . install)
  where
    install tpd =
      tpd { tpd_Functions = (tpd_Functions tpd) { tpf_TaskCompleted = f } }
-- | Install the hook that receives failed tasks; the completed hook is left
-- untouched.
setTaskErrorHook :: TaskResultFunction -> TaskProcessor -> IO ()
setTaskErrorHook f tp = modifyMVar_ tp (return . install)
  where
    install tpd =
      tpd { tpd_Functions = (tpd_Functions tpd) { tpf_TaskError = f } }
-- | Start the processing loop in its own thread. If a server thread is
-- already recorded in the state, no second one is forked.
startTaskProcessor :: TaskProcessor -> IO ()
startTaskProcessor tp = modifyMVar_ tp ensureServer
  where
    ensureServer tpd = do
      -- reuse the recorded thread, fork the loop only when there is none
      serverThread <- maybe (forkIO (doProcessing tp)) return
                            (tpd_ServerThreadId tpd)
      return (tpd { tpd_ServerThreadId = Just serverThread })
-- | Stop the processing loop by throwing 'KillServerException' to the
-- recorded server thread, then clear the recorded thread id. Does nothing
-- but clear the id when no server thread is recorded.
stopTaskProcessor :: TaskProcessor -> IO ()
stopTaskProcessor tp = modifyMVar_ tp shutdown
  where
    shutdown tpd = do
      maybe (return ()) killServer (tpd_ServerThreadId tpd)
      return (tpd { tpd_ServerThreadId = Nothing })
    killServer serverThread = do
      throwTo serverThread KillServerException
      -- give the server thread a chance to run its handler
      yield
-- | 'True' when a task with the given id is currently running or is waiting
-- in the queue.
containsTask :: TaskId -> TaskProcessorData -> Bool
containsTask tid tpd = running || queued
  where
    running = tid `Map.member` tpd_TaskIdThreadMap tpd
    queued  = tid `elem` map td_TaskId (tpd_TaskQueue tpd)
-- | Append a task to the end of the waiting queue.
queueTask :: TaskData -> TaskProcessorData -> TaskProcessorData
queueTask td tpd = tpd { tpd_TaskQueue = tpd_TaskQueue tpd ++ [td] }
-- | Drop every queued task that carries the given id.
dequeueTask :: TaskId -> TaskProcessorData -> TaskProcessorData
dequeueTask tid tpd =
  tpd { tpd_TaskQueue = filter ((/= tid) . td_TaskId) (tpd_TaskQueue tpd) }
-- | The thread executing the given task, if the task is currently running.
getTaskThreadId :: TaskId -> TaskProcessorData -> Maybe ThreadId
getTaskThreadId tid = Map.lookup tid . tpd_TaskIdThreadMap
-- | The ids of all queued and running tasks, without duplicates and in
-- ascending order.
getTasksIds :: TaskProcessorData -> [TaskId]
getTasksIds tpd = Set.toList (queuedIds `Set.union` runningIds)
  where
    queuedIds  = Set.fromList (map td_TaskId (tpd_TaskQueue tpd))
    runningIds = Set.fromList (Map.keys (tpd_TaskIdThreadMap tpd))
-- | Queue a task unless a task with the same id is already queued or
-- running, in which case the state is returned unchanged.
addTask :: TaskData -> TaskProcessorData -> TaskProcessorData
addTask td tpd
  | containsTask (td_TaskId td) tpd = tpd
  | otherwise                       = queueTask td tpd
-- | Forget a task completely: remove it from the running map and from the
-- waiting queue. The task's thread itself is not touched here.
deleteTask :: TaskId -> TaskProcessorData -> TaskProcessorData
deleteTask tid tpd = dequeueTask tid notRunning
  where
    notRunning =
      tpd { tpd_TaskIdThreadMap = Map.delete tid (tpd_TaskIdThreadMap tpd) }
-- | The ids of all tasks that are currently queued or running.
listTaskIds :: TaskProcessor -> IO [TaskId]
listTaskIds tp = withMVar tp (return . getTasksIds)
-- | All actions currently registered in the processor's action map.
getActions :: TaskProcessor -> IO [ActionData]
getActions tp = withMVar tp (return . KMap.elems . tpd_ActionMap)
-- | The names of all actions currently registered in the processor.
getActionNames :: TaskProcessor -> IO [ActionName]
getActionNames tp = fmap (map ad_Name) (getActions tp)
-- | Hand a task to the processor. The task is only queued here; the
-- processing loop starts it later. A task whose id is already queued or
-- running is ignored (see 'addTask').
startTask :: TaskData -> TaskProcessor -> IO ()
startTask td tp = do
  debugM localLogger ("waiting to add Task " ++ taskName)
  modifyMVar_ tp $ \tpd -> do
    debugM localLogger ("adding Task " ++ taskName)
    return (addTask td tpd)
  where
    taskName = show (td_TaskId td)
-- | Remove a task from the processor. The task is dropped from the queue and
-- from the running map; if it was running, 'KillTaskException' is thrown to
-- its thread after the state has been updated and the lock released.
stopTask :: TaskId -> TaskProcessor -> IO ()
stopTask tid tp = do
  debugM localLogger ("waiting to stop Task " ++ show tid)
  runningThread <- modifyMVar tp $ \tpd ->
    return (deleteTask tid tpd, getTaskThreadId tid tpd)
  debugM localLogger ("stopping Task " ++ show tid)
  case runningThread of
    Just thd -> throwTo thd KillTaskException
    Nothing  -> return ()
-- | Stop every task that is queued or running at the time of the call.
--
-- The ids are read first and the tasks are then stopped one by one with
-- 'stopTask', so tasks added in between are not affected. 'mapM_' is used
-- because the per-task results are units that nobody needs.
stopAllTasks :: TaskProcessor -> IO ()
stopAllTasks tp = do
  debugM localLogger "waiting to stop all Tasks"
  tids <- withMVar tp (return . getTasksIds)
  mapM_ (\tid -> stopTask tid tp) tids
-- | Move a task from the running map into the set of completed tasks.
setTaskCompleted :: TaskData -> TaskProcessorData -> TaskProcessorData
setTaskCompleted td tpd = tpd
  { tpd_TaskIdThreadMap = Map.delete (td_TaskId td) (tpd_TaskIdThreadMap tpd)
  , tpd_CompletedTasks  = Set.insert td (tpd_CompletedTasks tpd)
  }
-- | Move a task from the running map into the set of failed tasks.
setTaskError :: TaskData -> TaskProcessorData -> TaskProcessorData
setTaskError td tpd = tpd
  { tpd_TaskIdThreadMap = Map.delete (td_TaskId td) (tpd_TaskIdThreadMap tpd)
  , tpd_ErrorTasks      = Set.insert td (tpd_ErrorTasks tpd)
  }
-- | Record in the shared state that the given task has failed.
reportErrorTask :: TaskData -> TaskProcessor -> IO ()
reportErrorTask td tp = modifyMVar_ tp (return . setTaskError td)
-- | Record in the shared state that the given task has completed.
reportCompletedTask :: TaskData -> TaskProcessor -> IO ()
reportCompletedTask td tp = modifyMVar_ tp (return . setTaskCompleted td)
-- | Register the thread that executes the given task in the running map.
setTaskRunning :: TaskId -> ThreadId -> TaskProcessorData -> TaskProcessorData
setTaskRunning tid thd tpd =
  tpd { tpd_TaskIdThreadMap = Map.insert tid thd (tpd_TaskIdThreadMap tpd) }
-- | Take the oldest task off the waiting queue.
--
-- Returns the task together with the state whose queue no longer contains
-- it, or 'Nothing' and the unchanged state when the queue is empty. The
-- queue is inspected with a single pattern match instead of 'head'/'tail'
-- behind separate 'null' checks.
getNextQueuedTask :: TaskProcessorData -> (Maybe TaskData, TaskProcessorData)
getNextQueuedTask tpd =
  case tpd_TaskQueue tpd of
    []            -> (Nothing, tpd)
    (next : rest) -> (Just next, tpd { tpd_TaskQueue = rest })
-- | The server loop: start waiting tasks, deliver finished ones, sleep for
-- the configured delay, repeat. The loop ends when a
-- 'TaskProcessorException' arrives; the exception is then printed to stdout.
-- Other exception types are not handled here.
doProcessing :: TaskProcessor -> IO ()
doProcessing tp = loop `catch` onKill
  where
    onKill :: TaskProcessorException -> IO ()
    onKill = print

    loop :: IO ()
    loop = do
      handleNewTasks tp
      handleFinishedTasks tp
      -- the delay is re-read on every pass
      withMVar tp (return . tpd_ServerDelay) >>= threadDelay
      loop
-- | Start queued tasks until either the queue is empty or the number of
-- running tasks has reached 'tpd_MaxTasks'.
--
-- The whole pass runs while the processor's 'MVar' is held, so a freshly
-- forked task thread cannot read or update the state before it has been
-- registered with 'setTaskRunning'.
--
-- The dequeue result is pattern matched directly, so no 'fromJust' is needed
-- to get at the next task.
handleNewTasks :: TaskProcessor -> IO ()
handleNewTasks tp = modifyMVar_ tp startQueued
  where
    startQueued tpd
      -- no free slot: leave the remaining tasks in the queue
      | Map.size (tpd_TaskIdThreadMap tpd) >= tpd_MaxTasks tpd = return tpd
      | otherwise =
          case getNextQueuedTask tpd of
            (Nothing, _)    -> return tpd
            (Just td, tpd') -> do
              thd <- runTask td tp
              startQueued (setTaskRunning (td_TaskId td) thd tpd')
-- | Fork a thread that performs the task and reports the outcome: the
-- resulting task data is reported as completed, and any exception passed to
-- the 'handleAll' handler is logged and the original task reported as
-- failed.
-- NOTE(review): whether a 'KillTaskException' sent by 'stopTask' also ends up
-- in this handler depends on 'handleAll' - confirm.
runTask :: TaskData -> TaskProcessor -> IO ThreadId
runTask td tp = forkIO (handleAll onFailure runAndReport)
  where
    onFailure e = do
      errorM taskLogger ("Exception: " ++ show e)
      reportErrorTask td tp

    runAndReport = do
      yield
      finished <- performTask td tp
      reportCompletedTask finished tp
-- | Offer all completed and failed tasks to their hooks and keep only those
-- a hook did not accept. The hooks run while the processor's 'MVar' is held.
handleFinishedTasks :: TaskProcessor -> IO ()
handleFinishedTasks tp = modifyMVar_ tp $ \tpd -> do
  let hooks = tpd_Functions tpd
  unsentCompleted <-
    sendTasksResults (tpd_CompletedTasks tpd) (tpf_TaskCompleted hooks)
  unsentFailed <-
    sendTasksResults (tpd_ErrorTasks tpd) (tpf_TaskError hooks)
  return (tpd { tpd_CompletedTasks = unsentCompleted
              , tpd_ErrorTasks     = unsentFailed
              })
-- | Apply the result function to every task of the set, in the set's order,
-- and return the tasks for which it yielded 'False'.
sendTasksResults :: Set.Set TaskData -> TaskResultFunction -> IO (Set.Set TaskData)
sendTasksResults set fun = do
  let pending = Set.toList set
  accepted <- mapM fun pending
  return (Set.fromList [ td | (td, ok) <- zip pending accepted, not ok ])
-- | Execute a task in the calling thread and return it with its output set.
--
-- The action is looked up by the task's action name, and the function that
-- fits the task's type is applied to the environment, option, partition
-- value and input of the task.
--
-- Raises 'UnkownTaskException' (with 'throwIO', so it is raised as an IO
-- action at a defined point) when the action provides no function for the
-- task's type.
--
-- NOTE(review): a task whose action name is not in the action map is
-- returned unchanged, and the caller then treats it as completed - confirm
-- that this is intended.
performTask :: TaskData -> TaskProcessor-> IO TaskData
performTask td tp = do
  infoM taskLogger ("Task " ++ show (td_TaskId td))
  debugM taskLogger ("input td: " ++ show td)
  -- only the action map and the file system live in the shared state;
  -- everything else is read from the task itself
  (mbActionData, fs) <- withMVar tp $ \tpd ->
    return ( KMap.lookup (td_Action td) (tpd_ActionMap tpd)
           , tpd_FileSystem tpd
           )
  case mbActionData of
    Nothing -> return td
    Just actionData -> do
      action <- maybe (throwIO (UnkownTaskException (td_Action td)))
                      return
                      (getActionForTaskType (td_Type td) actionData)
      let env = mkActionEnvironment td fs
      bout <- action env (td_Option td) (td_PartValue td) (td_Input td)
      let td' = td { td_Output = bout }
      debugM taskLogger ("output td: " ++ show td')
      return td'