postgresql-simple-queue-1.0.1: A PostgreSQL backed queue

Safe Haskell: None
Language: Haskell2010

Database.PostgreSQL.Simple.Queue

Description

This module uses PostgreSQL to implement a durable queue for efficiently processing arbitrary payloads that can be represented as JSON.

Typically a producer would enqueue a new payload as part of a larger database transaction:

  createAccount userRecord = do
     runDBTSerializable $ do
        createUserDB userRecord
        -- enqueueDB takes the schema name as its first argument
        enqueueDB schemaName $ makeVerificationEmail userRecord

In another thread or process, the consumer would drain the queue.

   forever $ do
     -- Attempt to get a payload or block until one is available
     payload <- lock conn

     -- Perform application-specific parsing of the payload value
     case fromJSON $ pValue payload of
       Success x -> sendEmail x -- Perform application specific processing
       Error err -> logErr err

     -- Remove the payload from future processing
     dequeue conn $ pId payload

For a more complete example of a consumer, using the provided defaultMain, see EmailQueue.

This module provides two flavors of functions: a DB API and an IO API. Most operations are provided in both flavors, with the exception of lock. lock blocks, so it would not be very useful as part of a larger transaction: it would keep the transaction open for a potentially long time. Although both flavors are provided, in general one version is more useful for typical use cases.

Types

data State

A Payload can exist in two states in the queue: Enqueued and Dequeued. A Payload starts in the Enqueued state and is locked so that some processing can occur with it, usually something in IO. Once the processing is complete, the Payload is moved to the Dequeued state, which is the terminal state.

Constructors

Enqueued 
Dequeued 
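
The lifecycle described above can be sketched as a tiny state machine. This is only a local model: the State type below is redefined for illustration, and advance and lifecycle are hypothetical helpers that are not part of this module.

```haskell
-- Local model of the two documented states; not the library's definition.
data State = Enqueued | Dequeued
  deriving (Eq, Show)

-- | The only transition is Enqueued -> Dequeued; Dequeued is terminal.
advance :: State -> Maybe State
advance Enqueued = Just Dequeued
advance Dequeued = Nothing

-- | Every state a payload passes through, starting from the given one.
lifecycle :: State -> [State]
lifecycle s = s : maybe [] lifecycle (advance s)

main :: IO ()
main = print (lifecycle Enqueued) -- prints [Enqueued,Dequeued]
```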

data Payload

Constructors

Payload 

Fields

DB API

enqueueDB :: String -> Value -> DB PayloadId

Enqueue a new JSON value into the queue. This particular function can be composed as part of a larger database transaction. For instance, a single transaction could create a user and enqueue an email message.

  createAccount userRecord = do
     runDBTSerializable $ do
        createUserDB userRecord
        -- enqueueDB takes the schema name as its first argument
        enqueueDB schemaName $ makeVerificationEmail userRecord

dequeueDB :: String -> DB (Maybe Payload)

Transition a Payload to the Dequeued state.

withPayloadDB

Arguments

  :: String                                 -- schema
  -> Int                                    -- retry count
  -> (Payload -> IO a)                      -- payload processing function
  -> DB (Either SomeException (Maybe a))

Attempt to get a payload and process it. If the function passed in throws an exception, return it on the left side of the Either and re-add the payload, up to the passed-in maximum number of retries. Return Nothing if the payloads table is empty; otherwise the result is the a returned by the payload processing function.
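
To make the three possible outcomes concrete, here is a minimal in-memory model of the behaviour described above. It is a sketch under stated assumptions, not the library's implementation: Item, withItem, describe and failing are hypothetical names, an IORef list stands in for the payloads table, and the exact rule for when retries run out (attempts < retry count) is assumed.

```haskell
import Control.Exception (ErrorCall (..), SomeException, throwIO, try)
import Control.Monad (when)
import Data.IORef (IORef, modifyIORef', newIORef, readIORef, writeIORef)

-- | Hypothetical stand-in for a queue row: attempts so far and a value.
data Item = Item { itemAttempts :: Int, itemValue :: String }
  deriving (Eq, Show)

-- | Take the oldest item and run the handler. On an exception, return it on
-- the Left and put the item back unless its retries are used up (assumed rule).
withItem :: IORef [Item] -> Int -> (Item -> IO a) -> IO (Either SomeException (Maybe a))
withItem queue retryCount handler = do
  items <- readIORef queue
  case items of
    [] -> pure (Right Nothing) -- nothing to process
    (item : rest) -> do
      writeIORef queue rest
      outcome <- try (handler item)
      case outcome of
        Right x -> pure (Right (Just x))
        Left err -> do
          let attempts = itemAttempts item + 1
          when (attempts < retryCount) $
            modifyIORef' queue (++ [item { itemAttempts = attempts }])
          pure (Left err)

-- | Human-readable summary of the three outcomes.
describe :: Show a => Either SomeException (Maybe a) -> String
describe (Left _) = "handler threw"
describe (Right Nothing) = "queue empty"
describe (Right (Just x)) = "processed: " ++ show x

-- | A handler that always fails, used to exercise the retry path.
failing :: Item -> IO ()
failing _ = throwIO (ErrorCall "boom")

main :: IO ()
main = do
  queue <- newIORef [Item 0 "hello"]
  withItem queue 2 failing >>= putStrLn . describe -- item is re-added
  withItem queue 2 failing >>= putStrLn . describe -- retries used up, item dropped
  withItem queue 2 failing >>= putStrLn . describe -- queue is now empty
```

With a retry count of 2 the model re-adds the item after the first failure and drops it after the second, so the third call sees an empty queue.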

getCountDB :: String -> DB Int64

Get the number of rows in the Enqueued state.

IO API

enqueue :: String -> Connection -> Value -> IO PayloadId

Enqueue a new JSON value into the queue. See enqueueDB for a version which can be composed with other queries in a single transaction.
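
A minimal sketch of calling enqueue from plain IO, based only on the signature shown above. The payload shape and the names verificationEmail and enqueueVerification are hypothetical, and the first argument is assumed to be the schema name, as documented for withPayloadDB. Running it needs an open Connection to a database where the queue is set up.

```haskell
{-# LANGUAGE OverloadedStrings #-}

import Data.Aeson (Value, object, (.=))
import Database.PostgreSQL.Simple (Connection)
import Database.PostgreSQL.Simple.Queue (enqueue)

-- | Hypothetical payload shape; the queue accepts any JSON Value.
verificationEmail :: String -> Value
verificationEmail address =
  object ["kind" .= ("verification" :: String), "to" .= address]

-- | Enqueue one payload outside of any larger transaction.
enqueueVerification :: String -> Connection -> String -> IO ()
enqueueVerification schemaName conn address = do
  _payloadId <- enqueue schemaName conn (verificationEmail address)
  pure ()
```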

tryDequeue :: String -> Connection -> IO (Maybe Payload)

Return the oldest Payload in the Enqueued state, or Nothing if there are no payloads. For a blocking version that uses PostgreSQL's NOTIFY and LISTEN, see dequeue. This function runs dequeueDB as a ReadCommitted transaction.

See withPayload for an alternative interface that will automatically return the payload to the Enqueued state if an exception occurs.
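
Because tryDequeue returns Nothing instead of blocking, it suits a loop that drains whatever is currently queued and then stops. The sketch below relies only on the signature above; drain and its handler argument are hypothetical names, and it needs a live Connection to run.

```haskell
import Database.PostgreSQL.Simple (Connection)
import Database.PostgreSQL.Simple.Queue (Payload, tryDequeue)

-- | Process payloads until the queue is empty; returns how many were handled.
drain :: String -> Connection -> (Payload -> IO ()) -> IO Int
drain schemaName conn handler = go 0
  where
    go n = do
      next <- tryDequeue schemaName conn
      case next of
        Nothing -> pure n -- queue is empty, stop without blocking
        Just payload -> handler payload >> go (n + 1)
```

Note that an exception thrown by the handler here propagates to the caller; the sketch does nothing to return the payload to the queue.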

dequeue :: String -> Connection -> IO Payload

Transition a Payload to the Dequeued state. This function runs dequeueDB as a Serializable transaction.

withPayload

Arguments

  :: String
  -> Connection
  -> Int                              -- retry count
  -> (Payload -> IO a)
  -> IO (Either SomeException a)

Return the oldest Payload in the Enqueued state, or block until a payload arrives. This function uses PostgreSQL's LISTEN and NOTIFY functionality to avoid excessive polling of the DB while waiting for new payloads, without sacrificing promptness.
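
A consumer loop built on withPayload might look like the sketch below. It follows the signature shown above; worker is a hypothetical name, the retry count of 3 is arbitrary, and the pValue field is assumed from the consumer example in the module description. It needs a live Connection to run.

```haskell
import Control.Monad (forever)
import Database.PostgreSQL.Simple (Connection)
import Database.PostgreSQL.Simple.Queue (Payload (..), withPayload)

-- | Wait for payloads forever, printing each JSON value and logging failures.
worker :: String -> Connection -> IO ()
worker schemaName conn = forever $ do
  result <- withPayload schemaName conn 3 (print . pValue)
  case result of
    Left err -> putStrLn ("payload processing failed: " ++ show err)
    Right () -> pure ()
```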

getCount :: String -> Connection -> IO Int64

Get the number of rows in the Enqueued state. This function runs getCountDB in a ReadCommitted transaction.