{-# LANGUAGE FlexibleContexts  #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE TemplateHaskell   #-}
-- | Entry point of the Keenser job processor: configuration helpers,
-- enqueueing, the processor / poller / heartbeat threads and the status and
-- shutdown API.
module Keenser
  ( Config
  , Configurator
  , Manager
  , ManagerStatus(..)
  , Middleware
  , Worker(..)
  , await
  , checkStatus
  , concurrency
  , halt
  , enqueue
  , enqueueAt
  , enqueueIn
  , mkConf
  , middleware
  , record
  , register
  , retry
  , sleep
  , startProcess
  ) where

import           Control.Concurrent          (ThreadId)
import           Control.Concurrent.Lifted   (fork, forkFinally, killThread, threadDelay)
import           Control.Concurrent.STM      (atomically)
import           Control.Concurrent.STM.TVar (newTVarIO, swapTVar, readTVarIO, writeTVar)
import           Control.Monad.Logger
import           Control.Monad.Trans.State
import           Data.Aeson
import qualified Data.ByteString             as BS
import qualified Data.ByteString.Char8       as BSC
import qualified Data.ByteString.Lazy        as LBS
import qualified Data.Map                    as M
import qualified Data.Text                   as T
import           Data.Text.Encoding          (decodeUtf8)
import           Database.Redis              hiding (decode)
import           Network.HostName            (getHostName)
import           System.Posix.Process        (getProcessID)
import           System.Posix.Signals
import           System.Posix.Types          (CPid(..))
import qualified Text.Read                   as TR

import Debug.Trace

import Keenser.Import
import Keenser.Middleware
import Keenser.Types
import Keenser.Util

-- | Timeout, in seconds, handed to the blocking @BRPOP@ calls.
redisTimeout :: Integer
redisTimeout = 30

-- | Pause between two heartbeats, in seconds (it is passed to 'sleep').
heartBeatFreq :: Int
heartBeatFreq = 5

-- TODO: check for and handle Redis errors throughout

-- | Run a 'Configurator' against the default configuration: no workers, the
-- single queue @"default"@, a concurrency of 25, no middleware, and the given
-- Redis connection.
mkConf :: Monad m => Connection -> Configurator m -> m (Config m)
mkConf conn conf = execStateT conf Config
  { kWorkers     = M.empty
  , kQueues      = ["default"]
  , kConcurrency = 25
  , kRedis       = conn
  , kMiddleware  = []
  }

-- | Set the number of processor threads started by 'startProcess'.
concurrency :: Monad m => Int -> Configurator m
concurrency n = modify $ \c -> c { kConcurrency = n }

-- TODO:
-- - verify that worker names are unique
-- - update queue list (or at least warn if there are queues that aren't being covered)

-- | Register a worker under its 'workerName'.
--
-- The typed worker is wrapped into one that accepts a raw JSON 'Value': the
-- arguments are decoded with 'fromJSON' and, if decoding fails, the failure
-- is logged instead of running the job.
register :: (MonadLogger m, MonadIO m, FromJSON a) => Worker m a -> Configurator m
register Worker{..} = modify $ \c -> c { kWorkers = M.insert workerName w $ kWorkers c }
  where
    w = Worker workerName workerQueue $ \obj -> case fromJSON obj of
      Data.Aeson.Success job -> workerPerform job
      Data.Aeson.Error err   ->
        $(logError) $ "job failed to parse: " <> T.pack (show obj) <> " / " <> T.pack err

-- | Build a job for the worker and push it for immediate processing.
enqueue :: (ToJSON a, MonadIO m) => Manager -> Worker m a -> a -> m ()
enqueue Manager{..} w args = do
  job <- mkJob w args
  liftIO . void . runRedis managerRedis $ queue job

-- | Build a job for the worker and add it to the @"schedule"@ sorted set,
-- scored by the requested run time.
enqueueAt :: (ToJSON a, MonadIO m) => UTCTime -> Manager -> Worker m a -> a -> m ()
enqueueAt at Manager{..} w args = do
  job <- mkJob w args
  liftIO . void . runRedis managerRedis $
    zadd "schedule" [(timeToDouble at, LBS.toStrict $ encode job)]

-- | Like 'enqueueAt', with the run time given as an offset in seconds from
-- the current time.
enqueueIn :: (ToJSON a, MonadIO m) => Rational -> Manager -> Worker m a -> a -> m ()
enqueueIn snds m w args = do
  now <- liftIO getCurrentTime
  enqueueAt (snds `secondsFrom` now) m w args

-- TODO:
-- - handle the various failure modes here (timeout, can't match worker, can't get work)
-- - if doWork crashes, we should requeue the job, kill this thread, and restart a new processor
-- - track down the intermittent ": hFlush: illegal operation (handle is closed)" error from here

-- | Processor loop: block on the manager's queues, look up the worker for
-- each popped payload and run it through the configured middleware. The loop
-- is driven by 'repeatUntil' with 'managerStopping' as its condition.
startProcessor :: (MonadBaseControl IO m, MonadIO m, MonadLogger m) => Config m -> Manager -> m ()
startProcessor Config{..} m = repeatUntil (managerStopping m) $ do
  let mqs = map ("queue:" <>) $ managerQueues m
  ejob <- liftIO . runRedis kRedis $ brpop mqs redisTimeout
  case ejob of
    Right (Just (q, jjob)) -> case dispatch kWorkers jjob of
      Just (worker, Object obj, job) ->
        runMiddleware kMiddleware m worker obj q (workerPerform worker $ jobArgs job)
      _ -> $(logError) $ "could not find worker for " <> decodeUtf8 jjob
    -- A Redis failure is not the same thing as an idle queue: report it.
    Left err      -> $(logError) $ "Redis error while fetching work: " <> T.pack (show err)
    Right Nothing -> $(logDebug) "Nothing to do here"

-- | Parse a raw payload as JSON, decode it as a 'Job' and look up the worker
-- registered under the job's class. Yields 'Nothing' if any step fails.
dispatch :: M.Map WorkerName (Worker m Value) -> BS.ByteString -> Maybe (Worker m Value, Value, Job Value)
dispatch workers payload = do
  jobj   <- parseMaybe json payload
  job    <- getJob jobj
  worker <- M.lookup (jobClass job) workers
  return $! (worker, jobj, job)

-- | Decode a JSON value as a 'Job', discarding the error message on failure.
getJob :: Value -> Maybe (Job Value)
getJob v = case fromJSON v of
  Success j -> Just j
  _         -> Nothing

-- | Fork a thread that forever pops from the @\<identity\>-signals@ list and
-- passes every received value to 'handleSignal'. Timeouts and Redis errors
-- are ignored.
listenForSignals :: (MonadLogger m, MonadBaseControl IO m, MonadIO m) => Manager -> m ()
listenForSignals m@Manager{..} = void . fork . forever $ do
  esig <- liftIO . runRedis managerRedis $ brpop [managerIdentity <> "-signals"] redisTimeout
  case esig of
    Right (Just (_, sig)) -> handleSignal m sig
    _                     -> return ()

-- | React to a signal name: @USR1@ quiets the manager, @TERM@ halts it, and
-- anything else is logged as an error.
handleSignal :: (MonadLogger m, MonadBaseControl IO m, MonadIO m) => Manager -> BS.ByteString -> m ()
handleSignal m sig
  | sig == "USR1" = quiet m
  | sig == "TERM" = halt m
  | otherwise     = $(logError) $ "Unrecognized signal: " <> decodeUtf8 sig

-- | Create a 'Manager' and start its threads: the heartbeat, the signal
-- listener, the poller and 'kConcurrency' watched processors.
startProcess :: (MonadBaseControl IO m, MonadIO m, MonadLogger m) => Config m -> m Manager
startProcess c@Config{..} = do
  m <- mkManager c
  _ <- startBeat m
  listenForSignals m
  startPolling m
  forM_ [1 .. kConcurrency] $ \n ->
    let name = "processor " <> T.pack (show n)
    in  forkWatch m name $ startProcessor c m
  return $! m

-- | Start the watched poller thread. Each round takes at most one due entry
-- (score between 0 and the current time) from each of the @"schedule"@ and
-- @"retry"@ sorted sets, requeues it, and then sleeps for five seconds.
startPolling :: (MonadBaseControl IO m, MonadLogger m, MonadIO m) => Manager -> m ()
startPolling m@Manager{..} = forkWatch m "poller" . forever $ do
  forM_ ["schedule", "retry"] $ \q -> do
    er <- liftIO $ do
      now <- getCurrentTime
      runRedis managerRedis $ zrangebyscoreLimit q 0 (timeToDouble now) 0 1
    case er of
      Left err   -> $(logError) $ "Polling error " <> T.pack (show err)
      Right jobs -> forM_ jobs $ liftIO . runRedis managerRedis . requeueFrom q
  liftIO $ sleep 5 -- TODO: adaptive poll interval

-- | Move a payload from the sorted set @q@ onto the list named by the job's
-- own queue field. Payloads that do not decode as a 'Job' are left untouched.
--
-- NOTE(review): the push targets the bare 'jobQueue' name, whereas
-- 'startProcessor' pops from @"queue:" <> name@ — confirm that 'jobQueue'
-- already carries the prefix, otherwise requeued jobs are never picked up.
requeueFrom :: Queue -> BS.ByteString -> Redis ()
requeueFrom q payload = case decode $ LBS.fromStrict payload of
  Just job -> do
    let q2 = jobQueue (job :: Job Value)
    _ <- lpush q2 [payload]
    _ <- zrem q [payload]
    return ()
  _ -> return ()

-- | Build a 'Manager' for this process. Its identity is
-- @\<host\>.\<pid\>.\<nonce\>@ and all of its 'TVar' fields start out empty,
-- zero or 'False'.
mkManager :: MonadIO m => Config m -> m Manager
mkManager Config{..} = liftIO $ do
  now      <- getCurrentTime
  host     <- getHostName
  CPid pid <- getProcessID
  nonce    <- randomHex 6
  let key = BSC.pack $ concat [host, ".", show pid, ".", nonce]
  Manager (BSC.pack host) now pid kConcurrency kQueues [] key kRedis
    <$> newTVarIO M.empty
    <*> newTVarIO 0
    <*> newTVarIO 0
    <*> newTVarIO False

-- | Remove this manager from the @"processes"@ set and delete its hash.
-- When @raise@ is 'True', the keyboard signal is raised again afterwards;
-- 'startBeat' installs this function as a one-shot ('CatchOnce') handler.
clearRedis :: Manager -> Bool -> IO ()
clearRedis Manager{..} raise = do
  void . runRedis managerRedis $ do
    _ <- srem "processes" [managerIdentity]
    del [managerIdentity]
  when raise $ raiseSignal keyboardSignal

-- | Install the keyboard-interrupt cleanup handler and fork the heartbeat
-- thread. When that thread ends, normally or not, the outcome is logged and
-- the manager's Redis entries are cleared.
startBeat :: (MonadBaseControl IO m, MonadLogger m, MonadIO m) => Manager -> m ThreadId
startBeat m@Manager{..} = do
  _ <- liftIO $ installHandler keyboardSignal (CatchOnce $ clearRedis m True) Nothing
  forkFinally (heartBeat m) $ \e -> do
    case e of
      Left _ -> $(logError) $ "Heartbeat error " <> T.pack (show e)
      _      -> $(logDebug) $ "Stopping heartbeat"
    liftIO $ clearRedis m False

-- | Snapshot of the jobs currently recorded as running.
workload :: MonadIO m => Manager -> m [RunningJob]
workload Manager{..} = liftIO $ M.elems <$> readTVarIO managerRunning

-- | Hash field for a running job: the key is derived from the shown thread id
-- via @trim "ThreadId "@, the value is the JSON-encoded job.
workToRedis :: RunningJob -> (BS.ByteString, BS.ByteString)
workToRedis j =
  ( BSC.pack . trim "ThreadId " . show $ rjThread j
  , LBS.toStrict $ encode j
  )

-- | Heartbeat loop. Registers the process and its queues once, then every
-- 'heartBeatFreq' seconds publishes the manager's state and the list of
-- running jobs, and flushes the completed / failed counters into the global
-- statistics.
heartBeat :: MonadIO m => Manager -> m ()
heartBeat m@Manager{..} = liftIO $ do
  _ <- runRedis managerRedis $ do
    _ <- del ["queues"]
    _ <- sadd "processes" [managerIdentity]
    sadd "queues" managerQueues

  forever $ do
    now      <- getCurrentTime
    working  <- workload m
    stopping <- readTVarIO managerQuiet
    dones    <- atomically $ swapTVar managerComplete 0
    fails    <- atomically $ swapTVar managerFailed 0
    -- For what it's worth, an abort between here and the end of the block could cause us to under-report stats
    _ <- runRedis managerRedis $ do
      _ <- hmset managerIdentity
        [ ("beat",  timestamp now)
        , ("info",  LBS.toStrict $ encode m)
        , ("busy",  BSC.pack . show $ length working)
        , ("quiet", boolToRedis stopping)
        ]
      _ <- del [managerIdentity <> ":workers"]
      -- HMSET without any field/value pair is rejected by Redis, so only
      -- write the workers hash when something is actually running.
      when (not (null working)) . void $
        hmset (managerIdentity <> ":workers") $ map workToRedis working
      -- NOTE(review): the daily counter uses the "stat:complete:" prefix while
      -- the running total uses "stat:processed" — confirm this is intended.
      _ <- incrby "stat:processed" dones
      _ <- incrby ("stat:complete:" <> daystamp now) dones
      _ <- incrby "stat:failed" fails
      -- The daily failure counter must grow by the failures, not by the
      -- completed jobs.
      _ <- incrby ("stat:failed:" <> daystamp now) fails
      expire managerIdentity 60
    sleep heartBeatFreq

-- | Read the overall status over the given connection: the @"processes"@ set,
-- this manager's hash, the global processed / failed counters and the length
-- of each of the manager's queues. Redis errors and missing or unreadable
-- values fall back to empty lists and zero.
checkStatus :: MonadIO m => Manager -> Connection -> m ManagerStatus
checkStatus Manager{..} conn = liftIO . runRedis conn $ do
  procs  <- smembers "processes"
  info   <- hgetall managerIdentity
  done   <- Database.Redis.get "stat:processed"
  failed <- Database.Redis.get "stat:failed"
  qs     <- mapM getQueueLength managerQueues
  return $! ManagerStatus (d procs []) (d info []) (d' done 0) (d' failed 0) qs

-- TODO: clean this up

-- | Unwrap a Redis result, using the fallback when the reply is an error.
d :: Either Reply a -> a -> a
d (Right a) _        = a
d _         fallback = fallback

-- | Parse an optional Redis string with 'Read'. The fallback is used on a
-- Redis error, on a missing value, and on a value that does not parse.
d' :: Read a => Either Reply (Maybe BSC.ByteString) -> a -> a
d' (Right (Just a)) fallback = maybe fallback id . TR.readMaybe $ BSC.unpack a
d' _                fallback = fallback

-- | Pair a queue name with the length of its @queue:\<name\>@ list (0 on a
-- Redis error).
getQueueLength :: BS.ByteString -> Redis (BS.ByteString, Integer)
getQueueLength q = do
  len <- llen $ "queue:" <> q
  return $! (q, d len 0)

-- | Fork a named action and watch it: if it ends with an exception, the
-- exception is logged and the action is forked again; a clean exit is only
-- logged.
forkWatch :: (MonadLogger m, MonadBaseControl IO m, MonadIO m) => Manager -> T.Text -> m () -> m ()
forkWatch m name a = do
  $(logInfo) $ name <> " starting."
  void $ forkFinally a restart
  where
    restart (Left e) = do
      $(logError) $ name <> " exited " <> T.pack (show e) <> ". Restarting."
      forkWatch m name a
    restart (Right _) = $(logDebug) $ name <> " exited clean."

-- | Block the calling thread in a 'repeatUntil' loop (two-second sleeps,
-- conditioned on 'managerStopping'), then announce the shutdown and wait two
-- more seconds.
await :: Manager -> IO ()
await m = do
  repeatUntil (managerStopping m) $ sleep 2
  putStrLn "Shutting down ..."
  sleep 2

-- | Suspend the current thread for the given number of seconds.
sleep :: Int -> IO ()
sleep n = threadDelay $ n * 1000000

-- | Quiet the manager, then kill every thread currently recorded in
-- 'managerRunning'.
halt :: (MonadBaseControl IO m, MonadIO m) => Manager -> m ()
halt m@Manager{..} = do
  quiet m
  running <- liftIO $ readTVarIO managerRunning
  mapM_ killThread $ M.keys running

-- | Set the manager's quiet flag.
quiet :: MonadIO m => Manager -> m ()
quiet Manager{..} = liftIO . atomically $ writeTVar managerQuiet True

-- | Whether the manager's quiet flag has been set.
managerStopping :: MonadIO m => Manager -> m Bool
managerStopping = liftIO . readTVarIO . managerQuiet