module Keenser
( Config
, Configurator
, Manager
, ManagerStatus(..)
, Middleware
, Worker(..)
, await
, checkStatus
, concurrency
, halt
, enqueue
, enqueueAt
, enqueueIn
, mkConf
, middleware
, record
, register
, retry
, sleep
, startProcess
) where
import Control.Concurrent (ThreadId)
import Control.Concurrent.Lifted (fork, forkFinally, killThread, threadDelay)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TVar (newTVarIO, swapTVar, readTVarIO, writeTVar)
import Control.Monad.Logger
import Control.Monad.Trans.State
import Data.Aeson
import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as BSC
import qualified Data.ByteString.Lazy as LBS
import qualified Data.Map as M
import qualified Data.Text as T
import Data.Text.Encoding (decodeUtf8)
import Database.Redis hiding (decode)
import Network.HostName (getHostName)
import System.Posix.Process (getProcessID)
import System.Posix.Signals
import System.Posix.Types (CPid(..))
import Debug.Trace
import Keenser.Import
import Keenser.Middleware
import Keenser.Types
import Keenser.Util
-- | Timeout handed to every blocking @brpop@ call in this module (the job
-- fetch in 'startProcessor' and the signal poll in 'listenForSignals').
-- NOTE(review): Redis interprets the BRPOP timeout in seconds - confirm
-- against the hedis version in use.
redisTimeout :: Integer
redisTimeout = 30
-- | Pause between two heartbeats, in seconds (it is passed to 'sleep', which
-- converts seconds to microseconds).
heartBeatFreq :: Int
heartBeatFreq = 5
-- | Build a 'Config' by running a 'Configurator' against the default
-- settings: no workers, the single queue @default@, a concurrency of 25, no
-- middleware, and the supplied Redis connection.
mkConf :: Monad m => Connection -> Configurator m -> m (Config m)
mkConf conn conf = execStateT conf defaults
  where
    defaults = Config
      { kRedis = conn
      , kConcurrency = 25
      , kQueues = ["default"]
      , kMiddleware = []
      , kWorkers = M.empty
      }
-- | Configurator step that overrides the number of processors to start.
concurrency :: Monad m => Int -> Configurator m
concurrency n = modify setConcurrency
  where
    setConcurrency conf = conf { kConcurrency = n }
-- | Configurator step that registers a typed worker under its name.
--
-- The stored worker takes a raw JSON 'Value': the value is decoded with
-- 'fromJSON' and passed to the original perform action, or an error is
-- logged when decoding fails.
register :: (MonadLogger m, MonadIO m, FromJSON a) => Worker m a -> Configurator m
register Worker{..} = modify addWorker
  where
    addWorker conf = conf { kWorkers = M.insert workerName decoding (kWorkers conf) }
    decoding = Worker workerName workerQueue perform
    perform payload = case fromJSON payload of
      Data.Aeson.Error reason ->
        $(logError) $ T.concat
          [ "job failed to parse: "
          , T.pack (show payload)
          , " / "
          , T.pack reason
          ]
      Data.Aeson.Success job -> workerPerform job
-- | Build a job for the given worker and arguments and hand it to 'queue'
-- on the manager's Redis connection. The Redis reply is discarded.
enqueue :: (ToJSON a, MonadIO m) => Manager -> Worker m a -> a -> m ()
enqueue mgr w args =
  mkJob w args >>= liftIO . void . runRedis (managerRedis mgr) . queue
-- | Schedule a job for a given time: the encoded job is added to the
-- @schedule@ sorted set with the target time (via 'timeToDouble') as score.
-- The Redis reply is discarded.
enqueueAt :: (ToJSON a, MonadIO m) => UTCTime -> Manager -> Worker m a -> a -> m ()
enqueueAt at mgr w args = do
  job <- mkJob w args
  let entry = (timeToDouble at, LBS.toStrict (encode job))
  liftIO . void $ runRedis (managerRedis mgr) (zadd "schedule" [entry])
-- | Schedule a job relative to the current time: reads the clock, offsets it
-- with 'secondsFrom', and delegates to 'enqueueAt'.
enqueueIn :: (ToJSON a, MonadIO m) => Rational -> Manager -> Worker m a -> a -> m ()
enqueueIn snds m w args =
  liftIO getCurrentTime >>= \now -> enqueueAt (secondsFrom snds now) m w args
