module Yesod.JobQueue (
YesodJobQueue (..)
, JobQueue
, startDequeue
, enqueue
, JobState
, newJobState
, jobQueueInfo
, getJobQueue
) where
import Yesod.JobQueue.Routes
import Yesod.JobQueue.Types
import Yesod.JobQueue.GenericConstr
import Control.Concurrent (forkIO)
import qualified Control.Concurrent.STM as STM
import Control.Concurrent.STM (TVar)
import Control.Lens ((^.))
import Control.Monad (forever, void)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Control (MonadBaseControl)
import Control.Monad.Trans.Reader (ReaderT, runReaderT)
import Data.Aeson (Value, (.=), object)
import Data.Aeson.TH (defaultOptions, deriveToJSON)
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as BSC
import Data.FileEmbed (embedFile)
import Data.Foldable (forM_)
import qualified Data.List as L
import Data.Maybe (catMaybes)
import Data.Proxy (Proxy(Proxy))
import Data.Text (Text)
import qualified Data.Text as T
import Data.Time.Clock (UTCTime, getCurrentTime)
import qualified Data.UUID as U
import qualified Data.UUID.V4 as U
import qualified Database.Redis as R
import GHC.Generics (Generic, Rep)
import Text.Read (readMaybe)
import Yesod.Core
(HandlerT, Html, Yesod, YesodSubDispatch(yesodSubDispatch), getYesod,
hamlet, invalidArgs, mkYesodSubDispatch, notFound, requireJsonBody,
returnJson, sendResponse, toContent, withUrlRenderer)
import Yesod.Persist.Core (YesodPersistBackend)
type ThreadNum = Int
type JobTypeString = String
data RunningJob = RunningJob {
jobType :: JobTypeString
, threadId :: ThreadNum
, jobId :: U.UUID
, startTime :: UTCTime
} deriving (Eq)
$(deriveToJSON defaultOptions ''RunningJob)
type JobState = TVar [RunningJob]
newJobState :: IO (TVar [RunningJob])
newJobState = STM.newTVarIO []
data JobQueueItem = JobQueueItem {
queueJobType :: JobTypeString
, queueTime :: UTCTime
} deriving (Show, Read)
$(deriveToJSON defaultOptions ''JobQueueItem)
class (Yesod master, Read (JobType master), Show (JobType master)
, Generic (JobType master), Constructors (Rep (JobType master))
)
=> YesodJobQueue master where
type JobType master
runJob :: (MonadBaseControl IO m, MonadIO m)
=> master -> JobType master -> ReaderT master m ()
queueConnectInfo :: master -> R.ConnectInfo
queueConnectInfo _ = R.defaultConnectInfo
queueKey :: master -> ByteString
queueKey _ = "yesod-job-queue"
threadNumber :: master -> Int
threadNumber _ = 1
runDBJob :: (MonadBaseControl IO m, MonadIO m)
=> ReaderT (YesodPersistBackend master) (ReaderT master m) a
-> ReaderT master m a
getJobState :: master -> JobState
jobAPIBaseUrl :: master -> String
jobAPIBaseUrl _ = "/job"
jobManagerJSUrl :: master -> String
jobManagerJSUrl m = (jobAPIBaseUrl m) ++ "/manager/app.js"
describeJob :: master -> JobTypeString -> Maybe Text
describeJob _ _ = Nothing
getClassInformation :: master -> [JobQueueClassInfo]
getClassInformation m = [jobQueueInfo m]
startDequeue :: (YesodJobQueue master, MonadBaseControl IO m, MonadIO m) => master -> m ()
startDequeue m = do
let num = threadNumber m
forM_ [1 .. num] $ startThread m
startThread :: forall master m . (YesodJobQueue master, MonadBaseControl IO m, MonadIO m)
=> master -> ThreadNum -> m ()
startThread m tNo = void $ liftIO $ forkIO $ do
conn <- R.connect $ queueConnectInfo m
R.runRedis conn $ forever $ do
result <- R.blpop [queueKey m] 600
liftIO $ handleResultFromRedis result
where
handleResultFromRedis :: Either R.Reply (Maybe (ByteString, ByteString)) -> IO ()
handleResultFromRedis (Left _) =
putStrLn "[dequeue] error in at connection redis"
handleResultFromRedis (Right Nothing) =
putStrLn "[dequeue] timeout retry"
handleResultFromRedis (Right (Just (_, redisValue))) = do
let item = readMaybe $ BSC.unpack redisValue
handleJob $ readJobType m =<< queueJobType <$> item
handleJob :: Maybe (JobType master) -> IO ()
handleJob Nothing = putStrLn "[dequeue] unknown JobType"
handleJob (Just jt) = do
jid <- U.nextRandom
time <- getCurrentTime
let runningJob = RunningJob
{ jobType = (show jt)
, threadId = tNo
, jobId = jid
, startTime = time
}
STM.atomically $ STM.modifyTVar (getJobState m) (runningJob:)
putStrLn $ "dequeued: " ++ (show jt)
runReaderT (runJob m jt) m
STM.atomically $ STM.modifyTVar (getJobState m) (L.delete runningJob)
enqueue :: (MonadIO m, YesodJobQueue master) => master -> JobType master -> m ()
enqueue m jt = liftIO $ do
time <- getCurrentTime
let item = JobQueueItem
{ queueJobType = show jt
, queueTime = time
}
conn <- R.connect $ queueConnectInfo m
void $ R.runRedis conn $ R.rpush (queueKey m) [BSC.pack $ show item]
listQueue :: YesodJobQueue master => master -> IO (Either String [JobQueueItem])
listQueue m = do
conn <- R.connect $ queueConnectInfo m
exs <- R.runRedis conn $ do
R.lrange (queueKey m) 0 (1)
case exs of
Right xs ->
return $ Right $ catMaybes
$ map (readMaybe . BSC.unpack) xs
Left r -> return $ Left $ show r
readJobType :: YesodJobQueue master => master -> String -> Maybe (JobType master)
readJobType _ = readMaybe
jobQueueInfo :: YesodJobQueue master => master -> JobQueueClassInfo
jobQueueInfo m = JobQueueClassInfo "JobQueue" [threadInfo]
where threadInfo = "Number of threads: " `T.append` (T.pack . show $ threadNumber m)
type JobHandler master a =
YesodJobQueue master => HandlerT JobQueue (HandlerT master IO) a
jobTypeProxy :: (YesodJobQueue m) => m -> Proxy (JobType m)
jobTypeProxy _ = Proxy
getJobR :: JobHandler master Value
getJobR = lift $ do
y <- getYesod
let parseConstr (c:args) = object ["type" .= c, "args" .= args, "description" .= describeJob y c]
constrs = map parseConstr $ genericConstructors $ jobTypeProxy y
let info = getClassInformation y
returnJson $ object ["jobTypes" .= constrs, "information" .= info]
getJobQueueR :: JobHandler master Value
getJobQueueR = lift $ do
y <- getYesod
Right q <- liftIO $ listQueue y
returnJson $ object ["queue" .= q]
postJobQueueR :: JobHandler master Value
postJobQueueR = lift $ do
y <- getYesod
body <- requireJsonBody :: HandlerT master IO PostJobQueueRequest
case readJobType y (body ^. job) of
Just jt -> do
liftIO $ enqueue y jt
returnJson $ object []
Nothing -> invalidArgs ["job"]
getJobStateR :: JobHandler master Value
getJobStateR = lift $ do
y <- getYesod
s <- liftIO $ STM.readTVarIO (getJobState y)
returnJson $ object ["running" .= s]
getJobManagerR :: JobHandler master Html
getJobManagerR = lift $ do
y <- getYesod
withUrlRenderer [hamlet|
$doctype 5
<html>
<head>
<title>YesodJobQueue Manager
<link rel="stylesheet" href="https://fonts.googleapis.com/icon?family=Material+Icons">
<link rel="stylesheet" href="https://code.getmdl.io/1.1.3/material.blue_grey-red.min.css">
<script defer src="https://code.getmdl.io/1.1.3/material.min.js">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<body>
<div class="mdl-layout mdl-js-layout mdl-layout--fixed-header">
<header class="mdl-layout__header">
<div class="mdl-layout__header-row">
<!-- Title -->
<span class="mdl-layout-title">YesodJobQueue Manager
<main class="mdl-layout__content">
<div id="app" class="page-content">
<div id="demo-toast-example" class="mdl-js-snackbar mdl-snackbar">
<div class="mdl-snackbar__text">
<button class="mdl-snackbar__action" type="button">
<script>
window.BASE_URL = "#{jobAPIBaseUrl y}"
<script src="#{jobManagerJSUrl y}">
|]
getJobManagerStaticR :: Text -> JobHandler master Value
getJobManagerStaticR f
| f == "app.js" = lift $ do
let content = toContent $(embedFile "app/dist/app.bundle.js")
sendResponse ("application/json" :: ByteString, content)
| otherwise = notFound
instance YesodJobQueue master => YesodSubDispatch JobQueue (HandlerT master IO) where
yesodSubDispatch = $(mkYesodSubDispatch resourcesJobQueue)
getJobQueue :: a -> JobQueue
getJobQueue = const JobQueue