-- | A gang consisting of a fixed number of threads that can run actions in parallel.
--   Good for constructing parallel test frameworks.
module BuildBox.Control.Gang
        ( Gang
        , GangState(..)

        , forkGangActions
        , joinGang
        , pauseGang
        , resumeGang
        , flushGang
        , killGang
        
        , getGangState
        , waitForGangState)
where
import Control.Concurrent
import Control.Exception
import Data.IORef
import qualified Data.Set       as Set
import Data.Set                 (Set)


-- Gang -----------------------------------------------------------------------
-- | Abstract gang of threads.
data Gang
        = Gang 
        { gangThreads           :: Int

        -- | Number of worker threads currently waiting.
        , gangThreadsAvailable  :: QSemN

        , gangState             :: IORef GangState

        , gangActionsRunning    :: IORef Int 

        , gangThreadsRunning    :: IORef (Set ThreadId) }


data GangState
        = -- | Gang is running and starting new actions.
          GangRunning

        -- | Gang may be running already started actions, 
        --   but no new ones are being started.
        | GangPaused

        -- | Gang is waiting for currently running actions to finish, 
        --   but not starting new ones.
        | GangFlushing

        -- | Gang is finished, all the actions have completed.
        | GangFinished
        
        -- | Gang was killed, all the threads are dead (or dying).
        | GangKilled
        deriving (Show, Eq)


-- | Get the state of a gang.
getGangState :: Gang -> IO GangState
getGangState gang
        = readIORef (gangState gang)


-- | Block until all actions have finished executing,
--   or the gang is killed.
joinGang :: Gang -> IO ()
joinGang !gang
 = do   
        -- Wait for all the threads to become available.
        waitQSemN (gangThreadsAvailable gang)
                  (gangThreads gang)

        -- See what state the gang is now in.
        state   <- readIORef (gangState gang)

        if state == GangFinished || state == GangKilled
         -- The gang is done.
         then return ()

         -- Hmm. We're holding all the threads but the gang is still
         -- running. Maybe the controller hasn't started the first
         -- one yet. Just put them all back and try again.
         -- We delay for a moment to allow the controller to run.
         else do
                threadDelay 1000

                signalQSemN (gangThreadsAvailable gang)
                            (gangThreads gang)

                joinGang gang


-- | Block until already started actions have completed, but don't start any more.
--   Gang state changes to `GangFlushing`.
flushGang :: Gang -> IO ()
flushGang !gang
 = do   atomicWriteIORef (gangState gang) GangFlushing
        waitForGangState gang GangFinished


-- | Pause a gang. Actions that have already been started continue to run, 
--   but no more will be started until a `resumeGang` command is issued.
--   Gang state changes to `GangPaused`.
pauseGang :: Gang -> IO ()
pauseGang !gang
 = do   -- Set the gang to paused mode.
        -- This will prevent any new threads from being started.
        atomicWriteIORef (gangState gang) GangPaused


-- | Resume a paused gang, which allows it to continue starting new actions.
--   If the gang was paused it now changes to `GangRunning`,
--   otherwise nothing happens.
resumeGang :: Gang -> IO ()
resumeGang !gang
 = atomicModifyIORef' (gangState gang) $ \state 
 -> do  case state of 
         GangPaused     -> (GangRunning, ())
         _              -> (state, ())


-- | Kill all the threads in a gang.
--   Gang stage changes to `GangKilled`.
killGang :: Gang -> IO ()
killGang !gang
 = do   
        atomicWriteIORef (gangState gang) GangKilled

        tids    <- readIORef (gangThreadsRunning gang) 
        mapM_ killThread $ Set.toList tids

        -- Signal that all the threads are available.
        --
        -- NOTE: There is a race here where a thread might have terminated
        -- cleanly and already bumped the QSemN just before were to add 
        -- to it, but we've killed the gang anyway so nothing more will
        -- be run.
        signalQSemN (gangThreadsAvailable gang) (length tids)


-- | Block until the gang is in the given state.
waitForGangState :: Gang -> GangState -> IO ()
waitForGangState !gang !waitState
 = do   
        -- Wait for all the threads to become available.
        waitQSemN (gangThreadsAvailable gang)
                  (gangThreads gang)

        state   <- readIORef (gangState gang)

        if state == waitState
         then return ()
         else do
                threadDelay 1000

                signalQSemN (gangThreadsAvailable gang)
                            (gangThreads gang)

                waitForGangState gang waitState


-- | Fork a new gang to run the given actions.
--   This function returns immediately, with the gang executing in the background.
--   Gang state starts as `GangRunning` then transitions to `GangFinished`.
--   To block until all the actions are finished use `joinGang`.
forkGangActions
        :: Int          -- ^ Number of worker threads in the gang \/ maximum number
                        --   of actions to execute concurrenty.
        -> [IO ()]      -- ^ Actions to run. They are started in-order, but may finish
                        --   out-of-order depending on the run time of the individual action.
        -> IO Gang

forkGangActions !threads !actions
 = do   semThreads              <- newQSemN threads
        refState                <- newIORef GangRunning
        refActionsRunning       <- newIORef 0
        refThreadsRunning       <- newIORef (Set.empty)
        let gang        
                = Gang
                { gangThreads           = threads
                , gangThreadsAvailable  = semThreads 
                , gangState             = refState
                , gangActionsRunning    = refActionsRunning 
                , gangThreadsRunning    = refThreadsRunning }

        _ <- forkIO $ gangLoop gang actions
        return gang
        

-- | Run actions on a gang.
gangLoop :: Gang -> [IO ()] -> IO ()

gangLoop !gang []
 = do   -- Signal that the gang is finished running actions.
        writeIORef (gangState gang) GangFinished

gangLoop !gang actions@(action:actionsRest)
 = do   
        -- Wait for a worker thread to become available.
        waitQSemN  (gangThreadsAvailable gang) 1
        
        -- See what state the gang is in.
        state   <- readIORef (gangState gang)

        case state of
         GangRunning 
          -> do -- Fork off the next action.
                -- We need to use the 'finally' to release the thread
                -- in the case that the action throws an exception.
                tid     <- forkOS 
                        $  finally action
                        $  do   -- Remove our ThreadId from the set of
                                -- running ThreadIds.
                                tid     <- myThreadId
                                atomicModifyIORef' (gangThreadsRunning gang)
                                        (\tids -> (Set.delete tid tids, ()))

                                -- Signal that the worker is now available.
                                signalQSemN (gangThreadsAvailable gang) 1
        
                -- Add the ThreadId of the freshly forked thread to the set
                -- of running ThreadIds. We'll need this set if we want to kill
                -- the gang.
                atomicModifyIORef' (gangThreadsRunning gang)
                        (\tids -> (Set.insert tid tids, ()))
        
                -- Handle the rest of the actions.
                gangLoop gang actionsRest

         -- TODO: avoid spinning on pause.
         GangPaused
          -> do signalQSemN (gangThreadsAvailable gang) 1
                threadDelay 1000
                gangLoop gang actions
                        
         GangFlushing   -> return ()
         GangFinished   -> return ()
         GangKilled     -> return ()