{- |
Implemented here is a thread pool library on crack.

1.0 Introduction

Typically, a thread pool is a set of execution contexts that will execute
tasks from an input queue.  Typically, thread pools are used to parallelize
the processing of incoming work across all available CPUs without going
through the expense of starting a new thread for every new task.

In 'Control.Engine' you will find a somewhat unique implementation.  The
'Engine' is not only a set of threads running a common mutator on the input
queue, producing an output queue, but also includes hooks, task injection,
and state management.

Hooks :: st -> a -> IO (Maybe a)

Hooks can be added and removed during execution without creating a new
engine.  They allow the developer to modify tasks:

  * prior to parallelization (for sequential preprocessing)

  * in parallel, prior to the main mutation function

  * in parallel, after the mutation function

  * post parallelization (for sequential post processing)

State Management

The stateManager waits for any updates to the mutator state or hooks.  If any
modifications are made then the new set of hooks (or state) is provided
to the workers.  This allows the state of the entire engine to be atomically
modified (it is all STM) but allows the workers to use cheap and quick
MVars.

The thinking here is that changing the hooks and state is a rare / low
contention action while the need for this information will be constant
and performance critical.

Injection

One injection point allows injection of a 'result' that had no preceding
'task'.  With another the initial hooks ('Input' hooks) can be bypassed.
-}
module Control.Engine
    ( -- * Main functions
      initSimpleEngine
    , initSimpleEngineIO
    , initEngine
    , Engine(..)
      -- * Hooks
    , Hook(..)
    , HookLoc(..)
    , addInputHook
    , addOutputHook
    , addPreMutateHook
    , addPostMutateHook
    , delInputHook
    , delOutputHook
    , delPreMutateHook
    , delPostMutateHook
      -- * Injectors
    , injectPreMutator
    , injectPostMutator
    ) where

import Control.Concurrent
import Control.Concurrent.MVar
import Control.Concurrent.STM
import Control.Concurrent.Chan
import Control.Monad
import Data.List (insert)

-- |An 'Engine' represents a pool of threads ready to execute tasks.
--
-- Every hook list exists twice: the 'TVar' copy is the one edited by the
-- add/del hook functions, the 'MVar' copy is the one read by the running
-- threads.  The state manager thread copies changes from the former to the
-- latter.
data Engine job result state = Eng
    { chan1            :: Chan job                  -- ^ jobs waiting for a worker (written by the input manager)
    , chan2            :: Chan result               -- ^ results waiting for the output manager (written by the workers)
    , tvInHook         :: TVar [Hook state job]     -- ^ requested input hooks
    , tvPreMutateHook  :: TVar [Hook state job]     -- ^ requested pre-mutate hooks
    , tvPostMutateHook :: TVar [Hook state result]  -- ^ requested post-mutate hooks
    , tvOutHook        :: TVar [Hook state result]  -- ^ requested output hooks
    , mvInHook         :: MVar [Hook state job]     -- ^ input hooks as seen by the input manager
    , mvPreMutateHook  :: MVar [Hook state job]     -- ^ pre-mutate hooks as seen by the workers
    , mvPostMutateHook :: MVar [Hook state result]  -- ^ post-mutate hooks as seen by the workers
    , mvOutHook        :: MVar [Hook state result]  -- ^ output hooks as seen by the output manager
    , state            :: TVar state                -- ^ requested mutator state
    }

-- |If all you want is a basic thread pool, this will work.
-- You should consider using Control.ThreadPool instead.
--
-- Evaluation of the result is forced (to weak head normal form) inside the
-- worker threads, so the pure mutator really is evaluated by the pool and
-- not by the single thread that fills the output channel.
initSimpleEngine :: Int -> (job -> result) -> IO (Chan job, Chan result)
initSimpleEngine nr mutator = do
    input  <- newChan
    output <- newChan
    -- 'return $!' makes the worker evaluate the result; without it the worker
    -- would only pass an unevaluated thunk on to the output manager.
    let m _ job = do
            r <- return $! mutator job
            return (Just r)
    -- The output-side 'seq' is kept so results are forced before they reach
    -- the output channel even if they did not come from a worker.
    _ <- initEngine nr (readChan input) (\o -> o `seq` writeChan output o) m ()
    return (input, output)

-- |Simpler than calling 'initEngine', but it allows no state or interaction
-- with the hooks and injectors.  No strictness is forced.
initSimpleEngineIO :: Int -> (job -> IO result) -> IO (Chan job, Chan result)
initSimpleEngineIO nr mutator = do
    input  <- newChan
    output <- newChan
    let m _ job = fmap Just (mutator job)
    _ <- initEngine nr (readChan input) (writeChan output) m ()
    return (input, output)

-- |Initialize an engine: fork the input manager, the output manager, the
-- state manager and the requested number of worker threads.
--
-- No strictness is forced - be sure you force evaluation if wanted.
-- All hooks start out empty.
initEngine
    :: (Eq st)
    => Int                                  -- ^ number of worker threads
    -> (IO job)                             -- ^ action that gets the next input
    -> (result -> IO ())                    -- ^ action that consumes one output
    -> (st -> job -> IO (Maybe result))     -- ^ mutator function to perform on all inputs
    -> st                                   -- ^ initial state for the mutator function
    -> IO (Engine job result st)
