{-# LANGUAGE CPP #-} {-# LANGUAGE DataKinds #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TemplateHaskell #-} {-# LANGUAGE TupleSections #-} -- | This module lets you periodically flush metrics to elasticsearch. Example -- usage: -- -- > main = do -- > store <- newStore -- > forkElasticSearch defaultESOptions store -- -- You probably want to include some of the predefined metrics defined -- in the ekg-core package, by calling e.g. the 'registerGcStats' -- function defined in that package. module System.Remote.Monitoring.ElasticSearch ( -- * The elasticsearch syncer ElasticSearch , elasticSearchThreadId , forkElasticSearch -- * ElasticSearch options , ESOptions(..) , defaultESOptions ) where import Control.Concurrent (ThreadId, forkIO, threadDelay) import Control.Exception (catch) import Control.Lens import Control.Monad (forever, void) import Data.Default.Class (def) import qualified Data.HashMap.Strict as M import Data.Int (Int64) import Data.Monoid ((<>)) import Data.Text (Text) import qualified Data.Text as T import Data.Text.Lens import Data.Time.Clock (getCurrentTime) import Data.Time.Clock.POSIX (getPOSIXTime) import Data.Time.Format (defaultTimeLocale, formatTime) import Network.HostName (getHostName) import Network.HTTP.Req (HttpException, POST (..), ReqBodyLbs (..), Scheme (Http), runReq) import qualified Network.HTTP.Req as Req import qualified System.Metrics as Metrics import System.Metrics.Json -------------------------------------------------------------------------------- -- | A handle that can be used to control the elasticsearch sync thread. -- Created by 'forkElasticSearch'. newtype ElasticSearch = ElasticSearch { threadId :: ThreadId } -- | The thread ID of the elasticsearch sync thread. You can stop the sync by -- killing this thread (i.e. by throwing it an asynchronous -- exception.) elasticSearchThreadId :: ElasticSearch -> ThreadId elasticSearchThreadId = threadId -------------------------------------------------------------------------------- -- | Options to control how to connect to the elasticsearch server and how -- often to flush metrics. The flush interval should be shorter than -- the flush interval elasticsearch itself uses to flush data to its -- backends. data ESOptions = ESOptions { -- | Server hostname or IP address _host :: !Text -- | Server port , _port :: !Int -- | Error handler , _onError :: !(HttpException -> IO ()) -- | The elasticsearch index to insert into , _indexBase :: !Text -- | Append "-YYYY.MM.DD" onto index? , _indexByDate :: !Bool -- | What to put in the @beat.name@ field , _beatName :: !Text -- | Data push interval, in ms. , _flushInterval :: !Int -- | Print debug output to stderr. , _debug :: !Bool -- | Prefix to add to all metric names. , _prefix :: !Text -- | Suffix to add to all metric names. This is particularly -- useful for sending per host stats by settings this value to: -- @takeWhile (/= \'.\') \<$\> getHostName@, using @getHostName@ -- from the @Network.BSD@ module in the network package. , _suffix :: !Text -- | Extra tags to add to events , _tags :: ![Text] } makeClassy ''ESOptions -- | Defaults: -- -- * @host@ = @\"127.0.0.1\"@ -- -- * @port@ = @8125@ -- -- * @onException@ = @print@ -- -- * @indexBase@ = @metricbeats@ -- -- * @indexByDate@ = @True@ -- -- * @beatName@ = @\"ekg\"@ -- -- * @flushInterval@ = @1000@ -- -- * @debug@ = @False@ defaultESOptions :: ESOptions defaultESOptions = ESOptions { _host = "127.0.0.1" , _port = 9200 , _onError = print , _indexBase = "metricbeat" , _indexByDate = True , _beatName = "ekg" , _flushInterval = 1000 , _debug = False , _prefix = "" , _suffix = "" , _tags = [] } -------------------------------------------------------------------------------- -- | Create a thread that flushes the metrics in the store to elasticsearch. forkElasticSearch :: ESOptions -- ^ Options -> Metrics.Store -- ^ Metric store -> IO ElasticSearch -- ^ ElasticSearch sync handle forkElasticSearch opts store = ElasticSearch <$> forkIO (loop store opts) loop :: Metrics.Store -- ^ Metric store -> ESOptions -- ^ Options -> IO () loop store opts = forever $ do start <- time flushSample store opts end <- time threadDelay ((opts ^. flushInterval) * 1000 - fromIntegral (end - start)) loop store opts -- | Microseconds since epoch. time :: IO Int64 time = (round . (* 1000000.0) . toDouble) `fmap` getPOSIXTime where toDouble = realToFrac :: Real a => a -> Double -------------------------------------------------------------------------------- -- | Construct the correct URL to send metrics too from '_host' and '_port' elasticURL :: ESOptions -> Req.Url 'Http elasticURL eo = Req.http . T.pack $ (eo ^. host.unpacked) <> ":" ++ show (eo ^. port) <> "/_bulk" -- | Construct the index to send to -- -- if '_indexByDate' is @True@ this will be '_indexBase'-YYYY.MM.DD otherwise it -- will just be '_indexBase' mkIndex :: ESOptions -> IO CreateBulk mkIndex eo = CreateBulk <$> if eo ^. indexByDate then appendDate (eo ^. indexBase) else return (eo ^. indexBase) where appendDate base = do day <- formatTime defaultTimeLocale "%Y.%m.%d" <$> getCurrentTime return $ base <> "-" <> (day ^. packed) -------------------------------------------------------------------------------- -- | Generate a 'BeatEvent' for each metric in the 'Metrics.Store' sampleBeatEvents :: Metrics.Store -> ESOptions -> IO [BeatEvent] sampleBeatEvents store eo = do now <- getPOSIXTime sample <- Metrics.sampleAll store hostName <- T.pack <$> getHostName finish <- getPOSIXTime let took = floor $ (finish - now) * 1000 theBeat = Beat hostName (eo ^. beatName) "0.1" mkBeatEvt evts k v = BeatEvent theBeat now(eo ^. tags) took (M.singleton k v) : evts return $ M.foldlWithKey' mkBeatEvt [] sample -------------------------------------------------------------------------------- -- | Create a 'BulkRequest' and send it elasticsearch flushSample :: Metrics.Store -> ESOptions -> IO () flushSample store eo = do createBulk <- mkIndex eo bulkEvts <- sampleBeatEvents store eo let body = ReqBodyLbs . bulkRequestBody . BulkRequest $ (createBulk, ) <$> bulkEvts (void . runReq def $ Req.req POST (elasticURL eo) body Req.ignoreResponse mempty) `catch` _onError eo