module HSBencher.Internal.MeasureProcess
(measureProcess, measureProcessDBG,
selftimedHarvester, jittimeHarvester,
ghcProductivityHarvester, ghcAllocRateHarvester, ghcMemFootprintHarvester,
taggedLineHarvester,
setCPUAffinity
)
where
import qualified Control.Concurrent.Async as A
import Control.Concurrent (threadDelay)
import Control.Concurrent.Chan
import qualified Control.Exception as E
import Data.Time.Clock (getCurrentTime, diffUTCTime)
import Data.IORef
import System.Exit
import System.Directory
import System.IO (hClose, stderr, hPutStrLn)
import System.Process (system, waitForProcess, terminateProcess)
import System.Process (createProcess, CreateProcess(..), CmdSpec(..), StdStream(..), readProcess, ProcessHandle)
import System.Posix.Process (getProcessID)
import qualified System.IO.Streams as Strm
import qualified System.IO.Streams.Concurrent as Strm
import qualified Data.ByteString.Char8 as B
import qualified Data.List as L
import qualified Data.Set as S
import System.Environment (getEnvironment)
import HSBencher.Types
import Prelude hiding (fail)
-- | Launch the command described by a 'CommandDescr' and monitor it in the
-- background.
--
-- If a CPU affinity is supplied it is applied with 'setCPUAffinity' before
-- the command is started.  The child's stdout and stderr are split into
-- lines; every line is relayed on the returned 'SubProcess' streams
-- (@process_out@ / @process_err@) and passed to the harvester, whose updates
-- are accumulated into a 'RunResult' in arrival order.
--
-- The @wait@ action of the returned 'SubProcess' yields:
--
--   * the accumulated result once both output streams reach end-of-stream and
--     the process has exited successfully (or unsuccessfully with
--     @tolerateError@ set).  If no harvester changed @realtime@ from its
--     'emptyRunResult' value, the wall-clock time between launch and exit is
--     used instead;
--
--   * @ExitError c@ for a failing exit code when @tolerateError@ is not set;
--
--   * @RunTimeOut@ if the timeout (when one is given) fires first, in which
--     case the process is asked to terminate and the remaining output is
--     drained.
--
-- The working directory is switched process-wide while the child is being
-- spawned; it is restored afterwards even when spawning throws.
measureProcess :: Maybe (Int,CPUAffinity)
               -> LineHarvester
               -> CommandDescr
               -> IO SubProcess
measureProcess (Just aff) hrv descr = do
  setCPUAffinity aff
  measureProcess Nothing hrv descr
