-- Hoogle documentation, generated by Haddock
-- See Hoogle, http://www.haskell.org/hoogle/


-- | A parallel producer/consumer engine (thread pool)
--
--   A parallel producer/consumer engine (thread pool). The Engine has
--   many features, including dynamically adjustable hooks, managed
--   state, and injection points.
@package Control-Engine
@version 1.1.0.1

module Control.ThreadPool

-- | A trivial thread pool for pure functions (mappings). Simply specify the
--   number of threads desired and a mutator function.
threadPool :: Int -> (a -> b) -> IO (Chan a, Chan b)

-- | A trivial thread pool that allows IO mutator functions. Evaluation of
--   output is not strict - force evaluation if desired!
threadPoolIO :: Int -> (a -> IO b) -> IO (Chan a, Chan b)


-- | 1.0 Introduction
--
--   Typically, a thread pool is a set of execution contexts that will
--   execute tasks from an input queue. Thread pools are used to parallelize
--   the processing of incoming work across all available CPUs without
--   going through the expense of starting a new thread for every new task.
--
--   In Control.Engine you will find a somewhat unique implementation.
--   The Engine is not only a set of threads running a common mutator on
--   the input queue and placing results on an output queue; it also
--   includes hooks, task injection, and state management.
--
--   1.1 System Figure
--
--    One input            Configurable number            One output
--    thread               of worker threads              thread
--   +--------+  chan1 +------------------------+ chan2  +---------+
--   | In Hks +  --->  | PreMH, Mutator, PostMH | -----> | Out Hks |
--   +--------+        +------------------------+        +---------+
--        ^                 ^                               ^
--        |                 |                               |
--        |                 |    Comms via an IORef         |
--        +-----------------+-------------------------------+
--                          |
--                    +------------+
--                    | State TVar |
--                    +------------+
--                    One thread monitoring
--                    the TVar and updating
--                    the IORef
--   
--
--   2.0 Queues :: (BoundedChan a) - from Control.Concurrent.BoundedChan.
--
--   The system uses two primary queues: one for transporting data from the
--   input hooks to the mutator (chan1), and one for data from the mutator
--   to the output hooks (chan2). These channels are size-bounded, which is
--   needed mostly due to the inflexibility of the GHC scheduler.
--
--   3.0 Hooks :: (st -> a -> IO (Maybe a))
--
--   Hooks can be added and removed during execution without creating a new
--   engine. They allow the developer to modify tasks or results at four
--   points: on the input thread (input hooks), in the workers before and
--   after the mutator (pre-mutate and post-mutate hooks), and on the
--   output thread (output hooks).
--
--   A hook returning Nothing causes the job or result to be dropped
--   (it does not propagate any further).
--
--   4.0 Injection
--
--   One injection point allows injection of a result that had no preceding
--   task; such a result is seen only by the output hooks, and this uses
--   chan2. Another injector allows the input hooks to be bypassed; this
--   uses chan1. See the figure above for where the channels sit relative
--   to the hooks and mutator.
--
--   5.0 State Management
--
--   Control-Engine manages state for you. Semantically, all workers and
--   hooks will see a correct state, but it won't always be the most recent
--   or consistent between threads.
--
--   The stateManager waits for any updates to the mutator state or hooks.
--   If any modifications are made, the new set of hooks or state is
--   provided to the workers. Correctness is handled by keeping the master
--   copies as TVars, while the mutators and hooks read state from an
--   IORef to avoid contention.
--
--   The thinking here is that changing the hooks and state is a rare, low
--   contention action, while the need for this information will be
--   constant and performance critical. How successful this strategy is has
--   yet to be shown.
module Control.Engine

-- | If all you want is a basic thread pool, this will work. You should
--   consider using Control.ThreadPool instead.
--
--   Evaluation of the result is forced using seq. Input, output, and
--   intermediate channels are length bounded to a multiple of the number
--   of workers.
initSimpleEngine :: Int -> (job -> result) -> IO (BoundedChan job, BoundedChan result)

-- | Simpler than calling initEngine, but it allows no state or
--   interaction with the hooks and injectors. No strictness is forced.
--
--   Input, output, and intermediate channels are length bounded to a
--   multiple of the number of workers.
initSimpleEngineIO :: Int -> (job -> IO result) -> IO (BoundedChan job, BoundedChan result)

-- | To initialize an engine you must provide:
--
--
--
--   No strictness is forced - be sure you force evaluation if wanted. All
--   hooks start out empty.
initEngine :: Eq st => Int -> Int -> (IO job) -> (result -> IO ()) -> (st -> job -> IO (Maybe result)) -> st -> IO (Engine job result st)

-- | An Engine represents a pool of threads ready to execute tasks.
data Engine job result state
Eng :: BoundedChan job -> BoundedChan result -> TVar [Hook state job] -> TVar [Hook state job] -> TVar [Hook state result] -> TVar [Hook state result] -> TVar state -> Engine job result state
chan1 :: Engine job result state -> BoundedChan job
chan2 :: Engine job result state -> BoundedChan result
tvInHook :: Engine job result state -> TVar [Hook state job]
tvPreMutateHook :: Engine job result state -> TVar [Hook state job]
tvPostMutateHook :: Engine job result state -> TVar [Hook state result]
tvOutHook :: Engine job result state -> TVar [Hook state result]
state :: Engine job result state -> TVar state

-- | A hook is simply a mutation on the task. The priority is used to order
--   hook execution (hooks with lower priority values run first). The
--   description field is used for accounting and to remove old hooks.
data Hook st msg
Hk :: (st -> msg -> IO (Maybe msg)) -> Int -> String -> Hook st msg
hkFunc :: Hook st msg -> st -> msg -> IO (Maybe msg)
hkPriority :: Hook st msg -> Int
hkDescription :: Hook st msg -> String

-- | Adds a hook that will be performed in serial on all jobs added to the
--   input queue.
addInputHook :: Engine job result state -> Hook state job -> IO ()

-- | Adds a hook that will be performed in serial on all results before
--   they are added to the output queue.
addOutputHook :: Engine job result state -> Hook state result -> IO ()

-- | Adds a hook that will be performed in parallel before the main mutator
--   function.
addPreMutateHook :: Engine job result state -> Hook state job -> IO ()

-- | Adds a hook that will be performed in parallel after the main mutator
--   function.
addPostMutateHook :: Engine job result state -> Hook state result -> IO ()

-- | Deletes all input hooks matching the provided description.
delInputHook :: Engine j r s -> String -> IO ()

-- | Deletes all output hooks matching the provided description.
delOutputHook :: Engine j r s -> String -> IO ()

-- | Deletes all pre-mutate hooks matching the provided description.
delPreMutateHook :: Engine j r s -> String -> IO ()

-- | Deletes all post-mutate hooks matching the provided description.
delPostMutateHook :: Engine j r s -> String -> IO ()

-- | Allows adding tasks that bypass the input hooks.
injectPreMutator :: Engine j r s -> j -> IO ()

-- | Allows bypassing the mutator, meaning a result can be
--   produced without a task. This still hits the output hooks.
injectPostMutator :: Engine j r s -> r -> IO ()
instance Show (Hook a s)
instance Ord (Hook a s)
instance Eq (Hook m s)
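
-- The hook rules documented above (lower priority values run first, a
-- Nothing result drops the message, deletion matches on the description)
-- can be illustrated with a minimal, self-contained sketch. It uses only
-- base and a local stand-in for the Hook record; runHooks, deleteHooks,
-- the two example hooks, and demo are hypothetical names chosen for
-- illustration, and the library's own implementation may differ.

```haskell
import Data.List (sortOn)

-- Hypothetical stand-in that mirrors the shape of the documented Hook
-- (function, priority, description). It is NOT the library's own type.
data Hook st msg = Hk
  { hkFunc        :: st -> msg -> IO (Maybe msg)
  , hkPriority    :: Int
  , hkDescription :: String
  }

-- Assumed semantics: run hooks in ascending priority order and stop as
-- soon as one returns Nothing, so later hooks never see a dropped message.
runHooks :: st -> [Hook st msg] -> msg -> IO (Maybe msg)
runHooks st hooks = go (sortOn hkPriority hooks)
  where
    go []       m = pure (Just m)
    go (h : hs) m = do
      r <- hkFunc h st m
      case r of
        Nothing -> pure Nothing
        Just m' -> go hs m'

-- Remove every hook whose description matches the given string.
deleteHooks :: String -> [Hook st msg] -> [Hook st msg]
deleteHooks desc = filter ((/= desc) . hkDescription)

-- Example hooks over Int jobs; the state is an Int offset that is only read.
addOffset, dropNegative :: Hook Int Int
addOffset    = Hk (\off x -> pure (Just (x + off))) 2 "add offset"
dropNegative = Hk (\_ x -> pure (if x < 0 then Nothing else Just x)) 1 "drop negative"

-- dropNegative has the lower priority value, so it runs before addOffset
-- even though it appears second in the list.
demo :: IO [Maybe Int]
demo = mapM (runHooks 10 [addOffset, dropNegative]) [5, -3]
```

-- Loading the sketch in GHCi and evaluating demo yields
-- [Just 15, Nothing]: the job 5 passes the filter and is offset by the
-- state, while -3 is dropped before addOffset runs. After
-- deleteHooks "drop negative", the same job -3 would come out as Just 7.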