-- Hoogle documentation, generated by Haddock -- See Hoogle, http://www.haskell.org/hoogle/ -- | Concurrent PostgreSQL data consumers -- -- Library for setting up concurrent consumers of data stored inside a -- PostgreSQL database in a simple, declarative manner. @package consumers @version 2.0 module Database.PostgreSQL.Consumers.Utils -- | Runs an action that returns a finalizer and performs it at the end. finalize :: (MonadMask m, MonadBase IO m) => m (m ()) -> m a -> m a -- | Exception thrown from a child thread. data ThrownFrom ThrownFrom :: String -> SomeException -> ThrownFrom -- | Stop execution of a thread. stopExecution :: MonadBase IO m => ThreadId -> m () -- | Modified version of fork that propagates thrown exceptions to -- the parent thread. forkP :: MonadBaseControl IO m => String -> m () -> m ThreadId -- | Modified version of fork that propagates thrown exceptions to -- the parent thread. gforkP :: MonadBaseControl IO m => ThreadGroup -> String -> m () -> m (ThreadId, m (Result ())) instance GHC.Show.Show Database.PostgreSQL.Consumers.Utils.ThrownFrom instance GHC.Show.Show Database.PostgreSQL.Consumers.Utils.StopExecution instance GHC.Exception.Exception Database.PostgreSQL.Consumers.Utils.StopExecution instance GHC.Exception.Exception Database.PostgreSQL.Consumers.Utils.ThrownFrom module Database.PostgreSQL.Consumers.Config -- | Action to take after a job was processed. data Action MarkProcessed :: Action RerunAfter :: Interval -> Action RerunAt :: UTCTime -> Action Remove :: Action -- | Result of processing a job. data Result Ok :: Action -> Result Failed :: Action -> Result -- | Config of a consumer. data ConsumerConfig m idx job ConsumerConfig :: !(RawSQL ()) -> !(RawSQL ()) -> ![SQL] -> !(row -> job) -> !(job -> idx) -> !(Maybe Channel) -> !Int -> !Int -> !(job -> m Result) -> !(SomeException -> job -> m Action) -> ConsumerConfig m idx job -- | Name of a database table where jobs are stored. 
The table needs to -- have the following columns in order to be suitable for acting as a job -- queue: -- -- [ccJobsTable] :: ConsumerConfig m idx job -> !(RawSQL ()) -- | Name of a database table where registered consumers are stored. The -- table itself needs to have the following columns: -- -- [ccConsumersTable] :: ConsumerConfig m idx job -> !(RawSQL ()) -- | Fields needed to be selected from the jobs table in order to assemble -- a job. [ccJobSelectors] :: ConsumerConfig m idx job -> ![SQL] -- | Function that transforms the list of fields into a job. [ccJobFetcher] :: ConsumerConfig m idx job -> !(row -> job) -- | Selector for taking out job ID from the job object. [ccJobIndex] :: ConsumerConfig m idx job -> !(job -> idx) -- | Notification channel used for listening for incoming jobs. If set to -- Nothing, no listening is performed and jobs are selected from -- the database every ccNotificationTimeout microseconds. [ccNotificationChannel] :: ConsumerConfig m idx job -> !(Maybe Channel) -- | Timeout of listening for incoming jobs, in microseconds. The consumer -- checks between the timeouts if there are any jobs in the database that -- need to be processed, so even if ccNotificationChannel is -- Just, you need to set it to a reasonable number if the jobs you -- process may fail and are retried later, as there is no way to signal -- with a notification that a job will need to be performed e.g. in 5 -- minutes. However, if ccNotificationChannel is Just and -- jobs are never retried, you can set it to -1, and listening will -- never time out. Otherwise it needs to be a positive number. [ccNotificationTimeout] :: ConsumerConfig m idx job -> !Int -- | Maximum number of jobs that can be processed in parallel. [ccMaxRunningJobs] :: ConsumerConfig m idx job -> !Int -- | Function that processes a job. [ccProcessJob] :: ConsumerConfig m idx job -> !(job -> m Result) -- | Action taken if the job processing function throws an exception. 
Note that -- if this action throws an exception, the consumer goes down, so it's -- best to ensure that it doesn't throw. [ccOnException] :: ConsumerConfig m idx job -> !(SomeException -> job -> m Action) instance GHC.Show.Show Database.PostgreSQL.Consumers.Config.Result instance GHC.Classes.Ord Database.PostgreSQL.Consumers.Config.Result instance GHC.Classes.Eq Database.PostgreSQL.Consumers.Config.Result instance GHC.Show.Show Database.PostgreSQL.Consumers.Config.Action instance GHC.Classes.Ord Database.PostgreSQL.Consumers.Config.Action instance GHC.Classes.Eq Database.PostgreSQL.Consumers.Config.Action module Database.PostgreSQL.Consumers.Consumer -- | ID of a consumer. data ConsumerID -- | Register consumer in the consumers table, so that it can reserve jobs -- using acquired ID. registerConsumer :: (MonadBase IO m, MonadMask m) => ConsumerConfig n idx job -> ConnectionSourceM m -> m ConsumerID -- | Unregister consumer with a given ID. unregisterConsumer :: (MonadBase IO m, MonadMask m) => ConsumerConfig n idx job -> ConnectionSourceM m -> ConsumerID -> m () instance Database.PostgreSQL.PQTypes.Format.PQFormat Database.PostgreSQL.Consumers.Consumer.ConsumerID instance GHC.Classes.Ord Database.PostgreSQL.Consumers.Consumer.ConsumerID instance GHC.Classes.Eq Database.PostgreSQL.Consumers.Consumer.ConsumerID instance Database.PostgreSQL.PQTypes.FromSQL.FromSQL Database.PostgreSQL.Consumers.Consumer.ConsumerID instance Database.PostgreSQL.PQTypes.ToSQL.ToSQL Database.PostgreSQL.Consumers.Consumer.ConsumerID instance GHC.Show.Show Database.PostgreSQL.Consumers.Consumer.ConsumerID module Database.PostgreSQL.Consumers.Components -- | Run the consumer. The purpose of the returned monadic action is to -- wait for currently processed jobs and clean up. This function is best -- used in conjunction with finalize to seamlessly handle the -- finalization. 
runConsumer :: (MonadBaseControl IO m, MonadLog m, MonadMask m, Eq idx, Show idx, ToSQL idx) => ConsumerConfig m idx job -> ConnectionSourceM m -> m (m ()) -- | Spawn a thread that generates signals for the dispatcher to probe the -- database for incoming jobs. spawnListener :: (MonadBaseControl IO m, MonadMask m) => ConsumerConfig m idx job -> ConnectionSourceM m -> MVar () -> m ThreadId -- | Spawn a thread that monitors working consumers for activity and -- periodically updates its own. spawnMonitor :: (MonadBaseControl IO m, MonadLog m, MonadMask m) => ConsumerConfig m idx job -> ConnectionSourceM m -> ConsumerID -> m ThreadId -- | Spawn a thread that reserves and processes jobs. spawnDispatcher :: forall m idx job. (MonadBaseControl IO m, MonadLog m, MonadMask m, Show idx, ToSQL idx) => ConsumerConfig m idx job -> Bool -> ConnectionSourceM m -> ConsumerID -> MVar () -> TVar (Map ThreadId idx) -> TVar Int -> m ThreadId module Database.PostgreSQL.Consumers -- | Run the consumer. The purpose of the returned monadic action is to -- wait for currently processed jobs and clean up. This function is best -- used in conjunction with finalize to seamlessly handle the -- finalization. runConsumer :: (MonadBaseControl IO m, MonadLog m, MonadMask m, Eq idx, Show idx, ToSQL idx) => ConsumerConfig m idx job -> ConnectionSourceM m -> m (m ())