module Profiling.Heap.Read
(
readProfile
, LoadProgress
, ProfilingStop
, readProfileAsync
, ProfileReader
, ProfilingType(..)
, ProfilingCommand
, ProfilingInfo
, profile
, profileCallback
) where
import Control.Applicative
import Control.Arrow
import Control.Monad
import Control.Monad.Fix
import Control.Concurrent
import Data.IORef
import System.Directory
import System.FilePath
import System.IO
import System.Process
import qualified Data.ByteString.Char8 as S
import Data.ByteString.Internal as SI
import Data.List
import Data.Maybe
import qualified Data.IntMap as IM
import Data.Trie (Trie)
import qualified Data.Trie as T
import Profiling.Heap.Types
import Network
import Profiling.Heap.Network
import Data.Time.LocalTime (getZonedTime)
import Data.Time.Format (formatTime)
import System.Locale (defaultTimeLocale)
-- | Load a heap profile from a @.hp@ file in one go.
--
-- The file is read line by line and folded into a 'Profile' with
-- 'accumProfile'.  Returns 'Nothing' when an I/O error occurs while opening
-- or reading the file, or when the resulting profile has an empty job name
-- (i.e. no @JOB@ line was seen).
--
-- The handle is managed by 'withFile', so it is closed both on success and
-- when reading fails part-way through.
readProfile :: FilePath -> IO (Maybe Profile)
readProfile file = flip catch ioFailure $ do
  prof <- withFile file ReadMode $ \hdl ->
    let parse stime acc = do
          stop <- hIsEOF hdl
          if stop
            then return acc
            else do
              (stime', acc') <- accumProfile stime acc <$> S.hGetLine hdl
              -- Force the accumulator each step so that no chain of
              -- unevaluated updates builds up over a large file.
              parse stime' $! acc'
    in parse Nothing emptyProfile
  return $ if null (prJob prof) then Nothing else Just prof
  where
    -- Only I/O failures are turned into 'Nothing'.
    ioFailure :: IOError -> IO (Maybe Profile)
    ioFailure _ = return Nothing
-- | Polling action for an asynchronous load.  As produced by
-- 'readProfileAsync' it yields @Left fraction@ (bytes consumed divided by the
-- file size) while loading is in progress and @Right profile@ afterwards.
type LoadProgress = IO (Either Double Profile)
-- | Action that cancels a running load or profiling session by killing the
-- worker thread that drives it.
type ProfilingStop = IO ()
-- | Load a heap profile from a @.hp@ file in a background thread.
--
-- Returns a pair of actions:
--
-- * a 'LoadProgress' poller that yields @Left fraction@ while the file is
--   being parsed and @Right profile@ once the end of the file is reached;
--
-- * a 'ProfilingStop' action that kills the loader thread and sets the
--   result to @Right emptyProfile@.
--
-- The file handle is closed when the end of the file is reached and also when
-- the stop action is run.  Opening the file happens in the calling thread, so
-- failure to open it is raised to the caller.
readProfileAsync :: FilePath -> IO (LoadProgress,ProfilingStop)
readProfileAsync file = do
  progress <- newIORef (Left 0)
  hdl <- openFile file ReadMode
  totalSize <- fromIntegral <$> hFileSize hdl
  let parse stime prof size = do
        stop <- hIsEOF hdl
        if not stop
          then do
            line <- S.hGetLine hdl
            let (stime', prof') = accumProfile stime prof line
                -- +1 accounts for the newline stripped by hGetLine.
                size' = size + S.length line + 1
            writeIORef progress . Left $! size'
            prof' `seq` parse stime' prof' size'
          else do
            hClose hdl
            writeIORef progress (Right prof)
      -- Guard against an empty file, where the ratio would be 0/0 (NaN).
      fraction s
        | totalSize > 0 = fromIntegral s / totalSize
        | otherwise = 0
  tid <- forkIO $ parse Nothing emptyProfile 0
  return ( left fraction <$> readIORef progress
           -- killThread returns only after the loader has received the
           -- exception, so closing the handle afterwards cannot race with a
           -- read; closing an already closed handle is a no-op.
         , killThread tid >> hClose hdl >> writeIORef progress (Right emptyProfile)
         )
