Control-Engine-1.0.0.0: A parallel producer/consumer engine (thread pool)

Control.Engine

Description

1.0 Introduction

Typically, a thread pool is a set of execution contexts that execute tasks from an input queue. Thread pools are used to parallelize the processing of incoming work across all available CPUs without the expense of starting a new thread for every task.
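As a point of reference, the general idea can be sketched with nothing but base. This is not how Control.Engine is implemented; `startPool` and `demo` are hypothetical names, and the queue is an ordinary unbounded `Chan`.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Concurrent.MVar (modifyMVar_, newEmptyMVar, newMVar, putMVar, readMVar, takeMVar)
import Control.Monad (forM_, forever, replicateM_)

-- | Start n long-lived workers that run IO actions taken from a shared queue.
-- (Hypothetical helper, not part of Control.Engine.)
startPool :: Int -> IO (Chan (IO ()))
startPool n = do
  queue <- newChan
  replicateM_ n $ forkIO $ forever $ do
    task <- readChan queue   -- blocks until a task is available
    task
  return queue

-- | Submit 100 increments to a 4-thread pool and wait for all of them.
demo :: IO Int
demo = do
  queue   <- startPool 4
  counter <- newMVar (0 :: Int)
  done    <- newEmptyMVar
  forM_ [1 .. 100 :: Int] $ \_ ->
    writeChan queue (modifyMVar_ counter (return . (+ 1)) >> putMVar done ())
  replicateM_ 100 (takeMVar done)   -- one signal per finished task
  readMVar counter
```

The four threads are created once and reused for all 100 tasks, which is the cost saving the paragraph above describes.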

Control.Engine takes a somewhat different approach. The Engine is not only a set of threads running a common mutator on the input queue and placing results on an output queue; it also includes hooks, task injection, and state management.

  +--------+  chan1 +------------------------+ chan2  +--------+
  | In Hk  +  --->  | PreMH, Mutator, PostMH | -----> | Out Hk |
  +--------+        +------------------------+        +--------+
       ^                 ^                             ^
       |                 |                             |
       |                 |    IO Ref                   |
       +-----------------+-----------------------------+
                         |
                   +------------+
                   | State TVar |
                   +------------+

Queues :: (BoundedChan a) - from Control.Concurrent.BoundedChan.

The system uses two primary queues: one (chan1) carries jobs from the input hooks to the worker threads, and one (chan2) carries results from the worker threads to the output hooks. These channels are size-bounded, which is needed mostly due to the inflexibility of the scheduler.
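To illustrate what size-bounding buys, here is a toy bounded channel built from base only. It is an illustration of back-pressure, not the implementation used by Control.Concurrent.BoundedChan; `BChan`, `newBChan`, `writeBChan`, `readBChan`, and `demo` are hypothetical names, and the sketch ignores exception safety.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Concurrent.QSem (QSem, newQSem, signalQSem, waitQSem)
import Control.Monad (replicateM)

-- | Toy bounded channel: an unbounded Chan plus a semaphore counting free slots.
data BChan a = BChan QSem (Chan a)

newBChan :: Int -> IO (BChan a)
newBChan size = BChan <$> newQSem size <*> newChan

-- | Blocks while the channel already holds `size` unread items.
writeBChan :: BChan a -> a -> IO ()
writeBChan (BChan slots ch) x = waitQSem slots >> writeChan ch x

-- | Takes an item and frees one slot for a blocked writer.
readBChan :: BChan a -> IO a
readBChan (BChan slots ch) = do
  x <- readChan ch
  signalQSem slots
  return x

-- | A fast producer is throttled to at most two unread items at a time.
demo :: IO [Int]
demo = do
  ch <- newBChan 2
  _  <- forkIO $ mapM_ (writeBChan ch) [1 .. 50]
  replicateM 50 (readBChan ch)
```

Without the bound, a producer that outpaces the workers could fill memory with queued jobs; with it, the producer simply blocks until a slot frees up.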

Hooks :: (a -> IO (Maybe a))

Hooks can be added and removed during execution without creating a new engine. They allow the developer to modify tasks:

  • Input hooks - prior to parallelization (for sequential preprocessing)
  • Pre-Mutator hooks - in parallel, prior to the main mutation function
  • Post-Mutator hooks - in parallel, after the mutation function
  • Output hooks - post parallelization (for sequential postprocessing)
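A chain of hooks of the shape `a -> IO (Maybe a)` might be applied as sketched below. The sketch assumes that a hook returning `Nothing` drops the task so later hooks never see it; this page does not state that explicitly. `runHooks`, `dropNegative`, `double`, and `demo` are hypothetical names.

```haskell
-- | Run hooks left to right, feeding each hook the previous hook's output.
-- Assumption: Nothing means "drop this task", so the chain stops there.
runHooks :: [a -> IO (Maybe a)] -> a -> IO (Maybe a)
runHooks []       x = return (Just x)
runHooks (h : hs) x = do
  r <- h x
  case r of
    Nothing -> return Nothing
    Just x' -> runHooks hs x'

-- | Example hook that filters tasks.
dropNegative :: Int -> IO (Maybe Int)
dropNegative n = return (if n < 0 then Nothing else Just n)

-- | Example hook that modifies tasks.
double :: Int -> IO (Maybe Int)
double n = return (Just (n * 2))

-- | 3 passes the filter and is doubled; -1 is dropped by the first hook.
demo :: IO [Maybe Int]
demo = mapM (runHooks [dropNegative, double]) [3, -1]
```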

State Management

Control-Engine manages state for you. Semantically, all workers and hooks will see a correct state, but it will not always be the most recent one, nor will it always be consistent between threads.

The stateManager waits for any updates to the mutator state or hooks. If any modifications are made, the new set of hooks or state is provided to the workers. Correctness is handled by keeping the master copies in TVars (Control.Concurrent.STM), while the mutators and hooks read state from an IORef (Data.IORef) to avoid contention.

The thinking here is that changing the hooks and state is a rare, low-contention action, while the need for this information will be constant and performance critical. How successful this strategy is has yet to be shown.
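The master-copy and snapshot arrangement can be sketched as follows. This is a guess at the pattern, not the library's code: the `stateManager` here is a hypothetical stand-in, the `published` MVar exists only so the demo can wait for the copy, and the STM primitives are imported from GHC.Conc (part of base) to keep the sketch free of extra packages.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Data.IORef (IORef, newIORef, readIORef, writeIORef)
import GHC.Conc (TVar, atomically, newTVarIO, readTVar, retry, writeTVar)

-- | Block until the master TVar differs from the last value seen,
-- then copy the new value into the IORef that readers use.
stateManager :: Eq st => TVar st -> IORef st -> MVar () -> st -> IO ()
stateManager master snapshot published = loop
  where
    loop lastSeen = do
      new <- atomically $ do
        cur <- readTVar master
        if cur == lastSeen then retry else return cur  -- sleep until changed
      writeIORef snapshot new   -- cheap, uncontended reads for workers
      putMVar published ()      -- demo-only signal that the copy happened
      loop new

-- | Readers see 0 until the manager republishes the updated master copy.
demo :: IO (Int, Int)
demo = do
  master    <- newTVarIO (0 :: Int)
  snapshot  <- newIORef 0
  published <- newEmptyMVar
  _ <- forkIO (stateManager master snapshot published 0)
  before <- readIORef snapshot
  atomically (writeTVar master 42)
  takeMVar published
  after <- readIORef snapshot
  return (before, after)
```

Between the `writeTVar` and the manager's `writeIORef`, a reader still sees the old value, which matches the "not always the most recent" caveat above. The equality test may be why initEngine carries an `Eq st` constraint, though that is an assumption.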

Injection

One injection point allows injection of a result that had no preceding task - thus the result is only seen by the output hooks. Another injector allows the input hooks to be bypassed.

Synopsis

Main functions

initSimpleEngine :: Int -> (job -> result) -> IO (BoundedChan job, BoundedChan result)

If all you want is a basic thread pool, this will work, though you should consider using Control.ThreadPool instead.

Evaluation of the result is forced using seq. Input, output, and intermediate channels are length bounded to a multiple of the number of workers.
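The effect of forcing with seq can be illustrated with a base-only stand-in that has the same shape as initSimpleEngine. `simplePool` and `demo` are hypothetical names, the channels are unbounded `Chan`s rather than `BoundedChan`s, and the real function may differ in its details.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Monad (forever, replicateM, replicateM_)

-- | Hypothetical stand-in with the same shape as initSimpleEngine.
simplePool :: Int -> (job -> result) -> IO (Chan job, Chan result)
simplePool n f = do
  input  <- newChan
  output <- newChan
  replicateM_ n $ forkIO $ forever $ do
    job <- readChan input
    let r = f job
    -- seq forces r to weak head normal form here, in the worker thread,
    -- so the consumer does not receive an unevaluated thunk.
    r `seq` writeChan output r
  return (input, output)

-- | Square ten numbers on four workers; arrival order is not guaranteed,
-- so the demo only reports the sum.
demo :: IO Int
demo = do
  (input, output) <- simplePool 4 (\n -> n * n)
  mapM_ (writeChan input) [1 .. 10 :: Int]
  rs <- replicateM 10 (readChan output)
  return (sum rs)
```

Note that seq only evaluates to weak head normal form. If the result is a lazy structure such as a list or a record with lazy fields, most of the work may still be deferred to whichever thread consumes it.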

initSimpleEngineIO :: Int -> (job -> IO result) -> IO (BoundedChan job, BoundedChan result)

Simpler than calling initEngine, but it allows no state or interaction with the hooks and injectors. No strictness is forced.

Input, output, and intermediate channels are length bounded to a multiple of the number of workers.

initEngine :: Eq st => Int -> Int -> IO job -> (result -> IO ()) -> (st -> job -> IO (Maybe result)) -> st -> IO (Engine job result st)

To initialize an engine you must provide:

  • the number of threads
  • the maximum channel size for intermediate channels
  • an action that will get the input
  • an action that will consume output
  • a mutator function to perform on all inputs
  • an initial state for the mutator function

No strictness is forced, so be sure to force evaluation yourself if you need it. All hooks start out empty.

data Engine job result state

An Engine represents a pool of threads ready to execute tasks.

Constructors

Eng 

Fields

chan1 :: BoundedChan job
 
chan2 :: BoundedChan result
 
tvInHook :: TVar [Hook state job]
 
tvPreMutateHook :: TVar [Hook state job]
 
tvPostMutateHook :: TVar [Hook state result]
 
tvOutHook :: TVar [Hook state result]
 
state :: TVar state
 

Hooks

data Hook st msg

A hook is simply a mutation on the task. The priority is used to order hook execution (hooks with lower priority values run first). The description field is used for accounting and for removing old hooks.

Constructors

Hk 

Fields

hkFunc :: st -> msg -> IO (Maybe msg)
 
hkPriority :: Int
 
hkDescription :: String
 

Instances

Eq (Hook m s) 
Ord (Hook a s) 
Show (Hook a s) 
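How the priority and description fields might be used can be sketched with a local copy of the record. The `Hook` type below mirrors the fields documented above so that the sketch is self-contained; `addHook`, `delHook`, and `demo` are hypothetical helpers operating on plain lists, not the library's own functions.

```haskell
import Data.List (insertBy)
import Data.Ord (comparing)

-- | Local mirror of the documented record.
data Hook st msg = Hk
  { hkFunc        :: st -> msg -> IO (Maybe msg)
  , hkPriority    :: Int
  , hkDescription :: String
  }

-- | Insert a hook, keeping the list sorted by ascending priority
-- so that lower priority values run first.
addHook :: Hook st msg -> [Hook st msg] -> [Hook st msg]
addHook = insertBy (comparing hkPriority)

-- | Remove every hook whose description matches.
delHook :: String -> [Hook st msg] -> [Hook st msg]
delHook desc = filter ((/= desc) . hkDescription)

-- | Descriptions in execution order, before and after deleting "log".
demo :: ([String], [String])
demo = (map hkDescription hooks, map hkDescription (delHook "log" hooks))
  where
    mk p d = Hk (\_ m -> return (Just m)) p d :: Hook () Int
    hooks  = foldr addHook [] [mk 5 "validate", mk 1 "log", mk 3 "tag"]
```

Because deletion matches on the description alone, giving each hook a distinct description makes it possible to remove exactly one of them later.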

addInputHook :: Engine job result state -> Hook state job -> IO ()

Adds a hook that will be performed in serial on all jobs added to the input queue.

addOutputHook :: Engine job result state -> Hook state result -> IO ()

Adds a hook that will be performed in serial on all results before they are added to the output queue.

addPreMutateHook :: Engine job result state -> Hook state job -> IO ()

Adds a hook that will be performed in parallel before the main mutator function.

addPostMutateHook :: Engine job result state -> Hook state result -> IO ()

Adds a hook that will be performed in parallel after the main mutator function.

delInputHook :: Engine j r s -> String -> IO ()

Deletes all input hooks matching the provided description.

delOutputHook :: Engine j r s -> String -> IO ()

Deletes all output hooks matching the provided description.

delPreMutateHook :: Engine j r s -> String -> IO ()

Deletes all pre-mutate hooks matching the provided description.

delPostMutateHook :: Engine j r s -> String -> IO ()

Deletes all post-mutate hooks matching the provided description.

Injectors

injectPreMutator :: Engine j r s -> j -> IO ()

Allows adding tasks that bypass the input hooks.

injectPostMutator :: Engine j r s -> r -> IO ()

Allows bypassing the mutator, meaning a result can be produced without a task. This still hits the output hooks.
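Which stages each entry point passes through can be modeled with a small pure sketch. It is a reading of the diagram in the introduction, not the library's code: stages are collapsed into `Maybe`-returning functions, the worker stage stands for the pre-mutate hooks, mutator, and post-mutate hooks together, and every name below is hypothetical.

```haskell
import Control.Monad ((>=>))

-- | A stage either passes a value on or drops it.
type Stage a b = a -> Maybe b

data Pipeline job result = Pipeline
  { inputHooks  :: Stage job job
  , workerStage :: Stage job result     -- pre-mutate hooks, mutator, post-mutate hooks
  , outputHooks :: Stage result result
  }

-- | Normal path: every stage is applied.
runNormal :: Pipeline j r -> j -> Maybe r
runNormal p = inputHooks p >=> workerStage p >=> outputHooks p

-- | Models injectPreMutator: the input hooks are skipped.
viaPreMutator :: Pipeline j r -> j -> Maybe r
viaPreMutator p = workerStage p >=> outputHooks p

-- | Models injectPostMutator: only the output hooks are applied.
viaPostMutator :: Pipeline j r -> r -> Maybe r
viaPostMutator p = outputHooks p

example :: Pipeline Int String
example = Pipeline
  { inputHooks  = \n -> if even n then Just n else Nothing  -- rejects odd jobs
  , workerStage = Just . show
  , outputHooks = Just . (++ "!")
  }

-- | The odd job 3 is rejected on the normal path but accepted when injected.
demo :: (Maybe String, Maybe String, Maybe String)
demo = (runNormal example 3, viaPreMutator example 3, viaPostMutator example "hi")
```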