module HSBencher.MeasureProcess
(measureProcess,
selftimedHarvester, ghcProductivityHarvester,
taggedLineHarvester, nullHarvester
)
where
import qualified Control.Concurrent.Async as A
import Control.Concurrent (threadDelay)
import Control.Concurrent.Chan
import qualified Control.Exception as E
import Data.Time.Clock (getCurrentTime, diffUTCTime)
import Data.IORef
import System.Exit
import System.Directory
import System.IO (hClose, stderr)
import System.Process (system, waitForProcess, getProcessExitCode, runInteractiveCommand, terminateProcess,
createProcess, CreateProcess(..), CmdSpec(..), StdStream(..), readProcess, ProcessHandle)
import System.Posix.Process (getProcessStatus)
import qualified System.IO.Streams as Strm
import qualified System.IO.Streams.Concurrent as Strm
import qualified System.IO.Streams.Process as Strm
import qualified System.IO.Streams.Combinators as Strm
import qualified Data.ByteString.Char8 as B
import System.Environment (getEnvironment)
import HSBencher.Types
-- | Launch a benchmark command as a subprocess and monitor it asynchronously.
--
-- The first harvester is applied to every stdout line to look for a
-- self-reported run time.  The second harvester is applied to every stdout
-- /and/ stderr line to look for a productivity figure.  In both cases the
-- first value harvested wins (see 'orMaybe').
--
-- The function returns as soon as the process has been spawned.  The
-- returned 'SubProcess' carries:
--
--   * @wait@: blocks until the monitoring loop finishes and yields its result
--     ('RunCompleted', 'ExitError' or 'RunTimeOut');
--
--   * @process_out@ / @process_err@: streams relaying each stdout / stderr
--     line of the child.  They are terminated (a @Nothing@ is pushed) when
--     the child closes its output or when the timeout fires.
--
-- If @workingDir@ is set, the current directory of /this/ process is
-- temporarily switched while the child is spawned.  The switch is wrapped in
-- 'E.bracket_' so the original directory is restored even if spawning throws.
measureProcess :: LineHarvester -> LineHarvester -> CommandDescr -> IO SubProcess
measureProcess (LineHarvester checkTiming) (LineHarvester checkProd)
               CommandDescr{command, envVars, timeout, workingDir} = do
  origDir <- getCurrentDirectory
  curEnv  <- getEnvironment
  let -- User-supplied variables are placed in front of the inherited ones.
      fullEnv = envVars ++ curEnv
      enterWorkingDir = maybe (return ()) setCurrentDirectory workingDir
      spawn = case command of
        RawCommand exeFile cmdArgs ->
          Strm.runInteractiveProcess exeFile cmdArgs Nothing (Just fullEnv)
        ShellCommand str ->
          runInteractiveCommandWithEnv str fullEnv

  -- Changing directory is a process-global side effect, so make sure it is
  -- undone on every exit path, including an exception from 'spawn'.
  (startTime, (_inp, out, err, pid)) <-
    E.bracket_ enterWorkingDir (setCurrentDirectory origDir) $ do
      t0      <- getCurrentTime
      handles <- spawn
      return (t0, handles)

  -- Tag each line with the stream it came from.
  out' <- Strm.map OutLine =<< Strm.lines out
  err' <- Strm.map ErrLine =<< Strm.lines err
  -- With no timeout configured the timer stream is simply empty.
  timeEvt <- case timeout of
               Nothing -> Strm.nullInput
               Just t  -> Strm.map (\_ -> TimerFire) =<< timeOutStream t

  -- Merge stdout and stderr, and turn their joint end-of-stream into an
  -- explicit 'ProcessClosed' event so that it can be merged with the timer.
  merged0 <- Strm.concurrentMerge [out',err']
  merged1 <- reifyEOS merged0
  merged2 <- Strm.map (maybe ProcessClosed id) merged1
  merged3 <- Strm.concurrentMerge [merged2, timeEvt]

  -- Channels through which the child's output lines are relayed to the caller.
  relay_out <- newChan
  relay_err <- newChan
  process_out <- Strm.chanToInput relay_out
  process_err <- Strm.chanToInput relay_err

  let
      -- 'time' and 'prod' hold the values harvested so far, if any.
      loop time prod = do
        x <- Strm.read merged3
        case x of
          Just ProcessClosed -> do
            writeChan relay_err Nothing
            writeChan relay_out Nothing
            code <- waitForProcess pid
            endtime <- getCurrentTime
            case code of
              ExitSuccess -> do
                tm <- case time of
                        -- No self-reported time: fall back to wall-clock time.
                        Nothing -> let d = diffUTCTime endtime startTime in
                                   return$ fromRational$ toRational d
                        Just t  -> return t
                return (RunCompleted {realtime=tm, productivity=prod})
              ExitFailure c -> return (ExitError c)

          Just TimerFire -> do
            B.hPutStrLn stderr $ " [hsbencher] Benchmark run timed out. Killing process."
            terminateProcess pid
            B.hPutStrLn stderr $ " [hsbencher] Cleaning up io-streams."
            writeChan relay_err Nothing
            writeChan relay_out Nothing
            -- Drain whatever is left; a failure here is reported but must not
            -- prevent the timeout result from being returned.
            E.catch (dumpRest merged3) $ \ (exn::E.SomeException) ->
              B.hPutStrLn stderr $ " [hsbencher] ! Got an error while cleaning up: " `B.append` B.pack(show exn)
            B.hPutStrLn stderr $ " [hsbencher] Done with cleanup."
            return RunTimeOut

          -- stderr lines are only checked for productivity information.
          Just (ErrLine errLine) -> do
            writeChan relay_err (Just errLine)
            loop time (prod `orMaybe` checkProd errLine)

          -- stdout lines are checked for both timing and productivity.
          Just (OutLine outLine) -> do
            writeChan relay_out (Just outLine)
            loop (time `orMaybe` checkTiming outLine)
                 (prod `orMaybe` checkProd outLine)

          -- Either ProcessClosed or TimerFire is expected before the merged
          -- stream itself ends.
          Nothing -> error "benchmark.hs: Internal error! This should not happen."

  fut <- A.async (loop Nothing Nothing)
  return$ SubProcess {wait=A.wait fut, process_out, process_err}