-- | Processor loop: repeatedly block on the manager's queues and run the
-- matching worker for every job that arrives, until the manager is stopping.
--
-- Each iteration issues one @brpop@ over all @queue:<name>@ keys:
--
-- * a popped payload is resolved with 'dispatch' and executed through the
--   configured middleware; payloads that cannot be dispatched are logged;
-- * a timeout (@Right Nothing@) only produces a debug message;
-- * a Redis error reply is logged as an error instead of being reported as
--   "nothing to do", so failures are visible in the logs.
startProcessor :: (MonadBaseControl IO m, MonadIO m, MonadLogger m)
               => Config m -> Manager -> m ()
startProcessor Config{..} m = repeatUntil (managerStopping m) $ do
  let mqs = map (\q -> "queue:" <> q) $ managerQueues m
  ejob <- liftIO . runRedis kRedis $ brpop mqs redisTimeout
  case ejob of
    Right (Just (q, jjob)) -> case dispatch kWorkers jjob of
      Just (worker, Object obj, job) ->
        runMiddleware
          kMiddleware m worker obj q
          (workerPerform worker $ jobArgs job)
      _ -> $(logError) $ "could not find worker for " <> decodeUtf8 jjob
    Right Nothing -> $(logDebug) "Nothing to do here"
    Left err -> $(logError) $ "Redis error while fetching job: " <> T.pack (show err)
-- | Resolve a raw payload to the worker that should run it.
--
-- Returns the worker, the parsed JSON value and the decoded job, or
-- 'Nothing' when the payload does not parse, does not decode to a job, or
-- names a class with no registered worker.
dispatch :: M.Map WorkerName (Worker m Value) -> BS.ByteString -> Maybe (Worker m Value, Value, Job Value)
dispatch workers payload =
  parseMaybe json payload >>= \parsed ->
    getJob parsed >>= \job ->
      M.lookup (jobClass job) workers >>= \handler ->
        return $! (handler, parsed, job)
-- | Decode a JSON value into a job, discarding the decoder's error message.
getJob :: Value -> Maybe (Job Value)
getJob v
  | Success decoded <- fromJSON v = Just decoded
  | otherwise = Nothing
-- | Fork a thread that blocks forever on the @<identity>-signals@ list and
-- passes every popped value to 'handleSignal'. Timeouts and Redis error
-- replies are ignored.
listenForSignals :: (MonadLogger m, MonadBaseControl IO m, MonadIO m) => Manager -> m ()
listenForSignals mgr = void . fork . forever $
  liftIO (runRedis (managerRedis mgr) (brpop [signalKey] redisTimeout)) >>= react
  where
    signalKey = managerIdentity mgr <> "-signals"
    react (Right (Just (_, sig))) = handleSignal mgr sig
    react _ = return ()
-- | React to a signal name received through Redis: @USR1@ quiets the
-- manager, @TERM@ halts it, anything else is logged as an error.
handleSignal :: (MonadLogger m, MonadBaseControl IO m, MonadIO m) => Manager -> BS.ByteString -> m ()
handleSignal m sig = case sig of
  "TERM" -> halt m
  "USR1" -> quiet m
  _ -> $(logError) $ "Unrecognized signal: " <> decodeUtf8 sig
-- | Boot a full process: create the manager, start the heartbeat, the
-- signal listener and the poller, then fork one watched processor per unit
-- of configured concurrency. Returns the manager.
startProcess :: (MonadBaseControl IO m, MonadIO m, MonadLogger m) => Config m -> m Manager
startProcess c = do
  m <- mkManager c
  _ <- startBeat m
  listenForSignals m
  startPolling m
  mapM_ (spawn m) [1 .. kConcurrency c]
  return $! m
  where
    -- Processors are numbered from 1 so their log lines can be told apart.
    spawn mgr n =
      forkWatch mgr ("processor " <> T.pack (show n)) (startProcessor c mgr)
-- | Fork the watched "poller" thread. Every five seconds it looks at the
-- @schedule@ and @retry@ sorted sets, fetches at most one entry per set
-- whose score is not later than the current time, and passes it to
-- 'requeueFrom'. Redis errors are logged.
startPolling :: (MonadBaseControl IO m, MonadLogger m, MonadIO m) => Manager -> m ()
startPolling mgr = forkWatch mgr "poller" . forever $ do
  mapM_ drain ["schedule", "retry"]
  liftIO (sleep 5)
  where
    drain set = do
      due <- liftIO $ do
        now <- getCurrentTime
        runRedis (managerRedis mgr) (zrangebyscoreLimit set 0 (timeToDouble now) 0 1)
      case due of
        Right jobs -> mapM_ (liftIO . runRedis (managerRedis mgr) . requeueFrom set) jobs
        Left err -> $(logError) $ "Polling error " <> T.pack (show err)
