module System.Remote.Monitoring.ElasticSearch
(
ElasticSearch
, elasticSearchThreadId
, forkElasticSearch
, ESOptions(..)
, defaultESOptions
) where
import Control.Concurrent (ThreadId, forkIO, threadDelay)
import Control.Exception (catch)
import Control.Lens
import Control.Monad (forever, void)
import Data.Default.Class (def)
import qualified Data.HashMap.Strict as M
import Data.Int (Int64)
import Data.Monoid ((<>))
import Data.Text (Text)
import qualified Data.Text as T
import Data.Text.Lens
import Data.Time.Clock (getCurrentTime)
import Data.Time.Clock.POSIX (getPOSIXTime)
import Data.Time.Format (defaultTimeLocale, formatTime)
import Network.HostName (getHostName)
import Network.HTTP.Req (HttpException, POST (..),
ReqBodyLbs (..), Scheme (Http), runReq)
import qualified Network.HTTP.Req as Req
import qualified System.Metrics as Metrics
import System.Metrics.Json
newtype ElasticSearch = ElasticSearch { threadId :: ThreadId }
elasticSearchThreadId :: ElasticSearch -> ThreadId
elasticSearchThreadId = threadId
data ESOptions = ESOptions
{
_host :: !Text
, _port :: !Int
, _onError :: !(HttpException -> IO ())
, _indexBase :: !Text
, _indexByDate :: !Bool
, _beatName :: !Text
, _flushInterval :: !Int
, _debug :: !Bool
, _prefix :: !Text
, _suffix :: !Text
, _tags :: ![Text]
}
makeClassy ''ESOptions
defaultESOptions :: ESOptions
defaultESOptions = ESOptions
{ _host = "127.0.0.1"
, _port = 9200
, _onError = print
, _indexBase = "metricbeat"
, _indexByDate = True
, _beatName = "ekg"
, _flushInterval = 1000
, _debug = False
, _prefix = ""
, _suffix = ""
, _tags = []
}
forkElasticSearch :: ESOptions
-> Metrics.Store
-> IO ElasticSearch
forkElasticSearch opts store = ElasticSearch <$> forkIO (loop store opts)
loop :: Metrics.Store
-> ESOptions
-> IO ()
loop store opts = forever $ do
start <- time
flushSample store opts
end <- time
threadDelay ((opts ^. flushInterval) * 1000 fromIntegral (end start))
loop store opts
time :: IO Int64
time = (round . (* 1000000.0) . toDouble) `fmap` getPOSIXTime
where toDouble = realToFrac :: Real a => a -> Double
elasticURL :: ESOptions -> Req.Url 'Http
elasticURL eo = Req.http . T.pack $ (eo ^. host.unpacked) <> ":" ++ show (eo ^. port) <> "/_bulk"
mkIndex :: ESOptions -> IO CreateBulk
mkIndex eo =
CreateBulk <$> if eo ^. indexByDate
then appendDate (eo ^. indexBase)
else return (eo ^. indexBase)
where
appendDate base = do
day <- formatTime defaultTimeLocale "%Y.%m.%d" <$> getCurrentTime
return $ base <> "-" <> (day ^. packed)
sampleBeatEvents :: Metrics.Store -> ESOptions -> IO [BeatEvent]
sampleBeatEvents store eo = do
now <- getPOSIXTime
sample <- Metrics.sampleAll store
hostName <- T.pack <$> getHostName
finish <- getPOSIXTime
let took = floor $ (finish now) * 1000
theBeat = Beat hostName (eo ^. beatName) "0.1"
mkBeatEvt evts k v = BeatEvent theBeat now(eo ^. tags) took (M.singleton k v) : evts
return $ M.foldlWithKey' mkBeatEvt [] sample
flushSample :: Metrics.Store -> ESOptions -> IO ()
flushSample store eo = do
createBulk <- mkIndex eo
bulkEvts <- sampleBeatEvents store eo
let body = ReqBodyLbs . bulkRequestBody . BulkRequest $ (createBulk, ) <$> bulkEvts
(void . runReq def $ Req.req POST
(elasticURL eo)
body
Req.ignoreResponse
mempty)
`catch` _onError eo