{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
module System.Remote.Monitoring.Statsd
(
Statsd
, statsdFlush
, statsdThreadId
, forkStatsd
, StatsdOptions(..)
, defaultStatsdOptions
) where
import Control.Concurrent (ThreadId, myThreadId, threadDelay, throwTo)
import Control.Concurrent.MVar (modifyMVar_, newMVar)
import Control.Exception (IOException, AsyncException(ThreadKilled), catch, fromException)
import Control.Monad (foldM, when)
import qualified Data.ByteString.Char8 as B8
import qualified Data.HashMap.Strict as M
import Data.Int (Int64)
import Data.Maybe (fromMaybe)
import Data.Monoid ((<>))
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import qualified Data.Text.IO as T
import Data.Time.Clock.POSIX (getPOSIXTime)
import qualified Network.Socket as Socket
import qualified Network.Socket.ByteString as Socket
import qualified System.Metrics as Metrics
import qualified System.Metrics.Distribution.Internal as Distribution
import System.IO (stderr)
#if __GLASGOW_HASKELL__ >= 706
import Control.Concurrent (forkFinally)
#else
import Control.Concurrent (forkIO)
import Control.Exception (SomeException, mask, try)
import Prelude hiding (catch)
#endif
-- | A handle to the statsd sync thread, as returned by 'forkStatsd'.
data Statsd = Statsd
    { threadId :: {-# UNPACK #-} !ThreadId
      -- ^ Identifier of the background thread; exposed to users through
      -- 'statsdThreadId'.
    , flush :: IO ()
      -- ^ Action that samples the store and sends the result right away;
      -- exposed to users through 'statsdFlush'.
    }
-- | The thread ID of the statsd sync thread.
--
-- The thread's termination handler treats 'ThreadKilled' as a clean
-- shutdown, so killing this thread stops the sync without the exception
-- being re-thrown to the thread that called 'forkStatsd'.
statsdThreadId :: Statsd -> ThreadId
statsdThreadId = threadId
-- | Flush a sample to the statsd server immediately, without waiting for
-- the sync thread's next periodic flush.
statsdFlush :: Statsd -> IO ()
statsdFlush = flush
-- | Options to control how to connect to the statsd server and how often
-- to flush metrics. See 'defaultStatsdOptions' for the default values.
data StatsdOptions = StatsdOptions
    { host :: !T.Text
      -- ^ Server hostname or IP address; resolved with 'Socket.getAddrInfo'.
    , port :: !Int
      -- ^ Server port.
    , flushInterval :: !Int
      -- ^ Time between the start of consecutive flushes, in milliseconds
      -- (it is multiplied by 1000 before being passed to 'threadDelay').
    , debug :: !Bool
      -- ^ When 'True', every message is also printed to 'stderr' and the
      -- UDP socket is connected to the server address before sending.
    , prefix :: !T.Text
      -- ^ Prefix added to every metric name. When non-empty it is
      -- followed by a @\".\"@ separator.
    , suffix :: !T.Text
      -- ^ Suffix added to every metric name. When non-empty it is
      -- preceded by a @\".\"@ separator.
    }
-- | Defaults:
--
-- * 'host' = @\"127.0.0.1\"@
--
-- * 'port' = @8125@
--
-- * 'flushInterval' = @1000@
--
-- * 'debug' = @False@
--
-- * 'prefix' = @\"\"@
--
-- * 'suffix' = @\"\"@
defaultStatsdOptions :: StatsdOptions
defaultStatsdOptions = StatsdOptions
    { host          = "127.0.0.1"
    , port          = 8125
    , flushInterval = 1000
    , debug         = False
    , prefix        = ""
    , suffix        = ""
    }
-- | Create a thread that periodically flushes the metrics in the store to
-- the statsd server described by the options.
--
-- The server address is resolved once, up front; an 'IOError' is raised if
-- resolution yields no address. A UDP socket is opened for the first
-- address returned and is closed when the sync thread terminates.
--
-- If the sync thread dies from anything other than 'ThreadKilled', the
-- exception is re-thrown to the thread that called 'forkStatsd'.
forkStatsd :: StatsdOptions  -- ^ Options
           -> Metrics.Store  -- ^ Metric store
           -> IO Statsd      -- ^ Statsd sync handle
forkStatsd opts store = do
    addrInfos <- Socket.getAddrInfo Nothing (Just $ T.unpack $ host opts)
                 (Just $ show $ port opts)
    (sendSample, closeSocket) <- case addrInfos of
        [] -> unsupportedAddressError
        (addrInfo:_) -> do
            socket <- Socket.socket (Socket.addrFamily addrInfo)
                      Socket.Datagram Socket.defaultProtocol
            let socketAddress = Socket.addrAddress addrInfo
            sendSample <- if debug opts
              then do
                -- In debug mode the socket is connected first and messages
                -- go through 'Socket.sendAll'. If connecting fails, close
                -- the socket before re-raising so the descriptor is not
                -- leaked.
                Socket.connect socket socketAddress
                    `catch` \(e :: IOException) ->
                        Socket.close socket >> ioError e
                return $ \msg -> Socket.sendAll socket msg
              else return $ \msg -> Socket.sendAllTo socket msg socketAddress
            return (sendSample, Socket.close socket)

    -- Counter values seen at the previous flush; counters are reported as
    -- deltas against this map. The MVar also serialises concurrent flushes
    -- (periodic flushes vs. explicit 'statsdFlush' calls).
    priorCountsVar <- newMVar M.empty
    let flush = do
            sample <- Metrics.sampleAll store
            modifyMVar_ priorCountsVar (flushSample sample sendSample opts)

    me <- myThreadId
    tid <- forkFinally (loop opts flush) $ \r -> do
        closeSocket
        case r of
            Left e -> case fromException e of
                -- Being killed is the expected way to stop the thread.
                Just ThreadKilled -> return ()
                -- Anything else is reported to the creating thread.
                _                 -> throwTo me e
            Right _ -> return ()
    return $ Statsd tid flush
  where
    unsupportedAddressError = ioError $ userError $
        "unsupported address: " ++ T.unpack (host opts)
-- | Run the flush action forever, aiming to start one flush every
-- 'flushInterval' milliseconds.
--
-- The time spent inside the flush is subtracted from the pause. If a flush
-- takes longer than the interval the pause is clamped to zero, so
-- 'threadDelay' is never handed a negative duration.
loop :: StatsdOptions  -- ^ Options
     -> IO ()          -- ^ Action to flush the sample
     -> IO ()
loop opts flush = do
    start <- time
    flush
    end <- time
    -- 'time' is in microseconds; 'flushInterval' is in milliseconds.
    let elapsed = fromIntegral (end - start)
        pause   = flushInterval opts * 1000 - elapsed
    threadDelay (max 0 pause)
    loop opts flush
-- | Current POSIX time in microseconds, rounded to the nearest integer.
time :: IO Int64
time = toMicros `fmap` getPOSIXTime
  where
    -- Convert via 'Double' seconds, then scale to microseconds.
    toMicros t = round ((realToFrac t :: Double) * 1000000.0)
-- | Send every metric in the sample to statsd, one message per value.
--
-- * Counters are sent as the difference from the value recorded in the
--   prior-counts map (0 if the name is absent), and the map is updated
--   with the new absolute value.
--
-- * Gauges are sent as-is.
--
-- * Distributions are sent as the gauges @.mean@, @.variance@, @.sum@,
--   @.min@ and @.max@, plus the counter @.count@.
--
-- * Labels are not sent.
--
-- Metric names have every @\':\'@ replaced by @\'_\'@ and are wrapped in the
-- configured 'prefix' and 'suffix'. An 'IOException' raised while sending
-- is logged to 'stderr' and otherwise ignored.
--
-- Returns the updated prior-counts map.
flushSample :: Metrics.Sample
            -> (B8.ByteString -> IO ())
            -> StatsdOptions
            -> M.HashMap T.Text Int64
            -> IO (M.HashMap T.Text Int64)
flushSample sample sendSample opts priorCounts =
    foldM flushOne priorCounts (M.toList sample)
  where
    flushOne pc (name, val) =
        let fullName = dottedPrefix <> sanitizeName name <> dottedSuffix
        in  flushMetric fullName val pc

    -- ':' separates name and value in the message built by 'send', so it
    -- must not appear inside the name.
    sanitizeName :: T.Text -> T.Text
    sanitizeName = T.map sanitizeChar

    sanitizeChar :: Char -> Char
    sanitizeChar ':' = '_'
    sanitizeChar c   = c

    flushMetric :: T.Text
                -> Metrics.Value
                -> M.HashMap T.Text Int64
                -> IO (M.HashMap T.Text Int64)
    flushMetric name (Metrics.Counter n)      pc = sendCounter name n pc
    flushMetric name (Metrics.Gauge n)        pc = sendGauge name n >> return pc
    flushMetric name (Metrics.Distribution d) pc = sendDistribution name d pc
    flushMetric _    (Metrics.Label _)        pc = return pc

    -- NOTE(review): the value is rendered with 'show', so negative gauges
    -- go out with a leading '-'. Some statsd servers interpret a signed
    -- gauge as a relative adjustment -- confirm against the target server.
    sendGauge :: Show a => T.Text -> a -> IO ()
    sendGauge name n = send "|g" name (show n)

    sendCounter :: T.Text
                -> Int64
                -> M.HashMap T.Text Int64
                -> IO (M.HashMap T.Text Int64)
    sendCounter name n pc = do
        let old = fromMaybe 0 (M.lookup name pc)
        send "|c" name (show (n - old))
        return (M.insert name n pc)

    sendDistribution :: T.Text
                     -> Distribution.Stats
                     -> M.HashMap T.Text Int64
                     -> IO (M.HashMap T.Text Int64)
    sendDistribution name d pc = do
        sendGauge (name <> "." <> "mean"    ) (Distribution.mean d)
        sendGauge (name <> "." <> "variance") (Distribution.variance d)
        uc <- sendCounter (name <> "." <> "count") (Distribution.count d) pc
        sendGauge (name <> "." <> "sum"     ) (Distribution.sum d)
        sendGauge (name <> "." <> "min"     ) (Distribution.min d)
        sendGauge (name <> "." <> "max"     ) (Distribution.max d)
        return uc

    isDebug :: Bool
    isDebug = debug opts

    dottedPrefix :: T.Text
    dottedPrefix = if T.null (prefix opts) then "" else prefix opts <> "."

    dottedSuffix :: T.Text
    dottedSuffix = if T.null (suffix opts) then "" else "." <> suffix opts

    -- Build @name:value<type>@ and hand it to the sender.
    send :: B8.ByteString -> T.Text -> String -> IO ()
    send ty name val = do
        let !msg = B8.concat [T.encodeUtf8 name, ":", B8.pack val, ty]
        when isDebug $ B8.hPutStrLn stderr $ B8.concat [ "DEBUG: ", msg]
        -- Best effort: a failed send must not take down the sync thread.
        sendSample msg `catch` \ (e :: IOException) ->
            T.hPutStrLn stderr $ "ERROR: Couldn't send message: " <>
                T.pack (show e)
#if __GLASGOW_HASKELL__ < 706
-- | Compatibility definition for compilers older than GHC 7.6, where
-- "Control.Concurrent" does not provide @forkFinally@.
--
-- Forks a thread running the action and, once the action has returned or
-- thrown, passes its outcome to the handler. The fork happens with
-- asynchronous exceptions masked; the action itself runs with the caller's
-- masking state restored.
forkFinally :: IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
forkFinally action and_then =
    mask (\unmask -> forkIO (do
        outcome <- try (unmask action)
        and_then outcome))
#endif