-- | Move a due payload from the sorted set @q@ onto its work queue.
--
-- The payload is decoded only to learn its target queue; the original bytes
-- are what gets pushed. The destination key is @queue:<name>@, the same key
-- shape that 'startProcessor' pops from and 'getQueueLength' measures, so
-- the requeued job is actually picked up by a processor.
--
-- Payloads that do not decode to a job are left in place.
-- NOTE(review): such a payload stays in the sorted set; consider moving it
-- aside so it cannot be fetched again on every poll.
requeueFrom :: Queue -> BS.ByteString -> Redis ()
requeueFrom q payload = case decode $ LBS.fromStrict payload of
  Just job -> do
    let target = "queue:" <> jobQueue (job :: Job Value)
    -- Push before removing: a crash in between duplicates the job rather
    -- than losing it.
    _ <- lpush target [payload]
    _ <- zrem q [payload]
    return ()
  _ -> return ()
-- | Create the 'Manager' for this process from a configuration.
--
-- The identity key is @<host>.<pid>.<random hex>@. Four fresh 'TVar's are
-- allocated, initialised to an empty map, two zero counters and 'False'.
mkManager :: MonadIO m => Config m -> m Manager
mkManager conf = liftIO $ do
  startedAt <- getCurrentTime
  hostName <- getHostName
  CPid pid <- getProcessID
  suffix <- randomHex 6
  let identity = BSC.pack (hostName ++ "." ++ show pid ++ "." ++ suffix)
  jobsVar <- newTVarIO M.empty
  counterA <- newTVarIO 0
  counterB <- newTVarIO 0
  flagVar <- newTVarIO False
  return $
    Manager
      (BSC.pack hostName)
      startedAt
      pid
      (kConcurrency conf)
      (kQueues conf)
      []
      identity
      (kRedis conf)
      jobsVar
      counterA
      counterB
      flagVar
-- | Remove this process from Redis: drop its identity from the @processes@
-- set and delete its identity key. When the flag is set, the keyboard
-- signal is raised afterwards.
clearRedis :: Manager -> Bool -> IO ()
clearRedis mgr raise = do
  void . runRedis (managerRedis mgr) $ do
    _ <- srem "processes" [identity]
    del [identity]
  when raise (raiseSignal keyboardSignal)
  where
    identity = managerIdentity mgr
-- | Install a one-shot keyboard-signal handler that cleans up Redis and
-- re-raises the signal, then fork the heartbeat thread. When the heartbeat
-- ends its outcome is logged and the Redis entries are cleared.
startBeat :: (MonadBaseControl IO m, MonadLogger m, MonadIO m) => Manager -> m ThreadId
startBeat m = do
  _ <- liftIO $ installHandler keyboardSignal (CatchOnce (clearRedis m True)) Nothing
  forkFinally (heartBeat m) cleanup
  where
    cleanup outcome = do
      report outcome
      liftIO (clearRedis m False)
    report outcome@(Left _) = $(logError) ("Heartbeat error " <> T.pack (show outcome))
    report _ = $(logDebug) "Stopping heartbeat"
-- | Snapshot of the jobs currently recorded as running on this manager.
workload :: MonadIO m => Manager -> m [RunningJob]
workload mgr = liftIO (fmap M.elems (readTVarIO (managerRunning mgr)))
-- | Turn a running job into a hash field/value pair: the field is the shown
-- thread id passed through @trim "ThreadId "@, the value is the
-- JSON-encoded job.
workToRedis :: RunningJob -> (BS.ByteString, BS.ByteString)
workToRedis j = (field, value)
  where
    field = BSC.pack (trim "ThreadId " (show (rjThread j)))
    value = LBS.toStrict (encode j)
-- | Publish this process' liveness and statistics to Redis, forever.
--
-- On entry the identity is added to the @processes@ set and the @queues@
-- set is deleted and refilled with this manager's queues. After that, every
-- 'heartBeatFreq' seconds, the loop:
--
-- * writes @beat@, @info@, @busy@ and @quiet@ into the identity hash;
-- * rewrites the @<identity>:workers@ hash from the running jobs;
-- * adds the completed and failed counts gathered since the previous beat
--   to the global and per-day statistics keys;
-- * sets a 60 second expiry on the identity hash.
--
-- Redis replies are not inspected.
heartBeat :: MonadIO m => Manager -> m ()
heartBeat m@Manager{..} = liftIO $ do
  runRedis managerRedis $ do
    _ <- del ["queues"]
    sadd "processes" [managerIdentity]
    sadd "queues" managerQueues
  forever $ do
    now <- getCurrentTime
    working <- workload m
    stopping <- readTVarIO managerQuiet
    -- Swap the counters back to zero so each beat reports only the delta.
    dones <- atomically $ swapTVar managerComplete 0
    fails <- atomically $ swapTVar managerFailed 0
    runRedis managerRedis $ do
      hmset managerIdentity
        [ ("beat", timestamp now)
        , ("info", LBS.toStrict $ encode m)
        , ("busy", BSC.pack . show $ length working)
        , ("quiet", boolToRedis stopping)
        ]
      _ <- del [managerIdentity <> ":workers"]
      hmset (managerIdentity <> ":workers") $ map workToRedis working
      incrby "stat:processed" dones
      -- NOTE(review): the global key is "stat:processed" but the per-day key
      -- is "stat:complete:<day>" - confirm which name readers expect.
      incrby ("stat:complete:" <> daystamp now) dones
      incrby "stat:failed" fails
      -- The per-day failure counter must grow by the failure count, not by
      -- the number of completed jobs.
      incrby ("stat:failed:" <> daystamp now) fails
      expire managerIdentity 60
    sleep heartBeatFreq
