{-| Implemented here is a thread pool library on crack.

1.0 Introduction

Typically, a thread pool is a set of execution contexts that will execute
tasks from an input queue.  Thread pools are used to parallelize the
processing of incoming work across all available CPUs without going through
the expense of starting a new thread for every new task.

In 'Control.Engine' you will find a somewhat unique implementation.  The
'Engine' is not only a set of threads running a common mutator on the input
queue, producing an output queue, but also includes hooks, task injection,
and state management.

Queues :: (Chan a)

 From "Control.Concurrent.Chan".

Hooks :: (st -> a -> IO (Maybe a))

 Hooks can be added and removed during execution without creating a new
 engine.  They allow the developer to modify tasks:

 * prior to parallelization (for sequential preprocessing)

 * in parallel, prior to the main mutation function

 * in parallel, after the mutation function

 * post parallelization (for sequential post processing)

 A hook that returns 'Nothing' drops the task (or result): no later hook,
 mutator or output action sees it.

State Management

 The stateManager waits for any updates to the mutator state or hooks.  If
 any modifications are made then the new set of hooks (or state) is
 provided to the workers.  Correctness is handled by keeping the master
 copies as TVars ("Control.Concurrent.STM"), while the mutators and hooks
 read from 'IORef's ("Data.IORef") to avoid contention.  The thinking here
 is that changing the hooks and state is a rare / low contention action
 while the need for this information will be constant and performance
 critical.  How successful this strategy is has yet to be shown.

Injection

 One injection point allows injection of a result that had no preceding
 task.  The second injector allows the initial hooks ('Input' hooks) to be
 bypassed.
-}
module Control.Engine
        ( -- * Main functions
          initSimpleEngine
        , initSimpleEngineIO
        , initEngine
        , Engine(..)
          -- * Hooks
        , Hook(..)
        , HookLoc(..)
        , addInputHook
        , addOutputHook
        , addPreMutateHook
        , addPostMutateHook
        , delInputHook
        , delOutputHook
        , delPreMutateHook
        , delPostMutateHook
          -- * Injectors
        , injectPreMutator
        , injectPostMutator
        ) where

import Control.Concurrent
import Control.Concurrent.STM
import Control.Concurrent.Chan
import Control.Monad
import Data.IORef
import Data.List (insert)

-- |An 'Engine' represents a pool of threads ready to execute tasks.
data Engine job result state = Eng
    { chan1            :: Chan job
      -- ^ Queue read by the workers; input hooks have already run on its
      -- contents.
    , chan2            :: Chan result
      -- ^ Queue written by the workers and drained by the output manager,
      -- which runs the output hooks.
    , tvInHook         :: TVar [Hook state job]
      -- ^ Master copy of the input hooks.
    , tvPreMutateHook  :: TVar [Hook state job]
      -- ^ Master copy of the hooks run in parallel before the mutator.
    , tvPostMutateHook :: TVar [Hook state result]
      -- ^ Master copy of the hooks run in parallel after the mutator.
    , tvOutHook        :: TVar [Hook state result]
      -- ^ Master copy of the output hooks.
    , state            :: TVar state
      -- ^ Master copy of the mutator state.
    }

-- Reader-side copies of an engine's hooks and state.  Apart from their
-- creation in 'initEngine', only 'stateManager' writes these; the workers
-- and the input/output managers only read them.
data RefEngine job result state = RefEng
    { refInHook         :: IORef [Hook state job]
    , refPreMutateHook  :: IORef [Hook state job]
    , refPostMutateHook :: IORef [Hook state result]
    , refOutHook        :: IORef [Hook state result]
    , refState          :: IORef state
    }

