Control-Engine-0.0.4: A parallel producer/consumer engine (thread pool)




Implemented here is a thread pool library on crack.

Introduction

Typically, a thread pool is a set of execution contexts that execute tasks from an input queue. Thread pools are used to parallelize the processing of incoming work across all available CPUs without the expense of starting a new thread for every new task.

In Control.Engine you will find a somewhat unusual implementation. The Engine is not only a set of threads running a common mutator on the input queue and producing an output queue; it also includes hooks, task injection, and state management.

Queues :: (Chan a) From Control.Concurrent.Chan.

Hooks :: (a -> IO (Maybe a)) Hooks can be added and removed during execution without creating a new engine. They allow the developer to modify tasks:

  • prior to parallelization (for sequential preprocessing)
  • in parallel, prior to the main mutation function
  • in parallel, after the mutation function
  • post parallelization (for sequential post processing)
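The four hook points above can be pictured as one job's path through a chain of steps. The following is a minimal sketch using only base, not the library's implementation: the names SimpleHook, andThen, runHooks and process are hypothetical, the parallel stages are run sequentially here, and treating a Nothing result as "drop the task" is an assumption, since the text above gives only the hook type.

```haskell
-- Hypothetical stand-in for the simplified hook type shown above.
type SimpleHook a = a -> IO (Maybe a)

-- Feed a value to the next step unless an earlier step returned Nothing.
-- (Assumption of this sketch: Nothing means the task is dropped.)
andThen :: IO (Maybe a) -> (a -> IO (Maybe b)) -> IO (Maybe b)
andThen act next = act >>= maybe (return Nothing) next

-- Run a list of hooks in order.
runHooks :: [SimpleHook a] -> a -> IO (Maybe a)
runHooks hooks x = foldl andThen (return (Just x)) hooks

-- One job's path through the four hook points around a mutator.
process
  :: [SimpleHook j] -> [SimpleHook j] -> (j -> IO (Maybe r))
  -> [SimpleHook r] -> [SimpleHook r] -> j -> IO (Maybe r)
process inHooks preHooks mutator postHooks outHooks job =
  runHooks inHooks job
    `andThen` runHooks preHooks
    `andThen` mutator
    `andThen` runHooks postHooks
    `andThen` runHooks outHooks

main :: IO ()
main = do
  let dropOdd n = return (if even n then Just n else Nothing)
      double n  = return (Just (n * 2))
      render n  = return (Just (show n))
      exclaim s = return (Just (s ++ "!"))
  r1 <- process [dropOdd] [double] render [exclaim] [] (4 :: Int)
  r2 <- process [dropOdd] [double] render [exclaim] [] (3 :: Int)
  print r1   -- Just "8!"
  print r2   -- Nothing
```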

State Management The stateManager waits for any updates to the mutator state or hooks. If any modifications are made, the new set of hooks (or state) is provided to the workers. Correctness is handled by keeping the master copies as TVars (Control.Concurrent.STM), while the mutators and hooks read from an MVar (Control.Concurrent.MVar) to avoid contention.

The thinking here is that changing the hooks and state is a rare, low-contention action, while the need for this information is constant and performance critical. How successful this strategy is has yet to be shown.
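The split between a TVar master copy and an MVar read by workers might look roughly like the sketch below. It is an illustration under assumptions, not the stateManager itself: the name manager and its change-detection by equality are hypothetical. The STM primitives are imported from GHC.Conc in base so that the sketch needs no extra package; Control.Concurrent.STM exports the same names.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (MVar, newMVar, readMVar, swapMVar)
import GHC.Conc (TVar, atomically, newTVarIO, readTVar, retry, writeTVar)

-- Hypothetical manager loop: block until the master value differs from
-- the last one published, then copy it into the MVar that workers read.
manager :: Eq st => TVar st -> MVar st -> st -> IO ()
manager master published lastSeen = do
  new <- atomically $ do
    cur <- readTVar master
    if cur == lastSeen then retry else return cur
  _ <- swapMVar published new
  manager master published new

main :: IO ()
main = do
  master    <- newTVarIO (0 :: Int)
  published <- newMVar 0
  _ <- forkIO (manager master published 0)
  -- A rare update goes to the master copy...
  atomically (writeTVar master 42)
  -- ...while readers only ever touch the MVar. Poll until the update lands.
  let wait = do
        v <- readMVar published
        if v == 42 then return v else threadDelay 1000 >> wait
  wait >>= print   -- 42
```

In this arrangement writers never compete with workers for the TVar; the cost is that a worker may briefly see the previous value until the manager publishes the new one.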

Injection One injection point allows injection of a result that had no preceding task. The second injector allows the initial hooks (Input hooks) to be bypassed.


Main functions

initSimpleEngine :: Int -> (job -> result) -> IO (Chan job, Chan result)

If all you want is a basic thread pool, this will work. You should consider using Control.ThreadPool instead.

Evaluation of the result is forced using seq.
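To make the shape of this function concrete, here is a minimal sketch of a pool with the same type, built only from base. The name simplePool is hypothetical and the body is an assumption about how such a pool can be written, not the library's source.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Monad (forever, replicateM, replicateM_)
import Data.List (sort)

-- Hypothetical pool with the same type as initSimpleEngine.
simplePool :: Int -> (job -> result) -> IO (Chan job, Chan result)
simplePool n f = do
  input  <- newChan
  output <- newChan
  -- Start n workers that share the input and output channels.
  replicateM_ n $ forkIO $ forever $ do
    job <- readChan input
    let r = f job
    -- Force the result inside the worker, before handing it on.
    r `seq` writeChan output r
  return (input, output)

main :: IO ()
main = do
  (input, output) <- simplePool 4 (\x -> x * x :: Int)
  mapM_ (writeChan input) [1 .. 10]
  results <- replicateM 10 (readChan output)
  -- Results arrive in no particular order, so sort before printing.
  print (sort results)   -- [1,4,9,16,25,36,49,64,81,100]
```

Note that seq evaluates only to weak head normal form. For a result such as a list or a record with lazy fields, most of the work may still be deferred to whoever reads the output channel.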

initSimpleEngineIO :: Int -> (job -> IO result) -> IO (Chan job, Chan result)

Simpler than calling initEngine, but it allows no state or interaction with the hooks and injectors. No strictness is forced.

initEngine :: Eq st => Int -> IO job -> (result -> IO ()) -> (st -> job -> IO (Maybe result)) -> st -> IO (Engine job result st)

To initialize an engine you must provide:

  • the number of threads
  • an action that will get the input
  • an action that will consume output
  • a mutator function to perform on all inputs
  • an initial state for the mutator function

No strictness is forced - be sure you force evaluation if wanted. All hooks start out empty.
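How the five arguments fit together can be sketched with a stripped-down worker loop. This is not initEngine: the name startWorkers is hypothetical, hooks, injection and state updates are left out, and treating a Nothing from the mutator as "emit no result" is an assumption.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Monad (forever, replicateM, replicateM_)
import Data.List (sort)

-- Hypothetical worker pool taking the same five arguments as initEngine.
startWorkers
  :: Int                               -- number of threads
  -> IO job                            -- action that gets the input
  -> (result -> IO ())                 -- action that consumes output
  -> (st -> job -> IO (Maybe result))  -- mutator
  -> st                                -- initial (here: fixed) state
  -> IO ()
startWorkers n getJob putResult mutator st =
  replicateM_ n $ forkIO $ forever $ do
    job <- getJob
    out <- mutator st job
    -- Assumption: a Nothing from the mutator produces no output.
    maybe (return ()) putResult out

main :: IO ()
main = do
  input  <- newChan
  output <- newChan
  -- The state is a threshold; jobs below it yield no result.
  let mutator limit n = return (if n >= limit then Just (n * 10) else Nothing)
  startWorkers 2 (readChan input) (writeChan output) mutator (3 :: Int)
  mapM_ (writeChan input) [1 .. 5]
  results <- replicateM 3 (readChan output)
  print (sort results)   -- [30,40,50]
```

Passing readChan and writeChan as the input and output actions reproduces the channel-based setup; any other IO source or sink would fit the same slots.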

data Engine job result state

An Engine represents a pool of threads ready to execute tasks.




chan1 :: Chan job
chan2 :: Chan result
tvInHook :: TVar [Hook state job]
tvPreMutateHook :: TVar [Hook state job]
tvPostMutateHook :: TVar [Hook state result]
tvOutHook :: TVar [Hook state result]
state :: TVar state


data Hook st msg

A hook is simply a mutation on the task. The priority is used to order hook execution (hooks with lower priority values run first). The description field is used for accounting and to remove old hooks.




hkFunc :: st -> msg -> IO (Maybe msg)
hkPriority :: Int
hkDescription :: String


Eq (Hook m s) 
Ord (Hook a s) 
Show (Hook a s) 
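The roles of the priority and description fields can be illustrated with a small stand-alone model. The DemoHook record and the helpers byPriority and deleteByDescription are hypothetical mirrors written for this sketch; they are not the library's definitions.

```haskell
import Data.List (sortOn)

-- Hypothetical mirror of the Hook record documented above.
data DemoHook st msg = DemoHook
  { dhFunc        :: st -> msg -> IO (Maybe msg)
  , dhPriority    :: Int
  , dhDescription :: String
  }

-- Order hooks so that lower priority values run first.
byPriority :: [DemoHook st msg] -> [DemoHook st msg]
byPriority = sortOn dhPriority

-- Remove every hook whose description matches the given string.
deleteByDescription :: String -> [DemoHook st msg] -> [DemoHook st msg]
deleteByDescription d = filter ((/= d) . dhDescription)

main :: IO ()
main = do
  let hooks =
        [ DemoHook (\_ m -> return (Just (m + 1))) 20 "increment"
        , DemoHook (\_ m -> return (Just (m * 2))) 10 "double"
        , DemoHook (\_ _ -> return Nothing)        30 "drop everything"
        ] :: [DemoHook () Int]
  print (map dhDescription (byPriority hooks))
  -- ["double","increment","drop everything"]
  print (map dhDescription (deleteByDescription "double" hooks))
  -- ["increment","drop everything"]
```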

addInputHook :: Engine job result state -> Hook state job -> IO ()

Adds a hook that will be performed in serial on all jobs added to the input queue.

addOutputHook :: Engine job result state -> Hook state result -> IO ()

Adds a hook that will be performed in serial on all results before they are added to the output queue.

addPreMutateHook :: Engine job result state -> Hook state job -> IO ()

Adds a hook that will be performed in parallel before the main mutator function.

addPostMutateHook :: Engine job result state -> Hook state result -> IO ()

Adds a hook that will be performed in parallel after the main mutator function.

delInputHook :: Engine j r s -> String -> IO ()

Deletes all input hooks matching the provided description.

delOutputHook :: Engine j r s -> String -> IO ()

Deletes all output hooks matching the provided description.

delPreMutateHook :: Engine j r s -> String -> IO ()

Deletes all pre-mutate hooks matching the provided description.

delPostMutateHook :: Engine j r s -> String -> IO ()

Deletes all post-mutate hooks matching the provided description.


injectPreMutator :: Engine j r s -> j -> IO ()

Allows adding tasks that bypass the input hooks.

injectPostMutator :: Engine j r s -> r -> IO ()

Allows bypassing the mutator, meaning a result can be produced without a task. This still hits the output hooks.