{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE FunctionalDependencies #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE FlexibleContexts #-}
module Network.StatsD.Datadog (
DogStatsSettings(..),
defaultSettings,
withDogStatsD,
mkStatsClient,
finalizeStatsClient,
send,
metric,
Metric,
MetricName(..),
MetricType(..),
event,
Event,
serviceCheck,
ServiceCheck,
ServiceCheckStatus(..),
ToStatsD,
Tag,
tag,
ToMetricValue(..),
value,
Priority(..),
AlertType(..),
HasName(..),
HasSampleRate(..),
HasType'(..),
HasTags(..),
HasTitle(..),
HasText(..),
HasDateHappened(..),
HasHostname(..),
HasAggregationKey(..),
HasPriority(..),
HasSourceTypeName(..),
HasAlertType(..),
HasHost(..),
HasPort(..),
HasBufferSize(..),
HasMaxDelay(..),
HasOnException(..),
HasStatus(..),
HasMessage(..),
StatsClient(Dummy)
) where
import Control.Applicative ((<$>))
import Control.Exception (SomeException)
import Control.Lens
import Control.Monad (void)
import Control.Reaper
import Data.ByteString (ByteString)
import qualified Data.ByteString.Lazy as L
import Data.BufferBuilder.Utf8
import Data.List (intersperse)
import qualified Data.Sequence as Seq
import qualified Data.ByteString as B
import qualified Data.Foldable as F
import Data.Text (Text)
import qualified Data.Text as T
import Data.Time.Clock
import Data.Time.Clock.POSIX
import Data.Text.Encoding (encodeUtf8)
import Network.Socket hiding (send, sendTo, recv, recvFrom)
import System.IO
( BufferMode(BlockBuffering)
, Handle
, IOMode(WriteMode)
)
import UnliftIO
-- | Convert a 'UTCTime' to a whole number of seconds since the POSIX epoch,
-- rounding the fractional part with 'round'.
epochTime :: UTCTime -> Int
epochTime t = round (utcTimeToPOSIXSeconds t)
-- | The name of a metric, wrapped so that it is not confused with other
-- 'Text' arguments.
newtype MetricName = MetricName
  { fromMetricName :: Text -- ^ Unwrap the underlying name.
  }
-- | Replace every @:@, @|@ and @\@@ in the input with an underscore.
-- These characters act as separators in the rendered datagrams, so they
-- must not appear inside names and values.
cleanMetricText :: Text -> Text
cleanMetricText = T.map sanitize
  where
    sanitize c
      | c == ':' || c == '|' || c == '@' = '_'
      | otherwise = c
{-# INLINE cleanMetricText #-}
-- | Replace each newline character with the two-character sequence @\\n@
-- so that event text stays on a single line.
escapeEventContents :: T.Text -> T.Text
escapeEventContents = T.replace newline escapedNewline
  where
    newline = T.singleton '\n'
    escapedNewline = T.pack "\\n"
{-# INLINE escapeEventContents #-}
-- | A tag in its already-rendered form: a builder action that writes the
-- tag's text when run.
newtype Tag = Tag
  { fromTag :: Utf8Builder () -- ^ The builder action that emits the tag.
  }
-- | Build a @key:value@ tag. Both the key and the value are passed through
-- 'cleanMetricText' before being written.
tag :: Text -> Text -> Tag
tag key val = Tag $ do
  appendText (cleanMetricText key)
  appendChar7 ':'
  appendText (cleanMetricText val)
-- | The kind of a metric. It selects the type code written after the value
-- when the metric is rendered.
data MetricType
  = Gauge
  | Counter
  | Timer
  | Histogram
  | Set
-- | Types that can be written as the value part of a metric.
class ToMetricValue a where
  -- | Produce the builder action that writes the value.
  encodeValue :: a -> Utf8Builder ()

-- | Integers are written in signed decimal form.
instance ToMetricValue Int where
  encodeValue n = appendDecimalSignedInt n

-- | Doubles are written in decimal form.
instance ToMetricValue Double where
  encodeValue d = appendDecimalDouble d
-- | Smart constructor for a 'Metric' with a sample rate of 1 and no tags.
-- The value is encoded immediately via 'encodeValue'.
metric :: (ToMetricValue a) => MetricName -> MetricType -> a -> Metric
metric n t v =
  Metric
    { metricName = n
    , metricSampleRate = 1
    , metricType' = t
    , mValue = encodeValue v
    , metricTags = []
    }
-- | A single metric sample ready to be rendered.
data Metric = Metric
  { metricName :: !MetricName
    -- ^ Name of the metric.
  , metricSampleRate :: {-# UNPACK #-} !Double
    -- ^ Sample rate; it is only written to the datagram when it differs from 1.
  , metricType' :: !MetricType
    -- ^ Kind of metric.
  , mValue :: !(Utf8Builder ())
    -- ^ The already-encoded value.
  , metricTags :: ![Tag]
    -- ^ Tags attached to the sample.
  }

-- Generate the field lenses (and their @Has*@ classes) for 'Metric'.
makeFields ''Metric
-- | Setter for the value of a 'Metric'. The supplied function receives the
-- currently encoded value and returns a new raw value, which is re-encoded
-- with 'encodeValue' and stored.
value :: ToMetricValue a => Setter Metric Metric (Utf8Builder ()) a
value = sets reencode
  where
    reencode f m = m { mValue = encodeValue (f (mValue m)) }
{-# INLINE value #-}
-- | Render a metric as @name:value|type@, followed by @|\@rate@ when the
-- sample rate is not 1 and by @|#tag,tag,...@ when there are tags.
renderMetric :: Metric -> Utf8Builder ()
renderMetric (Metric nm rate ty val tgs) = do
  appendText (cleanMetricText (fromMetricName nm))
  appendChar7 ':'
  val
  appendChar7 '|'
  typeCode ty
  sampleRate
  tagList tgs
  where
    -- Type code written after the value.
    typeCode Gauge = appendChar7 'g'
    typeCode Counter = appendChar7 'c'
    typeCode Timer = appendBS7 "ms"
    typeCode Histogram = appendChar7 'h'
    typeCode Set = appendChar7 's'

    -- The rate section is omitted entirely for a rate of exactly 1.
    sampleRate
      | rate == 1 = return ()
      | otherwise = appendBS7 "|@" >> appendDecimalDouble rate

    -- Comma-separated tags, omitted when the list is empty.
    tagList [] = return ()
    tagList xs = do
      appendBS7 "|#"
      F.sequence_ (intersperse (appendChar7 ',') (map fromTag xs))
-- | Priority of an event; rendered as @low@ or @normal@.
data Priority
  = Low
  | Normal
-- | Alert level of an event; rendered as @error@, @warning@, @info@ or
-- @success@.
data AlertType
  = Error
  | Warning
  | Info
  | Success
-- | Smart constructor for an 'Event' from a title and a text body. All
-- optional fields start out unset and the tag list is empty.
event :: Text -> Text -> Event
event t d =
  Event
    { eventTitle = t
    , eventText = d
    , eventDateHappened = Nothing
    , eventHostname = Nothing
    , eventAggregationKey = Nothing
    , eventPriority = Nothing
    , eventSourceTypeName = Nothing
    , eventAlertType = Nothing
    , eventTags = []
    }
-- | An event to be rendered as an @_e{...}@ datagram.
data Event = Event
  { eventTitle :: {-# UNPACK #-} !Text
    -- ^ Title of the event.
  , eventText :: {-# UNPACK #-} !Text
    -- ^ Body text of the event.
  , eventDateHappened :: !(Maybe UTCTime)
    -- ^ Optional timestamp, rendered as the @d:@ field.
  , eventHostname :: !(Maybe Text)
    -- ^ Optional hostname, rendered as the @h:@ field.
  , eventAggregationKey :: !(Maybe Text)
    -- ^ Optional aggregation key, rendered as the @k:@ field.
  , eventPriority :: !(Maybe Priority)
    -- ^ Optional priority, rendered as the @p:@ field.
  , eventSourceTypeName :: !(Maybe Text)
    -- ^ Optional source type name, rendered as the @s:@ field.
  , eventAlertType :: !(Maybe AlertType)
    -- ^ Optional alert type, rendered as the @t:@ field.
  , eventTags :: ![Tag]
    -- ^ Tags attached to the event.
  }

-- Generate the field lenses (and their @Has*@ classes) for 'Event'.
makeFields ''Event
-- | Render an event as @_e{titleLen,textLen}:title|text@ followed by the
-- optional fields that are set, in the order @d:@, @h:@, @k:@, @p:@, @s:@,
-- @t:@ and finally the tag list.
--
-- The two lengths are byte lengths of the UTF-8 encoded title and text
-- after newline escaping.
renderEvent :: Event -> Utf8Builder ()
renderEvent e = do
  appendBS7 "_e{"
  encodeValue (B.length titleBytes)
  appendChar7 ','
  encodeValue (B.length textBytes)
  appendBS7 "}:"
  unsafeAppendBS titleBytes
  appendChar7 '|'
  unsafeAppendBS textBytes
  F.forM_ (eventDateHappened e) $ \t ->
    appendBS7 "|d:" >> appendDecimalSignedInt (epochTime t)
  textField 'h' (eventHostname e)
  textField 'k' (eventAggregationKey e)
  F.forM_ (eventPriority e) $ \p ->
    appendBS7 "|p:" >> appendBS7 (priorityLabel p)
  textField 's' (eventSourceTypeName e)
  F.forM_ (eventAlertType e) $ \a ->
    appendBS7 "|t:" >> appendBS7 (alertLabel a)
  case eventTags e of
    [] -> return ()
    ts ->
      appendBS7 "|#" >>
      sequence_ (intersperse (appendChar7 ',') (map fromTag ts))
  where
    titleBytes = encodeUtf8 (escapeEventContents (eventTitle e))
    textBytes = encodeUtf8 (escapeEventContents (eventText e))

    -- Write @|c:text@ for an optional text field; nothing when it is unset.
    textField c mtxt = F.forM_ mtxt $ \txt -> do
      appendChar7 '|'
      appendChar7 c
      appendChar7 ':'
      appendText (cleanMetricText txt)

    priorityLabel :: Priority -> ByteString
    priorityLabel Low = "low"
    priorityLabel Normal = "normal"

    alertLabel :: AlertType -> ByteString
    alertLabel Error = "error"
    alertLabel Warning = "warning"
    alertLabel Info = "info"
    alertLabel Success = "success"
-- | Status of a service check. The 'Enum' position of each constructor
-- (0 to 3, in declaration order) is the number written to the datagram.
data ServiceCheckStatus
  = ServiceOk
  | ServiceWarning
  | ServiceCritical
  | ServiceUnknown
  deriving (Read, Show, Eq, Ord, Enum)
-- | A service check to be rendered as an @_sc|...@ datagram.
data ServiceCheck = ServiceCheck
  { serviceCheckName :: {-# UNPACK #-} !Text
    -- ^ Name of the check.
  , serviceCheckStatus :: !ServiceCheckStatus
    -- ^ Reported status.
  , serviceCheckMessage :: !(Maybe Text)
    -- ^ Optional message, rendered as the @m:@ field.
  , serviceCheckDateHappened :: !(Maybe UTCTime)
    -- ^ Optional timestamp, rendered as the @d:@ field.
  , serviceCheckHostname :: !(Maybe Text)
    -- ^ Optional hostname, rendered as the @h:@ field.
  , serviceCheckTags :: ![Tag]
    -- ^ Tags attached to the check.
  }

-- Generate the field lenses (and their @Has*@ classes) for 'ServiceCheck'.
makeFields ''ServiceCheck
-- | Smart constructor for a 'ServiceCheck' from a name and a status. The
-- message, timestamp and hostname start out unset and the tag list is empty.
serviceCheck :: Text -> ServiceCheckStatus -> ServiceCheck
serviceCheck n s =
  ServiceCheck
    { serviceCheckName = n
    , serviceCheckStatus = s
    , serviceCheckMessage = Nothing
    , serviceCheckDateHappened = Nothing
    , serviceCheckHostname = Nothing
    , serviceCheckTags = []
    }
-- | Types that can be rendered as a datagram payload.
class ToStatsD a where
  -- | Produce the builder action that writes the payload, without a
  -- trailing newline.
  toStatsD :: a -> Utf8Builder ()

instance ToStatsD Metric where
  toStatsD m = renderMetric m

instance ToStatsD Event where
  toStatsD e = renderEvent e
-- | Render a service check as @_sc|name|status@ followed by the optional
-- fields that are set: @d:@ (timestamp), @h:@ (hostname), the tag list, and
-- finally @m:@ (message).
--
-- The message is written last because the DogStatsD datagram format
-- requires @m:@ to be the final metadata field.
instance ToStatsD ServiceCheck where
  toStatsD check = do
    appendBS7 "_sc|"
    appendText $ cleanMetricText $ check ^. name
    appendChar7 '|'
    -- The status is sent as its 'Enum' position (0 = ok ... 3 = unknown).
    appendDecimalSignedInt $ fromEnum $ check ^. status
    F.forM_ (check ^. dateHappened) $ \ts -> do
      appendBS7 "|d:"
      appendDecimalSignedInt $ epochTime ts
    F.forM_ (check ^. hostname) $ \hn ->
      appendBS7 "|h:" >> appendText (cleanMetricText hn)
    case check ^. tags of
      [] -> return ()
      ts -> do
        appendBS7 "|#"
        sequence_ $ intersperse (appendChar7 ',') $ map fromTag ts
    -- Must stay the last field of the datagram.
    F.forM_ (check ^. message) $ \msg ->
      appendBS7 "|m:" >> appendText (cleanMetricText msg)
-- | Configuration for a stats client.
data DogStatsSettings = DogStatsSettings
  { dogStatsSettingsHost :: HostName
    -- ^ Host name or address the client sends to.
  , dogStatsSettingsPort :: !Int
    -- ^ Port the client sends to.
  , dogStatsSettingsBufferSize :: !Int
    -- ^ Size in bytes of the output handle's buffer; also the upper bound
    -- used when grouping rendered stats into one flush.
  , dogStatsSettingsMaxDelay :: !Int
    -- ^ Delay handed to the reaper between flushes (presumably in
    -- microseconds, as the default is 1000000 -- TODO confirm).
  , dogStatsSettingsOnException :: (SomeException -> Seq.Seq ByteString -> IO (Seq.Seq ByteString -> Seq.Seq ByteString))
    -- ^ Called with the exception and the stats that were being flushed
    -- when a flush fails. The function it returns becomes the result of
    -- the reaper action for that flush.
  }

-- Generate the field lenses (and their @Has*@ classes) for 'DogStatsSettings'.
makeFields ''DogStatsSettings
-- | Default settings: send to @127.0.0.1:8125@ with a 65507 byte buffer and
-- a maximum delay of 1000000. When a flush fails, the exception is printed
-- to standard output and all accumulated stats are dropped.
defaultSettings :: DogStatsSettings
defaultSettings =
  DogStatsSettings
    { dogStatsSettingsHost = "127.0.0.1"
    , dogStatsSettingsPort = 8125
    , dogStatsSettingsBufferSize = 65507
    , dogStatsSettingsMaxDelay = 1000000
    , dogStatsSettingsOnException = reportAndDrop
    }
  where
    -- Print the failure, then discard whatever has been accumulated.
    reportAndDrop e _ = do
      putStrLn $
        show e ++
        "\nDropping all accumulated stats due to error. This behavior may be overridden by setting the onException handler of DogStatsSettings."
      return (const Seq.empty)
-- | Split off the longest prefix of rendered stats whose combined size does
-- not exceed @maxBufSize@ bytes.
--
-- Returns the prefix concatenated into a lazy 'L.ByteString' together with
-- the stats that did not fit.
--
-- If the very first item is already larger than @maxBufSize@ it is returned
-- on its own rather than being left in the remainder. Otherwise the result
-- would be an empty batch plus an unchanged queue, and a caller that loops
-- until the queue is empty would never terminate.
accumulateStats ::
     Int
  -> Seq.Seq ByteString
  -> (L.ByteString, Seq.Seq ByteString)
accumulateStats maxBufSize = go 0 []
  where
    -- @accum@ is the byte count of @chunks@, which is kept in reverse order.
    -- The running size is forced by the comparison below before each
    -- recursive call, so no chain of thunks builds up.
    go :: Int -> [ByteString] -> Seq.Seq ByteString -> (L.ByteString, Seq.Seq ByteString)
    go accum chunks s = case Seq.viewl s of
      Seq.EmptyL -> (finalizeChunks chunks, Seq.empty)
      (bs Seq.:< rest)
        | newSize <= maxBufSize -> go newSize (bs : chunks) rest
        | otherwise -> case chunks of
            -- Nothing collected yet: emit the oversized item alone so the
            -- queue always shrinks.
            [] -> (L.fromChunks [bs], rest)
            _ -> (finalizeChunks chunks, s)
        where
          newSize = B.length bs + accum

    finalizeChunks :: [ByteString] -> L.ByteString
    finalizeChunks = L.fromChunks . reverse
-- | Create a stats client from the given settings.
--
-- Resolves the configured host and port, opens a datagram socket connected
-- to the first resolved address, wraps it in a block-buffered write handle,
-- and starts a reaper that flushes queued stats through 'builderAction'.
-- A flush that throws is passed to the settings' exception handler.
--
-- The client should be released with 'finalizeStatsClient'.
mkStatsClient :: MonadIO m => DogStatsSettings -> m StatsClient
mkStatsClient s = liftIO $ do
  addrInfos <- getAddrInfo
    (Just $ defaultHints { addrFlags = [AI_PASSIVE] })
    (Just $ s ^. host)
    (Just $ show $ s ^. port)
  case addrInfos of
    [] -> error "No address for hostname"
    (serverAddr:_) -> do
      -- Close the socket if connecting or converting it to a handle fails,
      -- so a failed setup does not leak a file descriptor. On success the
      -- handle owns the socket.
      h <- bracketOnError
        (socket (addrFamily serverAddr) Datagram defaultProtocol)
        close
        (\sock -> do
          connect sock (addrAddress serverAddr)
          socketToHandle sock WriteMode)
      hSetBuffering h (BlockBuffering $ Just $ dogStatsSettingsBufferSize s)
      let reaperSettings = defaultReaperSettings
            { reaperAction = \stats ->
                catch (builderAction h (dogStatsSettingsBufferSize s) stats) $ \e ->
                  dogStatsSettingsOnException s e stats
            , reaperDelay = dogStatsSettingsMaxDelay s
              -- Each queued builder is rendered to bytes as it is enqueued.
            , reaperCons = \item work -> work Seq.|> runUtf8Builder item
            , reaperNull = Seq.null
            , reaperEmpty = Seq.empty
            }
      r <- mkReaper reaperSettings
      return $ StatsClient h r s
-- | Write all queued stats to the handle in batches.
--
-- Each round takes a batch of at most @maxBufSize@ bytes via
-- 'accumulateStats', writes it, flushes the handle, and continues with the
-- remainder. Once the queue is empty the returned function discards its
-- argument and yields an empty sequence.
builderAction :: Handle -> Int -> Seq.Seq ByteString -> IO (Seq.Seq ByteString -> Seq.Seq ByteString)
builderAction h maxBufSize pending
  | Seq.null pending = return (const Seq.empty)
  | otherwise = do
      let (batch, remaining) = accumulateStats maxBufSize pending
      L.hPut h batch
      hFlush h
      builderAction h maxBufSize remaining
-- | Run an action with a freshly created stats client. The client is
-- finalized with 'finalizeStatsClient' when the action finishes, whether
-- it returns normally or throws.
withDogStatsD :: MonadUnliftIO m => DogStatsSettings -> (StatsClient -> m a) -> m a
withDogStatsD s action =
  bracket (mkStatsClient s) finalizeStatsClient action
-- | Handle to a running stats client.
--
-- 'Dummy' is a client with no resources behind it: sending to it and
-- finalizing it do nothing.
data StatsClient
  = StatsClient
      { statsClientHandle :: !Handle
        -- ^ Output handle the stats are written to.
      , statsClientReaper :: Reaper (Seq.Seq ByteString) (Utf8Builder ())
        -- ^ Reaper that queues builders and flushes them as rendered bytes.
      , statsClientSettings :: DogStatsSettings
        -- ^ Settings the client was created with.
      }
  | Dummy
-- | Queue a metric, event or service check for sending.
--
-- The value is rendered with 'toStatsD', terminated with a newline, and
-- handed to the client's reaper. Sending to a 'Dummy' client does nothing.
send :: (MonadIO m, ToStatsD v) => StatsClient -> v -> m ()
-- An ordinary record pattern is used instead of a field pun, so this
-- compiles without the NamedFieldPuns extension.
send StatsClient { statsClientReaper = r } v =
  liftIO $ reaperAdd r (toStatsD v >> appendChar7 '\n')
send Dummy _ = return ()
{-# INLINEABLE send #-}
-- | Shut a stats client down.
--
-- Stops the reaper, writes out whatever stats were still queued, and closes
-- the output handle. The handle is closed even if stopping the reaper or
-- the final flush throws; the exception still propagates to the caller.
-- Finalizing a 'Dummy' client does nothing.
finalizeStatsClient :: MonadIO m => StatsClient -> m ()
finalizeStatsClient (StatsClient h r s) =
  liftIO $ flushRemaining `finally` hClose h
  where
    flushRemaining = do
      remainingStats <- reaperStop r
      void $ builderAction h (dogStatsSettingsBufferSize s) remainingStats
finalizeStatsClient Dummy = return ()