Implemented here is a thread pool library on crack.
1.0 Introduction
A thread pool is typically a set of execution contexts that execute tasks from an input queue. Thread pools are used to parallelize the processing of incoming work across all available CPUs without the expense of starting a new thread for every new task.
In Control.Engine you will find a somewhat unique implementation. The Engine is not only a set of threads running a common mutator on the input queue and producing an output queue, but also includes hooks, task injection, and state management.
Queues :: (Chan a) - from Control.Concurrent.Chan.
Hooks :: (a -> IO (Maybe a))
Hooks can be added and removed during execution without creating a new engine. They allow the developer to modify tasks:
- prior to parallelization (for sequential preprocessing)
- in parallel, prior to the main mutation function
- in parallel, after the mutation function
- post parallelization (for sequential post-processing)
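As a rough illustration of the hook shape given above, the sketch below chains several functions of type `a -> IO (Maybe a)`. It assumes that a hook returning `Nothing` drops the task, which this page does not state. `SimpleHook`, `runHooks`, and the two example hooks are hypothetical names and are not part of Control.Engine.

```haskell
import Control.Monad (foldM)

-- The simplified hook shape described above: inspect a task, then
-- either keep it (possibly modified) or reject it.
type SimpleHook a = a -> IO (Maybe a)

-- Run hooks left to right. ASSUMPTION: a Nothing from any hook drops
-- the task, and the remaining hooks are not run for it.
runHooks :: [SimpleHook a] -> a -> IO (Maybe a)
runHooks hooks task = foldM step (Just task) hooks
  where
    step Nothing  _    = return Nothing
    step (Just t) hook = hook t

-- Hypothetical hooks, for illustration only.
dropNegative :: SimpleHook Int
dropNegative n = return (if n < 0 then Nothing else Just n)

double :: SimpleHook Int
double n = return (Just (n * 2))

main :: IO ()
main = do
  runHooks [dropNegative, double] 21   >>= print  -- prints: Just 42
  runHooks [dropNegative, double] (-1) >>= print  -- prints: Nothing
```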
State Management
The stateManager waits for any updates to the mutator state or hooks. If any
modifications are made, the new set of hooks (or state) is provided
to the workers. Correctness is handled by keeping the master copies as TVars
(Control.Concurrent.STM), while the mutators and hooks read from an MVar
(Control.Concurrent.MVar) to avoid contention.
The thinking here is that changing the hooks and state is a rare, low-contention action, while the need for this information is constant and performance critical. How successful this strategy is has yet to be shown.
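The split between a master TVar and a published MVar can be sketched as follows. This is a minimal illustration of the idea and not the library's actual implementation: the `stateManager` function here, its arguments, and the polling in `main` are all assumptions made for the example.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (MVar, newMVar, readMVar, swapMVar)
import Control.Concurrent.STM

-- Hypothetical manager loop: block until the master TVar differs from
-- the value last published, then publish the new value to the MVar
-- that workers read. Runs forever.
stateManager :: Eq st => TVar st -> MVar st -> st -> IO ()
stateManager master published = loop
  where
    loop lastSeen = do
      new <- atomically $ do
        s <- readTVar master
        check (s /= lastSeen)   -- retry until the state actually changes
        return s
      _ <- swapMVar published new
      loop new

main :: IO ()
main = do
  master    <- newTVarIO (0 :: Int)
  published <- newMVar 0
  _ <- forkIO (stateManager master published 0)
  atomically (writeTVar master 7)   -- the rare update path
  -- Workers would call readMVar on the hot path; here we poll until the
  -- manager has published the update.
  let waitFor = do
        v <- readMVar published
        if v == 7 then print v else threadDelay 1000 >> waitFor
  waitFor
```

The `Eq` constraint lets the manager detect that the state really changed before it republishes, which may be why `initEngine` also requires `Eq st`.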
Injection
One injection point allows injection of a result that had no preceding
task. The second injector allows the initial hooks (input hooks) to be
bypassed.
Synopsis
- initSimpleEngine :: Int -> (job -> result) -> IO (Chan job, Chan result)
- initSimpleEngineIO :: Int -> (job -> IO result) -> IO (Chan job, Chan result)
- initEngine :: Eq st => Int -> IO job -> (result -> IO ()) -> (st -> job -> IO (Maybe result)) -> st -> IO (Engine job result st)
- data Engine job result state = Eng {}
- data Hook st msg = Hk { hkFunc :: st -> msg -> IO (Maybe msg), hkPriority :: Int, hkDescription :: String }
- addInputHook :: Engine job result state -> Hook state job -> IO ()
- addOutputHook :: Engine job result state -> Hook state result -> IO ()
- addPreMutateHook :: Engine job result state -> Hook state job -> IO ()
- addPostMutateHook :: Engine job result state -> Hook state result -> IO ()
- delInputHook :: Engine j r s -> String -> IO ()
- delOutputHook :: Engine j r s -> String -> IO ()
- delPreMutateHook :: Engine j r s -> String -> IO ()
- delPostMutateHook :: Engine j r s -> String -> IO ()
- injectPreMutator :: Engine j r s -> j -> IO ()
- injectPostMutator :: Engine j r s -> r -> IO ()
Main functions
initSimpleEngine :: Int -> (job -> result) -> IO (Chan job, Chan result)
If all you want is a basic thread pool, this will work. You should consider using Control.ThreadPool instead.
Evaluation of the result is forced using seq.
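A minimal usage sketch based only on the signature above: jobs are written to the first channel and results are read from the second. It assumes the control-engine package is installed; the worker count and the doubling function are arbitrary choices for the example.

```haskell
import Control.Concurrent.Chan (readChan, writeChan)
import Control.Engine (initSimpleEngine)
import Control.Monad (replicateM)
import Data.List (sort)

main :: IO ()
main = do
  -- Four worker threads, each applying the pure function (* 2).
  (input, output) <- initSimpleEngine 4 (* 2)
  mapM_ (writeChan input) [1 .. 10 :: Int]
  -- With several workers the results may arrive in any order,
  -- so sort them before printing.
  results <- replicateM 10 (readChan output)
  print (sort results)
```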
initSimpleEngineIO :: Int -> (job -> IO result) -> IO (Chan job, Chan result)
Simpler than calling initEngine, but it allows no state or interaction
with the hooks and injectors. No strictness is forced.
initEngine :: Eq st => Int -> IO job -> (result -> IO ()) -> (st -> job -> IO (Maybe result)) -> st -> IO (Engine job result st)
To initialize an engine you must provide:
- the number of threads
- an action that will get the input
- an action that will consume output
- a mutator function to perform on all inputs
- an initial state for the mutator function
No strictness is forced - be sure you force evaluation if wanted. All hooks start out empty.
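The five arguments listed above might be supplied as in the sketch below, which uses a pair of channels for the input and output actions. It assumes the control-engine package is installed. The mutator `scaleEvens` and its use of the state as a multiplier are hypothetical, and the comment about `Nothing` is an assumption that this page does not confirm.

```haskell
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Engine (initEngine)

-- Hypothetical mutator: the state is a multiplier applied to even jobs.
-- ASSUMPTION: returning Nothing means no result is emitted for that job.
scaleEvens :: Int -> Int -> IO (Maybe Int)
scaleEvens factor job
  | even job  = return (Just (factor * job))
  | otherwise = return Nothing

main :: IO ()
main = do
  input  <- newChan
  output <- newChan
  -- 2 threads, read jobs from `input`, write results to `output`,
  -- mutate with scaleEvens, initial state 10.
  _engine <- initEngine 2 (readChan input) (writeChan output) scaleEvens 10
  writeChan input 4
  readChan output >>= print
```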
Hooks
A hook is simply a mutation on the task. The priority orders hook execution (hooks with lower priority values run first). The description field is used for accounting and to remove old hooks.
Hk
- hkFunc :: st -> msg -> IO (Maybe msg)
- hkPriority :: Int
- hkDescription :: String
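To make the roles of the priority and description fields concrete, the sketch below orders hooks by priority and removes hooks by description. It declares a local copy of the `Hook` record so that it runs without the library. The helpers `inRunOrder` and `removeByDescription` are hypothetical and only mirror the behaviour described above; they are not the library's internals.

```haskell
import Data.List (sortOn)

-- Local copy of the Hook record as declared on this page, so the
-- sketch is self-contained.
data Hook st msg = Hk
  { hkFunc        :: st -> msg -> IO (Maybe msg)
  , hkPriority    :: Int
  , hkDescription :: String
  }

-- Hypothetical helper: lower priority values run first.
inRunOrder :: [Hook st msg] -> [Hook st msg]
inRunOrder = sortOn hkPriority

-- Hypothetical helper: drop every hook whose description matches.
removeByDescription :: String -> [Hook st msg] -> [Hook st msg]
removeByDescription d = filter ((/= d) . hkDescription)

main :: IO ()
main = do
  let keep _ m = return (Just m)
      hooks :: [Hook () Int]
      hooks = [Hk keep 20 "log", Hk keep 5 "validate", Hk keep 10 "log"]
  print (map hkDescription (inRunOrder hooks))              -- ["validate","log","log"]
  print (map hkPriority (removeByDescription "log" hooks))  -- [5]
```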
addInputHook :: Engine job result state -> Hook state job -> IO ()
Adds a hook that will be performed in serial on all jobs added to the input queue.
addOutputHook :: Engine job result state -> Hook state result -> IO ()
Adds a hook that will be performed in serial on all results before they are added to the output queue.
addPreMutateHook :: Engine job result state -> Hook state job -> IO ()
Adds a hook that will be performed in parallel before the main mutator function.
addPostMutateHook :: Engine job result state -> Hook state result -> IO ()
Adds a hook that will be performed in parallel after the main mutator function.
delInputHook :: Engine j r s -> String -> IO ()
Deletes all input hooks matching the provided description.
delOutputHook :: Engine j r s -> String -> IO ()
Deletes all output hooks matching the provided description.
delPreMutateHook :: Engine j r s -> String -> IO ()
Deletes all pre-mutate hooks matching the provided description.
delPostMutateHook :: Engine j r s -> String -> IO ()
Deletes all post-mutate hooks matching the provided description.
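A hedged sketch of adding and later deleting a hook, written against the signatures above and assuming the control-engine package is installed. The hook `rejectEmpty` and the length-computing mutator are hypothetical. Because hook changes are handed to the workers by the state manager, a newly added hook may not apply to jobs submitted immediately afterwards, so the example makes no claim that depends on that timing.

```haskell
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Engine

-- Hypothetical hook: reject empty strings before they reach the mutator.
-- ASSUMPTION: returning Nothing drops the job.
rejectEmpty :: Hook () String
rejectEmpty = Hk
  { hkFunc        = \_ job -> return (if null job then Nothing else Just job)
  , hkPriority    = 1
  , hkDescription = "reject-empty"
  }

main :: IO ()
main = do
  input  <- newChan
  output <- newChan
  eng <- initEngine 2 (readChan input) (writeChan output)
                    (\_ job -> return (Just (length job))) ()
  addPreMutateHook eng rejectEmpty       -- runs in parallel, before the mutator
  writeChan input "hello"
  readChan output >>= print
  delPreMutateHook eng "reject-empty"    -- removes every hook with this description
```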
Injectors
injectPreMutator :: Engine j r s -> j -> IO ()
Allows adding tasks that bypass the input hooks.
injectPostMutator :: Engine j r s -> r -> IO ()
Allows bypassing the mutator, meaning a result can be produced without a task. This still hits the output hooks.
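Both injectors could be exercised as in the sketch below, which builds a small engine and pushes one value through each injection point. It assumes the control-engine package is installed; the doubling mutator and the unit state are hypothetical choices for the example.

```haskell
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Engine (initEngine, injectPostMutator, injectPreMutator)

main :: IO ()
main = do
  input  <- newChan
  output <- newChan
  -- Hypothetical mutator that doubles every job; the state () is unused.
  let mutator () job = return (Just (job * 2 :: Int))
  eng <- initEngine 1 (readChan input) (writeChan output) mutator ()
  -- Bypasses the input hooks but still passes through the mutator.
  injectPreMutator eng 21
  readChan output >>= print
  -- Bypasses the mutator; the output hooks (none here) still apply.
  injectPostMutator eng 100
  readChan output >>= print
```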