consumers-1.1: Concurrent PostgreSQL data consumers

Safe Haskell: None




data Action

Action to take after a job was processed.

data Result

Result of processing a job.


Ok Action 
Failed Action 

data ConsumerConfig m idx job

Config of a consumer.


forall row . FromRow row => ConsumerConfig 


ccJobsTable :: !(RawSQL ())

Name of a database table where jobs are stored. The table needs to have the following columns in order to be suitable for acting as a job queue:

  • id - represents ID of the job. Needs to be a primary key of the type convertible to text, not nullable.
  • run_at - represents the time at which the job will be processed. Needs to be nullable, of the type comparable with now() (TIMESTAMPTZ is recommended). Note: a job with run_at set to NULL is never picked for processing, which is useful for storing already processed/expired jobs for debugging purposes.
  • finished_at - represents the time at which job processing was finished. Needs to be nullable, of the type you can assign now() to (TIMESTAMPTZ is recommended). NULL means that the job was either never processed or that it was started and failed at least once.
  • reserved_by - represents ID of the consumer that currently processes the job. Needs to be nullable, of the type corresponding to id in the table ccConsumersTable. It's recommended (though not necessary) to make it a foreign key referencing id in ccConsumersTable with ON DELETE SET NULL.
  • attempts - represents number of job processing attempts made so far. Needs to be not nullable, of type INTEGER. Initial value of a fresh job should be 0, therefore it makes sense to make the column default to 0.
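
To make the column descriptions above concrete, here is a minimal pure-Haskell sketch of one row of the jobs table. It is not part of the library: JobRow, freshJob, isAvailable and the Seconds stand-in for TIMESTAMPTZ are hypothetical names, and the eligibility rule (run_at is set and due, reserved_by is NULL) is an assumption drawn from the column descriptions rather than the library's actual query.

```haskell
import Data.Maybe (isNothing)

-- Seconds since an arbitrary epoch; a simplified stand-in for TIMESTAMPTZ.
type Seconds = Int

-- Hypothetical in-memory mirror of one row of the jobs table.
data JobRow = JobRow
  { rowId         :: String        -- id, convertible to text
  , rowRunAt      :: Maybe Seconds -- run_at, Nothing (NULL) means "never pick"
  , rowFinishedAt :: Maybe Seconds -- finished_at
  , rowReservedBy :: Maybe Int     -- reserved_by, ID of the reserving consumer
  , rowAttempts   :: Int           -- attempts, 0 for a fresh job
  }

-- A fresh, unreserved job scheduled for the given time.
freshJob :: String -> Seconds -> JobRow
freshJob jid t = JobRow jid (Just t) Nothing Nothing 0

-- Assumed eligibility rule: run_at is set and due, and no consumer holds the job.
isAvailable :: Seconds -> JobRow -> Bool
isAvailable now row = case rowRunAt row of
  Nothing -> False
  Just t  -> t <= now && isNothing (rowReservedBy row)
```

Under these assumptions a job scheduled in the future, a reserved job, and a job with run_at set to NULL are all skipped.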
ccConsumersTable :: !(RawSQL ())

Name of a database table where registered consumers are stored. The table itself needs to have the following columns:

  • id - represents ID of a consumer. Needs to be a primary key of the type SERIAL or BIGSERIAL (recommended).
  • name - represents the jobs table of the consumer. Needs to be not nullable, of type TEXT. Allows for tracking consumers of multiple queues with one table. It is set to ccJobsTable.
  • last_activity - represents the last registered activity of the consumer. It's updated periodically by all currently running consumers every 30 seconds to prove that they are indeed running. They also check for the registered consumers that didn't update their status for a minute. If any such consumers are found, they are presumed to be not working and all the jobs reserved by them are released. This prevents the situation where a consumer with reserved jobs silently fails (e.g. because of a hard crash) and these jobs stay locked forever, yet are never processed.
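
The liveness check described for last_activity can be modelled roughly as follows. This is a sketch only, not the library's implementation: Consumer, Job, presumedDead and releaseJobs are hypothetical names, timestamps are plain seconds, and "didn't update their status for a minute" is read as strictly more than 60 seconds of silence, which is an assumption.

```haskell
-- Seconds since an arbitrary epoch; a simplified stand-in for TIMESTAMPTZ.
type Seconds = Int

-- Hypothetical mirrors of the relevant columns of both tables.
data Consumer = Consumer { consumerId :: Int, lastActivity :: Seconds }
data Job = Job { jobId :: Int, reservedBy :: Maybe Int } deriving (Eq, Show)

-- A consumer silent for longer than this is presumed to be not working.
inactivityLimit :: Seconds
inactivityLimit = 60

-- IDs of consumers whose last registered activity is too old.
presumedDead :: Seconds -> [Consumer] -> [Int]
presumedDead now cs =
  [consumerId c | c <- cs, now - lastActivity c > inactivityLimit]

-- Clear reserved_by on every job held by one of the given consumers.
releaseJobs :: [Int] -> [Job] -> [Job]
releaseJobs dead = map release
  where
    release j
      | maybe False (`elem` dead) (reservedBy j) = j { reservedBy = Nothing }
      | otherwise = j
```

With a 30 second update interval, a running consumer has at least one chance to refresh last_activity before the one minute limit is reached.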
ccJobSelectors :: ![SQL]

Fields that need to be selected from the jobs table in order to assemble a job.

ccJobFetcher :: !(row -> job)

Function that transforms the list of fields into a job.

ccJobIndex :: !(job -> idx)

Selector that extracts the job ID from the job object.
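
As an illustration of how ccJobSelectors, ccJobFetcher and ccJobIndex fit together, here is a small self-contained sketch. EmailJob, its fields and the recipient column are hypothetical, plain Strings stand in for the SQL fragments, and the row type is assumed to be a tuple with one component per selector in the same order.

```haskell
-- Hypothetical job type assembled from two columns of the jobs table.
data EmailJob = EmailJob
  { emailJobId        :: Int
  , emailJobRecipient :: String
  } deriving (Eq, Show)

-- Stand-in for ccJobSelectors; the real field holds SQL fragments, not Strings.
jobSelectors :: [String]
jobSelectors = ["id", "recipient"]

-- Stand-in for ccJobFetcher: one tuple component per selector, in the same order.
fetchJob :: (Int, String) -> EmailJob
fetchJob (jid, recipient) = EmailJob jid recipient

-- Stand-in for ccJobIndex: takes the job ID out of the job object.
jobIndex :: EmailJob -> Int
jobIndex = emailJobId
```

Here idx is Int and job is EmailJob; jobIndex recovers the ID that was selected as the id column.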

ccNotificationChannel :: !(Maybe Channel)

Notification channel used for listening for incoming jobs. If set to Nothing, no listening is performed and jobs are selected from the database every ccNotificationTimeout microseconds.

ccNotificationTimeout :: !Int

Timeout of listening for incoming jobs, in microseconds. Between the timeouts the consumer checks whether there are any jobs in the database that need to be processed. So even if ccNotificationChannel is Just, you need to set the timeout to a reasonable number if the jobs you process may fail and be retried later, as there is no way to signal with a notification that a job will need to be performed e.g. in 5 minutes. However, if ccNotificationChannel is Just and jobs are never retried, you can set it to -1, in which case listening never times out. Otherwise it needs to be a positive number.
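
The timeout semantics above (a positive number of microseconds, or -1 for no timeout) can be sketched with timeout from System.Timeout, which treats a negative interval as "wait indefinitely". This is an illustration only: an MVar stands in for the database notification channel, waitForNotification and example are hypothetical names, and nothing here claims to be how the library listens internally.

```haskell
import Control.Concurrent.MVar (MVar, newEmptyMVar, newMVar, takeMVar)
import System.Timeout (timeout)

-- Returns True if a notification arrived, False if the wait timed out.
-- A negative interval makes System.Timeout.timeout wait indefinitely.
waitForNotification :: Int -> MVar () -> IO Bool
waitForNotification micros signal = do
  outcome <- timeout micros (takeMVar signal)
  pure (outcome == Just ())

-- One wait that times out and one that is satisfied without any timeout.
example :: IO (Bool, Bool)
example = do
  quiet <- newEmptyMVar
  timedOut <- waitForNotification 10000 quiet -- nothing arrives within 10 ms
  ready <- newMVar ()
  notified <- waitForNotification (-1) ready  -- already signalled, returns at once
  pure (timedOut, notified)
```

After a wait that times out, the consumer would check the jobs table anyway, which is why a finite timeout matters when failed jobs are rescheduled for later.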

ccMaxRunningJobs :: !Int

Maximum number of jobs that can be processed in parallel.

ccProcessJob :: !(job -> m Result)

Function that processes a job.

ccOnException :: !(SomeException -> job -> m Action)

Action taken if the job processing function throws an exception. Note that if this action itself throws an exception, the consumer goes down, so it's best to ensure that it doesn't throw.
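
One way to follow the advice above is to wrap the handler so that any exception it throws is replaced by a fallback value. The sketch below is an assumption-laden illustration, not part of the library: guardHandler and example are hypothetical names, the monad is fixed to IO instead of a general m, and the result type is left polymorphic because the constructors of Action are not listed in this section.

```haskell
import Control.Exception (SomeException, catch, evaluate, throwIO, toException)

-- Run the handler; if it throws, return the fallback instead.
-- The result is forced to weak head normal form so that a lazily
-- thrown exception is caught here as well. Catching SomeException
-- also swallows asynchronous exceptions, which may not suit every
-- application.
guardHandler
  :: action
  -> (SomeException -> job -> IO action)
  -> SomeException -> job -> IO action
guardHandler fallback handler err job =
  (handler err job >>= evaluate) `catch` recover
  where
    recover e = const (pure fallback) (e :: SomeException)

-- The first handler succeeds, the second one throws.
example :: IO (String, String)
example = do
  let err = toException (userError "job failed")
  a <- guardHandler "fallback" (\_ _ -> pure "retry") err ()
  b <- guardHandler "fallback" (\_ _ -> throwIO (userError "handler failed")) err ()
  pure (a, b)
```

A value built this way could be passed as ccOnException once the fallback is a real Action and the handler runs in the consumer's monad.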