{-# LANGUAGE OverloadedStrings     #-}
{-# LANGUAGE PartialTypeSignatures #-}
{-# LANGUAGE ScopedTypeVariables   #-}
{-|
This module contains a very basic periodic job processor backed by
Redis, suitable to be run by clusters of machines. Jobs will be run at
least once per scheduled time, and will not run more than once unless
the (configurable) timeout is reached, at which point a retry occurs.
There is no guarantee that the distribution of jobs across machines
will be fair.

It depends upon Redis not losing data once it has acknowledged it, and
guaranteeing the atomicity that is specified for commands like EVAL
(ie, that if you do several things within an EVAL, they will all
happen or none will happen). Nothing has been tested with Redis
clusters (and it likely will not work).

An example use is the following (provided in repository), noting that
we create two threads to process jobs, and could run this entire
executable many times and the jobs would only run at the scheduled
time.

> {-# LANGUAGE OverloadedStrings #-}
> import Control.Monad (forever)
> import System.Periodic
> import Control.Concurrent (threadDelay, forkIO)
> import qualified Data.Text.IO as T
> import qualified Database.Redis as R
> main = do rconn <- R.connect R.defaultConnectInfo
>           scheduler <- create (Name "default") rconn (CheckInterval (Seconds 1)) (LockTimeout (Seconds 1000)) (T.putStrLn)
>           addTask scheduler "print-hello-job" (Every (Seconds 100)) (T.putStrLn "hello")
>           addTask scheduler "print-bye-job" (Every (Seconds 10)) (T.putStrLn "bye")
>           forkIO (run scheduler)
>           forkIO (run scheduler)
>           forever (threadDelay 1000000)
-}
module System.Periodic
  ( -- * Types
    Time(..)
  , Seconds(..)
  , Period(..)
  , Name(..)
  , CheckInterval(..)
  , LockTimeout(..)
  , Logger
  , Scheduler
    -- * Managing Schedulers
  , create
  , run
  , destroy
    -- * Scheduling Tasks
  , addTask
  ) where

import           Control.Concurrent      (forkIO, threadDelay)
import           Control.Concurrent.MVar
import           Control.Exception       (SomeException, catch)
import           Control.Monad           (forever, join, when)
import           Data.Monoid
import           Data.Ratio
import           Data.Serialize
import           Data.Text               (Text)
import qualified Data.Text               as T
import qualified Data.Text.Encoding      as T
import           Data.Time.Calendar
import           Data.Time.Clock
import           Data.Time.Clock.POSIX
import qualified Database.Redis          as R

-- | Time from midnight UTC.
newtype Time = Time DiffTime

-- | A whole number of seconds.
newtype Seconds = Seconds Int

-- | When the job should run - either at a particular offset from
-- midnight UTC, or every N seconds.
data Period
  = Daily Time
  | Every Seconds

-- | The name of the scheduler. This, when combined with the task
-- name, should be unique on a given Redis server.
newtype Name = Name Text

-- | How frequently we should check if jobs need to get run. If all
-- your jobs are infrequent (daily, or occurring hours apart), setting
-- this to a high number (every minute, or more) decreases the number
-- of queries to Redis.
newtype CheckInterval = CheckInterval Seconds

-- | How long a job that has been started and not marked as finished
-- may hold its lock before we run it again. Note that while a lock is
-- held the period is not consulted: the job is started again only once
-- this timeout has elapsed since the lock was taken, so a timeout
-- longer than the period delays the next run of a job whose lock was
-- never released.
newtype LockTimeout = LockTimeout Seconds

-- | A function where log messages are sent.
type Logger = Text -> IO ()

-- | Internal type representing the current tasks that are scheduled.
data Scheduler = Scheduler
  { schedulerName          :: Name
    -- ^ Prefix used for every Redis key of this scheduler.
  , schedulerRedisConn     :: R.Connection
    -- ^ Connection used for all Redis commands.
  , schedulerTasks         :: MVar [(Text, Period, IO ())]
    -- ^ Registered tasks as (task name, period, action).
  , schedulerCheckInterval :: CheckInterval
    -- ^ Pause between two passes over the task list.
  , schedulerLockTimeout   :: LockTimeout
    -- ^ Age after which a held lock no longer blocks a new run.
  , schedulerLogger        :: Logger
    -- ^ Receives a message when a task throws an exception.
  }

-- | This function creates a new scheduler, which can then have tasks
-- scheduled in it.
-- The tasks themselves are /not/ stored in Redis; Redis is only used
-- to ensure we run the tasks at the right time (and not more than once
-- across multiple machines). Note that this does not actually run any
-- of the tasks - you must fork threads that call 'run'.
create :: Name -> R.Connection -> CheckInterval -> LockTimeout -> (Text -> IO ()) -> IO Scheduler
create name rconn check lock logger =
  (\tasks -> Scheduler name rconn tasks check lock logger) <$> newMVar []

-- | Register a task with a scheduler, to be run at the given period.
-- The new task is prepended to the scheduler's task list. The task
-- name, combined with the scheduler name, should be unique within the
-- Redis database; otherwise only one of the jobs sharing a name will
-- be run.
addTask :: Scheduler -> Text -> Period -> IO () -> IO ()
addTask scheduler name period act =
  modifyMVar_ (schedulerTasks scheduler) $ \tasks ->
    return ((name, period, act) : tasks)

-- NOTE(review): not referenced anywhere in the visible code; presumably
-- ten hours expressed in seconds. Confirm before removing.
lockTimeout = 10 * 3600

-- | Run the scheduler loop in the calling thread. The loop never
-- returns: on every pass it reads the current time and the registered
-- tasks, offers each task the chance to run, and then sleeps for the
-- check interval. You must start at least one such loop, but can fork
-- several if you have many tasks that take a long time to run.
run :: Scheduler -> IO ()
run (Scheduler (Name nm) rconn mv (CheckInterval (Seconds check)) lock logger) =
  forever pass
  where
    pass = do
      now <- getCurrentTime
      readMVar mv >>= mapM_ (tryRunTask logger lock nm rconn now)
      -- threadDelay takes microseconds.
      threadDelay (check * 1000000)

-- Redis key holding the time the named task was last started, built
-- from the scheduler name and the task name.
lastStartedKey pname name =
  T.encodeUtf8 (T.concat [pname, "-", name, "-last-started"])

-- Redis key holding the time at which the named task's lock was taken.
lockedKey pname name =
  T.encodeUtf8 (T.concat [pname, "-", name, "-running-at"])

-- | Delete from Redis the last-started and lock keys of every task
-- registered with this scheduler. It isn't necessary to do this in
-- normal scenarios.
destroy :: Scheduler -> IO ()
destroy (Scheduler (Name nm) rconn mv _ _ _) = do
  tasks <- readMVar mv
  let keys = concatMap (\(tnm, _, _) -> [lastStartedKey nm tnm, lockedKey nm tnm]) tasks
  -- DEL without any key is rejected by Redis, so skip the round trip
  -- entirely when no tasks are registered.
  when (not (null keys)) $ do
    _ <- R.runRedis rconn (R.del keys)
    return ()

-- | Flatten a Redis reply: an error reply and a missing value both
-- become 'Nothing'.
collapseError :: Either a (Maybe b) -> Maybe b
collapseError (Left _)  = Nothing
collapseError (Right v) = v

-- | Interpret the integer returned by the lock script: exactly @1@
-- means the lock was acquired, any other (or missing) value means it
-- was not. An error reply from Redis is raised via 'error'.
collapseNumberBoolFalse :: Either R.Reply (Maybe Integer) -> Bool
collapseNumberBoolFalse (Left e)         = error (show e)
collapseNumberBoolFalse (Right (Just 1)) = True
collapseNumberBoolFalse (Right _)        = False

-- | Decode a timestamp written by 'renderUnixTime'. Yields 'Nothing'
-- when the bytes do not decode to a pair of integers or when the
-- denominator is zero, instead of crashing on such data.
parseUnixTime s =
  case decode s of
    Right (n, d) | d /= 0 -> Just (posixSecondsToUTCTime (fromRational (n % d)))
    _                     -> Nothing

-- | Serialize a time as the (numerator, denominator) pair of its POSIX
-- seconds, so that no precision is lost.
renderUnixTime t =
  let r = toRational (utcTimeToPOSIXSeconds t)
  in encode (numerator r, denominator r)

-- | Five minutes, in seconds: how long after its scheduled time a
-- daily job that has never run is still allowed to start.
minutes5 :: DiffTime
minutes5 = 60 * 5

-- | Decide whether a task should be started now.
--
-- Arguments: the lock timeout, the task's period, when the task was
-- last started (if ever), when its lock was taken (if it is locked),
-- and the current time.
--
-- * If a lock is present, only its age matters: the task runs again
--   once the lock is older than the timeout.
-- * A daily task that has never run starts only within five minutes
--   after today's scheduled time.
-- * A daily task that has run before starts once today's scheduled
--   time has passed and the previous start was before it.
-- * A recurring task starts if it has never run, or if more than the
--   interval has elapsed since the previous start.
shouldRun :: LockTimeout -> Period -> Maybe UTCTime -> Maybe UTCTime -> UTCTime -> Bool
shouldRun (LockTimeout (Seconds timeout)) _ _ (Just locked) now =
  diffUTCTime now locked > fromIntegral timeout
shouldRun _ (Daily (Time t)) lastStarted Nothing now =
  case lastStarted of
    Nothing   -> now > scheduled && utctDayTime now - t < minutes5
    Just prev -> now > scheduled && prev < scheduled
  where
    -- Today's date at the configured offset from midnight UTC.
    scheduled = now { utctDayTime = t }
shouldRun _ (Every (Seconds n)) lastStarted Nothing now =
  maybe True (\prev -> diffUTCTime now prev > fromIntegral n) lastStarted

-- | Run one task if it is due and this worker wins the lock.
--
-- The last-started and lock timestamps are read from Redis; if
-- 'shouldRun' agrees, a Lua script atomically checks that both keys
-- still hold exactly the bytes that were read (or are still absent)
-- and, if so, sets both to the current time. Only the worker for which
-- the script returns 1 runs the task. The task runs in its own thread;
-- any exception it throws is reported through the logger. The lock key
-- is deleted once the task has finished, whether or not it failed.
--
-- A stored value that cannot be decoded is treated as absent when
-- deciding whether to run; because the script compares raw bytes, such
-- a value is overwritten by the next successful acquisition.
tryRunTask :: Logger -> LockTimeout -> Text -> R.Connection -> UTCTime -> (Text, Period, IO ()) -> IO ()
tryRunTask logger timeout pname rconn now (name, period, task) = do
  rawLastStarted <- collapseError <$> R.runRedis rconn (R.get lastKey)
  rawLockedAt <- collapseError <$> R.runRedis rconn (R.get lockKey)
  let lastStarted = rawLastStarted >>= parseUnixTime
      lockedAt = rawLockedAt >>= parseUnixTime
  when (shouldRun timeout period lastStarted lockedAt now) $ do
    gotLock <- collapseNumberBoolFalse <$>
      R.runRedis rconn
        (R.eval script
           [lockKey, lastKey]
           -- "0" is the sentinel the script expects for a key that was
           -- absent when we read it.
           [ maybe "0" id rawLockedAt
           , maybe "0" id rawLastStarted
           , renderUnixTime now
           ])
    when gotLock $ do
      done <- newEmptyMVar
      _ <- forkIO (catch (task >> putMVar done Nothing)
                         (\(e :: SomeException) -> putMVar done (Just e)))
      res <- takeMVar done
      -- Release the lock before logging so that a failing logger cannot
      -- leave the task locked until the timeout expires.
      _ <- R.runRedis rconn (R.del [lockKey])
      case res of
        Just e -> logger $ T.concat ["periodic["
                                    ,pname
                                    ,"::"
                                    ,name
                                    ,"] error: "
                                    ,T.pack (show e)]
        Nothing -> return ()
  where
    lastKey = lastStartedKey pname name
    lockKey = lockedKey pname name
    -- KEYS[1] = lock key, KEYS[2] = last-started key.
    -- ARGV[1] / ARGV[2] = values previously read for those keys ('0' if
    -- absent), ARGV[3] = new timestamp. A missing key is seen by the
    -- script as Lua false.
    script =
      "local lock = redis.call('get', KEYS[1])\n\
      \local last = redis.call('get', KEYS[2])\n\
      \if ((lock == false and ARGV[1] == '0') or (lock == ARGV[1]))\
      \ and ((last == false and ARGV[2] == '0') or (last == ARGV[2])) then\n\
      \ redis.call('set', KEYS[1], ARGV[3])\n\
      \ redis.call('set', KEYS[2], ARGV[3])\n\
      \ return 1\n\
      \else\n\
      \ return 0\n\
      \end"