{-# LANGUAGE BangPatterns             #-}
{-# LANGUAGE CPP                      #-}
{-# LANGUAGE ForeignFunctionInterface #-}
{-# LANGUAGE OverloadedStrings        #-}
{-# LANGUAGE RecordWildCards          #-}
{-# LANGUAGE TemplateHaskell          #-}
{-# OPTIONS_GHC -fobject-code #-}
{-# OPTIONS_HADDOCK hide #-}
module Data.Array.Accelerate.Debug.Monitoring (
  beginMonitoring,
  initAccMetrics,
  
  Processor(..),
  withProcessor, addProcessorTime,
  
  didAllocateBytesLocal, didAllocateBytesRemote,
  didCopyBytesToRemote, didCopyBytesFromRemote,
  increaseCurrentBytesRemote, decreaseCurrentBytesRemote,
  increaseCurrentBytesNursery, decreaseCurrentBytesNursery, getCurrentBytesNursery, setCurrentBytesNursery,
  didRemoteGC,
  didEvictBytes,
) where
#ifdef ACCELERATE_MONITORING
import Data.Array.Accelerate.Debug.Clock
import System.Metrics
import System.Remote.Monitoring
import Control.Concurrent
import Control.Concurrent.Async
import Data.IORef
import Data.Text                                                    ( Text )
import Text.Printf
import qualified Data.HashMap.Strict                                as Map
#endif
#if defined(ACCELERATE_MONITORING) || defined(ACCELERATE_DEBUG)
import Control.Monad
#endif
import Data.Atomic                                                  ( Atomic )
import qualified Data.Atomic                                        as Atomic
import Data.Int
import Language.Haskell.TH.Syntax
-- | Start the EKG web monitor on @localhost:8000@.
--
-- A fresh store is obtained from 'initAccMetrics' and GHC's GC metrics are
-- registered into it ('registerGcMetrics'). The server is forked inside
-- 'withAsync' and given 10ms ('threadDelay' 10000) to fail. The outcome,
-- including the exception if startup failed, is printed to stdout.
--
-- When the package is built without @ACCELERATE_MONITORING@ this does nothing.
beginMonitoring :: IO ()
#ifdef ACCELERATE_MONITORING
beginMonitoring = do
  store <- initAccMetrics
  registerGcMetrics store
  -- waitCatch turns a startup failure (e.g. port already in use) into a Left
  -- rather than letting it propagate out of the async.
  r     <- withAsync (forkServerWith store "localhost" 8000 >> threadDelay 10000) waitCatch
  case r of
    Right _ -> printf "EKG monitor started at: http://localhost:8000\n"
    Left e  -> printf "Failed to start EKG monitor: %s\n" (show e)
#else
beginMonitoring = return ()
#endif
#ifndef ACCELERATE_MONITORING
-- | Monitoring support was not compiled in. Evaluating this action throws an
-- 'error' that explains how to rebuild the package with monitoring enabled.
initAccMetrics :: IO a
initAccMetrics = error $ unlines [ "Data.Array.Accelerate: Monitoring is disabled."
                                 , "Reinstall package 'accelerate' with '-fekg' to enable it." ]
#else
-- | Create a new EKG metrics 'Store' with all of Accelerate's metrics
-- registered in it:
--
--   * @acc.load.llvm_native@, @acc.load.llvm_ptx@: smoothed busy percentage
--     of each 'Processor', estimated on every read ('registerRate')
--   * @acc.gc.current_bytes_remote@, @acc.gc.current_bytes_nursery@: gauges
--     read directly from the matching C counters
--   * the remaining @acc.gc.*@ entries: counters read directly from the
--     matching C counters
initAccMetrics :: IO Store
initAccMetrics = do
  store <- newStore
  registerRate    "acc.load.llvm_native"             (estimateProcessorLoad __active_ns_llvm_native) store
  registerRate    "acc.load.llvm_ptx"                (estimateProcessorLoad __active_ns_llvm_ptx)    store
  registerGauge   "acc.gc.current_bytes_remote"      (Atomic.read __current_bytes_remote)            store
  registerGauge   "acc.gc.current_bytes_nursery"     (Atomic.read __current_bytes_nursery)           store
  registerCounter "acc.gc.bytes_allocated_local"     (Atomic.read __total_bytes_allocated_local)     store
  registerCounter "acc.gc.bytes_allocated_remote"    (Atomic.read __total_bytes_allocated_remote)    store
  registerCounter "acc.gc.bytes_copied_to_remote"    (Atomic.read __total_bytes_copied_to_remote)    store
  registerCounter "acc.gc.bytes_copied_from_remote"  (Atomic.read __total_bytes_copied_from_remote)  store
  registerCounter "acc.gc.bytes_evicted_from_remote" (Atomic.read __total_bytes_evicted_from_remote) store
  registerCounter "acc.gc.num_gcs"                   (Atomic.read __num_remote_gcs)                  store
  registerCounter "acc.gc.num_lru_evict"             (Atomic.read __num_evictions)                   store
  return store
-- | Register a metric called @name@ in @store@, reported as a 'Gauge'.
--
-- @sample@ receives a private 'IORef' holding an 'EMAState' that is seeded
-- with the current monotonic time and zero load. That lets the sampling
-- function smooth its readings across successive polls.
registerRate :: Text -> (IORef EMAState -> IO Int64) -> Store -> IO ()
registerRate name sample store = do
  stateRef <- newIORef . freshState =<< getMonotonicTime
  registerGroup (Map.singleton name Gauge) (sample stateRef) store
  where
    freshState t = ES t 0 0
#endif
-- | The backend whose busy-time accumulator an action is charged to
-- (see 'withProcessor' and 'addProcessorTime').
data Processor
  = Native  -- ^ charged to the @llvm_native@ accumulator
  | PTX     -- ^ charged to the @llvm_ptx@ accumulator
{-# INLINE withProcessor #-}
-- | Run an action and add the wall-clock time it took to the busy-time
-- accumulator of the given 'Processor'.
--
-- The result is forced to WHNF before the end time is taken. When built
-- without @ACCELERATE_MONITORING@ this is just 'id'.
withProcessor :: Processor -> IO a -> IO a
#ifndef ACCELERATE_MONITORING
withProcessor _ = id
#else
withProcessor Native = withProcessor' __active_ns_llvm_native
withProcessor PTX    = withProcessor' __active_ns_llvm_ptx

-- | Time an action with the monotonic clock and credit the elapsed time to
-- the given accumulator (via 'addProcessorTime'').
withProcessor' :: Atomic -> IO a -> IO a
withProcessor' var action = do
  wall0 <- getMonotonicTime
  -- the bang makes sure lazily-returned work is counted inside the interval
  !r    <- action
  wall1 <- getMonotonicTime
  addProcessorTime' var (wall1 - wall0)
  return r
#endif
{-# INLINE addProcessorTime #-}
-- | Add a duration in seconds to the busy-time accumulator of the given
-- 'Processor'. This does nothing when built without @ACCELERATE_MONITORING@.
addProcessorTime :: Processor -> Double -> IO ()
#ifndef ACCELERATE_MONITORING
addProcessorTime _ _ = return ()
#else
addProcessorTime Native = addProcessorTime' __active_ns_llvm_native
addProcessorTime PTX    = addProcessorTime' __active_ns_llvm_ptx

-- | Convert @secs@ to whole nanoseconds (rounded) and atomically add them to
-- the accumulator.
addProcessorTime' :: Atomic -> Double -> IO ()
addProcessorTime' var secs =
  let ns   = round (secs * 1.0E9)
  in  void $ Atomic.add var ns
#endif
{-# INLINE didAllocateBytesLocal #-}
-- | Record an allocation of @n@ bytes of local memory by adding @n@ to the
-- local-allocation total (exported as @acc.gc.bytes_allocated_local@).
-- This does nothing unless built with @ACCELERATE_DEBUG@.
didAllocateBytesLocal :: Int64 -> IO ()
#ifndef ACCELERATE_DEBUG
didAllocateBytesLocal _ = return ()
#else
didAllocateBytesLocal n = void $ Atomic.add __total_bytes_allocated_local n
#endif
{-# INLINE didAllocateBytesRemote     #-}
{-# INLINE increaseCurrentBytesRemote #-}
{-# INLINE decreaseCurrentBytesRemote #-}
-- | Record an allocation of @n@ bytes of remote memory by adding @n@ to the
-- remote-allocation total (exported as @acc.gc.bytes_allocated_remote@).
didAllocateBytesRemote     :: Int64 -> IO ()
-- | Subtract @n@ bytes from the current-remote-bytes gauge
-- (@acc.gc.current_bytes_remote@).
decreaseCurrentBytesRemote :: Int64 -> IO ()
-- | Add @n@ bytes to the current-remote-bytes gauge
-- (@acc.gc.current_bytes_remote@).
increaseCurrentBytesRemote :: Int64 -> IO ()
-- All three do nothing unless built with ACCELERATE_DEBUG.
#ifndef ACCELERATE_DEBUG
didAllocateBytesRemote     _ = return ()
increaseCurrentBytesRemote _ = return ()
decreaseCurrentBytesRemote _ = return ()
#else
didAllocateBytesRemote     n = void $ Atomic.add      __total_bytes_allocated_remote n
increaseCurrentBytesRemote n = void $ Atomic.add      __current_bytes_remote         n
decreaseCurrentBytesRemote n = void $ Atomic.subtract __current_bytes_remote         n
#endif
{-# INLINE didCopyBytesToRemote   #-}
{-# INLINE didCopyBytesFromRemote #-}
-- | Record @n@ bytes copied from remote memory
-- (@acc.gc.bytes_copied_from_remote@).
didCopyBytesFromRemote :: Int64 -> IO ()
-- | Record @n@ bytes copied to remote memory
-- (@acc.gc.bytes_copied_to_remote@).
didCopyBytesToRemote   :: Int64 -> IO ()
-- Both do nothing unless built with ACCELERATE_DEBUG.
#ifndef ACCELERATE_DEBUG
didCopyBytesToRemote   _ = return ()
didCopyBytesFromRemote _ = return ()
#else
didCopyBytesToRemote   n = void $ Atomic.add __total_bytes_copied_to_remote n
didCopyBytesFromRemote n = void $ Atomic.add __total_bytes_copied_from_remote n
#endif
{-# INLINE increaseCurrentBytesNursery #-}
{-# INLINE decreaseCurrentBytesNursery #-}
{-# INLINE setCurrentBytesNursery      #-}
{-# INLINE getCurrentBytesNursery      #-}
-- | Add @n@ bytes to the current-nursery-bytes gauge
-- (@acc.gc.current_bytes_nursery@).
increaseCurrentBytesNursery :: Int64 -> IO ()
-- | Subtract @n@ bytes from the current-nursery-bytes gauge.
decreaseCurrentBytesNursery :: Int64 -> IO ()
-- | Overwrite the current-nursery-bytes gauge with @n@.
setCurrentBytesNursery      :: Int64 -> IO ()
-- | Read the current-nursery-bytes gauge.
getCurrentBytesNursery      :: IO Int64
-- Without ACCELERATE_DEBUG the setters do nothing and the getter always
-- returns 0.
#ifndef ACCELERATE_DEBUG
increaseCurrentBytesNursery _ = return ()
decreaseCurrentBytesNursery _ = return ()
setCurrentBytesNursery      _ = return ()
getCurrentBytesNursery        = return 0
#else
increaseCurrentBytesNursery n = void $ Atomic.add      __current_bytes_nursery n
decreaseCurrentBytesNursery n = void $ Atomic.subtract __current_bytes_nursery n
setCurrentBytesNursery      n =        Atomic.write    __current_bytes_nursery n
getCurrentBytesNursery        =        Atomic.read     __current_bytes_nursery
#endif
{-# INLINE didRemoteGC #-}
-- | Increment the remote-GC count (exported as @acc.gc.num_gcs@) by one.
-- This does nothing unless built with @ACCELERATE_DEBUG@.
didRemoteGC :: IO ()
#ifndef ACCELERATE_DEBUG
didRemoteGC = return ()
#else
didRemoteGC = void $ Atomic.add __num_remote_gcs 1
#endif
{-# INLINE didEvictBytes #-}
-- | Record an eviction of @n@ bytes from remote memory.
--
-- This increments the eviction count (@acc.gc.num_lru_evict@) by one and adds
-- @n@ to the evicted-bytes total (@acc.gc.bytes_evicted_from_remote@). It does
-- nothing unless built with @ACCELERATE_DEBUG@.
didEvictBytes :: Int64 -> IO ()
#ifndef ACCELERATE_DEBUG
didEvictBytes _ = return ()
#else
didEvictBytes n = do
  void $ Atomic.add __num_evictions 1
  void $ Atomic.add __total_bytes_evicted_from_remote n
#endif
#ifdef ACCELERATE_MONITORING
-- | What a load metric remembers between two polls, in positional order:
--
--   1. @old_time@ – monotonic clock reading at the previous poll
--   2. @old_inst@ – instantaneous load (percent) measured at that poll
--   3. @old_avg@  – smoothed (EMA) load reported at that poll
data EMAState = ES { old_time, old_inst, old_avg :: {-# UNPACK #-} !Double }
-- | Poll the load of one processor, as a smoothed percentage of wall time.
--
-- This reads and clears the busy-time accumulator @var@ (nanoseconds) with
-- @Atomic.and var 0@. That call is assumed to return the value held before
-- clearing; confirm against "Data.Atomic". The busy time is divided by the
-- wall time elapsed since the previous poll to give the instantaneous load.
-- That load is then folded into the moving average kept in @ref@, and the
-- new average is returned rounded to an integer.
--
-- If the clock has not advanced since the previous poll, the previous average
-- is returned and neither @var@ nor @ref@ is modified. Dividing by a zero
-- interval would otherwise produce NaN, which would then stick in the
-- average forever.
estimateProcessorLoad :: Atomic -> IORef EMAState -> IO Int64
estimateProcessorLoad !var !ref = do
  ES{..} <- readIORef ref
  time   <- getMonotonicTime
  -- elapsed time is "now minus then"; the reverse yields a negative interval
  let elapsed_s = time - old_time
  if elapsed_s <= 0
    then return (round old_avg)
    else do
      sample <- Atomic.and var 0
      let
          active_ns   = fromIntegral sample
          elapsed_ns  = 1.0E9 * elapsed_s
          new_inst    = 100 * (active_ns / elapsed_ns)                -- instantaneous load (%)
          new_avg     = ema 0.2 elapsed_s old_avg old_inst new_inst   -- moving-average load (%)
      writeIORef ref (ES time new_inst new_avg)
      return (round new_avg)
-- | One step of an exponential moving average over irregularly spaced samples.
--
-- Arguments: smoothing time constant @alpha@, time since the previous sample
-- @dt@, previous average, previous sample, and new sample. With
-- @a = dt / alpha@, @u = exp (-a)@ and @v = (1 - u) / a@, the result is
--
-- > u * old_ema + (v - u) * old_sample + (1 - v) * new_sample
--
-- As @a -> 0@ both @u@ and @v@ tend to 1, so the result tends to @old_ema@.
-- That limit is returned whenever @a@ is not positive (including NaN). The
-- formula itself would compute @0 / 0@ there and yield NaN.
ema :: Double -> Double -> Double -> Double -> Double -> Double
ema !alpha !dt !old_ema !old_sample !new_sample
  | not (a > 0) = old_ema
  | otherwise   = (u * old_ema) + ((v - u) * old_sample) + ((1 - v) * new_sample)
  where
    a = dt / alpha
    u = exp (negate a)
    v = (1 - u) / a
#endif
-- Addresses of the C globals that hold every counter used in this module
-- (imported with '&', i.e. by address). The C side is presumably
-- cbits/monitoring.c, which this module attaches via addForeignFilePath.

-- busy-time accumulators, in nanoseconds, one per Processor
foreign import ccall "&__active_ns_llvm_native" __active_ns_llvm_native :: Atomic
foreign import ccall "&__active_ns_llvm_ptx"    __active_ns_llvm_ptx    :: Atomic

-- values exported as gauges
foreign import ccall "&__current_bytes_remote"  __current_bytes_remote  :: Atomic
foreign import ccall "&__current_bytes_nursery" __current_bytes_nursery :: Atomic

-- values exported as counters
foreign import ccall "&__total_bytes_allocated_local"     __total_bytes_allocated_local     :: Atomic
foreign import ccall "&__total_bytes_allocated_remote"    __total_bytes_allocated_remote    :: Atomic
foreign import ccall "&__total_bytes_copied_to_remote"    __total_bytes_copied_to_remote    :: Atomic
foreign import ccall "&__total_bytes_copied_from_remote"  __total_bytes_copied_from_remote  :: Atomic
foreign import ccall "&__total_bytes_evicted_from_remote" __total_bytes_evicted_from_remote :: Atomic
foreign import ccall "&__num_remote_gcs"                  __num_remote_gcs                  :: Atomic
foreign import ccall "&__num_evictions"                   __num_evictions                   :: Atomic
-- Have GHC compile cbits/monitoring.c as C and link it into this module's
-- object file, so that the '&'-imported symbols above resolve.
$(runQ (addForeignFilePath LangC "cbits/monitoring.c" >> return []))