-- Hoogle documentation, generated by Haddock -- See Hoogle, http://www.haskell.org/hoogle/ -- | A parallel producer/consumer engine (thread pool) -- -- A parallel producer/consumer engine (thread pool). There are lots of -- features in the Engine, including dynamically adjustable hooks, -- managed state, and injection points. @package Control-Engine @version 1.0.0.0 module Control.ThreadPool -- | A trivial thread pool for pure functions (mappings). Simply specify the -- number of threads desired and a mutator function. threadPool :: Int -> (a -> b) -> IO (Chan a, Chan b) -- | A trivial thread pool that allows IO mutator functions. Evaluation of -- output is not strict - force evaluation if desired! threadPoolIO :: Int -> (a -> IO b) -> IO (Chan a, Chan b) -- | 1.0 Introduction -- -- Typically, a thread pool is a set of execution contexts that will -- execute tasks from an input queue. Thread pools are used to parallelize -- the processing of incoming work across all available CPUs without -- going through the expense of starting a new thread for every new task. -- -- In Control.Engine you will find a somewhat unique -- implementation. The Engine is not only a set of threads running -- a common mutator on the input queue, placing results on an output -- queue, but also includes hooks, task injection, and state management. -- --
--   +--------+  chan1 +------------------------+ chan2  +--------+
--   | In Hk  +  --->  | PreMH, Mutator, PostMH | -----> | Out Hk |
--   +--------+        +------------------------+        +--------+
--        ^                 ^                             ^
--        |                 |                             |
--        |                 |    IO Ref                   |
--        +-----------------+-----------------------------+
--                          |
--                    +------------+
--                    | State TVar |
--                    +------------+
--   
-- -- Queues :: (BoundedChan a) - from -- Control.Concurrent.BoundedChan. -- -- The system uses two primary queues. One for transporting data from -- pre-mutator hooks to the mutator. One for data from the mutator to the -- post-mutator hooks. These channels are size-bounded - which is needed -- mostly due to the inflexibility of the scheduler. -- -- Hooks :: (a -> IO (Maybe a)) -- -- Hooks can be added and removed during execution without creating a new -- engine. They allow the developer to modify tasks: -- -- -- -- State Management -- -- Control-Engine manages state for you. Semantically, all workers and -- hooks will see a correct state but it won't always be the most recent -- or consistent between threads. -- -- The stateManager waits for any updates to the mutator state or hooks. -- If any modifications are made then the new set of hooks or state is -- provided to the workers. Correctness is handled by keeping the master -- copies as TVars (Control.Concurrent.STM), while the mutators -- and hooks read state from an IORef -- (Data.IORef) to avoid contention. -- -- The thinking here is that changing the hooks and state is a rare / low -- contention action while the need for this information will be constant -- and performance critical. How successful this strategy is has yet to -- be shown. -- -- Injection -- -- One injection point allows injection of a result that had no preceding -- task - thus the result is only seen by the output hooks. Another -- injector allows the input hooks to be bypassed. module Control.Engine -- | If all you want is a basic thread pool, this will work. You should -- consider using Control.ThreadPool instead. -- -- Evaluation of the result is forced using seq. Input, output, and -- intermediate channels are length bounded to a multiple of the number -- of workers. 
initSimpleEngine :: Int -> (job -> result) -> IO (BoundedChan job, BoundedChan result) -- | Simpler than calling initEngine, but it allows no state or -- interaction with the hooks and injectors. No strictness is forced. -- -- Input, output, and intermediate channels are length bounded to a -- multiple of the number of workers. initSimpleEngineIO :: Int -> (job -> IO result) -> IO (BoundedChan job, BoundedChan result) -- | To initialize an engine you must provide: -- -- -- -- No strictness is forced - be sure you force evaluation if wanted. All -- hooks start out empty. initEngine :: (Eq st) => Int -> Int -> (IO job) -> (result -> IO ()) -> (st -> job -> IO (Maybe result)) -> st -> IO (Engine job result st) -- | An Engine represents a pool of threads ready to execute tasks. data Engine job result state Eng :: BoundedChan job -> BoundedChan result -> TVar [Hook state job] -> TVar [Hook state job] -> TVar [Hook state result] -> TVar [Hook state result] -> TVar state -> Engine job result state chan1 :: Engine job result state -> BoundedChan job chan2 :: Engine job result state -> BoundedChan result tvInHook :: Engine job result state -> TVar [Hook state job] tvPreMutateHook :: Engine job result state -> TVar [Hook state job] tvPostMutateHook :: Engine job result state -> TVar [Hook state result] tvOutHook :: Engine job result state -> TVar [Hook state result] state :: Engine job result state -> TVar state -- | A hook is simply a mutation on the task. The priority is used to order -- hook execution (lower value priorities happen first). For accounting -- and to remove old hooks the description field is used. data Hook st msg Hk :: (st -> msg -> IO (Maybe msg)) -> Int -> String -> Hook st msg hkFunc :: Hook st msg -> st -> msg -> IO (Maybe msg) hkPriority :: Hook st msg -> Int hkDescription :: Hook st msg -> String -- | Adds a hook that will be performed in serial on all jobs added to the -- input queue. 
addInputHook :: Engine job result state -> Hook state job -> IO () -- | Adds a hook that will be performed in serial on all results before -- they are added to the output queue. addOutputHook :: Engine job result state -> Hook state result -> IO () -- | Adds a hook that will be performed in parallel before the main mutator -- function. addPreMutateHook :: Engine job result state -> Hook state job -> IO () -- | Adds a hook that will be performed in parallel after the main mutator -- function. addPostMutateHook :: Engine job result state -> Hook state result -> IO () -- | Deletes all input hooks matching the provided description delInputHook :: Engine j r s -> String -> IO () -- | Deletes all output hooks matching the provided description delOutputHook :: Engine j r s -> String -> IO () -- | Deletes all pre-mutate hooks matching the provided description delPreMutateHook :: Engine j r s -> String -> IO () -- | Deletes all post-mutate hooks matching the provided description delPostMutateHook :: Engine j r s -> String -> IO () -- | Allows adding tasks that bypass the input hooks. injectPreMutator :: Engine j r s -> j -> IO () -- | Allows bypassing the mutator, meaning a result can be -- produced without a task. This still hits the output hooks. injectPostMutator :: Engine j r s -> r -> IO () instance Show (Hook a s) instance Ord (Hook a s) instance Eq (Hook m s)