{-# LANGUAGE OverloadedStrings #-}

-- | This module lets you periodically flush metrics to a influxdb
-- backend. Example usage:
--
-- > main = do
-- >   store <- newStore
-- >   forkInfluxdb defaultInfluxdbOptions store
--
-- You probably want to include some of the predefined metrics defined
-- in the @ekg-core@ package, by calling e.g. the 'EKG.registerGcMetrics'
-- function defined in that pacakge.
--
-- NOTE: This package has been modelled after the @ekg-carbon@ package, and
--       is almost identical. The author is indebted to those fine folks who
--       wrote the @ekg-carbon@ package.

module System.Remote.Monitoring.Influxdb
  ( InfluxdbOptions(..)
  , defaultInfluxdbOptions
  , forkInfluxdb
  , forkInfluxdbRestart
  ) where

import Data.Monoid ((<>))
import qualified Data.Time.Clock.POSIX as Time
import qualified Data.Text as T
import qualified System.Metrics as EKG
import qualified System.Metrics.Distribution as Stats
import           System.Clock
import qualified Data.Map as M
import qualified Data.Vector as V
import qualified Database.InfluxDB.Writer as Influxdb
import Control.Exception (SomeException, try, bracket)
import Control.Concurrent (ThreadId, forkIO, myThreadId, threadDelay, throwTo)
import Control.Monad (forever)
import qualified Data.HashMap.Strict as HashMap
import Data.Int (Int64)

--------------------------------------------------------------------------------
-- | Options to control how to connect to the Influxdb server and how often to
-- flush metrics. The flush interval should match the shortest retention rate of
-- the matching retention periods, or you risk over-riding previous samples.
data InfluxdbOptions = InfluxdbOptions
  { -- | The hostname or IP address of the server running influxdb
    host :: !T.Text
    -- | Server port of the TCP line receiver interface.
  , port :: !Int
    -- | The Influxdb database name
  , database :: !T.Text
    -- | The amount of time between sampling EKG metrics and pushing to Influxdb.
  , flushInterval :: !Int
    -- | Prefix to add to all matric names.
  , prefix :: !T.Text
    -- | Suffix to add to all metric names. This is particularly
    -- useful for sending per host stats by settings this value to:
    -- @takeWhile (/= \'.\') \<$\> getHostName@, using @getHostName@
    -- from the @Network.BSD@ module in the network package.
  , suffix :: !T.Text
  } deriving (Eq, Show)

--------------------------------------------------------------------------------
-- | Defaults
--
-- * @host@ = @\"127.0.0.1\"@
--
-- * @port@ = @8086@
--
-- * @flushInterval@ = @1000@
--
-- * Empty 'database', 'prefix' and 'suffix'.
defaultInfluxdbOptions :: InfluxdbOptions
defaultInfluxdbOptions = InfluxdbOptions
    { host          = "127.0.0.1"
    , port          = 8086
    , database      = ""
    , flushInterval = 1000
    , prefix        = ""
    , suffix        = ""
    }

--------------------------------------------------------------------------------
-- | Create a thread that periodically flushes the metrics in 'EKG.Store' to
-- Influxdb. If the thread flushing statistics throws an exception (for example,
-- the network connection is lost), this exception will be thrown up to the thread
-- that called 'forkInfluxdb'. For more control, see 'forkInfluxdbRestart'.
forkInfluxdb :: InfluxdbOptions -> EKG.Store -> IO ThreadId
forkInfluxdb opts store =
  do parent <- myThreadId
     forkInfluxdbRestart opts
                         store
                         (\e _ -> throwTo parent e)

--------------------------------------------------------------------------------
-- | Create a thread that periodically flushes the metrics in 'EKG.Store' to
-- Influxdb. If the thread flushing statistics throws an exception (for example,
-- the network connection is lost), the callback function will be invoked with the
-- exception that was thrown, and an 'IO' computation to restart the handler.
--
-- For example, you can use 'forkInfluxdbRestart' to log failures and restart
-- logging:
--
-- > forkInfluxdbRestart defaultInfluxdbOptions
-- >                     store
-- >                     (\ex restart -> do hPutStrLn stderr ("ekg-influxdb: " ++ show ex)
-- >                                        restart)
forkInfluxdbRestart :: InfluxdbOptions
                    -> EKG.Store
                    -> (SomeException -> IO () -> IO ())
                    -> IO ThreadId
forkInfluxdbRestart opts store exceptionHandler = do
  eHandle <- Influxdb.createHandle (Influxdb.Config $ T.unpack ("http://" <> host opts <> ":" <> T.pack (show (port opts)) <> "/" <> database opts))
  handle <- case eHandle of
    Right h -> return $ h
    _ -> unsupportedAddressError
  let go = do
        terminated <-
          try $ bracket
          (return handle)
          (\_ -> return ())
          (loop store opts)
        case terminated of
          Left exception -> exceptionHandler exception go
          Right _ -> go
  forkIO go
  where unsupportedAddressError =
          ioError (userError ("unsupported address: " ++ T.unpack (host opts)))


--------------------------------------------------------------------------------
loop :: EKG.Store -> InfluxdbOptions -> Influxdb.Handle -> IO ()
loop store opts handle = forever $ do
  start <- time
  sample <- EKG.sampleAll store
  flushSample sample handle opts
  end <- time
  threadDelay (flushInterval opts * 1000 - fromIntegral (end - start))

-- | Microseconds since epoch.
time :: IO Int64
time = (round . (* 1000000.0) . toDouble) `fmap` Time.getPOSIXTime
  where toDouble = realToFrac :: Real a => a -> Double

flushSample :: EKG.Sample -> Influxdb.Handle -> InfluxdbOptions -> IO ()
flushSample sample handle opts = do
  t <- getTime Realtime
  sendMetrics handle
    (V.map renamed
      (HashMap.foldlWithKey' (\ms k v -> metrics k v t <> ms)
        V.empty
        sample))

  where
  renamed (Metric n v t) =
    let p = if T.null (prefix opts) then "" else prefix opts <> "."
        s = if T.null (suffix opts) then "" else "." <> suffix opts
    in Metric (p <> n <> s) v t

  metrics n (EKG.Counter i) t = V.singleton (Metric ("counter." <> n) (fromIntegral i) t)
  metrics n (EKG.Gauge i) t = V.singleton (Metric ("gauge." <> n) (fromIntegral i) t)
  metrics _ (EKG.Label {}) _ = V.empty
  metrics n (EKG.Distribution stats) t =
    let f n' v = Metric ("dist." <> n <> "." <> n') v t
    in V.fromList [ f "mean" (Stats.mean stats)
                  , f "variance" (Stats.variance stats)
                  , f "count" (fromIntegral $ Stats.count stats)
                  , f "sum" (Stats.sum stats)
                  , f "min" (Stats.min stats)
                  , f "max" (Stats.max stats)
                  ]

data Metric = Metric
  { path  :: !T.Text
  , value :: !Double
  , timestamp :: !TimeSpec
  } deriving (Show)

sendMetrics :: Influxdb.Handle -> V.Vector Metric -> IO ()
sendMetrics handle metrics = V.forM_ metrics $ \metric -> do
  let tags   = M.empty
      values = M.singleton "value" (Influxdb.F (value metric))
      ts     = fromIntegral . toNanoSecs $ (timestamp metric)
    in Influxdb.writePoint' handle (path metric) tags values ts