LogicGrowsOnTrees-1.1.0.2: a parallel implementation of logic programming using distributed tree exploration

Safe Haskell: None

LogicGrowsOnTrees.Parallel.Common.RequestQueue

Description

To understand the purpose of this module, it helps to know that there are two main loops running in the supervisor. The first loop runs inside the SupervisorMonad and is usually taken over by the adapter, which handles the communication between the supervisor and the workers. The second loop (referred to as the controller) lets the user submit requests to the supervisor, such as a global progress update, or possibly adapter-specific requests (such as changing the number of workers).

With this in mind, the purpose of this module is to create infrastructure for the second loop (the controller) to submit requests to the first loop. It provides this functionality through a class so that specific adapters can extend this to provide requests specific to that adapter (such as changing the number of workers).

Type-classes

class (HasExplorationMode m, Functor m, MonadCatchIO m) => RequestQueueMonad m where

This class provides the set of supervisor requests common to all adapters.

Methods

abort :: m ()

Abort the supervisor.

addWorkerCountListenerAsync :: (Int -> IO ()) -> IO () -> m ()

Submits a function to be called whenever the number of workers changes; the given function will also be called immediately with the current number of workers.

fork :: m () -> m ThreadId

Fork a new thread running in this monad; all controller threads are automatically killed when the run is finished.

getCurrentProgressAsync :: (ProgressFor (ExplorationModeFor m) -> IO ()) -> m ()

Request the current progress, invoking the given callback with the result; see getCurrentProgress for the synchronous version.

getCurrentStatisticsAsync :: (RunStatistics -> IO ()) -> m ()

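Request the current run statistics, invoking the given callback with the result; see getCurrentStatistics for the synchronous version.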

getNumberOfWorkersAsync :: (Int -> IO ()) -> m ()

Request the number of workers, invoking the given callback with the result; see getNumberOfWorkers for the synchronous version.

requestProgressUpdateAsync :: (ProgressFor (ExplorationModeFor m) -> IO ()) -> m ()

Request that a global progress update be performed, invoking the given callback with the result; see requestProgressUpdate for the synchronous version.

setWorkloadBufferSize :: Int -> m ()

Sets the size of the workload buffer; for more information, see setWorkloadBufferSize (which links to the LogicGrowsOnTrees.Parallel.Common.Supervisor module).

Instances

RequestQueueMonad (ThreadsControllerMonad exploration_mode) 
RequestQueueMonad (WorkgroupControllerMonad inner_state exploration_mode) 
(SupervisorFullConstraint worker_id m, MonadCatchIO m) => RequestQueueMonad (RequestQueueReader exploration_mode worker_id m) 

Types

type Request exploration_mode worker_id m = SupervisorMonad exploration_mode worker_id m ()

A supervisor request.

data RequestQueue exploration_mode worker_id m

A basic supervisor request queue.

Constructors

RequestQueue 

Fields

requests :: !(TChan (Request exploration_mode worker_id m))

the queue of requests to the supervisor

receivers :: !(IORef [ProgressFor exploration_mode -> IO ()])

a list of callbacks to invoke when a global progress update has completed

controllerThreads :: !(IORef [ThreadId])

a list of the controller threads

Instances

HasExplorationMode (RequestQueueReader exploration_mode worker_id m) 
(SupervisorFullConstraint worker_id m, MonadCatchIO m) => RequestQueueMonad (RequestQueueReader exploration_mode worker_id m) 

type RequestQueueReader exploration_mode worker_id m = ReaderT (RequestQueue exploration_mode worker_id m) IO

A basic supervisor request queue monad, which has an implicit RequestQueue object that it uses to communicate with the supervisor loop.

Functions

Synchronized requests

addWorkerCountListener :: RequestQueueMonad m => (Int -> IO ()) -> m ()

Like addWorkerCountListenerAsync, but blocks until the listener has been added.

getCurrentProgress :: RequestQueueMonad m => m (ProgressFor (ExplorationModeFor m))

Like getCurrentProgressAsync, but blocks until the result is ready.

getCurrentStatistics :: RequestQueueMonad m => m RunStatistics

Like getCurrentStatisticsAsync, but blocks until the result is ready.

getNumberOfWorkers :: RequestQueueMonad m => m Int

Like getNumberOfWorkersAsync, but blocks until the result is ready.

requestProgressUpdate :: RequestQueueMonad m => m (ProgressFor (ExplorationModeFor m))

Like requestProgressUpdateAsync, but blocks until the progress update has completed.

syncAsync :: MonadIO m => ((α -> IO ()) -> m ()) -> m α

General utility function for converting an asynchronous request to a synchronous request; it uses an MVar to hold the result of the request and blocks until the MVar has been filled.
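
The MVar technique described above can be illustrated with a minimal standalone sketch. It is specialised to IO rather than an arbitrary MonadIO, and the names syncAsyncSketch, fakeGetNumberOfWorkersAsync and demo are hypothetical stand-ins, not part of this module; the library's actual implementation may differ.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)

-- | Sketch of the async-to-sync conversion, specialised to IO.
-- (Hypothetical name; the library's syncAsync works in any MonadIO.)
syncAsyncSketch :: ((a -> IO ()) -> IO ()) -> IO a
syncAsyncSketch runAsync = do
    resultVar <- newEmptyMVar       -- empty slot for the result
    runAsync (putMVar resultVar)    -- the callback fills the slot
    takeMVar resultVar              -- block until the slot has been filled

-- | Hypothetical asynchronous request: reports a made-up worker count
-- from another thread after a short delay.
fakeGetNumberOfWorkersAsync :: (Int -> IO ()) -> IO ()
fakeGetNumberOfWorkersAsync callback = do
    _ <- forkIO (threadDelay 1000 >> callback 4)
    return ()

-- | Uses the asynchronous request as if it were synchronous.
demo :: IO Int
demo = syncAsyncSketch fakeGetNumberOfWorkersAsync
```

Because the caller blocks in takeMVar, it does not matter whether the callback runs on the calling thread or on a different one, as long as it is eventually invoked.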

Request queue management

addProgressReceiver :: MonadIO m' => (ProgressFor exploration_mode -> IO ()) -> RequestQueue exploration_mode worker_id m -> m' ()

Adds a callback to the given RequestQueue that will be invoked when the current global progress update has completed.

enqueueRequest :: MonadIO m' => Request exploration_mode worker_id m -> RequestQueue exploration_mode worker_id m -> m' ()

Enqueues a supervisor request into the given queue.

enqueueRequestAndWait :: (MonadIO m, MonadIO m') => Request exploration_mode worker_id m -> RequestQueue exploration_mode worker_id m -> m' ()

Like enqueueRequest, but does not return until the request has been run.
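
One plausible way to get this "enqueue and wait" behaviour is to append a completion signal to the request before enqueueing it. The sketch below assumes that approach and is not taken from the library's source: requests are plain IO actions instead of SupervisorMonad actions, the queue is a bare TChan (from the stm package), and all names are hypothetical.

```haskell
import Control.Concurrent (forkIO, killThread)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM (TChan, atomically, newTChanIO, readTChan, writeTChan)
import Control.Monad (forever, join)
import Data.IORef (newIORef, readIORef, writeIORef)

-- | Enqueue a request followed by a completion signal, then block
-- until the processing loop has run both. (Hypothetical sketch.)
enqueueAndWaitSketch :: IO () -> TChan (IO ()) -> IO ()
enqueueAndWaitSketch request queue = do
    done <- newEmptyMVar
    atomically (writeTChan queue (request >> putMVar done ()))
    takeMVar done

-- | A stand-in "supervisor" thread runs requests as they arrive; the
-- caller can rely on the request's effect once the call returns.
demo :: IO Int
demo = do
    queue <- newTChanIO
    counter <- newIORef (0 :: Int)
    supervisor <- forkIO (forever (join (atomically (readTChan queue))))
    enqueueAndWaitSketch (writeIORef counter 42) queue
    value <- readIORef counter
    killThread supervisor
    return value
```

Note that the caller blocks indefinitely if nothing is processing the queue, so in this sketch the processing thread is started before the request is submitted.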

newRequestQueue :: MonadIO m' => m' (RequestQueue exploration_mode worker_id m)

Constructs a new RequestQueue.

tryDequeueRequest :: MonadIO m' => RequestQueue exploration_mode worker_id m -> m' (Maybe (Request exploration_mode worker_id m))

Attempt to pop a request from the RequestQueue.

Request processing

processAllRequests :: MonadIO m => RequestQueue exploration_mode worker_id m -> SupervisorMonad exploration_mode worker_id m ()

Processes all of the requests in the given RequestQueue, and returns when the queue has been emptied.
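
As an illustration of draining a queue until it is empty, here is a minimal sketch built on a TChan, the same channel type used by the requests field. It is an assumption-laden simplification: requests are plain IO actions rather than SupervisorMonad actions, and the names ending in Sketch are hypothetical, not the library's functions.

```haskell
import Control.Concurrent.STM (TChan, atomically, newTChanIO, tryReadTChan, writeTChan)
import Data.IORef (modifyIORef', newIORef, readIORef)

-- | Stand-in for the Request type: a plain IO action.
type RequestSketch = IO ()

-- | Append a request to the end of the queue.
enqueueSketch :: RequestSketch -> TChan RequestSketch -> IO ()
enqueueSketch request queue = atomically (writeTChan queue request)

-- | Non-blocking pop: Nothing when the queue is currently empty.
tryDequeueSketch :: TChan RequestSketch -> IO (Maybe RequestSketch)
tryDequeueSketch queue = atomically (tryReadTChan queue)

-- | Run requests in FIFO order and return once the queue is empty.
processAllSketch :: TChan RequestSketch -> IO ()
processAllSketch queue = do
    next <- tryDequeueSketch queue
    case next of
        Nothing      -> return ()
        Just request -> request >> processAllSketch queue

-- | Enqueue three requests that each record a number, then drain the queue.
demo :: IO [Int]
demo = do
    queue <- newTChanIO
    logRef <- newIORef []
    mapM_ (\n -> enqueueSketch (modifyIORef' logRef (n :)) queue) [1, 2, 3]
    processAllSketch queue
    reverse <$> readIORef logRef
```

Using a non-blocking read means the loop returns as soon as the queue is empty instead of waiting for further requests.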

receiveProgress :: MonadIO m' => RequestQueue exploration_mode worker_id m -> ProgressFor exploration_mode -> m' ()

Invokes all of the callbacks with the given progress and then clears the list of callbacks.
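
The "invoke, then clear" behaviour can be sketched with an IORef holding the callback list, mirroring the receivers field. In this sketch the list is swapped for an empty one atomically before the callbacks run; that is a design choice of the example rather than something the source confirms, and the names are hypothetical.

```haskell
import Data.IORef (IORef, atomicModifyIORef', modifyIORef', newIORef, readIORef)

-- | Register a callback for the next completed progress update.
addReceiverSketch :: (p -> IO ()) -> IORef [p -> IO ()] -> IO ()
addReceiverSketch callback receivers =
    atomicModifyIORef' receivers (\callbacks -> (callback : callbacks, ()))

-- | Take the current callbacks, leaving an empty list behind, and
-- invoke each of them with the given progress.
receiveProgressSketch :: IORef [p -> IO ()] -> p -> IO ()
receiveProgressSketch receivers progress = do
    callbacks <- atomicModifyIORef' receivers (\old -> ([], old))
    mapM_ ($ progress) callbacks

-- | Two receivers see the first update; none are left for the second.
demo :: IO ([String], Int)
demo = do
    receivers <- newIORef []
    seen <- newIORef []
    addReceiverSketch (\p -> modifyIORef' seen (("first: " ++ p) :)) receivers
    addReceiverSketch (\p -> modifyIORef' seen (("second: " ++ p) :)) receivers
    receiveProgressSketch receivers "50%"
    receiveProgressSketch receivers "75%"   -- no callbacks are registered any more
    messages <- readIORef seen
    remaining <- length <$> readIORef receivers
    return (messages, remaining)
```

Each callback therefore fires at most once, which matches the idea of a receiver waiting for a single progress update.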

requestQueueProgram

Arguments

:: MonadIO m 
=> SupervisorMonad exploration_mode worker_id m ()

initialization code to run before the loop is started

-> RequestQueue exploration_mode worker_id m

the request queue

-> SupervisorProgram exploration_mode worker_id m 

Creates a supervisor program that loops forever processing requests from the given queue.

Controller threads

forkControllerThread

Arguments

:: MonadIO m' 
=> RequestQueue exploration_mode worker_id m

the request queue

-> RequestQueueReader exploration_mode worker_id m ()

the controller thread

-> m' () 

Forks a controller thread; its ThreadId is added to the list in the request queue. We deliberately do not return the ThreadId from this function because you must always call killControllerThreads to kill the controller thread, as this makes sure that all child threads also get killed.

killControllerThreads

Arguments

:: MonadIO m' 
=> RequestQueue exploration_mode worker_id m

the request queue

-> m' () 

Kill all the controller threads and their children.
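
The bookkeeping behind these two functions can be sketched as a list of ThreadIds kept in an IORef, as in the controllerThreads field. This is a simplified illustration with hypothetical names: it tracks only the threads forked through it and does not model the killing of child threads.

```haskell
import Control.Concurrent (ThreadId, forkIO, killThread, threadDelay)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Exception (finally)
import Data.IORef (IORef, atomicModifyIORef', newIORef, readIORef)

-- | Fork a thread and record its ThreadId; the id is deliberately not returned.
forkTrackedSketch :: IORef [ThreadId] -> IO () -> IO ()
forkTrackedSketch threads action = do
    tid <- forkIO action
    atomicModifyIORef' threads (\tids -> (tid : tids, ()))

-- | Kill every recorded thread and forget the ids.
killAllSketch :: IORef [ThreadId] -> IO ()
killAllSketch threads = do
    tids <- atomicModifyIORef' threads (\old -> ([], old))
    mapM_ killThread tids

-- | Start two long-sleeping controllers, kill them, and confirm both stopped.
demo :: IO Int
demo = do
    threads <- newIORef []
    started <- newEmptyMVar
    stopped <- newEmptyMVar
    let controller =
            (putMVar started () >> threadDelay 10000000)
                `finally` putMVar stopped ()
    forkTrackedSketch threads controller
    forkTrackedSketch threads controller
    takeMVar started >> takeMVar started    -- both controllers are running
    killAllSketch threads
    takeMVar stopped >> takeMVar stopped    -- both controllers have exited
    length <$> readIORef threads
```

Keeping the ThreadIds private to the tracker means callers have a single way to stop controller threads, which is the property the description above relies on.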

CPU time tracking

data CPUTimeTracker

A data structure that tracks the amount of CPU time that has been used.

newCPUTimeTracker :: NominalDiffTime -> IO CPUTimeTracker

Creates a new CPU time tracker; the argument should be the total amount of time used so far if we are continuing a previous run, and zero otherwise.

startCPUTimeTracker :: RequestQueueMonad m => CPUTimeTracker -> m ()

Starts the CPU time tracker; it detects when it has already been started, so any attempt to start it after the first is ignored.

getCurrentCPUTime :: CPUTimeTracker -> IO NominalDiffTime

Gets the current CPU time.

Miscellaneous

getQuantityAsync :: (MonadIO m', SupervisorFullConstraint worker_id m) => SupervisorMonad exploration_mode worker_id m α -> (α -> IO ()) -> RequestQueue exploration_mode worker_id m -> m' ()

Submits a Request to the supervisor and invokes the given callback with the result when it is available. (This function is used by getCurrentProgressAsync and getNumberOfWorkersAsync.)