module System.Remote.Monitoring.Carbon
( CarbonOptions(..)
, defaultCarbonOptions
, forkCarbon
) where
import Control.Concurrent (ThreadId, forkFinally, myThreadId, threadDelay, throwTo)
import Control.Monad (forever)
import Data.Int (Int64)
import Data.Monoid ((<>))
import qualified Data.HashMap.Strict as HashMap
import qualified Data.Text as T
import qualified Data.Time as Time
import qualified Data.Time.Clock.POSIX as Time
import qualified Data.Vector as V
import qualified Network.Carbon.Plaintext as Carbon
import qualified Network.Socket as Network
import qualified System.Metrics as EKG
import qualified System.Metrics.Distribution as Stats
-- | Settings controlling where and how sampled metrics are sent to Carbon.
data CarbonOptions = CarbonOptions
  { -- | Host name or address of the Carbon server; resolved with
    -- 'Network.getAddrInfo' when the exporter is started.
    host :: !T.Text
    -- | Port of the Carbon server; rendered with 'show' and used as the
    -- service name during address resolution.
  , port :: !Int
    -- | Target time between flushes. The value is multiplied by 1000
    -- before being handed to 'threadDelay', i.e. it is in milliseconds.
  , flushInterval :: !Int
    -- | Text placed in front of every metric name, separated from it by a
    -- dot. An empty prefix adds nothing (not even the dot).
  , prefix :: !T.Text
    -- | Text placed after every metric name, separated from it by a dot.
    -- An empty suffix adds nothing (not even the dot).
  , suffix :: !T.Text
  }
  deriving (Eq, Show)
-- | Baseline settings: a Carbon server on @127.0.0.1@, port @2003@, a
-- flush interval of @1000@ (milliseconds), and neither a prefix nor a
-- suffix added to metric names.
defaultCarbonOptions :: CarbonOptions
defaultCarbonOptions =
  CarbonOptions
    { host          = "127.0.0.1"
    , port          = 2003
    , flushInterval = 1000
    , prefix        = ""
    , suffix        = ""
    }
-- | Connect to the Carbon server described by the options and start a
-- background thread that periodically samples the store and sends the
-- result over that connection.
--
-- The first address returned by name resolution is used; if resolution
-- yields no address at all, an 'IOError' mentioning the host is raised.
-- When the background thread ends, the connection is closed, and if it
-- ended with an exception, that exception is re-thrown in the thread that
-- called 'forkCarbon'.
forkCarbon :: CarbonOptions -> EKG.Store -> IO ThreadId
forkCarbon opts store = do
  candidates <-
    Network.getAddrInfo Nothing (Just hostName) (Just (show (port opts)))
  conn <- case candidates of
    [] ->
      ioError (userError ("unsupported address: " ++ hostName))
    (firstAddr : _) ->
      Carbon.connect (Network.addrAddress firstAddr)
  parent <- myThreadId
  forkFinally (loop store conn opts) $ \outcome -> do
    -- Always release the connection before reporting how the worker ended.
    Carbon.disconnect conn
    either (throwTo parent) (const (return ())) outcome
  where
    hostName = T.unpack (host opts)
-- | Sample the store and flush the sample to Carbon, forever.
--
-- Each iteration measures how long sampling and flushing took (in
-- microseconds, via 'time') and sleeps only for the remainder of the
-- configured flush interval, so that flushes start roughly every
-- 'flushInterval' milliseconds.
loop :: EKG.Store -> Carbon.Connection -> CarbonOptions -> IO ()
loop store socket opts = forever $ do
  start <- time
  sample <- EKG.sampleAll store
  flushSample sample socket opts
  end <- time
  let intervalMicros = flushInterval opts * 1000
      elapsedMicros  = fromIntegral (end - start)
  -- If the flush took longer than the interval, the difference is
  -- negative; clamp it so 'threadDelay' is never given a negative delay.
  threadDelay (max 0 (intervalMicros - elapsedMicros))
-- | Current POSIX time, scaled by one million and rounded to an integer
-- (i.e. microseconds since the epoch).
time :: IO Int64
time = do
  now <- Time.getPOSIXTime
  let seconds = realToFrac now :: Double
  return (round (seconds * 1000000.0))
-- | Convert one sample into Carbon metrics and send them over the given
-- connection in a single 'Carbon.sendMetrics' call.
--
-- All metrics carry the time at which the flush started. Counters and
-- gauges each become one metric; labels are dropped; a distribution is
-- expanded into @mean@, @variance@, @count@, @sum@, @min@ and @max@
-- metrics named after the distribution. The configured prefix and suffix
-- are applied to every metric name.
flushSample :: EKG.Sample -> Carbon.Connection -> CarbonOptions -> IO ()
flushSample sample conn opts = do
  now <- Time.getCurrentTime
  let collect acc name value = toMetrics name value now <> acc
      gathered = HashMap.foldlWithKey' collect V.empty sample
  Carbon.sendMetrics conn (V.map decorate gathered)
  where
    -- Prefix and suffix, including the separating dot when non-empty.
    namePrefix
      | T.null (prefix opts) = ""
      | otherwise = prefix opts <> "."
    nameSuffix
      | T.null (suffix opts) = ""
      | otherwise = "." <> suffix opts

    decorate (Carbon.Metric name value stamp) =
      Carbon.Metric (namePrefix <> name <> nameSuffix) value stamp

    -- Metrics produced by a single sampled value.
    toMetrics name sampled stamp = case sampled of
      EKG.Counter n -> V.singleton (Carbon.Metric name (fromIntegral n) stamp)
      EKG.Gauge n -> V.singleton (Carbon.Metric name (fromIntegral n) stamp)
      EKG.Label {} -> V.empty
      EKG.Distribution stats ->
        let part label value = Carbon.Metric (name <> "." <> label) value stamp
         in V.fromList
              [ part "mean" (Stats.mean stats)
              , part "variance" (Stats.variance stats)
              , part "count" (fromIntegral $ Stats.count stats)
              , part "sum" (Stats.sum stats)
              , part "min" (Stats.min stats)
              , part "max" (Stats.max stats)
              ]