-- Hoogle documentation, generated by Haddock -- See Hoogle, http://www.haskell.org/hoogle/ -- | A PostgreSQL backed queue -- -- A PostgreSQL backed queue. Please see README.md @package hasql-queue @version 1.2.0.0 -- | A high throughput Session based API for a PostgreSQL backed -- queue. module Hasql.Queue.High.ExactlyOnce -- | Enqueue a payload. enqueue :: Value a -> [a] -> Session () -- | Dequeue a list of payloads dequeue :: Value a -> Int -> Session [a] -- | Internal module. Changes to this module are not reflected in the -- package version. module Hasql.Queue.Internal -- | A Payload can exist in three states in the queue, -- Enqueued, and Dequeued. A Payload starts in the -- Enqueued state and is locked so some sort of process can occur -- with it, usually something in IO. Once the processing is -- complete, the Payload is moved to the Dequeued state, -- which is the terminal state. data State Enqueued :: State Failed :: State state :: Params a -> Result b -> ByteString -> Statement a b stateDecoder :: Value State stateEncoder :: Value State initialPayloadId :: PayloadId -- | Internal payload id. Used by the public api as a continuation token for -- pagination. newtype PayloadId PayloadId :: Int64 -> PayloadId [unPayloadId] :: PayloadId -> Int64 -- | The fundamental record stored in the queue. 
The queue is a single -- table and each row consists of a Payload data Payload a Payload :: PayloadId -> State -> Int -> Int -> a -> Payload a [pId] :: Payload a -> PayloadId [pState] :: Payload a -> State [pAttempts] :: Payload a -> Int [pModifiedAt] :: Payload a -> Int [pValue] :: Payload a -> a -- | Payload decoder payloadDecoder :: Value a -> Row (Payload a) payloadIdEncoder :: Value PayloadId payloadIdDecoder :: Value PayloadId payloadIdRow :: Row PayloadId enqueuePayload :: Value a -> [a] -> Session [PayloadId] dequeuePayload :: Value a -> Int -> Session [Payload a] -- | Get the Payload given a PayloadId getPayload :: Value a -> PayloadId -> Session (Maybe (Payload a)) -- | Get the number of rows in the Enqueued state. getCount :: Session Int64 incrementAttempts :: Int -> [PayloadId] -> Session () newtype QueryException QueryException :: QueryError -> QueryException runThrow :: Session a -> Connection -> IO a execute :: Connection -> ByteString -> IO () notifyPayload :: ByteString -> Connection -> IO () -- | To aid in observability and white box testing data WithNotifyHandlers WithNotifyHandlers :: IO () -> IO () -> WithNotifyHandlers -- | An event that is triggered after the initial action, e.g. before dequeue -- is called. [withNotifyHandlersAfterAction] :: WithNotifyHandlers -> IO () -- | An event that is triggered before the blocking on a notification. [withNotifyHandlersBeforeNotification] :: WithNotifyHandlers -> IO () data NoRows NoRows :: NoRows withNotifyWith :: WithNotifyHandlers -> Text -> Connection -> Session a -> IO a fst3 :: (a, b, c) -> a snd3 :: (a, b, c) -> b trd3 :: (a, b, c) -> c listState :: State -> Value a -> Maybe PayloadId -> Int -> Session [(PayloadId, a)] -- | Retrieve the payloads that have entered a failed state. See -- withDequeue for how that occurs. The function returns a list of -- values and an id. The id is used as the starting place for the next batch -- of values. If Nothing is passed the list starts at the -- beginning. 
failures :: Value a -> Maybe PayloadId -> Int -> Session [(PayloadId, a)] withDequeue :: Value a -> Int -> Int -> ([a] -> IO b) -> Session (Maybe b) delete :: [PayloadId] -> Session () instance GHC.Classes.Eq Hasql.Queue.Internal.NoRows instance GHC.Show.Show Hasql.Queue.Internal.NoRows instance GHC.Show.Show Hasql.Queue.Internal.QueryException instance GHC.Classes.Eq Hasql.Queue.Internal.QueryException instance GHC.Classes.Eq a => GHC.Classes.Eq (Hasql.Queue.Internal.Payload a) instance GHC.Show.Show a => GHC.Show.Show (Hasql.Queue.Internal.Payload a) instance GHC.Show.Show Hasql.Queue.Internal.PayloadId instance GHC.Classes.Eq Hasql.Queue.Internal.PayloadId instance GHC.Enum.Bounded Hasql.Queue.Internal.State instance GHC.Enum.Enum Hasql.Queue.Internal.State instance GHC.Classes.Ord Hasql.Queue.Internal.State instance GHC.Classes.Eq Hasql.Queue.Internal.State instance GHC.Show.Show Hasql.Queue.Internal.State instance GHC.Exception.Type.Exception Hasql.Queue.Internal.NoRows instance GHC.Base.Semigroup Hasql.Queue.Internal.WithNotifyHandlers instance GHC.Base.Monoid Hasql.Queue.Internal.WithNotifyHandlers instance GHC.Exception.Type.Exception Hasql.Queue.Internal.QueryException module Hasql.Queue.High.AtMostOnce -- | Enqueue a payload. enqueue :: Connection -> Value a -> [a] -> IO () -- | Dequeue a list of payloads. dequeue :: Connection -> Value a -> Int -> IO [a] module Hasql.Queue.High.AtLeastOnce -- | Enqueue a list of payloads. enqueue :: Connection -> Value a -> [a] -> IO () -- | Wait for the next payload and process it. If the continuation throws -- an exception the payloads are put back in the queue. IOError is -- caught and withDequeue will retry up to the retry count. If -- withDequeue fails after too many retries the final exception is -- rethrown. If individual payloads are attempted more than the retry -- count they are set as "failed". See failures to receive the -- list of failed payloads. -- -- If the queue is empty withDequeue returns Nothing. 
If -- there are any entries withDequeue will wrap the list in -- Just. withDequeue :: Connection -> Value a -> Int -> Int -> ([a] -> IO b) -> IO (Maybe b) -- | Retrieve the payloads that have entered a failed state. See -- withDequeue for how that occurs. The function returns a list of -- values and an id. The id is used as the starting place for the next batch -- of values. If Nothing is passed the list starts at the -- beginning. failures :: Connection -> Value a -> Maybe PayloadId -> Int -> IO [(PayloadId, a)] -- | Permanently remove a failed payload. delete :: Connection -> [PayloadId] -> IO () -- | A more general configurable version of withDequeue. Unlike -- withDequeue one can specify the exception that causes a retry. -- Additionally event handlers can be specified to observe the internal -- behavior of the retry loop. withDequeueWith :: forall e a b. Exception e => Connection -> Value a -> Int -> Int -> ([a] -> IO b) -> IO (Maybe b) module Hasql.Queue.Low.ExactlyOnce -- | Enqueue a payload and send a notification on the specified channel. enqueue :: Text -> Value a -> [a] -> Session () withDequeue :: Text -> Connection -> Value a -> Int -> (Session [a] -> Session b) -> IO b withDequeueWith :: WithNotifyHandlers -> Text -> Connection -> Value a -> Int -> (Session [a] -> Session b) -> IO b module Hasql.Queue.Low.AtMostOnce -- | Enqueue a payload. enqueue :: Text -> Connection -> Value a -> [a] -> IO () -- | Dequeue a list of payloads. dequeue :: Text -> Connection -> Value a -> Int -> IO [a] -- | A high throughput Session based API for a PostgreSQL backed -- queue. module Hasql.Queue.Low.AtLeastOnce -- | Enqueue a list of payloads. enqueue :: Text -> Connection -> Value a -> [a] -> IO () -- | Wait for the next payload and process it. If the continuation throws -- an exception the payloads are put back in the queue. IOError is -- caught and withDequeue will retry up to the retry count. 
If -- withDequeue fails after too many retries the final exception is -- rethrown. If individual payloads are attempted more than the retry -- count they are set as "failed". See failures to receive the -- list of failed payloads. -- -- If the queue is empty withDequeue will block until it receives -- a notification from the PostgreSQL server. withDequeue :: Text -> Connection -> Value a -> Int -> Int -> ([a] -> IO b) -> IO b -- | Internal payload id. Used by the public api as a continuation token for -- pagination. data PayloadId -- | Retrieve the payloads that have entered a failed state. See -- withDequeue for how that occurs. The function returns a list of -- values and an id. The id is used as the starting place for the next batch -- of values. If Nothing is passed the list starts at the -- beginning. failures :: Connection -> Value a -> Maybe PayloadId -> Int -> IO [(PayloadId, a)] -- | Permanently remove a failed payload. delete :: Connection -> [PayloadId] -> IO () -- | A more general configurable version of withDequeue. Unlike -- withDequeue one can specify the exception that causes a retry. -- Additionally event handlers can be specified to observe the internal -- behavior of the retry loop. withDequeueWith :: forall e a b. Exception e => WithNotifyHandlers -> Text -> Connection -> Value a -> Int -> Int -> ([a] -> IO b) -> IO b -- | To aid in observability and white box testing data WithNotifyHandlers WithNotifyHandlers :: IO () -> IO () -> WithNotifyHandlers -- | An event that is triggered after the initial action, e.g. before dequeue -- is called. [withNotifyHandlersAfterAction] :: WithNotifyHandlers -> IO () -- | An event that is triggered before the blocking on a notification. [withNotifyHandlersBeforeNotification] :: WithNotifyHandlers -> IO () -- | Functions for migrating the database to create the necessary functions -- for the package. 
-- -- Users can use these functions or copy and paste the tables to create -- these tables through a standalone migration system. module Hasql.Queue.Migrate -- | The DDL statements to create the schema given a value type. migrationQueryString :: String -> String -- | This function creates a table and enumeration type that is appropriate -- for the queue. The following sql is used. -- --
-- DO $$
-- CREATE OR REPLACE FUNCTION notify_on(channel text) RETURNS VOID AS $$
-- BEGIN
-- EXECUTE (format(E'NOTIFY %I', channel));
-- END;
-- $$ LANGUAGE plpgsql;
--
-- CREATE OR REPLACE FUNCTION listen_on(channel text) RETURNS VOID AS $$
-- BEGIN
-- EXECUTE (format(E'LISTEN %I', channel));
-- END;
-- $$ LANGUAGE plpgsql;
--
-- CREATE OR REPLACE FUNCTION unlisten_on(channel text) RETURNS VOID AS $$
-- BEGIN
-- EXECUTE (format(E'UNLISTEN %I', channel));
-- END;
-- $$ LANGUAGE plpgsql;
--
-- DO $$
-- BEGIN
-- IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'state_t') THEN
-- CREATE TYPE state_t AS ENUM ('enqueued', 'failed');
-- END IF;
-- END$$;
--
-- CREATE SEQUENCE IF NOT EXISTS modified_index START 1;
--
-- CREATE TABLE IF NOT EXISTS payloads
-- ( id BIGSERIAL PRIMARY KEY
-- , attempts int NOT NULL DEFAULT 0
-- , state state_t NOT NULL DEFAULT 'enqueued'
-- , modified_at int8 NOT NULL DEFAULT nextval('modified_index')
-- , value ${VALUE_TYPE} NOT NULL
-- );
--
-- CREATE INDEX IF NOT EXISTS active_modified_at_idx ON payloads USING btree (modified_at, state)
-- WHERE (state = 'enqueued');
--
--
-- The VALUE_TYPE needs to be passed in through the second
-- argument.
migrate :: Connection -> String -> IO ()
-- | Drop everything created by migrate
teardown :: Connection -> IO ()