-- Runtime monitoring hooks. When the package is compiled with
-- ACCELERATE_MONITORING defined, the exported functions record processor load
-- and memory statistics and publish them through an EKG server. Without that
-- flag they are no-ops, except 'initAccMetrics', which raises an error
-- explaining that monitoring is disabled.
module Data.Array.Accelerate.Debug.Monitoring (
beginMonitoring,
initAccMetrics,
Processor(..),
withProcessor, addProcessorTime,
didAllocateBytes,
didEvictLRU,
didMajorGC,
) where
#ifdef ACCELERATE_MONITORING
import Data.Atomic ( Atomic )
import qualified Data.Atomic as Atomic
import System.Metrics
import System.Metrics.Counter ( Counter )
import System.Metrics.Gauge ( Gauge )
import qualified System.Metrics.Counter as Counter
import qualified System.Metrics.Gauge as Gauge
import Control.Concurrent
import Control.Concurrent.Async
import Control.Monad
import Data.IORef
import Data.Text ( Text )
import Data.Time.Clock
import System.IO.Unsafe
import System.Remote.Monitoring
import Text.Printf
import qualified Data.HashMap.Strict as Map
#endif
import Data.Int
import Prelude
-- | Start the EKG monitoring server on @localhost:8000@ and print whether the
-- start-up succeeded.
--
-- When monitoring support is compiled out this does nothing.
beginMonitoring :: IO ()
#ifdef ACCELERATE_MONITORING
beginMonitoring = do
  store <- initAccMetrics
  registerGcMetrics store
  -- Fork the server from a scoped async and pause briefly afterwards; any
  -- exception raised while doing so is captured by 'waitCatch' instead of
  -- propagating to the caller.
  let launch = forkServerWith store "localhost" 8000 >> threadDelay 10000
  outcome <- withAsync launch waitCatch
  printf $ case outcome of
    Left  _ -> "Failed to start EKG monitor\n"
    Right _ -> "EKG monitor started at: http://localhost:8000\n"
#else
beginMonitoring = return ()
#endif
#ifndef ACCELERATE_MONITORING
-- | Stand-in used when monitoring support is compiled out: running it raises
-- an error telling the user how to enable monitoring.
initAccMetrics :: IO a
initAccMetrics = error message
  where
    message = "Data.Array.Accelerate: Monitoring is disabled. Reinstall package 'accelerate' with '-fekg' to enable it."
#else
-- | Create a fresh metric store and register every Accelerate metric in it:
-- one load rate per processor, plus the memory counters and gauges.
--
-- Returns the populated store.
initAccMetrics :: IO Store
initAccMetrics = do
  store <- newStore
  -- Each entry is a registration still waiting for its store; they are run
  -- in list order.
  mapM_ ($ store)
    [ registerRate    "acc.load.llvm_native"            (calculateProcessorLoad _active_ns_llvm_native)
    , registerRate    "acc.load.llvm_ptx"               (calculateProcessorLoad _active_ns_llvm_ptx)
    , registerRate    "acc.load.cuda"                   (calculateProcessorLoad _active_ns_cuda)
    , registerCounter "acc.gc.bytes_allocated"          (Counter.read _bytesAllocated)
    , registerCounter "acc.gc.bytes_copied_to_remote"   (Counter.read _bytesCopiedToRemote)
    , registerCounter "acc.gc.bytes_copied_from_remote" (Counter.read _bytesCopiedFromRemote)
    , registerGauge   "acc.gc.current_bytes_active"     (Gauge.read _bytesActive)
    , registerGauge   "acc.gc.current_bytes_nursery"    (Gauge.read _bytesNursery)
    , registerCounter "acc.gc.num_gcs"                  (Counter.read _numMajorGC)
    , registerCounter "acc.gc.num_lru_evict"            (Counter.read _numEvictions)
    ]
  return store
-- | Register a gauge called @name@ whose value is produced by @sample@.
--
-- The sampler is handed a private 'IORef' holding its averaging state, which
-- starts at the current time with zero instantaneous and average load.
registerRate :: Text -> (IORef EMAState -> IO Int64) -> Store -> IO ()
registerRate name sample store = do
  ref <- newIORef . freshState =<< getCurrentTime
  registerGroup (Map.singleton name Gauge) (sample ref) store
  where
    freshState t = ES t 0 0
