-- | This module provides remote monitoring of a running process over
-- HTTP.  It forks a web server that serves garbage-collector
-- statistics together with user-defined counters and gauges, as JSON
-- (or plain text for single values), plus the static assets shipped
-- with the package.
module System.Remote.Monitoring
    (
      -- * The monitoring server
      Server
    , serverThreadId
    , forkServer

      -- * User-defined counters and gauges
    , getCounter
    , getGauge
    ) where
import Control.Applicative ((<$>), (<|>))
import Control.Concurrent (ThreadId, forkIO)
import Control.Monad (forM, join, unless)
import Control.Monad.IO.Class (liftIO)
import qualified Data.Aeson.Encode as A
import Data.Aeson.Types ((.=))
import qualified Data.Aeson.Types as A
import qualified Data.ByteString as S
import qualified Data.ByteString.Char8 as S8
import Data.Function (on)
import qualified Data.HashMap.Strict as M
import Data.IORef (IORef, atomicModifyIORef, newIORef, readIORef)
import qualified Data.List as List
import qualified Data.Map as Map
import Data.Maybe (listToMaybe)
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import Data.Time.Clock.POSIX (getPOSIXTime)
import Data.Word (Word8)
import qualified GHC.Stats as Stats
import Paths_ekg (getDataDir)
import Prelude hiding (read)
import Snap.Core (MonadSnap, Request, Snap, finishWith, getHeaders, getRequest,
getResponse, method, Method(GET), modifyResponse, pass, route,
rqParams, rqPathInfo, setContentType, setResponseStatus,
writeBS, writeLBS)
import Snap.Http.Server (httpServe)
import qualified Snap.Http.Server.Config as Config
import Snap.Util.FileServe (serveDirectory)
import System.FilePath ((</>))
import System.Remote.Counter (Counter)
import qualified System.Remote.Counter.Internal as Counter
import System.Remote.Gauge (Gauge)
import qualified System.Remote.Gauge.Internal as Gauge
-- | User-defined counters, keyed by name.
type Counters = M.HashMap T.Text Counter

-- | User-defined gauges, keyed by name.
type Gauges = M.HashMap T.Text Gauge

-- | A handle to a running monitoring server.
data Server = Server {
      threadId     :: !ThreadId         -- thread the HTTP server runs on
    , userCounters :: !(IORef Counters) -- user-defined counters
    , userGauges   :: !(IORef Gauges)   -- user-defined gauges
    }
