{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
module Katip.Scribes.Datadog.TCP
(
APIKey(..)
, DatadogScribeSettings(..)
, DatadogAuth(..)
, directAPIConnectionParams
, localAgentConnectionParams
, mkDatadogScribeSettings
, mkDatadogScribe
) where
import qualified Control.Concurrent as Conc
import qualified Control.Exception.Safe as EX
import Control.Monad (void)
import qualified Control.Retry as Retry
import qualified Data.Aeson as A
import qualified Data.Binary.Builder as BB
import qualified Data.ByteString.Lazy as BSL
import Data.Monoid as Monoid
import qualified Data.Pool as Pool
import Data.Text (Text)
import qualified Data.Text.Encoding as T
import qualified Data.Text.Lazy.Builder as B
import Data.Time (NominalDiffTime)
import Katip
import Katip.Core (LocJs (..))
import qualified Network.Connection as C
import qualified Network.Socket as Net
import qualified System.Posix.Types as POSIX
newtype APIKey = APIKey
{ apiKey :: Text
} deriving (Show, Eq)
data DatadogAuth =
NoAuthLocal
| DirectAuth APIKey
data DatadogScribeSettings = DatadogScribeSettings
{ datadogScribeSettings_connectionParams :: C.ConnectionParams
, datadogScribeSettings_auth :: DatadogAuth
, datadogScribeSettings_poolStripes :: Int
, datadogScribeSettings_connsPerStripe :: Int
, datadogScribeSettings_connectionKeepalive :: NominalDiffTime
, datadogScribeSettings_retry :: Retry.RetryPolicyM IO
}
directAPIConnectionParams :: C.ConnectionParams
directAPIConnectionParams = C.ConnectionParams
{ C.connectionHostname = "intake.logs.datadoghq.com"
, C.connectionPort = 10516
, C.connectionUseSecure = Just $ C.TLSSettingsSimple
{ C.settingDisableCertificateValidation = False
, C.settingDisableSession = False
, C.settingUseServerName = False
}
, C.connectionUseSocks = Nothing
}
localAgentConnectionParams :: Net.PortNumber -> C.ConnectionParams
localAgentConnectionParams port = C.ConnectionParams
{ C.connectionHostname = "127.0.0.1"
, C.connectionPort = port
, C.connectionUseSecure = Nothing
, C.connectionUseSocks = Nothing
}
mkDatadogScribeSettings :: C.ConnectionParams -> DatadogAuth -> IO DatadogScribeSettings
mkDatadogScribeSettings connectionParams auth = do
capabilities <- Conc.getNumCapabilities
pure $ DatadogScribeSettings
{ datadogScribeSettings_connectionParams = connectionParams
, datadogScribeSettings_auth = auth
, datadogScribeSettings_poolStripes = 1
, datadogScribeSettings_connsPerStripe = capabilities
, datadogScribeSettings_connectionKeepalive = 30
, datadogScribeSettings_retry = Retry.exponentialBackoff 25000 <> Retry.limitRetries 5
}
mkDatadogScribe
:: DatadogScribeSettings
-> PermitFunc
-> Verbosity
-> IO Scribe
mkDatadogScribe settings permit verb = do
connectionContext <- C.initConnectionContext
pool <- Pool.createPool
(C.connectTo connectionContext (datadogScribeSettings_connectionParams settings))
C.connectionClose
(datadogScribeSettings_poolStripes settings)
(datadogScribeSettings_connectionKeepalive settings)
(datadogScribeSettings_connsPerStripe settings)
pure $ Scribe
{ liPush = pushPool (datadogScribeSettings_retry settings) pool keyBuilder verb
, scribeFinalizer = Pool.destroyAllResources pool
, scribePermitItem = permit
}
where
!keyBuilder = case datadogScribeSettings_auth settings of
NoAuthLocal -> Nothing
DirectAuth (APIKey k) -> Just (BB.fromByteString (T.encodeUtf8 k))
pushPool
:: LogItem a
=> Retry.RetryPolicyM IO
-> Pool.Pool C.Connection
-> Maybe BB.Builder
-> Verbosity
-> Item a
-> IO ()
pushPool retryPolicy pool token verb item =
void $ Retry.retrying retryPolicy (\_status shouldRetry -> pure shouldRetry) $ \_status -> do
res <- EX.tryAny $ Pool.withResource pool $ \conn -> do
C.connectionPut conn rendered
pure $ case res of
Left _ -> True
Right _ -> False
where
payloadBuilder = A.fromEncoding (encodeDatadog verb item) Monoid.<> "\n"
messageBuilder = case token of
Nothing -> payloadBuilder
Just directKey -> directKey <> " " <> payloadBuilder
rendered = BSL.toStrict (BB.toLazyByteString messageBuilder)
encodeDatadog :: LogItem a => Verbosity -> Item a -> A.Encoding
encodeDatadog verb i = A.pairs $
"service" A..= _itemApp i <>
"severity" A..= _itemSeverity i <>
"env" A..= _itemEnv i <>
"thread" A..= getThreadIdText (_itemThread i) <>
"host" A..= _itemHost i <>
"pid" A..= pidInt <>
"data" A..= payloadObject verb (_itemPayload i) <>
"message" A..= (B.toLazyText (unLogStr (_itemMessage i))) <>
"date" A..= _itemTime i <>
"ns" A..= _itemNamespace i <>
"loc" A..= fmap LocJs (_itemLoc i)
where
POSIX.CPid pidInt = _itemProcess i