module System.Remote.Monitoring.Influxdb
( InfluxdbOptions(..)
, defaultInfluxdbOptions
, forkInfluxdb
, forkInfluxdbRestart
) where
import Data.Monoid ((<>))
import qualified Data.Time.Clock.POSIX as Time
import qualified Data.Text as T
import qualified System.Metrics as EKG
import qualified System.Metrics.Distribution as Stats
import System.Clock
import qualified Data.Map as M
import qualified Data.Vector as V
import qualified Database.InfluxDB.Writer as Influxdb
import Control.Exception (SomeException, try, bracket)
import Control.Concurrent (ThreadId, forkIO, myThreadId, threadDelay, throwTo)
import Control.Monad (forever)
import qualified Data.HashMap.Strict as HashMap
import Data.Int (Int64)
-- | Settings that control where and how often metrics are reported.
data InfluxdbOptions = InfluxdbOptions
  { -- | Host part of the InfluxDB URL (@http:\/\/host:port\/database@).
    host :: !T.Text
    -- | Port part of the InfluxDB URL.
  , port :: !Int
    -- | Path component appended after the final @\/@ of the URL.
  , database :: !T.Text
    -- | Target time between two flushes. The reporting loop multiplies
    -- this by 1000 before handing it to 'threadDelay', i.e. the value is
    -- expressed in milliseconds.
  , flushInterval :: !Int
    -- | When non-empty, prepended (followed by a dot) to every metric name.
  , prefix :: !T.Text
    -- | When non-empty, appended (preceded by a dot) to every metric name.
  , suffix :: !T.Text
  } deriving (Eq, Show)
-- | Default settings:
--
-- * 'host' = @\"127.0.0.1\"@
--
-- * 'port' = @8086@
--
-- * 'database' = empty
--
-- * 'flushInterval' = @1000@
--
-- * 'prefix' and 'suffix' = empty (metric names are left unchanged)
defaultInfluxdbOptions :: InfluxdbOptions
defaultInfluxdbOptions =
  InfluxdbOptions
    { host          = "127.0.0.1"
    , port          = 8086
    , database      = T.empty
    , flushInterval = 1000
    , prefix        = T.empty
    , suffix        = T.empty
    }
-- | Start the reporter thread via 'forkInfluxdbRestart'.
--
-- Any exception that terminates the reporting loop is re-thrown to the
-- thread that called 'forkInfluxdb'; the restart action offered to the
-- handler is ignored, so the loop is not restarted.
forkInfluxdb :: InfluxdbOptions -> EKG.Store -> IO ThreadId
forkInfluxdb opts store = do
  callerThread <- myThreadId
  let rethrowToCaller exception _restart = throwTo callerThread exception
  forkInfluxdbRestart opts store rethrowToCaller
