{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}

-- | Periodically samples a metric store and sends the metrics that
-- changed since the previous sample to a statsd server over UDP.
--
-- Start the background flusher with 'forkStatsd'. 'defaultStatsdOptions'
-- provides a starting configuration that can be adjusted with record
-- update syntax.
module System.Remote.Monitoring.Statsd
    (
      -- * The statsd flusher
      Statsd
    , statsdThreadId
    , forkStatsd

      -- * Configuration
    , StatsdOptions(..)
    , defaultStatsdOptions
    ) where
import Control.Concurrent (ThreadId, myThreadId, threadDelay, throwTo)
import Control.Exception (IOException, bracketOnError, catch)
import Control.Monad (forM_, when)
import qualified Data.ByteString.Char8 as B8
import qualified Data.HashMap.Strict as M
import Data.Int (Int64)
import Data.Monoid ((<>))
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import qualified Data.Text.IO as T
import Data.Time.Clock.POSIX (getPOSIXTime)
import qualified Network.Socket as Socket
import qualified Network.Socket.ByteString as Socket
import qualified System.Metrics as Metrics
import qualified System.Metrics.Distribution.Internal as Distribution
import System.IO (stderr)
#if __GLASGOW_HASKELL__ >= 706
import Control.Concurrent (forkFinally)
#else
import Control.Concurrent (forkIO)
import Control.Exception (SomeException, mask, try)
import Prelude hiding (catch)
#endif
-- | A handle to a running statsd flusher. It wraps the 'ThreadId' of the
-- thread forked by 'forkStatsd'.
data Statsd = Statsd
    { threadId :: !ThreadId  -- ^ Thread running the sample-and-flush loop.
    }
-- | The ID of the thread that was forked by 'forkStatsd' and that runs
-- the sample-and-flush loop.
statsdThreadId :: Statsd -> ThreadId
statsdThreadId (Statsd tid) = tid
-- | Settings for the statsd flusher started by 'forkStatsd'.
data StatsdOptions = StatsdOptions
    {
      -- | Host name or address of the server; it is resolved with
      -- 'Socket.getAddrInfo'.
      host :: !T.Text

      -- | Port (service) the datagrams are sent to.
    , port :: !Int

      -- | Time between flushes, in milliseconds: the value is multiplied
      -- by 1000 before being used as a 'threadDelay' (microsecond) delay.
    , flushInterval :: !Int

      -- | When 'True', every message is also written to stderr and the
      -- socket is connected to the server address up front.
    , debug :: !Bool

      -- | Text put in front of every metric name, joined with a dot.
      -- An empty prefix adds nothing.
    , prefix :: !T.Text

      -- | Text appended to every metric name, joined with a dot.
      -- An empty suffix adds nothing.
    , suffix :: !T.Text
    }
-- | Baseline configuration, meant to be customised with record update
-- syntax:
--
-- * 'host' = @\"127.0.0.1\"@
--
-- * 'port' = @8125@
--
-- * 'flushInterval' = @1000@
--
-- * 'debug' = @False@
--
-- * 'prefix' and 'suffix' are empty, so metric names are sent unchanged.
defaultStatsdOptions :: StatsdOptions
defaultStatsdOptions =
    StatsdOptions
        { debug         = False
        , flushInterval = 1000
        , host          = "127.0.0.1"
        , port          = 8125
        , prefix        = ""
        , suffix        = ""
        }
