-- ----------------------------------------------------------------------------
{- |
  Module     : Holumbus.MapReduce.JobController
  Copyright  : Copyright (C) 2008 Stefan Schmidt
  License    : MIT

  Maintainer : Stefan Schmidt (stefanschmidt@web.de)
  Stability  : experimental
  Portability: portable
  Version    : 0.1


-}
-- ----------------------------------------------------------------------------

{-# OPTIONS -fglasgow-exts #-}
module Holumbus.MapReduce.JobController
    ( TaskSendResult(..)
    , TaskSendFunction

    , JobController

    , printJobController

      -- * Creation and Destruction
    , newJobController
    , closeJobController
    , setFileSystemToJobController
    , setTaskSendHook

      -- * Job Controller 
    , startJobController
    , stopJobController
    , isJobControllerRunning
    , singleStepJobControlling

      -- * performing MapReduce-Jobs
      -- , startJob
      -- , stopJob
    , performJob

      -- * handling Task-Responses
    , setTaskCompleted
    , setTaskError
    )
where

import           Prelude hiding                 ( catch )

import           Control.Exception              ( Exception
                                                , throwTo
                                                , catch
                                                )

import           Control.Concurrent
import           Control.Monad

import qualified Data.ByteString.Lazy           as B
import           Data.Maybe
import           Data.Time
import           Data.Typeable
import qualified Data.Map                       as Map
import qualified Data.Set                       as Set
import           System.Log.Logger

import           Holumbus.MapReduce.Types
import qualified Holumbus.FileSystem.FileSystem as FS
import qualified Holumbus.Data.AccuMap          as AMap
import qualified Holumbus.Data.MultiMap         as MMap


localLogger :: String
localLogger = "Holumbus.MapReduce.JobController"

cycleLogger :: String
cycleLogger = "Holumbus.MapReduce.JobController.cycle"

-- ----------------------------------------------------------------------------
--
-- ----------------------------------------------------------------------------


newJobData  
  :: JobControllerData -> JobInfo
  -> IO (JobControllerData, JobData, MVar JobResult)
newJobData jcd info
  = do
    t <- getCurrentTime
    mVar <- newEmptyMVar
    let state = JSIdle
        parts = maybe 1 id $ getCurrentTaskPartValue state info
        jrc   = JobResultContainer mVar 
        jid   = jcd_NextJobId jcd
        jcd'  = jcd { jcd_NextJobId = (jid+1) }
        pairs = initialSplit parts (ji_Input info) 
        -- let pairs = zip (iterate (+1) 1) (map (\i -> [i]) (ji_Input info))
        accuMap = AMap.fromList pairs 
        outputMap = Map.insert JSIdle accuMap Map.empty
    return (jcd', JobData jid state outputMap info t t jrc, mVar)
    where
    initialSplit n ls = ps
      where
      ns = [(x `mod` n) + 1 | x <- [1..]]
      is = map (\a -> [a]) ls 
      ps = zip ns is


newTaskData 
  :: JobControllerData 
  -> JobId -> TaskType -> TaskState -> B.ByteString -> Maybe Int -> (Int,[FunctionData]) -> ActionName -> TaskOutputType 
  -> IO (JobControllerData, TaskData)
newTaskData jcd jid tt ts opt n i a ot
  = do
    let tid = jcd_NextTaskId jcd
    let jcd' = jcd { jcd_NextTaskId = (tid+1) }
    return (jcd', TaskData jid tid tt ts opt n i [] ot a) 
    
data TaskSendResult = TSRSend | TSRNotSend | TSRError
  deriving (Show, Eq, Ord, Enum)    
    
-- ----------------------------------------------------------------------------
--
-- ----------------------------------------------------------------------------

intersections :: Ord a => [Set.Set a] -> Set.Set a  
intersections [] = Set.empty
intersections (s:ss) = foldl Set.intersection s ss 


mapAccumLM
  :: (Monad m) 
  => (acc -> x -> m (acc, y)) -- Function of of input list
                              -- and accumulator, returning new
                              -- accumulator and of result list
  -> acc                      -- Initial accumulator 
  -> [x]                      -- Input list
  -> m (acc, [y])             -- Final accumulator and result list
mapAccumLM _ s [] = return (s, [])
mapAccumLM f s (x:xs) 
  = do
    (s', y ) <- f s x
    (s'',ys) <- mapAccumLM f s' xs
    return (s'',y:ys)




type TaskSendFunction = TaskData -> IO (TaskSendResult)

dummyTaskSendFunction :: TaskData -> IO (TaskSendResult)
dummyTaskSendFunction _ = return TSRSend


type JobMap         = Map.Map JobId JobData
type TaskMap        = Map.Map TaskId TaskData
    
type StateJobIdMap  = MMap.MultiMap JobState JobId

type JobIdTaskIdMap = MMap.MultiMap JobId TaskId
type TypeTaskIdMap  = MMap.MultiMap TaskType TaskId
type StateTaskIdMap = MMap.MultiMap TaskState TaskId
   
    
data JobControlFunctions = JobControlFunctions {
    jcf_TaskSend :: TaskSendFunction
  }
instance Show JobControlFunctions where
  show _ = "JobControlFunctions"

    
data JobControllerData = JobControllerData {
  -- internal
    jcd_ServerThreadId :: Maybe ThreadId
  , jcd_ServerDelay    :: Int
  , jcd_FileSystem     :: Maybe FS.FileSystem
  -- control
  , jcd_NextJobId      :: JobId
  , jcd_NextTaskId     :: TaskId
  , jcd_Functions      :: JobControlFunctions
  -- job control
  , jcd_JobMap         :: ! JobMap
  , jcd_TaskMap        :: ! TaskMap
  , jcd_StateJobIdMap  :: ! StateJobIdMap
  , jcd_JobIdTaskIdMap :: ! JobIdTaskIdMap
  , jcd_TypeTaskIdMap  :: ! TypeTaskIdMap
  , jcd_StateTaskIdMap :: ! StateTaskIdMap
  } deriving (Show)


type JobController = MVar JobControllerData

printJobController :: JobController -> IO String
printJobController jc
  = withMVar jc $ \jcd -> return $ show jcd


data JobControllerException
    = KillServerException
      deriving (Show, Typeable)

instance Exception JobControllerException where

-- ----------------------------------------------------------------------------
-- Contruction / Destruction 
-- ----------------------------------------------------------------------------

defaultJobControllerData :: JobControllerData
defaultJobControllerData = jcd
  where
  jcd = JobControllerData
    Nothing
    10000 -- 10 milliseconds delay
    Nothing
    1
    1
    jcf
    Map.empty
    Map.empty
    MMap.empty
    MMap.empty
    MMap.empty
    MMap.empty
  jcf = JobControlFunctions
    dummyTaskSendFunction

newJobController :: IO JobController
newJobController
  = do
    let jcd = defaultJobControllerData
    jc <- newMVar jcd
    return jc 

closeJobController :: JobController -> IO ()
closeJobController jc
  = do
    stopJobController jc


setFileSystemToJobController :: FS.FileSystem -> JobController -> IO ()
setFileSystemToJobController fs jc
  = modifyMVar jc $
    \jcd -> return $ (jcd { jcd_FileSystem = Just fs }, ())


setTaskSendHook :: TaskSendFunction -> JobController -> IO ()
setTaskSendHook f jc
  = do
    modifyMVar jc $
      \jcd ->
      do
      let funs = jcd_Functions jcd
      let funs' = funs { jcf_TaskSend = f }
      return (jcd { jcd_Functions = funs' }, ())


-- ----------------------------------------------------------------------------
-- server functions
-- ----------------------------------------------------------------------------

startJobController :: JobController -> IO ()
startJobController jc
  = do
    modifyMVar jc $ 
      \jcd -> 
      do
      thd <- case (jcd_ServerThreadId jcd) of
        (Just i) -> return i
        (Nothing) ->
          do
          i <- forkIO $ doProcessing jc True
          return i
      return (jcd {jcd_ServerThreadId = (Just thd)}, ())



stopJobController :: JobController -> IO ()
stopJobController jc
  = do
    modifyMVar jc $ 
      \jcd -> 
      do
      case (jcd_ServerThreadId jcd) of
        (Nothing) -> return ()
        (Just i) -> 
          do
          throwTo i KillServerException
          yield
          return ()
      return (jcd {jcd_ServerThreadId = Nothing}, ())


isJobControllerRunning :: JobController -> IO Bool
isJobControllerRunning jc
  = withMVar jc $
      \jcd -> return $ isJust (jcd_ServerThreadId jcd)


singleStepJobControlling :: JobController -> IO ()
singleStepJobControlling jc
  = do
    singleStepAllowed <- withMVar jc $ 
      \jcd -> return $ isNothing (jcd_ServerThreadId jcd)
    if singleStepAllowed then doProcessing jc False else return ()

-- ----------------------------------------------------------------------------
-- private functions
-- ----------------------------------------------------------------------------

-- | get the JobIds, they can be filtered by JobState, if the filter is empty
--   all Jobs are returned.
getJobIds :: [JobState] -> JobControllerData -> [JobId]
getJobIds states jcd = Set.toList $ MMap.filterElements states (jcd_StateJobIdMap jcd)


-- | get the TaskIds, they can be filtered by JobId, TaskType and TaskState, if
--   a filter is empty, this attribute is not taken into account
getTaskIds :: [JobId] -> [TaskType] -> [TaskState] -> JobControllerData -> [TaskId]
getTaskIds jobs types states jcd = Set.toList $ intersections sets
  where
  jobSelected   = MMap.filterElements jobs (jcd_JobIdTaskIdMap jcd)
  typeSelected  = MMap.filterElements types (jcd_TypeTaskIdMap jcd)
  stateSelected = MMap.filterElements states (jcd_StateTaskIdMap jcd)
  sets = [jobSelected, typeSelected, stateSelected]


allTasksFinished :: JobControllerData -> JobId -> Bool
allTasksFinished jcd jid = null notFinishedTasks
  where
    notFinishedTasks = getTaskIds [jid] [] [TSIdle, TSSending, TSInProgress, TSCompleted] jcd



-- | adds a Job to the JobControllerData for execution
addJob :: JobData -> JobControllerData -> JobControllerData
addJob jd jcd = jcd { jcd_JobMap = jm', jcd_StateJobIdMap = sjm' }
  where
  jm' = Map.insert (jd_JobId jd) jd (jcd_JobMap jcd)
  sjm' = MMap.insert (jd_State jd) (jd_JobId jd) (jcd_StateJobIdMap jcd)


changeJobState :: JobId -> JobState -> JobControllerData -> JobControllerData
changeJobState jid js jcd = changeJobState' (Map.lookup jid (jcd_JobMap jcd))
  where
  changeJobState' (Nothing) = jcd
  changeJobState' (Just jd) = jcd { jcd_JobMap = jm', jcd_StateJobIdMap = sjm' }
    where
    jd' = jd { jd_State = js }  -- change JobData
    js' = jd_State jd           -- get old state
    jm' = Map.insert jid jd' (jcd_JobMap jcd) -- change JobData
    sjm' = MMap.insert js jid $ MMap.deleteElem js' jid (jcd_StateJobIdMap jcd) -- change StateJobIdMap


updateJob :: JobData -> JobControllerData -> JobControllerData
updateJob jd jcd = addJob jd $ maybe (jcd) (\_ -> changeJobState jid js jcd) maybeOldJob
  where
    maybeOldJob = (Map.lookup jid (jcd_JobMap jcd))
    jid = jd_JobId jd
    js = jd_State jd


addOutputToJob :: [(Int,[FunctionData])] -> JobData -> JobData
addOutputToJob outList jd = jd { jd_OutputMap = outputmap' }  
  where
  outAccu    = AMap.fromList outList
  state      = jd_State jd
  outputmap  = jd_OutputMap jd
  outputmap' = Map.insertWith (\m1 m2 -> AMap.union m1 m2) state outAccu outputmap 

addTask :: JobControllerData -> TaskData -> JobControllerData
addTask jcd td= jcd { jcd_TaskMap = tm', jcd_JobIdTaskIdMap = jtm', jcd_TypeTaskIdMap = ttm', jcd_StateTaskIdMap = stm' }
  where
    tid = td_TaskId td
    tm' = Map.insert tid td (jcd_TaskMap jcd)
    jtm' = MMap.insert (td_JobId td) tid (jcd_JobIdTaskIdMap jcd)
    ttm' = MMap.insert (td_Type td) tid (jcd_TypeTaskIdMap jcd)
    stm' = MMap.insert (td_State td) tid (jcd_StateTaskIdMap jcd)


changeTaskState :: TaskId -> TaskState -> JobControllerData -> JobControllerData
changeTaskState tid ts jcd = changeTaskState' (Map.lookup tid (jcd_TaskMap jcd))
  where
  changeTaskState' (Nothing) = jcd
  changeTaskState' (Just td) = 
    if isChangeTaskStateAllowed ts (td_State td)
      then jcd { jcd_TaskMap = tm', jcd_StateTaskIdMap = stm' }
      else jcd
    where
    td' = td { td_State = ts }  -- change TaskData
    ts' = td_State td           -- get old state
    tm' = Map.insert tid td' (jcd_TaskMap jcd) -- change TaskData
    stm' = MMap.insert ts tid $ MMap.deleteElem ts' tid (jcd_StateTaskIdMap jcd) -- change StateTaskIdMap


-- | newState oldState
isChangeTaskStateAllowed :: TaskState -> TaskState -> Bool
-- only Error-Tasks and Sending-Tasks can be reset to idle
isChangeTaskStateAllowed TSIdle       t = Set.member t $ Set.fromList [TSSending, TSError]
-- sending only from idle
isChangeTaskStateAllowed TSSending    t = Set.member t $ Set.fromList [TSIdle]
-- inProgress only from an idle Tasks or sending Task
isChangeTaskStateAllowed TSInProgress t = Set.member t $ Set.fromList [TSSending, TSIdle]
-- completed only sending, idle and inprogress
isChangeTaskStateAllowed TSCompleted  t = Set.member t $ Set.fromList [TSSending, TSIdle, TSInProgress]
-- finished all but not error
isChangeTaskStateAllowed TSFinished   t = Set.member t $ Set.fromList [TSSending, TSIdle, TSInProgress, TSCompleted]
-- we can always set a task to error
isChangeTaskStateAllowed TSError      _ = True
-- otherwise false (there are no other cases)
-- isChangeTaskStateAllowed _ _ = False

updateTaskOutput :: TaskId -> [(Int, [FunctionData])] -> JobControllerData -> JobControllerData
updateTaskOutput _ [] jcd = jcd
updateTaskOutput tid o jcd = updateTaskOuput' (Map.lookup tid (jcd_TaskMap jcd))
  where
  updateTaskOuput' (Nothing) = jcd
  updateTaskOuput' (Just td) = jcd { jcd_TaskMap = tm' }
    where
    td' = td { td_Output = o }  -- change TaskData
    tm' = Map.insert tid td' (jcd_TaskMap jcd) -- change TaskData


-- ----------------------------------------------------------------------------
-- Info an Debug
-- ----------------------------------------------------------------------------


-- ----------------------------------------------------------------------------
-- Task Controlling
-- ----------------------------------------------------------------------------


startJob 
  :: JobInfo -> JobController -> IO (JobId, MVar JobResult)
startJob ji jc
  = do
    modifyMVar jc $
      \jcd ->
      do
      -- test the job info... we don't want false jobs
      -- let (b, m) = testJobInfo ji (jcd_MapActionMap jcd) (jcd_ReduceActionMap jcd)
      -- if b 
      --   then do
      debugM localLogger $ "startJob: " ++ show ji
      
      (jcd',jd, jr) <- newJobData jcd ji
      return (addJob jd jcd', (jd_JobId jd, jr))
      --   else do
      --    return (jcd, Left m)

{-
stopJob :: JobId -> JobController -> IO ()
stopJob _ _
  = do
-}

performJob
  :: JobInfo -> JobController -> IO JobResult
performJob ji jc
  = do
    -- start the Job
    (_, mjr) <- startJob ji jc
    -- wait until the result is ready
    withMVar mjr $ \jr -> return jr

-- ----------------------------------------------------------------------------
-- Task Processing
-- ----------------------------------------------------------------------------


doProcessing :: JobController -> Bool -> IO ()
doProcessing jc loop
  = do
    catch (doProcessing' jc loop)
      handler
    where
      handler :: JobControllerException -> IO ()
      handler err = errorM cycleLogger $ "doProcessing: " ++ (show err)
      doProcessing' jc' loop'
        = do
          handleTasks jc'
          handleJobs jc'
          if loop' 
            then do
              delay <- withMVar jc' (\jcd -> return $ jcd_ServerDelay jcd)
              threadDelay delay
              doProcessing' jc' loop'
            else
              return ()



-- ----------------------------------------------------------------------------
-- Process Tasks
-- ----------------------------------------------------------------------------


-- | the send task doesn't block, because a timeout causes the whole job
--   controller to pause, we don't want this
sendTask :: JobController -> TaskData -> IO ()
sendTask jc td
  = do
    -- own thread for this... so it is non-blocking
    forkIO $ 
      do
      yield
      -- get the sendFunction (just read it)
      sendFunction <- withMVar jc $
        \jcd -> return $ jcf_TaskSend $ jcd_Functions jcd
      -- execute the send function (this might timeout or block)
      sendResult <- sendFunction td
      -- set the task in the correct state
      case sendResult of
        (TSRSend) ->
          do
          setTaskInProgress jc td
        (TSRNotSend) ->
          do
          -- this function can be called twice before it is finished
          -- and therefore, we introduced the "sending" state for tasks
          -- and if the sending is not ok, we have to set the task to idle again
          setTaskIdle jc td
        (TSRError) ->
          do
          setTaskError jc td
    return ()


-- toNextTaskState :: JobControllerData -> TaskData -> JobControllerData
-- toNextTaskState jcd td = changeTaskState (td_TaskId td) (getNextTaskState (td_State td)) jcd 

toSendingTaskState :: JobControllerData -> TaskData -> JobControllerData
toSendingTaskState jcd td = changeTaskState (td_TaskId td) TSSending jcd

finishTask :: JobControllerData -> TaskData -> JobControllerData
finishTask jcd td = changeTaskState (td_TaskId td) (getNextTaskState (td_State td)) jcd'  
-- toNextTaskState jcd' td
  where
    jd   = fromJust $ Map.lookup (td_JobId td) (jcd_JobMap jcd)
    jd'  = addOutputToJob (td_Output td) jd  
    jcd' = updateJob jd' jcd 


handleTasks :: JobController -> IO ()
handleTasks jc
  = modifyMVar jc $
      \jcd ->
      do
      infoM cycleLogger "processing Tasks:"
      -- get all runnning jobs (not idle...)
      let runningJobs = getJobIds [JSSplit, JSMap, JSCombine, JSReduce] jcd
      infoM cycleLogger $ "running Jobs:" ++ show runningJobs
      -- get all idle tasks
      let idleTasks = getTaskIds runningJobs [] [TSIdle] jcd
      infoM cycleLogger $ "idle Tasks:" ++ show idleTasks
      let idleTaskDatas = mapMaybe (\tid -> Map.lookup tid (jcd_TaskMap jcd)) idleTasks
      
      -- set these tasks to sending, so they won't be called again while the sending process
      let jcd1 = foldl toSendingTaskState jcd idleTaskDatas      
      -- get all sending taskdatas                  
      let sendingTaskDatas = mapMaybe (\tid -> Map.lookup tid (jcd_TaskMap jcd1)) idleTasks
      -- send all idle Tasks (non-blocking)
      mapM (sendTask jc) sendingTaskDatas
      
      -- get all completed Tasks
      let completedTasks = getTaskIds runningJobs [] [TSCompleted] jcd1
      let completedTaskDatas = mapMaybe (\tid -> Map.lookup tid (jcd_TaskMap jcd1)) completedTasks
      -- inProgressTasks = getTaskIds runningJobs [] [TSInProgress] jcd
      let jcd2 = foldl finishTask jcd1 completedTaskDatas
      return (jcd2,())


-- ----------------------------------------------------------------------------
-- Process Jobs
-- ----------------------------------------------------------------------------

toNextJobState :: JobControllerData -> JobData -> JobControllerData
toNextJobState jcd jd = changeJobState (jd_JobId jd) (getNextJobState (jd_State jd)) jcd

toIdleTaskState :: JobControllerData -> TaskData -> JobControllerData
toIdleTaskState jcd td = changeTaskState (td_TaskId td) TSIdle jcd

toInProgressTaskState :: JobControllerData -> TaskData -> JobControllerData
toInProgressTaskState jcd td = changeTaskState (td_TaskId td) TSInProgress jcd

toCompletedTaskState :: JobControllerData -> TaskData -> JobControllerData
toCompletedTaskState jcd td = updateTaskOutput tid o $ changeTaskState tid TSCompleted jcd
  where
  tid = td_TaskId td
  o = td_Output td

toErrorTaskState :: JobControllerData -> TaskData -> JobControllerData
toErrorTaskState jcd td = changeTaskState (td_TaskId td) TSError jcd


hasPhase :: JobData -> Bool
hasPhase jd = isJust $ getCurrentTaskAction jd


getCurrentTaskAction :: JobData -> Maybe ActionName
getCurrentTaskAction jd = getTaskAction' (jd_Info jd) (jd_State jd)
  where
  getTaskAction' ji JSSplit   = maybe Nothing (Just . ja_Name) (ji_SplitAction ji)
  getTaskAction' ji JSMap     = maybe Nothing (Just . ja_Name) (ji_MapAction ji)
  getTaskAction' ji JSCombine = maybe Nothing (Just . ja_Name) (ji_CombineAction ji)
  getTaskAction' ji JSReduce  = maybe Nothing (Just . ja_Name) (ji_ReduceAction ji) 
  getTaskAction' _  _         = Nothing


getCurrentTaskOutputType :: JobData -> TaskOutputType
getCurrentTaskOutputType jd = getTaskOutputType' (jd_Info jd) (jd_State jd)
  where
  getTaskOutputType' ji JSSplit   = maybe TOTFile (ja_Output) (ji_SplitAction ji)
  getTaskOutputType' ji JSMap     = maybe TOTFile (ja_Output) (ji_MapAction ji)
  getTaskOutputType' ji JSCombine = maybe TOTFile (ja_Output) (ji_CombineAction ji)
  getTaskOutputType' ji JSReduce  = maybe TOTRawTuple (ja_Output) (ji_ReduceAction ji)
  getTaskOutputType' _  _         = TOTFile


getCurrentTaskPartValue :: JobState -> JobInfo -> Maybe Int
getCurrentTaskPartValue state ji = gTPV state
  where
  gTPV JSIdle    = maybe (gTPV JSSplit)   (Just . ja_Count) (ji_SplitAction ji)
  gTPV JSSplit   = maybe (gTPV JSMap)     (Just . ja_Count) (ji_MapAction ji)
  gTPV JSMap     = maybe (gTPV JSCombine) (\_ -> Nothing)   (ji_CombineAction ji)
  gTPV JSCombine = maybe (gTPV JSReduce)  (Just . ja_Count) (ji_ReduceAction ji)
  gTPV JSReduce  = ji_NumOfResults ji
  gTPV _         = Nothing
  

createTasks :: JobControllerData -> JobData -> IO JobControllerData
createTasks jcd jd
  = do
    let state     = jd_State jd
        info      = jd_Info jd
        outputMap = (jd_OutputMap jd)
    -- our input is the output of the previous state
    let inputAccu = maybe (AMap.empty) (id) $ Map.lookup (getPrevJobState state) outputMap
    if (hasPhase jd)
      then do
        let inputList = AMap.toList inputAccu                
        let jid = jd_JobId jd
        let a = fromJust $ getCurrentTaskAction jd
        let ot = getCurrentTaskOutputType jd
        let opts = ji_Option info
        let n = getCurrentTaskPartValue state info
        let tt  = fromJust $ fromJobStatetoTaskType state
        -- create new tasks
        (jcd', taskDatas) <- mapAccumLM (\d i -> newTaskData d jid tt TSIdle opts n i a ot) jcd inputList
        -- add task to controller
        let jcd'' = foldl addTask jcd' taskDatas        
        return jcd''
      else do
        -- there is no action in this state, so we copy the input to the output
        let outputMap' = Map.insert state inputAccu outputMap 
        let jd' = jd { jd_OutputMap = outputMap' }
        return $ updateJob jd' jcd


createResults :: JobControllerData -> JobData -> IO JobControllerData
createResults jcd jd
  = do
    let outputAccu = maybe (AMap.empty) (id) $ Map.lookup JSCompleted (jd_OutputMap jd)
    let outputList = AMap.toList outputAccu
    let (JobResultContainer mVarResult) = jd_Result jd
    let res = JobResult $ concat $ map (snd) outputList
    putMVar mVarResult res
    return jcd  


handleJobs :: JobController -> IO ()
handleJobs jc
  = modifyMVar jc $
      \jcd ->
      do
      -- get all working jobs (idle or running)
      infoM cycleLogger "processing Jobs"
      let workingJobs = getJobIds [JSIdle, JSSplit, JSMap, JSCombine, JSReduce] jcd
      infoM cycleLogger $ "working jobs:" ++ show workingJobs
      -- process only jobs, whose current phase is done
      let jobsWithoutTasks = filter (allTasksFinished jcd) workingJobs
      infoM cycleLogger $ "jobsWithoutTasks:" ++ show jobsWithoutTasks
      -- get the old jobdatas
      let oldJobDatas = mapMaybe (\jid -> Map.lookup jid (jcd_JobMap jcd)) jobsWithoutTasks
      debugM cycleLogger $ "oldJobDatas:\n" ++ show oldJobDatas
      -- change the job states to the next phase
      let jcd1 = foldl toNextJobState jcd oldJobDatas
      -- get the new jobdatas
      let newJobDatas = mapMaybe (\jid -> Map.lookup jid (jcd_JobMap jcd1)) jobsWithoutTasks
      debugM cycleLogger $ "newJobDatas:\n" ++ show newJobDatas
      -- create new tasks for each Job
      jcd2 <- foldM createTasks jcd1 newJobDatas
      
      -- get all completed Jobs
      let completedJobs = getJobIds [JSCompleted] jcd2
      infoM cycleLogger $ "completed Jobs:" ++ show completedJobs
      -- get the completed JobDatas
      let completedJobDatas = mapMaybe (\jid -> Map.lookup jid (jcd_JobMap jcd2)) completedJobs
      -- change the job states to the next phase
      let jcd3 = foldl toNextJobState jcd2 completedJobDatas
      -- get the new jobdatas
      let finishedJobDatas = mapMaybe (\jid -> Map.lookup jid (jcd_JobMap jcd3)) completedJobs
      -- create new tasks for each Job
      jcd4 <- foldM createResults jcd3 finishedJobDatas
      
      return (jcd4, ())




-- ----------------------------------------------------------------------------   
-- handling Task-Responses
-- ----------------------------------------------------------------------------


setTaskIdle :: JobController -> TaskData -> IO ()
setTaskIdle jc td
  = do
    debugM localLogger $ "setTaskIdle: waiting... TaskId: " ++ show (td_TaskId td)
    modifyMVar jc $
      \jcd ->
      do
      debugM localLogger $ "setTaskIdle: setting... TaskId: " ++ show (td_TaskId td)
      let jcd' = toIdleTaskState jcd td
      return (jcd', ())


setTaskInProgress :: JobController -> TaskData -> IO ()
setTaskInProgress jc td
  = do
    debugM localLogger $ "setTaskInProgress: waiting... TaskId: " ++ show (td_TaskId td)
    modifyMVar jc $
      \jcd ->
      do
      debugM localLogger $ "setTaskInProgress: setting... TaskId: " ++ show (td_TaskId td)
      let jcd' = toInProgressTaskState jcd td
      return (jcd', ())


setTaskCompleted :: JobController -> TaskData -> IO ()
setTaskCompleted jc td
  = do
    debugM localLogger $ "setTaskCompleted: waiting... TaskId: " ++ show (td_TaskId td)
    modifyMVar jc $
      \jcd ->
      do
      debugM localLogger $ "setTaskCompleted: setting... TaskId: " ++ show (td_TaskId td)
      let jcd' = toCompletedTaskState jcd td
      return (jcd', ())

    
setTaskError :: JobController -> TaskData -> IO ()
setTaskError jc td
  = do
    debugM localLogger $ "setTaskError: waiting... TaskId: " ++ show (td_TaskId td)
    modifyMVar jc $
      \jcd ->
      do
      debugM localLogger $ "setTaskError: setting... TaskId: " ++ show (td_TaskId td)
      let jcd' = toErrorTaskState jcd td
      return (jcd', ())