-- Hoogle documentation, generated by Haddock -- See Hoogle, http://www.haskell.org/hoogle/ -- | A PostgreSQL backed queue -- -- This module utilize PostgreSQL to implement a durable queue for -- efficently processing arbitrary payloads which can be represented as -- JSON. -- -- Typically a producer would enqueue a new payload as part of larger -- database transaction -- --
--   createAccount userRecord = do
--      'runDBTSerializable' $ do
--         createUserDB userRecord
--         'enqueueDB' "queue_schema" $ makeVerificationEmail userRecord
--   
-- -- In another thread or process, the consumer would drain the queue. -- --
--     forever $ do
--       -- Attempt get a payload or block until one is available
--       payload <- lock "queue_schema" conn
--   
--       -- Perform application specifc parsing of the payload value
--       case fromJSON $ pValue payload of
--         Success x -> sendEmail x -- Perform application specific processing
--         Error err -> logErr err
--   
--       -- Remove the payload from future processing
--       dequeue "queue_schema" conn $ pId payload
--   
--   To support multiple queues in the same database, the API expects a table name string
--   to determine which queue tables to use.
--   
@package postgresql-simple-queue @version 0.3.0.0 module Database.PostgreSQL.Simple.Queue.Migrate -- | This function creates a table and enumeration type that is appriopiate -- for the queue. The following sql is used. -- --
--   CREATE TYPE state_t AS ENUM (enqueued, locked, dequeued);
--   
--   CREATE TABLE |] <> " " <> fromString tableName <> [sql|
--   ( id uuid PRIMARY KEY
--   , value jsonb NOT NULL
--   , state state_t NOT NULL DEFAULT enqueued
--   , created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT clock_timestamp()
--   , modified_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT clock_timestamp()
--   );
--   
--   CREATE INDEX state_idx ON|] <> " " <> fromString tableName <> " " <> [sql|(state);
--   
--   CREATE OR REPLACE FUNCTION update_row_modified_function_()
--   RETURNS TRIGGER
--   AS
--   $$
--   BEGIN
--       -- ASSUMES the table has a column named exactly "modified_at".
--       -- Fetch date-time of actual current moment from clock,
--       -- rather than start of statement or start of transaction.
--       NEW.modified_at = clock_timestamp();
--       RETURN NEW;
--   END;
--   $$
--   language plpgsql;
--   
migrate :: String -> Connection -> IO () -- | This module utilize PostgreSQL to implement a durable queue for -- efficently processing arbitrary payloads which can be represented as -- JSON. -- -- Typically a producer would enqueue a new payload as part of larger -- database transaction -- --
--   createAccount userRecord = do
--      runDBTSerializable $ do
--         createUserDB userRecord
--         enqueueDB $ makeVerificationEmail userRecord
--   
-- -- In another thread or process, the consumer would drain the queue. -- --
--   forever $ do
--     -- Attempt get a payload or block until one is available
--     payload <- lock conn
--   
--     -- Perform application specifc parsing of the payload value
--     case fromJSON $ pValue payload of
--       Success x -> sendEmail x -- Perform application specific processing
--       Error err -> logErr err
--   
--     -- Remove the payload from future processing
--     dequeue conn $ pId payload
--   
-- -- For a more complete example or a consumer, utilizing the provided -- defaultMain, see EmailQueue. -- -- This modules provides two flavors of functions, a DB API and an IO -- API. Most operations are provided in both flavors, with the exception -- of lock. lock blocks and would not be that useful as -- part of a larger transaction since it would keep the transaction open -- for a potentially long time. Although both flavors are provided, in -- general one versions is more useful for typical use cases. module Database.PostgreSQL.Simple.Queue newtype PayloadId PayloadId :: Int64 -> PayloadId [unPayloadId] :: PayloadId -> Int64 -- | A Payload can exist in three states in the queue, -- Enqueued, Locked and Dequeued. A Payload -- starts in the Enqueued state and is Locked so some sort -- of process can occur with it, usually something in IO. Once the -- processing is complete, the Payload is moved the -- Dequeued state, which is the terminal state. data State Enqueued :: State Locked :: State Dequeued :: State data Payload Payload :: PayloadId -> Value -> State -> UTCTime -> UTCTime -> Payload [pId] :: Payload -> PayloadId -- | The JSON value of a payload [pValue] :: Payload -> Value [pState] :: Payload -> State [pCreatedAt] :: Payload -> UTCTime [pModifiedAt] :: Payload -> UTCTime -- | Enqueue a new JSON value into the queue. This particularly function -- can be composed as part of a larger database transaction. For -- instance, a single transaction could create a user and enqueue a email -- message. -- --
--   createAccount userRecord = do
--      runDBTSerializable $ do
--         createUserDB userRecord
--         enqueueDB $ makeVerificationEmail userRecord
--   
enqueueDB :: String -> Value -> DB PayloadId -- | Return a the oldest Payload in the Enqueued state, or -- Nothing if there are no payloads. This function is not -- necessarily useful by itself, since there are not many use cases where -- it needs to be combined with other transactions. See tryLock -- the IO API version, or for a blocking version utilizing PostgreSQL's -- NOTIFY and LISTEN, see lock tryLockDB :: String -> DB (Maybe Payload) -- | Transition a Payload from the Locked state to the -- Enqueued state. Useful for responding to asynchronous -- exceptions during a unexpected shutdown. In general the IO API -- version, unlock, is probably more useful. The DB version is -- provided for completeness. unlockDB :: String -> PayloadId -> DB () -- | Transition a Payload to the Dequeued state. dequeueDB :: String -> PayloadId -> DB () -- | Get the number of rows in the Enqueued state. getCountDB :: String -> DB Int64 -- | Enqueue a new JSON value into the queue. See enqueueDB for a -- version which can be composed with other queries in a single -- transaction. enqueue :: String -> Connection -> Value -> IO PayloadId -- | Return a the oldest Payload in the Enqueued state or -- Nothing if there are no payloads. For a blocking version -- utilizing PostgreSQL's NOTIFY and LISTEN, see lock. This -- functions runs tryLockDB as a Serializable transaction. tryLock :: String -> Connection -> IO (Maybe Payload) -- | Return the oldest Payload in the Enqueued state or block -- until a payload arrives. This function utilizes PostgreSQL's LISTEN -- and NOTIFY functionality to avoid excessively polling of the DB while -- waiting for new payloads, without scarficing promptness. lock :: String -> Connection -> IO Payload -- | Transition a Payload from the Locked state to the -- Enqueued state. Useful for responding to asynchronous -- exceptions during a unexpected shutdown. For a DB API version see -- unlockDB unlock :: String -> Connection -> PayloadId -> IO () -- | Transition a Payload to the Dequeued state. his -- functions runs dequeueDB as a Serializable transaction. dequeue :: String -> Connection -> PayloadId -> IO () -- | Get the number of rows in the Enqueued state. This function -- runs getCountDB in a ReadCommitted transaction. getCount :: String -> Connection -> IO Int64 instance GHC.Classes.Eq Database.PostgreSQL.Simple.Queue.Payload instance GHC.Show.Show Database.PostgreSQL.Simple.Queue.Payload instance GHC.Enum.Bounded Database.PostgreSQL.Simple.Queue.State instance GHC.Enum.Enum Database.PostgreSQL.Simple.Queue.State instance GHC.Classes.Ord Database.PostgreSQL.Simple.Queue.State instance GHC.Classes.Eq Database.PostgreSQL.Simple.Queue.State instance GHC.Show.Show Database.PostgreSQL.Simple.Queue.State instance Database.PostgreSQL.Simple.ToField.ToField Database.PostgreSQL.Simple.Queue.PayloadId instance Database.PostgreSQL.Simple.FromField.FromField Database.PostgreSQL.Simple.Queue.PayloadId instance GHC.Show.Show Database.PostgreSQL.Simple.Queue.PayloadId instance GHC.Classes.Eq Database.PostgreSQL.Simple.Queue.PayloadId instance Database.PostgreSQL.Simple.FromRow.FromRow Database.PostgreSQL.Simple.Queue.PayloadId instance Database.PostgreSQL.Simple.ToRow.ToRow Database.PostgreSQL.Simple.Queue.PayloadId instance Database.PostgreSQL.Simple.ToField.ToField Database.PostgreSQL.Simple.Queue.State instance Database.PostgreSQL.Simple.FromField.FromField Database.PostgreSQL.Simple.Queue.State instance Database.PostgreSQL.Simple.FromRow.FromRow Database.PostgreSQL.Simple.Queue.Payload -- | This module provides a simple way to create executables for consuming -- queue payloads. It provides a defaultMain function which takes -- in a executable name and processing function. It returns a main -- function which will parse database options on from the command line -- and spawn a specified number of worker threads. -- -- Here is a simple example that logs out queue payloads -- --
--   defaultMain "queue-logger" $ payload count -> do
--     print payload
--     print count
--   
-- -- The worker threads do not attempt to handle exceptions. If an -- exception is thrown on any threads, all threads are cancel and -- defaultMain returns. The assumption is the queue executable -- will get run by a process watcher that can log failures. -- -- For a more complicated example, see the code for the -- async-email-example documented in EmailQueue. module Database.PostgreSQL.Simple.Queue.Main -- | The PartialOptions provide a Monoid for combining -- options used by defaultMain. The following command line -- arguments are used to construct a PartialOptions -- --
--   --thread-count INT
--   
--   Either a connection string
--   --connectString ARG
--   or individual db properties are provided
--   --host STRING
--   --port INT
--   --user STRING
--   --password STRING
--   --database STRING
--   
data PartialOptions -- | Final Options used by run. data Options -- | Convert a PartialOptions to a final Options completeOptions :: PartialOptions -> Either [String] Options -- | This function is a helper for creating queue consumer executables. It -- takes in a executable name and processing function. It returns a main -- function which will parse database options on from the command line -- and spawn a specified number of worker threads. See -- PartialOptions for command line documentation. -- -- Here is a simple example that logs out queue payloads -- --
--   defaultMain "queue-logger" $ payload count -> do
--     print payload
--     print count
--   
-- -- The worker threads do not attempt to handle exceptions. If an -- exception is thrown on any threads, all threads are cancel and -- defaultMain returns. The assumption is the queue executable -- will get run by a process watcher that can log failures. -- -- For a more complicated example, see the code for the -- async-email-example documented in EmailQueue. defaultMain :: (MonadIO m, MonadBaseControl IO m) => Text -> (Payload -> Int64 -> m ()) -> m () -- | run is called by defaultMain after command line argument -- parsing is performed. Useful is wants to embed consumer threads inside -- a larger application. run :: forall m. (MonadIO m, MonadBaseControl IO m) => (Payload -> Int64 -> m ()) -> Options -> m () instance GHC.Classes.Eq Database.PostgreSQL.Simple.Queue.Main.Options instance GHC.Show.Show Database.PostgreSQL.Simple.Queue.Main.Options instance GHC.Classes.Eq Database.PostgreSQL.Simple.Queue.Main.PartialOptions instance GHC.Show.Show Database.PostgreSQL.Simple.Queue.Main.PartialOptions instance GHC.Base.Monoid Database.PostgreSQL.Simple.Queue.Main.PartialOptions instance Data.Default.Class.Default Database.PostgreSQL.Simple.Queue.Main.PartialOptions instance Options.Generic.ParseRecord Database.PostgreSQL.Simple.Queue.Main.PartialOptions -- | This is a documentation only module that exposes the source for the -- async-email-example. The main file is located in examples/EmailQueue -- and is reprinted here to show an example of using defaultMain -- from Main to quickly build a queue consumer executable. -- --
--   import           Control.Exception.Lifted
--   import           Control.Lens
--   import           Control.Monad
--   import           Control.Monad.IO.Class
--   import           Data.Aeson as Aeson
--   import           Data.Text
--   import           Database.PostgreSQL.Simple.Queue
--   import           Database.PostgreSQL.Simple.Queue.Main
--   import           GHC.Generics
--   import           Network.AWS               as AWS
--   import           Network.AWS.SES.SendEmail as AWS
--   import           Network.AWS.SES.Types     as AWS
--   
--   main :: IO ()
--   main = do
--     env <- newEnv Discover
--     runResourceT $ runAWS env $ defaultMain "aws-email-queue-consumer" $ payload _ -> do
--       case fromJSON $ pValue payload of
--         Aeson.Success email -> do
--           resp <- AWS.send $ makeEmail email
--           logFailedRequest resp
--         Aeson.Error x -> throwIO
--                        $ userError
--                        $ "Failed to decode payload as an Email: " ++ show x
--   
--   data Email = Email
--     { emailAddress :: Text
--     , emailSubject :: Text
--     , emailBody    :: Text
--     } deriving (Show, Eq, Generic, FromJSON, ToJSON)
--   
--   makeEmail :: Email -> SendEmail
--   makeEmail Email {..}
--     = sendEmail emailAddress
--                 (set dToAddresses [emailAddress] destination)
--     $ message (content emailSubject)
--     $ set bText (Just $ content emailBody) AWS.body
--   
--   logFailedRequest :: MonadIO m => SendEmailResponse -> m ()
--   logFailedRequest resp = do
--       let stat = view sersResponseStatus resp
--   
--       unless (stat >= 200 && stat < 300) $
--         liftIO $ putStrLn $ "SES failed with status: " ++ show stat
--   
module Database.PostgreSQL.Simple.Queue.Examples.EmailQueue