-- | A gang consisting of a fixed number of threads that can run actions in parallel.
--   Good for constructing parallel test frameworks.
module BuildBox.Control.Gang
        ( Gang
        , GangState(..)
        , forkGangActions
        , joinGang
        , pauseGang
        , resumeGang
        , flushGang
        , killGang
        , getGangState
        , waitForGangState)
where
import Control.Concurrent
import Control.Exception        (finally, mask)
import Control.Monad            (when)
import Data.IORef
import qualified Data.Set       as Set
import Data.Set                 (Set)


-- Gang -----------------------------------------------------------------------
-- | Abstract gang of threads.
data Gang
        = Gang
        { -- | Number of worker slots, as passed to 'forkGangActions'.
          gangThreads           :: Int

          -- | Semaphore holding one unit for every idle worker slot.
        , gangThreadsAvailable  :: QSemN

          -- | Current state of the gang.
        , gangState             :: IORef GangState

          -- | Number of actions that have been started and not yet finished.
        , gangActionsRunning    :: IORef Int

          -- | Threads currently running an action; used by 'killGang'.
        , gangThreadsRunning    :: IORef (Set ThreadId) }


data GangState
        = -- | Gang is running and starting new actions.
          GangRunning

          -- | Gang may be running already started actions,
          --   but no new ones are being started.
        | GangPaused

          -- | Gang is waiting for currently running actions to finish,
          --   but not starting new ones.
        | GangFlushing

          -- | Gang is finished, all the actions have completed.
        | GangFinished

          -- | Gang was killed, all the threads are dead (or dying).
        | GangKilled
        deriving (Show, Eq)


-- | Check whether a gang in this state will never start another action.
isTerminalState :: GangState -> Bool
isTerminalState state
        = state == GangFinished || state == GangKilled


-- | Strictly and atomically apply a function to the contents of an 'IORef'.
modifyRef :: IORef a -> (a -> a) -> IO ()
modifyRef ref f
        = atomicModifyIORef' ref (\x -> (f x, ()))


-- | Atomically move the gang to the given state, unless it is already
--   finished or killed. A terminal state is never overwritten, because the
--   scheduler thread has exited by then and nothing would ever move the
--   gang out of the new state again.
setStateUnlessDone :: Gang -> GangState -> IO ()
setStateUnlessDone gang new
        = modifyRef (gangState gang)
        $ \old -> if isTerminalState old then old else new


-- | Get the state of a gang.
getGangState :: Gang -> IO GangState
getGangState gang
        = readIORef (gangState gang)


-- | Block until all actions have finished executing,
--   or the gang is killed.
joinGang :: Gang -> IO ()
joinGang gang
 = do   state <- readIORef (gangState gang)
        if isTerminalState state
         then return ()
         else do
                -- Poll once per millisecond.
                threadDelay 1000
                joinGang gang


-- | Block until already started actions have completed, but don't start any more.
--   Gang state changes to `GangFlushing`, and then to `GangFinished` once the
--   running actions are done.
--
--   Returns straight away if the gang is already finished or killed, and also
--   returns if the gang is killed while flushing.
flushGang :: Gang -> IO ()
flushGang gang
 = do   setStateUnlessDone gang GangFlushing
        joinGang gang


-- | Pause a gang. Actions that have already been started continue to run,
--   but no more will be started until a `resumeGang` command is issued.
--   Gang state changes to `GangPaused`.
--   Has no effect on a gang that is already finished or killed.
pauseGang :: Gang -> IO ()
pauseGang gang
        = setStateUnlessDone gang GangPaused


-- | Resume a paused gang, which allows it to continue starting new actions.
--   Gang state changes to `GangRunning`.
--   Has no effect on a gang that is already finished or killed.
resumeGang :: Gang -> IO ()
resumeGang gang
        = setStateUnlessDone gang GangRunning


-- | Kill all the threads in a gang.
--   Gang state changes to `GangKilled`.
killGang :: Gang -> IO ()
killGang gang
 = do   -- Publish the new state before taking the snapshot of running threads.
        -- A worker registered after the snapshot is killed by the scheduler,
        -- which re-checks the state after registering it.
        writeIORef (gangState gang) GangKilled
        tids <- readIORef (gangThreadsRunning gang)
        mapM_ killThread (Set.toList tids)


-- | Block until the gang is in the given state.
--   This polls once per millisecond, and never returns if the gang
--   does not enter (or has already left) the requested state.
waitForGangState :: Gang -> GangState -> IO ()
waitForGangState gang waitState
 = do   state <- readIORef (gangState gang)
        if state == waitState
         then return ()
         else do
                threadDelay 1000
                waitForGangState gang waitState