initEngine nrWorkers input output mutator initialState = do
    c1 <- newChan
    c2 <- newChan
    inputHooks       <- newMVar []
    outputHooks      <- newMVar []
    preMutatorHooks  <- newMVar []
    postMutatorHooks <- newMVar []
    ms    <- newMVar initialState
    tv    <- newTVarIO initialState
    ch1tv <- newTVarIO []
    ch2tv <- newTVarIO []
    ch3tv <- newTVarIO []
    ch4tv <- newTVarIO []
    let engine = Eng
            { chan1            = c1
            , chan2            = c2
            , tvInHook         = ch1tv
            , tvPreMutateHook  = ch2tv
            , tvPostMutateHook = ch3tv
            , tvOutHook        = ch4tv
            , mvInHook         = inputHooks
            , mvPreMutateHook  = preMutatorHooks
            , mvPostMutateHook = postMutatorHooks
            , mvOutHook        = outputHooks
            , state            = tv
            }
    _ <- forkIO $ inputManager input c1 inputHooks ms
    _ <- forkIO $ outputManager output c2 outputHooks ms
    _ <- forkIO $ stateManager engine ms
    replicateM_ nrWorkers $
        forkIO $ worker c1 preMutatorHooks ms mutator postMutatorHooks c2
    return engine

-- Body of every worker thread: forever take a job from the first channel,
-- run the pre-mutate hooks, the mutator and the post-mutate hooks on it, and
-- write the result (if any stage did not drop it) to the second channel.
worker :: Chan job -> MVar [Hook st job] -> MVar st -> (st -> job -> IO (Maybe result)) -> MVar [Hook st result] -> Chan result -> IO ()
worker c1 preMutatorHooks ms mutator postMutatorHooks c2 = forever $ do
    -- Get next message, newest hooks, and state
    msg    <- readChan c1
    preMH  <- readMVar preMutatorHooks
    postMH <- readMVar postMutatorHooks
    st     <- readMVar ms
    -- run hook1, mutator, hook2
    msg'   <- runHooks preMH st msg
    out    <- runStage (mutator st) msg'
    out'   <- runStage (runHooks postMH st) out
    maybe (return ()) (writeChan c2) out'

-- Run one pipeline stage, skipping it when an earlier stage dropped the task.
runStage :: (a -> IO (Maybe b)) -> Maybe a -> IO (Maybe b)
runStage _     Nothing  = return Nothing
runStage stage (Just a) = stage a