-- | Drain an input stream: keep reading and discarding elements until the
-- stream reports end-of-stream.
dumpRest :: Strm.InputStream a -> IO ()
dumpRest strm = Strm.read strm >>= maybe (return ()) (const (dumpRest strm))
-- | Events observed by the monitoring loop in 'measureProcess'.
data ProcessEvt
  = ErrLine B.ByteString   -- ^ One line read from the child's stderr.
  | OutLine B.ByteString   -- ^ One line read from the child's stdout.
  | ProcessClosed          -- ^ The merged stdout/stderr stream reached its end.
  | TimerFire              -- ^ The timeout stream produced its element.
  deriving (Show, Eq, Read)
-- | A harvester that ignores its input: it yields 'Nothing' for every line.
nullHarvester :: LineHarvester
nullHarvester = LineHarvester (const Nothing)
-- | Harvester for lines tagged @SELFTIMED@; a 'taggedLineHarvester'
-- specialised to that tag.
selftimedHarvester :: LineHarvester
selftimedHarvester = taggedLineHarvester (B.pack "SELFTIMED")
-- | Build a harvester that recognises lines of the form @TAG value@ or
-- @TAG: value@, i.e. exactly two whitespace-separated words, the first being
-- the tag (optionally followed by a colon).
--
-- * A matching line whose second word starts with a parseable number yields
--   @Just@ that number.
--
-- * A matching line whose second word is not a number raises an 'error'
--   naming the tag and quoting the offending line.
--
-- * Any other line yields 'Nothing'.  This includes a tagged line with no
--   value or with more than one trailing word, which is ignored instead of
--   failing on an incomplete pattern match.
taggedLineHarvester :: B.ByteString -> LineHarvester
taggedLineHarvester tag = LineHarvester $ \ ln ->
  case B.words ln of
    hd : tl | hd == tag || hd == (tag `B.append` ":") ->
      case tl of
        [time] ->
          case reads (B.unpack time) of
            (dbl,_):_ -> Just dbl
            _ -> error $ "Error parsing number in " ++ B.unpack tag
                           ++ " line: " ++ B.unpack ln
        -- Tagged, but not exactly one value: treat as "no measurement".
        _ -> Nothing
    _ -> Nothing