-- | Fork a new gang to run the given actions.
--   This function returns immediately, with the gang executing in the background.
--   Gang state starts as `GangRunning` then transitions to `GangFinished`.
--   To block until all the actions are finished use `joinGang`.
forkGangActions
        :: Int          -- ^ Number of worker threads in the gang \/ maximum number
                        --   of actions to execute concurrently.
        -> [IO ()]      -- ^ Actions to run. They are started in-order, but may finish
                        --   out-of-order depending on the run time of the individual action.
        -> IO Gang

forkGangActions threads actions
 = do   semThreads              <- newQSemN threads
        refState                <- newIORef GangRunning
        refActionsRunning       <- newIORef 0
        refThreadsRunning       <- newIORef Set.empty

        let gang
                = Gang
                { gangThreads           = threads
                , gangThreadsAvailable  = semThreads
                , gangState             = refState
                , gangActionsRunning    = refActionsRunning
                , gangThreadsRunning    = refThreadsRunning }

        -- The scheduler runs in its own thread so that we can return immediately.
        _ <- forkIO $ gangLoop gang actions
        return gang


-- | Run actions on a gang.
--   This is the scheduler loop: it hands actions to workers one at a time,
--   while honouring the current 'GangState'.
gangLoop :: Gang -> [IO ()] -> IO ()
gangLoop gang []
        = finishGang gang

gangLoop gang actions@(action : actionsRest)
 = do   state <- readIORef (gangState gang)
        case state of
         GangRunning
          -> do -- Wait for a worker thread to become available.
                waitQSemN (gangThreadsAvailable gang) 1
                gangLoop_withWorker gang action actionsRest

         GangPaused
          -> do threadDelay 1000
                gangLoop gang actions

         -- Drop the actions that have not been started,
         -- and wait for the running ones to complete.
         GangFlushing   -> finishGang gang

         GangFinished   -> return ()
         GangKilled     -> return ()


-- | Wait until every worker slot is idle, then mark the gang as finished.
--   A gang that was killed in the meantime stays in the `GangKilled` state.
finishGang :: Gang -> IO ()
finishGang gang
 = do   -- Every worker returns its slot when its action ends, whether that
        -- is normally, by exception, or by being killed. Holding all the
        -- slots therefore means that no action is running.
        waitQSemN (gangThreadsAvailable gang) (gangThreads gang)
        setStateUnlessDone gang GangFinished


-- | Continue the scheduler loop, given that we hold one worker slot.
gangLoop_withWorker :: Gang -> IO () -> [IO ()] -> IO ()
gangLoop_withWorker gang action actionsRest
 = do   -- See if we're supposed to be starting actions or not.
        state <- readIORef (gangState gang)
        case state of
         GangRunning
          -> do startWorker gang action
                gangLoop gang actionsRest

         -- Someone issued a flush, pause or kill command while we were
         -- waiting for a worker, so give the slot back and don't start
         -- the next action.
         _ -> do
                signalQSemN (gangThreadsAvailable gang) 1
                gangLoop gang (action : actionsRest)


-- | Fork a worker thread that runs the given action in the slot we hold.
startWorker :: Gang -> IO () -> IO ()
startWorker gang action
 = mask $ \restore -> do
        registered <- newEmptyMVar
        modifyRef (gangActionsRunning gang) (+ 1)

        -- The worker inherits our masked state, so the cleanup handler is
        -- installed before any asynchronous exception can be delivered.
        -- It waits until its ThreadId has been registered before running
        -- the action, so that it cannot deregister itself too early.
        tid <- forkWorker
                $ (takeMVar registered >> restore action)
                        `finally` releaseWorker gang

        -- Add the ThreadId of the freshly forked thread to the set of
        -- running ThreadIds. We need this set if we want to kill the gang.
        modifyRef (gangThreadsRunning gang) (Set.insert tid)
        putMVar registered ()

        -- If the gang was killed after we checked the state, 'killGang' may
        -- have taken its snapshot before this worker was registered.
        state <- readIORef (gangState gang)
        when (state == GangKilled)
         $ killThread tid


-- | Cleanup run by a worker thread when its action ends for any reason.
releaseWorker :: Gang -> IO ()
releaseWorker gang
 = do   -- Remove our ThreadId from the set of running ThreadIds.
        tid <- myThreadId
        modifyRef (gangThreadsRunning gang) (Set.delete tid)
        modifyRef (gangActionsRunning gang) (subtract 1)

        -- Signal that a worker slot is available again. This comes last so
        -- that the bookkeeping is complete by the time the gang finishes.
        signalQSemN (gangThreadsAvailable gang) 1


-- | Fork a thread for a worker. Workers are bound threads when the runtime
--   supports them. Otherwise 'forkOS' would throw, so fall back to 'forkIO'.
forkWorker :: IO () -> IO ThreadId
forkWorker
        | rtsSupportsBoundThreads       = forkOS
        | otherwise                     = forkIO