-- | Read a status snapshot over the given connection: the members of
-- @processes@, this manager's identity hash, the global processed and
-- failed counters, and the length of each of the manager's queues. Any
-- value Redis fails to return is replaced by an empty list or zero.
checkStatus :: MonadIO m => Manager -> Connection -> m ManagerStatus
checkStatus mgr conn = liftIO $ runRedis conn $ do
  processes <- smembers "processes"
  details <- hgetall (managerIdentity mgr)
  processed <- Database.Redis.get "stat:processed"
  failures <- Database.Redis.get "stat:failed"
  lengths <- mapM getQueueLength (managerQueues mgr)
  let status =
        ManagerStatus
          (d processes [])
          (d details [])
          (d' processed 0)
          (d' failures 0)
          lengths
  status `seq` return status
-- | Unwrap a Redis result, substituting the given default on an error reply.
d :: Either Reply a -> a -> a
d result fallback = either (const fallback) id result
-- | Parse an optional Redis string value, substituting the given default
-- when Redis returned an error, the key is missing, or the stored text does
-- not parse as the requested type.
--
-- Parsing accepts exactly what 'read' accepts (one unambiguous parse with
-- only whitespace left over), but a malformed value yields the default
-- instead of throwing from pure code.
d' :: Read a => Either Reply (Maybe BSC.ByteString) -> a -> a
d' (Right (Just raw)) fallback =
  case [v | (v, rest) <- reads (BSC.unpack raw), ("", "") <- lex rest] of
    [v] -> v
    _ -> fallback
d' _ fallback = fallback
-- | Pair a queue name with the length of its @queue:<name>@ list, using 0
-- when Redis returns an error.
getQueueLength :: BS.ByteString -> Redis (BS.ByteString, Integer)
getQueueLength q =
  llen ("queue:" <> q) >>= \len -> return $! (q, d len 0)
-- | Fork a named action and supervise it: if the thread ends with an
-- exception the failure is logged and the action is forked again; a clean
-- exit is only logged.
forkWatch :: (MonadLogger m, MonadBaseControl IO m, MonadIO m) => Manager -> T.Text -> m () -> m ()
forkWatch m name a = do
  $(logInfo) (name <> " starting.")
  _ <- forkFinally a $ \outcome -> case outcome of
    Left e -> do
      $(logError) (T.concat [name, " exited ", T.pack (show e), ". Restarting."])
      forkWatch m name a
    Right _ -> $(logDebug) (name <> " exited clean.")
  return ()
-- | Block the calling thread until the manager reports that it is stopping
-- (checked between two-second sleeps), then print a shutdown notice and
-- wait two more seconds.
await :: Manager -> IO ()
await m =
  repeatUntil (managerStopping m) (sleep 2)
    >> putStrLn "Shutting down ..."
    >> sleep 2
-- | Suspend the current thread for the given number of seconds.
sleep :: Int -> IO ()
sleep seconds = threadDelay (seconds * microsecondsPerSecond)
  where
    microsecondsPerSecond = 1000000
-- | Stop the manager: set the quiet flag, then kill every thread that is
-- currently a key of the running-jobs map.
halt :: (MonadBaseControl IO m, MonadIO m) => Manager -> m ()
halt m = do
  quiet m
  active <- liftIO (readTVarIO (managerRunning m))
  forM_ (M.keys active) killThread
-- | Set the manager's quiet flag, which the processor loop and 'await' poll
-- through 'managerStopping'.
quiet :: MonadIO m => Manager -> m ()
quiet mgr = liftIO (atomically (writeTVar (managerQuiet mgr) True))
-- | Read the current value of the manager's quiet flag.
managerStopping :: MonadIO m => Manager -> m Bool
managerStopping mgr = liftIO (readTVarIO (managerQuiet mgr))