1.0 Introduction
Typically, a thread pool is a set of execution contexts that execute tasks from an input queue. Thread pools are used to parallelize the processing of incoming work across all available CPUs without the expense of starting a new thread for every task.
In Control.Engine you will find a somewhat unusual implementation. The Engine is not only a set of threads running a common mutator on the input queue and placing results on an output queue; it also includes hooks, task injection, and state management.
1.1 System Figure
 One input         Configurable number         One output
  thread            of worker threads            thread
+--------+ chan1 +------------------------+ chan2  +---------+
| In Hks + --->  | PreMH, Mutator, PostMH | -----> | Out Hks |
+--------+       +------------------------+        +---------+
    ^                        ^                          ^
    |                        |                          |
    |                Comms via an IORef                 |
    +------------------------+--------------------------+
                             |
                       +------------+
                       | State TVar |
                       +------------+
      One thread monitoring the TVar and updating the IORef
2.0 Queues :: (BoundedChan a) - from Control.Concurrent.BoundedChan.
The system uses two primary queues: one transports data from the input hooks to the mutator (chan1), and the other transports data from the mutator to the output hooks (chan2). These channels are size-bounded, which is needed mostly due to the inflexibility of the GHC scheduler.
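To illustrate what "size-bounded" means for a producer, here is a minimal sketch built only from `base`. It is not the implementation in Control.Concurrent.BoundedChan; the names `BQueue`, `newBQueue`, `writeBQueue`, and `readBQueue` are hypothetical, and the sketch ignores asynchronous-exception safety.

```haskell
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Concurrent.QSem (QSem, newQSem, signalQSem, waitQSem)

-- | Hypothetical stand-in for a size-bounded channel: an unbounded Chan
-- guarded by a semaphore that counts the free slots.
data BQueue a = BQueue (Chan a) QSem

newBQueue :: Int -> IO (BQueue a)
newBQueue size = BQueue <$> newChan <*> newQSem size

-- | Blocks while the queue already holds `size` items.
writeBQueue :: BQueue a -> a -> IO ()
writeBQueue (BQueue ch free) x = waitQSem free >> writeChan ch x

-- | Blocks while the queue is empty; frees one slot for writers.
readBQueue :: BQueue a -> IO a
readBQueue (BQueue ch free) = do
  x <- readChan ch
  signalQSem free
  return x

main :: IO ()
main = do
  q <- newBQueue 2
  mapM_ (writeBQueue q) [1, 2 :: Int]  -- queue is now full; a third write would block
  a <- readBQueue q                    -- frees one slot
  writeBQueue q 3                      -- succeeds without blocking
  rest <- mapM (const (readBQueue q)) [(), ()]
  print (a : rest)
```

The bound gives back-pressure: a fast input stage stalls instead of filling memory with jobs the workers have not reached yet.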
3.0 Hooks :: (a -> IO (Maybe a))
Hooks can be added and removed during execution without creating a new engine. They allow the developer to modify tasks:
- Input hooks - prior to parallelization (for sequential preprocessing)
- Pre-Mutator hooks - in parallel, prior to the main mutation function
- Post-Mutator hooks - in parallel, after the mutation function
- Output hooks - post parallelization (for sequential post-processing)
A hook returning Nothing causes the job or result to be dropped (it does not propagate any further).
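The drop-on-Nothing behaviour can be sketched as a fold over a list of hooks. This is only an illustration of the semantics described above, using the simplified hook shape from the section heading; `SimpleHook` and `runHooks` are hypothetical names, not part of Control.Engine.

```haskell
import Control.Monad (foldM)

-- | Simplified hook shape from the section heading: Nothing drops the item.
type SimpleHook a = a -> IO (Maybe a)

-- | Apply hooks left to right. Once a hook returns Nothing, the
-- remaining hooks are not run and the item is dropped.
runHooks :: [SimpleHook a] -> a -> IO (Maybe a)
runHooks hooks x = foldM step (Just x) hooks
  where
    step Nothing  _    = return Nothing
    step (Just v) hook = hook v

main :: IO ()
main = do
  let double  x = return (Just (x * 2))
      dropOdd x = return (if odd x then Nothing else Just x)
  kept    <- runHooks [dropOdd, double] (4 :: Int)
  dropped <- runHooks [dropOdd, double] (3 :: Int)
  print (kept, dropped)
```

Here the even job passes both hooks, while the odd job is dropped by the first hook and never reaches the second.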
4.0 Injection
One injection point allows injection of a result that had no preceding task; such a result is seen only by the output hooks, and it uses chan2. Another injector allows the input hooks to be bypassed; this uses chan1. See the figure above for how the channels relate to the hooks and the mutator.
5.0 State Management
Control-Engine manages state for you. Semantically, all workers and hooks will see a correct state but it won't always be the most recent or consistent between threads.
The stateManager waits for any updates to the mutator state or hooks. If any modifications are made, the new set of hooks or state is provided to the workers. Correctness is handled by keeping the master copies as TVars, while the mutators and hooks read state from an IORef to avoid contention.
The thinking here is that changing the hooks and state is a rare, low-contention action, while the need for this information will be constant and performance critical. How successful this strategy is has yet to be shown.
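The TVar-to-IORef arrangement can be sketched as follows. This is an illustration of the idea under stated assumptions, not the library's stateManager: `syncOnce` and `monitorLoop` are hypothetical names, change detection by `Eq` is an assumption, and the code needs the `stm` package that ships with GHC.

```haskell
import Control.Concurrent.STM (TVar, atomically, newTVarIO, readTVar, retry, writeTVar)
import Data.IORef (IORef, newIORef, readIORef, writeIORef)

-- | Block until the master TVar differs from the last value seen, then
-- publish the new value to the IORef that workers read.
syncOnce :: Eq s => TVar s -> IORef s -> s -> IO s
syncOnce master cache lastSeen = do
  new <- atomically $ do
    cur <- readTVar master
    if cur == lastSeen then retry else return cur
  writeIORef cache new
  return new

-- | Monitor loop: the single thread that copies updates from the TVar
-- to the IORef. It would normally be started with forkIO.
monitorLoop :: Eq s => TVar s -> IORef s -> s -> IO ()
monitorLoop master cache lastSeen =
  syncOnce master cache lastSeen >>= monitorLoop master cache

main :: IO ()
main = do
  master <- newTVarIO (0 :: Int)
  cache  <- newIORef 0
  atomically (writeTVar master 1)  -- rare, transactional update of the master copy
  _ <- syncOnce master cache 0     -- one step of what the monitor thread does
  readIORef cache >>= print        -- cheap, non-transactional read on the worker side
```

Between the TVar write and the IORef update, workers still see the old value, which matches the caveat that the state seen is correct but not always the most recent.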
- initSimpleEngine :: Int -> (job -> result) -> IO (BoundedChan job, BoundedChan result)
- initSimpleEngineIO :: Int -> (job -> IO result) -> IO (BoundedChan job, BoundedChan result)
- initEngine :: Eq st => Int -> Int -> IO job -> (result -> IO ()) -> (st -> job -> IO (Maybe result)) -> st -> IO (Engine job result st)
- data Engine job result state = Eng {
- chan1 :: BoundedChan job
- chan2 :: BoundedChan result
- tvInHook :: TVar [Hook state job]
- tvPreMutateHook :: TVar [Hook state job]
- tvPostMutateHook :: TVar [Hook state result]
- tvOutHook :: TVar [Hook state result]
- state :: TVar state
- data Hook st msg = Hk {
- hkFunc :: st -> msg -> IO (Maybe msg)
- hkPriority :: Int
- hkDescription :: String
- addInputHook :: Engine job result state -> Hook state job -> IO ()
- addOutputHook :: Engine job result state -> Hook state result -> IO ()
- addPreMutateHook :: Engine job result state -> Hook state job -> IO ()
- addPostMutateHook :: Engine job result state -> Hook state result -> IO ()
- delInputHook :: Engine j r s -> String -> IO ()
- delOutputHook :: Engine j r s -> String -> IO ()
- delPreMutateHook :: Engine j r s -> String -> IO ()
- delPostMutateHook :: Engine j r s -> String -> IO ()
- injectPreMutator :: Engine j r s -> j -> IO ()
- injectPostMutator :: Engine j r s -> r -> IO ()
Main functions
initSimpleEngine :: Int -> (job -> result) -> IO (BoundedChan job, BoundedChan result)
If all you want is a basic thread pool, this will work. You should consider using Control.ThreadPool instead.
Evaluation of the result is forced using seq. Input, output, and intermediate channels are length bounded to a multiple of the number of workers.
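As a rough picture of what a pool with this shape does, the sketch below starts n workers that apply a pure function and force each result with seq before passing it on. It is a hypothetical stand-in named `simplePool`, not the code of initSimpleEngine, and it uses unbounded `Chan`s from `base` for brevity rather than bounded channels.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Monad (forever, replicateM_)

-- | Hypothetical stand-in for a simple pool: n workers read jobs, apply
-- a pure function, force the result with seq, and write it to the output.
simplePool :: Int -> (job -> result) -> IO (Chan job, Chan result)
simplePool n f = do
  input  <- newChan
  output <- newChan
  replicateM_ n . forkIO . forever $ do
    job <- readChan input
    let r = f job
    r `seq` writeChan output r  -- evaluate to weak head normal form on the worker
  return (input, output)

main :: IO ()
main = do
  (input, output) <- simplePool 4 (* (2 :: Int))
  mapM_ (writeChan input) [1 .. 10]
  results <- mapM (const (readChan output)) [1 .. 10 :: Int]
  print (sum results)  -- result order is not guaranteed, so only the sum is printed
```

Note that seq only evaluates to weak head normal form; for a result such as a list or record, deeper evaluation has to be arranged by the job function itself.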
initSimpleEngineIO :: Int -> (job -> IO result) -> IO (BoundedChan job, BoundedChan result)
Simpler than calling initEngine, but it allows no state or interaction with the hooks and injectors. No strictness is forced.
Input, output, and intermediate channels are length bounded to a multiple of the number of workers.
initEngine :: Eq st => Int -> Int -> IO job -> (result -> IO ()) -> (st -> job -> IO (Maybe result)) -> st -> IO (Engine job result st)
To initialize an engine you must provide:
- the number of threads
- the maximum channel size for intermediate channels
- an action that will get the input
- an action that will consume output
- a mutator function to perform on all inputs
- an initial state for the mutator function
No strictness is forced - be sure you force evaluation if wanted. All hooks start out empty.
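data Engine job result state
An Engine represents a pool of threads ready to execute tasks.
Constructors
Eng
- chan1 :: BoundedChan job
- chan2 :: BoundedChan result
- tvInHook :: TVar [Hook state job]
- tvPreMutateHook :: TVar [Hook state job]
- tvPostMutateHook :: TVar [Hook state result]
- tvOutHook :: TVar [Hook state result]
- state :: TVar state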
Hooks
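A hook is simply a mutation on the task. The priority is used to order hook execution (hooks with lower priority values run first). The description field is used for accounting and for removing old hooks.
Constructors
Hk
- hkFunc :: st -> msg -> IO (Maybe msg)
- hkPriority :: Int
- hkDescription :: String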
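The roles of the priority and description fields can be sketched on a plain list of hooks. The record below mirrors the Hook type from the synopsis; `addHook` and `delHook` are hypothetical pure helpers, and how the library itself stores and orders hooks inside its TVars is not shown in this documentation.

```haskell
import Data.List (sortOn)

-- | Local mirror of the Hook record from the synopsis.
data Hook st msg = Hk
  { hkFunc        :: st -> msg -> IO (Maybe msg)
  , hkPriority    :: Int
  , hkDescription :: String
  }

-- | Insert a hook, keeping the list ordered so lower priorities run first.
addHook :: Hook st msg -> [Hook st msg] -> [Hook st msg]
addHook h hs = sortOn hkPriority (h : hs)

-- | Remove every hook whose description matches.
delHook :: String -> [Hook st msg] -> [Hook st msg]
delHook desc = filter ((/= desc) . hkDescription)

main :: IO ()
main = do
  let mk p d = Hk (\_ m -> return (Just m)) p d :: Hook () Int
      hooks  = addHook (mk 1 "validate") (addHook (mk 5 "log") [mk 3 "tag"])
  print (map hkDescription hooks)                  -- ordered by priority
  print (map hkDescription (delHook "log" hooks))  -- "log" removed
```

Because deletion matches on the description string, giving each hook a distinct description makes it possible to remove exactly one of them later.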
addInputHook :: Engine job result state -> Hook state job -> IO ()
Adds a hook that will be performed in serial on all jobs added to the input queue.
addOutputHook :: Engine job result state -> Hook state result -> IO ()
Adds a hook that will be performed in serial on all results before they are added to the output queue.
addPreMutateHook :: Engine job result state -> Hook state job -> IO ()
Adds a hook that will be performed in parallel before the main mutator function.
addPostMutateHook :: Engine job result state -> Hook state result -> IO ()
Adds a hook that will be performed in parallel after the main mutator function.
delInputHook :: Engine j r s -> String -> IO ()
Deletes all input hooks matching the provided description.
delOutputHook :: Engine j r s -> String -> IO ()
Deletes all output hooks matching the provided description.
delPreMutateHook :: Engine j r s -> String -> IO ()
Deletes all pre-mutate hooks matching the provided description.
delPostMutateHook :: Engine j r s -> String -> IO ()
Deletes all post-mutate hooks matching the provided description.
Injectors
injectPreMutator :: Engine j r s -> j -> IO ()
Allows adding tasks that bypass the input hooks.
injectPostMutator :: Engine j r s -> r -> IO ()
Allows bypassing the mutator, meaning a result can be produced without a task. This still hits the output hooks.