| Safe Haskell | None |
|---|---|
| Language | Haskell2010 |
OddJobs.Job
Contents
Synopsis
- startJobRunner :: Config -> IO ()
- data Config = Config {
- cfgTableName :: TableName
- cfgJobRunner :: Job -> IO ()
- cfgDefaultMaxAttempts :: Int
- cfgConcurrencyControl :: ConcurrencyControl
- cfgDbPool :: Pool Connection
- cfgPollingInterval :: Seconds
- cfgOnJobSuccess :: Job -> IO ()
- cfgOnJobFailed :: Job -> IO ()
- cfgOnJobPermanentlyFailed :: Job -> IO ()
- cfgOnJobStart :: Job -> IO ()
- cfgOnJobTimeout :: Job -> IO ()
- cfgPidFile :: Maybe FilePath
- cfgLogger :: LogLevel -> LogEvent -> IO ()
- cfgJobToText :: Job -> Text
- cfgJobType :: Job -> Text
- data ConcurrencyControl
- defaultConfig :: (LogLevel -> LogEvent -> IO ()) -> TableName -> Pool Connection -> ConcurrencyControl -> (Job -> IO ()) -> Config
- defaultJobToText :: (Job -> Text) -> Job -> Text
- defaultJobType :: Job -> Text
- defaultTimedLogger :: TimedFastLogger -> (LogLevel -> LogEvent -> LogStr) -> LogLevel -> LogEvent -> IO ()
- defaultLogStr :: (Job -> Text) -> LogLevel -> LogEvent -> LogStr
- defaultPollingInterval :: Seconds
- defaultLockTimeout :: Seconds
- withConnectionPool :: MonadUnliftIO m => Either ByteString ConnectInfo -> (Pool Connection -> m a) -> m a
- createJob :: ToJSON p => Connection -> TableName -> p -> IO Job
- scheduleJob :: ToJSON p => Connection -> TableName -> p -> UTCTime -> IO Job
- data Job = Job {
- jobId :: JobId
- jobCreatedAt :: UTCTime
- jobUpdatedAt :: UTCTime
- jobRunAt :: UTCTime
- jobStatus :: Status
- jobPayload :: Value
- jobLastError :: Maybe Value
- jobAttempts :: Int
- jobLockedAt :: Maybe UTCTime
- jobLockedBy :: Maybe Text
- type JobId = Int
- data Status
- type TableName = Query
- delaySeconds :: MonadIO m => Seconds -> m ()
- newtype Seconds = Seconds {}
- data LogEvent
- data LogLevel
- jobMonitor :: forall m. HasJobRunner m => m ()
- jobEventListener :: HasJobRunner m => m ()
- jobPoller :: HasJobRunner m => m ()
- jobPollingSql :: TableName -> Query
- type JobRunner = Job -> IO ()
- class (MonadUnliftIO m, MonadBaseControl IO m) => HasJobRunner m where
- getPollingInterval :: m Seconds
- onJobSuccess :: Job -> m ()
- onJobFailed :: Job -> m ()
- onJobPermanentlyFailed :: Job -> m ()
- getJobRunner :: m (Job -> IO ())
- getDbPool :: m (Pool Connection)
- getTableName :: m TableName
- onJobStart :: Job -> m ()
- getDefaultMaxAttempts :: m Int
- onJobTimeout :: Job -> m ()
- getRunnerEnv :: m RunnerEnv
- getConcurrencyControl :: m ConcurrencyControl
- getPidFile :: m (Maybe FilePath)
- getJobToText :: m (Job -> Text)
- log :: LogLevel -> LogEvent -> m ()
- findJobById :: HasJobRunner m => JobId -> m (Maybe Job)
- findJobByIdIO :: Connection -> TableName -> JobId -> IO (Maybe Job)
- saveJob :: HasJobRunner m => Job -> m Job
- saveJobIO :: Connection -> TableName -> Job -> IO Job
- jobDbColumns :: (IsString s, Semigroup s) => [s]
- concatJobDbColumns :: (IsString s, Semigroup s) => s
- eitherParsePayload :: FromJSON a => Job -> Either String a
- throwParsePayload :: FromJSON a => Job -> IO a
- eitherParsePayloadWith :: (Value -> Parser a) -> Job -> Either String a
- throwParsePayloadWith :: (Value -> Parser a) -> Job -> IO a
Starting the job-runner
startJobRunner :: Config -> IO () Source #
Configuring the job-runner
While odd-jobs is highly configurable and the Config data-type might seem
daunting at first, it is not necessary to tweak every single configuration
parameter by hand. Please start-off by using the sensible defaults provided
by the configuration helpers, and tweaking
config parameters on a case-by-case basis.
Constructors
| Config | |
Fields
| |
data ConcurrencyControl Source #
Constructors
| MaxConcurrentJobs Int | The maximum number of concurrent jobs that this instance of the job-runner can execute. TODO: Link-off to tutorial. |
| UnlimitedConcurrentJobs | Not recommended: Please do not use this in production unless you know what you're doing. No machine can support unlimited concurrency. If your jobs are doing anything worthwhile, running a sufficiently large number concurrently is going to max-out some resource of the underlying machine, such as, CPU, memory, disk IOPS, or network bandwidth. |
| DynamicConcurrency (IO Bool) | Use this to dynamically determine if the next job should be picked-up, or not. This is useful to write custom-logic to determine whether a limited resource is below a certain usage threshold (eg. CPU usage is below 80%). Caveat: This feature has not been tested in production, yet. TODO: Link-off to tutorial. |
Instances
| Show ConcurrencyControl Source # | |
Defined in OddJobs.Job Methods showsPrec :: Int -> ConcurrencyControl -> ShowS # show :: ConcurrencyControl -> String # showList :: [ConcurrencyControl] -> ShowS # | |
Configuration helpers
Arguments
| :: (LogLevel -> LogEvent -> IO ()) | "Structured logging" function. Ref: |
| -> TableName | DB table which holds your jobs. Ref: |
| -> Pool Connection | DB connection-pool to be used by job-runner. Ref: |
| -> ConcurrencyControl | Concurrency configuration. Ref: |
| -> (Job -> IO ()) | The actual "job runner" which contains your application code. Ref: |
| -> Config |
This function gives you a Config with a bunch of sensible defaults
already applied. It requies the bare minimum arguments that this library
cannot assume on your behalf.
It makes a few important assumptions about your 'jobPayload 'JSON, which
are documented in defaultJobType.
defaultJobToText :: (Job -> Text) -> Job -> Text Source #
Used only by defaultLogStr now. TODO: Is this even required anymore?
Should this be removed?
defaultJobType :: Job -> Text Source #
This makes two important assumptions. First, this assumes that jobs in your app are represented by a sum-type. For example:
data MyJob = SendWelcomeEmail Int
| SendPasswordResetEmail Text
| SetupSampleData Int
Second, it assumes that the JSON representatin of this sum-type is "tagged". For example, the following...
let pload = SendWelcomeEmail 10
...when converted to JSON, would look like...
{"tag":"SendWelcomeEmail", "contents":10}It uses this assumption to extract the "job type" from a Value
(which would be SendWelcomeEmail in the example given above). This is used
in logging and the admin UI.
Even if tihs assumption is violated, the job-runner should continue to function. It's just that you won't get very useful log messages.
defaultTimedLogger :: TimedFastLogger -> (LogLevel -> LogEvent -> LogStr) -> LogLevel -> LogEvent -> IO () Source #
TODO: Should the library be doing this?
defaultPollingInterval :: Seconds Source #
As the name says. Ref: cfgPollingInterval
defaultLockTimeout :: Seconds Source #
TODO: Make this configurable for the job-runner, why is this still hard-coded?
withConnectionPool :: MonadUnliftIO m => Either ByteString ConnectInfo -> (Pool Connection -> m a) -> m a Source #
Convenience function to create a DB connection-pool with some sensible defaults. Please see the source-code of this function to understand what it's doing. TODO: link-off to tutorial.
Creating/scheduling jobs
Ideally you'd want to create wrappers for createJob and scheduleJob in
your application so that instead of being in IO they can be in your
application's monad m instead (this saving you from a liftIO every time
you want to enqueue a job
createJob :: ToJSON p => Connection -> TableName -> p -> IO Job Source #
Create a job for immediate execution.
Internally calls scheduleJob passing it the current time. Read
scheduleJob for further documentation.
Arguments
| :: ToJSON p | |
| => Connection | DB connection to use. Note: This should
ideally come out of your application's DB pool,
not the |
| -> TableName | DB-table which holds your jobs |
| -> p | Job payload |
| -> UTCTime | when should the job be executed |
| -> IO Job |
Create a job for execution at the given time.
- If time has already past,
jobEventListeneris going to pick this up for execution immediately. - If time is in the future,
jobPolleris going to pick this up with an error of +/-cfgPollingIntervalseconds. Please do not expect very high accuracy of when the job is actually executed.
Job and associated data-types
Constructors
| Job | |
Fields
| |
Instances
| Enum Status Source # | |
Defined in OddJobs.Job | |
| Eq Status Source # | |
| Ord Status Source # | |
| Show Status Source # | |
| Generic Status Source # | |
| ToJSON Status Source # | |
Defined in OddJobs.Web | |
| FromJSON Status Source # | |
| FromField Status Source # | |
Defined in OddJobs.Job Methods | |
| ToField Status Source # | |
Defined in OddJobs.Job | |
| ToText Status Source # | |
Defined in OddJobs.Job | |
| StringConv Text a => FromText (Either a Status) Source # | |
| type Rep Status Source # | |
Defined in OddJobs.Job type Rep Status = D1 (MetaData "Status" "OddJobs.Job" "odd-jobs-0.1.0-AG1ucQCmc3LHSWSLszrvJU" False) ((C1 (MetaCons "Success" PrefixI False) (U1 :: Type -> Type) :+: C1 (MetaCons "Queued" PrefixI False) (U1 :: Type -> Type)) :+: (C1 (MetaCons "Failed" PrefixI False) (U1 :: Type -> Type) :+: (C1 (MetaCons "Retry" PrefixI False) (U1 :: Type -> Type) :+: C1 (MetaCons "Locked" PrefixI False) (U1 :: Type -> Type)))) | |
type TableName = Query Source #
An alias for Query type. Since this type has an instance of IsString
you do not need to do anything special to create a value for this type. Just
ensure you have the OverloadedStrings extention enabled. For example:
{-# LANGUAGE OverloadedStrings #-}
myJobsTable :: TableName
myJobsTable = "my_jobs"
delaySeconds :: MonadIO m => Seconds -> m () Source #
Convenience wrapper on-top of threadDelay which takes Seconds as an
argument, instead of micro-seconds.
Structured logging
TODO: Complete the prose here
Constructors
| LogJobStart !Job | Emitted when a job starts execution |
| LogJobSuccess !Job !NominalDiffTime | Emitted when a job succeeds along with the time taken for execution. |
| LogJobFailed !Job !NominalDiffTime | Emitted when a job fails (but will be retried) along with the time taken for this attempt |
| LogJobPermanentlyFailed !Job !NominalDiffTime | Emitted when a job fails permanently (and will no longer be retried) along with the time taken for this attempt (i.e. final attempt) |
| LogJobTimeout !Job | Emitted when a job times out and is picked-up again for execution |
| LogPoll | Emitted whenever |
| LogText !Text | Emitted whenever any other event occurs |
Instances
Constructors
| LevelDebug | |
| LevelInfo | |
| LevelWarn | |
| LevelError | |
| LevelOther Text |
Job-runner interals
jobMonitor :: forall m. HasJobRunner m => m () Source #
Spawns jobPoller and jobEventListener in separate threads and restarts
them in the off-chance they happen to crash. Also responsible for
implementing graceful shutdown, i.e. waiting for all jobs already being
executed to finish execution before exiting the main thread.
jobEventListener :: HasJobRunner m => m () Source #
Uses PostgreSQL's LISTEN/NOTIFY to be immediately notified of newly created jobs.
jobPoller :: HasJobRunner m => m () Source #
Executes jobPollingSql every cfgPollingInterval seconds to pick up jobs
for execution. Uses UPDATE along with SELECT...FOR UPDATE to efficiently
find a job that matches all of the following conditions:
jobRunAtshould be in the pastone of the following conditions match:
jobStatusshould beQueuedorRetryjobStatusshould beLockedandjobLockedAtshould bedefaultLockTimeoutseconds in the past, thus indicating that the job was picked up execution, but didn't complete on time (possible because the thread/process executing it crashed without being able to update the DB)
class (MonadUnliftIO m, MonadBaseControl IO m) => HasJobRunner m where Source #
The documentation of odd-jobs currently promotes startJobRunner, which
expects a fairly detailed Config record, as a top-level function for
initiating a job-runner. However, internally, this Config record is used as
an enviroment for a ReaderT, and almost all functions are written in this
ReaderT monad which impleents an instance of the HasJobRunner type-class.
- *In future,** this internal implementation detail will allow us to offer a
type-class based interface as well (similar to what
YesodJobQueueprovides).
Methods
getPollingInterval :: m Seconds Source #
onJobSuccess :: Job -> m () Source #
onJobFailed :: Job -> m () Source #
onJobPermanentlyFailed :: Job -> m () Source #
getJobRunner :: m (Job -> IO ()) Source #
getDbPool :: m (Pool Connection) Source #
getTableName :: m TableName Source #
onJobStart :: Job -> m () Source #
getDefaultMaxAttempts :: m Int Source #
onJobTimeout :: Job -> m () Source #
getRunnerEnv :: m RunnerEnv Source #
getConcurrencyControl :: m ConcurrencyControl Source #
getPidFile :: m (Maybe FilePath) Source #
getJobToText :: m (Job -> Text) Source #
Database helpers
findJobById :: HasJobRunner m => JobId -> m (Maybe Job) Source #
findJobByIdIO :: Connection -> TableName -> JobId -> IO (Maybe Job) Source #
jobDbColumns :: (IsString s, Semigroup s) => [s] Source #
concatJobDbColumns :: (IsString s, Semigroup s) => s Source #