-- | Fork a thread that periodically samples the given metric store and
-- sends the metrics that changed since the previous sample to the server
-- described by the options, using a datagram (UDP) socket.
--
-- The server address is resolved once, before the thread is forked, and
-- the first result of 'Socket.getAddrInfo' is used. If resolution yields
-- no address an 'IOError' (\"unsupported address: ...\") is raised in the
-- calling thread.
--
-- When the forked thread terminates its socket is closed. If it
-- terminated because of an exception, that exception is re-thrown to the
-- thread that called 'forkStatsd'.
forkStatsd :: StatsdOptions  -- ^ Options
           -> Metrics.Store  -- ^ Metric store to sample
           -> IO Statsd      -- ^ Handle to the forked flusher
forkStatsd opts store = do
    addrInfos <- Socket.getAddrInfo Nothing (Just $ T.unpack $ host opts)
                                    (Just $ show $ port opts)
    (sendSample, closeSocket) <- case addrInfos of
        [] -> unsupportedAddressError
        (addrInfo:_) ->
            -- Do not leak the socket if set-up fails after it was created
            -- (e.g. 'Socket.connect' throws). On success the socket stays
            -- open and is closed by the forked thread's finaliser below.
            bracketOnError
                (Socket.socket (Socket.addrFamily addrInfo)
                               Socket.Datagram Socket.defaultProtocol)
                Socket.close
                (configure addrInfo)
    me <- myThreadId
    tid <- forkFinally (loop store emptySample sendSample opts) $ \ r -> do
        closeSocket
        case r of
            Left e  -> throwTo me e
            Right _ -> return ()
    return $ Statsd tid
  where
    -- Build the send action for a freshly created socket, paired with the
    -- action that closes that socket. In debug mode the socket is
    -- connected first and messages go through 'Socket.sendAll'; otherwise
    -- each message is addressed explicitly with 'Socket.sendAllTo'.
    configure addrInfo socket = do
        let socketAddress = Socket.addrAddress addrInfo
        sender <- if debug opts
            then do
                Socket.connect socket socketAddress
                return $ \msg -> Socket.sendAll socket msg
            else return $ \msg -> Socket.sendAllTo socket msg socketAddress
        return (sender, Socket.close socket)

    unsupportedAddressError = ioError $ userError $
        "unsupported address: " ++ T.unpack (host opts)

    -- The first iteration diffs against an empty sample, so every metric
    -- present in the store is reported once.
    emptySample = M.empty
-- | The flusher's main loop; it does not return normally.
--
-- Each iteration samples the store, sends the difference to the previous
-- sample, and then sleeps for whatever is left of the flush interval
-- before recursing with the new sample as the baseline.
loop :: Metrics.Store             -- ^ Metric store
     -> Metrics.Sample            -- ^ Last sampled metrics
     -> (B8.ByteString -> IO ())  -- ^ Action to send a sample
     -> StatsdOptions             -- ^ Options
     -> IO ()
loop store lastSample sendSample opts = do
    start <- time
    sample <- Metrics.sampleAll store
    let !diff = diffSamples lastSample sample
    flushSample diff sendSample opts
    end <- time
    -- 'time' and 'threadDelay' work in microseconds; 'flushInterval' is
    -- scaled by 1000 to match. The time already spent sampling and
    -- flushing is deducted so iterations start one interval apart.
    let elapsed   = fromIntegral (end - start) :: Int
        remaining = flushInterval opts * 1000 - elapsed
    -- A flush that took longer than the interval would give a negative
    -- delay; start the next iteration immediately in that case.
    threadDelay (max 0 remaining)
    loop store sample sendSample opts
-- | The current POSIX time as a whole number of microseconds.
time :: IO Int64
time = do
    now <- getPOSIXTime
    -- Go through 'Double' seconds, scale to microseconds, then round.
    let seconds = realToFrac now :: Double
    return (round (seconds * 1000000.0))
-- | Compute what has to be reported for the current sample given the
-- previous one.
--
-- A metric that is absent from the previous sample is kept as is. A
-- metric present in both is replaced by the result of 'diffMetric', and
-- dropped when 'diffMetric' yields 'Nothing'. Metrics that only occur in
-- the previous sample do not appear in the result.
diffSamples :: Metrics.Sample -> Metrics.Sample -> Metrics.Sample
diffSamples prev curr = M.foldlWithKey' step M.empty curr
  where
    -- Add the metric to the accumulator only if there is something to
    -- report for it.
    step acc key current =
        maybe acc (\val -> M.insert key val acc) (toReport key current)

    toReport key current = case M.lookup key prev of
        Nothing  -> Just current
        Just old -> diffMetric old current
-- | Compare the previous (first argument) and current (second argument)
-- value of one metric and return what should be reported, or 'Nothing'
-- if nothing changed.
--
-- * Counters report the increase since the previous value.
--
-- * Gauges and labels report the current value.
--
-- * Distributions are considered changed when their sample count
--   changed. The reported distribution is the current one with its
--   count replaced by the increase in count; all other fields are left
--   at their current values.
--
-- * If the two values are of different kinds, 'Nothing' is returned.
diffMetric :: Metrics.Value -> Metrics.Value -> Maybe Metrics.Value
diffMetric (Metrics.Counter n1) (Metrics.Counter n2)
    | n1 == n2  = Nothing
    | otherwise = Just $! Metrics.Counter $ n2 - n1
diffMetric (Metrics.Gauge n1) (Metrics.Gauge n2)
    | n1 == n2  = Nothing
    | otherwise = Just $ Metrics.Gauge n2
diffMetric (Metrics.Label n1) (Metrics.Label n2)
    | n1 == n2  = Nothing
    | otherwise = Just $ Metrics.Label n2
diffMetric (Metrics.Distribution d1) (Metrics.Distribution d2)
    | Distribution.count d1 == Distribution.count d2 = Nothing
    | otherwise = Just $ Metrics.Distribution $ d2
        { Distribution.count = Distribution.count d2 - Distribution.count d1
        }
diffMetric _ _ = Nothing
-- | Send every metric of the sample with the given send action, one
-- message per value, formatted as @name:value|type@.
--
-- Metric names are wrapped in the configured 'prefix' and 'suffix'
-- (dot-separated, empty ones are skipped). Counters are sent with type
-- @c@, gauges with type @g@, labels are not sent, and a distribution is
-- expanded into six messages named after its fields.
--
-- An 'IOException' raised by the send action is reported on stderr and
-- otherwise ignored, so one failed message does not stop the flush.
flushSample :: Metrics.Sample -> (B8.ByteString -> IO ()) -> StatsdOptions -> IO ()
flushSample sample sendSample opts =
    forM_ (M.toList sample) $ \ (name, val) ->
        emit (qualified name) val
  where
    qualified name = leading <> name <> trailing

    leading
        | T.null (prefix opts) = ""
        | otherwise            = prefix opts <> "."

    trailing
        | T.null (suffix opts) = ""
        | otherwise            = "." <> suffix opts

    emit name val = case val of
        Metrics.Counter n      -> send "|c" name (show n)
        Metrics.Gauge n        -> send "|g" name (show n)
        Metrics.Distribution d -> emitDistribution name d
        Metrics.Label _        -> return ()

    -- One message per distribution field, in this fixed order. Only the
    -- count is sent as a counter; everything else is a gauge.
    emitDistribution name d =
        forM_ (distributionFields d) $ \ (ty, field, val) ->
            send ty (name <> "." <> field) val

    distributionFields :: Distribution.Stats -> [(B8.ByteString, T.Text, String)]
    distributionFields d =
        [ ("|g", "mean"    , show (Distribution.mean d))
        , ("|g", "variance", show (Distribution.variance d))
        , ("|c", "count"   , show (Distribution.count d))
        , ("|g", "sum"     , show (Distribution.sum d))
        , ("|g", "min"     , show (Distribution.min d))
        , ("|g", "max"     , show (Distribution.max d))
        ]

    send :: B8.ByteString -> T.Text -> String -> IO ()
    send ty name val = do
        let !msg = B8.concat [T.encodeUtf8 name, ":", B8.pack val, ty]
        when (debug opts) $ B8.hPutStrLn stderr $ B8.concat ["DEBUG: ", msg]
        sendSample msg `catch` reportFailure

    reportFailure :: IOException -> IO ()
    reportFailure e =
        T.hPutStrLn stderr $ "ERROR: Couldn't send message: " <> T.pack (show e)
#if __GLASGOW_HASKELL__ < 706
-- | Local definition of @forkFinally@ for compilers where it is not
-- imported from "Control.Concurrent".
--
-- The thread is forked with asynchronous exceptions masked. The action
-- itself runs with the caller's masking state restored, and its outcome
-- (result or exception) is handed to the handler.
forkFinally :: IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
forkFinally action and_then =
    mask $ \unmask -> forkIO (and_then =<< try (unmask action))
#endif