postgresql-simple-queue-0.5.1.0: A PostgreSQL backed queue

Safe Haskell: None
Language: Haskell2010

Database.PostgreSQL.Simple.Queue

Description

This module uses PostgreSQL to implement a durable queue for efficiently processing arbitrary payloads that can be represented as JSON.

Typically a producer would enqueue a new payload as part of a larger database transaction:

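  createAccount userRecord =
     runDBTSerializable $ do
        createUserDB userRecord
        -- schemaName is the String first argument from enqueueDB's signature
        -- (assumed here to name the queue's schema)
        enqueueDB schemaName $ makeVerificationEmail userRecord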

In another thread or process, the consumer would drain the queue.

   forever $ do
     -- Attempt to get a payload, or block until one is available.
     -- schemaName is the String first argument from lock's signature
     -- (assumed here to name the queue's schema).
     payload <- lock schemaName conn

     -- Perform application-specific parsing of the payload value
     case fromJSON $ pValue payload of
       Success x -> sendEmail x -- Perform application-specific processing
       Error err -> logErr err

     -- Remove the payload from future processing
     dequeue schemaName conn $ pId payload

For a more complete example of a consumer, utilizing the provided defaultMain, see EmailQueue.

This module provides two flavors of functions: a DB API and an IO API. Most operations are provided in both flavors, with the exception of lock. Because lock blocks, it would not be very useful as part of a larger transaction: it would keep the transaction open for a potentially long time. Although both flavors are provided, in general one version of each operation is more useful for typical use cases.

Types

data State

A Payload can exist in three states in the queue: Enqueued, Locked and Dequeued. A Payload starts in the Enqueued state and is Locked so that some processing can occur with it, usually something in IO. Once the processing is complete, the Payload is moved to the Dequeued state, which is the terminal state.

Constructors

Enqueued 
Locked 
Dequeued 
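
The life cycle described above can be sketched as a small pure state machine. This is only an illustrative model, not the library's implementation: the local State type mirrors the constructors listed here, while Transition and step are hypothetical names introduced for the example. The model also assumes that dequeuing is permitted from either non-terminal state, since the documentation for dequeue names no source state.

```haskell
-- Local copy of the three states, so the sketch needs no database or library.
data State = Enqueued | Locked | Dequeued
  deriving (Eq, Show)

-- Hypothetical names for the operations that move a payload between states.
data Transition = LockIt | UnlockIt | DequeueIt
  deriving (Eq, Show)

-- Apply a transition; Nothing means the move is not allowed in this model.
step :: Transition -> State -> Maybe State
step LockIt    Enqueued = Just Locked     -- a consumer picks the payload up
step UnlockIt  Locked   = Just Enqueued   -- processing was abandoned
step DequeueIt Dequeued = Nothing         -- Dequeued is terminal
step DequeueIt _        = Just Dequeued   -- assumption: allowed from either live state
step _         _        = Nothing
```

For example, step LockIt Enqueued evaluates to Just Locked, while step LockIt Dequeued evaluates to Nothing because Dequeued is terminal.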

data Payload

Constructors

Payload 

Fields

DB API

enqueueDB :: String -> Value -> DB PayloadId

Enqueue a new JSON value into the queue. This particular function can be composed as part of a larger database transaction. For instance, a single transaction could create a user and enqueue an email message.

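  createAccount userRecord =
     runDBTSerializable $ do
        createUserDB userRecord
        -- schemaName is the String first argument from the signature above
        -- (assumed here to name the queue's schema)
        enqueueDB schemaName $ makeVerificationEmail userRecord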
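
The point of composing enqueueDB with other queries is that both effects commit or neither does. A minimal pure sketch of that all-or-nothing behaviour follows. It uses Either in place of a real transaction, and every name in it (Db, createUserModel, enqueueEmailModel, runTransaction) is hypothetical, introduced only for illustration.

```haskell
-- Toy database: a list of users and a list of queued messages.
data Db = Db { users :: [String], emails :: [String] }
  deriving (Eq, Show)

-- Fails when the user already exists.
createUserModel :: String -> Db -> Either String Db
createUserModel name db
  | name `elem` users db = Left ("duplicate user: " ++ name)
  | otherwise            = Right db { users = users db ++ [name] }

-- Fails on an empty message, standing in for any enqueue error.
enqueueEmailModel :: String -> Db -> Either String Db
enqueueEmailModel msg db
  | null msg  = Left "empty message"
  | otherwise = Right db { emails = emails db ++ [msg] }

-- Both steps run in sequence; the first Left aborts the rest.
createAccountModel :: String -> Db -> Either String Db
createAccountModel name db =
  createUserModel name db >>= enqueueEmailModel ("verify " ++ name)

-- Commit the new state on success, keep the original state on failure.
runTransaction :: (Db -> Either String Db) -> Db -> Db
runTransaction tx db = either (const db) id (tx db)
```

In this model, a failure in the enqueue step also discards the user created earlier in the same transaction, which is the property the real transaction is meant to give.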

tryLockDB :: String -> DB (Maybe Payload)

Return the oldest Payload in the Enqueued state, or Nothing if there are no payloads. This function is not necessarily useful by itself, since there are not many use cases where it needs to be combined with other transactions. See tryLock for the IO API version, or lock for a blocking version that uses PostgreSQL's NOTIFY and LISTEN.

unlockDB :: String -> PayloadId -> DB ()

Transition a Payload from the Locked state to the Enqueued state. Useful for responding to asynchronous exceptions during an unexpected shutdown. In general the IO API version, unlock, is probably more useful; the DB version is provided for completeness.

dequeueDB :: String -> PayloadId -> DB ()

Transition a Payload to the Dequeued state.

getCountDB :: String -> DB Int64

Get the number of rows in the Enqueued state.
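
The semantics of the DB API operations above can be approximated with an in-memory list. This is a rough sketch and not how the library stores data. Item stands in for Payload with a String instead of a JSON Value, all function names are hypothetical, and the model assumes (as the name suggests) that a successful tryLock moves the payload into the Locked state.

```haskell
data State = Enqueued | Locked | Dequeued
  deriving (Eq, Show)

-- Simplified stand-in for Payload.
data Item = Item { itemId :: Int, itemState :: State, itemValue :: String }
  deriving (Eq, Show)

-- Kept in insertion order, so the head is the oldest item.
type Queue = [Item]

-- Append a new item in the Enqueued state and return its id.
enqueueModel :: String -> Queue -> (Int, Queue)
enqueueModel v q = (newId, q ++ [Item newId Enqueued v])
  where newId = length q + 1   -- items are never removed, so ids stay unique

-- Lock the oldest Enqueued item, or return Nothing if there is none.
tryLockModel :: Queue -> (Maybe Item, Queue)
tryLockModel q = case break ((== Enqueued) . itemState) q of
  (before, x : after) ->
    let locked = x { itemState = Locked }
    in (Just locked, before ++ locked : after)
  (_, []) -> (Nothing, q)

-- Return a Locked item to the Enqueued state.
unlockModel :: Int -> Queue -> Queue
unlockModel i = map $ \x ->
  if itemId x == i && itemState x == Locked then x { itemState = Enqueued } else x

-- Mark an item as Dequeued.
dequeueModel :: Int -> Queue -> Queue
dequeueModel i = map $ \x ->
  if itemId x == i then x { itemState = Dequeued } else x

-- Count the items still waiting in the Enqueued state.
getCountModel :: Queue -> Int
getCountModel = length . filter ((== Enqueued) . itemState)
```

After enqueuing "a" and then "b", tryLockModel returns the item holding "a", and getCountModel reports 1 because only "b" is still Enqueued.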

IO API

enqueue :: String -> Connection -> Value -> IO PayloadId

Enqueue a new JSON value into the queue. See enqueueDB for a version which can be composed with other queries in a single transaction.

tryLock :: String -> Connection -> IO (Maybe Payload)

Return the oldest Payload in the Enqueued state, or Nothing if there are no payloads. For a blocking version utilizing PostgreSQL's NOTIFY and LISTEN, see lock. This function runs tryLockDB as a Serializable transaction.

lock :: String -> Connection -> IO Payload

Return the oldest Payload in the Enqueued state, or block until a payload arrives. This function uses PostgreSQL's LISTEN and NOTIFY functionality to avoid excessive polling of the DB while waiting for new payloads, without sacrificing promptness.
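
The idea of waiting for a notification instead of polling can be illustrated without a database. The sketch below is one plausible shape and not necessarily what lock does internally. It uses an MVar as a stand-in for LISTEN and NOTIFY and an IORef as a stand-in for the table. All names (tryTake, put, blockingTake, demo) are hypothetical, and it assumes a single consumer.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (MVar, newEmptyMVar, takeMVar, tryPutMVar)
import Data.IORef (IORef, atomicModifyIORef', newIORef)

-- Non-blocking attempt: remove and return the oldest element, if any.
tryTake :: IORef [a] -> IO (Maybe a)
tryTake ref = atomicModifyIORef' ref $ \xs -> case xs of
  []       -> ([], Nothing)
  (y : ys) -> (ys, Just y)

-- Producer side: append the element, then signal (the NOTIFY stand-in).
put :: MVar () -> IORef [a] -> a -> IO ()
put signal ref x = do
  atomicModifyIORef' ref (\xs -> (xs ++ [x], ()))
  _ <- tryPutMVar signal ()   -- a signal that is already pending is enough
  return ()

-- Consumer side: try first, and sleep on the signal only when empty
-- (the LISTEN stand-in). No timed polling loop is needed.
blockingTake :: MVar () -> IORef [a] -> IO a
blockingTake signal ref = do
  m <- tryTake ref
  case m of
    Just x  -> return x
    Nothing -> takeMVar signal >> blockingTake signal ref

-- A producer thread enqueues 42 after a short delay; the consumer blocks
-- until it arrives and then returns it.
demo :: IO Int
demo = do
  signal <- newEmptyMVar
  ref    <- newIORef []
  _ <- forkIO (threadDelay 10000 >> put signal ref 42)
  blockingTake signal ref
```

Because the signal is written after the element is stored and stays pending until consumed, a single consumer that finds the queue empty and then waits will not miss an element that arrived in between.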

unlock :: String -> Connection -> PayloadId -> IO ()

Transition a Payload from the Locked state to the Enqueued state. Useful for responding to asynchronous exceptions during an unexpected shutdown. For a DB API version, see unlockDB.
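
One way a consumer might use unlock is to return a payload to the queue when processing is interrupted. The following is a hedged sketch using onException from Control.Exception. The helper processOrUnlock and the demo are hypothetical, and the unlock and dequeue steps are passed in as plain IO actions so that the example runs without a database.

```haskell
import Control.Exception (IOException, onException, throwIO, try)
import Data.IORef (modifyIORef, newIORef, readIORef)

-- Run the processing step. If it throws (including an asynchronous
-- exception), run the unlock action and let the exception propagate.
-- If it succeeds, run the dequeue action.
processOrUnlock
  :: IO ()   -- action that returns the payload to Enqueued, e.g. unlock
  -> IO ()   -- action that marks the payload Dequeued, e.g. dequeue
  -> IO a    -- application-specific processing
  -> IO a
processOrUnlock unlockAction dequeueAction work = do
  result <- work `onException` unlockAction
  dequeueAction
  return result

-- Records which action ran for a failing and then a succeeding job.
demo :: IO [String]
demo = do
  logRef <- newIORef []
  let note s = modifyIORef logRef (++ [s])
  -- The first job fails, so only the unlock action runs.
  _ <- try (processOrUnlock (note "unlock") (note "dequeue")
              (throwIO (userError "boom"))) :: IO (Either IOException ())
  -- The second job succeeds, so only the dequeue action runs.
  processOrUnlock (note "unlock") (note "dequeue") (return ())
  readIORef logRef
```

Running demo yields ["unlock","dequeue"]: the failed job is unlocked for a later retry and the successful job is dequeued.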

dequeue :: String -> Connection -> PayloadId -> IO ()

Transition a Payload to the Dequeued state. This function runs dequeueDB as a Serializable transaction.

getCount :: String -> Connection -> IO Int64

Get the number of rows in the Enqueued state. This function runs getCountDB in a ReadCommitted transaction.