-- | Action returning the profile accumulated so far; 'profile' implements it
-- by reading the reference that its sink callback keeps updating.
type ProfileReader = IO Profile
-- | Tags a value as describing either a local or a remote profiling target.
-- The two type parameters give the payload carried in each case (see
-- 'ProfilingCommand' and 'ProfilingInfo' for the two instantiations used in
-- this module).
data ProfilingType loc rem = Local { local :: loc }
| Remote { remote :: rem }
-- | What to profile: a local process description, or a remote server given
-- as a string that 'profileCallback' splits at the first @':'@ into host and
-- numeric port.
type ProfilingCommand = ProfilingType CreateProcess String
-- | Handle on a running session as returned by 'profileCallback': the process
-- handle of the spawned program for local profiling, or the connection handle
-- for remote profiling.
type ProfilingInfo = ProfilingType ProcessHandle Handle
-- | Start profiling and keep the collected data in memory.
--
-- A fresh profile is created whose job name is the command line (local case)
-- or the server address (remote case) and whose date is the current zoned
-- time.  Every sample and every cost centre name delivered by
-- 'profileCallback' is folded into that profile.  On success the result
-- holds a reader for the current profile, the stop action and the session
-- info; it is 'Nothing' whenever 'profileCallback' returns 'Nothing'.
profile :: ProfilingCommand -> IO (Maybe (ProfileReader,ProfilingStop,ProfilingInfo))
profile prog = do
  now <- getZonedTime
  profRef <- newIORef (emptyProfile
    { prJob = jobName
    , prDate = formatTime defaultTimeLocale "%F %H:%M:%S %Z" now
    })
  let -- Sink callback: fold one incoming item into the stored profile.
      record pkg = do
        cur <- readIORef profRef
        case pkg of
          SinkSample t smp ->
            writeIORef profRef (cur { prSamples = (t,smp) : prSamples cur })
          SinkId ccid ccname ->
            writeIORef profRef (cur { prNames = IM.insert ccid ccname (prNames cur) })
          _ -> return ()
      -- Pair the session controls with a reader for the stored profile.
      attach (stop,info) = (readIORef profRef,stop,info)
  fmap (fmap attach) (profileCallback prog record)
  where
    jobName = case prog of
      Local desc -> commandLine (cmdspec desc)
      Remote addr -> addr
    -- Raw commands are rendered as the program and its arguments separated
    -- by single spaces.
    commandLine (ShellCommand cmd) = cmd
    commandLine (RawCommand prg args) = unwords (prg : args)