-- | Create an InfluxDB handle for @http:\/\/host:port\/database@ and fork a
-- thread that keeps running the reporting 'loop'.
--
-- If the handle cannot be created, an 'IOError' built with 'userError'
-- (\"unsupported address: \<host\>\") is raised in the calling thread and no
-- thread is forked.
--
-- When the loop terminates with an exception, the supplied handler is
-- called with that exception and an action that restarts the loop; the
-- handler decides whether to run it.
forkInfluxdbRestart :: InfluxdbOptions
                    -> EKG.Store
                    -> (SomeException -> IO () -> IO ())
                    -> IO ThreadId
forkInfluxdbRestart opts store exceptionHandler = do
  created <- Influxdb.createHandle (Influxdb.Config (T.unpack endpoint))
  handle <- either (const unsupportedAddressError) return created
  let -- One run of the reporting loop over the already-created handle.
      runLoop = bracket (return handle) (\_ -> return ()) (loop store opts)
      -- Run the loop; on failure defer to the handler, otherwise start over.
      supervise = do
        outcome <- try runLoop
        case outcome of
          Left exception -> exceptionHandler exception supervise
          Right _        -> supervise
  forkIO supervise
  where
    endpoint =
      "http://" <> host opts <> ":" <> T.pack (show (port opts))
                <> "/" <> database opts
    unsupportedAddressError =
      ioError (userError ("unsupported address: " ++ T.unpack (host opts)))
-- | Reporting loop: sample the store, flush the sample, then sleep for
-- whatever is left of the flush interval. Never returns normally.
--
-- 'flushInterval' is in milliseconds while 'time' and 'threadDelay' work in
-- microseconds, hence the multiplication by 1000.
loop :: EKG.Store -> InfluxdbOptions -> Influxdb.Handle -> IO ()
loop store opts handle = forever $ do
  start <- time
  sample <- EKG.sampleAll store
  flushSample sample handle opts
  end <- time
  let intervalMicros = flushInterval opts * 1000
      elapsedMicros  = fromIntegral (end - start)
      -- Sleep only for the remainder of the interval. The result is
      -- clamped to [0, interval]: a flush that overruns the interval must
      -- not yield a negative delay, and a backwards step of the wall clock
      -- (negative elapsed time) must not stretch the delay beyond one
      -- interval.
      remaining      = min intervalMicros (max 0 (intervalMicros - elapsedMicros))
  threadDelay remaining
-- | Current POSIX time, rounded to whole microseconds.
time :: IO Int64
time = do
  now <- Time.getPOSIXTime
  -- Go through 'Double' seconds, then scale to microseconds.
  let seconds = realToFrac now :: Double
  return (round (seconds * 1000000.0))
-- | Convert one EKG sample into 'Metric's and send them.
--
-- A single timestamp is taken up front and shared by every metric of the
-- sample. Counters and gauges become one metric each (@counter.\<name\>@,
-- @gauge.\<name\>@), labels are dropped, and a distribution expands into
-- @dist.\<name\>.{mean,variance,count,sum,min,max}@. The configured
-- 'prefix' and 'suffix' are then applied to every name.
flushSample :: EKG.Sample -> Influxdb.Handle -> InfluxdbOptions -> IO ()
flushSample sample handle opts = do
  t <- getTime Realtime
  sendMetrics handle (V.map renamed (collect t))
  where
    -- Gather the per-key vectors in a list and concatenate once, instead of
    -- re-copying the accumulated vector for every key. Consing inside the
    -- left fold yields the same element order as prepending with '<>'.
    collect t =
      V.concat
        (HashMap.foldlWithKey' (\acc k v -> metrics k v t : acc) [] sample)

    -- The affixes do not depend on the metric, so compute them once.
    namePrefix
      | T.null (prefix opts) = ""
      | otherwise            = prefix opts <> "."
    nameSuffix
      | T.null (suffix opts) = ""
      | otherwise            = "." <> suffix opts

    renamed (Metric n v ts) = Metric (namePrefix <> n <> nameSuffix) v ts

    metrics n (EKG.Counter i) t =
      V.singleton (Metric ("counter." <> n) (fromIntegral i) t)
    metrics n (EKG.Gauge i) t =
      V.singleton (Metric ("gauge." <> n) (fromIntegral i) t)
    metrics _ (EKG.Label {}) _ = V.empty
    metrics n (EKG.Distribution stats) t =
      let f n' v = Metric ("dist." <> n <> "." <> n') v t
      in V.fromList [ f "mean" (Stats.mean stats)
                    , f "variance" (Stats.variance stats)
                    , f "count" (fromIntegral $ Stats.count stats)
                    , f "sum" (Stats.sum stats)
                    , f "min" (Stats.min stats)
                    , f "max" (Stats.max stats)
                    ]
-- | A single numeric measurement ready to be written to InfluxDB.
data Metric = Metric
  { -- | Full metric name; passed to the writer as the point's name.
    path :: !T.Text
    -- | Measured value; written as the point's @value@ field.
  , value :: !Double
    -- | Time the sample was taken; converted to nanoseconds on write.
  , timestamp :: !TimeSpec
  } deriving (Show)
-- | Write every metric as its own point: no tags, a single float field
-- named @value@, and the metric's timestamp converted to nanoseconds.
sendMetrics :: Influxdb.Handle -> V.Vector Metric -> IO ()
sendMetrics handle metrics = V.mapM_ writeOne metrics
  where
    writeOne (Metric name measured taken) =
      Influxdb.writePoint'
        handle
        name
        M.empty
        (M.singleton "value" (Influxdb.F measured))
        (fromIntegral (toNanoSecs taken))