-- | Harvester for productivity information.  Two line shapes are recognised:
--
-- * @PRODUCTIVITY value@ or @PRODUCTIVITY: value@ yields the value as-is.
--
-- * A line whose words are @GC time \<gc\> ( \<total\> elapsed)@ yields
--   @100 - gc \/ total * 100@, or @100@ when @total@ is zero.  This looks
--   like the \"GC time\" line of GHC's @+RTS -s@ report (TODO confirm).
--
-- A recognised line whose numeric fields cannot be parsed raises an 'error'.
-- Every other line yields 'Nothing'.
ghcProductivityHarvester :: LineHarvester
ghcProductivityHarvester = LineHarvester $ \ ln ->
  case words (B.unpack ln) of
    [] -> Nothing
    [p, time] | p == "PRODUCTIVITY" || p == "PRODUCTIVITY:" ->
      case reads time of
        (dbl,_):_ -> Just dbl
        _ -> error$ "Error parsing number in PRODUCTIVITY line: "++B.unpack ln
    ["GC","time",gc,"(",total,"elapsed)"] ->
      -- 'reads' accepts a numeric prefix, so trailing unit suffixes are tolerated.
      case (reads gc, reads total) of
        ((gcD,_):_,(totalD,_):_) -> Just $
          if totalD == 0.0
            then 100.0
            -- Productivity is what remains after the GC share is subtracted.
            else 100 - (gcD / totalD * 100)
        _ -> error$ "checkGCTime: Error parsing number in MUT time line: "++B.unpack ln
    _ -> Nothing
-- | Create a stream that yields a single @()@ after the given delay and then
-- ends.  The delay is converted to microseconds for 'threadDelay', so the
-- argument is expressed in seconds.
timeOutStream :: Double -> IO (Strm.InputStream ())
timeOutStream time =
    Strm.makeInputStream fireAfterDelay >>= Strm.take 1
  where
    delayMicros :: Int
    delayMicros = round (time * 1000 * 1000)

    -- Each read sleeps and then produces a unit; 'Strm.take' caps it at one.
    fireAfterDelay = threadDelay delayMicros >> return (Just ())
-- | Left-biased choice: return the first argument when it is a 'Just',
-- otherwise fall back to the second.  The second argument is not inspected
-- when the first one already holds a value.
orMaybe :: Maybe a -> Maybe a -> Maybe a
orMaybe preferred fallback =
  case preferred of
    Just _  -> preferred
    Nothing -> fallback
-- | Make the end of a stream observable as a value.  Elements of the source
-- are passed through wrapped in 'Just'.  The first time the source reports
-- end-of-stream the result yields a single 'Nothing' element, and only after
-- that does the resulting stream itself end.
reifyEOS :: Strm.InputStream a -> IO (Strm.InputStream (Maybe a))
reifyEOS ins = do
  eosPending <- newIORef True
  Strm.makeInputStream $ do
    item      <- Strm.read ins
    unsignled <- readIORef eosPending
    case (item, unsignled) of
      (Just y,  _)     -> return (Just (Just y))
      -- First end-of-stream: emit the marker and remember that we did.
      (Nothing, True)  -> writeIORef eosPending False >> return (Just Nothing)
      -- The marker was already emitted: now really end the stream.
      (Nothing, False) -> return Nothing
-- | Run a shell command with an explicit environment, with pipes for stdin,
-- stdout and stderr, and expose those pipes as io-streams.
--
-- Returns the child's stdin as an output stream, its stdout and stderr as
-- input streams, and the process handle.  Each stream is set up to close its
-- underlying handle at end-of-stream and is wrapped in a locking stream.
runInteractiveCommandWithEnv :: String
                             -> [(String,String)]
                             -> IO (Strm.OutputStream B.ByteString,
                                    Strm.InputStream B.ByteString,
                                    Strm.InputStream B.ByteString,
                                    ProcessHandle)
runInteractiveCommandWithEnv scmd environ = do
  let spec = CreateProcess
        { cmdspec      = ShellCommand scmd
        , env          = Just environ
        , std_in       = CreatePipe
        , std_out      = CreatePipe
        , std_err      = CreatePipe
        , cwd          = Nothing
        , close_fds    = False
        , create_group = False
        }
  (Just hin, Just hout, Just herr, ph) <- createProcess spec
  sIn  <- Strm.lockingOutputStream
            =<< Strm.atEndOfOutput (hClose hin)
            =<< Strm.handleToOutputStream hin
  sOut <- wrapInput hout
  sErr <- wrapInput herr
  return (sIn, sOut, sErr, ph)
  where
    -- Shared wrapping for the two readable pipes.
    wrapInput h =
      Strm.lockingInputStream
        =<< Strm.atEndOfInput (hClose h)
        =<< Strm.handleToInputStream h