module Profiling.Heap.Read
(
readProfile
, LoadProgress
, ProfilingStop
, readProfileAsync
, ProfileReader
, ProfilingType(..)
, ProfilingCommand
, ProfilingInfo
, profile
, profileCallback
) where
import Control.Applicative
import Control.Arrow
import Control.Monad
import Control.Monad.Fix
import Control.Concurrent
import Control.Exception (SomeException, catch)
import Prelude hiding (catch)
import Data.IORef
import System.Directory
import System.FilePath
import System.IO
import System.Process
import qualified Data.ByteString.Char8 as S
import Data.ByteString.Internal as SI
import qualified Data.Attoparsec.Char8 as A
import Data.List
import Data.Maybe
import qualified Data.IntMap as IM
import Data.Map (Map)
import qualified Data.Map as T
import Profiling.Heap.Types
import Network
import Profiling.Heap.Network
import Data.Time.LocalTime (getZonedTime)
import Data.Time.Format (formatTime)
import System.Locale (defaultTimeLocale)
type Trie v = Map ByteString v
readProfile :: FilePath -> IO (Maybe Profile)
readProfile file = flip catch (const (return Nothing) :: SomeException -> IO (Maybe a)) $ do
hdl <- openFile file ReadMode
let parse !stime !prof = do
stop <- hIsEOF hdl
if not stop then do
(!stime', !prof') <- accumProfile stime prof <$> S.hGetLine hdl
parse stime' prof'
else return prof
prof <- parse Nothing emptyProfile
return $ if null (prJob prof) then Nothing else Just prof
type LoadProgress = IO (Either Double Profile)
type ProfilingStop = IO ()
readProfileAsync :: FilePath -> IO (LoadProgress,ProfilingStop)
readProfileAsync file = do
progress <- newIORef (Left 0)
hdl <- openFile file ReadMode
totalSize <- fromIntegral <$> hFileSize hdl
let parse stime prof size = do
stop <- hIsEOF hdl
if not stop then do
line <- S.hGetLine hdl
let (stime',prof') = accumProfile stime prof line
size' = size + S.length line + 1
writeIORef progress . Left $! size'
prof' `seq` parse stime' prof' size'
else writeIORef progress (Right prof)
tid <- forkIO $ parse Nothing emptyProfile 0
return ( left (\s -> fromIntegral s/totalSize) <$> readIORef progress
, killThread tid >> writeIORef progress (Right emptyProfile)
)
type ProfileReader = IO Profile
data ProfilingType loc rem = Local { local :: loc }
| Remote { remote :: rem }
type ProfilingCommand = ProfilingType CreateProcess String
type ProfilingInfo = ProfilingType ProcessHandle Handle
profile :: ProfilingCommand -> IO (Maybe (ProfileReader,ProfilingStop,ProfilingInfo))
profile prog = do
let getCmd p = case cmdspec p of
ShellCommand cmd -> cmd
RawCommand prg args -> intercalate " " (prg:args)
zt <- getZonedTime
ref <- newIORef emptyProfile
{ prJob = case prog of
Local desc -> getCmd desc
Remote addr -> addr
, prDate = formatTime defaultTimeLocale "%F %H:%M:%S %Z" zt
}
(fmap.fmap) (\(stop,info) -> (readIORef ref,stop,info)) $ profileCallback prog $ \pkg -> do
prof <- readIORef ref
case pkg of
SinkSample t smp -> writeIORef ref $ prof
{ prSamples = (t,smp) : prSamples prof }
SinkId ccid ccname -> writeIORef ref $ prof
{ prNames = IM.insert ccid ccname (prNames prof) }
_ -> return ()
profileCallback :: ProfilingCommand -> ProfileSink -> IO (Maybe (ProfilingStop,ProfilingInfo))
profileCallback (Local prog) sink = do
dir <- getCurrentDirectory
let
execPath (ShellCommand cmd) = takeWhile (/=' ') cmd
execPath (RawCommand path _) = path
hpPath <- canonicalizePath $ fromMaybe dir (cwd prog) ++
'/' : (takeFileName . execPath . cmdspec) prog ++ ".hp"
catch (removeFile hpPath) (const (return ()) :: SomeException -> IO ())
(_,_,_,phdl) <- createProcess prog
maybeHpFile <- tryRepeatedly (openFile hpPath ReadMode) 50 10000
case maybeHpFile of
Nothing -> return Nothing
Just hpFile -> do
tid <- forkIO $ do
let pass buf idmap smp = do
case S.elemIndex '\n' buf of
Just len -> do
let (line,rest) = S.splitAt len buf
next = pass (S.drop 1 rest)
case parseHpLine line of
BeginSample _ -> next idmap []
EndSample t -> do
when (not (null smp)) $ sink (SinkSample t smp)
next idmap []
Cost ccname cost -> do
let (newid,ccid,idmap') = addCCId idmap ccname
when newid $ sink (SinkId ccid ccname)
next idmap' ((ccid,cost):smp)
_ -> next idmap smp
Nothing -> do
slaveCode <- getProcessExitCode phdl
if slaveCode == Nothing then do
eof <- hIsEOF hpFile
if eof then do
threadDelay 100000
pass buf idmap smp
else do
newChars <- S.hGetNonBlocking hpFile 0x10000
pass (S.append buf newChars) idmap smp
else sink SinkStop
pass S.empty T.empty []
return (Just (profileStop tid sink,Local phdl))
profileCallback (Remote server) sink = do
let (addr,_:port) = span (/=':') server
portNum :: Int
portNum = read port
hdl <- connectTo addr ((PortNumber . fromIntegral) portNum)
hSetBuffering hdl LineBuffering
tid <- forkIO . fix $ \readLoop -> do
msg <- catch (readMsg <$> hGetLine hdl)
((const . return . Just . Stream $ SinkStop) :: SomeException -> IO (Maybe Message))
case msg >>= getStream of
Just profSmp -> do
sink profSmp
when (profSmp /= SinkStop) readLoop
Nothing -> readLoop
return (Just (profileStop tid sink,Remote hdl))
profileStop :: ThreadId -> ProfileSink -> IO ()
profileStop tid sink = do
killThread tid
_ <- forkIO (sink SinkStop)
return ()
tryRepeatedly :: IO a -> Int -> Int -> IO (Maybe a)
tryRepeatedly act n d | n < 1 = return Nothing
| otherwise = catch (Just <$> act) retry
where retry e = do let _ = e :: SomeException
threadDelay d
tryRepeatedly act (n1) d
data ParseResult = Unknown
| Job !String
| Date !String
| BeginSample !Time
| EndSample !Time
| Cost !CostCentreName !Cost
parseHpLine :: ByteString -> ParseResult
parseHpLine !line
| not (S.null cost) = Cost ccname $ case S.readInt (S.tail cost) of
Just (n, _) -> fromIntegral n
Nothing -> error "parseHpLine.readInt"
| S.null sparam = Unknown
| cmd == sBEGIN_SAMPLE = BeginSample $ case A.parseOnly A.double . S.tail $ sparam of
Right d -> d
_ -> error "parseHpLine: parse BeginSample double failed"
| cmd == sEND_SAMPLE = EndSample $ case A.parseOnly A.double . S.tail $ sparam of
Right d -> d
_ -> error "parseHpLine: parse BeginSample double failed"
| cmd == sJOB = Job (read param)
| cmd == sDATE = Date (read param)
| otherwise = Unknown
where (ccname,cost) = S.span (/='\t') line
(cmd,sparam) = S.span (/=' ') line
param = S.unpack (S.tail sparam)
sJOB, sDATE, sBEGIN_SAMPLE, sEND_SAMPLE :: ByteString
sJOB = S.pack "JOB"
sDATE = S.pack "DATE"
sBEGIN_SAMPLE = S.pack "BEGIN_SAMPLE"
sEND_SAMPLE = S.pack "END_SAMPLE"
accumProfile :: Maybe Time -> Profile -> ByteString -> (Maybe Time,Profile)
accumProfile !time !prof !line = case parseHpLine line of
Job s -> (Nothing,prof { prJob = s })
Date s -> (Nothing,prof { prDate = s })
BeginSample t -> (Just t,prof)
EndSample _ -> (Nothing,prof)
Cost ccname cost -> let (!newid,!ccid,!pnsi') = addCCId (prNamesInv prof) ccname
t = fromJust time
smps = prSamples prof
smps' = case smps of
[] -> [(t,[(ccid,cost)])]
smps0@((t',ccs):sss)
| t == t' -> (t', (ccid,cost):ccs):sss
| otherwise -> (t, [(ccid,cost)]):smps0
in (time,
prof
{ prSamples = smps'
, prNames = if newid then IM.insert ccid ccname (prNames prof) else prNames prof
, prNamesInv = pnsi'
})
Unknown -> (Nothing,prof)
addCCId :: Trie CostCentreId -> CostCentreName -> (Bool, CostCentreId, Trie CostCentreId)
addCCId !idmap !ccname = if ccid /= T.size idmap then (False,ccid,idmap)
else (True,ccid,T.insert ccname ccid idmap)
where !ccid = fromMaybe (T.size idmap) (T.lookup ccname idmap)
_test1 :: IO (Maybe (ProfilingStop,ProfilingInfo))
_test1 = do
dir <- getCurrentDirectory
profileCallback (Local (shell (dir++"/test/tester")) { cwd = Just (dir++"/test") }) print
_test2 :: IO ()
_test2 = do
dir <- getCurrentDirectory
Just (reader,_,_) <- profile (Local (shell (dir++"/test/tester")) { cwd = Just (dir++"/test") })
replicateM_ 5 $ do
prof <- reader
print prof
threadDelay 1000000
_test3 :: IO Profile
_test3 = do
dir <- getCurrentDirectory
fromJust <$> (readProfile $ dir ++ "/test/example.hp")