-- Build-time switch: when non-zero, gangIO falls back to running a computation
-- sequentially in the calling thread if the gang is already marked busy.
#define SEQ_IF_GANG_BUSY 1
-- Build-time switch: when non-zero, the real tracing and timing helpers are
-- compiled in; otherwise no-op stand-ins with the same names are used.
#define TRACE_GANG 1
-- | Gang primitives: a fixed-size group of worker threads that are all handed
--   the same action, each one receiving its own thread index.
module Data.Array.Parallel.Unlifted.Distributed.Primitive.Gang
        ( -- * Types
          Gang
        , Workload (..)

          -- * Creating gangs
        , seqGang
        , forkGang

          -- * Querying and running
        , gangSize
        , gangIO
        , gangST
        )
where
import GHC.IO
import GHC.ST
import Control.Concurrent (forkOn)
import Control.Concurrent.MVar
import Control.Exception (assert)
import Control.Monad
#if TRACE_GANG
import Debug.Trace (traceEventIO)
import System.Time ( ClockTime(..), getClockTime )
#endif
-- | A request sent to a single worker thread.
data Req
        -- | Run the action, passing it the index of the receiving worker.
        --   The worker fills the variable once the action has returned.
        = ReqDo       (Int -> IO ()) (MVar ())

        -- | Stop the worker. The worker fills the variable just before it exits.
        | ReqShutdown (MVar ())
-- | Build a @ReqDo@ request for the given action, paired with a fresh, empty
--   completion variable.
newReq :: (Int -> IO ()) -> IO Req
newReq action
        = fmap (ReqDo action) newEmptyMVar
-- | Block until the completion variable carried by the request has been
--   filled, emptying it again in the process.
waitReq :: Req -> IO ()
waitReq (ReqDo _ done)          = takeMVar done
waitReq (ReqShutdown done)      = takeMVar done
-- | A gang: a fixed number of worker threads together with the state needed
--   to hand work to them.
data Gang
        = Gang
                !Int            -- ^ Number of threads in the gang.
                [MVar Req]      -- ^ One request variable per worker; empty for a sequential gang.
                (MVar Bool)     -- ^ Whether the gang is currently running a parallel computation.
-- | Only the thread count is shown, wrapped as @<<N threads>>@.
instance Show Gang where
        showsPrec prec (Gang threads _ _) rest
                = "<<" ++ showsPrec prec threads (" threads>>" ++ rest)
-- | Make a sequential version of a gang. The size and busy flag are kept but
--   the list of worker request variables is dropped, so @gangIO@ runs every
--   action in the calling thread.
seqGang :: Gang -> Gang
seqGang gang
 = case gang of
        Gang size _ busy        -> Gang size [] busy
-- | Main loop of a worker thread.
--
--   The worker waits for a request on its request variable. For a @ReqDo@ it
--   runs the action with its own thread index, traces how long that took,
--   fills the completion variable and waits for the next request. For a
--   @ReqShutdown@ it fills the completion variable and returns.
gangWorker :: Int -> MVar Req -> IO ()
gangWorker threadId varReq
 = loop
 where
        -- Trace a message tagged with this worker.
        say = traceWorker threadId

        loop
         = do   say "ready."
                takeMVar varReq >>= serve

        serve (ReqDo action varDone)
         = do   say " begin."
                start   <- getGangTime
                action threadId
                end     <- getGangTime
                say $ " end (" ++ diffTime start end ++ ")."

                -- Tell the issuer this worker is done, then go back to waiting.
                putMVar varDone ()
                loop

        serve (ReqShutdown varDone)
         = do   say " shutting down."
                putMVar varDone ()
-- | Trace a message on behalf of a worker. The message is prefixed with the
--   worker index and padded with ten spaces per index, so that the output of
--   different workers lands in different columns.
traceWorker :: Int -> String -> IO ()
traceWorker threadId str
        = traceGang (concat ["Worker ", show threadId, " ", padding, str])
        where   padding = replicate (threadId * 10) ' '
-- | Ask the worker listening on the given request variable to shut down, and
--   block until it has acknowledged the request.
finaliseWorker :: MVar Req -> IO ()
finaliseWorker varReq
 = do   ack <- newEmptyMVar
        putMVar varReq (ReqShutdown ack)

        -- The acknowledgement carries unit, so this is also the result.
        takeMVar ack
-- | Fork a gang with @n@ worker threads.
--
--   One empty request variable is created per worker, and each gets a
--   finalizer that sends the worker a shutdown request and waits for it to be
--   acknowledged. Worker @i@ (for @i@ from @0@ to @n - 1@) is started with
--   @forkOn i@. The busy flag of the new gang starts out as @False@.
--
--   The thread count is checked to be positive with @assert@.
forkGang :: Int -> IO Gang
forkGang n
 = assert (n > 0)
 $ do   -- One request variable per worker.
        mvs     <- replicateM n newEmptyMVar

        -- Shut the worker down when its request variable is finalized.
        mapM_ (\var -> addMVarFinalizer var (finaliseWorker var)) mvs

        -- Start the workers, numbered from zero.
        zipWithM_ forkOn [0 ..]
                $ zipWith gangWorker [0 .. n - 1] mvs

        busy    <- newMVar False
        return $ Gang n mvs busy
-- | The number of threads in a gang.
gangSize :: Gang -> Int
gangSize gang
 = case gang of
        Gang size _ _   -> size