measureProcess Nothing (LineHarvester harvest)
               CommandDescr{command, envVars, timeout, workingDir, tolerateError} = do
  origDir <- getCurrentDirectory
  case workingDir of
    Just d  -> setCurrentDirectory d
    Nothing -> return ()
  curEnv <- getEnvironment
  startTime <- getCurrentTime
  let spawn = case command of
        RawCommand exeFile cmdArgs ->
          Strm.runInteractiveProcess exeFile cmdArgs Nothing (Just$ envVars++curEnv)
        ShellCommand str ->
          runInteractiveCommandWithEnv str (envVars++curEnv)
  -- Restore the original directory whether or not the spawn succeeds, so a
  -- failed launch cannot leave the harness stranded in the benchmark's directory.
  (_inp,out,errout,pid) <- spawn `E.finally` setCurrentDirectory origDir

  out' <- Strm.map OutLine =<< Strm.lines out
  err' <- Strm.map ErrLine =<< Strm.lines errout
  timeEvt <- case timeout of
               Nothing -> Strm.nullInput
               Just t  -> Strm.map (\_ -> TimerFire) =<< timeOutStream t

  -- Merge stdout and stderr, then turn their joint end-of-stream into an
  -- explicit ProcessClosed event so it can be merged with the timer.
  merged0 <- Strm.concurrentMerge [out',err']
  merged1 <- reifyEOS merged0
  merged2 <- Strm.map (maybe ProcessClosed id) merged1
  merged3 <- Strm.concurrentMerge [merged2, timeEvt]

  -- Channels used to relay every line to the consumer of the SubProcess;
  -- writing Nothing signals end-of-stream.
  relay_out <- newChan
  relay_err <- newChan
  process_out <- Strm.chanToInput relay_out
  process_err <- Strm.chanToInput relay_err

  let
      loop :: RunResult -> IO RunResult
      loop resultAcc = do
        x <- Strm.read merged3
        case x of
          Just ProcessClosed -> do
            writeChan relay_err Nothing
            writeChan relay_out Nothing
            code <- waitForProcess pid
            endtime <- getCurrentTime
            -- Fall back to wall-clock time only when no harvester set realtime.
            let retTime =
                  if realtime resultAcc == realtime emptyRunResult
                  then
                    let d = diffUTCTime endtime startTime in
                    return$ resultAcc { realtime = fromRational$ toRational d }
                  else return resultAcc
            case code of
              ExitSuccess -> retTime
              ExitFailure c | tolerateError -> retTime
                            | otherwise     -> return (ExitError c)

          Just TimerFire -> do
            B.hPutStrLn stderr $ " [hsbencher] Benchmark run timed out. Killing process."
            -- NOTE(review): the process is not waited on after termination in
            -- this path -- confirm whether it should be reaped here.
            terminateProcess pid
            B.hPutStrLn stderr $ " [hsbencher] Cleaning up io-streams."
            writeChan relay_err Nothing
            writeChan relay_out Nothing
            E.catch (dumpRest merged3) $ \ (exn::E.SomeException) ->
              B.hPutStrLn stderr $ " [hsbencher] ! Got an error while cleaning up: " `B.append` B.pack(show exn)
            B.hPutStrLn stderr $ " [hsbencher] Done with cleanup."
            return RunTimeOut

          Just (ErrLine errLine) -> do
            writeChan relay_err (Just errLine)
            loop $ fst (harvest errLine) resultAcc
          Just (OutLine outLine) -> do
            writeChan relay_out (Just outLine)
            loop $ fst (harvest outLine) resultAcc

          -- The merged stream ended without ProcessClosed or TimerFire.
          Nothing -> do
            let err = "benchmark.hs: Internal error! This should not happen."
            B.hPutStrLn stderr err
            writeChan relay_err (Just err)
            error (B.unpack err)

  fut <- A.async (loop emptyRunResult)
  return$ SubProcess {wait=A.wait fut, process_out, process_err}
-- | Simplified, blocking variant of 'measureProcess' intended for debugging.
--
-- Runs the command to completion, collects all of its stdout and stderr, and
-- returns the output lines (stderr lines first, each prefixed with
-- @" [stderr] "@ or @" [stdout] "@) together with the harvested 'RunResult'.
-- The @timeout@ field of the 'CommandDescr' is ignored.
--
-- As in 'measureProcess', a failing exit code yields @ExitError c@ unless
-- @tolerateError@ is set, and wall-clock time is used for @realtime@ when no
-- harvester changed it from its 'emptyRunResult' value.
measureProcessDBG :: Maybe (Int,CPUAffinity)
                  -> LineHarvester
                  -> CommandDescr
                  -> IO ([B.ByteString], RunResult)
measureProcessDBG (Just aff) hrv descr = do
  setCPUAffinity aff
  measureProcessDBG Nothing hrv descr