-- |If all you want is a basic thread pool, this will work.
-- You should consider using "Control.ThreadPool" instead.
--
-- Each result is forced to weak head normal form (using 'seq') by the
-- worker thread that computed it, so the pure mutator really is evaluated
-- in parallel rather than lazily in the single output thread.
initSimpleEngine :: Int -> (job -> result) -> IO (Chan job, Chan result)
initSimpleEngine nr mutator = do
    input  <- newChan
    output <- newChan
    -- Forcing here (inside the worker's IO action) instead of in the output
    -- action is what moves the evaluation onto the worker threads.
    let m _ job = let r = mutator job in r `seq` return (Just r)
    _ <- initEngine nr (readChan input) (writeChan output) m ()
    return (input, output)

-- |Simpler than calling 'initEngine', but it allows no state or interaction
-- with the hooks and injectors.  No strictness is forced.
initSimpleEngineIO :: Int -> (job -> IO result) -> IO (Chan job, Chan result)
initSimpleEngineIO nr mutator = do
    input  <- newChan
    output <- newChan
    let m _ job = fmap Just (mutator job)
    _ <- initEngine nr (readChan input) (writeChan output) m ()
    return (input, output)

-- |To initialize an engine you must provide:
--
-- * the number of threads
--
-- * an action that will get the input
--
-- * an action that will consume output
--
-- * a mutator function to perform on all inputs
--
-- * an initial state for the mutator function
--
-- No strictness is forced - be sure you force evaluation if wanted.
-- All hooks start out empty.
initEngine :: (Eq st)
           => Int
           -> (IO job)
           -> (result -> IO ())
           -> (st -> job -> IO (Maybe result))
           -> st
           -> IO (Engine job result st)
initEngine nrWorkers input output mutator initialState = do
    c1 <- newChan
    c2 <- newChan
    -- Reader-side copies, seeded with the same values as the master TVars
    -- below so the workers see a consistent view before 'stateManager'
    -- publishes its first snapshot.
    inRef    <- newIORef []
    preRef   <- newIORef []
    postRef  <- newIORef []
    outRef   <- newIORef []
    stateRef <- newIORef initialState
    inTV     <- newTVarIO []
    preTV    <- newTVarIO []
    postTV   <- newTVarIO []
    outTV    <- newTVarIO []
    stateTV  <- newTVarIO initialState
    let engine = Eng { chan1            = c1
                     , chan2            = c2
                     , tvInHook         = inTV
                     , tvPreMutateHook  = preTV
                     , tvPostMutateHook = postTV
                     , tvOutHook        = outTV
                     , state            = stateTV
                     }
        eref   = RefEng { refInHook         = inRef
                        , refPreMutateHook  = preRef
                        , refPostMutateHook = postRef
                        , refOutHook        = outRef
                        , refState          = stateRef
                        }
    _ <- forkIO $ inputManager input c1 eref
    _ <- forkIO $ outputManager output c2 eref
    _ <- forkIO $ stateManager engine eref
    replicateM_ nrWorkers $ forkIO $ worker c1 eref mutator c2
    return engine

-- Worker loop: take a task, read the current pre/post-mutate hooks and
-- state, then run pre-mutate hooks, the mutator and post-mutate hooks in
-- turn.  A 'Nothing' from any stage drops the task; otherwise the result is
-- written to the output queue.
worker :: Chan job
       -> RefEngine job result st
       -> (st -> job -> IO (Maybe result))
       -> Chan result
       -> IO ()
worker c1 eref mutator c2 = forever $ do
    msg    <- readChan c1
    preMH  <- readIORef (refPreMutateHook eref)
    postMH <- readIORef (refPostMutateHook eref)
    st     <- readIORef (refState eref)
    msg'   <- runHooks preMH st msg
    out    <- runStage (mutator st) msg'
    out'   <- runStage (runHooks postMH st) out
    maybe (return ()) (writeChan c2) out'

-- Run a stage only when the previous stage produced a value.
runStage :: (a -> IO (Maybe b)) -> Maybe a -> IO (Maybe b)
runStage = maybe (return Nothing)

-- Copies the TVar master copies (state and the four hook lists) into the
-- reader-side IORefs, then blocks (STM 'retry') until any of them changes,
-- and repeats forever.  Hook equality is by priority and description only
-- (see the 'Eq' instance for 'Hook'), so replacing a hook with one that
-- differs only in its function does not trigger a republish.
stateManager :: (Eq st) => Engine job result st -> RefEngine job result st -> IO ()
stateManager eng eref = atomically snapshot >>= publish
  where
    -- Read all five master copies in a single transaction.
    snapshot = do
        s  <- readTVar (state eng)
        ih <- readTVar (tvInHook eng)
        eh <- readTVar (tvPreMutateHook eng)
        th <- readTVar (tvPostMutateHook eng)
        oh <- readTVar (tvOutHook eng)
        return (s, ih, eh, th, oh)

    publish curr@(s, ih, eh, th, oh) = do
        writeIORef (refState eref) s
        writeIORef (refInHook eref) ih
        writeIORef (refPreMutateHook eref) eh
        writeIORef (refPostMutateHook eref) th
        writeIORef (refOutHook eref) oh
        next <- atomically $ do
            candidate <- snapshot
            -- Nothing changed yet: block until one of the TVars is written.
            when (candidate == curr) retry
            return candidate
        publish next

-- Input.hs

-- Sequential input stage: fetch each job, run the input hooks on it and
-- forward it to the workers unless a hook dropped it.
inputManager :: (IO job) -> Chan job -> RefEngine job result st -> IO ()
inputManager input outChan eref = forever $ do
    msg  <- input
    hook <- readIORef (refInHook eref)
    s    <- readIORef (refState eref)
    new  <- runHooks hook s msg
    maybe (return ()) (writeChan outChan) new

-- Output.hs

-- Sequential output stage: take each result, run the output hooks on it and
-- hand it to the user's output action unless a hook dropped it.
outputManager :: (result -> IO ()) -> Chan result -> RefEngine job result state -> IO ()
outputManager output msgChan eref = forever $ do
    m    <- readChan msgChan
    hook <- readIORef (refOutHook eref)
    s    <- readIORef (refState eref)
    new  <- runHooks hook s m
    maybe (return ()) output new

-- Hooks.hs

-- |A hook is simply a mutation on the task.  The priority is used to order
-- hook execution (lower value priorities happen first).  For accounting and
-- to remove old hooks the description field is used.
data Hook st msg = Hk
    { hkFunc        :: st -> msg -> IO (Maybe msg)
      -- ^ The mutation.  Returning 'Nothing' drops the task (or result)
      -- and no later hook in the same list is run on it.
    , hkPriority    :: Int
      -- ^ Execution order within a hook list: lower values run first.
    , hkDescription :: String
      -- ^ Label identifying the hook; the @del...Hook@ functions remove
      -- every hook whose description matches.
    }

-- | Hooks are equal when both priority and description agree; the function
-- itself cannot be compared.
instance Eq (Hook st msg) where
    (Hk _ p d) == (Hk _ p' d') = p == p' && d == d'

-- | Hooks are ordered by priority alone, which keeps the lists built with
-- 'insert' in execution order.
--
-- NOTE(review): this disagrees with '==' (two hooks with the same priority
-- but different descriptions compare 'EQ' yet are not '=='), breaking the
-- usual Eq/Ord consistency law.  Tie-breaking on the description would fix
-- that but would also change the relative order of equal-priority hooks.
instance Ord (Hook st msg) where
    (Hk _ p _) `compare` (Hk _ p' _) = p `compare` p'

-- NOTE(review): 'show' and 'showsPrec' produce different text, so @show h@
-- differs from @shows h ""@ and from how a hook is rendered inside
-- @show [h]@ (which goes through 'showsPrec').
instance Show (Hook st msg) where
    show (Hk _ p d) = d ++ " Priority = " ++ (show p)
    showsPrec _ (Hk _ p d) = (++) ("Hk { hkFunc = undefined, p = " ++ (show p) ++ " , hkDescription = " ++ d ++ " } ")

-- | Presumably names the four places a hook can be attached (input,
-- pre-mutate, post-mutate, output); it is not used elsewhere in this module.
data HookLoc = InputHook | PreMutateHook | PostMutateHook | OutputHook
    deriving (Eq, Ord, Show)

-- Run the hooks in list order, threading the message through each one.
-- Stops at the first hook that returns 'Nothing' (later hooks are skipped).
runHooks :: [Hook st msg] -> st -> msg -> IO (Maybe msg)
runHooks hooks st = go hooks
  where
    go []       m = return (Just m)
    go (h : hs) m = hkFunc h st m >>= maybe (return Nothing) (go hs)

-- Atomically replace a hook list with @f@ applied to it.  The new list is
-- written in weak head normal form so no thunk accumulates in the TVar.
modifyHooks :: TVar [Hook st msg] -> ([Hook st msg] -> [Hook st msg]) -> IO ()
modifyHooks tv f = atomically $ readTVar tv >>= \hs -> writeTVar tv $! f hs

-- Insert a hook, keeping the list sorted by priority (see the 'Ord' instance).
addHook :: TVar [Hook st msg] -> Hook st msg -> IO ()
addHook tv h = modifyHooks tv (insert h)

-- Remove every hook whose description equals the given string.
delHook :: TVar [Hook st msg] -> String -> IO ()
delHook tv desc = modifyHooks tv (filter ((/= desc) . hkDescription))

-- |Adds a hook that will be performed in serial on all jobs added to
-- the input queue.
addInputHook :: Engine job result state -> Hook state job -> IO ()
addInputHook e h = addHook (tvInHook e) h

-- |Adds a hook that will be performed in serial on all results
-- before they are added to the output queue.
addOutputHook :: Engine job result state -> Hook state result -> IO ()
addOutputHook e h = addHook (tvOutHook e) h

-- |Adds a hook that will be performed in parallel before the main mutator
-- function.
addPreMutateHook :: Engine job result state -> Hook state job -> IO ()
addPreMutateHook e h = addHook (tvPreMutateHook e) h

-- |Adds a hook that will be performed in parallel after the main mutator
-- function.
addPostMutateHook :: Engine job result state -> Hook state result -> IO ()
addPostMutateHook e h = addHook (tvPostMutateHook e) h

-- |Deletes all input hooks matching the provided description.
delInputHook :: Engine j r s -> String -> IO ()
delInputHook e s = delHook (tvInHook e) s

-- |Deletes all pre-mutate hooks matching the provided description.
delPreMutateHook :: Engine j r s -> String -> IO ()
delPreMutateHook e s = delHook (tvPreMutateHook e) s

-- |Deletes all post-mutate hooks matching the provided description.
delPostMutateHook :: Engine j r s -> String -> IO ()
delPostMutateHook e s = delHook (tvPostMutateHook e) s

-- |Deletes all output hooks matching the provided description.
delOutputHook :: Engine j r s -> String -> IO ()
delOutputHook e s = delHook (tvOutHook e) s

-- Inject.hs

-- | Allows adding tasks that bypass the input hooks.
injectPreMutator :: Engine j r s -> j -> IO ()
injectPreMutator eng i = writeChan (chan1 eng) i

-- | Allows bypassing the mutator, meaning a 'result' can be produced without
-- a task.  This still hits the output hooks (but not the post-mutate hooks).
injectPostMutator :: Engine j r s -> r -> IO ()
injectPostMutator eng o = writeChan (chan2 eng) o