module Network.JobQueue.JobQueue.Internal where
import qualified Data.ByteString.Char8 as BS
import Control.Exception
import Data.Time.Clock
import Control.Monad
import Data.Maybe
import Network.JobQueue.Class
import Network.JobQueue.AuxClass
import Network.JobQueue.Types
import Network.JobQueue.Action
import Network.JobQueue.Job
import Network.JobQueue.Backend.Class
import Network.JobQueue.Backend.Types
data JobQueue e a where
JobQueue :: (BackendQueue q) => {
jqBackendQueue :: q
, jqActionState :: JobActionState e a
} -> JobQueue e a
data ActionForJob a = (Unit a) => Execute (Job a) | Delete | Skip
actionForJob :: Unit a => Job a -> String -> ActionForJob a
actionForJob job idName = do
case job of
StopTheWorld -> Execute job
_ -> case jobState job of
Initialized -> case (fmap fst . listToMaybe . reads) idName of
Just ident -> Execute $ job { jobState = Runnable, jobId = ident }
Nothing -> Execute $ job { jobState = Runnable, jobId = (1) }
Runnable -> Execute $ job { jobState = Running }
Running -> Skip
Aborted -> Skip
Finished -> Delete
peekJob' :: (Unit a) => JobQueue e a -> IO (Maybe (Job a, String, String, Int))
peekJob' JobQueue { jqBackendQueue = bq } = do
obj <- peekQueue bq
case obj of
Nothing -> return (Nothing)
Just (value, nodeName, idName, version) -> do
case (fmap fst . listToMaybe . reads) $ BS.unpack value of
Nothing -> return (Nothing)
Just job -> return (Just (job, nodeName, idName, version))
executeJob' :: (Aux e, Env e, Unit a) => JobQueue e a -> e -> String -> Job a -> Int -> IO (Either Break (Maybe (RuntimeState a)))
executeJob' jqueue@JobQueue { jqBackendQueue = bq, jqActionState = actionState } env nodeName currentJob version = do
currentTime <- getCurrentTime
if jobOnTime currentJob < currentTime
then do
runActionState actionState env (jobUnit currentJob)
else do
r <- updateJob jqueue nodeName currentJob { jobState = Finished } (version+1)
when r $ void $ writeQueue bq (pack $ currentJob { jobState = Runnable } ) (jobPriority currentJob)
return $ Right Nothing
afterExecuteJob :: (Aux e, Env e, Unit a) => JobQueue e a -> e -> String -> Job a -> Int -> Either Break (Maybe (RuntimeState a)) -> IO ()
afterExecuteJob jqueue env nodeName currentJob version mResult = case mResult of
Right (Just (RS mNextJu forks _)) -> do
case mNextJu of
Just nextJu -> do
_r <- updateJob jqueue nodeName currentJob { jobState = Runnable, jobUnit = nextJu } (version+1)
return ()
Nothing -> do
_r <- updateJob jqueue nodeName currentJob { jobState = Finished } (version+1)
return ()
forM_ (reverse forks) $ \(forked, ontime) -> rescheduleJob jqueue ontime forked
Right (Nothing) -> recover Nothing
Left (Failure _msg) -> do
n <- auxHandleFailure env (Just currentJob)
recover n
Left Retriable -> do
_r <- updateJob jqueue nodeName currentJob { jobState = Runnable } (version+1)
return ()
Left (Unhandled _someException) -> do
_r <- updateJob jqueue nodeName currentJob { jobState = Finished } (version+1)
return ()
where
recover n = case n of
Just nextJu -> do
_r <- updateJob jqueue nodeName nextJu (version+1)
return ()
Nothing -> do
_r <- updateJob jqueue nodeName currentJob { jobState = Finished } (version+1)
return ()
rescheduleJob :: (Unit a) => JobQueue e a -> Maybe UTCTime -> a -> IO ()
rescheduleJob JobQueue { jqBackendQueue = bq } mOntime ju = do
job <- case mOntime of
Just ontime -> createOnTimeJob Initialized ontime ju
Nothing -> createJob Initialized ju
void $ writeQueue bq (pack $ job) (getPriority ju)
updateJob :: (Unit a) => JobQueue e a -> String -> Job a -> Int -> IO (Bool)
updateJob JobQueue { jqBackendQueue = bq } nodeName job version = do
updateQueue bq nodeName (pack job) version `catch` handleError
where
handleError :: BackendError -> IO (Bool)
handleError _ = return (False)
pack :: (Unit a) => Job a -> BS.ByteString
pack = BS.pack . show