odd-jobs-0.1.0: A full-featured PostgreSQL-backed job queue (with an admin UI)

Safe Haskell: None
Language: Haskell2010

OddJobs.Job

Starting the job-runner

startJobRunner :: Config -> IO ()

Starts the job-runner in the current thread. If you want the job-runner to run in the background, you need to wrap the call in forkIO or async yourself. Consider using Cli to rapidly build your own standalone daemon.
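As a rough sketch of the background-thread pattern, the helper below forks a blocking action with forkIO and stops it once the foreground work finishes. It uses only base. The name runInBackground is hypothetical, and the runner argument stands in for something like startJobRunner cfg. Note that killThread stops the thread abruptly, so this sketch makes no claim about preserving the job-runner's graceful shutdown.

```haskell
import Control.Concurrent (forkIO, killThread, threadDelay)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Exception (finally, mask)
import Control.Monad (forever)

-- | Hypothetical helper: run a blocking action (e.g. @startJobRunner cfg@)
-- on its own thread while the foreground action runs, then stop it.
runInBackground :: IO () -> IO a -> IO a
runInBackground runner foreground =
  mask $ \restore -> do
    -- Fork with async exceptions masked so the thread id cannot be lost,
    -- and restore the original masking state inside both actions.
    tid <- forkIO (restore runner)
    restore foreground `finally` killThread tid

main :: IO ()
main = do
  started <- newEmptyMVar
  -- Stand-in for the job-runner: signal start-up, then block forever.
  let fakeRunner = putMVar started () >> forever (threadDelay 1000000)
  result <- runInBackground fakeRunner $ do
    takeMVar started -- wait until the background thread is running
    pure "foreground finished"
  putStrLn result
```

If the async package is available, its withAsync function offers the same shape with better exception propagation.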

Configuring the job-runner

data Config

While odd-jobs is highly configurable and the Config data type might seem daunting at first, you do not need to tweak every configuration parameter by hand. Start with the sensible defaults provided by the configuration helpers, and then tweak individual parameters on a case-by-case basis.

Constructors

Config 

Fields

  • cfgTableName :: TableName

    The DB table which holds your jobs. This table should have been created by the createJobTable function.

  • cfgJobRunner :: Job -> IO ()

    The actual "job-runner" that you need to provide. Please look at the examples/tutorials if your application's code is not in the IO monad.

  • cfgDefaultMaxAttempts :: Int

    The number of times a failing job is retried before it is considered "permanently failed" and ignored by the job-runner. This config parameter is called "default max attempts" because, in the future, it should be possible to specify the number of retry attempts on a per-job basis. (Note: per-job retry attempts have not been implemented yet.)

  • cfgConcurrencyControl :: ConcurrencyControl

    Controls how many jobs can be run concurrently by this instance of the job-runner. Please note, this is NOT the global concurrency of the entire job-queue. It is possible to have job-runners running on multiple machines, and each will apply its concurrency control independently of the other job-runners. TODO: Link-off to relevant section in the tutorial.

  • cfgDbPool :: Pool Connection

    The DB connection-pool to use for the job-runner. Note: if your jobs require a DB connection, please create a separate connection-pool for them. This pool will be used ONLY for monitoring jobs and changing their status. There must be at least 4 connections in this connection-pool for the job-runner to work as expected. (TODO: Link-off to tutorial)

  • cfgPollingInterval :: Seconds

    How frequently the jobPoller checks for jobs whose jobRunAt field indicates that it is time for the job to be executed. TODO: link-off to the tutorial.

  • cfgOnJobSuccess :: Job -> IO ()

    User-defined callback function that is called whenever a job succeeds.

  • cfgOnJobFailed :: Job -> IO ()

    User-defined callback function that is called whenever a job fails. This does not indicate permanent failure and means the job will be retried. It is a good idea to log the failures to Airbrake, NewRelic, Sentry, or some other error monitoring tool.

  • cfgOnJobPermanentlyFailed :: Job -> IO ()

    User-defined callback function that is called whenever a job fails permanently (i.e. the number of retry attempts has crossed the configured threshold). It is a good idea to log the failures to Airbrake, NewRelic, Sentry, or some other error monitoring tool.

  • cfgOnJobStart :: Job -> IO ()

    User-defined callback function that is called whenever a job starts execution.

  • cfgOnJobTimeout :: Job -> IO ()

    User-defined callback function that is called whenever a job times-out.

  • cfgPidFile :: Maybe FilePath

    File to store the PID of the job-runner process. This is used only when invoking the job-runner as an independent background daemon (the usual mode of deployment). (TODO: Link-off to tutorial).

  • cfgLogger :: LogLevel -> LogEvent -> IO ()

    A "structured logging" function that you need to provide. The odd-jobs library deliberately does NOT use the standard logging interface provided by monad-logger. TODO: link-off to tutorial. Please also read cfgJobToText and cfgJobType.

  • cfgJobToText :: Job -> Text

    When emitting certain text messages in logs, how should the Job be summarized in a textual format? Related: defaultJobToText

  • cfgJobType :: Job -> Text

    How to extract the "job type" from a Job. Related: defaultJobType

data ConcurrencyControl

Constructors

MaxConcurrentJobs Int

The maximum number of concurrent jobs that this instance of the job-runner can execute. TODO: Link-off to tutorial.

UnlimitedConcurrentJobs

Not recommended: Please do not use this in production unless you know what you're doing. No machine can support unlimited concurrency. If your jobs are doing anything worthwhile, running a sufficiently large number of them concurrently is going to max out some resource of the underlying machine, such as CPU, memory, disk IOPS, or network bandwidth.

DynamicConcurrency (IO Bool)

Use this to dynamically determine whether the next job should be picked up or not. This is useful for writing custom logic that checks whether a limited resource is below a certain usage threshold (e.g. CPU usage is below 80%). Caveat: This feature has not been tested in production yet. TODO: Link-off to tutorial.
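To illustrate the kind of IO Bool action that DynamicConcurrency expects, here is a minimal sketch that uses the Linux 1-minute load average as a stand-in for "CPU usage". This is a different measure from the CPU percentage mentioned above and is chosen only because it is easy to read. The names parseLoad1 and loadBelow are hypothetical, and /proc/loadavg is a Linux-specific assumption.

```haskell
import Text.Read (readMaybe)

-- | Parse the 1-minute load average from the contents of /proc/loadavg,
-- which look like "0.52 0.48 0.45 1/234 5678".
parseLoad1 :: String -> Maybe Double
parseLoad1 contents = case words contents of
  (first : _) -> readMaybe first
  []          -> Nothing

-- | Hypothetical check: True when the 1-minute load average is below the
-- given limit. If the file cannot be parsed, conservatively answer False.
loadBelow :: Double -> IO Bool
loadBelow limit = do
  contents <- readFile "/proc/loadavg" -- Linux-specific assumption
  pure (maybe False (< limit) (parseLoad1 contents))

main :: IO ()
main = do
  -- Only the pure parser is exercised here, so the example runs anywhere.
  print (parseLoad1 "0.52 0.48 0.45 1/234 5678") -- Just 0.52
  print (parseLoad1 "")                          -- Nothing
```

An action such as loadBelow 4.0 has the type IO Bool, so it could be passed to the DynamicConcurrency constructor.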

Configuration helpers

defaultConfig

Arguments

:: (LogLevel -> LogEvent -> IO ())

"Structured logging" function. Ref: cfgLogger

-> TableName

DB table which holds your jobs. Ref: cfgTableName

-> Pool Connection

DB connection-pool to be used by job-runner. Ref: cfgDbPool

-> ConcurrencyControl

Concurrency configuration. Ref: cfgConcurrencyControl

-> (Job -> IO ())

The actual "job runner" which contains your application code. Ref: cfgJobRunner

-> Config 

This function gives you a Config with a bunch of sensible defaults already applied. It requires only the bare minimum arguments that this library cannot assume on your behalf.

It makes a few important assumptions about your jobPayload JSON, which are documented in defaultJobType.

defaultJobToText :: (Job -> Text) -> Job -> Text

Used only by defaultLogStr now. TODO: Is this even required anymore? Should this be removed?

defaultJobType :: Job -> Text

This makes two important assumptions. First, this assumes that jobs in your app are represented by a sum-type. For example:

data MyJob = SendWelcomeEmail Int
           | SendPasswordResetEmail Text
           | SetupSampleData Int

Second, it assumes that the JSON representation of this sum-type is "tagged". For example, the following...

let pload = SendWelcomeEmail 10

...when converted to JSON, would look like...

{"tag":"SendWelcomeEmail", "contents":10}

It uses this assumption to extract the "job type" from a Value (which would be SendWelcomeEmail in the example given above). This is used in logging and the admin UI.

Even if this assumption is violated, the job-runner should continue to function. It's just that you won't get very useful log messages.
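The tagged-JSON assumption can be checked with a small sketch that uses aeson's default generic encoding, which produces a tagged object with "tag" and "contents" keys for sum types like this one. The helper tagOf is hypothetical and is not the library's implementation of defaultJobType. It only shows how a "job type" could be read from such a payload, and what happens when the payload is not tagged.

```haskell
{-# LANGUAGE DeriveGeneric     #-}
{-# LANGUAGE OverloadedStrings #-}

import Data.Aeson (ToJSON, Value, toJSON, withObject, (.:))
import Data.Aeson.Types (parseMaybe)
import Data.Text (Text)
import GHC.Generics (Generic)

data MyJob
  = SendWelcomeEmail Int
  | SendPasswordResetEmail Text
  | SetupSampleData Int
  deriving (Show, Generic)

-- The default generic instance encodes each constructor as an object
-- with a "tag" key (constructor name) and a "contents" key (its argument).
instance ToJSON MyJob

-- | Hypothetical helper: read the "tag" key from a JSON payload, if present.
tagOf :: Value -> Maybe Text
tagOf = parseMaybe (withObject "job payload" (.: "tag"))

main :: IO ()
main = do
  print (tagOf (toJSON (SendWelcomeEmail 10))) -- Just "SendWelcomeEmail"
  print (tagOf (toJSON (42 :: Int)))           -- Nothing (payload is not tagged)
```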

defaultTimedLogger :: TimedFastLogger -> (LogLevel -> LogEvent -> LogStr) -> LogLevel -> LogEvent -> IO ()

TODO: Should the library be doing this?

defaultLockTimeout :: Seconds

TODO: Make this configurable for the job-runner, why is this still hard-coded?

withConnectionPool :: MonadUnliftIO m => Either ByteString ConnectInfo -> (Pool Connection -> m a) -> m a

Convenience function to create a DB connection-pool with some sensible defaults. Please see the source-code of this function to understand what it's doing. TODO: link-off to tutorial.

Creating/scheduling jobs

Ideally, you would create wrappers for createJob and scheduleJob in your application so that, instead of being in IO, they are in your application's monad m (thus saving you from a liftIO every time you want to enqueue a job).
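A minimal sketch of such a wrapper, assuming an application built on ReaderT, might look like the following. Everything here is hypothetical: AppEnv, App, and enqueue are illustrative names, and envCreateJob is a stand-in for something like createJob partially applied to a connection and a table name. The String payload and Int result are simplifications so that the example depends only on base and transformers.

```haskell
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Reader (ReaderT, asks, runReaderT)

-- | Hypothetical application environment. In a real application,
-- 'envCreateJob' could wrap @createJob conn tableName@; here it is a
-- stand-in that takes a String payload and returns a fake job id.
newtype AppEnv = AppEnv { envCreateJob :: String -> IO Int }

-- | Hypothetical application monad.
type App = ReaderT AppEnv IO

-- | Enqueue a job from inside the application monad, hiding the liftIO
-- from every call site.
enqueue :: String -> App Int
enqueue payload = do
  create <- asks envCreateJob
  liftIO (create payload)

main :: IO ()
main = do
  -- Fake enqueue function: the "job id" is just the payload length.
  let env = AppEnv (\payload -> pure (length payload))
  jobId <- runReaderT (enqueue "send-welcome-email") env
  print jobId
```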

createJob :: ToJSON p => Connection -> TableName -> p -> IO Job

Create a job for immediate execution.

Internally calls scheduleJob passing it the current time. Read scheduleJob for further documentation.

scheduleJob

Arguments

:: ToJSON p 
=> Connection

DB connection to use. Note: This should ideally come out of your application's DB pool, not the cfgDbPool you used in the job-runner.

-> TableName

DB-table which holds your jobs

-> p

Job payload

-> UTCTime

When the job should be executed

-> IO Job 

Create a job for execution at the given time.

  • If the time has already passed, jobEventListener is going to pick this up for execution immediately.
  • If the time is in the future, jobPoller is going to pick this up with an error of +/- cfgPollingInterval seconds. Please do not expect very high accuracy of when the job is actually executed.

Job and associated data-types

data Job

Instances

  • Eq Job (defined in OddJobs.Job)
    (==) :: Job -> Job -> Bool
    (/=) :: Job -> Job -> Bool
  • Show Job (defined in OddJobs.Job)
    showsPrec :: Int -> Job -> ShowS
    show :: Job -> String
    showList :: [Job] -> ShowS
  • FromRow Job (defined in OddJobs.Job)

type JobId = Int

data Status

Constructors

Success
Queued
Failed
Retry
Locked

Instances

  • Enum Status (defined in OddJobs.Job)
  • Eq Status (defined in OddJobs.Job)
    (==) :: Status -> Status -> Bool
    (/=) :: Status -> Status -> Bool
  • Ord Status (defined in OddJobs.Job)
  • Show Status (defined in OddJobs.Job)
  • Generic Status (defined in OddJobs.Job)
    type Rep Status :: Type -> Type
    from :: Status -> Rep Status x
    to :: Rep Status x -> Status
  • ToJSON Status (defined in OddJobs.Web)
  • FromJSON Status (defined in OddJobs.Web)
  • FromField Status (defined in OddJobs.Job)
  • ToField Status (defined in OddJobs.Job)
    toField :: Status -> Action
  • ToText Status (defined in OddJobs.Job)
    toText :: Status -> Text
  • StringConv Text a => FromText (Either a Status) (defined in OddJobs.Job)
    fromText :: Text -> Either a Status
  • type Rep Status (defined in OddJobs.Job)

type Rep Status = D1 (MetaData "Status" "OddJobs.Job" "odd-jobs-0.1.0-AG1ucQCmc3LHSWSLszrvJU" False) ((C1 (MetaCons "Success" PrefixI False) (U1 :: Type -> Type) :+: C1 (MetaCons "Queued" PrefixI False) (U1 :: Type -> Type)) :+: (C1 (MetaCons "Failed" PrefixI False) (U1 :: Type -> Type) :+: (C1 (MetaCons "Retry" PrefixI False) (U1 :: Type -> Type) :+: C1 (MetaCons "Locked" PrefixI False) (U1 :: Type -> Type))))

type TableName = Query

An alias for the Query type. Since this type has an instance of IsString, you do not need to do anything special to create a value of this type. Just ensure you have the OverloadedStrings extension enabled. For example:

{-# LANGUAGE OverloadedStrings #-}

myJobsTable :: TableName
myJobsTable = "my_jobs"

delaySeconds :: MonadIO m => Seconds -> m ()

Convenience wrapper on top of threadDelay which takes Seconds as an argument, instead of microseconds.
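For readers unfamiliar with threadDelay's microsecond argument, the following sketch shows how a seconds-based wrapper of this kind can be written. It is not the library's source: Secs, toMicros, and sleepSecs are hypothetical local names used so that the example depends only on base.

```haskell
import Control.Concurrent (threadDelay)
import Control.Monad.IO.Class (MonadIO, liftIO)

-- | Local stand-in for a seconds newtype (not the library's definition).
newtype Secs = Secs Int

-- | threadDelay expects microseconds, so scale seconds by one million.
toMicros :: Secs -> Int
toMicros (Secs s) = s * 1000000

-- | Sleep for the given number of seconds in any MonadIO.
sleepSecs :: MonadIO m => Secs -> m ()
sleepSecs = liftIO . threadDelay . toMicros

main :: IO ()
main = do
  print (toMicros (Secs 2)) -- 2000000
  sleepSecs (Secs 1)        -- pauses for roughly one second
```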

newtype Seconds

Constructors

Seconds 

Fields

Instances

  • Eq Seconds (defined in OddJobs.Types)
    (==) :: Seconds -> Seconds -> Bool
    (/=) :: Seconds -> Seconds -> Bool
  • Num Seconds (defined in OddJobs.Types)
  • Ord Seconds (defined in OddJobs.Types)
  • Read Seconds (defined in OddJobs.Types)
  • Show Seconds (defined in OddJobs.Types)

Structured logging

TODO: Complete the prose here

data LogEvent

Constructors

LogJobStart !Job

Emitted when a job starts execution

LogJobSuccess !Job !NominalDiffTime

Emitted when a job succeeds along with the time taken for execution.

LogJobFailed !Job !NominalDiffTime

Emitted when a job fails (but will be retried) along with the time taken for this attempt

LogJobPermanentlyFailed !Job !NominalDiffTime

Emitted when a job fails permanently (and will no longer be retried) along with the time taken for this attempt (i.e. final attempt)

LogJobTimeout !Job

Emitted when a job times out and is picked-up again for execution

LogPoll

Emitted whenever jobPoller polls the DB table

LogText !Text

Emitted whenever any other event occurs

Instances

  • Eq LogEvent (defined in OddJobs.Job)
  • Show LogEvent (defined in OddJobs.Job)
  • Generic LogEvent (defined in OddJobs.Job)
    type Rep LogEvent :: Type -> Type
    from :: LogEvent -> Rep LogEvent x
    to :: Rep LogEvent x -> LogEvent
  • type Rep LogEvent (defined in OddJobs.Job)

Job-runner internals

jobMonitor :: forall m. HasJobRunner m => m ()

Spawns jobPoller and jobEventListener in separate threads and restarts them in the off-chance they happen to crash. Also responsible for implementing graceful shutdown, i.e. waiting for all jobs already being executed to finish execution before exiting the main thread.

jobEventListener :: HasJobRunner m => m ()

Uses PostgreSQL's LISTEN/NOTIFY to be immediately notified of newly created jobs.

jobPoller :: HasJobRunner m => m ()

Executes jobPollingSql every cfgPollingInterval seconds to pick up jobs for execution. Uses UPDATE along with SELECT...FOR UPDATE to efficiently find a job that matches all of the following conditions:

  • jobRunAt should be in the past
  • one of the following conditions match:

type JobRunner = Job -> IO ()

class (MonadUnliftIO m, MonadBaseControl IO m) => HasJobRunner m where

The documentation of odd-jobs currently promotes startJobRunner, which expects a fairly detailed Config record, as the top-level function for starting a job-runner. However, internally, this Config record is used as an environment for a ReaderT, and almost all functions are written in this ReaderT monad, which implements an instance of the HasJobRunner type-class.

In future, this internal implementation detail will allow us to offer a type-class based interface as well (similar to what YesodJobQueue provides).

Database helpers

JSON helpers