{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE TypeApplications #-}
module Katip.Scribes.LogzIO.HTTPS
(
mkLogzIOScribe
, BulkAPIError(..)
, LogzIOScribeConfiguration(..)
, Scheme(..)
, APIToken(..)
, LoggingError(..)
, usRegionHost
, euRegionHost
, httpsPort
, httpPort
, defaultRetryPolicy
, defaultLogzIOScribeConfiguration
, renderLineTruncated
, renderLineTruncated'
, maxPayloadBytes
, maxLogLineLength
, BulkBuffer (..)
, LogAction (..)
, bufferItem
, bufferItem'
, forceFlush
, Bytes (..)
) where
import Control.Applicative
import qualified Control.Concurrent.Async as Async
import qualified Control.Concurrent.STM as STM
import qualified Control.Concurrent.STM.TBMQueue as STM
import qualified Control.Error as E
import qualified Control.Exception.Safe as EX
import Control.Monad
import qualified Control.Retry as Retry
import qualified Data.Aeson as A
import qualified Data.ByteString.Builder as BB
import qualified Data.ByteString.Lazy as LBS
import qualified Data.ByteString.Lazy.Char8 as LBS8
import qualified Data.HashMap.Strict as HM
import Data.Int
import qualified Data.Scientific as Scientific
import Data.Semigroup as Semigroup
import Data.String (IsString)
import qualified Data.Text as T
import qualified Data.Text.Encoding as TE
import qualified Data.Text.Lazy as TL
import qualified Data.Text.Lazy.Builder as TB
import qualified Data.Time as Time
import qualified Katip as K
import Katip.Core (LocJs (..))
import qualified Network.HTTP.Client as HTTP
import qualified Network.HTTP.Client.TLS as HTTPS
import qualified Network.HTTP.Types as HTypes
import qualified System.Posix.Types as POSIX
import qualified URI.ByteString as URIBS
-- | Per-category line counts decoded from the JSON body of a
-- non-successful bulk response (see 'flushBuffer', which wraps this in
-- 'PartialFailure').
data BulkAPIError = BulkAPIError
  { bulkAPIError_malformedLines  :: Int
    -- ^ Value of the @malformedLines@ key.
  , bulkAPIError_successfulLines :: Int
    -- ^ Value of the @successfulLines@ key.
  , bulkAPIError_oversizedLines  :: Int
    -- ^ Value of the @oversizedLines@ key.
  , bulkAPIError_emptyLogLines   :: Int
    -- ^ Value of the @emptyLogLines@ key.
  } deriving (Show, Eq)

-- | All four keys are required; a missing key fails the parse.
instance A.FromJSON BulkAPIError where
  parseJSON = A.withObject "BulkAPIError" $ \o ->
    -- Fields are parsed in constructor order.
    BulkAPIError
      <$> o A..: "malformedLines"
      <*> o A..: "successfulLines"
      <*> o A..: "oversizedLines"
      <*> o A..: "emptyLogLines"
-- | Settings consumed by 'mkLogzIOScribe' and 'flushBuffer'.
data LogzIOScribeConfiguration = LogzIOScribeConfiguration
  { logzIOScribeConfiguration_bufferItems   :: Int
    -- ^ Item-count threshold passed to 'bufferItem'. The ingestion
    -- queue is sized at ten times this value.
  , logzIOScribeConfiguration_bufferTimeout :: Time.NominalDiffTime
    -- ^ Delay of the flush timer; the timer is restarted after every
    -- flush.
  , logzIOScribeConfiguration_scheme        :: Scheme
    -- ^ Selects both the URI scheme and the kind of HTTP manager.
  , logzIOScribeConfiguration_host          :: URIBS.Host
    -- ^ Host part of the request URI.
  , logzIOScribeConfiguration_port          :: URIBS.Port
    -- ^ Port part of the request URI.
  , logzIOScribeConfiguration_token         :: APIToken
    -- ^ Sent as the @token@ query parameter.
  , logzIOScribeConfiguration_retry         :: Retry.RetryPolicyM IO
    -- ^ Policy used when the HTTP call throws an 'HTTP.HttpException'.
  , logzIOScribeConfiguration_onError       :: LoggingError -> IO ()
    -- ^ Called with the error when a flush fails. Synchronous
    -- exceptions thrown by this callback are discarded.
  }
-- | Configuration with the given token and these defaults: 100 buffered
-- items, a 30 second timeout, HTTPS to 'usRegionHost' on 'httpsPort',
-- 'defaultRetryPolicy', and an error callback that does nothing.
defaultLogzIOScribeConfiguration :: APIToken -> LogzIOScribeConfiguration
defaultLogzIOScribeConfiguration tok =
  LogzIOScribeConfiguration
    { logzIOScribeConfiguration_token         = tok
    , logzIOScribeConfiguration_scheme        = HTTPS
    , logzIOScribeConfiguration_host          = usRegionHost
    , logzIOScribeConfiguration_port          = httpsPort
    , logzIOScribeConfiguration_bufferItems   = 100
    , logzIOScribeConfiguration_bufferTimeout = 30
    , logzIOScribeConfiguration_retry         = defaultRetryPolicy
    , logzIOScribeConfiguration_onError       = \_ -> pure ()
    }
-- | Account token, sent as the @token@ query parameter of every bulk
-- request. The 'IsString' instance lets a string literal be used
-- directly.
newtype APIToken = APIToken { apiToken :: T.Text }
  deriving (Show, Eq, IsString)
-- | Transport used to reach the listener. 'HTTPS' selects a TLS
-- manager and an @https@ URI; 'HTTP' selects a plain manager and an
-- @http@ URI.
data Scheme = HTTPS | HTTP
  deriving (Show, Eq)
-- | Listener host @listener.logz.io@ (the US-region host, going by the
-- name).
usRegionHost :: URIBS.Host
usRegionHost = URIBS.Host "listener.logz.io"
-- | Listener host @listener-eu.logz.io@ (the EU-region host, going by
-- the name).
euRegionHost :: URIBS.Host
euRegionHost = URIBS.Host "listener-eu.logz.io"
-- | Port 8071; used as the default port together with the 'HTTPS'
-- scheme in 'defaultLogzIOScribeConfiguration'.
httpsPort :: URIBS.Port
httpsPort = URIBS.Port 8071
-- | Port 8070; by its name, the counterpart for the plain 'HTTP'
-- scheme.
httpPort :: URIBS.Port
httpPort = URIBS.Port 8070
-- | Exponential backoff with a base of 25000, combined with a limit of
-- 5 retries.
-- NOTE(review): the @retry@ package documents the base delay as
-- microseconds (i.e. 25ms here) -- confirm against the pinned version.
defaultRetryPolicy :: (Monad m) => Retry.RetryPolicyM m
defaultRetryPolicy = Retry.exponentialBackoff 25000 `mappend` Retry.limitRetries 5
-- | Build a 'K.Scribe' that queues log items and ships them in bulk
-- from a background thread.
--
-- Items pushed to the scribe are written to a bounded, closable queue
-- whose capacity is ten times the configured buffer size. A single
-- worker thread drains the queue into a 'BulkBuffer' and flushes it
-- whenever 'bufferItem' asks for a flush, whenever the buffer timer
-- fires, and once more when the queue has been closed and drained.
-- The scribe finalizer closes the queue and waits for the worker to
-- finish.
mkLogzIOScribe
  :: LogzIOScribeConfiguration
  -> K.PermitFunc
  -> K.Verbosity
  -> IO K.Scribe
mkLogzIOScribe config permitItem verbosity = do
  ingestionQueue <- STM.newTBMQueueIO (maxBufferedItems * 10)
  timerRef <- STM.newTVarIO =<< startTimer
  mgr <- case logzIOScribeConfiguration_scheme config of
    HTTPS -> HTTPS.newTlsManager
    HTTP  -> HTTP.newManager HTTP.defaultManagerSettings

  let -- True once the queue is both closed and empty.
      queueDrained :: STM.STM Bool
      queueDrained =
        liftA2 (&&)
          (STM.isClosedTBMQueue ingestionQueue)
          (STM.isEmptyTBMQueue ingestionQueue)

      -- Retries until the current timer's flag reads True.
      timerFired :: STM.STM (Tick a)
      timerFired = do
        currentTimer <- STM.readTVar timerRef
        STM.readTVar currentTimer >>= STM.check
        pure TimeExpired

      -- Next queued item, or WorkExhausted when the read yields Nothing.
      dequeue :: STM.STM (Tick AnyLogItem)
      dequeue =
        fmap (maybe WorkExhausted NewItem) (STM.readTBMQueue ingestionQueue)

      -- Retries until the queue is closed and empty.
      awaitDrained :: STM.STM (Tick AnyLogItem)
      awaitDrained = do
        drained <- queueDrained
        STM.check drained
        pure WorkExhausted

      -- Alternatives are tried left to right, so an expired timer takes
      -- precedence over a waiting item.
      nextTick :: STM.STM (Tick AnyLogItem)
      nextTick = timerFired <|> dequeue <|> awaitDrained

      enqueue :: AnyLogItem -> STM.STM ()
      enqueue queued = void (STM.writeTBMQueue ingestionQueue queued)

      restartTimer :: IO ()
      restartTimer = startTimer >>= STM.atomically . STM.writeTVar timerRef

      -- Send the buffer, report a failure to the error callback, and
      -- restart the timer in either case.
      flush :: BulkBuffer -> IO ()
      flush buffer = do
        flushBuffer config mgr buffer >>= either reportError pure
        restartTimer

      flushLoop :: BulkBuffer -> IO ()
      flushLoop buffer =
        STM.atomically nextTick >>= \tick -> case tick of
          -- Final flush; the loop ends here.
          WorkExhausted -> flush buffer
          TimeExpired -> flush buffer >> flushLoop mempty
          NewItem (AnyLogItem item) ->
            case bufferItem maxBufferedItems verbosity item buffer of
              Buffered grown -> flushLoop grown
              FlushNow full fresh -> flush full >> flushLoop fresh

  flushThread <- Async.async (flushLoop mempty)

  let -- waitCatch: an exception that killed the worker is not rethrown.
      shutdown :: IO ()
      shutdown = do
        STM.atomically (STM.closeTBMQueue ingestionQueue)
        void (Async.waitCatch flushThread)

  pure $ K.Scribe
    { K.liPush = \item -> STM.atomically (enqueue (AnyLogItem item))
    , K.scribeFinalizer = shutdown
    , K.scribePermitItem = permitItem
    }
  where
    maxBufferedItems = logzIOScribeConfiguration_bufferItems config
    -- Creates a fresh timer from the configured timeout, converted to
    -- microseconds.
    startTimer =
      STM.registerDelay (ndtToMicros (logzIOScribeConfiguration_bufferTimeout config))
    -- Run the user's error callback, discarding any synchronous
    -- exception it throws.
    reportError err =
      void (EX.tryAny (logzIOScribeConfiguration_onError config err))
-- | A katip item with its payload type hidden, so that items of
-- different payload types can share one queue. Only the 'K.LogItem'
-- constraint is retained.
data AnyLogItem where
  AnyLogItem :: K.LogItem a => K.Item a -> AnyLogItem
-- | Events the flush loop in 'mkLogzIOScribe' reacts to.
data Tick a
  = TimeExpired
    -- ^ The buffer timer fired.
  | NewItem !a
    -- ^ An item was read from the queue (held strictly).
  | WorkExhausted
    -- ^ The queue is closed and nothing is left to read.
-- | Scale a duration in seconds by 10^9 and round to an integer.
--
-- Despite the name, the factor is 10^9 per second, not 10^12.
ndtPicos :: Time.NominalDiffTime -> Int64
ndtPicos duration = round (duration * unitsPerSecond)
  where
    unitsPerSecond :: Time.NominalDiffTime
    unitsPerSecond = 1000000000

-- | Convert a duration in seconds to a whole number of microseconds.
--
-- Goes through 'ndtPicos' (10^9 units per second) and divides by 10^3,
-- giving 10^6 units per second.
ndtToMicros :: Time.NominalDiffTime -> Int
ndtToMicros duration = round (scaled / unitsPerMicro)
  where
    scaled :: Double
    scaled = fromIntegral (ndtPicos duration)

    unitsPerMicro :: Double
    unitsPerMicro = 1000
-- | Ways a flush can fail, as reported to the configured error
-- callback.
data LoggingError
  = URIError HTTP.HttpException
    -- ^ Building the request from the configured URI threw.
  | RequestError HTTP.HttpException
    -- ^ The HTTP call threw, even after applying the retry policy.
  | PartialFailure BulkAPIError
    -- ^ Non-successful status whose body decoded as a 'BulkAPIError'.
  | BadToken
    -- ^ Status 401 with a body that did not decode as a 'BulkAPIError'.
  | UnknownFailureResponse HTypes.Status LBS.ByteString
    -- ^ Any other non-successful response: its status and raw body.
  deriving (Show)
-- | POST the buffered payload to the configured listener.
--
-- An empty buffer (item count of zero or less) succeeds immediately
-- without any network activity. Otherwise the request is built from
-- the configured scheme, host, port and token, and sent under the
-- configured retry policy. Every 'HTTP.HttpException' is considered
-- retryable. A non-successful response status is turned into a
-- 'LoggingError' based on the response body and status code.
flushBuffer
  :: LogzIOScribeConfiguration
  -> HTTP.Manager
  -> BulkBuffer
  -> IO (Either LoggingError ())
flushBuffer config mgr bulkBuffer
  | bulkBuffer_itemCount bulkBuffer <= 0 = pure (Right ())
  | otherwise = E.runExceptT $ do
      request <- E.fmapLT URIError . E.ExceptT . EX.try $
        withPayload <$> HTTP.parseRequest endpoint
      response <- E.fmapLT RequestError . E.ExceptT . EX.try $
        Retry.recovering
          (logzIOScribeConfiguration_retry config)
          [const (EX.Handler retryOnHttpException)]
          (const (HTTP.httpLbs request mgr))
      let status = HTTP.responseStatus response
          body = HTTP.responseBody response
      unless (HTypes.statusIsSuccessful status) $
        E.throwE (classifyFailure status body)
  where
    -- Turn the parsed URI request into a POST carrying the payload.
    withPayload request = request
      { HTTP.method = HTypes.methodPost
      , HTTP.requestBody =
          HTTP.RequestBodyLBS (BB.toLazyByteString (bulkBuffer_payload bulkBuffer))
      }

    -- Always answer "retry"; the policy decides when to stop.
    retryOnHttpException :: (Applicative m) => HTTP.HttpException -> m Bool
    retryOnHttpException _ = pure True

    -- A body that decodes as a BulkAPIError wins over the status code.
    classifyFailure status body =
      case A.decode @BulkAPIError body of
        Just bulkError -> PartialFailure bulkError
        Nothing
          | HTypes.statusCode status == 401 -> BadToken
          | otherwise -> UnknownFailureResponse status body

    endpoint = LBS8.unpack (BB.toLazyByteString (URIBS.serializeURIRef uri))

    tokenBytes = TE.encodeUtf8 (apiToken (logzIOScribeConfiguration_token config))

    uri = URIBS.URI
      { URIBS.uriScheme = case logzIOScribeConfiguration_scheme config of
          HTTPS -> URIBS.Scheme "https"
          HTTP -> URIBS.Scheme "http"
      , URIBS.uriAuthority = Just URIBS.Authority
          { URIBS.authorityUserInfo = Nothing
          , URIBS.authorityHost = logzIOScribeConfiguration_host config
          , URIBS.authorityPort = Just (logzIOScribeConfiguration_port config)
          }
      , URIBS.uriPath = "/"
      , URIBS.uriQuery = URIBS.Query [("token", tokenBytes)]
      , URIBS.uriFragment = Nothing
      }
-- | A byte count. The derived 'Num' instance allows numeric literals
-- and arithmetic directly on 'Bytes' values.
newtype Bytes = Bytes { bytes :: Int64 }
  deriving (Show, Eq, Num, Ord, Bounded)
-- | Payload-size threshold used by 'bufferItem': 10485760 bytes
-- (10 * 1024 * 1024).
maxPayloadBytes :: Bytes
maxPayloadBytes = 10485760
-- | Per-line size threshold used by 'renderLineTruncated': 500000.
maxLogLineLength :: Bytes
maxLogLineLength = 500000
-- | Encode a value as a single newline-terminated JSON line.
--
-- Returns the line as a builder together with its size in bytes; the
-- size includes the trailing newline.
measureJSONLine :: A.ToJSON a => a -> (BB.Builder, Bytes)
measureJSONLine value =
  let encodedLine = LBS8.snoc (A.encode value) '\n'
      lineSize = Bytes (LBS.length encodedLine)
   in (BB.lazyByteString encodedLine, lineSize)
-- | Build the complete JSON object for a log item.
--
-- The payload is rendered at the given verbosity and its keys are
-- passed through 'annotateKeys' before being stored under @data@. The
-- timestamp is formatted explicitly and stored under @\@timestamp@.
fullItemObject :: K.LogItem a => K.Verbosity -> K.Item a -> A.Object
fullItemObject verbosity item =
  HM.fromList
    [ "app" A..= K._itemApp item
    , "env" A..= K._itemEnv item
    , "sev" A..= K._itemSeverity item
    , "thread" A..= K.getThreadIdText (K._itemThread item)
    , "host" A..= K._itemHost item
    , "pid" A..= processId
    , "data" A..= annotatedPayload
    , "message" A..= messageText
    , "@timestamp" A..= A.String timestamp
    , "ns" A..= K._itemNamespace item
    , "loc" A..= (LocJs <$> K._itemLoc item)
    ]
  where
    -- Unwrap the process id to its underlying numeric value.
    POSIX.CPid processId = K._itemProcess item
    annotatedPayload =
      annotateKeys (K.payloadObject verbosity (K._itemPayload item))
    messageText = TB.toLazyText (K.unLogStr (K._itemMessage item))
    timestamp =
      T.pack
        (Time.formatTime
          Time.defaultTimeLocale
          "%Y-%m-%dT%H:%M:%S%03QZ"
          (K._itemTime item))
-- | Render an item as a JSON line, using 'maxLogLineLength' as the
-- size limit. See 'renderLineTruncated'' for what happens when the
-- full line exceeds the limit.
renderLineTruncated :: K.LogItem a => K.Verbosity -> K.Item a -> (BB.Builder, Bytes)
renderLineTruncated = renderLineTruncated' maxLogLineLength
-- | Render an item as a newline-terminated JSON line, falling back to a
-- reduced line when the full rendering is larger than the given limit.
--
-- If the full object (see 'fullItemObject') fits within the limit it is
-- returned unchanged. Otherwise the result contains only @message@ and
-- @\@timestamp@, with the message cut down to the budget left after the
-- size of that skeleton has been subtracted from the limit.
--
-- The budget is derived from the limit passed in, so a custom limit
-- applies to the fallback line as well as to the initial size check.
--
-- Limitation: the message is cut with 'TL.take', which counts
-- characters, not encoded bytes. A message containing multi-byte
-- characters, or characters that JSON must escape, can therefore still
-- produce a fallback line larger than the limit. A limit smaller than
-- the skeleton itself yields an empty message.
renderLineTruncated'
  :: K.LogItem a
  => Bytes
  -- ^ Maximum size of a rendered line.
  -> K.Verbosity
  -- ^ Verbosity used to render the payload of the full object.
  -> K.Item a
  -- ^ Item to render.
  -> (BB.Builder, Bytes)
  -- ^ The rendered line and its size in bytes, newline included.
renderLineTruncated' customMaxLogLineLength verbosity item
  | fullSize <= customMaxLogLineLength = (fullLine, fullSize)
  | otherwise = (fallbackLine, fallbackSize)
  where
    (fullLine, fullSize) = measureJSONLine (fullItemObject verbosity item)

    -- Skeleton of the fallback object with an empty message, measured
    -- to learn how much room is left for the message itself. Lazy, so
    -- it is only computed when the fallback is needed.
    blankObject :: A.Object
    blankObject = HM.fromList
      [ "message" A..= A.String ""
      , "@timestamp" A..= K._itemTime item
      ]
    (_, blankObjectSize) = measureJSONLine blankObject

    -- Use the caller's limit here, not the global 'maxLogLineLength'.
    messageBytesAllowed = customMaxLogLineLength - blankObjectSize

    fullMessage = TB.toLazyText (K.unLogStr (K._itemMessage item))

    fallbackObject :: A.Object
    fallbackObject = HM.fromList
      [ "message" A..= A.toJSON (TL.take (bytes messageBytesAllowed) fullMessage)
      , "@timestamp" A..= A.toJSON (K._itemTime item)
      ]
    (fallbackLine, fallbackSize) = measureJSONLine fallbackObject
-- | Accumulated bulk payload together with its bookkeeping. All fields
-- are strict.
data BulkBuffer = BulkBuffer
  { bulkBuffer_bytesUsed :: !Bytes
    -- ^ Sum of the sizes of the lines added so far.
  , bulkBuffer_payload   :: !BB.Builder
    -- ^ The lines themselves, concatenated in insertion order.
  , bulkBuffer_itemCount :: !Int
    -- ^ Number of lines added so far.
  }

-- | Combines two buffers field by field: sizes and counts are added,
-- payloads are concatenated left to right.
instance Semigroup.Semigroup BulkBuffer where
  BulkBuffer leftBytes leftPayload leftCount <> BulkBuffer rightBytes rightPayload rightCount =
    BulkBuffer
      { bulkBuffer_bytesUsed = leftBytes + rightBytes
      , bulkBuffer_payload = leftPayload <> rightPayload
      , bulkBuffer_itemCount = leftCount + rightCount
      }

-- | The empty buffer has no bytes, no payload and no items.
instance Monoid BulkBuffer where
  mempty = BulkBuffer
    { bulkBuffer_bytesUsed = 0
    , bulkBuffer_payload = mempty
    , bulkBuffer_itemCount = 0
    }
  mappend = (<>)
-- | Outcome of offering an item to a buffer.
data LogAction buf
  = Buffered buf
    -- ^ Keep going with this buffer; nothing needs to be sent yet.
  | FlushNow buf buf
    -- ^ Send the first buffer now, then continue with the second.
-- | 'bufferItem'' with 'maxPayloadBytes' as the payload limit and
-- 'maxLogLineLength' as the per-line limit.
bufferItem
  :: K.LogItem a
  => Int
  -- ^ Item-count threshold.
  -> K.Verbosity
  -> K.Item a
  -> BulkBuffer
  -> LogAction BulkBuffer
bufferItem maxItems verbosity item buffer =
  bufferItem' maxPayloadBytes maxLogLineLength maxItems verbosity item buffer
-- | Offer an item to a buffer and decide whether a flush is due.
--
-- The item is rendered first (truncated to the per-line limit if
-- necessary). If adding it would bring the item count up to the
-- threshold or beyond, or bring the byte count up to the payload limit
-- or beyond, the result is 'FlushNow' with the buffer as it was and a
-- new buffer holding only this item. Otherwise the item is appended
-- and the result is 'Buffered'.
--
-- Both comparisons are inclusive, so a flushed buffer never contains
-- as many as @maxItems@ items.
bufferItem'
  :: (K.LogItem a)
  => Bytes
  -- ^ Payload size limit.
  -> Bytes
  -- ^ Per-line size limit.
  -> Int
  -- ^ Item-count threshold.
  -> K.Verbosity
  -> K.Item a
  -> BulkBuffer
  -> LogAction BulkBuffer
bufferItem' customMaxPayload customMaxItem maxItems verb item bulkBuffer
  | limitReached = FlushNow bulkBuffer restarted
  | otherwise = Buffered extended
  where
    (encodedLine, lineSize) = renderLineTruncated' customMaxItem verb item
    bytesAfter = bulkBuffer_bytesUsed bulkBuffer + lineSize
    countAfter = bulkBuffer_itemCount bulkBuffer + 1
    limitReached = countAfter >= maxItems || bytesAfter >= customMaxPayload

    -- Fresh buffer containing just the new line.
    restarted = BulkBuffer
      { bulkBuffer_bytesUsed = lineSize
      , bulkBuffer_payload = encodedLine
      , bulkBuffer_itemCount = 1
      }

    -- Existing buffer with the new line appended.
    extended = BulkBuffer
      { bulkBuffer_bytesUsed = bytesAfter
      , bulkBuffer_payload = bulkBuffer_payload bulkBuffer <> encodedLine
      , bulkBuffer_itemCount = countAfter
      }
-- | Request an unconditional flush of the given buffer: the result is
-- 'FlushNow' with that buffer to send and 'mempty' to continue with.
forceFlush :: (Monoid buf) => buf -> LogAction buf
forceFlush buf = FlushNow buf mempty
-- | Apply 'annotateKeys' to every object nested inside a value.
--
-- Objects have their keys annotated, arrays are traversed element by
-- element, and every other value is returned unchanged.
annotateValue :: A.Value -> A.Value
annotateValue value = case value of
  A.Object fields -> A.Object (annotateKeys fields)
  A.Array elements -> A.Array (fmap annotateValue elements)
  scalar -> scalar
-- | Append a type suffix to every key whose value is a scalar.
--
-- Strings, numbers (floating or integral), booleans and nulls each get
-- their own suffix. Keys of nested objects and arrays are left as they
-- are, but their contents are annotated recursively.
annotateKeys :: A.Object -> A.Object
annotateKeys fields =
  HM.fromList
    [ (maybe key (key <>) (suffixFor value), annotateValue value)
    | (key, value) <- HM.toList fields
    ]
  where
    -- Suffix for scalar values; containers get none.
    suffixFor :: A.Value -> Maybe T.Text
    suffixFor (A.String _) = Just stringAnn
    suffixFor (A.Number sci)
      | Scientific.isFloating sci = Just doubleAnn
      | otherwise = Just longAnn
    suffixFor (A.Bool _) = Just booleanAnn
    suffixFor A.Null = Just nullAnn
    suffixFor _ = Nothing
-- | Key suffix 'annotateKeys' appends for string values.
stringAnn :: T.Text
stringAnn = "::s"
-- | Key suffix 'annotateKeys' appends for floating-point numbers.
doubleAnn :: T.Text
doubleAnn = "::d"
-- | Key suffix 'annotateKeys' appends for non-floating numbers.
longAnn :: T.Text
longAnn = "::l"
-- | Key suffix 'annotateKeys' appends for boolean values.
booleanAnn :: T.Text
booleanAnn = "::b"
-- | Key suffix 'annotateKeys' appends for null values.
nullAnn :: T.Text
nullAnn = "::n"