-- Hoogle documentation, generated by Haddock
-- See Hoogle, http://www.haskell.org/hoogle/
-- | Concurrent PostgreSQL data consumers
--
-- Library for setting up concurrent consumers of data stored inside
-- PostgreSQL database in a simple, declarative manner.
@package consumers
@version 2.3.2.0
module Database.PostgreSQL.Consumers.Config
-- | Action to take after a job was processed.
data Action
MarkProcessed :: Action
RerunAfter :: Interval -> Action
RerunAt :: UTCTime -> Action
Remove :: Action
-- | Result of processing a job.
data Result
Ok :: Action -> Result
Failed :: Action -> Result
-- | Config of a consumer.
data ConsumerConfig m idx job
ConsumerConfig :: !RawSQL () -> !RawSQL () -> ![SQL] -> !row -> job -> !job -> idx -> !Maybe Channel -> !Int -> !Int -> !job -> m Result -> !SomeException -> job -> m Action -> ConsumerConfig m idx job
-- | Name of the database table where jobs are stored. The table needs to
-- have the following columns in order to be suitable for acting as a job
-- queue:
--
--
-- - id - represents ID of the job. Needs to be a primary key of
-- a type convertible to text, not nullable.
-- - run_at - represents the time at which the job will be
-- processed. Needs to be nullable, of a type comparable with
-- now() (TIMESTAMPTZ is recommended).
--
--
-- Note: a job with run_at set to NULL is never picked for processing.
-- Useful for storing already processed/expired jobs for debugging
-- purposes.
--
-- It's highly recommended to have an index on the run_at column.
--
--
-- - finished_at - represents the time at which job processing
-- was finished. Needs to be nullable, of a type you can assign
-- now() to (TIMESTAMPTZ is recommended). NULL means that the
-- job was either never processed or that it was started and failed at
-- least once.
-- - reserved_by - represents ID of the consumer that currently
-- processes the job. Needs to be nullable, of the type corresponding to
-- id in the table ccConsumersTable. It's recommended (though not
-- necessary) to make it a foreign key referencing id in
-- ccConsumersTable with ON DELETE SET NULL.
-- - attempts - represents number of job processing attempts
-- made so far. Needs to be not nullable, of type INTEGER. Initial value
-- of a fresh job should be 0, therefore it makes sense to make the
-- column default to 0.
--
[ccJobsTable] :: ConsumerConfig m idx job -> !RawSQL ()
-- | Name of a database table where registered consumers are stored. The
-- table itself needs to have the following columns:
--
--
-- - id - represents ID of a consumer. Needs to be a primary key
-- of the type SERIAL or BIGSERIAL (recommended).
-- - name - represents jobs table of the consumer. Needs to be
-- not nullable, of type TEXT. Allows for tracking consumers of multiple
-- queues with one table. Set to ccJobsTable.
-- - last_activity - represents the last registered activity of
-- the consumer. It's updated periodically by all currently running
-- consumers every 30 seconds to prove that they are indeed running. They
-- also check for the registered consumers that didn't update their
-- status for a minute. If any such consumers are found, they are
-- presumed to be not working and all the jobs reserved by them are
-- released. This prevents the situation where a consumer with reserved
-- jobs silently fails (e.g. because of a hard crash) and these jobs stay
-- locked forever, yet are never processed.
--
[ccConsumersTable] :: ConsumerConfig m idx job -> !RawSQL ()
-- | Fields needed to be selected from the jobs table in order to assemble
-- a job.
[ccJobSelectors] :: ConsumerConfig m idx job -> ![SQL]
-- | Function that transforms the list of fields into a job.
[ccJobFetcher] :: ConsumerConfig m idx job -> !row -> job
-- | Selector for taking out job ID from the job object.
[ccJobIndex] :: ConsumerConfig m idx job -> !job -> idx
-- | Notification channel used for listening for incoming jobs. Whenever
-- the consumer receives a notification, it checks the database for any
-- pending jobs ('run_at <= NOW()') and runs them all. If set
-- to Nothing, no listening is performed and
-- ccNotificationTimeout should be set to a positive number,
-- otherwise no jobs would ever be run. ccNotificationChannel and
-- ccNotificationTimeout can be combined: the consumer will check
-- for pending jobs either when a notification is received or when no
-- notification has been received for ccNotificationTimeout
-- microseconds since the last check.
[ccNotificationChannel] :: ConsumerConfig m idx job -> !Maybe Channel
-- | Timeout of checking for any pending jobs ('run_at <=
-- NOW()'), in microseconds. The consumer checks the database for
-- any pending jobs after ccNotificationTimeout microseconds since
-- the last check was performed, runs them until all pending jobs are
-- processed and after that, the cycle repeats. Note that even if
-- ccNotificationChannel is Just, you need to set
-- ccNotificationTimeout to a reasonable number if the jobs you
-- process may fail and are retried later, as there is no way to signal
-- with a notification that a job will need to be performed e.g. in 5
-- minutes. However, if ccNotificationChannel is Just and
-- jobs are never retried, you can set it to -1, then listening will
-- never time out. Otherwise it needs to be a positive number.
[ccNotificationTimeout] :: ConsumerConfig m idx job -> !Int
-- | Maximum number of jobs that can be processed in parallel.
[ccMaxRunningJobs] :: ConsumerConfig m idx job -> !Int
-- | Function that processes a job. It's recommended to process each job in
-- a separate DB transaction, otherwise you'll have to remember to commit
-- your changes to the database manually.
[ccProcessJob] :: ConsumerConfig m idx job -> !job -> m Result
-- | Action taken if a job processing function throws an exception. For
-- robustness it's best to ensure that it doesn't throw. If it does, the
-- exception will be logged and the job in question postponed by a day.
[ccOnException] :: ConsumerConfig m idx job -> !SomeException -> job -> m Action
instance GHC.Show.Show Database.PostgreSQL.Consumers.Config.Action
instance GHC.Classes.Ord Database.PostgreSQL.Consumers.Config.Action
instance GHC.Classes.Eq Database.PostgreSQL.Consumers.Config.Action
instance GHC.Show.Show Database.PostgreSQL.Consumers.Config.Result
instance GHC.Classes.Ord Database.PostgreSQL.Consumers.Config.Result
instance GHC.Classes.Eq Database.PostgreSQL.Consumers.Config.Result
module Database.PostgreSQL.Consumers.Utils
-- | Run the first action, which returns a finalizer, then run the second
-- action and perform the returned finalizer after it completes.
finalize :: (MonadMask m, MonadBase IO m) => m (m ()) -> m a -> m a
-- | Exception thrown from a child thread.
data ThrownFrom
ThrownFrom :: String -> SomeException -> ThrownFrom
-- | Stop execution of a thread.
stopExecution :: MonadBase IO m => ThreadId -> m ()
-- | Modified version of fork that propagates thrown exceptions to
-- the parent thread.
forkP :: MonadBaseControl IO m => String -> m () -> m ThreadId
-- | Variant of forkP that takes a ThreadGroup: a modified version of
-- fork that propagates thrown exceptions to the parent thread.
gforkP :: MonadBaseControl IO m => ThreadGroup -> String -> m () -> m (ThreadId, m (Result ()))
preparedSqlName :: Text -> RawSQL () -> QueryName
instance GHC.Show.Show Database.PostgreSQL.Consumers.Utils.StopExecution
instance GHC.Show.Show Database.PostgreSQL.Consumers.Utils.ThrownFrom
instance GHC.Exception.Type.Exception Database.PostgreSQL.Consumers.Utils.ThrownFrom
instance GHC.Exception.Type.Exception Database.PostgreSQL.Consumers.Utils.StopExecution
module Database.PostgreSQL.Consumers.Consumer
-- | ID of a consumer.
data ConsumerID
-- | Register consumer in the consumers table, so that it can reserve jobs
-- using acquired ID.
registerConsumer :: (MonadBase IO m, MonadMask m, MonadTime m) => ConsumerConfig n idx job -> ConnectionSourceM m -> m ConsumerID
-- | Unregister consumer with a given ID.
unregisterConsumer :: (MonadBase IO m, MonadMask m) => ConsumerConfig n idx job -> ConnectionSourceM m -> ConsumerID -> m ()
instance GHC.Classes.Ord Database.PostgreSQL.Consumers.Consumer.ConsumerID
instance GHC.Classes.Eq Database.PostgreSQL.Consumers.Consumer.ConsumerID
instance Database.PostgreSQL.PQTypes.Format.PQFormat Database.PostgreSQL.Consumers.Consumer.ConsumerID
instance Database.PostgreSQL.PQTypes.FromSQL.FromSQL Database.PostgreSQL.Consumers.Consumer.ConsumerID
instance Database.PostgreSQL.PQTypes.ToSQL.ToSQL Database.PostgreSQL.Consumers.Consumer.ConsumerID
instance GHC.Show.Show Database.PostgreSQL.Consumers.Consumer.ConsumerID
module Database.PostgreSQL.Consumers.Components
-- | Run the consumer. The purpose of the returned monadic action is to
-- wait for currently processed jobs and clean up. This function is best
-- used in conjunction with finalize to seamlessly handle the
-- finalization.
runConsumer :: (MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m, Eq idx, Show idx, FromSQL idx, ToSQL idx) => ConsumerConfig m idx job -> ConnectionSourceM m -> m (m ())
runConsumerWithIdleSignal :: (MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m, Eq idx, Show idx, FromSQL idx, ToSQL idx) => ConsumerConfig m idx job -> ConnectionSourceM m -> TMVar Bool -> m (m ())
-- | Spawn a thread that generates signals for the dispatcher to probe the
-- database for incoming jobs.
spawnListener :: (MonadBaseControl IO m, MonadMask m) => ConsumerConfig m idx job -> ConnectionSourceM m -> MVar () -> m ThreadId
-- | Spawn a thread that monitors working consumers for activity and
-- periodically updates its own.
spawnMonitor :: forall m idx job. (MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m, Show idx, FromSQL idx, ToSQL idx) => ConsumerConfig m idx job -> ConnectionSourceM m -> ConsumerID -> m ThreadId
-- | Spawn a thread that reserves and processes jobs.
spawnDispatcher :: forall m idx job. (MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m, Show idx, ToSQL idx) => ConsumerConfig m idx job -> ConnectionSourceM m -> ConsumerID -> MVar () -> TVar (Map ThreadId idx) -> TVar Int -> Maybe (TMVar Bool) -> m ThreadId
module Database.PostgreSQL.Consumers
-- | Run the consumer. The purpose of the returned monadic action is to
-- wait for currently processed jobs and clean up. This function is best
-- used in conjunction with finalize to seamlessly handle the
-- finalization.
runConsumer :: (MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m, Eq idx, Show idx, FromSQL idx, ToSQL idx) => ConsumerConfig m idx job -> ConnectionSourceM m -> m (m ())
runConsumerWithIdleSignal :: (MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m, Eq idx, Show idx, FromSQL idx, ToSQL idx) => ConsumerConfig m idx job -> ConnectionSourceM m -> TMVar Bool -> m (m ())