-- Hoogle documentation, generated by Haddock -- See Hoogle, http://www.haskell.org/hoogle/ -- | Concurrent PostgreSQL data consumers -- -- Library for setting up concurrent consumers of data stored inside -- PostgreSQL database in a simple, declarative manner. @package consumers @version 2.3.2.0 module Database.PostgreSQL.Consumers.Config -- | Action to take after a job was processed. data Action MarkProcessed :: Action RerunAfter :: Interval -> Action RerunAt :: UTCTime -> Action Remove :: Action -- | Result of processing a job. data Result Ok :: Action -> Result Failed :: Action -> Result -- | Config of a consumer. data ConsumerConfig m idx job ConsumerConfig :: !RawSQL () -> !RawSQL () -> ![SQL] -> !row -> job -> !job -> idx -> !Maybe Channel -> !Int -> !Int -> !job -> m Result -> !SomeException -> job -> m Action -> ConsumerConfig m idx job -- | Name of the database table where jobs are stored. The table needs to -- have the following columns in order to be suitable for acting as a job -- queue: -- -- -- -- Note: a job with run_at set to NULL is never picked for processing. -- Useful for storing already processed/expired jobs for debugging -- purposes. -- -- It's highly recommended to have an index on this column. -- -- [ccJobsTable] :: ConsumerConfig m idx job -> !RawSQL () -- | Name of a database table where registered consumers are stored. The -- table itself needs to have the following columns: -- -- [ccConsumersTable] :: ConsumerConfig m idx job -> !RawSQL () -- | Fields needed to be selected from the jobs table in order to assemble -- a job. [ccJobSelectors] :: ConsumerConfig m idx job -> ![SQL] -- | Function that transforms the list of fields into a job. [ccJobFetcher] :: ConsumerConfig m idx job -> !row -> job -- | Selector for taking out job ID from the job object. [ccJobIndex] :: ConsumerConfig m idx job -> !job -> idx -- | Notification channel used for listening for incoming jobs. Whenever -- the consumer receives a notification, it checks the database for any -- pending jobs ('run_at <= NOW()') and runs them all. If set -- to Nothing, no listening is performed and -- ccNotificationTimeout should be set to a positive number, -- otherwise no jobs would be ever run. ccNotificationChannel and -- ccNotificationTimeout can be combined. The consumer will check -- for pending jobs either when notification is received or no -- notification is received for ccNotificationTimeout microseconds -- since the last check. [ccNotificationChannel] :: ConsumerConfig m idx job -> !Maybe Channel -- | Timeout of checking for any pending jobs ('run_at <= -- NOW()'), in microseconds. The consumer checks the database for -- any pending jobs after ccNotificationTimeout microseconds since -- the last check was performed, runs them until all pending jobs are -- processed and after that, the cycle repeats. Note that even if -- ccNotificationChannel is Just, you need to set -- ccNotificationTimeout to a reasonable number if the jobs you -- process may fail and are retried later, as there is no way to signal -- with a notification that a job will need to be performed e.g. in 5 -- minutes. However, if ccNotificationChannel is Just and -- jobs are never retried, you can set it to -1, then listening will -- never timeout. Otherwise it needs to be a positive number. [ccNotificationTimeout] :: ConsumerConfig m idx job -> !Int -- | Maximum amount of jobs that can be processed in parallel. [ccMaxRunningJobs] :: ConsumerConfig m idx job -> !Int -- | Function that processes a job. It's recommended to process each job in -- a separate DB transaction, otherwise you'll have to remember to commit -- your changes to the database manually. [ccProcessJob] :: ConsumerConfig m idx job -> !job -> m Result -- | Action taken if a job processing function throws an exception. For -- robustness it's best to ensure that it doesn't throw. If it does, the -- exception will be logged and the job in question postponed by a day. [ccOnException] :: ConsumerConfig m idx job -> !SomeException -> job -> m Action instance GHC.Show.Show Database.PostgreSQL.Consumers.Config.Action instance GHC.Classes.Ord Database.PostgreSQL.Consumers.Config.Action instance GHC.Classes.Eq Database.PostgreSQL.Consumers.Config.Action instance GHC.Show.Show Database.PostgreSQL.Consumers.Config.Result instance GHC.Classes.Ord Database.PostgreSQL.Consumers.Config.Result instance GHC.Classes.Eq Database.PostgreSQL.Consumers.Config.Result module Database.PostgreSQL.Consumers.Utils -- | Run an action m that returns a finalizer and perform the -- returned finalizer after the action action completes. finalize :: (MonadMask m, MonadBase IO m) => m (m ()) -> m a -> m a -- | Exception thrown from a child thread. data ThrownFrom ThrownFrom :: String -> SomeException -> ThrownFrom -- | Stop execution of a thread. stopExecution :: MonadBase IO m => ThreadId -> m () -- | Modified version of fork that propagates thrown exceptions to -- the parent thread. forkP :: MonadBaseControl IO m => String -> m () -> m ThreadId -- | Modified version of fork that propagates thrown exceptions to -- the parent thread. gforkP :: MonadBaseControl IO m => ThreadGroup -> String -> m () -> m (ThreadId, m (Result ())) preparedSqlName :: Text -> RawSQL () -> QueryName instance GHC.Show.Show Database.PostgreSQL.Consumers.Utils.StopExecution instance GHC.Show.Show Database.PostgreSQL.Consumers.Utils.ThrownFrom instance GHC.Exception.Type.Exception Database.PostgreSQL.Consumers.Utils.ThrownFrom instance GHC.Exception.Type.Exception Database.PostgreSQL.Consumers.Utils.StopExecution module Database.PostgreSQL.Consumers.Consumer -- | ID of a consumer. data ConsumerID -- | Register consumer in the consumers table, so that it can reserve jobs -- using acquired ID. registerConsumer :: (MonadBase IO m, MonadMask m, MonadTime m) => ConsumerConfig n idx job -> ConnectionSourceM m -> m ConsumerID -- | Unregister consumer with a given ID. unregisterConsumer :: (MonadBase IO m, MonadMask m) => ConsumerConfig n idx job -> ConnectionSourceM m -> ConsumerID -> m () instance GHC.Classes.Ord Database.PostgreSQL.Consumers.Consumer.ConsumerID instance GHC.Classes.Eq Database.PostgreSQL.Consumers.Consumer.ConsumerID instance Database.PostgreSQL.PQTypes.Format.PQFormat Database.PostgreSQL.Consumers.Consumer.ConsumerID instance Database.PostgreSQL.PQTypes.FromSQL.FromSQL Database.PostgreSQL.Consumers.Consumer.ConsumerID instance Database.PostgreSQL.PQTypes.ToSQL.ToSQL Database.PostgreSQL.Consumers.Consumer.ConsumerID instance GHC.Show.Show Database.PostgreSQL.Consumers.Consumer.ConsumerID module Database.PostgreSQL.Consumers.Components -- | Run the consumer. The purpose of the returned monadic action is to -- wait for currently processed jobs and clean up. This function is best -- used in conjunction with finalize to seamlessly handle the -- finalization. runConsumer :: (MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m, Eq idx, Show idx, FromSQL idx, ToSQL idx) => ConsumerConfig m idx job -> ConnectionSourceM m -> m (m ()) runConsumerWithIdleSignal :: (MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m, Eq idx, Show idx, FromSQL idx, ToSQL idx) => ConsumerConfig m idx job -> ConnectionSourceM m -> TMVar Bool -> m (m ()) -- | Spawn a thread that generates signals for the dispatcher to probe the -- database for incoming jobs. spawnListener :: (MonadBaseControl IO m, MonadMask m) => ConsumerConfig m idx job -> ConnectionSourceM m -> MVar () -> m ThreadId -- | Spawn a thread that monitors working consumers for activity and -- periodically updates its own. spawnMonitor :: forall m idx job. (MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m, Show idx, FromSQL idx, ToSQL idx) => ConsumerConfig m idx job -> ConnectionSourceM m -> ConsumerID -> m ThreadId -- | Spawn a thread that reserves and processes jobs. spawnDispatcher :: forall m idx job. (MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m, Show idx, ToSQL idx) => ConsumerConfig m idx job -> ConnectionSourceM m -> ConsumerID -> MVar () -> TVar (Map ThreadId idx) -> TVar Int -> Maybe (TMVar Bool) -> m ThreadId module Database.PostgreSQL.Consumers -- | Run the consumer. The purpose of the returned monadic action is to -- wait for currently processed jobs and clean up. This function is best -- used in conjunction with finalize to seamlessly handle the -- finalization. runConsumer :: (MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m, Eq idx, Show idx, FromSQL idx, ToSQL idx) => ConsumerConfig m idx job -> ConnectionSourceM m -> m (m ()) runConsumerWithIdleSignal :: (MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m, Eq idx, Show idx, FromSQL idx, ToSQL idx) => ConsumerConfig m idx job -> ConnectionSourceM m -> TMVar Bool -> m (m ())