#endif
-- | The execution targets whose active time is tracked separately.
data Processor
  = Native  -- ^ time is accounted under the @acc.load.llvm_native@ metric
  | PTX     -- ^ time is accounted under the @acc.load.llvm_ptx@ metric
  | CUDA    -- ^ time is accounted under the @acc.load.cuda@ metric
-- | Run an action and charge its wall-clock duration to the active-time
-- counter of the given processor.
--
-- When monitoring support is compiled out the action is simply run.
withProcessor :: Processor -> IO a -> IO a
#ifndef ACCELERATE_MONITORING
withProcessor _ action = action
#else
withProcessor target =
  case target of
    Native -> withProcessor' _active_ns_llvm_native
    PTX    -> withProcessor' _active_ns_llvm_ptx
    CUDA   -> withProcessor' _active_ns_cuda

-- | Time @action@ with the wall clock and add the elapsed seconds to
-- @counter@. The result is forced to weak head normal form before the second
-- timestamp is taken. If the action throws, nothing is recorded.
withProcessor' :: Atomic -> IO a -> IO a
withProcessor' counter action = do
  started  <- getCurrentTime
  !result  <- action
  finished <- getCurrentTime
  addProcessorTime' counter (realToFrac (finished `diffUTCTime` started))
  return result
#endif
-- | Credit the given processor with a number of seconds of active time.
--
-- When monitoring support is compiled out this does nothing.
addProcessorTime :: Processor -> Double -> IO ()
#ifndef ACCELERATE_MONITORING
addProcessorTime _ _ = pure ()
#else
addProcessorTime target =
  case target of
    Native -> addProcessorTime' _active_ns_llvm_native
    PTX    -> addProcessorTime' _active_ns_llvm_ptx
    CUDA   -> addProcessorTime' _active_ns_cuda

-- | Convert @seconds@ to whole nanoseconds (rounded) and add them to
-- @counter@, discarding whatever the atomic add returns.
addProcessorTime' :: Atomic -> Double -> IO ()
addProcessorTime' counter seconds = void (Atomic.add counter nanos)
  where
    nanos = round (seconds * 1.0E9)
#endif
-- | Record an allocation of @n@ bytes: adds @n@ to both the total-allocated
-- counter and the currently-active gauge.
didAllocateBytes :: Int64 -> IO ()

-- | Increment the LRU-eviction counter.
didEvictLRU :: IO ()

-- | Increment the major-GC counter and reset the nursery gauge to zero.
didMajorGC :: IO ()

-- With monitoring compiled out, all three hooks are no-ops.
#ifndef ACCELERATE_MONITORING
didAllocateBytes _ = pure ()
didEvictLRU        = pure ()
didMajorGC         = pure ()
#else
didAllocateBytes n = Counter.add _bytesAllocated n >> Gauge.add _bytesActive n
didEvictLRU        = Counter.inc _numEvictions
didMajorGC         = Counter.inc _numMajorGC >> Gauge.set _bytesNursery 0
#endif
#ifdef ACCELERATE_MONITORING
-- | State carried between two consecutive load samples of one processor.
data EMAState = ES
  { old_time :: !UTCTime  -- ^ wall-clock time of the previous sample
  , old_inst :: !Double   -- ^ instantaneous load computed at the previous sample
  , old_avg  :: !Double   -- ^ moving-average load computed at the previous sample
  }