measureProcessDBG Nothing (LineHarvester harvest)
                  CommandDescr{command, envVars, timeout=_, workingDir, tolerateError} = do
  curEnv <- getEnvironment
  startTime <- getCurrentTime
  (Just _hin, Just hout, Just herr, ph) <- createProcess
     CreateProcess {
       cmdspec = command,
       env = Just (envVars++curEnv),
       std_in = CreatePipe,
       std_out = CreatePipe,
       std_err = CreatePipe,
       cwd = workingDir,
       close_fds = False,
       create_group = False,
       delegate_ctlc = False
     }
  -- Drain both pipes at the same time.  Reading stdout to EOF before touching
  -- stderr would deadlock as soon as the child blocks on a full stderr pipe.
  (out, err) <- A.concurrently (B.hGetContents hout) (B.hGetContents herr)
  code <- waitForProcess ph
  endtime <- getCurrentTime
  let outl, errl :: [B.ByteString]
      outl = B.lines out
      errl = B.lines err
      tagged = map (B.append " [stderr] ") errl ++
               map (B.append " [stdout] ") outl
      -- Apply harvested updates in line order, so that a later line overrides
      -- an earlier one, matching the accumulation order used by measureProcess.
      result = L.foldl' (\acc ln -> fst (harvest ln) acc) emptyRunResult (errl++outl)
  let retTime =
        if realtime result /= realtime emptyRunResult
        then return (tagged,result)
        else
          let d = diffUTCTime endtime startTime in
          return (tagged, result { realtime = fromRational$ toRational d })
  case code of
    ExitSuccess -> retTime
    ExitFailure c | tolerateError -> retTime
                  | otherwise     -> return (tagged, ExitError c)
-- | Read and discard every remaining element of an input stream, returning
-- once the stream reports end-of-stream.
dumpRest :: Strm.InputStream a -> IO ()
dumpRest strm = drain
  where
    drain = Strm.read strm >>= maybe (return ()) (const drain)
-- | Events observed while monitoring a running subprocess.
data ProcessEvt
  = ErrLine B.ByteString  -- ^ One line read from the process's stderr.
  | OutLine B.ByteString  -- ^ One line read from the process's stdout.
  | ProcessClosed         -- ^ The merged stdout/stderr stream reached its end.
  | TimerFire             -- ^ The timeout timer went off.
  deriving (Show, Eq, Read)
-- | Pin the current process to a set of CPUs chosen from the output of
-- @numactl --hardware@, by running @taskset -pc@ on our own process ID.
--
-- The pair is the number of threads and the placement policy:
--
--   * 'Default'   -- use every CPU that numactl listed;
--
--   * 'Packed'    -- use the first @numthreads@ CPUs in listing order;
--
--   * 'SpreadOut' -- take an equal share from every NUMA domain and fill any
--     remainder from the CPUs not yet chosen.
--
-- Calls 'error' when the requested number of CPUs cannot be supplied, when
-- no CPUs are selected at all, when 'SpreadOut' is requested but no NUMA
-- domains were found, or when @taskset@ exits with a failure code.
setCPUAffinity :: (Int,CPUAffinity) -> IO ()
setCPUAffinity (numthreads,aff) = do
  pid  <- getProcessID
  dump <- readProcess "numactl" ["--hardware"] []
  -- Only lines of the shape "node <id> cpus: <cpu> <cpu> ..." are used.
  let filt0      = filter (L.isInfixOf "cpus:") (lines dump)
      cpusets    = [ rst | ("node":_:"cpus:":rst) <- map words filt0 ]
      numDomains = length cpusets
      allCPUs    = concat cpusets
  hPutStrLn stderr $ " [hsbencher] numactl found "++show numDomains++
                     " NUMA domains: "++show (map (map readInt) cpusets)
  let assertLen n l
        | length l == n = l
        | otherwise = error $ "setCPUAffinity: requested "++show n
                              ++" cpus, only got "++show (length l)
      subset = case aff of
        Default   -> allCPUs
        Packed    -> assertLen numthreads $ take numthreads allCPUs
        SpreadOut
          -- Without this guard the quotRem below fails with an opaque
          -- "divide by zero" when numactl lists no domains.
          | numDomains == 0 ->
              error "setCPUAffinity: numactl reported no NUMA domains, cannot spread threads."
          | otherwise -> assertLen numthreads $
              let (q,r)    = numthreads `quotRem` numDomains
                  frsts    = concatMap (take q) cpusets
                  leftover = S.toList (S.difference (S.fromList allCPUs) (S.fromList frsts))
              in frsts ++ take r leftover
  case subset of
    [] -> error "setCPUAffinity: internal error: zero cpus selected."
    _  -> do
      let cmd = "taskset -pc "++L.intercalate "," subset++" "++show pid
      hPutStrLn stderr $ " [hsbencher] Attempting to set CPU affinity: "++cmd
      cde <- system cmd
      case cde of
        ExitSuccess   -> return ()
        ExitFailure c -> error $ "setCPUAffinity: taskset command returned error code: "++show c
