-- ----------------------------------------------------------------------------
{- |
  Module     : Holumbus.MapReduce.TaskProcessor
  Copyright  : Copyright (C) 2008 Stefan Schmidt
  License    : MIT

  Maintainer : Stefan Schmidt (stefanschmidt@web.de)
  Stability  : experimental
  Portability: portable
  Version    : 0.1


-}
-- ----------------------------------------------------------------------------

{-# OPTIONS -fglasgow-exts #-}
module Holumbus.MapReduce.TaskProcessor
    (
      -- * Datatypes
      TaskResultFunction
    , TaskProcessor

    , printTaskProcessor

      -- * Creation and Destruction
    , newTaskProcessor
    , closeTaskProcessor
    , setFileSystemToTaskProcessor
    , setActionMap
    , setTaskCompletedHook  
    , setTaskErrorHook

      -- * TaskProcessor 
    , startTaskProcessor
    , stopTaskProcessor

      -- * Info and Debug
    , listTaskIds 
    , getActions
    , getActionNames

      -- * Task Creation and Destruction
    , startTask
    , stopTask
    , stopAllTasks 
    )
where

import           Prelude hiding                 ( catch )

import		 Control.Exception.Extensible   ( Exception
						, throw
						, catch
						)
import           Control.Concurrent

-- import           Data.Binary
-- import qualified Data.ByteString.Lazy as B

import qualified Data.Map 			as Map
import qualified Data.Set 			as Set
import           Data.Maybe
import           Data.Typeable

import           System.Log.Logger

import           Holumbus.Common.Utils			       ( handleAll )
import qualified Holumbus.Data.KeyMap 		as KMap
import           Holumbus.MapReduce.Types
import qualified Holumbus.FileSystem.FileSystem as FS



-- | Name of the logger used for messages about the TaskProcessor itself
-- (e.g. adding and stopping tasks).
localLogger :: String
localLogger = "Holumbus.MapReduce.TaskProcessor"


-- | Name of the logger used for messages about individual task executions.
-- It is 'localLogger' with @.task@ appended.
taskLogger :: String
taskLogger = localLogger ++ ".task"

-- ----------------------------------------------------------------------------
-- Datatypes
-- ----------------------------------------------------------------------------

-- | A hook that receives a finished task.  It returns 'True' when the task
-- was handled; on 'False' the task is kept by the processor and offered to
-- the hook again on a later pass of the server loop.
type TaskResultFunction = TaskData -> IO Bool

-- | Default result hook: accepts every task and does nothing with it.
dummyTaskResultFunction :: TaskData -> IO Bool
dummyTaskResultFunction = const (return True)

-- | The user-supplied result hooks of a TaskProcessor.
data TaskProcessorFunctions = TaskProcessorFunctions
  { tpf_TaskCompleted :: TaskResultFunction
    -- ^ receives tasks that finished successfully
  , tpf_TaskError     :: TaskResultFunction
    -- ^ receives tasks that failed
  }

-- Functions have no textual form, so only a fixed placeholder is shown.
instance Show TaskProcessorFunctions where
  showsPrec _ _ = showString "{TaskProcessorFunctions}"

-- | Exceptions used to control the server thread of a TaskProcessor.
data TaskProcessorException = KillServerException -- ^ ends the server loop
  deriving (Show, Typeable)

instance Exception TaskProcessorException

-- | Exceptions concerning a single task.
data TaskException
  = KillTaskException
    -- ^ thrown to a worker thread to abort its task
  | UnkownTaskException ActionName
    -- ^ the named action cannot be executed for the task
  deriving (Show, Typeable)

instance Exception TaskException

-- | The complete state of a TaskProcessor; it lives inside an 'MVar'
-- (see 'TaskProcessor').
data TaskProcessorData = TaskProcessorData
  { -- internal
    tpd_ServerThreadId  :: Maybe ThreadId
    -- ^ the server thread, once it has been started
  , tpd_ServerDelay     :: Int
    -- ^ pause between two passes of the server loop, in microseconds
  , tpd_FileSystem      :: FS.FileSystem
    -- ^ filesystem handed to the actions of the tasks
    -- configuration
  , tpd_MaxTasks        :: Int
    -- ^ maximum number of tasks running at the same time
  , tpd_Functions       :: TaskProcessorFunctions
    -- ^ the result hooks
  , tpd_ActionMap       :: ActionMap
    -- ^ registered actions, looked up by a task's action name
    -- task processing
  , tpd_TaskQueue       :: [TaskData]
    -- ^ tasks waiting to be started, oldest first
  , tpd_CompletedTasks  :: Set.Set TaskData
    -- ^ finished tasks not yet accepted by the completed hook
  , tpd_ErrorTasks      :: Set.Set TaskData
    -- ^ failed tasks not yet accepted by the error hook
  , tpd_TaskIdThreadMap :: Map.Map TaskId ThreadId
    -- ^ running tasks and their worker threads
  } deriving (Show)


-- | Handle to a task processor: its whole state behind a single 'MVar',
-- which the operations in this module take (via 'withMVar' / 'modifyMVar')
-- as their lock.
type TaskProcessor = MVar TaskProcessorData


-- | Renders the current state of the processor, for debugging.
printTaskProcessor :: TaskProcessor -> IO String
printTaskProcessor tp = withMVar tp (return . show)



-- ----------------------------------------------------------------------------
-- Creation / Destruction
-- ----------------------------------------------------------------------------

-- | The state of a freshly created TaskProcessor.
--
-- No server thread is running.  The server loop pauses 1000 microseconds
-- (one millisecond) between passes, at most one task runs at a time, both
-- result hooks accept every task, and no actions or tasks are registered.
--
-- There is no sensible default filesystem; it must be supplied with
-- 'setFileSystemToTaskProcessor' before it is used.  Using it earlier fails
-- with a descriptive error instead of a bare 'undefined'.
defaultTaskProcessorData :: TaskProcessorData
defaultTaskProcessorData
  = TaskProcessorData
    { tpd_ServerThreadId  = Nothing
    , tpd_ServerDelay     = 1000 -- microseconds, i.e. one millisecond
    , tpd_FileSystem      = error $ "Holumbus.MapReduce.TaskProcessor: "
                                 ++ "no FileSystem set, "
                                 ++ "call setFileSystemToTaskProcessor first"
    , tpd_MaxTasks        = 1
    , tpd_Functions       = TaskProcessorFunctions
                              { tpf_TaskCompleted = dummyTaskResultFunction
                              , tpf_TaskError     = dummyTaskResultFunction
                              }
    , tpd_ActionMap       = KMap.empty
    , tpd_TaskQueue       = []
    , tpd_CompletedTasks  = Set.empty
    , tpd_ErrorTasks      = Set.empty
    , tpd_TaskIdThreadMap = Map.empty
    }


-- | Creates a new TaskProcessor in its default state.
--
-- The server thread is deliberately not started here, so the caller can
-- configure the processor (filesystem, actions, hooks) first and then call
-- 'startTaskProcessor'.
newTaskProcessor :: IO TaskProcessor
newTaskProcessor = newMVar defaultTaskProcessorData


-- | Shuts the processor down by stopping its server thread.
--
-- NOTE(review): tasks that are still queued or running are not touched;
-- call 'stopAllTasks' first if they should be aborted as well.
closeTaskProcessor :: TaskProcessor -> IO ()
closeTaskProcessor = stopTaskProcessor


-- | Sets the filesystem that is handed to the actions of all tasks.
setFileSystemToTaskProcessor :: FS.FileSystem -> TaskProcessor -> IO ()
setFileSystemToTaskProcessor fs tp
  = modifyMVar_ tp $ \tpd -> return tpd { tpd_FileSystem = fs }
  

-- | Replaces the set of actions the processor can execute.
setActionMap :: KMap.KeyMap ActionData -> TaskProcessor -> IO ()
setActionMap m tp
  = modifyMVar_ tp $ \tpd -> return tpd { tpd_ActionMap = m }


-- | Replaces the hook that receives successfully completed tasks.
setTaskCompletedHook :: TaskResultFunction -> TaskProcessor -> IO ()
setTaskCompletedHook f tp
  = modifyMVar_ tp $ \tpd ->
      return tpd { tpd_Functions = (tpd_Functions tpd) { tpf_TaskCompleted = f } }


-- | Replaces the hook that receives failed tasks.
setTaskErrorHook :: TaskResultFunction -> TaskProcessor -> IO ()
setTaskErrorHook f tp
  = modifyMVar_ tp $ \tpd ->
      return tpd { tpd_Functions = (tpd_Functions tpd) { tpf_TaskError = f } }

-- ----------------------------------------------------------------------------
-- server functions
-- ----------------------------------------------------------------------------

-- | Starts the server thread that launches queued tasks and delivers their
-- results.  If a server thread is already recorded, no new one is forked.
startTaskProcessor :: TaskProcessor -> IO ()
startTaskProcessor tp
  = modifyMVar_ tp $ \tpd ->
      do
      server <- maybe (forkIO $ doProcessing tp) return (tpd_ServerThreadId tpd)
      return tpd { tpd_ServerThreadId = Just server }



-- | Stops the server thread, if one is recorded, by throwing
-- 'KillServerException' to it, and forgets its 'ThreadId'.
--
-- The exception is thrown while the processor lock is held.  The server
-- thread only changes the state while holding that lock, so it cannot be
-- interrupted in the middle of such an update.
stopTaskProcessor :: TaskProcessor -> IO ()
stopTaskProcessor tp
  = modifyMVar_ tp $ \tpd ->
      do
      case tpd_ServerThreadId tpd of
        Just server -> throwTo server KillServerException >> yield
        Nothing     -> return ()
      return tpd { tpd_ServerThreadId = Nothing }


-- ----------------------------------------------------------------------------
-- private functions
-- ----------------------------------------------------------------------------

-- | Whether a task with the given id is currently running or queued.
containsTask :: TaskId -> TaskProcessorData -> Bool
containsTask tid tpd
  =  Map.member tid (tpd_TaskIdThreadMap tpd)
  || elem tid (map td_TaskId (tpd_TaskQueue tpd))


-- | Appends a task to the end of the queue.
queueTask :: TaskData -> TaskProcessorData -> TaskProcessorData
queueTask td tpd = tpd { tpd_TaskQueue = tpd_TaskQueue tpd ++ [td] }

  
-- | Removes every queued task with the given id from the queue.
dequeueTask :: TaskId -> TaskProcessorData -> TaskProcessorData
dequeueTask tid tpd
  = tpd { tpd_TaskQueue = [ td | td <- tpd_TaskQueue tpd, td_TaskId td /= tid ] }


-- | The worker thread of the task, or 'Nothing' if the task is not running.
getTaskThreadId :: TaskId -> TaskProcessorData -> Maybe ThreadId
getTaskThreadId tid = Map.lookup tid . tpd_TaskIdThreadMap


-- | The ids of all queued and running tasks, ascending and without
-- duplicates.
getTasksIds :: TaskProcessorData -> [TaskId]
getTasksIds tpd
  = Set.toList . Set.fromList
  $ map td_TaskId (tpd_TaskQueue tpd) ++ Map.keys (tpd_TaskIdThreadMap tpd)


-- | Queues a task, unless a task with the same id is already queued or
-- running; in that case the state is returned unchanged.
addTask :: TaskData -> TaskProcessorData -> TaskProcessorData
addTask td tpd
  | containsTask (td_TaskId td) tpd = tpd
  | otherwise                       = queueTask td tpd


-- | Forgets a task: removes it from the running-task map and from the queue.
deleteTask :: TaskId -> TaskProcessorData -> TaskProcessorData
deleteTask tid tpd
  = dequeueTask tid (tpd { tpd_TaskIdThreadMap = withoutTask })
  where
    withoutTask = Map.delete tid (tpd_TaskIdThreadMap tpd)



-- ----------------------------------------------------------------------------
-- Info and Debug
-- ----------------------------------------------------------------------------


-- | The ids of all queued and running tasks.
listTaskIds :: TaskProcessor -> IO [TaskId] 
listTaskIds tp = withMVar tp (return . getTasksIds)


-- | Lists all registered actions with their name, description and so on.
getActions :: TaskProcessor -> IO [ActionData]
getActions tp = withMVar tp (return . KMap.elems . tpd_ActionMap)


-- | Lists the names of all registered actions.
getActionNames :: TaskProcessor -> IO [ActionName]
getActionNames tp = fmap (map ad_Name) (getActions tp)


-- ----------------------------------------------------------------------------
-- Task Controlling
-- ----------------------------------------------------------------------------


-- | Queues a task; the server thread starts it later.  A task whose id is
-- already queued or running is ignored.
startTask :: TaskData -> TaskProcessor -> IO ()
startTask td tp
  = do
    let tidStr = show (td_TaskId td)
    debugM localLogger $ "waiting to add Task " ++ tidStr
    modifyMVar_ tp $ \tpd ->
      do
      debugM localLogger $ "adding Task " ++ tidStr
      return (addTask td tpd)
      
  
-- | Removes a task from the queue and the running-task map.  If the task was
-- running, its worker thread receives a 'KillTaskException'; the exception
-- is thrown after the processor lock has been released.
stopTask :: TaskId -> TaskProcessor -> IO ()
stopTask tid tp
  = do
    debugM localLogger $ "waiting to stop Task " ++ show tid
    worker <- modifyMVar tp $ \tpd ->
      return (deleteTask tid tpd, getTaskThreadId tid tpd)
    debugM localLogger $ "stopping Task " ++ show tid
    case worker of
      Just thd -> throwTo thd KillTaskException
      Nothing  -> return ()


-- | Stops every task that is queued or running when this is called
-- (see 'stopTask').
stopAllTasks :: TaskProcessor -> IO () 
stopAllTasks tp
  = do
    debugM localLogger "waiting to stop all Tasks"
    tids <- withMVar tp (return . getTasksIds)
    mapM_ (flip stopTask tp) tids





-- ----------------------------------------------------------------------------
-- Task Processing
-- ----------------------------------------------------------------------------


-- | Moves a task out of the running-task map into the set of completed
-- tasks awaiting delivery to the completed hook.
setTaskCompleted :: TaskData -> TaskProcessorData -> TaskProcessorData
setTaskCompleted td tpd
  = tpd { tpd_TaskIdThreadMap = Map.delete (td_TaskId td) (tpd_TaskIdThreadMap tpd)
        , tpd_CompletedTasks  = Set.insert td (tpd_CompletedTasks tpd)
        }


-- | Moves a task out of the running-task map into the set of failed tasks
-- awaiting delivery to the error hook.
setTaskError :: TaskData -> TaskProcessorData -> TaskProcessorData
setTaskError td tpd
  = tpd { tpd_TaskIdThreadMap = Map.delete (td_TaskId td) (tpd_TaskIdThreadMap tpd)
        , tpd_ErrorTasks      = Set.insert td (tpd_ErrorTasks tpd)
        }


-- | Marks a task as failed.
--
-- This only records the task in the error set.  The error hook is invoked
-- later by the server thread, in 'handleFinishedTasks'.
reportErrorTask :: TaskData -> TaskProcessor -> IO ()
reportErrorTask td tp = modifyMVar_ tp (return . setTaskError td)


-- | Marks a task as completed.
--
-- This only records the task in the completed set.  The completed hook is
-- invoked later by the server thread, in 'handleFinishedTasks'.
reportCompletedTask :: TaskData -> TaskProcessor -> IO ()
reportCompletedTask td tp = modifyMVar_ tp (return . setTaskCompleted td)


-- | Records the worker thread of a task that has just been started.
setTaskRunning :: TaskId -> ThreadId -> TaskProcessorData -> TaskProcessorData
setTaskRunning tid thd tpd
  = tpd { tpd_TaskIdThreadMap = Map.insert tid thd (tpd_TaskIdThreadMap tpd) }


-- | Pops the oldest task off the queue.
--
-- Returns the task ('Nothing' if the queue is empty) together with the state
-- without that task.  Pattern matching replaces the previous guarded
-- 'head' / 'tail' calls, so the function is total by construction.
getNextQueuedTask :: TaskProcessorData -> (Maybe TaskData, TaskProcessorData)
getNextQueuedTask tpd
  = case tpd_TaskQueue tpd of
      []         -> (Nothing, tpd)
      (td : rest) -> (Just td, tpd { tpd_TaskQueue = rest })

-- ----------------------------------------------------------------------------
-- Server loop
-- ----------------------------------------------------------------------------


-- | Body of the server thread forked by 'startTaskProcessor'.
--
-- Each pass starts queued tasks, hands finished tasks to the result hooks
-- and then sleeps for 'tpd_ServerDelay' microseconds.  The loop ends when a
-- 'TaskProcessorException' arrives (thrown by 'stopTaskProcessor').  Any
-- other exception is not caught here and terminates the thread.
doProcessing :: TaskProcessor -> IO ()
doProcessing tp
  = catch loop handler
    where
      -- Being stopped is a normal event.  Report it through the module's
      -- logger instead of printing to the application's stdout.
      handler :: TaskProcessorException -> IO ()
      handler err = infoM localLogger $ "server thread stopped: " ++ show err

      loop :: IO ()
      loop
        = do
          handleNewTasks tp
          handleFinishedTasks tp
          delay <- withMVar tp (\tpd -> return $ tpd_ServerDelay tpd)
          threadDelay delay
          loop


-- | Starts queued tasks while fewer than 'tpd_MaxTasks' tasks are running.
--
-- The processor lock is held for the whole pass.  For every started task a
-- worker thread is forked ('runTask') and recorded in the running-task map.
-- The next task is obtained by pattern matching, so no 'fromJust' is needed.
handleNewTasks :: TaskProcessor -> IO ()
handleNewTasks tp
  = modifyMVar_ tp fillSlots
    where
    -- start tasks one by one until the queue is empty or every slot is taken
    fillSlots tpd
      | Map.size (tpd_TaskIdThreadMap tpd) < tpd_MaxTasks tpd
      , (Just td, tpd') <- getNextQueuedTask tpd
      = do
        -- The worker must take the lock held here before it can report a
        -- result, so its ThreadId is recorded before it can be removed.
        thd <- runTask td tp
        fillSlots (setTaskRunning (td_TaskId td) thd tpd')
      | otherwise
      = return tpd


-- | Forks a worker thread that performs the given task and returns its id.
--
-- On success the task (with its output) is reported as completed.  If the
-- work throws, the exception is logged and the original task is reported as
-- failed.
runTask :: TaskData -> TaskProcessor -> IO ThreadId
runTask td tp
  = forkIO $ handleAll onFailure work
    where
    work
      = do
        yield
        finished <- performTask td tp
        infoM taskLogger "report task completed"
        reportCompletedTask finished tp
    onFailure e
      = do
        errorM taskLogger $ "Exception: " ++ show e
        reportErrorTask td tp

-- | Hands every finished task to the matching result hook.  This is called
-- on every pass of the server loop ('doProcessing').
--
-- Completed tasks go to the completed hook and failed tasks to the error
-- hook.  Tasks for which a hook returns 'False' are kept and offered again
-- on the next pass; all others are dropped.
--
-- NOTE(review): the hooks run while the processor lock is held.  A hook that
-- calls back into this TaskProcessor (e.g. 'startTask') would block, and a
-- slow hook stalls the whole processor.
handleFinishedTasks :: TaskProcessor -> IO ()
handleFinishedTasks tp
  = modifyMVar_ tp $ \tpd ->
    do
    let funs = tpd_Functions tpd
    completed <- sendTasksResults (tpd_CompletedTasks tpd) (tpf_TaskCompleted funs)
    failed    <- sendTasksResults (tpd_ErrorTasks tpd)     (tpf_TaskError funs)
    return tpd { tpd_CompletedTasks = completed, tpd_ErrorTasks = failed }

      
-- | Passes every task of the set, in ascending order, to the given hook.
--
-- Returns the tasks for which the hook answered 'False', so the caller can
-- keep them for another attempt.
sendTasksResults :: Set.Set TaskData -> TaskResultFunction -> IO (Set.Set TaskData)
sendTasksResults set fun
  = do
    let tasks = Set.toList set
    delivered <- mapM fun tasks
    return $ Set.fromList [ td | (td, ok) <- zip tasks delivered, not ok ]

-- ----------------------------------------------------------------------------
-- Performing a Task
-- ----------------------------------------------------------------------------

-- | Runs a single task and returns it with 'td_Output' filled in.
--
-- The task's action is looked up by name in the processor's action map.
-- Its implementation for the task's type is then called with the task's
-- option, partition value and input.
--
-- 'UnkownTaskException' is thrown if no action with that name is
-- registered, or if the action has no implementation for the task's type.
-- The exception propagates to the caller; in 'runTask' its handler reports
-- the task as failed.
performTask :: TaskData -> TaskProcessor-> IO TaskData
performTask td tp
  = do
    infoM taskLogger $ "Task " ++ show (td_TaskId td)
    debugM taskLogger $ "input td: " ++ show td

    -- only the action map and the filesystem live in the shared state;
    -- everything else the action needs comes from the task itself
    (mad, fs) <- withMVar tp $
      \tpd -> return ( KMap.lookup (td_Action td) (tpd_ActionMap tpd)
                     , tpd_FileSystem tpd
                     )

    -- an unregistered action is an error: returning the task unchanged
    -- would report it as successfully completed
    ad <- case mad of
      (Just a)  -> return a
      (Nothing) -> throw (UnkownTaskException $ td_Action td)

    action <- case (getActionForTaskType (td_Type td) ad) of
      (Just a') -> return a'
      (Nothing) -> throw (UnkownTaskException $ td_Action td)

    let env = mkActionEnvironment td fs
    infoM taskLogger "starting action"
    bout <- action env (td_Option td) (td_PartValue td) (td_Input td)
    let td' = td { td_Output = bout }
    infoM taskLogger "finished action"
    --debugM taskLogger $ "output td: " ++ show td'
    return td'