-- | Sample one processor's active-time counter and return its load as a
-- time-weighted moving average, in percent (rounded).
--
-- @var@ accumulates active nanoseconds; @ref@ holds the time, instantaneous
-- load and average load from the previous sample and is overwritten with the
-- new values.
--
-- If no wall-clock time has passed since the previous sample, or the clock
-- has stepped backwards, no load can be computed: dividing by the interval
-- would yield NaN, Infinity or a negative load, and that value would be
-- stored and contaminate every later average. In that case the previous
-- average is returned, the baseline time is moved to the current time, and
-- the counter is left untouched so its contents go into the next sample.
--
-- NOTE(review): the state is read and written in two separate steps, so
-- concurrent samplers of the same @ref@ could interleave -- confirm that a
-- single sampler is assumed.
calculateProcessorLoad :: Atomic -> IORef EMAState -> IO Int64
calculateProcessorLoad !var !ref = do
  ES{..} <- readIORef ref
  time   <- getCurrentTime
  let elapsed_s = realToFrac (diffUTCTime time old_time) :: Double
  if elapsed_s <= 0
    then do
      writeIORef ref (ES time old_inst old_avg)
      return (round old_avg)
    else do
      -- AND-ing with zero clears the counter; presumably the call returns
      -- the value held before the update (TODO confirm against Data.Atomic).
      sample <- Atomic.and var 0
      let
          active_ns  = fromIntegral sample
          elapsed_ns = 1.0E9 * elapsed_s
          load_inst  = 100 * (active_ns / elapsed_ns)
          load_avg   = ema 0.2 elapsed_s old_avg old_inst load_inst
      writeIORef ref (ES time load_inst load_avg)
      return (round load_avg)
-- | Exponential moving average for irregularly spaced samples, with linear
-- interpolation between the previous and the new sample.
--
-- Arguments, in order:
--
--   * @alpha@: time constant of the average, in the same unit as @dt@
--   * @dt@: time elapsed since the previous sample
--   * @old_ema@: average computed at the previous sample
--   * @old_sample@: raw value of the previous sample
--   * @new_sample@: raw value of the new sample
--
-- The three weights @u@, @v - u@ and @1 - v@ always sum to one, so the result
-- is a weighted mean of the three inputs.
--
-- For @dt <= 0@ the previous average is returned unchanged. This is the limit
-- of the formula as @dt@ approaches zero, and it avoids the @0 / 0@ that
-- evaluating @v@ would otherwise produce.
ema :: Double -> Double -> Double -> Double -> Double -> Double
ema !alpha !dt !old_ema !old_sample !new_sample
  | dt <= 0   = old_ema
  | otherwise = (u * old_ema) + ((v - u) * old_sample) + ((1 - v) * new_sample)
  where
    a = dt / alpha
    u = exp (-a)       -- weight retained by the previous average
    v = (1 - u) / a
-- Process-wide counters of active nanoseconds, one per processor.
--
-- Each is created with 'unsafePerformIO', so it must be marked NOINLINE:
-- without the pragma GHC is free to inline the definition at its use sites,
-- which would create several independent counters instead of one shared one.

{-# NOINLINE _active_ns_llvm_native #-}
_active_ns_llvm_native :: Atomic
_active_ns_llvm_native = unsafePerformIO (Atomic.new 0)

{-# NOINLINE _active_ns_llvm_ptx #-}
_active_ns_llvm_ptx :: Atomic
_active_ns_llvm_ptx = unsafePerformIO (Atomic.new 0)

{-# NOINLINE _active_ns_cuda #-}
_active_ns_cuda :: Atomic
_active_ns_cuda = unsafePerformIO (Atomic.new 0)
-- Process-wide memory counters and gauges published through the metric store.
--
-- Each is created with 'unsafePerformIO', so it must be marked NOINLINE:
-- without the pragma GHC is free to inline the definition at its use sites,
-- which would create several independent metrics instead of one shared one.

{-# NOINLINE _bytesAllocated #-}
_bytesAllocated :: Counter
_bytesAllocated = unsafePerformIO Counter.new

{-# NOINLINE _bytesCopiedToRemote #-}
_bytesCopiedToRemote :: Counter
_bytesCopiedToRemote = unsafePerformIO Counter.new

{-# NOINLINE _bytesCopiedFromRemote #-}
_bytesCopiedFromRemote :: Counter
_bytesCopiedFromRemote = unsafePerformIO Counter.new

{-# NOINLINE _bytesActive #-}
_bytesActive :: Gauge
_bytesActive = unsafePerformIO Gauge.new

{-# NOINLINE _bytesNursery #-}
_bytesNursery :: Gauge
_bytesNursery = unsafePerformIO Gauge.new

{-# NOINLINE _numMajorGC #-}
_numMajorGC :: Counter
_numMajorGC = unsafePerformIO Counter.new

{-# NOINLINE _numEvictions #-}
_numEvictions :: Counter
_numEvictions = unsafePerformIO Counter.new
#endif