Control-Engine-0.0.1: A parallel producer/consumer engine (thread pool)




Implemented here is a thread pool library on crack.

1.0 Introduction

Typically, a thread pool is a set of execution contexts that execute tasks from an input queue. Thread pools are typically used to parallelize the processing of incoming work across all available CPUs without the expense of starting a new thread for every new task.

In Control.Engine you will find a somewhat unique implementation. The Engine is not only a set of threads running a common mutator on the input queue and producing an output queue; it also includes hooks, task injection, and state management.

Hooks :: (a -> IO (Maybe a))

Hooks can be added and removed during execution without creating a new engine. They allow the developer to modify tasks:

* prior to parallelization (for sequential preprocessing)
* in parallel, prior to the main mutation function
* in parallel, after the mutation function
* post parallelization (for sequential post-processing)

State Management

The stateManager waits for any updates to the mutator state or hooks. If any modifications are made, the new set of hooks (or state) is provided to the workers. This allows the state of the entire engine to be modified atomically (it is all STM) while letting the workers use cheap and quick MVars.

The thinking here is that changing the hooks and state is a rare, low-contention action, while the need for this information is constant and performance critical.

Injection

One injection point allows injection of a result that had no preceding task. With another, the initial hooks (input hooks) can be bypassed.
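The state-management scheme described above (an authoritative value in STM, republished to an MVar that workers read cheaply) can be sketched as follows. This is a minimal illustration and not the library's actual stateManager: the function names, the polling helper, and the use of a plain Int as the state are all assumptions made for the example. It needs the stm package in addition to base.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (MVar, newMVar, readMVar, swapMVar)
import Control.Concurrent.STM (TVar, atomically, newTVarIO, readTVar, retry, writeTVar)
import Control.Monad (unless, void)

-- Hypothetical manager loop (not the library's code): block in STM until the
-- authoritative TVar differs from the last published value, then copy the
-- new value into the MVar that workers read.
stateManager :: Eq a => TVar a -> MVar a -> a -> IO ()
stateManager tv mv lastSeen = do
  new <- atomically $ do
    cur <- readTVar tv
    if cur == lastSeen then retry else return cur
  void (swapMVar mv new)
  stateManager tv mv new

-- Poll until the workers' view shows the expected value.
waitFor :: Eq a => MVar a -> a -> IO ()
waitFor mv expected = do
  seen <- readMVar mv
  unless (seen == expected) (threadDelay 1000 >> waitFor mv expected)

-- Update the state once and return what a worker would then read.
demo :: IO Int
demo = do
  tv <- newTVarIO 1
  mv <- newMVar 1
  _ <- forkIO (stateManager tv mv 1)
  atomically (writeTVar tv 2)   -- rare, atomic update by the developer
  waitFor mv 2
  readMVar mv                   -- cheap read on the workers' hot path

main :: IO ()
main = demo >>= print
```

The manager only wakes when the TVar actually changes, so the cost of STM is paid on the rare update path, while each worker read is a single readMVar.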


Main functions

initSimpleEngine :: Int -> (job -> result) -> IO (Chan job, Chan result)

If all you want is a basic thread pool, this will work. You should consider using Control.ThreadPool instead.

Evaluation of the result is forced using seq.
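As a usage sketch based only on the signature above: jobs are written to the first channel and results read from the second. The helper name doubleAll and the pool size of 4 are illustrative choices, and the example assumes the Control-Engine package is installed.

```haskell
import Control.Concurrent.Chan (readChan, writeChan)
import Control.Engine (initSimpleEngine)
import Control.Monad (replicateM)
import Data.List (sort)

-- Hypothetical helper: run every input through a 4-thread pool.
doubleAll :: [Int] -> IO [Int]
doubleAll xs = do
  (jobs, results) <- initSimpleEngine 4 (* 2)
  mapM_ (writeChan jobs) xs
  -- Workers run concurrently, so completion order is not guaranteed; sort.
  sort <$> replicateM (length xs) (readChan results)

main :: IO ()
main = doubleAll [1 .. 10] >>= print
```

Note that seq only evaluates a value to weak head normal form. For an Int that is the whole value, but for a list or record result the worker would force just the outermost constructor.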

initSimpleEngineIO :: Int -> (job -> IO result) -> IO (Chan job, Chan result)

Simpler than calling initEngine, but it allows no state or interaction with the hooks and injectors. No strictness is forced.

initEngine :: Eq st => Int -> IO job -> (result -> IO ()) -> (st -> job -> IO (Maybe result)) -> st -> IO (Engine job result st)

To initialize an engine you must provide:

* the number of threads
* an action that will get the input
* an action that will consume output
* a mutator function to perform on all inputs
* an initial state for the mutator function

No strictness is forced - be sure you force evaluation if wanted. All hooks start out empty.
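A minimal sketch of wiring up initEngine, following the signature above. Using a pair of Chans for input and output, an Int multiplier as the state, and the names scale and runOnce are all assumptions for illustration. Because no strictness is forced, the mutator here calls evaluate so the multiplication happens on the worker thread rather than in whoever consumes the result.

```haskell
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Engine (initEngine)
import Control.Exception (evaluate)

-- The mutator receives the current state (here a multiplier) and a job.
scale :: Int -> Int -> IO (Maybe Int)
scale factor job = do
  r <- evaluate (factor * job)   -- force the work inside the worker thread
  return (Just r)

-- Hypothetical helper: start a 2-thread engine and push one job through it.
runOnce :: Int -> Int -> IO Int
runOnce factor job = do
  input  <- newChan
  output <- newChan
  _engine <- initEngine 2 (readChan input) (writeChan output) scale factor
  writeChan input job
  readChan output

main :: IO ()
main = runOnce 3 14 >>= print
```

The meaning of a Nothing result from the mutator is not stated on this page; presumably the job then yields no output, but that is a guess.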

data Engine job result state

An Engine represents a pool of threads ready to execute tasks.




chan1 :: Chan job
chan2 :: Chan result
tvInHook :: TVar [Hook state job]
tvPreMutateHook :: TVar [Hook state job]
tvPostMutateHook :: TVar [Hook state result]
tvOutHook :: TVar [Hook state result]
mvInHook :: MVar [Hook state job]
mvPreMutateHook :: MVar [Hook state job]
mvPostMutateHook :: MVar [Hook state result]
mvOutHook :: MVar [Hook state result]
state :: TVar state


data Hook st msg




hkFunc :: st -> msg -> IO (Maybe msg)
hkPriority :: Int
hkDescription :: String


Eq (Hook m s) 
Ord (Hook a s) 
Show (Hook a s) 
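To illustrate how a list of hooks with these fields might be applied to a message, here is a self-contained sketch. It defines a local stand-in type with the same field names rather than importing the library's Hook, since the constructor is not shown on this page. Running hooks in ascending hkPriority order and treating Nothing as "drop the message" are assumptions, not documented behaviour.

```haskell
import Data.List (sortOn)

-- Local stand-in mirroring the documented fields; not the library's type.
data Hook st msg = Hook
  { hkFunc        :: st -> msg -> IO (Maybe msg)
  , hkPriority    :: Int
  , hkDescription :: String
  }

-- Thread a message through the hooks in ascending priority (assumed order).
-- A hook returning Nothing stops the chain and drops the message (assumed).
runHooks :: [Hook st msg] -> st -> msg -> IO (Maybe msg)
runHooks hooks st = go (sortOn hkPriority hooks)
  where
    go []       m = return (Just m)
    go (h : hs) m = hkFunc h st m >>= maybe (return Nothing) (go hs)

main :: IO ()
main = do
  let double  = Hook (\_ n -> return (Just (n * 2))) 1 "double"
      dropOdd = Hook (\_ n -> return (if even n then Just n else Nothing)) 0 "drop odd"
  -- dropOdd has the lower priority value, so it runs before double.
  mapM (runHooks [double, dropOdd] ()) [3, 4 :: Int] >>= print
  -- prints [Nothing,Just 8]
```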

addInputHook :: Engine job result state -> Hook state job -> IO ()

addOutputHook :: Engine job result state -> Hook state result -> IO ()

addPreMutateHook :: Engine job result state -> Hook state job -> IO ()

addPostMutateHook :: Engine job result state -> Hook state result -> IO ()


injectPreMutator :: Engine j r s -> j -> IO ()

Allows adding tasks that bypass the input hooks.

injectPostMutator :: Engine j r s -> r -> IO ()

Allows bypassing the mutator, meaning a result can be produced without a task. This still hits the output hooks.
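The two injection points can be exercised as in the sketch below, which relies only on the signatures shown on this page. The helper name injectBoth, the unit state, and the Chan-based input and output actions are illustrative assumptions. No hooks are installed here, so the example shows only that an injected job passes through the mutator while an injected result does not.

```haskell
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Engine (initEngine, injectPostMutator, injectPreMutator)
import Data.List (sort)

-- Hypothetical helper: inject one job and one ready-made result.
injectBoth :: IO [Int]
injectBoth = do
  input  <- newChan :: IO (Chan Int)   -- never written to in this sketch
  output <- newChan
  engine <- initEngine 1 (readChan input) (writeChan output)
                       (\() job -> return (Just (job + 1))) ()
  injectPreMutator  engine 1     -- skips input hooks, still mutated
  injectPostMutator engine 100   -- skips the mutator entirely
  a <- readChan output
  b <- readChan output
  return (sort [a, b])           -- arrival order is not assumed

main :: IO ()
main = injectBoth >>= print
```

If the engine behaves as documented, the injected job 1 should come out as 2 and the injected result 100 should come out unchanged.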