module Bein.Daemon.Protocol where import Text.Printf () import Data.List ( intercalate ) import Database.HDBC ( fromSql, toSql, SqlValue(..) ) import Control.Monad () import qualified System.IO.Error as S ( try ) import Control.Monad.Trans () import Text.ParserCombinators.Parsec ( parse, (<|>), try, CharParser, many1, digit, newline, string, eof ) import System.Posix.Syslog ( Priority(Error), syslog ) import System.Posix.Types () import System.Posix.Signals ( sigTERM, signalProcess ) import Bein.SocketHandler () import Bein.Daemon.Types ( BeinM, ExecutionID(..), unExecutionID, DaemonState ) import Bein.Daemon.Commands ( encodeResponse, catchR, exitDaemon, reconfigureDaemon, updateJobs, query, update ) import Control.Monad.Reader ( MonadPlus(mzero), liftM, MonadReader(ask), MonadIO(..), ReaderT(runReaderT) ) import Bein.Configuration () daemonProtocol :: String -> BeinM DaemonState String daemonProtocol received = do case parse command "socket" received of Left e -> return $ "200 unknown command\n" ++ show e ++ "\n.\n" Right c -> executeCommand c tryR :: (MonadReader r m, MonadIO m) => ReaderT r IO a -> m (Either IOError a) tryR f = do st <- ask liftIO $ S.try (runReaderT f st) executeCommand :: BeinDCommand -> BeinM DaemonState String executeCommand Reconfigure = do tryR reconfigureDaemon >>= \x -> case x of Left e -> do liftIO $ syslog Error $ "Configuration error: " ++ show e return $ "error\n" ++ show e ++ "\n.\n" Right () -> return $ encodeResponse "0 ok" "" executeCommand Jobs = listJobs Nothing executeCommand (Status exs) = listJobs (Just [exs]) executeCommand (Run ex) = runJob ex `catchR` (\e -> return $ encodeResponse "15 database access error" (show e)) executeCommand (Stop _) = return $ encodeResponse "500 unimplemented command" "" executeCommand (Continue _) = return $ encodeResponse "500 unimplemented command" "" executeCommand (Kill ex) = killProcess ex `catchR` (\e -> return $ encodeResponse "15 database access error" (show e)) executeCommand Exit = exitDaemon >> return "" data BeinDCommand = Reconfigure | Jobs | Status ExecutionID | Run ExecutionID | Stop ExecutionID | Continue ExecutionID | Kill ExecutionID | Exit deriving (Eq,Show,Read) reconfigure :: CharParser st BeinDCommand jobs :: CharParser st BeinDCommand status :: CharParser st BeinDCommand run :: CharParser st BeinDCommand stop :: CharParser st BeinDCommand continue :: CharParser st BeinDCommand kill :: CharParser st BeinDCommand exit :: CharParser st BeinDCommand reconfigure = bodylessCommand "reconfigure" Reconfigure jobs = bodylessCommand "jobs" Jobs status = oneExecutionIDCommand "status" Status run = oneExecutionIDCommand "run" Run stop = oneExecutionIDCommand "stop" Stop continue = oneExecutionIDCommand "continue" Continue kill = oneExecutionIDCommand "kill" Kill exit = bodylessCommand "exit" Exit command :: CharParser st BeinDCommand command = foldr (<|>) mzero commands where commands = map try [reconfigure, jobs, status, run, stop, continue, kill, exit] integerLine :: (Read a, Integral a) => CharParser st a integerLine = do x <- many1 digit newline return $ read x manyExecutionIDsCommand :: String -> ([ExecutionID] -> a) -> CharParser st a manyExecutionIDsCommand cmd constr = do string cmd newline ids <- many1 integerLine eof return $ constr $ map ExecutionID ids oneExecutionIDCommand :: String -> (ExecutionID -> a) -> CharParser st a oneExecutionIDCommand cmd constr = do string cmd newline ids <- integerLine eof return $ constr $ ExecutionID ids bodylessCommand :: String -> a -> CharParser st a bodylessCommand cmd val = do string cmd newline eof return val listJobs :: Maybe [ExecutionID] -> BeinM DaemonState String listJobs v = listJobs' v `catchR` (\e -> return $ encodeResponse "15 database access error" (show e)) listJobs' :: Maybe [ExecutionID] -> BeinM DaemonState String listJobs' v = do jobList <- case v of Nothing -> query "select id,status,running_as_pid,scratch_dir from current_jobs" [] Just exs -> query "select id,status,running_as_pid,scratch_dir from current_jobs where array[id] <@ ?" [toSql exs] body <- liftM unlines $ mapM listJob jobList return $ encodeResponse "0 ok" body listJob :: [SqlValue] -> BeinM DaemonState String listJob [sId, sStatus, sPid, sScratchDir] = case fromSql sStatus of "waiting" -> do jobList <- query "select id from jobs_awaiting where awaited_by=?" [sId] let jobids = map (fromSql.head) jobList :: [Int] jid = fromSql sId :: Int return $ show jid ++ "\t--\twaiting\t--\t" ++ (intercalate ", " $ map show jobids) "pending" -> do let jid = fromSql sId :: Int return $ show jid ++ "\t--\tpending\t--" "running" -> do let jid = fromSql sId :: Int pid = fromSql sPid :: Integer scratchDir = fromSql sScratchDir :: FilePath return $ show jid ++ "\t" ++ show pid ++ "\tpending\t" ++ scratchDir other -> error $ "Invalid status: " ++ other listJob _ = error "Invalid arguments to listJob." runJob :: ExecutionID -> BeinM DaemonState String runJob ex = do query "select id from executions where id = ?" [toSql ex] >>= \r -> case r of [] -> return $ encodeResponse "110 unknown execution" (show $ unExecutionID ex) [_] -> (do update "select run(?)" [toSql ex] updateJobs return $ encodeResponse "0 ok" (show $ unExecutionID ex)) `catchR` (\e -> return $ encodeResponse "15 database access error" (show e)) _ -> error "Got multiple results back from the database. Coherence lost. Dying." killProcess :: ExecutionID -> BeinM DaemonState String killProcess ex = do query "select running_as_pid from current_jobs where id=?" [toSql ex] >>= \r -> case r of [] -> return $ encodeResponse "120 no such job" (show $ unExecutionID ex) [[sPid]] -> do update "delete from current_jobs where id=?" [toSql ex] if sPid /= SqlNull then let pid = fromSql sPid :: Integer in (do update "update executions set status='failed' where id=? and status='running'" [toSql ex] liftIO $ signalProcess sigTERM (fromIntegral pid) return $ encodeResponse "0 ok" (show $ unExecutionID ex)) `catchR` (\e -> return $ encodeResponse "150 failed to kill" (show e)) else return $ encodeResponse "0 ok" (show $ unExecutionID ex) _ -> error "Got multiple results back from the database. Coherence lost. Dying."