-- | Start profiling and push every piece of profile data to the given sink.
--
-- Local case: spawn the process and tail the @.hp@ file it writes.
-- Remote case: connect to a server and forward the messages it streams.
--
-- On success returns the stop action (see 'profileStop') together with the
-- process handle or connection handle.  In the local case 'Nothing' is
-- returned when the @.hp@ file could not be opened.
profileCallback :: ProfilingCommand -> ProfileSink -> IO (Maybe (ProfilingStop,ProfilingInfo))
profileCallback (Local prog) sink = do
dir <- getCurrentDirectory
let
-- For a shell command the executable is taken to be everything up to the
-- first space.
execPath (ShellCommand cmd) = takeWhile (/=' ') cmd
execPath (RawCommand path _) = path
-- Expected profile location: <cwd of process or current dir>/<executable file name>.hp
hpPath <- canonicalizePath $ fromMaybe dir (cwd prog) ++
'/' : (takeFileName . execPath . cmdspec) prog ++ ".hp"
-- Remove the output of a previous run; failure (e.g. no such file) is ignored.
catch (removeFile hpPath) (const (return ()))
(_,_,_,phdl) <- createProcess prog
-- Give the new process some time to create its profile: up to 50 attempts
-- with a delay of 10000 (threadDelay units, i.e. microseconds) after each
-- failed one.
maybeHpFile <- tryRepeatedly (openFile hpPath ReadMode) 50 10000
case maybeHpFile of
Nothing -> return Nothing
Just hpFile -> do
-- NOTE(review): hpFile is not closed anywhere in this block, neither when the
-- reader finishes nor when it is killed - confirm whether that is intended.
tid <- forkIO $ do
-- pass buf idmap smp: buf is the unprocessed input, idmap maps cost centre
-- names to ids, smp collects the costs of the sample being read.
let pass buf idmap smp = do
case S.elemIndex '\n' buf of
-- A complete line is available: consume it and continue with the rest.
Just len -> do
let (line,rest) = S.splitAt len buf
next = pass (S.drop 1 rest)
case parseHpLine line of
BeginSample _ -> next idmap []
EndSample t -> do
-- Samples without any cost entry are not forwarded.
when (not (null smp)) $ sink (SinkSample t smp)
next idmap []
Cost ccname cost -> do
let (newid,ccid,idmap') = addCCId idmap ccname
-- Announce a cost centre the first time its name is seen.
when newid $ sink (SinkId ccid ccname)
next idmap' ((ccid,cost):smp)
_ -> next idmap smp
-- No complete line buffered: fetch more input or finish.
Nothing -> do
slaveCode <- getProcessExitCode phdl
if slaveCode == Nothing then do
-- Process still running.
eof <- hIsEOF hpFile
if eof then do
-- Nothing new yet; wait before polling again.
threadDelay 100000
pass buf idmap smp
else do
newChars <- S.hGetNonBlocking hpFile 0x10000
pass (S.append buf newChars) idmap smp
-- NOTE(review): once the process has exited the loop stops right away, so
-- data still unread in the file at that point appears to be dropped - verify.
else sink SinkStop
pass S.empty T.empty []
return (Just (profileStop tid sink,Local phdl))
profileCallback (Remote server) sink = do
-- The server is given as "host:port".
-- NOTE(review): the pattern fails when there is no ':' in the string and
-- 'read' fails on a non-numeric port; neither is handled here.
let (addr,_:port) = span (/=':') server
portNum :: Int
portNum = read port
hdl <- connectTo addr ((PortNumber . fromIntegral) portNum)
hSetBuffering hdl LineBuffering
tid <- forkIO . fix $ \readLoop -> do
-- A failure while reading from the connection is treated as a stop message.
msg <- catch (readMsg <$> hGetLine hdl) (const . return . Just . Stream $ SinkStop)
case msg >>= getStream of
Just profSmp -> do
sink profSmp
-- Keep reading until the stop message has been forwarded.
when (profSmp /= SinkStop) readLoop
-- Lines that yield no stream item are skipped.
Nothing -> readLoop
return (Just (profileStop tid sink,Remote hdl))
-- | Stop action shared by local and remote profiling: kill the reader thread,
-- then deliver 'SinkStop' to the sink from a freshly forked thread so that
-- the caller does not wait for the sink to finish.
profileStop :: ThreadId -> ProfileSink -> IO ()
profileStop tid sink = killThread tid >> forkIO (sink SinkStop) >> return ()
-- | Run an action, retrying when it fails with an I/O error.
--
-- @tryRepeatedly act n d@ makes at most @n@ attempts at @act@.  After every
-- failed attempt (including the last one) it waits @d@ microseconds.  The
-- result is @Just@ the value of the first successful attempt, or 'Nothing'
-- when all attempts failed or @n < 1@.
tryRepeatedly :: IO a -> Int -> Int -> IO (Maybe a)
tryRepeatedly act n d
  | n < 1 = return Nothing
  | otherwise = catch (Just <$> act) onIOError
  where
    -- Only I/O errors trigger a retry; the annotation pins the exception type.
    onIOError e = const retry (e :: IOError)
    retry = do
      threadDelay d
      tryRepeatedly act (n - 1) d
-- | Outcome of parsing a single line of a @.hp@ file (see 'parseHpLine'):
-- one constructor for each of the @JOB@, @DATE@, @BEGIN_SAMPLE@ and
-- @END_SAMPLE@ lines, 'Cost' for a tab-separated cost centre entry, and
-- 'Unknown' for any other line.
data ParseResult = Unknown
| Job String
| Date String
| BeginSample Time
| EndSample Time
| Cost CostCentreName Cost
-- | Classify one line of a @.hp@ file.
--
-- * A line containing a tab is a cost entry: the text before the first tab
--   is the cost centre name, the text after it is the cost.
--
-- * Otherwise the line is split at the first space into a keyword and a
--   parameter; @JOB@, @DATE@, @BEGIN_SAMPLE@ and @END_SAMPLE@ are recognised.
--
-- Every other line yields 'Unknown'.  So does a recognised line whose
-- parameter or cost cannot be parsed; such a line does not raise an error.
parseHpLine :: ByteString -> ParseResult
parseHpLine line
  | S.null cost = fromMaybe Unknown (join (lookup cmd results))
  | otherwise = maybe Unknown (Cost ccname) (safeRead (S.unpack (S.tail cost)))
  where
    (ccname, cost) = S.span (/= '\t') line
    (cmd, sparam) = S.span (/= ' ') line
    -- Only evaluated when sparam is non-empty (see results), so S.tail is safe.
    param = S.unpack (S.tail sparam)
    results
      | S.null sparam = []
      | otherwise =
          [ (S.pack "JOB", Job <$> safeRead param)
          , (S.pack "DATE", Date <$> safeRead param)
          , (S.pack "BEGIN_SAMPLE", BeginSample <$> safeRead param)
          , (S.pack "END_SAMPLE", EndSample <$> safeRead param)
          ]
    -- Total counterpart of 'read': accepts exactly the inputs 'read' accepts
    -- (a single complete parse, trailing white space allowed) and returns
    -- Nothing where 'read' would call 'error'.
    safeRead :: Read a => String -> Maybe a
    safeRead s = case [v | (v, rest) <- reads s, ("", "") <- lex rest] of
      [v] -> Just v
      _ -> Nothing
-- | Fold one line of a @.hp@ file into a profile.
--
-- The first argument is the time stamp of the sample currently being read
-- ('Nothing' outside of a sample); the updated value is returned alongside
-- the updated profile.
--
-- * @JOB@ and @DATE@ lines set the corresponding profile fields.
--
-- * @BEGIN_SAMPLE@ opens a sample; @END_SAMPLE@ and unknown lines close it.
--
-- * A cost line inside a sample is prepended to the entry for the current
--   time stamp at the head of 'prSamples' (a new entry is started if the
--   head has a different time stamp), and the cost centre name is registered
--   if it has not been seen before.
--
-- * A cost line outside of any sample has no time stamp to be filed under
--   and is ignored, rather than storing an undefined time in the profile.
accumProfile :: Maybe Time -> Profile -> ByteString -> (Maybe Time,Profile)
accumProfile time prof line = case parseHpLine line of
  Job s -> (Nothing, prof { prJob = s })
  Date s -> (Nothing, prof { prDate = s })
  BeginSample t -> (Just t, prof)
  EndSample _ -> (Nothing, prof)
  Cost ccname cost -> case time of
    Nothing -> (Nothing, prof)
    Just t ->
      let (newid, ccid, pnsi') = addCCId (prNamesInv prof) ccname
          entry = (ccid, cost)
          smps' = case prSamples prof of
            (t', costs) : older | t == t' -> (t', entry : costs) : older
            smps -> (t, [entry]) : smps
      in ( time
         , prof
             { prSamples = smps'
             , prNames = if newid then IM.insert ccid ccname (prNames prof) else prNames prof
             , prNamesInv = pnsi'
             }
         )
  Unknown -> (Nothing, prof)
-- | Look up the id of a cost centre name, assigning a fresh one if needed.
--
-- Returns a flag telling whether the name is new, the id of the name, and
-- the (possibly extended) name-to-id map.  A new name receives the current
-- number of entries as its id, so ids are handed out as 0, 1, 2, ...
--
-- 'T.size' walks the whole trie, so it is evaluated only when a name
-- actually has to be inserted, not on every lookup.  This relies on the map
-- being built exclusively through this function, so that no stored id can
-- equal the current size.
addCCId :: Trie CostCentreId -> CostCentreName -> (Bool, CostCentreId, Trie CostCentreId)
addCCId idmap ccname = case T.lookup ccname idmap of
  Just ccid -> (False, ccid, idmap)
  Nothing -> (True, freshId, T.insert ccname freshId idmap)
  where
    freshId = T.size idmap
-- | Manual check: profile the program @test/tester@ (relative to the current
-- directory, run inside @test@) and print everything sent to the sink.
_test1 :: IO (Maybe (ProfilingStop,ProfilingInfo))
_test1 = getCurrentDirectory >>= \base ->
  profileCallback (Local (shell (base ++ "/test/tester")) { cwd = Just (base ++ "/test") }) print
-- | Manual check: profile @test/tester@ through 'profile' and print the
-- accumulated profile five times, pausing 1000000 microseconds after each
-- print.  Fails with a pattern match error if profiling cannot be started.
_test2 :: IO ()
_test2 = do
  base <- getCurrentDirectory
  let target = Local (shell (base ++ "/test/tester")) { cwd = Just (base ++ "/test") }
  Just (reader,_,_) <- profile target
  replicateM_ 5 (reader >>= print >> threadDelay 1000000)
-- | Manual check: load @test/example.hp@ (relative to the current directory)
-- with 'readProfile'.  Fails if the profile cannot be loaded.
_test3 :: IO Profile
_test3 = do
  base <- getCurrentDirectory
  loaded <- readProfile (base ++ "/test/example.hp")
  return (fromJust loaded)