-- Hoogle documentation, generated by Haddock
-- See Hoogle, http://www.haskell.org/hoogle/
-- | Concurrent PostgreSQL data consumers
--
-- Library for setting up concurrent consumers of data stored inside
-- PostgreSQL database in a simple, declarative manner.
@package consumers
@version 2.0
module Database.PostgreSQL.Consumers.Utils
-- | Runs an action that returns a finalizer and performs it at the end.
finalize :: (MonadMask m, MonadBase IO m) => m (m ()) -> m a -> m a
-- | Exception thrown from a child thread.
data ThrownFrom
ThrownFrom :: String -> SomeException -> ThrownFrom
-- | Stop execution of a thread.
stopExecution :: MonadBase IO m => ThreadId -> m ()
-- | Modified version of fork that propagates thrown exceptions to
-- the parent thread.
forkP :: MonadBaseControl IO m => String -> m () -> m ThreadId
-- | Modified version of fork that propagates thrown exceptions to
-- the parent thread.
gforkP :: MonadBaseControl IO m => ThreadGroup -> String -> m () -> m (ThreadId, m (Result ()))
instance GHC.Show.Show Database.PostgreSQL.Consumers.Utils.ThrownFrom
instance GHC.Show.Show Database.PostgreSQL.Consumers.Utils.StopExecution
instance GHC.Exception.Exception Database.PostgreSQL.Consumers.Utils.StopExecution
instance GHC.Exception.Exception Database.PostgreSQL.Consumers.Utils.ThrownFrom
module Database.PostgreSQL.Consumers.Config
-- | Action to take after a job was processed.
data Action
MarkProcessed :: Action
RerunAfter :: Interval -> Action
RerunAt :: UTCTime -> Action
Remove :: Action
-- | Result of processing a job.
data Result
Ok :: Action -> Result
Failed :: Action -> Result
-- | Config of a consumer.
data ConsumerConfig m idx job
ConsumerConfig :: !(RawSQL ()) -> !(RawSQL ()) -> ![SQL] -> !(row -> job) -> !(job -> idx) -> !(Maybe Channel) -> !Int -> !Int -> !(job -> m Result) -> !(SomeException -> job -> m Action) -> ConsumerConfig m idx job
-- | Name of a database table where jobs are stored. The table needs to
-- have the following columns in order to be suitable for acting as a job
-- queue:
--
--
-- - id - represents ID of the job. Needs to be a primary key of the
-- type convertible to text, not nullable.
-- - run_at - represents the time at which the job will be processed.
-- Needs to be nullable, of the type comparable with now() (TIMESTAMPTZ
-- is recommended). Note: a job with run_at set to NULL is never picked
-- for processing. Useful for storing already processed/expired jobs for
-- debugging purposes.
-- - finished_at - represents the time at which job processing was
-- finished. Needs to be nullable, of the type you can assign now() to
-- (TIMESTAMPTZ is recommended). NULL means that the job was either never
-- processed or that it was started and failed at least once.
-- - reserved_by - represents ID of the consumer that currently
-- processes the job. Needs to be nullable, of the type corresponding to
-- id in the table ccConsumersTable. It's recommended (though not
-- neccessary) to make it a foreign key referencing id in
-- ccConsumersTable with ON DELETE SET NULL.
-- - attempts - represents number of job processing attempts made so
-- far. Needs to be not nullable, of type INTEGER. Initial value of a
-- fresh job should be 0, therefore it makes sense to make the column
-- default to 0.
--
[ccJobsTable] :: ConsumerConfig m idx job -> !(RawSQL ())
-- | Name of a database table where registered consumers are stored. The
-- table itself needs to have the following columns:
--
--
-- - id - represents ID of a consumer. Needs to be a primary key of the
-- type SERIAL or BIGSERIAL (recommended).
-- - name - represents jobs table of the consumer. Needs to be not
-- nullable, of type TEXT. Allows for tracking consumers of multiple
-- queues with one table. Set to ccJobsTable.
-- - last_activity - represents the last registered activity of the
-- consumer. It's updated periodically by all currently running consumers
-- every 30 seconds to prove that they are indeed running. They also
-- check for the registered consumers that didn't update their status for
-- a minute. If any such consumers are found, they are presumed to be not
-- working and all the jobs reserved by them are released. This prevents
-- the situation where a consumer with reserved jobs silently fails (e.g.
-- because of a hard crash) and these jobs stay locked forever, yet are
-- never processed.
--
[ccConsumersTable] :: ConsumerConfig m idx job -> !(RawSQL ())
-- | Fields needed to be selected from the jobs table in order to assemble
-- a job.
[ccJobSelectors] :: ConsumerConfig m idx job -> ![SQL]
-- | Function that transforms the list of fields into a job.
[ccJobFetcher] :: ConsumerConfig m idx job -> !(row -> job)
-- | Selector for taking out job ID from the job object.
[ccJobIndex] :: ConsumerConfig m idx job -> !(job -> idx)
-- | Notification channel used for listening for incoming jobs. If set to
-- Nothing, no listening is performed and jobs are selected from
-- the database every ccNotificationTimeout microseconds.
[ccNotificationChannel] :: ConsumerConfig m idx job -> !(Maybe Channel)
-- | Timeout of listening for incoming jobs, in microseconds. The consumer
-- checks between the timeouts if there are any jobs in the database that
-- needs to be processed, so even if ccNotificationChannel is
-- Just, you need to set it to a reasonable number if the jobs you
-- process may fail and are retried later, as there is no way to signal
-- with a notification that a job will need to be performed e.g. in 5
-- minutes. However, if ccNotificationChannel is Just and
-- jobs are never retried, you can set it to -1, then listening will
-- never timeout. Otherwise it needs to be a positive number.
[ccNotificationTimeout] :: ConsumerConfig m idx job -> !Int
-- | Maximum amount of jobs that can be processed in parallel.
[ccMaxRunningJobs] :: ConsumerConfig m idx job -> !Int
-- | Function that processes a job.
[ccProcessJob] :: ConsumerConfig m idx job -> !(job -> m Result)
-- | Action taken if job processing function throws an exception. Note that
-- if this action throws an exception, the consumer goes down, so it's
-- best to ensure that it doesn't throw.
[ccOnException] :: ConsumerConfig m idx job -> !(SomeException -> job -> m Action)
instance GHC.Show.Show Database.PostgreSQL.Consumers.Config.Result
instance GHC.Classes.Ord Database.PostgreSQL.Consumers.Config.Result
instance GHC.Classes.Eq Database.PostgreSQL.Consumers.Config.Result
instance GHC.Show.Show Database.PostgreSQL.Consumers.Config.Action
instance GHC.Classes.Ord Database.PostgreSQL.Consumers.Config.Action
instance GHC.Classes.Eq Database.PostgreSQL.Consumers.Config.Action
module Database.PostgreSQL.Consumers.Consumer
-- | ID of a consumer.
data ConsumerID
-- | Register consumer in the consumers table, so that it can reserve jobs
-- using acquired ID.
registerConsumer :: (MonadBase IO m, MonadMask m) => ConsumerConfig n idx job -> ConnectionSourceM m -> m ConsumerID
-- | Unregister consumer with a given ID.
unregisterConsumer :: (MonadBase IO m, MonadMask m) => ConsumerConfig n idx job -> ConnectionSourceM m -> ConsumerID -> m ()
instance Database.PostgreSQL.PQTypes.Format.PQFormat Database.PostgreSQL.Consumers.Consumer.ConsumerID
instance GHC.Classes.Ord Database.PostgreSQL.Consumers.Consumer.ConsumerID
instance GHC.Classes.Eq Database.PostgreSQL.Consumers.Consumer.ConsumerID
instance Database.PostgreSQL.PQTypes.FromSQL.FromSQL Database.PostgreSQL.Consumers.Consumer.ConsumerID
instance Database.PostgreSQL.PQTypes.ToSQL.ToSQL Database.PostgreSQL.Consumers.Consumer.ConsumerID
instance GHC.Show.Show Database.PostgreSQL.Consumers.Consumer.ConsumerID
module Database.PostgreSQL.Consumers.Components
-- | Run the consumer. The purpose of the returned monadic action is to
-- wait for currently processed jobs and clean up. This function is best
-- used in conjunction with finalize to seamlessly handle the
-- finalization.
runConsumer :: (MonadBaseControl IO m, MonadLog m, MonadMask m, Eq idx, Show idx, ToSQL idx) => ConsumerConfig m idx job -> ConnectionSourceM m -> m (m ())
-- | Spawn a thread that generates signals for the dispatcher to probe the
-- database for incoming jobs.
spawnListener :: (MonadBaseControl IO m, MonadMask m) => ConsumerConfig m idx job -> ConnectionSourceM m -> MVar () -> m ThreadId
-- | Spawn a thread that monitors working consumers for activity and
-- periodically updates its own.
spawnMonitor :: (MonadBaseControl IO m, MonadLog m, MonadMask m) => ConsumerConfig m idx job -> ConnectionSourceM m -> ConsumerID -> m ThreadId
-- | Spawn a thread that reserves and processes jobs.
spawnDispatcher :: forall m idx job. (MonadBaseControl IO m, MonadLog m, MonadMask m, Show idx, ToSQL idx) => ConsumerConfig m idx job -> Bool -> ConnectionSourceM m -> ConsumerID -> MVar () -> TVar (Map ThreadId idx) -> TVar Int -> m ThreadId
module Database.PostgreSQL.Consumers
-- | Run the consumer. The purpose of the returned monadic action is to
-- wait for currently processed jobs and clean up. This function is best
-- used in conjunction with finalize to seamlessly handle the
-- finalization.
runConsumer :: (MonadBaseControl IO m, MonadLog m, MonadMask m, Eq idx, Show idx, ToSQL idx) => ConsumerConfig m idx job -> ConnectionSourceM m -> m (m ())