hworker- A reliable at-least-once job queue built on top of redis.

Safe HaskellNone




This module contains an at-least-once persistent job processing queue backed by Redis. It depends upon Redis not losing data once it has acknowledged it, and guaranteeing the atomicity that is specified for commands like EVAL (ie, that if you do several things within an EVAL, they will all happen or none will happen). Nothing has been tested with Redis clusters (and it likely will not work).

An example use is the following (see the repository for a slightly expanded version; also, the test cases in the repository are also good examples):

data PrintJob = Print deriving (Generic, Show)
data State = State (MVar Int)
instance ToJSON PrintJob
instance FromJSON PrintJob

instance Job State PrintJob where
  job (State mvar) Print =
    do v <- takeMVar mvar
       putMVar mvar (v + 1)
       putStrLn $ "A(" ++ show v ++ ")"
       return Success

main = do mvar <- newMVar 0
          hworker <- create "printer" (State mvar)
          forkIO (worker hworker)
          forkIO (monitor hworker)
          forkIO (forever $ queue hworker Print >> threadDelay 1000000)
          forever (threadDelay 1000000)



data Result Source #

Jobs can return Success, Retry (with a message), or Failure (with a message). Jobs that return Failure are stored in the failed queue and are not re-run. Jobs that return Retry are re-run.


Retry Text 
Failure Text 

class (FromJSON t, ToJSON t, Show t) => Job s t | s -> t where Source #

Each Worker that you create will be responsible for one type of job, defined by a Job instance.

The job can do many different things (as the value can be a variant), but be careful not to break deserialization if you add new things it can do.

The job will take some state (passed as the s parameter), which does not vary based on the job, and the actual job data structure. The data structure (the t parameter) will be stored and copied a few times in Redis while in the lifecycle, so generally it is a good idea for it to be relatively small (and have it be able to look up data that it needs while the job in running).

Finally, while deriving FromJSON and ToJSON instances automatically might seem like a good idea, you will most likely be better off defining them manually, so you can make sure they are backwards compatible if you change them, as any jobs that can't be deserialized will not be run (and will end up in the broken queue). This will only happen if the queue is non-empty when you replce the running application version, but this is obviously possible and could be likely depending on your use.

Minimal complete definition



job :: s -> t -> IO Result Source #

data Hworker s t Source #

The worker data type - it is parametrized be the worker state (the s) and the job type (the t).

data HworkerConfig s Source #

The main configuration for workers.

Each pool of workers should have a unique hwconfigName, as the queues are set up by that name, and if you have different types of data written in, they will likely be unable to be deserialized (and thus could end up in the broken queue).

The hwconfigLogger defaults to writing to stdout, so you will likely want to replace that with something appropriate (like from a logging package).

The hwconfigTimeout is really important. It determines the length of time after a job is started before the monitor will decide that the job must have died and will restart it. If it is shorter than the length of time that a normal job takes to complete, the jobs _will_ be run multiple times. This is _semantically_ okay, as this is an at-least-once processor, but obviously won't be desirable. It defaults to 120 seconds.

The hwconfigExceptionBehavior controls what happens when an exception is thrown within a job.

hwconfigFailedQueueSize controls how many failed jobs will be kept. It defaults to 1000.

data ExceptionBehavior Source #

What should happen when an unexpected exception is thrown in a job - it can be treated as either a Failure (the default) or a Retry (if you know the only exceptions are triggered by intermittent problems).

data RedisConnection Source #

When configuring a worker, you can tell it to use an existing redis connection pool (which you may have for the rest of your application). Otherwise, you can specify connection info. By default, hworker tries to connect to localhost, which may not be true for your production application.

defaultHworkerConfig :: Text -> s -> HworkerConfig s Source #

The default worker config - it needs a name and a state (as those will always be unique).

Managing Workers

create :: Job s t => Text -> s -> IO (Hworker s t) Source #

Create a new worker with the default HworkerConfig.

Note that you must create at least one worker and monitor for the queue to actually process jobs (and for it to retry ones that time-out).

createWith :: Job s t => HworkerConfig s -> IO (Hworker s t) Source #

Create a new worker with a specified HworkerConfig.

Note that you must create at least one worker and monitor for the queue to actually process jobs (and for it to retry ones that time-out).

destroy :: Job s t => Hworker s t -> IO () Source #

Destroy a worker. This will delete all the queues, clearing out all existing jobs, the broken and failed queues. There is no need to do this in normal applications (and most likely, you won't want to).

worker :: Job s t => Hworker s t -> IO () Source #

Creates a new worker thread. This is blocking, so you will want to forkIO this into a thread. You can have any number of these (and on any number of servers); the more there are, the faster jobs will be processed.

monitor :: Job s t => Hworker s t -> IO () Source #

Start a monitor. Like worker, this is blocking, so should be started in a thread. This is responsible for retrying jobs that time out (which can happen if the processing thread is killed, for example). You need to have at least one of these running to have the retry happen, but it is safe to have any number running.

Queuing Jobs

queue :: Job s t => Hworker s t -> t -> IO Bool Source #

Adds a job to the queue. Returns whether the operation succeeded.

Inspecting Workers

jobs :: Job s t => Hworker s t -> IO [t] Source #

Returns all pending jobs.

failed :: Job s t => Hworker s t -> IO [t] Source #

Returns all failed jobs. This is capped at the most recent hworkerconfigFailedQueueSize jobs that returned Failure (or threw an exception when hworkerconfigExceptionBehavior is FailOnException).

broken :: Hworker s t -> IO [(ByteString, UTCTime)] Source #

Returns the jobs that could not be deserialized, most likely because you changed the 'ToJSON'/'FromJSON' instances for you job in a way that resulted in old jobs not being able to be converted back from json. Another reason for jobs to end up here (and much worse) is if you point two instances of Hworker, with different job types, at the same queue (ie, you re-use the name). Then anytime a worker from one queue gets a job from the other it would think it is broken.

Debugging Utilities

debugger :: Job s t => Int -> Hworker s t -> IO () Source #

Logs the contents of the jobqueue and the inprogress queue at microseconds intervals.