module HSBencher.Internal.MeasureProcess
(measureProcess, measureProcessDBG,
selftimedHarvester, jittimeHarvester,
ghcProductivityHarvester, ghcAllocRateHarvester, ghcMemFootprintHarvester,
taggedLineHarvester
)
where
import qualified Control.Concurrent.Async as A
import Control.Concurrent (threadDelay)
import Control.Concurrent.Chan
import qualified Control.Exception as E
import Data.Time.Clock (getCurrentTime, diffUTCTime)
import Data.IORef
import Data.Monoid
import System.Exit
import System.Directory
import System.IO (hClose, stderr, hGetContents)
import System.Process (system, waitForProcess, getProcessExitCode, runInteractiveCommand, terminateProcess)
import System.Process (createProcess, CreateProcess(..), CmdSpec(..), StdStream(..), readProcess, ProcessHandle)
import System.Posix.Process (getProcessStatus)
import qualified System.IO.Streams as Strm
import qualified System.IO.Streams.Concurrent as Strm
import qualified System.IO.Streams.Process as Strm
import qualified System.IO.Streams.Combinators as Strm
import qualified Data.ByteString.Char8 as B
import System.Environment (getEnvironment)
import HSBencher.Types
import Debug.Trace
measureProcess :: LineHarvester
-> CommandDescr
-> IO SubProcess
measureProcess (LineHarvester harvest)
CommandDescr{command, envVars, timeout, workingDir, tolerateError} = do
origDir <- getCurrentDirectory
case workingDir of
Just d -> setCurrentDirectory d
Nothing -> return ()
curEnv <- getEnvironment
startTime <- getCurrentTime
(_inp,out,err,pid) <-
case command of
RawCommand exeFile cmdArgs -> Strm.runInteractiveProcess exeFile cmdArgs Nothing (Just$ envVars++curEnv)
ShellCommand str -> runInteractiveCommandWithEnv str (envVars++curEnv)
setCurrentDirectory origDir
out' <- Strm.map OutLine =<< Strm.lines out
err' <- Strm.map ErrLine =<< Strm.lines err
timeEvt <- case timeout of
Nothing -> Strm.nullInput
Just t -> Strm.map (\_ -> TimerFire) =<< timeOutStream t
merged0 <- Strm.concurrentMerge [out',err']
merged1 <- reifyEOS merged0
merged2 <- Strm.map (\x -> case x of
Nothing -> ProcessClosed
Just y -> y) merged1
merged3 <- Strm.concurrentMerge [merged2, timeEvt]
relay_out <- newChan
relay_err <- newChan
process_out <- Strm.chanToInput relay_out
process_err <- Strm.chanToInput relay_err
let
loop :: RunResult -> IO RunResult
loop resultAcc = do
x <- Strm.read merged3
case x of
Just ProcessClosed -> do
writeChan relay_err Nothing
writeChan relay_out Nothing
code <- waitForProcess pid
endtime <- getCurrentTime
let retTime =
if realtime resultAcc == realtime emptyRunResult
then
let d = diffUTCTime endtime startTime in
return$ resultAcc { realtime = fromRational$ toRational d }
else return resultAcc
case code of
ExitSuccess -> retTime
ExitFailure c | tolerateError -> retTime
| otherwise -> return (ExitError c)
Just TimerFire -> do
B.hPutStrLn stderr $ " [hsbencher] Benchmark run timed out. Killing process."
terminateProcess pid
B.hPutStrLn stderr $ " [hsbencher] Cleaning up io-streams."
writeChan relay_err Nothing
writeChan relay_out Nothing
E.catch (dumpRest merged3) $ \ (exn::E.SomeException) ->
B.hPutStrLn stderr $ " [hsbencher] ! Got an error while cleaning up: " `B.append` B.pack(show exn)
B.hPutStrLn stderr $ " [hsbencher] Done with cleanup."
return RunTimeOut
Just (ErrLine errLine) -> do
writeChan relay_err (Just errLine)
loop $ fst (harvest errLine) resultAcc
Just (OutLine outLine) -> do
writeChan relay_out (Just outLine)
loop $ fst (harvest outLine) resultAcc
Nothing -> do
let err = "benchmark.hs: Internal error! This should not happen."
B.hPutStrLn stderr err
writeChan relay_err (Just err)
error (B.unpack err)
fut <- A.async (loop emptyRunResult)
return$ SubProcess {wait=A.wait fut, process_out, process_err}
measureProcessDBG :: LineHarvester
-> CommandDescr
-> IO ([B.ByteString], RunResult)
measureProcessDBG (LineHarvester harvest)
CommandDescr{command, envVars, timeout, workingDir, tolerateError} = do
curEnv <- getEnvironment
startTime <- getCurrentTime
(Just hin, Just hout, Just herr, ph) <- createProcess
CreateProcess {
cmdspec = command,
env = Just (envVars++curEnv),
std_in = CreatePipe,
std_out = CreatePipe,
std_err = CreatePipe,
cwd = workingDir,
close_fds = False,
create_group = False,
delegate_ctlc = False
}
out <- B.hGetContents hout
err <- B.hGetContents herr
code <- waitForProcess ph
endtime <- getCurrentTime
let outl, errl :: [B.ByteString]
outl = B.lines out
errl = B.lines err
tagged = map (B.append " [stderr] ") errl ++
map (B.append " [stdout] ") outl
result = foldr (fst . harvest) emptyRunResult (errl++outl)
let retTime =
if realtime result /= realtime emptyRunResult
then return (tagged,result)
else
let d = diffUTCTime endtime startTime in
return (tagged, result { realtime = fromRational$ toRational d })
case code of
ExitSuccess -> retTime
ExitFailure c | tolerateError -> retTime
| otherwise -> return (tagged, ExitError c)
dumpRest :: Strm.InputStream a -> IO ()
dumpRest strm = do
x <- Strm.read strm
case x of
Nothing -> return ()
Just _ -> dumpRest strm
data ProcessEvt = ErrLine B.ByteString
| OutLine B.ByteString
| ProcessClosed
| TimerFire
deriving (Show,Eq,Read)
selftimedHarvester :: LineHarvester
selftimedHarvester = taggedLineHarvester "SELFTIMED" (\d r -> r{realtime=d})
jittimeHarvester :: LineHarvester
jittimeHarvester = taggedLineHarvester "JITTIME" (\d r -> r{jittime=Just d})
taggedLineHarvester :: Read a => B.ByteString -> (a -> RunResult -> RunResult) -> LineHarvester
taggedLineHarvester tag stickit = LineHarvester $ \ ln ->
let fail = (id, False) in
case B.words ln of
[] -> fail
hd:tl | hd == tag || hd == (tag `B.append` ":") ->
case tl of
[time] ->
case reads (B.unpack time) of
(dbl,_):_ -> (stickit dbl, True)
_ -> error$ "[taggedLineHarvester] Error: line tagged with "++B.unpack tag++", but couldn't parse number: "++B.unpack ln
_ -> error$ "[taggedLineHarvester] Error: tagged line followed by more than one token: "++B.unpack ln
_ -> fail
ghcProductivityHarvester :: LineHarvester
ghcProductivityHarvester =
(taggedLineHarvester "PRODUCTIVITY" (\d r -> r{productivity=Just d})) `orHarvest`
(LineHarvester $ \ ln ->
let nope = (id,False) in
case words (B.unpack ln) of
[] -> nope
("Productivity": prod: "of": "total": "user," : _) ->
case reads (filter (/= '%') prod) of
((prodN,_):_) -> (\r -> r{productivity=Just prodN}, True)
_ -> nope
_ -> nope)
ghcAllocRateHarvester :: LineHarvester
ghcAllocRateHarvester =
(LineHarvester $ \ ln ->
let nope = (id,False) in
case words (B.unpack ln) of
[] -> nope
("Alloc":"rate": rate: "bytes":"per":_) ->
case reads (filter (/= ',') rate) of
((n,_):_) -> (\r -> r{allocRate=Just n}, True)
_ -> nope
_ -> nope)
ghcMemFootprintHarvester :: LineHarvester
ghcMemFootprintHarvester =
(LineHarvester $ \ ln ->
let nope = (id,False) in
case words (B.unpack ln) of
[] -> nope
(sz:"bytes":"maximum":"residency":_) ->
case reads (filter (/= ',') sz) of
((n,_):_) -> (\r -> r{memFootprint=Just n}, True)
_ -> nope
_ -> nope)
timeOutStream :: Double -> IO (Strm.InputStream ())
timeOutStream time = do
s1 <- Strm.makeInputStream $ do
threadDelay (round$ time * 1000 * 1000)
return$ Just ()
Strm.take 1 s1
orMaybe :: Maybe a -> Maybe a -> Maybe a
orMaybe Nothing x = x
orMaybe x@(Just _) _ = x
reifyEOS :: Strm.InputStream a -> IO (Strm.InputStream (Maybe a))
reifyEOS ins =
do flag <- newIORef True
Strm.makeInputStream $ do
x <- Strm.read ins
flg <- readIORef flag
case x of
Just y -> return (Just (Just y))
Nothing | flg -> do writeIORef flag False
return (Just Nothing)
| otherwise -> return Nothing
runInteractiveCommandWithEnv :: String
-> [(String,String)]
-> IO (Strm.OutputStream B.ByteString,
Strm.InputStream B.ByteString,
Strm.InputStream B.ByteString,
ProcessHandle)
runInteractiveCommandWithEnv scmd env = do
(Just hin, Just hout, Just herr, ph) <- createProcess
CreateProcess {
cmdspec = ShellCommand scmd,
env = Just env,
std_in = CreatePipe,
std_out = CreatePipe,
std_err = CreatePipe,
cwd = Nothing,
close_fds = False,
create_group = False,
delegate_ctlc = False
}
sIn <- Strm.handleToOutputStream hin >>=
Strm.atEndOfOutput (hClose hin) >>=
Strm.lockingOutputStream
sOut <- Strm.handleToInputStream hout >>=
Strm.atEndOfInput (hClose hout) >>=
Strm.lockingInputStream
sErr <- Strm.handleToInputStream herr >>=
Strm.atEndOfInput (hClose herr) >>=
Strm.lockingInputStream
return (sIn, sOut, sErr, ph)