-- Hoogle documentation, generated by Haddock -- See Hoogle, http://www.haskell.org/hoogle/ -- | A PostgreSQL backed queue -- -- This module utilize PostgreSQL to implement a durable queue for -- efficently processing arbitrary payloads which can be represented as -- JSON. -- -- Typically a producer would enqueue a new payload as part of larger -- database transaction -- --
--   createAccount userRecord = do
--      'runDBTSerializable' $ do
--         createUserDB userRecord
--         'enqueueDB' "queue_schema" $ makeVerificationEmail userRecord
--   
-- -- In another thread or process, the consumer would drain the queue. -- --
--     forever $ do
--       -- Attempt get a payload or block until one is available
--       payload <- lock "queue" conn
--   
--       -- Perform application specifc parsing of the payload value
--       case fromJSON $ pValue payload of
--         Success x -> sendEmail x -- Perform application specific processing
--         Error err -> logErr err
--   
--       -- Remove the payload from future processing
--       dequeue "queue" conn $ pId payload
--   
--   To support multiple queues in the same database, the API expects a table name string
--   to determine which queue tables to use.
--   
@package postgresql-simple-queue @version 1.0.1 -- | This module utilize PostgreSQL to implement a durable queue for -- efficently processing arbitrary payloads which can be represented as -- JSON. -- -- Typically a producer would enqueue a new payload as part of larger -- database transaction -- --
--   createAccount userRecord = do
--      runDBTSerializable $ do
--         createUserDB userRecord
--         enqueueDB $ makeVerificationEmail userRecord
--   
-- -- In another thread or process, the consumer would drain the queue. -- --
--   forever $ do
--     -- Attempt get a payload or block until one is available
--     payload <- lock conn
--   
--     -- Perform application specifc parsing of the payload value
--     case fromJSON $ pValue payload of
--       Success x -> sendEmail x -- Perform application specific processing
--       Error err -> logErr err
--   
--     -- Remove the payload from future processing
--     dequeue conn $ pId payload
--   
-- -- For a more complete example or a consumer, utilizing the provided -- defaultMain, see EmailQueue. -- -- This modules provides two flavors of functions, a DB API and an IO -- API. Most operations are provided in both flavors, with the exception -- of lock. lock blocks and would not be that useful as -- part of a larger transaction since it would keep the transaction open -- for a potentially long time. Although both flavors are provided, in -- general one versions is more useful for typical use cases. module Database.PostgreSQL.Simple.Queue newtype PayloadId PayloadId :: Int64 -> PayloadId [unPayloadId] :: PayloadId -> Int64 -- | A Payload can exist in three states in the queue, -- Enqueued, and Dequeued. A Payload starts in the -- Enqueued state and is locked so some sort of process can occur -- with it, usually something in IO. Once the processing is -- complete, the Payload is moved the Dequeued state, which -- is the terminal state. data State Enqueued :: State Dequeued :: State data Payload Payload :: PayloadId -> Value -> State -> Int -> UTCTime -> UTCTime -> Payload [pId] :: Payload -> PayloadId -- | The JSON value of a payload [pValue] :: Payload -> Value [pState] :: Payload -> State [pAttempts] :: Payload -> Int [pCreatedAt] :: Payload -> UTCTime [pModifiedAt] :: Payload -> UTCTime -- | Enqueue a new JSON value into the queue. This particularly function -- can be composed as part of a larger database transaction. For -- instance, a single transaction could create a user and enqueue a email -- message. -- --
--   createAccount userRecord = do
--      runDBTSerializable $ do
--         createUserDB userRecord
--         enqueueDB $ makeVerificationEmail userRecord
--   
enqueueDB :: String -> Value -> DB PayloadId -- | Transition a Payload to the Dequeued state. dequeueDB :: String -> DB (Maybe Payload) -- | Attempt to get a payload and process it. If the function passed in -- throws an exception return it on the left side of the Either. -- Re-add the payload up to some passed in maximum. Return Nothing -- is the payloads table is empty otherwise the result is an -- a from the payload ingesting function. withPayloadDB :: String -> Int -> (Payload -> IO a) -> DB (Either SomeException (Maybe a)) -- | Get the number of rows in the Enqueued state. getCountDB :: String -> DB Int64 -- | Enqueue a new JSON value into the queue. See enqueueDB for a -- version which can be composed with other queries in a single -- transaction. enqueue :: String -> Connection -> Value -> IO PayloadId -- | Return a the oldest Payload in the Enqueued state or -- Nothing if there are no payloads. For a blocking version -- utilizing PostgreSQL's NOTIFY and LISTEN, see dequeue. This -- functions runs dequeueDb as a ReadCommitted -- transaction. -- -- See withPayload for an alternative interface that will -- automatically return the payload to the Enqueued state if an -- exception occurs. tryDequeue :: String -> Connection -> IO (Maybe Payload) -- | Transition a Payload to the Dequeued state. his -- functions runs dequeueDB as a Serializable transaction. dequeue :: String -> Connection -> IO Payload -- | Return the oldest Payload in the Enqueued state or block -- until a payload arrives. This function utilizes PostgreSQL's LISTEN -- and NOTIFY functionality to avoid excessively polling of the DB while -- waiting for new payloads, without scarficing promptness. withPayload :: String -> Connection -> Int -> (Payload -> IO a) -> IO (Either SomeException a) -- | Get the number of rows in the Enqueued state. This function -- runs getCountDB in a ReadCommitted transaction. getCount :: String -> Connection -> IO Int64 instance GHC.Classes.Eq Database.PostgreSQL.Simple.Queue.Payload instance GHC.Show.Show Database.PostgreSQL.Simple.Queue.Payload instance GHC.Enum.Bounded Database.PostgreSQL.Simple.Queue.State instance GHC.Enum.Enum Database.PostgreSQL.Simple.Queue.State instance GHC.Classes.Ord Database.PostgreSQL.Simple.Queue.State instance GHC.Classes.Eq Database.PostgreSQL.Simple.Queue.State instance GHC.Show.Show Database.PostgreSQL.Simple.Queue.State instance Database.PostgreSQL.Simple.ToField.ToField Database.PostgreSQL.Simple.Queue.PayloadId instance Database.PostgreSQL.Simple.FromField.FromField Database.PostgreSQL.Simple.Queue.PayloadId instance GHC.Show.Show Database.PostgreSQL.Simple.Queue.PayloadId instance GHC.Classes.Eq Database.PostgreSQL.Simple.Queue.PayloadId instance Database.PostgreSQL.Simple.FromRow.FromRow Database.PostgreSQL.Simple.Queue.Payload instance Database.PostgreSQL.Simple.ToField.ToField Database.PostgreSQL.Simple.Queue.State instance Database.PostgreSQL.Simple.FromField.FromField Database.PostgreSQL.Simple.Queue.State instance Database.PostgreSQL.Simple.FromRow.FromRow Database.PostgreSQL.Simple.Queue.PayloadId instance Database.PostgreSQL.Simple.ToRow.ToRow Database.PostgreSQL.Simple.Queue.PayloadId module Database.PostgreSQL.Simple.Queue.Migrate -- | This function creates a table and enumeration type that is appriopiate -- for the queue. The following sql is used. -- --
--   CREATE TYPE state_t AS ENUM (enqueued, locked, dequeued);
--   
--   CREATE TABLE payloads
--   ( id uuid PRIMARY KEY
--   , value jsonb NOT NULL
--   , state state_t NOT NULL DEFAULT enqueued
--   , created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT clock_timestamp()
--   , modified_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT clock_timestamp()
--   );
--   
--   CREATE INDEX state_idx ON payloads (state);
--   
--   CREATE OR REPLACE FUNCTION update_row_modified_function_()
--   RETURNS TRIGGER
--   AS
--   $$
--   BEGIN
--       -- ASSUMES the table has a column named exactly "modified_at".
--       -- Fetch date-time of actual current moment from clock,
--       -- rather than start of statement or start of transaction.
--       NEW.modified_at = clock_timestamp();
--       RETURN NEW;
--   END;
--   $$
--   language plpgsql;
--   
migrate :: String -> Connection -> IO ()