-- | A description of how much work a gang computation involves, passed to
--   @gangIO@ and @gangST@.
data Workload
        -- | Nothing is known about the amount of work.
        = WorkUnknown

        -- | A size in bytes; presumably the amount of data to be copied,
        --   going by the constructor name.
        | WorkCopy Int
        deriving (Eq, Show)
-- | Whether a workload counts as small: a copy of fewer than 1000 bytes.
--   An unknown workload is never treated as small.
workloadIsSmall :: Workload -> Bool
workloadIsSmall WorkUnknown             = False
workloadIsSmall (WorkCopy bytes)        = bytes < 1000
-- | Run an action on the gang, once for every thread index from @0@ to
--   @n - 1@, and return when all of them have finished.
--
--   A gang without worker request variables (see @seqGang@) runs the actions
--   one after another in the calling thread, in index order.
gangIO  :: Gang
        -> String               -- ^ Name of the computation, used in trace messages.
        -> Workload             -- ^ How much work the computation involves.
        -> (Int -> IO ())       -- ^ The action; it is passed the thread index.
        -> IO ()

gangIO (Gang n [] _) _what _workload p
 = mapM_ p [0 .. n - 1]

#if SEQ_IF_GANG_BUSY
-- Variant with the sequential fallback: small workloads, and computations
-- issued while the gang is already busy, run in the calling thread.
gangIO (Gang n mvs busy) what workload p
 = do   let !small = workloadIsSmall workload
        if small
         then do
                traceGang $ "Issuing small " ++ what
                mapM_ p [0 .. n - 1]
         else do
                -- Claim the gang, and learn whether someone else already had it.
                isBusy <- swapMVar busy True
                if isBusy
                 then do
                        traceGang $ "WARNING: Gang was already busy, running sequentially: " ++ what
                        mapM_ p [0 .. n - 1]
                 else do
                        traceGangSplit $ "Issuing par " ++ what
                        parIO what n mvs p

                        -- NOTE: the flag is only cleared when parIO returns
                        -- normally; an exception here leaves it set.
                        _ <- swapMVar busy False
                        return ()
#else
-- Variant without the fallback: always hand the work to the worker threads.
gangIO (Gang n mvs _busy) what _workload p
 = parIO what n mvs p
#endif
-- | Issue an action to every worker of a gang and wait for all of them.
--
--   One request is created per thread, each with its own completion variable.
--   The requests are put into the worker request variables in order, and the
--   caller then waits on each completion variable in turn. The elapsed time
--   is reported through the gang tracing functions.
parIO   :: String               -- ^ Name of the computation, used in trace messages.
        -> Int                  -- ^ Number of requests to create.
        -> [MVar Req]           -- ^ Request variables of the workers.
        -> (Int -> IO ())       -- ^ The action; it is passed the thread index.
        -> IO ()
parIO what n mvs p
 = do   start   <- getGangTime

        reqs    <- replicateM n (newReq p)
        zipWithM_ putMVar mvs reqs
        traceGang "Running."
        mapM_ waitReq reqs

        end     <- getGangTime
        traceGangSplit
                $ concat ["Complete par ", what, " in ", diffTime start end, "us."]
-- | Same as @gangIO@, but for actions in the @ST@ monad.
--
--   Each per-thread action is converted with @unsafeSTToIO@ and the combined
--   computation is converted back with @unsafeIOToST@. Both conversions are
--   unchecked.
gangST :: Gang -> String -> Workload -> (Int -> ST s ()) -> ST s ()
gangST gang what workload p
        = unsafeIOToST (gangIO gang what workload (\ix -> unsafeSTToIO (p ix)))
#if TRACE_GANG
-- | Read the clock with @getClockTime@ and return the result as a single
--   count of microseconds.
getGangTime :: IO Integer
getGangTime
 = getClockTime >>= toMicros
 where
        -- The clock value is split into seconds and picoseconds.
        toMicros (TOD secs picos)
         = let  !fraction = picos `div` 1000000
           in   return (fraction + secs * 1000000)
-- | Render the time elapsed between two clock readings, as the decimal
--   string of @end - start@. The start reading is the first argument.
diffTime :: Integer -> Integer -> String
diffTime x y = show (y - x)
-- | Emit a message to the eventlog, prefixed with @GANG @.
traceGang :: String -> IO ()
traceGang = traceEventIO . ("GANG " ++)
-- | Emit a possibly long message to the eventlog in pieces of at most 500
--   characters. Each piece is prefixed with @GANG[i/total] @, where @i@
--   counts from one. An empty message emits nothing.
traceGangSplit :: String -> IO ()
traceGangSplit s
 = zipWithM_ emit [1 :: Int ..] pieces
 where
        pieces  = chunks 500 s
        total   = show (length pieces)

        emit ix piece
                = traceEventIO
                $ "GANG[" ++ show ix ++ "/" ++ total ++ "] " ++ piece

        -- Cut a list into consecutive runs of the given length; the last run
        -- may be shorter.
        chunks _ []     = []
        chunks len str
         = case splitAt len str of
                (front, rest)   -> front : chunks len rest
#else
-- Stand-ins used when tracing is compiled out. They keep the names of the
-- tracing helpers so that callers need no conditional code.

-- | No clock is read; the reading is just unit.
getGangTime :: IO ()
getGangTime = return ()

-- | There is nothing to measure, so the rendered difference is empty.
diffTime :: () -> () -> String
diffTime = \_ _ -> ""

-- | Discard the message.
traceGang :: String -> IO ()
traceGang = const (return ())

-- | Discard the message.
traceGangSplit :: String -> IO ()
traceGangSplit = const (return ())
#endif