{-# LANGUAGE CPP #-}

-- | Gang Primitives.
module Data.Array.Repa.Eval.Gang
        ( theGang
        , Gang, forkGang, gangSize, gangIO, gangST)
where
import GHC.IO
import GHC.ST
import GHC.Conc                 (forkOn)
import Control.Concurrent.MVar
import Control.Exception        (assert)
import Control.Monad
import GHC.Conc                 (numCapabilities)
import System.IO


-- TheGang --------------------------------------------------------------------
-- | This globally shared gang is auto-initialised at startup and shared by all
--   Repa computations.
--
--   In a data parallel setting, it does not help to have multiple gangs
--   running at the same time. This is because a single data parallel
--   computation should already be able to keep all threads busy. If we had
--   multiple gangs running at the same time, then the system as a whole would
--   run slower as the gangs would contend for cache and thrash the scheduler.
--
--   If, due to laziness or otherwise, you try to start multiple parallel
--   Repa computations at the same time, then you will get the following
--   warning on stderr at runtime:
--
-- @Data.Array.Repa: Performing nested parallel computation sequentially.
--    You've probably called the 'compute' or 'copy' function while another
--    instance was already running. This can happen if the second version
--    was suspended due to lazy evaluation. Use 'deepSeqArray' to ensure that
--    each array is fully evaluated before you 'compute' the next one.
-- @
--
theGang :: Gang
{-# NOINLINE theGang #-}
-- One worker per capability; NOINLINE keeps this a single shared CAF.
theGang = unsafePerformIO $ forkGang numCapabilities


-- Requests -------------------------------------------------------------------
-- | The 'Req' type encapsulates work requests for individual members of a gang.
data Req
        -- | Instruct the worker to run the given action.
        = ReqDo (Int -> IO ())

        -- | Tell the worker that we're shutting the gang down.
        --   The worker should signal that it has received the request by
        --   writing to its result var before returning to the caller (forkGang).
        | ReqShutdown


-- Gang -----------------------------------------------------------------------
-- | A 'Gang' is a group of threads that execute arbitrary work requests.
data Gang
        = Gang
        { -- | Number of threads in the gang.
          _gangThreads          :: !Int

          -- | Workers listen for requests on these vars.
        , _gangRequestVars      :: [MVar Req]

          -- | Workers put their results in these vars.
        , _gangResultVars       :: [MVar ()]

          -- | Indicates that the gang is busy.
        , _gangBusy             :: MVar Bool
        }


-- | Shows only the thread count, e.g. @<<4 threads>>@.
instance Show Gang where
  showsPrec prec Gang { _gangThreads = n }
        = showString "<<"
        . showsPrec prec n
        . showString " threads>>"


-- | O(1). Yield the number of threads in the 'Gang'.
gangSize :: Gang -> Int
gangSize = _gangThreads


-- | Fork a 'Gang' with the given number of threads (at least 1).
forkGang :: Int -> IO Gang
forkGang n = assert (n > 0) $ do
    -- One request var and one completion var per worker.
    requestVars <- replicateM n newEmptyMVar
    doneVars    <- replicateM n newEmptyMVar

    -- Attach a finaliser to every request var so the matching worker can
    -- be shut down cleanly once that var becomes unreachable.
    forM_ (zip requestVars doneVars) $ \(varReq, varDone) ->
        mkWeakMVar varReq (finaliseWorker varReq varDone)

    -- Start the workers, pinning worker i to capability i.
    let workers = zipWith3 gangWorker [0 .. n - 1] requestVars doneVars
    zipWithM_ forkOn [0 ..] workers

    -- A freshly forked gang starts out idle.
    busy <- newMVar False
    return (Gang n requestVars doneVars busy)


-- | The worker thread of a 'Gang'.
--   The thread blocks on its request MVar waiting for a work request.
gangWorker :: Int -> MVar Req -> MVar () -> IO ()
gangWorker threadId varRequest varDone = awaitRequest
  where
    -- Block until the next request arrives, then act on it.
    awaitRequest = takeMVar varRequest >>= serve

    -- Run the action, signal completion, then go back to waiting.
    serve (ReqDo action) = do
        action threadId
        putMVar varDone ()
        awaitRequest

    -- Acknowledge the shutdown and let the thread finish.
    serve ReqShutdown = putMVar varDone ()


-- | Finaliser for worker threads.
--   We want to shut down the corresponding thread when its MVar becomes
--   unreachable.
--
--   Without this Repa programs can complain about "Blocked indefinitely
--   on an MVar" because worker threads are still blocked on the request
--   MVars when the program ends. Whether the finalizer is called or not
--   is very racy. It happens about 1 in 10 runs for the
--   repa-edgedetect benchmark, and less often with the others.
--
--   We're relying on the comment in System.Mem.Weak that says
--    "If there are no other threads to run, the runtime system will
--     check for runnable finalizers before declaring the system to be
--     deadlocked."
--
--   If we were creating and destroying the gang cleanly we wouldn't need
--   this, but theGang is created with a top-level unsafePerformIO.
--   Hacks beget hacks beget hacks...
--
finaliseWorker :: MVar Req -> MVar () -> IO ()
finaliseWorker varReq varDone = do
    -- Ask the worker to stop, then wait for its acknowledgement.
    putMVar varReq ReqShutdown
    takeMVar varDone


-- | Issue work requests for the 'Gang' and wait until they complete.
--
--   If the gang is already busy then print a warning to `stderr` and just
--   run the actions sequentially in the requesting thread.
gangIO  :: Gang
        -> (Int -> IO ())
        -> IO ()
{-# NOINLINE gangIO #-}
gangIO gang@(Gang _ _ _ busy) action = do
    -- Claim the gang; the previous flag value tells us whether someone
    -- else was already using it.
    alreadyBusy <- swapMVar busy True
    if alreadyBusy
        then seqIO gang action
        else do
            parIO gang action
            -- Release the gang again now that all workers have finished.
            void (swapMVar busy False)


-- | Run an action on the gang sequentially.
--   Prints the nested-parallelism warning to `stderr` first, then runs the
--   action once for every thread index in the calling thread.
seqIO   :: Gang -> (Int -> IO ()) -> IO ()
seqIO (Gang n _ _ _) action = do
    hPutStr stderr (unlines warning)
    forM_ [0 .. n - 1] action
  where
    warning =
        [ "Data.Array.Repa: Performing nested parallel computation sequentially."
        , " You've probably called the 'compute' or 'copy' function while another"
        , " instance was already running. This can happen if the second version"
        , " was suspended due to lazy evaluation. Use 'deepSeqArray' to ensure"
        , " that each array is fully evaluated before you 'compute' the next one."
        , ""
        ]


-- | Run an action on the gang in parallel.
parIO   :: Gang -> (Int -> IO ()) -> IO ()
parIO (Gang _ mvsRequest mvsResult _) action = do
    -- Hand the same action to every worker.
    forM_ mvsRequest $ \varReq ->
        putMVar varReq (ReqDo action)

    -- Block until each worker has signalled completion.
    forM_ mvsResult takeMVar


-- | Same as 'gangIO' but in the 'ST' monad.
gangST :: Gang -> (Int -> ST s ()) -> ST s ()
gangST g p = unsafeIOToST (gangIO g (\ix -> unsafeSTToIO (p ix)))