-- Body of the state manager thread: block until the requested state or hooks
-- (the TVars) differ from what was last handed to the running threads, copy
-- every changed item into its MVar, and repeat.
stateManager :: (Eq st) => Engine job result st -> MVar st -> IO ()
stateManager eng ms = do
    -- The baseline is what the running threads currently see (the MVars), so
    -- a change made to a TVar before this thread's first transaction is still
    -- detected and propagated.
    s  <- readMVar ms
    ih <- readMVar (mvInHook eng)
    eh <- readMVar (mvPreMutateHook eng)
    th <- readMVar (mvPostMutateHook eng)
    oh <- readMVar (mvOutHook eng)
    updateState (s, ih, eh, th, oh)
  where
    -- One consistent view of everything that can be requested.
    snapshot = do
        s  <- readTVar (state eng)
        ih <- readTVar (tvInHook eng)
        eh <- readTVar (tvPreMutateHook eng)
        th <- readTVar (tvPostMutateHook eng)
        oh <- readTVar (tvOutHook eng)
        return (s, ih, eh, th, oh)
    updateState old@(s, ih, eh, th, oh) = do
        new@(s', ih', eh', th', oh') <- atomically $ do
            cur <- snapshot
            -- Nothing changed: 'retry' suspends the transaction until one of
            -- the TVars read above is written.
            when (cur == old) retry
            return cur
        syncMVar ms s s'
        syncMVar (mvInHook eng) ih ih'
        syncMVar (mvPreMutateHook eng) eh eh'
        syncMVar (mvPostMutateHook eng) th th'
        syncMVar (mvOutHook eng) oh oh'
        updateState new

-- Overwrite the contents of an 'MVar' only when the value actually changed.
syncMVar :: (Eq a) => MVar a -> a -> a -> IO ()
syncMVar mv old new = when (new /= old) (swapMVar mv new >> return ())

-- Input.hs

-- Body of the input manager thread: forever fetch an input, run the current
-- input hooks on it and, unless a hook dropped it, queue it for the workers.
inputManager :: (IO msg) -> Chan msg -> MVar [Hook st msg] -> MVar st -> IO ()
inputManager input outChan hookMV stMV = forever $ do
    msg  <- input
    hook <- readMVar hookMV
    s    <- readMVar stMV
    new  <- runHooks hook s msg
    maybe (return ()) (writeChan outChan) new

-- Output.hs

-- Body of the output manager thread: forever take a result from the channel,
-- run the current output hooks on it and, unless a hook dropped it, hand it
-- to the output action.
outputManager :: (result -> IO ()) -> Chan result -> MVar [Hook st result] -> MVar st -> IO ()
outputManager output msgChan hookMV stMV = forever $ do
    m    <- readChan msgChan
    hook <- readMVar hookMV
    s    <- readMVar stMV
    new  <- runHooks hook s m
    maybe (return ()) output new

-- Hooks.hs

-- A hook is simply a mutation on the task.  To order the hooks they all have
-- priorities (lower value priorities happen first).  For accounting and to
-- remove old hooks there is a description field.
-- | One step that can rewrite, or drop (by returning 'Nothing'), a message
-- passing through an engine stage.
data Hook st msg = Hk
    { hkFunc        :: st -> msg -> IO (Maybe msg)  -- ^ the action itself; receives the current state
    , hkPriority    :: Int                          -- ^ sort key used when the hook is inserted
    , hkDescription :: String                       -- ^ label used to show and to delete the hook
    }

-- Equality looks at priority and description only; the function is ignored.
instance Eq (Hook m s) where
    a == b =
        hkPriority a == hkPriority b && hkDescription a == hkDescription b

-- Ordering looks at the priority only.
instance Ord (Hook a s) where
    compare a b = compare (hkPriority a) (hkPriority b)

instance Show (Hook a s) where
    show h = hkDescription h ++ " Priority = " ++ show (hkPriority h)
    showsPrec _ h =
        showString
            ( "Hk { hkFunc = undefined, p = "
           ++ show (hkPriority h)
           ++ " , hkDescription = "
           ++ hkDescription h
           ++ " } "
            )

-- | The four places in an engine where hooks run.
data HookLoc
    = InputHook
    | PreMutateHook
    | PostMutateHook
    | OutputHook
    deriving (Eq, Ord, Show)

-- Thread a message through the hooks in list order.  Once a hook returns
-- 'Nothing' the remaining hooks are not run and the result is 'Nothing'.
runHooks :: [Hook st msg] -> st -> msg -> IO (Maybe msg)
runHooks hooks st m = foldM step (Just m) hooks
  where
    step acc hook = maybe (return Nothing) (hkFunc hook st) acc

-- Atomically place a hook into the hook list held by the given 'TVar',
-- using 'insert' (and therefore the 'Ord' instance of 'Hook').
insertHook :: TVar [Hook st msg] -> Hook st msg -> IO ()
insertHook tv h = atomically $ do
    current <- readTVar tv
    writeTVar tv (insert h current)

-- Atomically remove every hook carrying the given description from the hook
-- list held by the given 'TVar'.
dropHooksNamed :: TVar [Hook st msg] -> String -> IO ()
dropHooksNamed tv label = atomically $ do
    current <- readTVar tv
    writeTVar tv [h | h <- current, hkDescription h /= label]

-- | Add a hook to the engine's input hook list.
addInputHook :: Engine job result state -> Hook state job -> IO ()
addInputHook e h = insertHook (tvInHook e) h

-- | Add a hook to the engine's output hook list.
addOutputHook :: Engine job result state -> Hook state result -> IO ()
addOutputHook e h = insertHook (tvOutHook e) h

-- | Add a hook to the engine's pre-mutate hook list.
addPreMutateHook :: Engine job result state -> Hook state job -> IO ()
addPreMutateHook e h = insertHook (tvPreMutateHook e) h

-- | Add a hook to the engine's post-mutate hook list.
addPostMutateHook :: Engine job result state -> Hook state result -> IO ()
addPostMutateHook e h = insertHook (tvPostMutateHook e) h

-- | Delete all input hooks with the given description.
delInputHook :: Engine j r s -> String -> IO ()
delInputHook e s = dropHooksNamed (tvInHook e) s

-- | Delete all pre-mutate hooks with the given description.
delPreMutateHook :: Engine j r s -> String -> IO ()
delPreMutateHook e s = dropHooksNamed (tvPreMutateHook e) s

-- | Delete all post-mutate hooks with the given description.
delPostMutateHook :: Engine j r s -> String -> IO ()
delPostMutateHook e s = dropHooksNamed (tvPostMutateHook e) s

-- | Delete all output hooks with the given description.
delOutputHook :: Engine j r s -> String -> IO ()
delOutputHook e s = dropHooksNamed (tvOutHook e) s

-- Inject.hs

-- | Allows adding tasks that bypass the input hooks.
injectPreMutator :: Engine j r s -> j -> IO ()
injectPreMutator eng job = writeChan (chan1 eng) job

-- | Allows bypassing the mutator, meaning a 'result' can be produced without a task.
-- This still hits the output hooks.
injectPostMutator :: Engine j r s -> r -> IO ()
injectPostMutator eng res = writeChan (chan2 eng) res