-- | Parse a 'String' as an 'Int' using 'read'; like 'read', it fails with an
-- exception on input that is not a valid integer.
readInt :: String -> Int
readInt digits = read digits
-- | Harvester for lines tagged @SELFTIMED@: the value following the tag
-- replaces the @realtime@ field of the 'RunResult'.
selftimedHarvester :: LineHarvester
selftimedHarvester = taggedLineHarvester "SELFTIMED" setRealtime
  where
    setRealtime secs result = result { realtime = secs }
-- | Harvester for lines tagged @JITTIME@: the value following the tag is
-- stored (wrapped in 'Just') in the @jittime@ field of the 'RunResult'.
jittimeHarvester :: LineHarvester
jittimeHarvester = taggedLineHarvester "JITTIME" setJitTime
  where
    setJitTime secs result = result { jittime = Just secs }
-- | Build a 'LineHarvester' that recognises lines of the form @TAG value@ or
-- @TAG: value@.
--
-- The first argument is the tag; the second stores the parsed value in a
-- 'RunResult'.  For a line whose first word is not the tag, the harvester
-- returns @(id, False)@.  For a matching line with exactly one further word
-- that parses via 'reads', it returns the update paired with @True@.
--
-- A tagged line is treated as a hard error (via 'error') when its value does
-- not parse, when nothing follows the tag, or when more than one word
-- follows the tag.
taggedLineHarvester :: Read a => B.ByteString -> (a -> RunResult -> RunResult) -> LineHarvester
taggedLineHarvester tag stickit = LineHarvester $ \ ln ->
  let noMatch = (id, False) in
  case B.words ln of
    hd:tl | hd == tag || hd == (tag `B.append` ":") ->
      case tl of
        [time] ->
          case reads (B.unpack time) of
            (dbl,_):_ -> (stickit dbl, True)
            _ -> error$ "[taggedLineHarvester] Error: line tagged with "++B.unpack tag++", but couldn't parse number: "++B.unpack ln
        -- A bare tag used to be reported as "more than one token", which was misleading.
        [] -> error$ "[taggedLineHarvester] Error: line tagged with "++B.unpack tag++", but no value followed the tag: "++B.unpack ln
        _  -> error$ "[taggedLineHarvester] Error: tagged line followed by more than one token: "++B.unpack ln
    _ -> noMatch
-- | Harvester for the @productivity@ field of a 'RunResult'.
--
-- Combines, via 'orHarvest', a @PRODUCTIVITY@-tagged line harvester with one
-- that recognises lines whose words begin
-- @Productivity \<n\>% of total user, ...@ (presumably the GHC runtime
-- statistics summary -- confirm).  Any @%@ characters are removed from the
-- number before it is parsed; unparsable numbers leave the result untouched.
ghcProductivityHarvester :: LineHarvester
ghcProductivityHarvester = taggedForm `orHarvest` summaryForm
  where
    taggedForm  = taggedLineHarvester "PRODUCTIVITY" (\d r -> r{productivity=Just d})
    summaryForm = LineHarvester parseLine
    unchanged   = (id, False)
    parseLine ln
      | ("Productivity": prod: "of": "total": "user," : _) <- words (B.unpack ln)
      , ((prodN,_):_) <- reads (filter (/= '%') prod)
      = (\r -> r{productivity=Just prodN}, True)
      | otherwise = unchanged
