-- Hoogle documentation, generated by Haddock -- See Hoogle, http://www.haskell.org/hoogle/ -- | A job queue library -- -- Haskell JobQueue is a library used for building a job scheduler with -- priority queues. The state of jobs is stored in a backend database -- such as Apache Zookeeper or other highly reliable message queue -- systems. @package jobqueue @version 0.1.6 module Network.JobQueue.Backend.Class class BackendQueue q where closeQueue _ = return () readQueue :: BackendQueue q => q -> IO (Maybe (ByteString, String)) peekQueue :: BackendQueue q => q -> IO (Maybe (ByteString, String, String, Int)) updateQueue :: BackendQueue q => q -> String -> ByteString -> Int -> IO (Bool) deleteQueue :: BackendQueue q => q -> String -> IO (Bool) writeQueue :: BackendQueue q => q -> ByteString -> Int -> IO (String) listQueue :: BackendQueue q => q -> IO ([ByteString]) itemsQueue :: BackendQueue q => q -> IO ([String]) countQueue :: BackendQueue q => q -> IO (Int) closeQueue :: BackendQueue q => q -> IO () module Network.JobQueue.Backend.Types data Backend Backend :: (String -> IO q) -> IO () -> Backend [bOpenQueue] :: Backend -> String -> IO q [bClose] :: Backend -> IO () data BackendError NotFound :: String -> BackendError SessionError :: String -> BackendError instance GHC.Show.Show Network.JobQueue.Backend.Types.BackendError instance GHC.Exception.Exception Network.JobQueue.Backend.Types.BackendError module Network.JobQueue.Backend.Sqlite3 openSqlite3Backend :: String -> IO Backend newSqlite3Backend :: Connection -> IO Backend instance Network.JobQueue.Backend.Class.BackendQueue Network.JobQueue.Backend.Sqlite3.Sqlite3Queue module Network.JobQueue.Backend.Zookeeper openZookeeperBackend :: String -> IO Backend newZookeeperBackend :: Zookeeper -> Backend module Network.JobQueue.Backend -- | Open a backend database. openBackend :: String -> IO Backend -- | Class definitions module Network.JobQueue.Class -- | Environment class class Env a -- | Description class class (Show a) => Desc a where desc x = show x shortDesc x = takeWhile (/= ' ') $ show x -- | Define the description of a unit. desc :: Desc a => a -> String -- | Define the short description of a unit. shortDesc :: Desc a => a -> String -- | Unit class class (Read a, Show a, Desc a, Eq a) => Unit a where getPriority _ju = 1 getRecovery ju = ju toBeLogged _ju = False -- | Define the priority of a unit. getPriority :: Unit a => a -> Int -- | Define the recovery state of a unit. getRecovery :: Unit a => a -> a -- | Define the logging necessity of a unit. toBeLogged :: Unit a => a -> Bool module Network.JobQueue.Types data JobActionState e a JobActionState :: [ActionFn e a] -> JobActionState e a [jobActions] :: JobActionState e a -> [ActionFn e a] data (Env e, Unit a) => JobM e a b type ActionM e a b = ActionT e a IO b data ActionT e a m b type ActionFn e a = e -> a -> IO (Either Break (Maybe (RuntimeState a))) data ActionEnv e a ActionEnv :: e -> a -> ActionEnv e a [getJobEnv] :: ActionEnv e a -> e [getJobUnit] :: ActionEnv e a -> a -- | Unit class class (Read a, Show a, Desc a, Eq a) => Unit a where getPriority _ju = 1 getRecovery ju = ju toBeLogged _ju = False -- | Define the priority of a unit. getPriority :: Unit a => a -> Int -- | Define the recovery state of a unit. getRecovery :: Unit a => a -> a -- | Define the logging necessity of a unit. toBeLogged :: Unit a => a -> Bool data RuntimeState a RS :: (Maybe a) -> [(a, Maybe UTCTime)] -> Int -> RuntimeState a [rsNextJob] :: RuntimeState a -> (Maybe a) [rsNextForks] :: RuntimeState a -> [(a, Maybe UTCTime)] [rsCommits] :: RuntimeState a -> Int data Break Unhandled :: SomeException -> Break Failure :: String -> Break Retriable :: Break data LogLevel :: * LevelDebug :: LogLevel LevelInfo :: LogLevel LevelWarn :: LogLevel LevelError :: LogLevel LevelOther :: Text -> LogLevel setNextJob :: (Unit a) => a -> (RuntimeState a) -> (RuntimeState a) setNextJobIfEmpty :: (Unit a) => a -> (RuntimeState a) -> (RuntimeState a) emptyNextJob :: (Unit a) => (RuntimeState a) -> (RuntimeState a) addForkJob :: (Unit a) => (a, Maybe UTCTime) -> (RuntimeState a) -> (RuntimeState a) incrementCommits :: (Unit a) => (RuntimeState a) -> (RuntimeState a) getCommits :: (Unit a) => (RuntimeState a) -> Int runS :: JobM e a b -> StateT (JobActionState e a) IO b runAM :: ActionT e a m b -> ExceptT Break (ReaderT (ActionEnv e a) (StateT (Maybe (RuntimeState a)) (LoggingT m))) b addAction :: (Env e, Unit a) => ActionFn e a -> JobActionState e a -> JobActionState e a setResult :: (Unit a) => Maybe (RuntimeState a) -> Maybe (RuntimeState a) -> Maybe (RuntimeState a) instance Control.Monad.Base.MonadBase base m => Control.Monad.Base.MonadBase base (Network.JobQueue.Types.ActionT e a m) instance GHC.Base.Monad m => Control.Monad.State.Class.MonadState (GHC.Base.Maybe (Network.JobQueue.Types.RuntimeState a)) (Network.JobQueue.Types.ActionT e a m) instance GHC.Base.Monad m => Control.Monad.Reader.Class.MonadReader (Network.JobQueue.Types.ActionEnv e a) (Network.JobQueue.Types.ActionT e a m) instance GHC.Base.Monad m => Control.Monad.Error.Class.MonadError Network.JobQueue.Types.Break (Network.JobQueue.Types.ActionT e a m) instance Control.Monad.IO.Class.MonadIO m => Control.Monad.Logger.MonadLogger (Network.JobQueue.Types.ActionT e a m) instance Control.Monad.IO.Class.MonadIO m => Control.Monad.IO.Class.MonadIO (Network.JobQueue.Types.ActionT e a m) instance GHC.Base.Monad m => GHC.Base.Monad (Network.JobQueue.Types.ActionT e a m) instance GHC.Base.Monad m => GHC.Base.Functor (Network.JobQueue.Types.ActionT e a m) instance GHC.Base.Monad m => GHC.Base.Applicative (Network.JobQueue.Types.ActionT e a m) instance Control.Monad.State.Class.MonadState (Network.JobQueue.Types.JobActionState e a) (Network.JobQueue.Types.JobM e a) instance GHC.Base.Applicative (Network.JobQueue.Types.JobM e a) instance GHC.Base.Functor (Network.JobQueue.Types.JobM e a) instance Control.Monad.IO.Class.MonadIO (Network.JobQueue.Types.JobM e a) instance GHC.Base.Monad (Network.JobQueue.Types.JobM e a) instance GHC.Show.Show a => GHC.Show.Show (Network.JobQueue.Types.RuntimeState a) instance GHC.Show.Show Network.JobQueue.Types.Break instance Network.JobQueue.Class.Unit a => Data.Default.Class.Default (Network.JobQueue.Types.RuntimeState a) instance Data.Default.Class.Default (Network.JobQueue.Types.JobActionState e a) instance Control.Monad.Trans.Class.MonadTrans (Network.JobQueue.Types.ActionT e a) instance Control.Monad.Trans.Control.MonadTransControl (Network.JobQueue.Types.ActionT e a) instance Control.Monad.Trans.Control.MonadBaseControl base m => Control.Monad.Trans.Control.MonadBaseControl base (Network.JobQueue.Types.ActionT e a m) module Network.JobQueue.Logger logDebug :: Q Exp logInfo :: Q Exp logWarn :: Q Exp logError :: Q Exp logNotice :: Q Exp logCritical :: Q Exp -- | Use this newtype wrapper for your single parameter if you are -- formatting a string containing exactly one substitution site. data Only a :: * -> * module Network.JobQueue.Job.Internal data JobState Initialized :: JobState Runnable :: JobState Running :: JobState Aborted :: JobState Finished :: JobState -- | Job control block Job consists of State, Unit, -- CTime, OnTime, Id, Group, and -- Priority. -- -- data Job a Job :: JobState -> a -> UTCTime -> UTCTime -> Int -> Int -> Int -> Job a [jobState] :: Job a -> JobState [jobUnit] :: Job a -> a [jobCTime] :: Job a -> UTCTime [jobOnTime] :: Job a -> UTCTime [jobId] :: Job a -> Int [jobGroup] :: Job a -> Int [jobPriority] :: Job a -> Int StopTheWorld :: Job a createJob :: (Unit a) => JobState -> a -> IO (Job a) createOnTimeJob :: (Unit a) => JobState -> UTCTime -> a -> IO (Job a) printJob :: (Unit a) => Job a -> IO () defaultId :: Int defaultGroup :: Int instance GHC.Classes.Eq a => GHC.Classes.Eq (Network.JobQueue.Job.Internal.Job a) instance GHC.Read.Read a => GHC.Read.Read (Network.JobQueue.Job.Internal.Job a) instance GHC.Show.Show a => GHC.Show.Show (Network.JobQueue.Job.Internal.Job a) instance GHC.Classes.Eq Network.JobQueue.Job.Internal.JobState instance GHC.Read.Read Network.JobQueue.Job.Internal.JobState instance GHC.Show.Show Network.JobQueue.Job.Internal.JobState module Network.JobQueue.AuxClass class Aux a where auxLogger _ loc logsrc loglevel msg = do { progName <- getProgName; logFunc loglevel progName $ unpack $ fromLogStr $ defaultLogStr loc logsrc loglevel msg } where logFunc level = case level of { LevelDebug -> debugM LevelInfo -> infoM LevelWarn -> warningM LevelError -> errorM LevelOther "notice" -> noticeM LevelOther "critical" -> criticalM LevelOther _ -> warningM } auxHandleFailure _ mjob = do { case mjob of { Just job -> Just <$> createJob Runnable (getRecovery (jobUnit job)) Nothing -> return (Nothing) } } auxHandleAfterExecute _ _job = return () auxLogger :: Aux a => a -> Loc -> LogSource -> LogLevel -> LogStr -> IO () auxHandleFailure :: (Aux a, Unit b) => a -> Maybe (Job b) -> IO (Maybe (Job b)) auxHandleAfterExecute :: (Aux a, Unit b) => a -> Job b -> IO () module Network.JobQueue.Action data JobActionState e a runActionState :: (Env e, Unit a) => JobActionState e a -> ActionFn e a runAction :: (Aux e, Env e, Unit a) => e -> a -> ActionT e a IO () -> IO (Either Break (Maybe (RuntimeState a))) -- | Get environment in action. getEnv :: (Env e, Unit a) => ActionM e a e -- | Move to the next state immediately. After the execution of the action -- the job being processed will be moved to the given state. The next -- action will be invoked immediately and can continue to work without -- being interrupted by another job. NOTE: This overrides the next state -- if it is already set. next :: (Env e, Unit a) => a -> ActionM e a () -- | Move to the next state immediately. This is different from "next" -- function because this doesn't override if the next job is already set. orNext :: (Env e, Unit a) => a -> ActionM e a () -- | Finish a job. fin :: (Env e, Unit a) => ActionM e a () -- | If the unit passed by the job queue system cannot be processed by the -- action function, the function should call this. none :: (Env e, Unit a) => ActionM e a () -- | Create a job with a unit and schedule it. fork :: (Env e, Unit a) => a -> ActionM e a () -- | Create a job with a unit and schedule it after a few micro seconds. forkInTime :: (Env e, Unit a) => NominalDiffTime -> a -> ActionM e a () -- | Create a job with a unit and schedule it at a specific time. forkOnTime :: (Env e, Unit a) => UTCTime -> a -> ActionM e a () -- | Abort the execution of a state machine. If a critical problem is found -- and there is a need to switch to the failure state, call this -- function. abort :: (Env e, Unit a) => ActionM e a b -- | Do a dirty I/O action with a side effect to the external system. If it -- doesn't change the state of the external system, you should use liftIO -- instead. commitIO :: (Env e, Unit a) => IO b -> ActionM e a b -- | Lift a computation from the IO monad. liftIO :: MonadIO m => forall a. IO a -> m a -- | Yield execution yield :: (Env e, Unit a) => ActionM e a () module Network.JobQueue.Param -- | Environment with a parameter set class (Env a) => ParamEnv a where envParameters _env = [] envParameters :: ParamEnv a => a -> [(String, String)] class Param a decodeParam :: Param a => String -> Maybe a encodeParam :: Param a => a -> String -- | Get a parameter value with a key from the environment in action. This -- is a special function for ParamEnv. param :: (ParamEnv e, Unit a, Param b) => (String, String) -> ActionM e a b instance Network.JobQueue.Param.Param GHC.Base.String instance Network.JobQueue.Param.Param GHC.Types.Int instance Network.JobQueue.Param.Param GHC.Integer.Type.Integer instance Network.JobQueue.Param.Param GHC.Types.Double instance Network.JobQueue.Param.Param Data.Aeson.Types.Internal.Value module Network.JobQueue.Job -- | Job control block Job consists of State, Unit, -- CTime, OnTime, Id, Group, and -- Priority. -- -- data Job a StopTheWorld :: Job a data JobState Initialized :: JobState Runnable :: JobState Running :: JobState Aborted :: JobState Finished :: JobState buildActionState :: (Env e, Unit a) => JobM e a () -> IO (JobActionState e a) -- | Declare a function which accepts a unit and execute the action of it -- if possible. process :: (Aux e, Env e, Unit a) => (a -> ActionM e a ()) -> JobM e a () createJob :: (Unit a) => JobState -> a -> IO (Job a) createOnTimeJob :: (Unit a) => JobState -> UTCTime -> a -> IO (Job a) printJob :: (Unit a) => Job a -> IO () module Network.JobQueue.JobQueue.Internal data JobQueue e a JobQueue :: q -> JobActionState e a -> JobQueue e a [jqBackendQueue] :: JobQueue e a -> q [jqActionState] :: JobQueue e a -> JobActionState e a data ActionForJob a Execute :: (Job a) -> ActionForJob a Delete :: ActionForJob a Skip :: ActionForJob a actionForJob :: Unit a => Job a -> String -> ActionForJob a peekJob' :: (Unit a) => JobQueue e a -> IO (Maybe (Job a, String, String, Int)) executeJob' :: (Aux e, Env e, Unit a) => JobQueue e a -> e -> String -> Job a -> Int -> IO (Either Break (Maybe (RuntimeState a))) afterExecuteJob :: (Aux e, Env e, Unit a) => JobQueue e a -> e -> String -> Job a -> Int -> Either Break (Maybe (RuntimeState a)) -> IO () rescheduleJob :: (Unit a) => JobQueue e a -> Maybe UTCTime -> a -> IO () updateJob :: (Unit a) => JobQueue e a -> String -> Job a -> Int -> IO (Bool) pack :: (Unit a) => Job a -> ByteString module Network.JobQueue.JobQueue data JobQueue e a -- | A session handler -- -- A session usually represents a database session to access job queues -- stored in the backend database. data Session -- | Open a queue session with a resource locator openSession :: String -> IO (Session) -- | Create a queue session with a backend handler newSession :: String -> Backend -> Session -- | Close a queue session if needed closeSession :: Session -> IO () -- | Open a job queue with a session. openJobQueue :: (Env e, Unit a) => Session -> String -> JobM e a () -> IO (JobQueue e a) -- | Close a job queue. closeJobQueue :: (Env e, Unit a) => JobQueue e a -> IO () -- | Count the number of jobs queued in a job queue. countJobQueue :: (Env e, Unit a) => JobQueue e a -> IO (Int) -- | Resume a job queue resumeJobQueue :: (Env e, Unit a) => JobQueue e a -> IO (Bool) -- | Suspend a job queue suspendJobQueue :: (Env e, Unit a) => JobQueue e a -> IO (Bool) -- | Execute an action of the head job in a job queue. executeJob :: (Aux e, Env e, Unit a) => JobQueue e a -> e -> IO () -- | Schedule a job. scheduleJob :: (Unit a) => JobQueue e a -> a -> IO () -- | Delete a job from a job queue. deleteJob :: (Unit a) => JobQueue e a -> String -> IO Bool -- | Clear all jobs from a job queue. clearJobs :: (Unit a) => JobQueue e a -> IO [(String, Job a)] -- | Peek a job form a job queue. peekJob :: (Unit a) => JobQueue e a -> IO (Maybe (Job a)) module Network.JobQueue.Util waitForAllJobs :: (Env e, Unit a) => JobQueue e a -> Int -> ((Maybe (Job a)) -> Int -> IO ()) -> IO (Maybe (Job a)) waitUntilMatch :: (Env e, Unit a) => JobQueue e a -> String -> Int -> ((Maybe (Job a)) -> Int -> IO ()) -> IO (Maybe (Job a)) -- | Haskell JobQueue is a library used for building a job scheduler with a -- priority queue. The state of a job is stored in a backend database -- such as Apache Zookeeper or other highly reliable mesage queue -- systems. -- -- -- -- Unit represents each state in an entire state machine. Units are -- described as value constructors in Haskell code. Unit itself is not -- executable. To execute using job queue system, extra information such -- as job identifier, scheduled time is needed. An instance of a unit is -- wrapped by a job and stored into the job queue with those -- information. -- -- The code shown below describes how to define a Unit. -- --
--   data JobUnit = HelloStep | WorldStep deriving (Show, Read)
--   
--   instance Unit JobUnit where
--   
-- -- In this case, you define JobUnit type with 2 states, HelloStep and -- WorldStep. This is the entire state machine of your job queue system. -- You can define nested or child state machines by defining more complex -- data types as long as they are serializable with read and show -- functions. -- -- For more information, see Network.JobQueue.Class. -- -- -- -- Each task executed by state machines (such as checking server state or -- repairing a cluster) is called a job. -- -- A job is described as a particular state of a state machine. Each -- state only does one thing (especially for modifying operations). This -- prevents jobs ending in a failure state, which the state machine is -- unable to handle. -- -- You don't have to know the internal data structure of a job, but need -- to understand its when you write action code. -- -- For more information, see Network.JobQueue.Job. -- -- -- -- Each unit can contain information used in the action of the state. But -- in many cases, there is some information used by almost all states and -- it is convenient if there is some kind of global data set that is -- accessible from all the state's actions. -- -- For this reason, you can define global data structures called -- environment. The enviroment can be retrieved using getEnv function in -- action monad. -- --
--   env <- getEnv
--   
-- -- For more information, see Network.JobQueue.Class. -- -- -- -- An action is a function that is called with a unit. You can define -- actions with the "process" function. -- --
--   let withJobQueue = buildJobQueue loc name $ do
--         process $ \WorldStep -> commitIO (putStrLn "world") >> fin
--         process $ \HelloStep -> commitIO (putStr "hello, ") >> next WorldStep
--   
-- -- In general, an action does the following things: -- -- -- -- For more information, see Network.JobQueue.Action. module Network.JobQueue -- | Build a function that takes a function ((JobQueue a -> -- IO ()) -> IO ()) as its first parameter. -- -- The following code executes jobs as long as the queue is not empty. -- --
--   main' loc name = do
--     let withJobQueue = buildJobQueue loc name $ do
--           process $ \WorldStep -> commitIO (putStrLn "world") >> fin
--           process $ \HelloStep -> commitIO (putStr "hello, ") >> next WorldStep
--     withJobQueue $ loop (initJobEnv loc name [])
--     where
--       loop env jq = do
--         executeJob jq env
--         count <- countJobQueue jq
--         when (count > 0) $ loop env jq
--   
-- -- The following code registers a job with initial state. -- --
--   main' loc name = do
--     let withJobQueue = buildJobQueue loc name $ do
--           process $ \WorldStep -> commitIO (putStrLn "world") >> fin
--           process $ \HelloStep -> commitIO (putStr "hello, ") >> next WorldStep
--     withJobQueue $ \jq -> scheduleJob jq HelloStep
--   
buildJobQueue :: (Env e, Unit a) => String -> String -> JobM e a () -> ((JobQueue e a -> IO ()) -> IO ()) -- | Run a job queue while there is at least one job in the queue. runJobQueue :: (Aux e, Env e, Unit a) => e -> String -> String -> JobM e a () -> IO () -- | Job control block Job consists of State, Unit, -- CTime, OnTime, Id, Group, and -- Priority. -- -- data Job a StopTheWorld :: Job a data JobState Initialized :: JobState Runnable :: JobState Running :: JobState Aborted :: JobState Finished :: JobState -- | Unit class class (Read a, Show a, Desc a, Eq a) => Unit a where getPriority _ju = 1 getRecovery ju = ju toBeLogged _ju = False -- | Define the priority of a unit. getPriority :: Unit a => a -> Int -- | Define the recovery state of a unit. getRecovery :: Unit a => a -> a -- | Define the logging necessity of a unit. toBeLogged :: Unit a => a -> Bool type ActionM e a b = ActionT e a IO b data (Env e, Unit a) => JobM e a b data JobActionState e a -- | Declare a function which accepts a unit and execute the action of it -- if possible. process :: (Aux e, Env e, Unit a) => (a -> ActionM e a ()) -> JobM e a () createJob :: (Unit a) => JobState -> a -> IO (Job a) -- | Finish a job. fin :: (Env e, Unit a) => ActionM e a () -- | If the unit passed by the job queue system cannot be processed by the -- action function, the function should call this. none :: (Env e, Unit a) => ActionM e a () -- | Move to the next state immediately. After the execution of the action -- the job being processed will be moved to the given state. The next -- action will be invoked immediately and can continue to work without -- being interrupted by another job. NOTE: This overrides the next state -- if it is already set. next :: (Env e, Unit a) => a -> ActionM e a () -- | Move to the next state immediately. This is different from "next" -- function because this doesn't override if the next job is already set. orNext :: (Env e, Unit a) => a -> ActionM e a () -- | Yield execution yield :: (Env e, Unit a) => ActionM e a () -- | Create a job with a unit and schedule it. fork :: (Env e, Unit a) => a -> ActionM e a () -- | Create a job with a unit and schedule it after a few micro seconds. forkInTime :: (Env e, Unit a) => NominalDiffTime -> a -> ActionM e a () -- | Create a job with a unit and schedule it at a specific time. forkOnTime :: (Env e, Unit a) => UTCTime -> a -> ActionM e a () -- | Abort the execution of a state machine. If a critical problem is found -- and there is a need to switch to the failure state, call this -- function. abort :: (Env e, Unit a) => ActionM e a b -- | Get environment in action. getEnv :: (Env e, Unit a) => ActionM e a e class Param a -- | Get a parameter value with a key from the environment in action. This -- is a special function for ParamEnv. param :: (ParamEnv e, Unit a, Param b) => (String, String) -> ActionM e a b -- | Do a dirty I/O action with a side effect to the external system. If it -- doesn't change the state of the external system, you should use liftIO -- instead. commitIO :: (Env e, Unit a) => IO b -> ActionM e a b -- | Lift a computation from the IO monad. liftIO :: MonadIO m => forall a. IO a -> m a