-- Hoogle documentation, generated by Haddock
-- See Hoogle, http://www.haskell.org/hoogle/
-- | A parallel producer/consumer engine (thread pool)
--
-- A parallel producer/consumer engine (thread pool). There are lots of
-- features in the Engine, to include dynamically adjustable hooks,
-- managed state, and injection points.
@package Control-Engine
@version 1.1.0.0
module Control.ThreadPool
-- | A trival thread pool for pure functions (mappings). Simply specify the
-- number of threads desired and a mutator function.
threadPool :: Int -> (a -> b) -> IO (Chan a, Chan b)
-- | A trivial thread pool that allows IO mutator functions. Evaluation of
-- output is not strict - force evaluation if desired!
threadPoolIO :: Int -> (a -> IO b) -> IO (Chan a, Chan b)
-- | 1.0 Introduction
--
-- Typically, a thread pool is a set of execution contexts that will
-- execute tasks from an input queue. Thread pools are used to parallize
-- the processing of incoming work across all available CPUs without
-- going through the expense of starting a new thread for every new task.
--
-- In Control.Engine you will find a somewhat unique
-- implementation. The Engine is not only a set of threads running
-- a common mutator on the input queue, placing results on an output
-- queue, but also include hooks, task injection, and state management.
--
-- 1.1 System Figure
--
--
-- One input Configurable number One output
-- thread of worker threads thread
-- +--------+ chan1 +------------------------+ chan2 +---------+
-- | In Hks + ---> | PreMH, Mutator, PostMH | -----> | Out Hks |
-- +--------+ +------------------------+ +---------+
-- ^ ^ ^
-- | | |
-- | | Comms via an IO Ref |
-- +-----------------+-------------------------------+
-- |
-- +------------+
-- | State TVar |
-- +------------+
-- One thread monitoring
-- the TVar and updating
-- the IORef
--
--
-- 2.0 Queues :: (BoundedChan a) - from
-- Control.Concurrent.BoundedChan.
--
-- The system uses two primary queues. One for transporting data from
-- Input hooks to the mutator (chan1), one for data from the mutator to
-- the output hooks (chan2). These channels are size-bounded - which is
-- needed mostly due to the inflexibility of the GHC scheduler.
--
-- 3.0 Hooks :: (a -> IO Maybe a)
--
-- Hooks can be added and removed during execution without creating a new
-- engine. They allow the developer to modify tasks:
--
--
-- - Input hooks - prior to parallization (for sequential
-- preprocessing)
-- - Pre-Mutator hooks - in parallel, prior to main mutation
-- funciton
-- - Post-Mutator hooks - in parallel, after mutation function
-- - Output hooks - post parallization (for sequential post
-- processing)
--
--
-- A hook returning Nothing causes the job or result to be dropped
-- (it does not propogate any further).
--
-- 4.0 Injection
--
-- One injection point allows injection of a result that had no preceding
-- task - thus the result is only seen by the output hooks; this uses
-- chan2. Another injector allows the input hooks to be bypassed; this
-- uses chan1. See the above figure for channels wrt the hooks and
-- mutator.
--
-- 5.0 State Management
--
-- Control-Engine manages state for you. Semantically, all workers and
-- hooks will see a correct state but it won't always be the most recent
-- or consistent between threads.
--
-- The stateManager waits for any updates to the mutator state or hooks.
-- If any modifications are made then the new set of hooks or state is
-- provided to the workers. Correctness is handled by keeping the master
-- copies as TVars. While the mutators and hooks read state from
-- an IORef to avoid contention.
--
-- The thinking here is that changing the hooks and state is a rare / low
-- contention action while the need for this information will be constant
-- and performance critical. How successful this stratagy is has yet to
-- be shown.
module Control.Engine
-- | If all you want is a basic thread pool, this will work. You should
-- consider using Control.ThreadPool instead.
--
-- Evaluation of the result is forced using seq. Input, output, and
-- intermediate channels are length bounded to a multiple of the number
-- of workers.
initSimpleEngine :: Int -> (job -> result) -> IO (BoundedChan job, BoundedChan result)
-- | Simpler than calling initEngine, but it allows no state or
-- interaction with the hooks and injectors. No strictness is forced.
--
-- Input, output, and intermediate channels are length bounded to a
-- multiple of the number of workers.
initSimpleEngineIO :: Int -> (job -> IO result) -> IO (BoundedChan job, BoundedChan result)
-- | To initilize an engine you must provide:
--
--
-- - the number of threads
-- - the maxiumum channel size for intermediate channels
-- - an action that will get the input
-- - an action that will consume output
-- - a mutator function to perform on all inputs
-- - an initial state for the mutator function
--
--
-- No strictness is forced - be sure you force evaluation if wanted. All
-- hooks start out empty.
initEngine :: (Eq st) => Int -> Int -> (IO job) -> (result -> IO ()) -> (st -> job -> IO (Maybe result)) -> st -> IO (Engine job result st)
-- | An Engine represents a pool of threads ready to execute tasks.
data Engine job result state
Eng :: BoundedChan job -> BoundedChan result -> TVar [Hook state job] -> TVar [Hook state job] -> TVar [Hook state result] -> TVar [Hook state result] -> TVar state -> Engine job result state
chan1 :: Engine job result state -> BoundedChan job
chan2 :: Engine job result state -> BoundedChan result
tvInHook :: Engine job result state -> TVar [Hook state job]
tvPreMutateHook :: Engine job result state -> TVar [Hook state job]
tvPostMutateHook :: Engine job result state -> TVar [Hook state result]
tvOutHook :: Engine job result state -> TVar [Hook state result]
state :: Engine job result state -> TVar state
-- | A hook is simply a mutation on the task. The priority is used to order
-- hook execution (lower value priorites happen first). For accounting
-- and to remove old hooks the description field is used.
data Hook st msg
Hk :: (st -> msg -> IO (Maybe msg)) -> Int -> String -> Hook st msg
hkFunc :: Hook st msg -> st -> msg -> IO (Maybe msg)
hkPriority :: Hook st msg -> Int
hkDescription :: Hook st msg -> String
-- | Adds a hook that will be performed in serial on all jobs added to the
-- input queue.
addInputHook :: Engine job result state -> Hook state job -> IO ()
-- | Adds a hook that will be performed in serial on all results before
-- they are added to the output queue.
addOutputHook :: Engine job result state -> Hook state result -> IO ()
-- | Adds a hook that will be performed in parallel before the main mutator
-- function.
addPreMutateHook :: Engine job result state -> Hook state job -> IO ()
-- | Adds a hook that will be performed in parallel after the main mutator
-- function.
addPostMutateHook :: Engine job result state -> Hook state result -> IO ()
-- | Deletes all input hooks matching the provided desciption
delInputHook :: Engine j r s -> String -> IO ()
-- | Deletes all output hooks matching the provided desciption
delOutputHook :: Engine j r s -> String -> IO ()
-- | Deletes all pre-mutate hooks matching the provided desciption
delPreMutateHook :: Engine j r s -> String -> IO ()
-- | Deletes all post-mutate hooks matching the provided desciption
delPostMutateHook :: Engine j r s -> String -> IO ()
-- | Allows adding tasks that bypass the input hooks.
injectPreMutator :: Engine j r s -> j -> IO ()
-- | Allows bypassing the mutator, meaning a result can be
-- produced without a task. This still hits the output hooks.
injectPostMutator :: Engine j r s -> r -> IO ()
instance Show (Hook a s)
instance Ord (Hook a s)
instance Eq (Hook m s)