-- Hoogle documentation, generated by Haddock -- See Hoogle, http://www.haskell.org/hoogle/ -- | Concurrent PostgreSQL data consumers -- -- Library for setting up concurrent consumers of data stored inside -- PostgreSQL database in a simple, declarative manner. @package consumers @version 2.2.0.2 module Database.PostgreSQL.Consumers.Config -- | Action to take after a job was processed. data Action MarkProcessed :: Action RerunAfter :: Interval -> Action RerunAt :: UTCTime -> Action Remove :: Action -- | Result of processing a job. data Result Ok :: Action -> Result Failed :: Action -> Result -- | Config of a consumer. data ConsumerConfig m idx job ConsumerConfig :: !RawSQL () -> !RawSQL () -> ![SQL] -> !row -> job -> !job -> idx -> !Maybe Channel -> !Int -> !Int -> !job -> m Result -> !SomeException -> job -> m Action -> ConsumerConfig m idx job -- | Name of the database table where jobs are stored. The table needs to -- have the following columns in order to be suitable for acting as a job -- queue: -- -- [ccJobsTable] :: ConsumerConfig m idx job -> !RawSQL () -- | Name of a database table where registered consumers are stored. The -- table itself needs to have the following columns: -- -- [ccConsumersTable] :: ConsumerConfig m idx job -> !RawSQL () -- | Fields needed to be selected from the jobs table in order to assemble -- a job. [ccJobSelectors] :: ConsumerConfig m idx job -> ![SQL] -- | Function that transforms the list of fields into a job. [ccJobFetcher] :: ConsumerConfig m idx job -> !row -> job -- | Selector for taking out job ID from the job object. [ccJobIndex] :: ConsumerConfig m idx job -> !job -> idx -- | Notification channel used for listening for incoming jobs. If set to -- Nothing, no listening is performed and jobs are selected from -- the database every ccNotificationTimeout microseconds. [ccNotificationChannel] :: ConsumerConfig m idx job -> !Maybe Channel -- | Timeout of listening for incoming jobs, in microseconds. The consumer -- checks between the timeouts if there are any jobs in the database that -- needs to be processed, so even if ccNotificationChannel is -- Just, you need to set it to a reasonable number if the jobs you -- process may fail and are retried later, as there is no way to signal -- with a notification that a job will need to be performed e.g. in 5 -- minutes. However, if ccNotificationChannel is Just and -- jobs are never retried, you can set it to -1, then listening will -- never timeout. Otherwise it needs to be a positive number. [ccNotificationTimeout] :: ConsumerConfig m idx job -> !Int -- | Maximum amount of jobs that can be processed in parallel. [ccMaxRunningJobs] :: ConsumerConfig m idx job -> !Int -- | Function that processes a job. It's recommended to process each job in -- a separate DB transaction, otherwise you'll have to remember to commit -- your changes to the database manually. [ccProcessJob] :: ConsumerConfig m idx job -> !job -> m Result -- | Action taken if a job processing function throws an exception. Note -- that if this action throws an exception, the consumer goes down, so -- it's best to ensure that it doesn't throw. [ccOnException] :: ConsumerConfig m idx job -> !SomeException -> job -> m Action instance GHC.Show.Show Database.PostgreSQL.Consumers.Config.Result instance GHC.Classes.Ord Database.PostgreSQL.Consumers.Config.Result instance GHC.Classes.Eq Database.PostgreSQL.Consumers.Config.Result instance GHC.Show.Show Database.PostgreSQL.Consumers.Config.Action instance GHC.Classes.Ord Database.PostgreSQL.Consumers.Config.Action instance GHC.Classes.Eq Database.PostgreSQL.Consumers.Config.Action module Database.PostgreSQL.Consumers.Consumer -- | ID of a consumer. data ConsumerID -- | Register consumer in the consumers table, so that it can reserve jobs -- using acquired ID. registerConsumer :: (MonadBase IO m, MonadMask m, MonadTime m) => ConsumerConfig n idx job -> ConnectionSourceM m -> m ConsumerID -- | Unregister consumer with a given ID. unregisterConsumer :: (MonadBase IO m, MonadMask m) => ConsumerConfig n idx job -> ConnectionSourceM m -> ConsumerID -> m () instance GHC.Classes.Ord Database.PostgreSQL.Consumers.Consumer.ConsumerID instance GHC.Classes.Eq Database.PostgreSQL.Consumers.Consumer.ConsumerID instance Database.PostgreSQL.PQTypes.Format.PQFormat Database.PostgreSQL.Consumers.Consumer.ConsumerID instance Database.PostgreSQL.PQTypes.FromSQL.FromSQL Database.PostgreSQL.Consumers.Consumer.ConsumerID instance Database.PostgreSQL.PQTypes.ToSQL.ToSQL Database.PostgreSQL.Consumers.Consumer.ConsumerID instance GHC.Show.Show Database.PostgreSQL.Consumers.Consumer.ConsumerID module Database.PostgreSQL.Consumers.Utils -- | Run an action m that returns a finalizer and perform the -- returned finalizer after the action action completes. finalize :: (MonadMask m, MonadBase IO m) => m (m ()) -> m a -> m a -- | Exception thrown from a child thread. data ThrownFrom ThrownFrom :: String -> SomeException -> ThrownFrom -- | Stop execution of a thread. stopExecution :: MonadBase IO m => ThreadId -> m () -- | Modified version of fork that propagates thrown exceptions to -- the parent thread. forkP :: MonadBaseControl IO m => String -> m () -> m ThreadId -- | Modified version of fork that propagates thrown exceptions to -- the parent thread. gforkP :: MonadBaseControl IO m => ThreadGroup -> String -> m () -> m (ThreadId, m (Result ())) instance GHC.Show.Show Database.PostgreSQL.Consumers.Utils.ThrownFrom instance GHC.Show.Show Database.PostgreSQL.Consumers.Utils.StopExecution instance GHC.Exception.Type.Exception Database.PostgreSQL.Consumers.Utils.ThrownFrom instance GHC.Exception.Type.Exception Database.PostgreSQL.Consumers.Utils.StopExecution module Database.PostgreSQL.Consumers.Components -- | Run the consumer. The purpose of the returned monadic action is to -- wait for currently processed jobs and clean up. This function is best -- used in conjunction with finalize to seamlessly handle the -- finalization. runConsumer :: (MonadBaseControl IO m, MonadLog m, MonadMask m, Eq idx, Show idx, FromSQL idx, ToSQL idx) => ConsumerConfig m idx job -> ConnectionSourceM m -> m (m ()) runConsumerWithIdleSignal :: (MonadBaseControl IO m, MonadLog m, MonadMask m, Eq idx, Show idx, FromSQL idx, ToSQL idx) => ConsumerConfig m idx job -> ConnectionSourceM m -> TMVar Bool -> m (m ()) -- | Spawn a thread that generates signals for the dispatcher to probe the -- database for incoming jobs. spawnListener :: (MonadBaseControl IO m, MonadMask m) => ConsumerConfig m idx job -> ConnectionSourceM m -> MVar () -> m ThreadId -- | Spawn a thread that monitors working consumers for activity and -- periodically updates its own. spawnMonitor :: forall m idx job. (MonadBaseControl IO m, MonadLog m, MonadMask m, Show idx, FromSQL idx) => ConsumerConfig m idx job -> ConnectionSourceM m -> ConsumerID -> m ThreadId -- | Spawn a thread that reserves and processes jobs. spawnDispatcher :: forall m idx job. (MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m, Show idx, ToSQL idx) => ConsumerConfig m idx job -> Bool -> ConnectionSourceM m -> ConsumerID -> MVar () -> TVar (Map ThreadId idx) -> TVar Int -> Maybe (TMVar Bool) -> m ThreadId module Database.PostgreSQL.Consumers -- | Run the consumer. The purpose of the returned monadic action is to -- wait for currently processed jobs and clean up. This function is best -- used in conjunction with finalize to seamlessly handle the -- finalization. runConsumer :: (MonadBaseControl IO m, MonadLog m, MonadMask m, Eq idx, Show idx, FromSQL idx, ToSQL idx) => ConsumerConfig m idx job -> ConnectionSourceM m -> m (m ()) runConsumerWithIdleSignal :: (MonadBaseControl IO m, MonadLog m, MonadMask m, Eq idx, Show idx, FromSQL idx, ToSQL idx) => ConsumerConfig m idx job -> ConnectionSourceM m -> TMVar Bool -> m (m ())