{-# LANGUAGE OverloadedStrings #-}
module System.Remote.Monitoring.Carbon
( CarbonOptions(..)
, defaultCarbonOptions
, forkCarbon
, forkCarbonRestart
) where
import Control.Exception (SomeException, try, bracket)
import Control.Concurrent (ThreadId, forkIO, myThreadId, threadDelay, throwTo)
import Control.Monad (forever)
import Data.Int (Int64)
import Data.Monoid ((<>))
import qualified Data.HashMap.Strict as HashMap
import qualified Data.Text as T
import qualified Data.Time as Time
import qualified Data.Time.Clock.POSIX as Time
import qualified Data.Vector as V
import qualified Network.Carbon.Plaintext as Carbon
import qualified Network.Socket as Network
import qualified System.Metrics as EKG
import qualified System.Metrics.Distribution as Stats
-- | Settings that control where and how often metrics are published.
data CarbonOptions = CarbonOptions
  { host          :: !T.Text
    -- ^ Host name or address handed to the resolver when connecting.
  , port          :: !Int
    -- ^ Port (service) handed to the resolver when connecting.
  , flushInterval :: !Int
    -- ^ Time between two flushes, in milliseconds.
  , prefix        :: !T.Text
    -- ^ Text prepended to every metric name, joined with a @.@.
    -- An empty prefix leaves the name untouched.
  , suffix        :: !T.Text
    -- ^ Text appended to every metric name, joined with a @.@.
    -- An empty suffix leaves the name untouched.
  } deriving (Eq, Show)
-- | Default settings:
--
-- * 'host' = @\"127.0.0.1\"@
--
-- * 'port' = @2003@
--
-- * 'flushInterval' = @1000@ (milliseconds)
--
-- * 'prefix' and 'suffix' are empty, so metric names are sent unchanged.
defaultCarbonOptions :: CarbonOptions
defaultCarbonOptions =
  CarbonOptions
    { host          = "127.0.0.1"
    , port          = 2003
    , flushInterval = 1000
    , prefix        = ""
    , suffix        = ""
    }
-- | Start a background thread that periodically publishes the contents of
-- the given store, and return its 'ThreadId'.
--
-- This is 'forkCarbonRestart' with a handler that rethrows any exception in
-- the thread that called 'forkCarbon'. The handler ignores the restart
-- action, so the publishing thread ends after its first failure.
forkCarbon :: CarbonOptions -> EKG.Store -> IO ThreadId
forkCarbon opts store =
  myThreadId >>= \parent ->
    forkCarbonRestart opts store (\failure _restart -> throwTo parent failure)
-- | Start a background thread that periodically publishes the contents of
-- the given store, with caller-defined failure handling.
--
-- The target address is resolved once, in the calling thread, before the
-- worker is forked; if resolution yields no address an 'IOError' is raised
-- there. The worker then opens a connection, runs the flush loop on it and
-- closes the connection again when the loop ends for any reason.
--
-- When the session ends with an exception, the handler is called with that
-- exception and an action that starts a fresh session (reconnecting first).
-- The handler decides whether, and when, to run that action.
--
-- NOTE(review): exceptions are caught as 'SomeException', so asynchronous
-- exceptions delivered to the worker (e.g. from @killThread@) appear to reach
-- the handler as well -- confirm this is intended.
forkCarbonRestart :: CarbonOptions
                  -> EKG.Store
                  -> (SomeException -> IO () -> IO ())
                  -> IO ThreadId
forkCarbonRestart opts store exceptionHandler = do
  resolved <-
    Network.getAddrInfo
      Nothing
      (Just (T.unpack (host opts)))
      (Just (show (port opts)))
  -- Only the first resolved address is ever used.
  sockAddr <-
    case resolved of
      (firstInfo : _) -> return (Network.addrAddress firstInfo)
      [] ->
        ioError (userError ("unsupported address: " ++ T.unpack (host opts)))
  let -- One connect / flush-loop / disconnect cycle.
      session =
        bracket (Carbon.connect sockAddr) Carbon.disconnect (loop store opts)
      -- Failures go to the handler together with the restart action; should
      -- a session ever return normally it is simply started again.
      run =
        try session >>= either (\failure -> exceptionHandler failure run)
                               (const run)
  forkIO run
-- | Sample the store and send the sample over the given connection, forever,
-- aiming for one flush per 'flushInterval'.
--
-- Each round measures how long sampling and flushing took and sleeps only
-- for the remainder of the interval. The action never returns normally; it
-- ends only when an exception is raised.
loop :: EKG.Store -> CarbonOptions -> Carbon.Connection -> IO ()
loop store opts socket = forever $ do
  start <- time
  sample <- EKG.sampleAll store
  flushSample sample socket opts
  end <- time
  let -- 'flushInterval' is in milliseconds; 'time' and 'threadDelay' both
      -- work in microseconds.
      intervalMicros = flushInterval opts * 1000
      -- 'time' reads the wall clock, which may step backwards; treat that
      -- as "no time elapsed" rather than oversleeping.
      elapsedMicros = max 0 (fromIntegral (end - start))
      -- A round that overruns the interval leaves nothing to wait for;
      -- never hand 'threadDelay' a negative delay.
      remainingMicros = max 0 (intervalMicros - elapsedMicros)
  threadDelay remainingMicros
-- | Current POSIX time in microseconds, rounded to the nearest integer.
time :: IO Int64
time = fmap toMicros Time.getPOSIXTime
  where
    -- Go through 'Double' seconds, then scale to microseconds.
    toMicros posixSeconds =
      round ((realToFrac posixSeconds :: Double) * 1000000.0)
-- | Convert one sample into Carbon metrics and send them over the given
-- connection in a single call.
--
-- All metrics in the batch carry the same timestamp, taken once at the start
-- of the flush. Counters and gauges become one metric each, labels are
-- skipped, and a distribution becomes six metrics named
-- @\<name\>.mean@, @.variance@, @.count@, @.sum@, @.min@ and @.max@.
-- The configured 'prefix' and 'suffix' are then attached to every name.
flushSample :: EKG.Sample -> Carbon.Connection -> CarbonOptions -> IO ()
flushSample sample conn opts = do
  now <- Time.getCurrentTime
  Carbon.sendMetrics conn (V.map renamed (collect now))
  where
    -- Gather the per-key vectors in a list and concatenate once, instead of
    -- prepending to a growing vector (which re-copies it for every key).
    -- Consing onto the list keeps the element order of the result the same
    -- as repeated prepending would give.
    collect now =
      V.concat
        (HashMap.foldlWithKey' (\acc k v -> metrics k v now : acc) [] sample)

    -- Computed once per flush rather than once per metric. The "." separator
    -- is only added when the corresponding option is non-empty.
    namePrefix = if T.null (prefix opts) then "" else prefix opts <> "."
    nameSuffix = if T.null (suffix opts) then "" else "." <> suffix opts

    renamed (Carbon.Metric n v t) =
      Carbon.Metric (namePrefix <> n <> nameSuffix) v t

    metrics n (EKG.Counter i) t =
      V.singleton (Carbon.Metric n (fromIntegral i) t)
    metrics n (EKG.Gauge i) t =
      V.singleton (Carbon.Metric n (fromIntegral i) t)
    -- Labels are textual and have no numeric value to send.
    metrics _ (EKG.Label {}) _ = V.empty
    metrics n (EKG.Distribution stats) t =
      let f n' v = Carbon.Metric (n <> "." <> n') v t
      in V.fromList [ f "mean" (Stats.mean stats)
                    , f "variance" (Stats.variance stats)
                    , f "count" (fromIntegral $ Stats.count stats)
                    , f "sum" (Stats.sum stats)
                    , f "min" (Stats.min stats)
                    , f "max" (Stats.max stats)
                    ]