-- | Harvester for the @allocRate@ field of a 'RunResult'.
--
-- Recognises lines whose words begin @Alloc rate \<n\> bytes per ...@
-- (presumably from the GHC runtime statistics summary -- confirm).  Commas
-- are removed from the number before it is parsed; lines that do not match
-- or do not parse leave the result untouched.
ghcAllocRateHarvester :: LineHarvester
ghcAllocRateHarvester = LineHarvester parseLine
  where
    unchanged = (id, False)
    parseLine ln
      | ("Alloc":"rate": rate: "bytes":"per":_) <- words (B.unpack ln)
      , ((n,_):_) <- reads (filter (/= ',') rate)
      = (\r -> r{allocRate=Just n}, True)
      | otherwise = unchanged
-- | Harvester for the @memFootprint@ field of a 'RunResult'.
--
-- Recognises lines whose words begin @\<n\> bytes maximum residency ...@
-- (presumably from the GHC runtime statistics summary -- confirm).  Commas
-- are removed from the number before it is parsed; lines that do not match
-- or do not parse leave the result untouched.
ghcMemFootprintHarvester :: LineHarvester
ghcMemFootprintHarvester = LineHarvester parseLine
  where
    unchanged = (id, False)
    parseLine ln
      | (sz:"bytes":"maximum":"residency":_) <- words (B.unpack ln)
      , ((n,_):_) <- reads (filter (/= ',') sz)
      = (\r -> r{memFootprint=Just n}, True)
      | otherwise = unchanged
-- | Create an input stream that produces a single @()@ after the given
-- number of seconds and then ends.  The delay is performed when the stream
-- is read.
timeOutStream :: Double -> IO (Strm.InputStream ())
timeOutStream time = Strm.makeInputStream tick >>= Strm.take 1
  where
    -- threadDelay takes microseconds.
    micros = round (time * 1000 * 1000)
    tick   = threadDelay micros >> return (Just ())
-- | Make the end of a stream observable as a value.
--
-- Every element @y@ of the source is passed through as @Just y@.  The first
-- time the source reports end-of-stream a single @Nothing@ element is
-- emitted; after that the resulting stream itself ends.
reifyEOS :: Strm.InputStream a -> IO (Strm.InputStream (Maybe a))
reifyEOS ins = do
  eosPending <- newIORef True
  let next = do
        item    <- Strm.read ins
        pending <- readIORef eosPending
        case (item, pending) of
          (Just _,  _)     -> return (Just item)
          (Nothing, True)  -> writeIORef eosPending False >> return (Just Nothing)
          (Nothing, False) -> return Nothing
  Strm.makeInputStream next
-- | Run a shell command with the given environment, with pipes for stdin,
-- stdout and stderr.
--
-- Returns the child's stdin as an output stream, its stdout and stderr as
-- input streams, and the process handle.  Each stream is wrapped with a
-- locking layer and closes its underlying handle when the stream ends.
runInteractiveCommandWithEnv :: String
                             -> [(String,String)]
                             -> IO (Strm.OutputStream B.ByteString,
                                    Strm.InputStream  B.ByteString,
                                    Strm.InputStream  B.ByteString,
                                    ProcessHandle)
runInteractiveCommandWithEnv scmd env = do
  let spec = CreateProcess
        { cmdspec       = ShellCommand scmd
        , env           = Just env
        , std_in        = CreatePipe
        , std_out       = CreatePipe
        , std_err       = CreatePipe
        , cwd           = Nothing
        , close_fds     = False
        , create_group  = False
        , delegate_ctlc = False
        }
  (Just hin, Just hout, Just herr, ph) <- createProcess spec
  sIn  <- Strm.lockingOutputStream
            =<< Strm.atEndOfOutput (hClose hin)
            =<< Strm.handleToOutputStream hin
  sOut <- wrapInput hout
  sErr <- wrapInput herr
  return (sIn, sOut, sErr, ph)
  where
    -- Handle -> locked input stream that closes the handle at end of input.
    wrapInput h =
      Strm.handleToInputStream h >>=
      Strm.atEndOfInput (hClose h) >>=
      Strm.lockingInputStream