{-# LANGUAGE OverloadedStrings #-}

-- | This module lets you periodically flush metrics to a Graphite Carbon
-- backend. Example usage:
--
-- > main = do
-- >   store <- newStore
-- >   forkCarbon defaultCarbonOptions store
--
-- You probably want to include some of the predefined metrics defined
-- in the @ekg-core@ package, by calling e.g. the 'EKG.registerGcMetrics'
-- function defined in that package.
module System.Remote.Monitoring.Carbon
  ( CarbonOptions(..)
  , defaultCarbonOptions
  , forkCarbon
  ) where

import Control.Concurrent (ThreadId, forkFinally, myThreadId, threadDelay, throwTo)
import Control.Monad (forever)
import Data.Int (Int64)
import Data.Monoid ((<>))

import qualified Data.HashMap.Strict as HashMap
import qualified Data.Text as T
import qualified Data.Time as Time
import qualified Data.Time.Clock.POSIX as Time
import qualified Data.Vector as V
import qualified Network.Carbon.Plaintext as Carbon
import qualified Network.Socket as Network
import qualified System.Metrics as EKG
import qualified System.Metrics.Distribution as Stats

--------------------------------------------------------------------------------
-- | Options to control how to connect to the Carbon server and how often to
-- flush metrics. The flush interval should match the shortest retention rate
-- of the matching retention periods, or you risk over-riding previous
-- samples.
data CarbonOptions = CarbonOptions
  { -- | The hostname or IP address of the server running Carbon.
    host :: !T.Text

    -- | Server port of the TCP line receiver interface.
  , port :: !Int

    -- | The amount of time between sampling EKG metrics and pushing to Carbon.
  , flushInterval :: !Int

    -- | Prefix to add to all metric names.
  , prefix :: !T.Text

    -- | Suffix to add to all metric names. This is particularly
    -- useful for sending per host stats by settings this value to:
    -- @takeWhile (/= \'.\') \<$\> getHostName@, using @getHostName@
    -- from the @Network.BSD@ module in the network package.
  , suffix :: !T.Text
  } deriving (Eq, Show)


--------------------------------------------------------------------------------
-- | Defaults:
--
-- * @host@ = @\"127.0.0.1\"@
--
-- * @port@ = @2003@
--
-- * @flushInterval@ = @1000@
--
-- * Empty 'prefix' and 'suffix'.
defaultCarbonOptions :: CarbonOptions
defaultCarbonOptions = CarbonOptions
    { host          = "127.0.0.1"
    , port          = 2003
    , flushInterval = 1000
    , prefix        = ""
    , suffix        = ""
    }


--------------------------------------------------------------------------------
-- | Create a thread that periodically flushes the metrics in 'EKG.Store' to
-- Carbon.
forkCarbon :: CarbonOptions -> EKG.Store -> IO (ThreadId)
forkCarbon opts store = do
  addrInfos <- Network.getAddrInfo Nothing
                                   (Just $ T.unpack $ host opts)
                                   (Just $ show $ port opts)
  c <- case addrInfos of
    (addrInfo : _) -> Carbon.connect (Network.addrAddress addrInfo)
    _ -> unsupportedAddressError

  parent <- myThreadId
  forkFinally (loop store c opts)
              (\r -> do Carbon.disconnect c
                        case r of
                          Left e  -> throwTo parent e
                          Right _ -> return ())

  where
  unsupportedAddressError = ioError $ userError $
      "unsupported address: " ++ T.unpack (host opts)


--------------------------------------------------------------------------------
loop :: EKG.Store -> Carbon.Connection -> CarbonOptions -> IO ()
loop store socket opts = forever $ do
  start <- time
  sample <- EKG.sampleAll store
  flushSample sample socket opts
  end <- time
  threadDelay (flushInterval opts * 1000 - fromIntegral (end - start))

-- | Microseconds since epoch.
time :: IO Int64
time = (round . (* 1000000.0) . toDouble) `fmap` Time.getPOSIXTime
  where toDouble = realToFrac :: Real a => a -> Double

flushSample :: EKG.Sample -> Carbon.Connection -> CarbonOptions -> IO ()
flushSample sample conn opts = do
  t <- Time.getCurrentTime
  Carbon.sendMetrics conn
                     (V.map renamed
                            (HashMap.foldlWithKey' (\ms k v -> metrics k v t <> ms)
                                                   V.empty
                                                   sample))

  where
  renamed (Carbon.Metric n v t) =
    let p = if T.null (prefix opts) then "" else prefix opts <> "."
        s = if T.null (suffix opts) then "" else "." <> suffix opts
    in Carbon.Metric (p <> n <> s) v t

  metrics n (EKG.Counter i) t = V.singleton (Carbon.Metric n (fromIntegral i) t)
  metrics n (EKG.Gauge i) t = V.singleton (Carbon.Metric n (fromIntegral i) t)
  metrics _ (EKG.Label {}) _ = V.empty
  metrics n (EKG.Distribution stats) t =
    let f n' v = Carbon.Metric (n <> "." <> n') v t
    in V.fromList [ f "mean" (Stats.mean stats)
                  , f "variance" (Stats.variance stats)
                  , f "count" (fromIntegral $ Stats.count stats)
                  , f "sum" (Stats.sum stats)
                  , f "min" (Stats.min stats)
                  , f "max" (Stats.max stats)
                  ]