{-# LANGUAGE OverloadedStrings #-} -- | This module lets you periodically flush metrics to a Graphite Carbon -- backend. Example usage: -- -- > main = do -- > store <- newStore -- > forkCarbon defaultCarbonOptions store -- -- You probably want to include some of the predefined metrics defined -- in the @ekg-core@ package, by calling e.g. the 'EKG.registerGcMetrics' -- function defined in that package. module System.Remote.Monitoring.Carbon ( CarbonOptions(..) , defaultCarbonOptions , forkCarbon , forkCarbonRestart ) where import Control.Exception (SomeException, try, bracket) import Control.Concurrent (ThreadId, forkIO, myThreadId, threadDelay, throwTo) import Control.Monad (forever) import Data.Int (Int64) import Data.Monoid ((<>)) import qualified Data.HashMap.Strict as HashMap import qualified Data.Text as T import qualified Data.Time as Time import qualified Data.Time.Clock.POSIX as Time import qualified Data.Vector as V import qualified Network.Carbon.Plaintext as Carbon import qualified Network.Socket as Network import qualified System.Metrics as EKG import qualified System.Metrics.Distribution as Stats -------------------------------------------------------------------------------- -- | Options to control how to connect to the Carbon server and how often to -- flush metrics. The flush interval should match the shortest retention rate -- of the matching retention periods, or you risk over-riding previous -- samples. data CarbonOptions = CarbonOptions { -- | The hostname or IP address of the server running Carbon. host :: !T.Text -- | Server port of the TCP line receiver interface. , port :: !Int -- | The amount of time between sampling EKG metrics and pushing to Carbon. , flushInterval :: !Int -- | Prefix to add to all metric names. , prefix :: !T.Text -- | Suffix to add to all metric names. This is particularly -- useful for sending per host stats by settings this value to: -- @takeWhile (/= \'.\') \<$\> getHostName@, using @getHostName@ -- from the @Network.BSD@ module in the network package. , suffix :: !T.Text } deriving (Eq, Show) -------------------------------------------------------------------------------- -- | Defaults: -- -- * @host@ = @\"127.0.0.1\"@ -- -- * @port@ = @2003@ -- -- * @flushInterval@ = @1000@ -- -- * Empty 'prefix' and 'suffix'. defaultCarbonOptions :: CarbonOptions defaultCarbonOptions = CarbonOptions { host = "127.0.0.1" , port = 2003 , flushInterval = 1000 , prefix = "" , suffix = "" } -------------------------------------------------------------------------------- -- | Create a thread that periodically flushes the metrics in 'EKG.Store' to -- Carbon. If the thread flushing statistics throws an exception (for example, the -- network connection is lost), this exception will be thrown up to the thread -- that called 'forkCarbon'. For more control, see 'forkCarbonRestart'. forkCarbon :: CarbonOptions -> EKG.Store -> IO ThreadId forkCarbon opts store = do parent <- myThreadId forkCarbonRestart opts store (\e _ -> throwTo parent e) -------------------------------------------------------------------------------- -- | Create a thread that periodically flushes the metrics in 'EKG.Store' to -- Carbon. If the thread flushing statistics throws an exception (for example, the -- network connection is lost), the callback function will be invoked with the -- exception that was thrown, and an 'IO' computation to restart the handler. -- -- For example, you can use 'forkCarbonRestart' to log failures and restart -- logging: -- -- > forkCarbonRestart defaultCarbonOptions -- > store -- > (\ex restart -> do hPutStrLn stderr ("ekg-carbon: " ++ show ex) -- > restart) forkCarbonRestart :: CarbonOptions -> EKG.Store -> (SomeException -> IO () -> IO ()) -> IO ThreadId forkCarbonRestart opts store exceptionHandler = do addrInfos <- Network.getAddrInfo Nothing (Just (T.unpack (host opts))) (Just (show (port opts))) addrInfo <- case addrInfos of (addrInfo:_) -> return $ Network.addrAddress addrInfo _ -> unsupportedAddressError let go = do terminated <- try $ bracket (Carbon.connect addrInfo) Carbon.disconnect (loop store opts) case terminated of Left exception -> exceptionHandler exception go Right _ -> go forkIO go where unsupportedAddressError = ioError (userError ("unsupported address: " ++ T.unpack (host opts))) -------------------------------------------------------------------------------- loop :: EKG.Store -> CarbonOptions -> Carbon.Connection -> IO () loop store opts socket = forever $ do start <- time sample <- EKG.sampleAll store flushSample sample socket opts end <- time threadDelay (flushInterval opts * 1000 - fromIntegral (end - start)) -- | Microseconds since epoch. time :: IO Int64 time = (round . (* 1000000.0) . toDouble) `fmap` Time.getPOSIXTime where toDouble = realToFrac :: Real a => a -> Double flushSample :: EKG.Sample -> Carbon.Connection -> CarbonOptions -> IO () flushSample sample conn opts = do t <- Time.getCurrentTime Carbon.sendMetrics conn (V.map renamed (HashMap.foldlWithKey' (\ms k v -> metrics k v t <> ms) V.empty sample)) where renamed (Carbon.Metric n v t) = let p = if T.null (prefix opts) then "" else prefix opts <> "." s = if T.null (suffix opts) then "" else "." <> suffix opts in Carbon.Metric (p <> n <> s) v t metrics n (EKG.Counter i) t = V.singleton (Carbon.Metric n (fromIntegral i) t) metrics n (EKG.Gauge i) t = V.singleton (Carbon.Metric n (fromIntegral i) t) metrics _ (EKG.Label {}) _ = V.empty metrics n (EKG.Distribution stats) t = let f n' v = Carbon.Metric (n <> "." <> n') v t in V.fromList [ f "mean" (Stats.mean stats) , f "variance" (Stats.variance stats) , f "count" (fromIntegral $ Stats.count stats) , f "sum" (Stats.sum stats) , f "min" (Stats.min stats) , f "max" (Stats.max stats) ]