jobqueue-0.1.6: A job queue library

Copyright: (c) Gree, Inc. 2013
License: MIT-style
Maintainer: Kiyoshi Ikehara <kiyoshi.ikehara@gree.net>
Stability: experimental
Portability: portable
Safe Haskell: None
Language: Haskell98

Network.JobQueue

Description

Haskell JobQueue is a library for building a job scheduler with a priority queue. The state of each job is stored in a backend database such as Apache ZooKeeper or another highly reliable message queue system.

Unit

A Unit represents a single state in the overall state machine. Units are written as value constructors in Haskell code. A Unit by itself is not executable: to run it in the job queue system, extra information such as a job identifier and a scheduled time is needed. An instance of a unit is therefore wrapped in a job and stored in the job queue together with that information.

The code shown below describes how to define a Unit.

 data JobUnit = HelloStep | WorldStep deriving (Show, Read)
 
 instance Unit JobUnit where

In this case, you define the JobUnit type with two states, HelloStep and WorldStep. This is the entire state machine of your job queue system. You can define nested or child state machines by defining more complex data types, as long as they can be serialized with the read and show functions.
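The serialization requirement can be checked without the library at all. The sketch below is a stand-alone illustration that extends JobUnit with a hypothetical child state machine (RepairStep, Diagnose, and Restart are names invented for this example) and verifies that every value survives a show/read round trip. It derives Eq only so that the results can be compared.

```haskell
-- A child state machine embedded in the parent one.
-- RepairStep and its constructors are hypothetical, for illustration only.
data RepairStep = Diagnose | Restart Int
  deriving (Show, Read, Eq)

-- JobUnit extended with a constructor that carries the child state.
data JobUnit = HelloStep | WorldStep | Repair RepairStep
  deriving (Show, Read, Eq)

-- Serialize a unit to a String and parse it back.
roundTrip :: JobUnit -> JobUnit
roundTrip = read . show

-- True when every sample unit survives serialization unchanged.
allRoundTrip :: Bool
allRoundTrip =
  all (\u -> roundTrip u == u)
      [HelloStep, WorldStep, Repair Diagnose, Repair (Restart 3)]
```

Derived Show and Read instances are inverses of each other for types like these, which is why deriving both is enough to make a nested unit storable as text.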

For more information, see Network.JobQueue.Class.

Job

Each task executed by state machines (such as checking server state or repairing a cluster) is called a job.

A job is described as a particular state of a state machine. Each state does only one thing (this matters especially for modifying operations). This prevents a job from ending in a failure state that the state machine is unable to handle.

You don't have to know the internal data structure of a job, but you do need to understand how jobs work when you write action code.

For more information, see Network.JobQueue.Job.

Environment

Each unit can carry information used by the action of its state. In many cases, however, some information is used by almost every state, and it is convenient to have a global data set that is accessible from all of the states' actions.

For this reason, you can define a global data structure called the environment. The environment can be retrieved with the getEnv function in the action monad.

 env <- getEnv
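To show the idea of a shared environment in isolation, here is a minimal sketch of a toy action monad built only from base. It is not the library's ActionM: Action, getEnvModel, AppEnv, and describeEnv are all hypothetical names, and the real Env class may require more than this model does.

```haskell
-- Hypothetical environment shared by every action.
data AppEnv = AppEnv { clusterName :: String, retryLimit :: Int }

-- A toy action monad: a function from the environment to an IO result.
newtype Action e a = Action { runAction :: e -> IO a }

instance Functor (Action e) where
  fmap f (Action g) = Action (fmap f . g)

instance Applicative (Action e) where
  pure x = Action (\_ -> pure x)
  Action f <*> Action g = Action (\e -> f e <*> g e)

instance Monad (Action e) where
  Action g >>= k = Action (\e -> g e >>= \x -> runAction (k x) e)

-- Stand-in for getEnv: return the environment the action was run with.
getEnvModel :: Action e e
getEnvModel = Action pure

-- An action body that reads the shared environment.
describeEnv :: Action AppEnv String
describeEnv = do
  env <- getEnvModel
  pure (clusterName env ++ " (retries: " ++ show (retryLimit env) ++ ")")
```

Running `runAction describeEnv (AppEnv "db" 3)` yields the string built from the environment, without the action taking the environment as an explicit argument.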

For more information, see Network.JobQueue.Class.

Action

An action is a function that is called with a unit. You can define actions with the "process" function.

   let withJobQueue = buildJobQueue loc name $ do
         process $ \WorldStep -> commitIO (putStrLn "world") >> fin
         process $ \HelloStep -> commitIO (putStr "hello, ") >> next WorldStep

In general, an action does the following things:

  • check if the precondition of the state is satisfied or not
  • do the action associated with the state
  • check the postcondition and return the next state.
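The three steps above can be sketched as a pure state machine, independent of the library. Everything in this block is a local model: Outcome, Step, World, runStep, and runMachine are hypothetical names, and the real library drives the machine through the job queue rather than through a recursive function.

```haskell
-- Outcome of one action; a local stand-in, not a library type.
data Outcome s = Next s | Finished | Aborted
  deriving (Show, Eq)

data Step = CheckServer | RepairServer
  deriving (Show, Read, Eq)

-- Hypothetical view of the external system.
data World = World { serverHealthy :: Bool, repairWorks :: Bool }

-- One action: check the precondition, do one thing, check the postcondition.
runStep :: Step -> World -> (World, Outcome Step)
runStep CheckServer w
  | serverHealthy w = (w, Finished)            -- nothing to repair
  | otherwise       = (w, Next RepairServer)
runStep RepairServer w
  | serverHealthy w = (w, Finished)            -- precondition: only repair a broken server
  | otherwise =
      let w' = w { serverHealthy = repairWorks w }  -- the single modifying operation
      in if serverHealthy w'                        -- postcondition
           then (w', Next CheckServer)
           else (w', Aborted)

-- Drive the machine until it finishes or aborts.
runMachine :: Step -> World -> (World, Outcome Step)
runMachine s w = case runStep s w of
  (w', Next s') -> runMachine s' w'
  done          -> done
```

Because each state performs at most one modification, a failed postcondition always identifies exactly which operation went wrong.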

For more information, see Network.JobQueue.Action.

Documentation

buildJobQueue

Arguments

:: (Env e, Unit a)
=> String                              -- locator (ex. "zookeeper://192.168.0.1/myapp")
-> String                              -- queue name (ex. "/jobqueue")
-> JobM e a ()                         -- job construction function
-> (JobQueue e a -> IO ()) -> IO ()    -- job queue executor

Build a function of type (JobQueue e a -> IO ()) -> IO (). The resulting function takes a job queue executor (JobQueue e a -> IO ()) as its first parameter and runs it against the job queue.

The following code executes jobs as long as the queue is not empty.

 main' loc name = do
   let withJobQueue = buildJobQueue loc name $ do
         process $ \WorldStep -> commitIO (putStrLn "world") >> fin
         process $ \HelloStep -> commitIO (putStr "hello, ") >> next WorldStep
   withJobQueue $ loop (initJobEnv loc name [])
   where
     loop env jq = do
       executeJob jq env
       count <- countJobQueue jq
       when (count > 0) $ loop env jq

The following code registers a job with an initial state.

 main' loc name = do
   let withJobQueue = buildJobQueue loc name $ do
         process $ \WorldStep -> commitIO (putStrLn "world") >> fin
         process $ \HelloStep -> commitIO (putStr "hello, ") >> next WorldStep
   withJobQueue $ \jq -> scheduleJob jq HelloStep
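Both examples rely on the same shape: buildJobQueue returns a "with"-style function that hands a queue to an executor. The sketch below models that shape with an in-memory queue using only base. ToyQueue, buildToyQueue, popJob, and drain are hypothetical stand-ins, and the use of bracket is an assumption about how such a function could manage its resource, not a statement about the library's implementation.

```haskell
import Control.Exception (bracket)
import Data.IORef (IORef, newIORef, readIORef, writeIORef)

-- A toy in-memory queue; a stand-in for the library's JobQueue type.
newtype ToyQueue a = ToyQueue (IORef [a])

-- Build a "with" function: open the queue, hand it to the executor,
-- and always run the release step afterwards.
buildToyQueue :: [a] -> (ToyQueue a -> IO ()) -> IO ()
buildToyQueue initial executor =
  bracket (ToyQueue <$> newIORef initial)  -- acquire
          (\_ -> pure ())                  -- release (nothing to close here)
          executor

-- Take the first job off the queue, if any.
popJob :: ToyQueue a -> IO (Maybe a)
popJob (ToyQueue ref) = do
  jobs <- readIORef ref
  case jobs of
    []       -> pure Nothing
    (j : js) -> writeIORef ref js >> pure (Just j)

-- Process jobs as long as the queue is not empty, returning them in order.
drain :: ToyQueue a -> IO [a]
drain q = do
  mj <- popJob q
  case mj of
    Nothing -> pure []
    Just j  -> (j :) <$> drain q
```

Passing the executor as a function lets the builder control the queue's lifetime, so the caller never holds a queue handle after the session has ended.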

runJobQueue

Arguments

:: (Aux e, Env e, Unit a)
=> e
-> String         -- locator (ex. "zookeeper://192.168.0.1/myapp")
-> String         -- queue name (ex. "/jobqueue")
-> JobM e a ()    -- job construction function
-> IO ()

Run a job queue while there is at least one job in the queue.

data Job a

Job control block. A Job consists of State, Unit, CTime, OnTime, Id, Group, and Priority.

  • State - one of five states (initialized, runnable, running, aborted, and finished)
  • Unit - an instance of the Unit class, specified by the type parameter of the Job data type
  • CTime - creation time
  • OnTime - the time at which this job starts
  • Id - identifier of this job
  • Group - group ID of this job
  • Priority - the priority of this job

Constructors

StopTheWorld 

Instances

Eq a => Eq (Job a)
Read a => Read (Job a)
Show a => Show (Job a)

class (Read a, Show a, Desc a, Eq a) => Unit a where

Unit class

Minimal complete definition

Nothing

Methods

getPriority :: a -> Int

Define the priority of a unit.

getRecovery :: a -> a

Define the recovery state of a unit.

toBeLogged :: a -> Bool

Define the logging necessity of a unit.
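Since the minimal complete definition is empty, every method has a default and an instance overrides only what it needs. The sketch below mirrors that arrangement with a local class so that it runs without the library. UnitLike is a hypothetical stand-in for Unit, and the default bodies shown are assumptions chosen for this example, not the library's actual defaults.

```haskell
-- Local mirror of the documented method names; NOT the library's class.
-- The default bodies are assumptions made for this sketch.
class UnitLike a where
  getPriority :: a -> Int
  getPriority _ = 1

  getRecovery :: a -> a
  getRecovery = id

  toBeLogged :: a -> Bool
  toBeLogged _ = True

data JobUnit = HelloStep | WorldStep
  deriving (Show, Read, Eq)

-- Override two methods and keep the default for toBeLogged.
instance UnitLike JobUnit where
  getPriority HelloStep = 2
  getPriority WorldStep = 1

  getRecovery _ = HelloStep  -- recover by restarting from the first state
```

With the real class, the empty instance body from the Unit section corresponds to accepting every default.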

type ActionM e a b = ActionT e a IO b

data (Env e, Unit a) => JobM e a b

process :: (Aux e, Env e, Unit a) => (a -> ActionM e a ()) -> JobM e a ()

Declare a function that accepts a unit and executes its action if possible.

createJob :: Unit a => JobState -> a -> IO (Job a)

fin :: (Env e, Unit a) => ActionM e a ()

Finish a job.

none :: (Env e, Unit a) => ActionM e a ()

If the unit passed by the job queue system cannot be processed by the action function, the function should call this.

next

Arguments

:: (Env e, Unit a)
=> a                 -- the next state
-> ActionM e a ()

Move to the next state immediately. After the action has run, the job being processed is moved to the given state. The next action is invoked immediately and can continue working without being interrupted by another job. NOTE: This overrides the next state if it is already set.

orNext

Arguments

:: (Env e, Unit a)
=> a                 -- the next state
-> ActionM e a ()

Move to the next state immediately. Unlike the "next" function, this does not override the next state if it is already set.
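The difference between the two functions comes down to how they treat a next state that has already been set. A minimal pure model, assuming the pending next state can be represented as a Maybe value (Pending, nextModel, and orNextModel are hypothetical names, not library functions):

```haskell
import Control.Applicative ((<|>))

-- The pending next state of the job being processed; Nothing means unset.
type Pending s = Maybe s

-- Model of "next": always replaces the pending state.
nextModel :: s -> Pending s -> Pending s
nextModel s _ = Just s

-- Model of "orNext": keeps an existing pending state, sets it otherwise.
orNextModel :: s -> Pending s -> Pending s
orNextModel s pending = pending <|> Just s
```

In this model, `orNextModel` acts as a fallback: it only takes effect when no earlier call has chosen a next state.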

yield :: (Env e, Unit a) => ActionM e a ()

Yield execution

fork

Arguments

:: (Env e, Unit a)
=> a                 -- a unit
-> ActionM e a ()

Create a job with a unit and schedule it.

forkInTime :: (Env e, Unit a) => NominalDiffTime -> a -> ActionM e a ()

Create a job with a unit and schedule it after the given time interval.

forkOnTime

Arguments

:: (Env e, Unit a)
=> UTCTime           -- absolute time in UTC
-> a                 -- a unit
-> ActionM e a ()

Create a job with a unit and schedule it at a specific time.
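forkInTime takes a relative NominalDiffTime while forkOnTime takes an absolute UTCTime. The sketch below shows how the two kinds of value relate, using the time package that ships with GHC. startTimeFor, sampleNow, and sampleStart are hypothetical helpers, and treating a relative delay as "current time plus delay" is an assumption about the scheduling semantics, not something this page states.

```haskell
import Data.Time.Calendar (fromGregorian)
import Data.Time.Clock (NominalDiffTime, UTCTime (..), addUTCTime)

-- Convert a relative delay into the absolute start time it denotes.
startTimeFor :: UTCTime -> NominalDiffTime -> UTCTime
startTimeFor now delay = addUTCTime delay now

-- A fixed "current time" so that the example is deterministic.
sampleNow :: UTCTime
sampleNow = UTCTime (fromGregorian 2013 1 1) 0

-- Ninety seconds after sampleNow; NominalDiffTime literals are in seconds.
sampleStart :: UTCTime
sampleStart = startTimeFor sampleNow 90
```

Numeric literals of type NominalDiffTime denote seconds, so a delay of `90` means a minute and a half.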

abort :: (Env e, Unit a) => ActionM e a b

Abort the execution of a state machine. If a critical problem is found and there is a need to switch to the failure state, call this function.

getEnv :: (Env e, Unit a) => ActionM e a e

Get the environment inside an action.

param :: (ParamEnv e, Unit a, Param b) => (String, String) -> ActionM e a b

Get a parameter value with a key from the environment in action. This is a special function for ParamEnv.

commitIO :: (Env e, Unit a) => IO b -> ActionM e a b

Do a dirty I/O action with a side effect on the external system. If the action doesn't change the state of the external system, use liftIO instead.

liftIO :: MonadIO m => forall a. IO a -> m a

Lift a computation from the IO monad.