-- | The thread the monitoring server runs on.  Kill this thread to
-- stop the server.
serverThreadId :: Server -> ThreadId
serverThreadId server = threadId server
-- | Start the monitoring server in a new thread, listening on the
-- given host and port, and return a handle to it.
forkServer :: S.ByteString  -- ^ Host to listen on (e.g. \"localhost\")
           -> Int           -- ^ Port to listen on (e.g. 8000)
           -> IO Server
forkServer host port = do
    counterMap <- newIORef M.empty
    gaugeMap   <- newIORef M.empty
    serverTid  <- forkIO $ httpServe config (monitor counterMap gaugeMap)
    return $! Server serverTid counterMap gaugeMap
  where
    -- Quiet logging; bind to the requested host/port.
    config = Config.setHostname host
           $ Config.setPort port
           $ Config.setAccessLog Config.ConfigNoLog
           $ Config.setErrorLog Config.ConfigNoLog
           $ Config.setVerbose False
           $ Config.defaultConfig
-- | Abstracts over the operations counters and gauges have in
-- common: both can be created fresh and read as an 'Int'.
class Ref r where
    new  :: IO r
    read :: r -> IO Int

instance Ref Counter where
    new  = Counter.new
    read = Counter.read

instance Ref Gauge where
    new  = Gauge.new
    read = Gauge.read
-- | Look up the ref registered under the given name, atomically
-- inserting a freshly created one if the name is not yet present.  A
-- new ref is always allocated up front because allocation is an IO
-- action and cannot happen inside the pure function passed to
-- 'atomicModifyIORef'; the fresh ref is simply discarded when the
-- name already exists.
getRef :: Ref r
       => T.Text                      -- ^ Ref name
       -> IORef (M.HashMap T.Text r)  -- ^ Map to look up / extend
       -> IO r
getRef name mapRef = do
    empty <- new
    ref <- atomicModifyIORef mapRef $ \ m ->
        case M.lookup name m of
            Nothing -> let m' = M.insert name empty m
                       in (m', empty)
            Just ref -> (m, ref)
    -- 'atomicModifyIORef' applies its function lazily; force the
    -- returned ref so we don't hand callers a thunk that retains the
    -- previous version of the map.
    return $! ref
-- | Return the counter registered under the given name, creating it
-- on first use.
getCounter :: T.Text  -- ^ Counter name
           -> Server  -- ^ Server that owns the counter
           -> IO Counter
getCounter name = getRef name . userCounters
-- | Return the gauge registered under the given name, creating it on
-- first use.
getGauge :: T.Text  -- ^ Gauge name
         -> Server  -- ^ Server that owns the gauge
         -> IO Gauge
getGauge name = getRef name . userGauges
-- | Everything the server reports: GC statistics plus the
-- user-defined counters and gauges, tagged with the server-side
-- timestamp (milliseconds since epoch) at which they were sampled.
data Stats = Stats
    !Stats.GCStats     -- GC statistics
    ![(T.Text, Json)]  -- user-defined counters
    ![(T.Text, Json)]  -- user-defined gauges
    !Double            -- milliseconds since epoch

instance A.ToJSON Stats where
    -- The wildcard pattern @Stats.GCStats {..}@ from the original
    -- bound every GC field without using any of them (the stats are
    -- consumed whole by 'partitionGcStats'); dropping it removes a
    -- pile of unused-binding warnings.
    toJSON (Stats gcStats counters gauges t) = A.object
        [ "server_timestamp_millis" .= t
        , "counters" .= Assocs (gcCounters ++ counters)
        , "gauges" .= Assocs (gcGauges ++ gauges)
        ]
      where
        (gcCounters, gcGauges) = partitionGcStats gcStats
-- | Like 'Stats', but serialized as one flat JSON object: GC
-- statistics and user-defined counters/gauges all appear at the top
-- level instead of being grouped.
newtype Combined = Combined Stats

instance A.ToJSON Combined where
    -- Field names mirror the 'Stats.GCStats' record selectors,
    -- converted to snake_case; RecordWildCards brings the GC fields
    -- into scope.
    toJSON (Combined (Stats (Stats.GCStats {..}) counters gauges t)) =
        A.object $
        [ "server_timestamp_millis" .= t
        , "bytes_allocated" .= bytesAllocated
        , "num_gcs" .= numGcs
        , "max_bytes_used" .= maxBytesUsed
        , "num_bytes_usage_samples" .= numByteUsageSamples
        , "cumulative_bytes_used" .= cumulativeBytesUsed
        , "bytes_copied" .= bytesCopied
        , "current_bytes_used" .= currentBytesUsed
        , "current_bytes_slop" .= currentBytesSlop
        , "max_bytes_slop" .= maxBytesSlop
        , "peak_megabytes_allocated" .= peakMegabytesAllocated
        , "mutator_cpu_seconds" .= mutatorCpuSeconds
        , "mutator_wall_seconds" .= mutatorWallSeconds
        , "gc_cpu_seconds" .= gcCpuSeconds
        , "gc_wall_seconds" .= gcWallSeconds
        , "cpu_seconds" .= cpuSeconds
        , "wall_seconds" .= wallSeconds
        , "par_avg_bytes_copied" .= parAvgBytesCopied
        , "par_max_bytes_copied" .= parMaxBytesCopied
        ] ++ map (uncurry (.=)) counters ++
        map (uncurry (.=)) gauges
-- | A list of name/value pairs rendered as a single JSON object.
newtype Assocs = Assocs [(T.Text, Json)]

instance A.ToJSON Assocs where
    toJSON (Assocs kvs) = A.object [key .= val | (key, val) <- kvs]
-- | One kind of metric (all counters or all gauges) together with
-- the timestamp at which it was sampled.
data Group = Group
    ![(T.Text, Json)]  -- named metrics
    !Double            -- milliseconds since epoch

instance A.ToJSON Group where
    toJSON (Group kvs t) = A.object $
        ("server_timestamp_millis" .= t) : [key .= val | (key, val) <- kvs]
-- | The top-level Snap handler: routes the JSON and plain-text API
-- endpoints, falling back to serving the static assets (web UI)
-- bundled with the package.
monitor :: IORef Counters -> IORef Gauges -> Snap ()
monitor counters gauges = do
    dataDir <- liftIO getDataDir
    -- Each route only fires when the client's Accept header matches
    -- the given media type (see 'format'); otherwise it passes.
    route [
          ("", method GET (format "application/json"
                           (serveAll counters gauges)))
        , ("combined", method GET (format "application/json"
                                   (serveCombined counters gauges)))
        , ("counters", method GET (format "application/json"
                                   (serveMany counters)))
        , ("counters/:name", method GET (format "text/plain"
                                         (serveOne counters)))
        , ("gauges", method GET (format "application/json"
                                 (serveMany gauges)))
        , ("gauges/:name", method GET (format "text/plain"
                                       (serveOne gauges)))
        ]
        <|> serveDirectory (dataDir </> "assets")
-- | The request's Accept header, with multiple header occurrences
-- joined by commas; 'Nothing' when the header is absent.
acceptHeader :: Request -> Maybe S.ByteString
acceptHeader = fmap (S.intercalate ",") . getHeaders "Accept"
-- | Run the given action if the most preferred media type in the
-- request's Accept header equals @fmt@; otherwise 'pass' so another
-- handler can try.
format :: MonadSnap m => S.ByteString -> m a -> m a
format fmt action = do
    req <- getRequest
    -- 'listToMaybe' instead of 'List.head': an empty Accept header
    -- value makes 'parseHttpAccept' return [], and 'List.head' would
    -- crash the handler with "Prelude.head: empty list".
    let preferred = listToMaybe . parseHttpAccept =<< acceptHeader req
    case preferred of
        Just hdr | hdr == fmt -> action
        _                     -> pass
-- | Snapshot every named ref in the map, pairing each name with its
-- current value wrapped as 'Json'.
readAllRefs :: Ref r => IORef (M.HashMap T.Text r) -> IO [(T.Text, Json)]
readAllRefs mapRef = do
    snapshot <- readIORef mapRef
    mapM readOne (M.toList snapshot)
  where
    readOne (name, ref) = do
        val <- read ref
        return (name, Json val)
-- | Serve a whole map of refs (all counters or all gauges) as one
-- JSON object with a timestamp.
serveMany :: Ref r => IORef (M.HashMap T.Text r) -> Snap ()
serveMany mapRef = do
    modifyResponse $ setContentType "application/json"
    entries <- liftIO $ readAllRefs mapRef
    now <- liftIO getTimeMillis
    writeLBS . A.encode . A.toJSON $ Group entries now
-- | Serve all statistics (GC plus user-defined), grouped into
-- counters and gauges, as JSON.
serveAll :: IORef Counters -> IORef Gauges -> Snap ()
serveAll counters gauges = do
    req <- getRequest
    -- Only respond at the exact root: pass unless the remaining path
    -- is empty, so unknown sub-paths fall through to other handlers.
    unless (S.null $ rqPathInfo req) pass
    modifyResponse $ setContentType "application/json"
    gcStats    <- liftIO Stats.getGCStats
    counterLst <- liftIO $ readAllRefs counters
    gaugeLst   <- liftIO $ readAllRefs gauges
    now        <- liftIO getTimeMillis
    writeLBS . A.encode . A.toJSON $ Stats gcStats counterLst gaugeLst now
-- | Serve GC statistics and user-defined metrics as one flat JSON
-- object (the \"combined\" endpoint).
serveCombined :: IORef Counters -> IORef Gauges -> Snap ()
serveCombined counters gauges = do
    modifyResponse $ setContentType "application/json"
    gcStats    <- liftIO Stats.getGCStats
    counterLst <- liftIO $ readAllRefs counters
    gaugeLst   <- liftIO $ readAllRefs gauges
    now        <- liftIO getTimeMillis
    writeLBS . A.encode . A.toJSON . Combined $
        Stats gcStats counterLst gaugeLst now
-- | Serve a single named counter or gauge as plain text.  Falls back
-- to the built-in GC statistics when the name is not user-defined,
-- and responds 404 when the name is unknown entirely.
serveOne :: Ref r => IORef (M.HashMap T.Text r) -> Snap ()
serveOne refs = do
    modifyResponse $ setContentType "text/plain"
    m <- liftIO $ readIORef refs
    req <- getRequest
    -- The ":name" capture from the route; 'join' collapses the two
    -- layers of Maybe (parameter present, value list non-empty).
    let mname = T.decodeUtf8 <$> join
                (listToMaybe <$> Map.lookup "name" (rqParams req))
    case mname of
        Nothing -> pass
        Just name -> case M.lookup name m of
            Just counter -> do
                val <- liftIO $ read counter
                writeBS $ S8.pack $ show val
            Nothing ->
                -- Not user-defined: try the built-in GC statistics.
                -- NOTE(review): 'builtinCounters' is consulted even
                -- when this handler serves gauges -- presumably so
                -- every GC stat is reachable under both endpoints;
                -- confirm against the intended API.
                case Map.lookup name builtinCounters of
                    Just f -> do
                        gcStats <- liftIO Stats.getGCStats
                        writeBS $ S8.pack $ f gcStats
                    Nothing -> do
                        modifyResponse $ setResponseStatus 404 "Not Found"
                        r <- getResponse
                        finishWith r
-- | The GC statistics that can be requested by name through the
-- plain-text API, each paired with a renderer for its value.  Names
-- are the snake_case forms of the 'Stats.GCStats' selectors.
builtinCounters :: Map.Map T.Text (Stats.GCStats -> String)
builtinCounters = Map.fromList [
      ("bytes_allocated"          , show . Stats.bytesAllocated)
    , ("num_gcs"                  , show . Stats.numGcs)
    , ("max_bytes_used"           , show . Stats.maxBytesUsed)
    , ("num_bytes_usage_samples"  , show . Stats.numByteUsageSamples)
    , ("cumulative_bytes_used"    , show . Stats.cumulativeBytesUsed)
    , ("bytes_copied"             , show . Stats.bytesCopied)
    , ("current_bytes_used"       , show . Stats.currentBytesUsed)
    , ("current_bytes_slop"       , show . Stats.currentBytesSlop)
    , ("max_bytes_slop"           , show . Stats.maxBytesSlop)
    , ("peak_megabytes_allocated" , show . Stats.peakMegabytesAllocated)
    , ("mutator_cpu_seconds"      , show . Stats.mutatorCpuSeconds)
    , ("mutator_wall_seconds"     , show . Stats.mutatorWallSeconds)
    , ("gc_cpu_seconds"           , show . Stats.gcCpuSeconds)
    , ("gc_wall_seconds"          , show . Stats.gcWallSeconds)
    , ("cpu_seconds"              , show . Stats.cpuSeconds)
    , ("wall_seconds"             , show . Stats.wallSeconds)
    , ("par_avg_bytes_copied"     , show . Stats.parAvgBytesCopied)
    , ("par_max_bytes_copied"     , show . Stats.parMaxBytesCopied)
    ]
-- | An existential wrapper so values of different 'A.ToJSON' types
-- can live in the same association list.
data Json = forall a. A.ToJSON a => Json a

instance A.ToJSON Json where
    toJSON (Json x) = A.toJSON x
-- | Split the GC statistics into counter-like values (monotonically
-- accumulating totals) and gauge-like values (point-in-time
-- readings), each rendered as a (name, JSON value) pair.
partitionGcStats :: Stats.GCStats
                 -> ([(T.Text, Json)], [(T.Text, Json)])
partitionGcStats (Stats.GCStats {..}) = (counters, gauges)
  where
    counters = [
          ("bytes_allocated"         , Json bytesAllocated)
        , ("num_gcs"                 , Json numGcs)
        , ("num_bytes_usage_samples" , Json numByteUsageSamples)
        , ("cumulative_bytes_used"   , Json cumulativeBytesUsed)
        , ("bytes_copied"            , Json bytesCopied)
        , ("mutator_cpu_seconds"     , Json mutatorCpuSeconds)
        , ("mutator_wall_seconds"    , Json mutatorWallSeconds)
        , ("gc_cpu_seconds"          , Json gcCpuSeconds)
        , ("gc_wall_seconds"         , Json gcWallSeconds)
        , ("cpu_seconds"             , Json cpuSeconds)
        , ("wall_seconds"            , Json wallSeconds)
        ]
    gauges = [
          ("max_bytes_used"           , Json maxBytesUsed)
        , ("current_bytes_used"       , Json currentBytesUsed)
        , ("current_bytes_slop"       , Json currentBytesSlop)
        , ("max_bytes_slop"           , Json maxBytesSlop)
        , ("peak_megabytes_allocated" , Json peakMegabytesAllocated)
        , ("par_avg_bytes_copied"     , Json parAvgBytesCopied)
        , ("par_max_bytes_copied"     , Json parMaxBytesCopied)
        ]
-- | Parse the value of an HTTP Accept header into a list of media
-- types ordered by decreasing q-value; entries with equal q keep
-- their original header order (the sort is stable).
parseHttpAccept :: S.ByteString -> [S.ByteString]
parseHttpAccept header =
    map fst . List.sortBy (flip compare `on` snd) . map qualify
        $ S.split comma header
  where
    -- Pair a "media;q=0.7"-style item with its quality value.
    qualify item =
        let (media, params) = breakDiscard semicolon item
            (_, qstr)       = breakDiscard equals params
        in (trim media, quality (trim qstr))
    -- A missing or unreadable q-value defaults to 1.0.
    quality :: S.ByteString -> Double
    quality bs = case reads (S8.unpack bs) of
        (q, _):_ -> q
        _        -> 1.0
    trim = S.dropWhile (== space)
    -- ASCII codes for ',', ';', '=', ' '.
    comma = 44; semicolon = 59; equals = 61; space = 32

-- | Break a ByteString at the first occurrence of the given byte,
-- dropping the separator itself; if the byte never occurs the second
-- half is empty.
breakDiscard :: Word8 -> S.ByteString -> (S.ByteString, S.ByteString)
breakDiscard sep bs = (before, S.drop 1 after)
  where
    (before, after) = S.break (== sep) bs
-- | Current wall-clock time, in milliseconds since the Unix epoch.
getTimeMillis :: IO Double
getTimeMillis = do
    now <- getPOSIXTime
    return (realToFrac (now * 1000))