#define SEQ_IF_GANG_BUSY 1
#define TRACE_GANG 0
module Data.Array.Parallel.Unlifted.Distributed.Gang
( Gang
, seqGang
, forkGang
, gangSize
, gangIO, gangST
, traceGang, traceGangST )
where
import GHC.IO
import GHC.ST
import Control.Concurrent (forkOn)
import Control.Concurrent.MVar
import Control.Exception (assert)
import Control.Monad
#if TRACE_GANG
import GHC.Exts (traceEvent)
import System.Time ( ClockTime(..), getClockTime )
#endif
data Req
= ReqDo (Int -> IO ()) (MVar ())
| ReqShutdown (MVar ())
newReq :: (Int -> IO ()) -> IO Req
newReq p
= do mv <- newEmptyMVar
return $ ReqDo p mv
waitReq :: Req -> IO ()
waitReq req
= case req of
ReqDo _ varDone -> takeMVar varDone
ReqShutdown varDone -> takeMVar varDone
data Gang
= Gang !Int
[MVar Req]
(MVar Bool)
instance Show Gang where
showsPrec p (Gang n _ _)
= showString "<<"
. showsPrec p n
. showString " threads>>"
seqGang :: Gang -> Gang
seqGang (Gang n _ mv) = Gang n [] mv
gangWorker :: Int -> MVar Req -> IO ()
gangWorker threadId varReq
= do traceGang $ "Worker " ++ show threadId ++ " waiting for request."
req <- takeMVar varReq
case req of
ReqDo action varDone
-> do traceGang $ "Worker " ++ show threadId ++ " begin"
start <- getGangTime
action threadId
end <- getGangTime
traceGang $ "Worker " ++ show threadId
++ " end (" ++ diffTime start end ++ ")"
putMVar varDone ()
gangWorker threadId varReq
ReqShutdown varDone
-> do traceGang $ "Worker " ++ show threadId ++ " shutting down."
putMVar varDone ()
finaliseWorker :: MVar Req -> IO ()
finaliseWorker varReq
= do varDone <- newEmptyMVar
putMVar varReq (ReqShutdown varDone)
takeMVar varDone
return ()
forkGang :: Int -> IO Gang
forkGang n
= assert (n > 0)
$ do
mvs <- sequence . replicate n $ newEmptyMVar
mapM_ (\var -> addMVarFinalizer var (finaliseWorker var)) mvs
zipWithM_ forkOn [0..]
$ zipWith gangWorker [0 .. n1] mvs
busy <- newMVar False
return $ Gang n mvs busy
gangSize :: Gang -> Int
gangSize (Gang n _ _) = n
gangIO :: Gang
-> (Int -> IO ())
-> IO ()
gangIO (Gang n [] _) p
= mapM_ p [0 .. n1]
#if SEQ_IF_GANG_BUSY
gangIO (Gang n mvs busy) p
= do traceGang "gangIO: issuing work requests (SEQ_IF_GANG_BUSY)"
b <- swapMVar busy True
traceGang $ "gangIO: gang is currently " ++ (if b then "busy" else "idle")
if b
then mapM_ p [0 .. n1]
else do
parIO n mvs p
_ <- swapMVar busy False
return ()
#else
gangIO (Gang n mvs busy) p = parIO n mvs p
#endif
parIO :: Int
-> [MVar Req]
-> (Int -> IO ())
-> IO ()
parIO n mvs p
= do traceGang "parIO: begin"
start <- getGangTime
reqs <- sequence . replicate n $ newReq p
traceGang "parIO: issuing requests"
zipWithM_ putMVar mvs reqs
traceGang "parIO: waiting for requests to complete"
mapM_ waitReq reqs
end <- getGangTime
traceGang $ "parIO: end " ++ diffTime start end
gangST :: Gang -> (Int -> ST s ()) -> ST s ()
gangST g p = unsafeIOToST . gangIO g $ unsafeSTToIO . p
#if TRACE_GANG
getGangTime :: IO Integer
getGangTime
= do TOD sec pico <- getClockTime
return (pico + sec * 1000000000000)
diffTime :: Integer -> Integer -> String
diffTime x y = show (yx)
traceGang :: String -> IO ()
traceGang s
= do t <- getGangTime
traceEvent $ show t ++ " @ " ++ s
#else
getGangTime :: IO ()
getGangTime = return ()
diffTime :: () -> () -> String
diffTime _ _ = ""
traceGang :: String -> IO ()
traceGang _ = return ()
#endif
traceGangST :: String -> ST s ()
traceGangST s = unsafeIOToST (traceGang s)