{-| /1.0 Introduction/ Typically, a thread pool is a set of execution contexts that will execute tasks from an input queue. Thread pools are used to parallize the processing of incoming work across all available CPUs without going through the expense of starting a new thread for every new task. In 'Control.Engine' you will find a somewhat unique implementation. The 'Engine' is not only a set of threads running a common mutator on the input queue, placing results on an output queue, but also include hooks, task injection, and state management. /1.1 System Figure/ @ One input Configurable number One output thread of worker threads thread +--------+ chan1 +------------------------+ chan2 +---------+ | In Hks + ---> | PreMH, Mutator, PostMH | -----> | Out Hks | +--------+ +------------------------+ +---------+ ^ ^ ^ | | | | | Comms via an IO Ref | +-----------------+-------------------------------+ | +------------+ | State TVar | +------------+ One thread monitoring the TVar and updating the IORef @ /2.0 Queues :: (BoundedChan a)/ - from "Control.Concurrent.BoundedChan". The system uses two primary queues. One for transporting data from Input hooks to the mutator (chan1), one for data from the mutator to the output hooks (chan2). These channels are size-bounded - which is needed mostly due to the inflexibility of the GHC scheduler. /3.0 Hooks :: (a -> IO Maybe a)/ Hooks can be added and removed during execution without creating a new engine. They allow the developer to modify tasks: * Input hooks - prior to parallization (for sequential preprocessing) * Pre-Mutator hooks - in parallel, prior to main mutation funciton * Post-Mutator hooks - in parallel, after mutation function * Output hooks - post parallization (for sequential post processing) A hook returning "Nothing" causes the job or result to be dropped (it does not propogate any further). /4.0 Injection/ One injection point allows injection of a result that had no preceding task - thus the result is only seen by the output hooks; this uses chan2. Another injector allows the input hooks to be bypassed; this uses chan1. See the above figure for channels wrt the hooks and mutator. /5.0 State Management/ Control-Engine manages state for you. Semantically, all workers and hooks will see a correct state but it won't always be the most recent or consistent between threads. The stateManager waits for any updates to the mutator state or hooks. If any modifications are made then the new set of hooks or state is provided to the workers. Correctness is handled by keeping the master copies as 'TVars'. While the mutators and hooks read state from an 'IORef' to avoid contention. The thinking here is that changing the hooks and state is a rare / low contention action while the need for this information will be constant and performance critical. How successful this stratagy is has yet to be shown. -} module Control.Engine ( -- * Main functions initSimpleEngine , initSimpleEngineIO , initEngine , Engine(..) -- * Hooks , Hook(..) , addInputHook , addOutputHook , addPreMutateHook , addPostMutateHook , delInputHook , delOutputHook , delPreMutateHook , delPostMutateHook -- * Injectors , injectPreMutator , injectPostMutator ) where import Control.Concurrent (forkIO) import Control.Concurrent.STM import Control.Concurrent.BoundedChan import Control.Monad import Data.IORef import Data.List (insert) -- |An 'Engine' represents a pool of threads ready to execute tasks. data Engine job result state = Eng { chan1 :: BoundedChan job , chan2 :: BoundedChan result , tvInHook :: TVar [Hook state job] , tvPreMutateHook :: TVar [Hook state job] , tvPostMutateHook :: TVar [Hook state result] , tvOutHook :: TVar [Hook state result] , state :: TVar state } data RefEngine job result state = RefEng { refInHook :: IORef [Hook state job] , refPreMutateHook :: IORef [Hook state job] , refPostMutateHook :: IORef [Hook state result] , refOutHook :: IORef [Hook state result] , refState :: IORef state } -- |If all you want is a basic thread pool, this will work. -- You should consider using "Control.ThreadPool" instead. -- -- Evaluation of the result is forced using seq. -- Input, output, and intermediate channels are length bounded to a multiple -- of the number of workers. initSimpleEngine :: Int -> (job -> result) -> IO (BoundedChan job, BoundedChan result) initSimpleEngine nr mutator = do input <- newBoundedChan chanBound output <- newBoundedChan chanBound let m = const (return . Just . mutator) initEngine nr chanBound (readChan input) (\o -> o `seq` writeChan output o) m () return (input, output) where chanBound = nr * 8 -- |Simpler than calling 'initEngine', but it allows no state or interaction -- with the hooks and injectors. No strictness is forced. -- -- Input, output, and intermediate channels are length bounded to a multiple -- of the number of workers. initSimpleEngineIO :: Int -> (job -> IO result) -> IO (BoundedChan job, BoundedChan result) initSimpleEngineIO nr mutator = do input <- newBoundedChan chanBound output <- newBoundedChan chanBound let m = (\_ j -> mutator j >>= return . Just) initEngine nr chanBound (readChan input) (writeChan output) m () return (input, output) where chanBound = nr * 8 -- |To initilize an engine you must provide: -- -- * the number of threads -- -- * the maxiumum channel size for intermediate channels -- -- * an action that will get the input -- -- * an action that will consume output -- -- * a mutator function to perform on all inputs -- -- * an initial state for the mutator function -- -- No strictness is forced - be sure you force evaluation if wanted. -- All hooks start out empty. initEngine :: (Eq st) => Int -> Int -> (IO job) -> (result -> IO ()) -> (st -> job -> IO (Maybe result)) -> st -> IO (Engine job result st) initEngine nrWorkers chanBound input output mutator initialState = do c1 <- newBoundedChan chanBound c2 <- newBoundedChan chanBound inputHooks <- newIORef [] outputHooks <- newIORef [] preMutatorHooks <- newIORef [] postMutatorHooks <- newIORef [] refState <- newIORef initialState ch1tv <- newTVarIO [] ch2tv <- newTVarIO [] ch3tv <- newTVarIO [] ch4tv <- newTVarIO [] stv <- newTVarIO initialState let engine = Eng c1 c2 ch1tv ch2tv ch3tv ch4tv stv eref = RefEng inputHooks outputHooks preMutatorHooks postMutatorHooks refState forkIO $ inputManager input c1 eref forkIO $ outputManager output c2 eref forkIO $ stateManager engine eref forM_ [1..nrWorkers] $ \_ -> forkIO (worker c1 eref mutator c2) return engine worker :: BoundedChan job -> RefEngine job result st -> (st -> job -> IO (Maybe result)) -> BoundedChan result -> IO () worker c1 eref mutator c2 = forever $ readChan c1 >>= worker' where worker' msg = do preMH <- readIORef (refPreMutateHook eref) postMH <- readIORef (refPostMutateHook eref) st <- readIORef (refState eref) -- run hook1, mutator, hook2 msg' <- runHooks preMH st msg out <- runStage (mutator st) msg' out' <- runStage (runHooks postMH st) out case out' of Nothing -> return () Just o -> writeChan c2 o runStage :: (a -> IO (Maybe b)) -> Maybe a -> IO (Maybe b) runStage _ Nothing = return Nothing runStage stage (Just a) = stage a stateManager :: (Eq st) => Engine job result st -> RefEngine job result st -> IO () stateManager eng eref = atomically readState >>= updateState where readState = do s <- readTVar (state eng) ih <- readTVar (tvInHook eng) eh <- readTVar (tvPreMutateHook eng) th <- readTVar (tvPostMutateHook eng) oh <- readTVar (tvOutHook eng) return (s,ih, eh, th, oh) updateState curr@(s, ih, eh, th, oh) = do writeIORef (refState eref) s writeIORef (refInHook eref) ih writeIORef (refPreMutateHook eref) eh writeIORef (refPostMutateHook eref) th writeIORef (refOutHook eref) oh new <- atomically $ do x <- readState when (x == curr) retry return x updateState new -- Input.hs inputManager :: (IO job) -> BoundedChan job -> RefEngine job result st -> IO () inputManager input outChan eref = forever $ input >>= handleMsg where handleMsg msg = do hook <- readIORef (refInHook eref) s <- readIORef (refState eref) new <- runHooks hook s msg case new of Just m -> writeChan outChan m Nothing -> return () -- Output.hs outputManager :: (result -> IO ()) -> BoundedChan result -> RefEngine job result state -> IO () outputManager output msgChan eref = forever $ do m <- readChan msgChan hook <- readIORef (refOutHook eref) s <- readIORef (refState eref) new <- runHooks hook s m case new of Just n -> output n Nothing -> return () -- Hooks.hs -- |A hook is simply a mutation on the task. The priority is used to order -- hook execution (lower value priorites happen first). For accounting and to -- remove old hooks the description field is used. data Hook st msg = Hk { hkFunc :: st -> msg -> IO (Maybe msg) , hkPriority :: Int , hkDescription :: String } instance Eq (Hook m s) where (Hk _ p d) == (Hk _ p' d') = p == p' && d == d' instance Ord (Hook a s) where (Hk _ p d) `compare` (Hk _ p' d') = (p,d) `compare` (p', d') instance Show (Hook a s) where show (Hk _ p d) = d ++ " Priority = " ++ (show p) showsPrec _ (Hk _ p d) = (++) ("Hk { hkFunc = undefined, p = " ++ (show p) ++ " , hkDescription = " ++ d ++ " } ") runHooks :: [Hook st msg] -> st -> msg -> IO (Maybe msg) runHooks hooks st m = foldM apply (Just m) hooks where apply Nothing f = return Nothing apply (Just a) f = hkFunc f st a -- |Adds a hook that will be performed in serial on all jobs added to -- the input queue. addInputHook :: Engine job result state -> Hook state job -> IO () addInputHook e h = atomically $ do readTVar (tvInHook e) >>= writeTVar (tvInHook e) . insert h -- |Adds a hook that will be performed in serial on all results -- before they are added to the output queue. addOutputHook :: Engine job result state -> Hook state result -> IO () addOutputHook e h = atomically $ do readTVar (tvOutHook e) >>= writeTVar (tvOutHook e) . insert h -- |Adds a hook that will be performed in parallel before the main mutator -- function. addPreMutateHook :: Engine job result state -> Hook state job -> IO () addPreMutateHook e h = atomically $ do readTVar (tvPreMutateHook e) >>= writeTVar (tvPreMutateHook e) . insert h -- |Adds a hook that will be performed in parallel after the main mutator -- function. addPostMutateHook :: Engine job result state -> Hook state result -> IO () addPostMutateHook e h = atomically $ do readTVar (tvPostMutateHook e) >>= writeTVar (tvPostMutateHook e) . insert h -- |Deletes all input hooks matching the provided desciption delInputHook :: Engine j r s -> String -> IO () delInputHook e s = atomically $ do readTVar (tvInHook e) >>= writeTVar (tvInHook e) . filter ( (/= s) . hkDescription) -- |Deletes all pre-mutate hooks matching the provided desciption delPreMutateHook :: Engine j r s -> String -> IO () delPreMutateHook e s = atomically $ do readTVar (tvPreMutateHook e) >>= writeTVar (tvPreMutateHook e) . filter ( (/= s) . hkDescription) -- |Deletes all post-mutate hooks matching the provided desciption delPostMutateHook :: Engine j r s -> String -> IO () delPostMutateHook e s = atomically $ do readTVar (tvPostMutateHook e) >>= writeTVar (tvPostMutateHook e) . filter ( (/= s) . hkDescription) -- |Deletes all output hooks matching the provided desciption delOutputHook :: Engine j r s -> String -> IO () delOutputHook e s = atomically $ do readTVar (tvOutHook e) >>= writeTVar (tvOutHook e) . filter ( (/= s) . hkDescription) -- Inject.hs -- | Allows adding tasks that bypass the input hooks. injectPreMutator :: Engine j r s -> j -> IO () injectPreMutator eng i = writeChan (chan1 eng) i -- | Allows bypassing the mutator, meaning a 'result' can be produced without a task. -- This still hits the output hooks. injectPostMutator :: Engine j r s -> r -> IO () injectPostMutator eng o = writeChan (chan2 eng) o