{-# LANGUAGE CPP #-}
{-# LANGUAGE OverloadedStrings #-}
-- | This module lets you periodically flush metrics to a Bosun
-- backend. Example usage:
--
-- > main = do
-- >     store <- newStore
-- >     forkBosun defaultBosunOptions store
--
-- You probably want to include some of the predefined metrics defined
-- in the @ekg-core@ package, by calling e.g. the 'EKG.registerGcMetrics'
-- function defined in that package.
module System.Remote.Monitoring.Bosun
  ( BosunOptions(..)
  , defaultBosunOptions
  , forkBosun
  ) where

import Control.Applicative
import Control.Concurrent (ThreadId, forkFinally, myThreadId, threadDelay, throwTo)
import Control.Exception (try)
import Control.Lens hiding ((.=))
import Control.Monad (forever, when)
import Data.Aeson ((.=))
import Data.Int (Int64)
import Data.Monoid ((<>))
import System.IO.Unsafe (unsafePerformIO)

#if MIN_VERSION_time(1,5,0)
import Data.Time.Format (defaultTimeLocale)
#else
import System.Locale (defaultTimeLocale)
#endif

import qualified Data.Aeson as Aeson
import qualified Data.HashMap.Strict as HashMap
import qualified Data.Text as T
import qualified Data.Time as Time
import qualified Data.Time.Clock.POSIX as Time
import qualified Data.Vector as V
import qualified Network.BSD as Network
import qualified Network.HTTP.Client as HTTP
import qualified Network.Socket as Network
import qualified Network.URI as URI
import qualified Network.Wreq as Wreq
import qualified System.Metrics as EKG
import qualified System.Metrics.Distribution as Stats

--------------------------------------------------------------------------------
-- | Options to control how to connect to the Bosun server and how often to
-- flush metrics.
data BosunOptions = BosunOptions
  { -- | The root URL of the Bosun server. Only the scheme and authority are
    -- effectively used: the path is replaced with @\/api\/put@ when posting.
    bosunRoot :: !URI.URI

    -- | The amount of time, in milliseconds, between sampling EKG metrics
    -- and pushing them to Bosun.
  , flushInterval :: !Int

    -- | Tags to apply to all metrics.
  , tags :: !(HashMap.HashMap T.Text T.Text)
  } deriving (Eq, Show)

--------------------------------------------------------------------------------
-- | The local host name, looked up once and used as the default @host@ tag.
--
-- 'unsafePerformIO' is used so that 'defaultBosunOptions' can stay a pure
-- value; the @NOINLINE@ pragma keeps the lookup from being duplicated at
-- use sites.
firstHostName :: T.Text
firstHostName = unsafePerformIO (T.pack <$> Network.getHostName)
{-# NOINLINE firstHostName #-}

--------------------------------------------------------------------------------
-- | Defaults:
--
-- * @bosunRoot@ = @\"http://127.0.0.1:8070/\"@
--
-- * @tags@ = @[("host", hostname)]@
--
-- * @flushInterval@ = @10000@ (milliseconds)
defaultBosunOptions :: BosunOptions
defaultBosunOptions = BosunOptions
  { bosunRoot = URI.URI
      { URI.uriScheme = "http:"
      , URI.uriAuthority = Just (URI.URIAuth
          { URI.uriUserInfo = ""
          , URI.uriRegName = "127.0.0.1"
          , URI.uriPort = ":8070"
          })
      , URI.uriPath = "/"
      , URI.uriQuery = ""
      , URI.uriFragment = ""
      }
  , tags = HashMap.singleton "host" firstHostName
  , flushInterval = 10000
  }

--------------------------------------------------------------------------------
-- | Create a thread that periodically flushes the metrics in 'EKG.Store' to
-- Bosun.
--
-- A single HTTP connection manager is created inside the new thread and
-- shared by every flush. Any exception that terminates the flushing thread
-- is re-thrown to the thread that called 'forkBosun'.
forkBosun :: BosunOptions -> EKG.Store -> IO ThreadId
forkBosun opts store = do
  parent <- myThreadId
  forkFinally
    (do manager <- HTTP.newManager HTTP.defaultManagerSettings
        let wreqOptions = Wreq.defaults & Wreq.manager .~ Right manager
        loop store wreqOptions opts)
    (\r -> case r of
        Left e  -> throwTo parent e
        Right _ -> return ())

--------------------------------------------------------------------------------
-- | Sample the store, push the sample to Bosun, then sleep for whatever is
-- left of the flush interval. Runs forever.
loop :: EKG.Store -> Wreq.Options -> BosunOptions -> IO ()
loop store httpOptions opts = forever $ do
  start <- time
  sample <- EKG.sampleAll store
  flushSample sample httpOptions opts
  end <- time
  let interval = flushInterval opts * 1000  -- milliseconds -> microseconds
      elapsed  = fromIntegral (end - start) :: Int
      -- Clamp to [0, interval]: a flush that overruns the interval must not
      -- hand 'threadDelay' a negative value, and a wall clock that steps
      -- backwards must not stretch the pause beyond one interval.
      pause    = max 0 (min interval (interval - elapsed))
  threadDelay pause

-- | Microseconds since epoch.
time :: IO Int64
time = (round . (* 1000000.0) . toDouble) `fmap` Time.getPOSIXTime
  where
    toDouble = realToFrac :: Real a => a -> Double

-- | Post every metric of a sample to Bosun's @\/api\/put@ endpoint.
--
-- Each entry of the sample is sent in its own request (a distribution sends
-- its six derived data points together). All data points of one flush carry
-- the same timestamp. HTTP failures are printed to stdout and otherwise
-- ignored, so one failed request does not stop the remaining ones.
flushSample :: EKG.Sample -> Wreq.Options -> BosunOptions -> IO ()
flushSample sample httpOptions opts = do
  t <- Time.getCurrentTime
  -- Consing in a strict left fold yields the same traversal order as the
  -- previous prepend-to-vector accumulation, without the quadratic copying.
  mapM_ postOne
        (HashMap.foldlWithKey' (\acc k v -> metrics k v t : acc) [] sample)
  where
    -- The path component of 'bosunRoot' is overwritten, not appended to.
    putUrl :: String
    putUrl =
      URI.uriToString id ((bosunRoot opts) { URI.uriPath = "/api/put" }) ""

    -- Send one batch of data points; entries that produced none are skipped.
    postOne :: [Aeson.Value] -> IO ()
    postOne x = when (not (null x)) $ do
      res <- try (Wreq.postWith httpOptions putUrl (Aeson.Array (V.fromList x)))
      case res of
        Left e -> do
          putStrLn $ "HTTP exception when posting ekg-bosun sample:"
          print (e :: HTTP.HttpException)
        Right _ -> return ()

    -- Build zero or one data point. Values whose JSON encoding is @null@ are
    -- dropped (presumably non-finite doubles -- TODO confirm against aeson).
    ametric :: Aeson.ToJSON a => T.Text -> a -> Time.UTCTime -> [Aeson.Value]
    ametric n v t =
      [ Aeson.object
          [ "metric" .= n
          , "value" .= v
          , "timestamp" .= (Time.formatTime defaultTimeLocale "%s" t)
            -- 'toJSON' on the tag map yields the same JSON object as wrapping
            -- the map in 'Aeson.Object', without depending on how aeson
            -- represents objects internally.
          , "tags" .= Aeson.toJSON (tags opts)
          ]
      | Aeson.toJSON v /= Aeson.Null
      ]

    -- Translate one EKG value into data points. Counters and gauges map to a
    -- single point; distributions are expanded into six suffixed metrics;
    -- every other kind of value is not exported.
    metrics :: T.Text -> EKG.Value -> Time.UTCTime -> [Aeson.Value]
    metrics n v t = case v of
      EKG.Counter i -> ametric n i t
      EKG.Gauge i -> ametric n i t
      EKG.Distribution stats -> concat
        [ ametric (n <> ".count") (Stats.count stats) t
        , ametric (n <> ".sum") (Stats.sum stats) t
        , ametric (n <> ".min") (Stats.min stats) t
        , ametric (n <> ".max") (Stats.max stats) t
        , ametric (n <> ".mean") (Stats.mean stats) t
        , ametric (n <> ".variance") (Stats.variance stats) t
        ]
      _ -> []