| Safe Haskell | None |
|---|---|
| Language | Haskell2010 |
OddJobs.Job
Synopsis
- startJobRunner :: Config -> IO ()
- data Config = Config {
- cfgTableName :: TableName
- cfgJobRunner :: Job -> IO ()
- cfgDefaultMaxAttempts :: Int
- cfgConcurrencyControl :: ConcurrencyControl
- cfgDbPool :: Pool Connection
- cfgPollingInterval :: Seconds
- cfgOnJobSuccess :: Job -> IO ()
- cfgOnJobFailed :: forall a. [JobErrHandler a]
- cfgOnJobStart :: Job -> IO ()
- cfgOnJobTimeout :: Job -> IO ()
- cfgPidFile :: Maybe FilePath
- cfgLogger :: LogLevel -> LogEvent -> IO ()
- cfgJobType :: Job -> Text
- cfgJobTypeSql :: Query
- cfgDefaultJobTimeout :: Seconds
- cfgJobToHtml :: [Job] -> IO [Html ()]
- cfgAllJobTypes :: AllJobTypes
- data ConcurrencyControl
- createJob :: ToJSON p => Connection -> TableName -> p -> IO Job
- scheduleJob :: ToJSON p => Connection -> TableName -> p -> UTCTime -> IO Job
- data Job = Job {}
- type JobId = Int
- data Status
- newtype JobRunnerName = JobRunnerName {}
- type TableName = Query
- delaySeconds :: MonadIO m => Seconds -> m ()
- newtype Seconds = Seconds {}
- data JobErrHandler a = Exception e => JobErrHandler (e -> Job -> FailureMode -> IO a)
- data AllJobTypes
- data LogEvent
- data LogLevel
- jobMonitor :: forall m. HasJobRunner m => m ()
- jobEventListener :: HasJobRunner m => m ()
- jobPoller :: HasJobRunner m => m ()
- jobPollingSql :: TableName -> Query
- type JobRunner = Job -> IO ()
- class (MonadUnliftIO m, MonadBaseControl IO m) => HasJobRunner m where
- getPollingInterval :: m Seconds
- onJobSuccess :: Job -> m ()
- onJobFailed :: forall a. m [JobErrHandler a]
- getJobRunner :: m (Job -> IO ())
- getDbPool :: m (Pool Connection)
- getTableName :: m TableName
- onJobStart :: Job -> m ()
- getDefaultMaxAttempts :: m Int
- getRunnerEnv :: m RunnerEnv
- getConcurrencyControl :: m ConcurrencyControl
- getPidFile :: m (Maybe FilePath)
- log :: LogLevel -> LogEvent -> m ()
- getDefaultJobTimeout :: m Seconds
- findJobByIdIO :: Connection -> TableName -> JobId -> IO (Maybe Job)
- saveJobIO :: Connection -> TableName -> Job -> IO Job
- runJobNowIO :: Connection -> TableName -> JobId -> IO (Maybe Job)
- unlockJobIO :: Connection -> TableName -> JobId -> IO (Maybe Job)
- cancelJobIO :: Connection -> TableName -> JobId -> IO (Maybe Job)
- jobDbColumns :: (IsString s, Semigroup s) => [s]
- concatJobDbColumns :: (IsString s, Semigroup s) => s
- fetchAllJobTypes :: MonadIO m => Config -> m [Text]
- fetchAllJobRunners :: MonadIO m => Config -> m [JobRunnerName]
- eitherParsePayload :: FromJSON a => Job -> Either String a
- throwParsePayload :: FromJSON a => Job -> IO a
- eitherParsePayloadWith :: (Value -> Parser a) -> Job -> Either String a
- throwParsePayloadWith :: (Value -> Parser a) -> Job -> IO a
Starting the job-runner
startJobRunner :: Config -> IO ()
Configuring the job-runner
While odd-jobs is highly configurable and the Config data-type might seem
daunting at first, it is not necessary to tweak every single configuration
parameter by hand.
Recommendation: Start by building a Config with the mkConfig function
(which gives you sensible defaults), then tweak individual config
parameters on a case-by-case basis.
Constructors
| Config | |
Fields
| |
data ConcurrencyControl
Note: Please read the section on controlling concurrency in the implementation guide to understand the implications of each option given by the data-type.
Constructors
| MaxConcurrentJobs Int | The maximum number of concurrent jobs that this instance of the job-runner can execute. |
| UnlimitedConcurrentJobs | Not recommended: Please do not use this in production unless you know what you're doing. No machine can support unlimited concurrency. If your jobs are doing anything worthwhile, running a sufficiently large number of them concurrently is going to max out some resource of the underlying machine, such as CPU, memory, disk IOPS, or network bandwidth. |
| DynamicConcurrency (IO Bool) | Use this to dynamically determine if the next job should be picked-up, or not. This is useful to write custom-logic to determine whether a limited resource is below a certain usage threshold (eg. CPU usage is below 80%). Caveat: This feature has not been tested in production, yet. |
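As an illustration of the kind of IO Bool action that DynamicConcurrency expects, here is a minimal sketch that uses only base. The InFlight gauge, newInFlight, adjustInFlight and belowLimit are hypothetical names invented for this example; how the gauge is updated is left to the application, and the wiring into cfgConcurrencyControl appears only as a comment because it needs odd-jobs itself.

```haskell
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)

-- Hypothetical gauge tracking how much of a limited resource is in use
-- (for example, the number of expensive jobs currently in flight).
newtype InFlight = InFlight (IORef Int)

-- Create a gauge that starts at zero.
newInFlight :: IO InFlight
newInFlight = InFlight <$> newIORef 0

-- Add a (possibly negative) delta to the gauge.
adjustInFlight :: Int -> InFlight -> IO ()
adjustInFlight delta (InFlight ref) = modifyIORef' ref (+ delta)

-- True while usage is strictly below the limit, i.e. while it is
-- acceptable to pick up another job.
belowLimit :: Int -> InFlight -> IO Bool
belowLimit limit (InFlight ref) = (< limit) <$> readIORef ref

-- Assumed wiring (not compiled here, requires odd-jobs):
--   cfgConcurrencyControl = DynamicConcurrency (belowLimit 4 gauge)
```

Keeping the check as a plain IO Bool means the same action can be exercised in isolation before it is handed to the job-runner.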
Instances
| Show ConcurrencyControl | |
Defined in OddJobs.Types. Methods: showsPrec :: Int -> ConcurrencyControl -> ShowS; show :: ConcurrencyControl -> String; showList :: [ConcurrencyControl] -> ShowS | |
Creating/scheduling jobs
Ideally you'd want to create wrappers for createJob and scheduleJob in
your application so that instead of being in IO they can be in your
application's monad m instead (thus saving you from a liftIO every time
you want to enqueue a job).
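One possible shape for such a wrapper is sketched below, using only base. AppEnv, envConn, envJobsTable and enqueueWith are hypothetical names; the type variables conn, tbl and job stand in for Connection, TableName and Job so that the sketch compiles without odd-jobs or a database. The wiring to createJob and scheduleJob is shown only in comments and is an assumption about how you might use it.

```haskell
import Control.Monad.IO.Class (MonadIO, liftIO)

-- Hypothetical application environment holding whatever an enqueue call
-- needs: a DB connection and the name of the jobs table.
data AppEnv conn tbl = AppEnv
  { envConn      :: conn
  , envJobsTable :: tbl
  }

-- Lift an IO enqueue function of the same shape as createJob
-- (connection, then table, then payload) into any MonadIO monad, so that
-- call sites no longer need an explicit liftIO.
enqueueWith
  :: MonadIO m
  => (conn -> tbl -> p -> IO job)
  -> AppEnv conn tbl
  -> p
  -> m job
enqueueWith enqueue env payload =
  liftIO (enqueue (envConn env) (envJobsTable env) payload)

-- Assumed wiring (not compiled here, requires odd-jobs):
--   createJobM env payload         = enqueueWith createJob env payload
--   scheduleJobM env payload runAt =
--     enqueueWith (\c t p -> scheduleJob c t p runAt) env payload
```

In an application built on a reader-style monad, the environment argument would typically come from ask rather than being passed explicitly.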
createJob :: ToJSON p => Connection -> TableName -> p -> IO Job
Create a job for immediate execution.
Internally calls scheduleJob passing it the current time. Read
scheduleJob for further documentation.
scheduleJob
Arguments
| :: ToJSON p | |
| => Connection | DB connection to use. Note: This should
ideally come out of your application's DB pool,
not the |
| -> TableName | DB-table which holds your jobs |
| -> p | Job payload |
| -> UTCTime | when should the job be executed |
| -> IO Job |
Create a job for execution at the given time.
- If time has already passed, jobEventListener is going to pick this up for execution immediately.
- If time is in the future, jobPoller is going to pick this up with an error of +/- cfgPollingInterval seconds. Please do not expect very high accuracy of when the job is actually executed.
Job and associated data-types
Constructors
| Job | |
Fields
| |
Instances
Constructors
| Success | In the current version of odd-jobs you should not find any jobs having
the |
| Queued | Jobs in |
| Failed | Jobs in |
| Retry | Jobs in |
| Locked | Jobs in |
Instances
| Bounded Status | |
| Enum Status | |
Defined in OddJobs.Types | |
| Eq Status | |
| Ord Status | |
| Show Status | |
| Generic Status | |
| ToJSON Status | |
Defined in OddJobs.Types | |
| FromJSON Status | |
| FromField Status | |
Defined in OddJobs.Types | |
| ToField Status | |
Defined in OddJobs.Types | |
| ToText Status | |
Defined in OddJobs.Types | |
| StringConv Text a => FromText (Either a Status) | |
| type Rep Status | |
Defined in OddJobs.Types type Rep Status = D1 (MetaData "Status" "OddJobs.Types" "odd-jobs-0.2.2-IQT5Y8dLVtd1UwbOhFLeU6" False) ((C1 (MetaCons "Success" PrefixI False) (U1 :: Type -> Type) :+: C1 (MetaCons "Queued" PrefixI False) (U1 :: Type -> Type)) :+: (C1 (MetaCons "Failed" PrefixI False) (U1 :: Type -> Type) :+: (C1 (MetaCons "Retry" PrefixI False) (U1 :: Type -> Type) :+: C1 (MetaCons "Locked" PrefixI False) (U1 :: Type -> Type)))) | |
newtype JobRunnerName
Constructors
| JobRunnerName | |
Fields | |
Instances
type TableName = Query
An alias for the Query type. Since this type has an instance of IsString
you do not need to do anything special to create a value of this type. Just
ensure you have the OverloadedStrings extension enabled. For example:
{-# LANGUAGE OverloadedStrings #-}
myJobsTable :: TableName
myJobsTable = "my_jobs"
delaySeconds :: MonadIO m => Seconds -> m ()
Convenience wrapper on top of threadDelay which takes Seconds as an
argument, instead of microseconds.
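To make the unit conversion concrete, here is a minimal sketch of what such a wrapper can look like, using only base. It is not the library's implementation: Secs, secsToMicros and delaySecs are hypothetical stand-ins, introduced because the fields of the real Seconds newtype are not shown on this page.

```haskell
import Control.Concurrent (threadDelay)
import Control.Monad.IO.Class (MonadIO, liftIO)

-- Hypothetical stand-in for the library's Seconds newtype.
newtype Secs = Secs Int

-- threadDelay expects microseconds, so scale by one million.
secsToMicros :: Secs -> Int
secsToMicros (Secs s) = s * 1000000

-- Suspend the current thread for (at least) the given number of seconds,
-- in any monad that can run IO.
delaySecs :: MonadIO m => Secs -> m ()
delaySecs = liftIO . threadDelay . secsToMicros
```

Wrapping the number in a newtype keeps call sites from accidentally passing a value in the wrong unit to threadDelay.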
data JobErrHandler a
Exception handler for jobs. This is conceptually very similar to how
Handler and catches (from Control.Exception) work in tandem. Using
cfgOnJobFailed you can install
multiple exception handlers, where each handler is responsible for one type
of exception. OddJobs will execute the correct exception handler on the basis
of the type of runtime exception raised. For example:
cfgOnJobFailed =
  [ JobErrHandler $ \(e :: HttpException) job failMode -> ...
  , JobErrHandler $ \(e :: SqlException) job failMode -> ...
  , JobErrHandler $ \(e :: ) job failMode -> ...
  ]
Note: Please refer to the section on alerts and
notifications
in the implementation guide to understand how to use the machinery provided
by JobErrHandler and cfgOnJobFailed.
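The analogy with Handler and catches can be seen in a small example that uses only Control.Exception from base. It shows type-directed dispatch between several handlers, which is the mechanism the description above compares JobErrHandler to; it does not use odd-jobs, and classify and example are hypothetical names chosen for this sketch.

```haskell
{-# LANGUAGE ScopedTypeVariables #-}

import Control.Exception
  ( ArithException
  , ErrorCall
  , Handler (..)
  , SomeException
  , catches
  , evaluate
  )

-- Run an action and report which handler (if any) dealt with its failure.
-- Handlers are tried in order; the first one whose exception type matches
-- the raised exception wins.
classify :: IO a -> IO String
classify action =
  (action >> pure "ok")
    `catches`
      [ Handler $ \(e :: ArithException) -> pure ("arith: " <> show e)
      , Handler $ \(e :: ErrorCall)      -> pure ("error call: " <> show e)
      , Handler $ \(_ :: SomeException)  -> pure "something else"
      ]

-- Division by zero raises an ArithException, so the first handler fires
-- and the result is "arith: divide by zero".
example :: IO String
example = classify (evaluate (1 `div` 0 :: Int))
```

Because SomeException matches every exception, a catch-all handler of that type only makes sense as the last entry in the list.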
Constructors
| Exception e => JobErrHandler (e -> Job -> FailureMode -> IO a) |
data AllJobTypes
The web/admin UI needs to know a "master list" of all job-types to be
able to power the "filter by job-type" feature. This data-type helps in
letting odd-jobs know how to get such a master-list. The function specified
by this type is run once when the job-runner starts (and stored in an
internal IORef). After that the list of job-types needs to be updated
manually by pressing the appropriate "refresh" link in the admin/web UI.
Constructors
| AJTFixed [Text] | A fixed-list of job-types. If you don't want to increase boilerplate,
consider using |
| AJTSql (Connection -> IO [Text]) | Construct the list of job-types dynamically by looking at the actual
payloads in |
| AJTCustom (IO [Text]) | A custom |
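The "fetch once at start-up, then refresh only on demand" behaviour described above can be sketched with an IORef, using only base. This is an illustration of the caching pattern, not the library's internals: Cached, newCached, readCached and refreshCached are hypothetical names, and the type variable a stands in for a job-type name.

```haskell
import Data.IORef (IORef, newIORef, readIORef, writeIORef)

-- Hypothetical cache: the action that produces the master list, plus the
-- most recently fetched result.
data Cached a = Cached
  { cachedFetch :: IO [a]
  , cachedValue :: IORef [a]
  }

-- Run the fetch action once and remember its result.
newCached :: IO [a] -> IO (Cached a)
newCached fetch = do
  initial <- fetch
  ref <- newIORef initial
  pure (Cached fetch ref)

-- Read the remembered list without re-running the fetch action.
readCached :: Cached a -> IO [a]
readCached = readIORef . cachedValue

-- Re-run the fetch action and replace the remembered list. This plays the
-- role of the "refresh" link mentioned above.
refreshCached :: Cached a -> IO ()
refreshCached c = cachedFetch c >>= writeIORef (cachedValue c)
```

The trade-off is the usual one for caches: reads are cheap, but a newly introduced job-type stays invisible until a refresh happens.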
Structured logging
OddJobs uses "structured logging" for important events that occur during the life-cycle of a job-runner. This is useful if you're using JSON/XML for aggregating logs of your entire application to something like Kibana, AWS CloudWatch, GCP StackDriver Logging, etc.
If you're not interested in using structured logging, look at
defaultLogStr to output plain-text logs (or you can
write your own function, as well).
Constructors
| LogJobStart !Job | Emitted when a job starts execution |
| LogJobSuccess !Job !NominalDiffTime | Emitted when a job succeeds along with the time taken for execution. |
| LogJobFailed !Job !SomeException !FailureMode !NominalDiffTime | Emitted when a job fails (but will be retried) along with the time taken for this attempt |
| LogJobTimeout !Job | Emitted when a job times out and is picked-up again for execution |
| LogPoll | Emitted whenever |
| LogWebUIRequest | TODO |
| LogText !Text | Emitted whenever any other event occurs |
Instances
Constructors
| LevelDebug | |
| LevelInfo | |
| LevelWarn | |
| LevelError | |
| LevelOther Text |
Job-runner internals
jobMonitor :: forall m. HasJobRunner m => m ()
Spawns jobPoller and jobEventListener in separate threads and restarts
them on the off chance that they crash. Also responsible for
implementing graceful shutdown, i.e. waiting for all jobs already being
executed to finish execution before exiting the main thread.
jobEventListener :: HasJobRunner m => m ()
Uses PostgreSQL's LISTEN/NOTIFY to be immediately notified of newly created jobs.
jobPoller :: HasJobRunner m => m ()
Executes jobPollingSql every cfgPollingInterval seconds to pick up jobs
for execution. Uses UPDATE along with SELECT...FOR UPDATE to efficiently
find a job that matches all of the following conditions:
- jobRunAt should be in the past
- one of the following conditions matches:
  - jobStatus should be Queued or Retry
  - jobStatus should be Locked and jobLockedAt should be defaultLockTimeout seconds in the past, thus indicating that the job was picked up for execution but didn't complete on time (possibly because the thread/process executing it crashed without being able to update the DB)
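The selection conditions above can be restated as a pure predicate, which may be easier to reason about than the SQL. The sketch below is an illustrative model only and is not how the library picks jobs (that is done in SQL by jobPollingSql): JobRow, its fields and eligible are hypothetical names, the Status type is a local stand-in mirroring the constructors documented earlier, times are plain Int seconds, and treating the boundary instants as eligible is an assumption.

```haskell
-- Local stand-in mirroring the Status constructors documented above.
data Status = Success | Queued | Failed | Retry | Locked
  deriving (Eq, Show)

-- Hypothetical, simplified view of a row in the jobs table. Times are
-- seconds since some epoch to keep the sketch free of dependencies.
data JobRow = JobRow
  { rowRunAt    :: Int
  , rowStatus   :: Status
  , rowLockedAt :: Maybe Int
  }

-- A job may be picked up when its run time has arrived AND it is either
-- waiting (Queued or Retry) or holds a lock that is older than the lock
-- timeout.
eligible :: Int -> Int -> JobRow -> Bool
eligible now lockTimeout job =
  rowRunAt job <= now && (waiting || staleLock)
  where
    waiting   = rowStatus job `elem` [Queued, Retry]
    staleLock =
      rowStatus job == Locked
        && maybe False (\lockedAt -> lockedAt + lockTimeout <= now) (rowLockedAt job)
```

For instance, with now = 1000 and a lock timeout of 600, a Locked job whose lock was taken at 300 counts as stale, while one locked at 900 does not.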
class (MonadUnliftIO m, MonadBaseControl IO m) => HasJobRunner m where
The documentation of odd-jobs currently promotes startJobRunner, which
expects a fairly detailed Config record, as a top-level function for
initiating a job-runner. However, internally, this Config record is used as
an environment for a ReaderT, and almost all functions are written in this
ReaderT monad which implements an instance of the HasJobRunner type-class.
In future, this internal implementation detail will allow us to offer a
type-class based interface as well (similar to what
YesodJobQueue provides).
Methods
getPollingInterval :: m Seconds
onJobSuccess :: Job -> m ()
onJobFailed :: forall a. m [JobErrHandler a]
getJobRunner :: m (Job -> IO ())
getDbPool :: m (Pool Connection)
getTableName :: m TableName
onJobStart :: Job -> m ()
getDefaultMaxAttempts :: m Int
getRunnerEnv :: m RunnerEnv
getConcurrencyControl :: m ConcurrencyControl
getPidFile :: m (Maybe FilePath)
Database helpers
A set of functions that help you query cfgTableName and change the status
of individual jobs. Most of these functions are in IO and you might want
to write wrappers that lift them into your application's custom monad.
Note: When passing a Connection to these functions, it is
recommended to not take a connection from cfgDbPool. Use your
application's database pool instead.
findJobByIdIO :: Connection -> TableName -> JobId -> IO (Maybe Job)
runJobNowIO :: Connection -> TableName -> JobId -> IO (Maybe Job)
unlockJobIO :: Connection -> TableName -> JobId -> IO (Maybe Job)
TODO: First check in all job-runners if this job is still running, or not, and somehow send an uninterruptibleCancel to that thread.
cancelJobIO :: Connection -> TableName -> JobId -> IO (Maybe Job)
jobDbColumns :: (IsString s, Semigroup s) => [s]
If you are writing SQL queries where you want to return ALL columns from
the jobs table it is recommended that you do not issue a SELECT * or
RETURNING *. List out specific DB columns using jobDbColumns and
concatJobDbColumns instead. This will insulate you from runtime errors
caused by addition of new columns to cfgTableName in future versions of
OddJobs.
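To illustrate the idea of listing columns explicitly, the following sketch builds a SELECT from a list of column names using only base. It does not use the library's jobDbColumns: joinColumns and selectAllSql are hypothetical helpers, and the column and table names in the usage note are made up for the example.

```haskell
{-# LANGUAGE OverloadedStrings #-}

import Data.List (intersperse)
import Data.String (IsString)

-- Join column names with commas. Works for any string-like type that is
-- both IsString and Semigroup (String here, but also Text or Query).
joinColumns :: (IsString s, Semigroup s) => [s] -> s
joinColumns []   = ""
joinColumns cols = foldr1 (<>) (intersperse ", " cols)

-- Build a SELECT that names every column instead of relying on "*".
-- Note the leading space in " FROM ": without it the last column name
-- would run into the keyword.
selectAllSql :: (IsString s, Semigroup s) => [s] -> s -> s
selectAllSql cols table = "SELECT " <> joinColumns cols <> " FROM " <> table
```

With the made-up columns id and status and the table jobs, selectAllSql produces the string SELECT id, status FROM jobs.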
concatJobDbColumns :: (IsString s, Semigroup s) => s
All jobDbColumns joined together with commas. Useful for constructing SQL
queries, eg:
query_ conn $ "SELECT " <> concatJobDbColumns <> " FROM jobs"
fetchAllJobTypes :: MonadIO m => Config -> m [Text]
Used by the web/admin UI to fetch a "master list" of all known
job-types. Ref: cfgAllJobTypes
fetchAllJobRunners :: MonadIO m => Config -> m [JobRunnerName]
Used by the web/admin UI to fetch a "master list" of all known job-runners. There is a known issue with the way this has been implemented:
- Since this looks at the jobLockedBy column of cfgTableName, it will discover only those job-runners that are actively executing at least one job at the time this function is executed.