Safe Haskell | None |
---|---|
Language | Haskell2010 |
Synopsis
- startJobRunner :: Config -> IO ()
- data Config = Config {
- cfgTableName :: TableName
- cfgJobRunner :: Job -> IO ()
- cfgDefaultMaxAttempts :: Int
- cfgConcurrencyControl :: ConcurrencyControl
- cfgDbPool :: Pool Connection
- cfgPollingInterval :: Seconds
- cfgOnJobSuccess :: Job -> IO ()
- cfgOnJobFailed :: forall a. [JobErrHandler a]
- cfgOnJobStart :: Job -> IO ()
- cfgOnJobTimeout :: Job -> IO ()
- cfgPidFile :: Maybe FilePath
- cfgLogger :: LogLevel -> LogEvent -> IO ()
- cfgJobType :: Job -> Text
- cfgJobTypeSql :: Query
- cfgDefaultJobTimeout :: Seconds
- cfgJobToHtml :: [Job] -> IO [Html ()]
- cfgAllJobTypes :: AllJobTypes
- data ConcurrencyControl
- createJob :: ToJSON p => Connection -> TableName -> p -> IO Job
- scheduleJob :: ToJSON p => Connection -> TableName -> p -> UTCTime -> IO Job
- data Job = Job {}
- type JobId = Int
- data Status
- newtype JobRunnerName = JobRunnerName {}
- type TableName = Query
- delaySeconds :: MonadIO m => Seconds -> m ()
- newtype Seconds = Seconds {}
- data JobErrHandler a = Exception e => JobErrHandler (e -> Job -> FailureMode -> IO a)
- data AllJobTypes
- data LogEvent
- data LogLevel
- jobMonitor :: forall m. HasJobRunner m => m ()
- jobEventListener :: HasJobRunner m => m ()
- jobPoller :: HasJobRunner m => m ()
- jobPollingSql :: TableName -> Query
- type JobRunner = Job -> IO ()
- class (MonadUnliftIO m, MonadBaseControl IO m) => HasJobRunner m where
- getPollingInterval :: m Seconds
- onJobSuccess :: Job -> m ()
- onJobFailed :: forall a. m [JobErrHandler a]
- getJobRunner :: m (Job -> IO ())
- getDbPool :: m (Pool Connection)
- getTableName :: m TableName
- onJobStart :: Job -> m ()
- getDefaultMaxAttempts :: m Int
- getRunnerEnv :: m RunnerEnv
- getConcurrencyControl :: m ConcurrencyControl
- getPidFile :: m (Maybe FilePath)
- log :: LogLevel -> LogEvent -> m ()
- getDefaultJobTimeout :: m Seconds
- findJobByIdIO :: Connection -> TableName -> JobId -> IO (Maybe Job)
- saveJobIO :: Connection -> TableName -> Job -> IO Job
- runJobNowIO :: Connection -> TableName -> JobId -> IO (Maybe Job)
- unlockJobIO :: Connection -> TableName -> JobId -> IO (Maybe Job)
- cancelJobIO :: Connection -> TableName -> JobId -> IO (Maybe Job)
- jobDbColumns :: (IsString s, Semigroup s) => [s]
- concatJobDbColumns :: (IsString s, Semigroup s) => s
- fetchAllJobTypes :: MonadIO m => Config -> m [Text]
- fetchAllJobRunners :: MonadIO m => Config -> m [JobRunnerName]
- eitherParsePayload :: FromJSON a => Job -> Either String a
- throwParsePayload :: FromJSON a => Job -> IO a
- eitherParsePayloadWith :: (Value -> Parser a) -> Job -> Either String a
- throwParsePayloadWith :: (Value -> Parser a) -> Job -> IO a
Starting the job-runner
startJobRunner :: Config -> IO () Source #
Configuring the job-runner
While odd-jobs is highly configurable and the Config
data-type might seem
daunting at first, it is not necessary to tweak every single configuration
parameter by hand.
Recommendation: Please start-off by building a Config
by using the
mkConfig
function (to get something with sensible
defaults) and then tweaking config parameters on a case-by-case basis.
Config | |
|
data ConcurrencyControl Source #
Note: Please read the section on controlling concurrency in the implementation guide to understand the implications of each option given by the data-type.
MaxConcurrentJobs Int | The maximum number of concurrent jobs that this instance of the job-runner can execute. |
UnlimitedConcurrentJobs | Not recommended: Please do not use this in production unless you know what you're doing. No machine can support unlimited concurrency. If your jobs are doing anything worthwhile, running a sufficiently large number concurrently is going to max-out some resource of the underlying machine, such as, CPU, memory, disk IOPS, or network bandwidth. |
DynamicConcurrency (IO Bool) | Use this to dynamically determine if the next job should be picked-up, or not. This is useful to write custom-logic to determine whether a limited resource is below a certain usage threshold (eg. CPU usage is below 80%). Caveat: This feature has not been tested in production, yet. |
Instances
Show ConcurrencyControl Source # | |
Defined in OddJobs.Types showsPrec :: Int -> ConcurrencyControl -> ShowS # show :: ConcurrencyControl -> String # showList :: [ConcurrencyControl] -> ShowS # |
Creating/scheduling jobs
Ideally you'd want to create wrappers for createJob
and scheduleJob
in
your application so that instead of being in IO
they can be in your
application's monad m
instead (this saving you from a liftIO
every time
you want to enqueue a job
createJob :: ToJSON p => Connection -> TableName -> p -> IO Job Source #
Create a job for immediate execution.
Internally calls scheduleJob
passing it the current time. Read
scheduleJob
for further documentation.
:: ToJSON p | |
=> Connection | DB connection to use. Note: This should
ideally come out of your application's DB pool,
not the |
-> TableName | DB-table which holds your jobs |
-> p | Job payload |
-> UTCTime | when should the job be executed |
-> IO Job |
Create a job for execution at the given time.
- If time has already past,
jobEventListener
is going to pick this up for execution immediately. - If time is in the future,
jobPoller
is going to pick this up with an error of +/-cfgPollingInterval
seconds. Please do not expect very high accuracy of when the job is actually executed.
Job
and associated data-types
Job | |
|
Instances
Success | In the current version of odd-jobs you should not find any jobs having
the |
Queued | Jobs in |
Failed | Jobs in |
Retry | Jobs in |
Locked | Jobs in |
Instances
Bounded Status Source # | |
Enum Status Source # | |
Defined in OddJobs.Types | |
Eq Status Source # | |
Ord Status Source # | |
Show Status Source # | |
Generic Status Source # | |
ToJSON Status Source # | |
Defined in OddJobs.Types | |
FromJSON Status Source # | |
FromField Status Source # | |
Defined in OddJobs.Types | |
ToField Status Source # | |
Defined in OddJobs.Types | |
ToText Status Source # | |
Defined in OddJobs.Types | |
StringConv Text a => FromText (Either a Status) Source # | |
type Rep Status Source # | |
Defined in OddJobs.Types type Rep Status = D1 (MetaData "Status" "OddJobs.Types" "odd-jobs-0.2.1-9zdA1vdP33HK1B5SZnsxYf" False) ((C1 (MetaCons "Success" PrefixI False) (U1 :: Type -> Type) :+: C1 (MetaCons "Queued" PrefixI False) (U1 :: Type -> Type)) :+: (C1 (MetaCons "Failed" PrefixI False) (U1 :: Type -> Type) :+: (C1 (MetaCons "Retry" PrefixI False) (U1 :: Type -> Type) :+: C1 (MetaCons "Locked" PrefixI False) (U1 :: Type -> Type)))) |
newtype JobRunnerName Source #
Instances
type TableName = Query Source #
An alias for Query
type. Since this type has an instance of IsString
you do not need to do anything special to create a value for this type. Just
ensure you have the OverloadedStrings
extention enabled. For example:
{-# LANGUAGE OverloadedStrings #-} myJobsTable :: TableName myJobsTable = "my_jobs"
delaySeconds :: MonadIO m => Seconds -> m () Source #
Convenience wrapper on-top of threadDelay
which takes Seconds
as an
argument, instead of micro-seconds.
data JobErrHandler a Source #
Exception handler for jobs. This is conceptually very similar to how
Handler
and catches
(from
Exception
) work in-tandem. Using cfgOnJobFailed
you can install
multiple exception handlers, where each handler is responsible for one type
of exception. OddJobs will execute the correct exception handler on the basis
of the type of runtime exception raised. For example:
cfgOnJobFailed = [ JobErrHandler $ (e :: HttpException) job failMode -> ... , JobErrHandler $ (e :: SqlException) job failMode -> ... , JobErrHandler $ (e :: ) job failMode -> ... ]
Note: Please refer to the section on alerts and
notifications
in the implementation guide to understand how to use the machinery provided
by JobErrHandler
and cfgOnJobFailed
.
Exception e => JobErrHandler (e -> Job -> FailureMode -> IO a) |
data AllJobTypes Source #
The web/admin UI needs to know a "master list" of all job-types to be
able to power the "filter by job-type" feature. This data-type helps in
letting odd-jobs know how to get such a master-list. The function specified
by this type is run once when the job-runner starts (and stored in an
internal IORef
). After that the list of job-types needs to be updated
manually by pressing the appropriate "refresh" link in the admin/web UI.
AJTFixed [Text] | A fixed-list of job-types. If you don't want to increase boilerplate,
consider using |
AJTSql (Connection -> IO [Text]) | Construct the list of job-types dynamically by looking at the actual
payloads in |
AJTCustom (IO [Text]) | A custom |
Structured logging
OddJobs uses "structured logging" for important events that occur during the life-cycle of a job-runner. This is useful if you're using JSON/XML for aggegating logs of your entire application to something like Kibana, AWS CloudFront, GCP StackDriver Logging, etc.
If you're not interested in using structured logging, look at
defaultLogStr
to output plain-text logs (or you can
write your own function, as well).
LogJobStart !Job | Emitted when a job starts execution |
LogJobSuccess !Job !NominalDiffTime | Emitted when a job succeeds along with the time taken for execution. |
LogJobFailed !Job !SomeException !FailureMode !NominalDiffTime | Emitted when a job fails (but will be retried) along with the time taken for this attempt |
LogJobTimeout !Job | Emitted when a job times out and is picked-up again for execution |
LogPoll | Emitted whenever |
LogWebUIRequest | TODO |
LogText !Text | Emitted whenever any other event occurs |
Instances
Job-runner interals
jobMonitor :: forall m. HasJobRunner m => m () Source #
Spawns jobPoller
and jobEventListener
in separate threads and restarts
them in the off-chance they happen to crash. Also responsible for
implementing graceful shutdown, i.e. waiting for all jobs already being
executed to finish execution before exiting the main thread.
jobEventListener :: HasJobRunner m => m () Source #
Uses PostgreSQL's LISTEN/NOTIFY to be immediately notified of newly created jobs.
jobPoller :: HasJobRunner m => m () Source #
Executes jobPollingSql
every cfgPollingInterval
seconds to pick up jobs
for execution. Uses UPDATE
along with SELECT...FOR UPDATE
to efficiently
find a job that matches all of the following conditions:
jobRunAt
should be in the pastone of the following conditions match:
jobStatus
should beQueued
orRetry
jobStatus
should beLocked
andjobLockedAt
should bedefaultLockTimeout
seconds in the past, thus indicating that the job was picked up execution, but didn't complete on time (possible because the thread/process executing it crashed without being able to update the DB)
class (MonadUnliftIO m, MonadBaseControl IO m) => HasJobRunner m where Source #
The documentation of odd-jobs currently promotes startJobRunner
, which
expects a fairly detailed Config
record, as a top-level function for
initiating a job-runner. However, internally, this Config
record is used as
an enviroment for a ReaderT
, and almost all functions are written in this
ReaderT
monad which impleents an instance of the HasJobRunner
type-class.
In future, this internal implementation detail will allow us to offer a
type-class based interface as well (similar to what
YesodJobQueue
provides).
getPollingInterval :: m Seconds Source #
onJobSuccess :: Job -> m () Source #
onJobFailed :: forall a. m [JobErrHandler a] Source #
getJobRunner :: m (Job -> IO ()) Source #
getDbPool :: m (Pool Connection) Source #
getTableName :: m TableName Source #
onJobStart :: Job -> m () Source #
getDefaultMaxAttempts :: m Int Source #
getRunnerEnv :: m RunnerEnv Source #
getConcurrencyControl :: m ConcurrencyControl Source #
getPidFile :: m (Maybe FilePath) Source #
Database helpers
A bunch of functions that help you query cfgTableName
and change the status
of individual jobs. Most of these functions are in IO
and you might want
to write wrappers that lift them into you application's custom monad.
Note: When passing a Connection
to these function, it is
recommended to not take a connection from cfgDbPool
. Use your
application's database pool instead.
findJobByIdIO :: Connection -> TableName -> JobId -> IO (Maybe Job) Source #
runJobNowIO :: Connection -> TableName -> JobId -> IO (Maybe Job) Source #
unlockJobIO :: Connection -> TableName -> JobId -> IO (Maybe Job) Source #
TODO: First check in all job-runners if this job is still running, or not, and somehow send an uninterruptibleCancel to that thread.
cancelJobIO :: Connection -> TableName -> JobId -> IO (Maybe Job) Source #
jobDbColumns :: (IsString s, Semigroup s) => [s] Source #
If you are writing SQL queries where you want to return ALL columns from
the jobs table it is recommended that you do not issue a SELECT *
or
RETURNIG *
. List out specific DB columns using jobDbColumns
and
concatJobDbColumns
instead. This will insulate you from runtime errors
caused by addition of new columns to cfgTableName
in future versions of
OddJobs.
concatJobDbColumns :: (IsString s, Semigroup s) => s Source #
All jobDbColumns
joined together with commas. Useful for constructing SQL
queries, eg:
query_
conn $ "SELECT " <> concatJobDbColumns <> "FROM jobs"
fetchAllJobTypes :: MonadIO m => Config -> m [Text] Source #
Used by the web/admin UI to fetch a "master list" of all known
job-types. Ref: cfgAllJobTypes
fetchAllJobRunners :: MonadIO m => Config -> m [JobRunnerName] Source #
Used by web/admin IO to fetch a "master list" of all known job-runners. There is a known issue with the way this has been implemented:
- Since this looks at the
jobLockedBy
column ofcfgTableName
, it will discover only those job-runners that are actively executing at least one job at the time this function is executed.