| Safe Haskell | None |
|---|---|
| Language | Haskell2010 |
Database.PostgreSQL.Simple.Queue
Description
This module utilize PostgreSQL to implement a durable queue for efficently processing arbitrary payloads which can be represented as JSON.
Typically a producer would enqueue a new payload as part of larger database transaction
createAccount userRecord = do
runDBTSerializable $ do
createUserDB userRecord
enqueueDB $ makeVerificationEmail userRecord
In another thread or process, the consumer would drain the queue.
forever $ do
-- Attempt get a payload or block until one is available
payload <- lock conn
-- Perform application specifc parsing of the payload value
case fromJSON $ pValue payload of
Success x -> sendEmail x -- Perform application specific processing
Error err -> logErr err
-- Remove the payload from future processing
dequeue conn $ pId payload
For a more complete example or a consumer, utilizing the provided
defaultMain, see
EmailQueue.
This modules provides two flavors of functions, a DB API and an IO API.
Most operations are provided in both flavors, with the exception of lock.
lock blocks and would not be that useful as part of a larger transaction
since it would keep the transaction open for a potentially long time. Although
both flavors are provided, in general one versions is more useful for typical
use cases.
- newtype PayloadId = PayloadId {
- unPayloadId :: Int64
- data State
- data Payload = Payload {}
- enqueueDB :: String -> Value -> DB PayloadId
- dequeueDB :: String -> DB (Maybe Payload)
- withPayloadDB :: String -> Int -> (Payload -> IO a) -> DB (Either SomeException (Maybe a))
- getCountDB :: String -> DB Int64
- enqueue :: String -> Connection -> Value -> IO PayloadId
- tryDequeue :: String -> Connection -> IO (Maybe Payload)
- dequeue :: String -> Connection -> IO Payload
- withPayload :: String -> Connection -> Int -> (Payload -> IO a) -> IO (Either SomeException a)
- getCount :: String -> Connection -> IO Int64
Types
Constructors
| PayloadId | |
Fields
| |
Constructors
| Payload | |
DB API
enqueueDB :: String -> Value -> DB PayloadId Source #
Enqueue a new JSON value into the queue. This particularly function can be composed as part of a larger database transaction. For instance, a single transaction could create a user and enqueue a email message.
createAccount userRecord = do
runDBTSerializable $ do
createUserDB userRecord
enqueueDB $ makeVerificationEmail userRecord
IO API
enqueue :: String -> Connection -> Value -> IO PayloadId Source #
Enqueue a new JSON value into the queue. See enqueueDB for a version
which can be composed with other queries in a single transaction.
tryDequeue :: String -> Connection -> IO (Maybe Payload) Source #
Return a the oldest Payload in the Enqueued state or Nothing
if there are no payloads. For a blocking version utilizing PostgreSQL's
NOTIFY and LISTEN, see dequeue. This functions runs dequeueDb as a
ReadCommitted transaction.
See withPayload for an alternative interface that will automatically return
the payload to the Enqueued state if an exception occurs.
dequeue :: String -> Connection -> IO Payload Source #
Transition a Payload to the Dequeued state. his functions runs
dequeueDB as a Serializable transaction.
Arguments
| :: String | |
| -> Connection | |
| -> Int | retry count |
| -> (Payload -> IO a) | |
| -> IO (Either SomeException a) |
getCount :: String -> Connection -> IO Int64 Source #
Get the number of rows in the Enqueued state. This function runs
getCountDB in a ReadCommitted transaction.