Safe Haskell | None |
---|---|
Language | Haskell2010 |
This module utilizes PostgreSQL to implement a durable queue for efficiently processing arbitrary payloads which can be represented as JSON.
Typically a producer would enqueue a new payload as part of a larger database transaction:

    createAccount userRecord = do
      runDBTSerializable $ do
        createUserDB userRecord
        enqueueDB $ makeVerificationEmail userRecord

In another thread or process, the consumer would drain the queue.

    forever $ do
      -- Attempt to get a payload or block until one is available
      payload <- lock conn

      -- Perform application specific parsing of the payload value
      case fromJSON $ pValue payload of
        Success x -> sendEmail x -- Perform application specific processing
        Error err -> logErr err

      -- Remove the payload from future processing
      dequeue conn $ pId payload

For a more complete example of a consumer, utilizing the provided defaultMain, see EmailQueue.
This module provides two flavors of functions, a DB API and an IO API.
Most operations are provided in both flavors, with the exception of lock.
lock blocks and would not be that useful as part of a larger transaction
since it would keep the transaction open for a potentially long time. Although
both flavors are provided, in general one version is more useful for typical
use cases.
- newtype PayloadId = PayloadId { unPayloadId :: Int64 }
- data State
- data Payload = Payload { pId :: PayloadId, pValue :: Value, pState :: State, pCreatedAt :: UTCTime, pModifiedAt :: UTCTime }
- enqueueDB :: String -> Value -> DB PayloadId
- tryLockDB :: String -> DB (Maybe Payload)
- unlockDB :: String -> PayloadId -> DB ()
- dequeueDB :: String -> PayloadId -> DB ()
- getCountDB :: String -> DB Int64
- enqueue :: String -> Connection -> Value -> IO PayloadId
- tryLock :: String -> Connection -> IO (Maybe Payload)
- lock :: String -> Connection -> IO Payload
- unlock :: String -> Connection -> PayloadId -> IO ()
- dequeue :: String -> Connection -> PayloadId -> IO ()
- getCount :: String -> Connection -> IO Int64
Types
A Payload can exist in three states in the queue: Enqueued, Locked and Dequeued. A Payload starts in the Enqueued state and is Locked so some sort of process can occur with it, usually something in IO.
Once the processing is complete, the Payload is moved to the Dequeued state, which is the terminal state.
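The lifecycle described above can be sketched as a small pure state machine. This is only an illustrative model, not the module's implementation: QueueState, Event, step and runEvents are hypothetical names, and two transitions are assumptions not stated in the text, namely that unlocking returns a Locked payload to Enqueued and that only a Locked payload can be dequeued.

```haskell
-- A pure model of the payload lifecycle. All names here are illustrative
-- and are not exported by this module.
data QueueState = Enqueued | Locked | Dequeued
  deriving (Eq, Show)

-- Hypothetical events loosely corresponding to lock, unlock and dequeue.
data Event = LockIt | UnlockIt | DequeueIt
  deriving (Eq, Show)

-- Returns the next state, or Nothing when the transition is not allowed.
-- Assumption: unlocking puts a Locked payload back into Enqueued.
-- Assumption: only a Locked payload may be dequeued.
step :: QueueState -> Event -> Maybe QueueState
step Enqueued LockIt    = Just Locked
step Locked   UnlockIt  = Just Enqueued
step Locked   DequeueIt = Just Dequeued
step _        _         = Nothing

-- Apply a sequence of events to a payload that starts in Enqueued.
runEvents :: [Event] -> Maybe QueueState
runEvents = foldl (\s e -> s >>= (`step` e)) (Just Enqueued)
```

In this model Dequeued is terminal because no clause of step accepts it as a starting state.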
DB API
enqueueDB :: String -> Value -> DB PayloadId
Enqueue a new JSON value into the queue. This particular function can be composed as part of a larger database transaction. For instance, a single transaction could create a user and enqueue an email message.

    createAccount userRecord = do
      runDBTSerializable $ do
        createUserDB userRecord
        enqueueDB $ makeVerificationEmail userRecord

tryLockDB :: String -> DB (Maybe Payload)
Return the oldest Payload in the Enqueued state, or Nothing
if there are no payloads. This function is not necessarily useful by
itself, since there are not many use cases where it needs to be combined
with other transactions. See tryLock for the IO API version, or for a
blocking version utilizing PostgreSQL's NOTIFY and LISTEN, see lock.
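To illustrate how a non-blocking attempt relates to a blocking one, here is a minimal sketch that turns any IO (Maybe a) action into a blocking IO a by polling. This is deliberately not how lock is described above (it uses NOTIFY and LISTEN rather than polling); pollUntilJust and fakeTryLock are hypothetical helpers written for this example only.

```haskell
import Control.Concurrent (threadDelay)
import Data.IORef (IORef, atomicModifyIORef')

-- Repeatedly run a non-blocking attempt until it yields a value,
-- sleeping for the given number of microseconds between attempts.
-- Illustrative helper; not part of this module.
pollUntilJust :: Int -> IO (Maybe a) -> IO a
pollUntilJust delayMicros attempt = loop
  where
    loop = do
      result <- attempt
      case result of
        Just x  -> pure x
        Nothing -> threadDelay delayMicros >> loop

-- A stand-in for a non-blocking lock attempt: it returns Nothing until
-- the counter reaches zero, then returns a dummy payload.
fakeTryLock :: IORef Int -> IO (Maybe String)
fakeTryLock ref = atomicModifyIORef' ref $ \n ->
  if n <= 0 then (n, Just "payload") else (n - 1, Nothing)
```

Given the signatures in the synopsis, an expression such as pollUntilJust 100000 (tryLock name conn) would have type IO Payload, where name and conn are placeholders for the String and Connection arguments. Polling trades latency and extra queries for simplicity, which is presumably why a notification-based lock is provided.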
IO API
enqueue :: String -> Connection -> Value -> IO PayloadId
Enqueue a new JSON value into the queue. See enqueueDB for a version
which can be composed with other queries in a single transaction.
dequeue :: String -> Connection -> PayloadId -> IO ()
Transition a Payload to the Dequeued state. This function runs
dequeueDB as a Serializable transaction.
getCount :: String -> Connection -> IO Int64
Get the number of rows in the Enqueued state. This function runs
getCountDB in a ReadCommitted transaction.
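As a rough illustration of what the count covers, the following pure in-memory model treats the queue as a list of rows kept oldest first. It is a sketch under assumptions, not the module's SQL: all names ending in Model are hypothetical, ids are plain Ints, and it assumes that a successful lock attempt moves the row to Locked.

```haskell
import Data.List (find)

data St = Enqueued | Locked | Dequeued
  deriving (Eq, Show)

-- Each row is an id and a state; rows are kept oldest first.
type Row = (Int, St)
type Queue = [Row]

-- Append a new row in the Enqueued state and return its id.
enqueueModel :: Queue -> (Int, Queue)
enqueueModel q = (newId, q ++ [(newId, Enqueued)])
  where newId = length q + 1

-- Pick the oldest Enqueued row, if any.
-- Assumption: a successful attempt marks the row as Locked.
tryLockModel :: Queue -> (Maybe Int, Queue)
tryLockModel q = case find ((== Enqueued) . snd) q of
  Nothing       -> (Nothing, q)
  Just (pid, _) -> (Just pid, setState pid Locked q)

-- Move the row with the given id to the Dequeued state.
dequeueModel :: Int -> Queue -> Queue
dequeueModel pid = setState pid Dequeued

-- Count only rows that are still Enqueued.
getCountModel :: Queue -> Int
getCountModel = length . filter ((== Enqueued) . snd)

setState :: Int -> St -> Queue -> Queue
setState pid st = map (\(i, s) -> if i == pid then (i, st) else (i, s))
```

In this model, rows that are Locked or Dequeued no longer contribute to the count, which matches the description of getCount as counting rows in the Enqueued state.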