{-# LANGUAGE LambdaCase, ScopedTypeVariables, GADTs, DeriveDataTypeable, Trustworthy #-}
module Control.CUtils.ThreadPool
  ( Pool
  , addToPoolMulti
  , newPool
  , stopPool_
    -- ** Global thread pools
  , globalPool
    -- ** Compatibility shims
  , ThreadPool(..)
  , Interruptible(..)
  , NoPool(..)
  , BoxedThreadPool(..)
  ) where

import Control.Exception
import Control.Concurrent
import Data.Data
import Data.Array
import Data.Foldable
import Data.IORef
import Data.List.Extras.Argmax
import Data.Maybe
import Control.Monad
import Control.Monad.Identity
import Control.Monad.Loops
--import Control.Monad.Random
import Control.CUtils.BoundedQueue
import System.IO.Unsafe
import System.IO
import Prelude hiding (mapM_)

-- | A message delivered to a worker thread through its queue.
data Instruction
  = NextTask (IO ())
    -- ^ Run the given action.
  | S
    -- ^ Stop the worker's processing loop.
  deriving (Typeable)

-- | One worker of a pool: the queue it reads instructions from, plus a
-- counter that is incremented for every task enqueued and decremented for
-- every task dequeued.
data Worker = Worker
  { instructions :: !(BoundedQueue Instruction)
    -- ^ Queue of pending instructions for this worker.
  , counter :: !(IORef Int)
    -- ^ Number of tasks enqueued but not yet taken off the queue.
  } deriving (Typeable)

-- These instances define no 'Data' methods of their own.
-- NOTE(review): they appear to exist only so that 'Pool' can derive 'Data';
-- confirm nothing relies on reflecting over these types.
instance Data Instruction
instance Data Worker

-- | A fixed-size collection of worker threads.
newtype Pool = Pool { workers_ :: Array Int Worker }
  deriving (Typeable, Data)

-- | Enqueue an action on one specific worker. The action's result is
-- discarded.
--
-- The counter is bumped before the task is written to the queue, and both
-- steps run under 'mask_'.
-- NOTE(review): if 'writeRB' can block and be interrupted there, the counter
-- would stay incremented without a task having been queued; verify against
-- the bounded queue implementation.
addToWorker :: Worker -> IO t -> IO ()
{-# INLINE addToWorker #-}
addToWorker worker action = mask_ $ do
  atomicModifyIORef' (counter worker) (\pending -> (succ pending, ()))
  writeRB (instructions worker) $! NextTask (void action)

-- | Submit an action to the pool. The action is handed to the worker whose
-- counter currently reads lowest.
--
-- Throws 'ErrorCall' if the pool has no workers.
addToPoolMulti :: Pool -> IO t -> IO ()
{-# INLINABLE addToPoolMulti #-}
addToPoolMulti (Pool ls) mnd
  | rangeSize (bounds ls) <= 0 =
      throwIO $ ErrorCall "addToPoolMulti: pool is empty"
  | otherwise = do
      let candidates = toList ls
      -- Snapshot every worker's load; the reads are not synchronized with
      -- each other, so the picture may already be stale. (tbd)
      loads <- mapM (readIORef . counter) candidates
      let target = fst (argmin snd (zip candidates loads))
      addToWorker target mnd

-- It's desirable for performance to have each worker thread occupied
-- as often as possible with work. The simplest way of achieving this
-- is to have a single work queue and have the various threads take
-- work off it greedily (or use a work-stealing approach), but this
-- introduces a central point of contention.
-- An observation: pretty good load balancing can be achieved with an
-- imprecise estimate of which threads are occupied; then in principle
-- the source of contention is eliminated; the caller of 'addToPoolMulti'
-- selects the worker thread to which to assign the work. (The estimate
-- is imprecise because the code is reading it with limited synchronization,
-- so the estimate is sometimes out of date.)

-- | Create a worker: allocate its instruction queue and its counter
-- (starting at 0), then fork the thread that serves the queue.
newWorker :: IO Worker
newWorker = do
  queue <- newRB 10000
  pending <- newIORef 0
  let worker = Worker queue pending
  _ <- forkIO (serve worker)
  return $! worker
  where
    -- Task processing loop: keep reading instructions until told to stop.
    serve w = readRB (instructions w) >>= \case
      NextTask task -> do
        -- For every task taken off the work queue, decrement the counter.
        atomicModifyIORef' (counter w) (\n -> (pred n, ()))
        -- The default exception handling policy for worker threads is to
        -- print the exception to stderr.
        task `catch` \(ex :: SomeException) -> hPrint stderr ex
        serve w
      -- Serve a request to terminate the thread pool.
      S -> return ()

-- | Create a pool with the given number of workers, indexed from 0.
newPool :: Int -> IO Pool
newPool n = do
  ws <- replicateM n newWorker
  return (Pool (listArray (0, n - 1) ws))

-- | Send the stop instruction to a single worker.
stopWorker :: Worker -> IO ()
stopWorker worker = writeRB (instructions worker) S

-- | Stop each worker thread in turn by sending it a message.
stopPool_ :: Pool -> IO ()
stopPool_ pool = mapM_ stopWorker (workers_ pool)

--------------------------------------------
-- Global thread pools

-- | A shared pool with one worker per capability, as reported by
-- 'getNumCapabilities' at the time the pool is first evaluated.
globalPool :: Pool
{-# NOINLINE globalPool #-}
-- 'Pool' type is abstract, ensuring that the implementation can choose the number of workers
-- in the pool based on various considerations without breaking referential transparency.
globalPool = unsafePerformIO $ do
  caps <- getNumCapabilities
  newPool caps

--------------------------------------------
-- Compatibility shims

-- | Thread pools support some standard operations....
class ThreadPool pool where
  -- | Hand an action to the pool for execution.
  addToPool :: pool -> IO t -> IO ()

-- | Pools that can be asked to stop.
class Interruptible pool where
  -- | Ask the pool to stop.
  stopPool :: pool -> IO ()

instance ThreadPool Pool where
  addToPool pool action = addToPoolMulti pool action

instance Interruptible Pool where
  stopPool pool = stopPool_ pool

-- | Use if you don't want to use a thread pool. The implementation of 'addToPool' spawns a green thread.
data NoPool = NoPool
  deriving (Typeable, Data)

instance ThreadPool NoPool where
  -- Fork a fresh thread per action; both the action's result and the
  -- 'ThreadId' are discarded.
  addToPool _ action = void (forkIO (void action))

-- | Wraps any 'ThreadPool' so that pools of different types can be passed
-- around as a single type.
data BoxedThreadPool where
  BoxedThreadPool :: (ThreadPool pool) => pool -> BoxedThreadPool

instance ThreadPool BoxedThreadPool where
  -- Delegate to the wrapped pool.
  addToPool (BoxedThreadPool inner) action = addToPool inner action

-- NOTE(review): these pragmas are placed at top level here; confirm that
-- matches the intended placement and that the compiler honours them for a
-- class method.
{-# SPECIALIZE addToPool :: Pool -> IO t -> IO() #-}
{-# SPECIALIZE addToPool :: NoPool -> IO t -> IO() #-}
{-# SPECIALIZE addToPool :: BoxedThreadPool -> IO t -> IO() #-}