{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE TypeApplications #-}
module Katip.Scribes.LogzIO.HTTPS
(
mkLogzIOScribe
, BulkAPIError(..)
, LogzIOScribeConfiguration(..)
, Scheme(..)
, APIToken(..)
, LoggingError(..)
, usRegionHost
, euRegionHost
, httpsPort
, httpPort
, defaultRetryPolicy
, defaultLogzIOScribeConfiguration
, renderLineTruncated
, renderLineTruncated'
, maxPayloadBytes
, maxLogLineLength
, BulkBuffer (..)
, LogAction (..)
, bufferItem
, bufferItem'
, forceFlush
, Bytes (..)
) where
import Control.Applicative
import qualified Control.Concurrent.Async as Async
import qualified Control.Concurrent.STM as STM
import qualified Control.Error as E
import qualified Control.Exception.Safe as EX
import qualified Control.Retry as Retry
import qualified Data.Aeson as A
import qualified Data.ByteString.Builder as BB
import qualified Data.ByteString.Lazy as LBS
import qualified Data.ByteString.Lazy.Char8 as LBS8
import qualified Data.HashMap.Strict as HM
import Data.Int
import qualified Data.Scientific as Scientific
import Data.Semigroup as Semigroup
import Data.String (IsString)
import qualified Data.Text as T
import qualified Data.Text.Encoding as TE
import qualified Data.Text.Lazy as TL
import qualified Data.Text.Lazy.Builder as TB
import qualified Data.Time as Time
import qualified Katip as K
import Katip.Core (LocJs (..))
import qualified Network.HTTP.Client as HTTP
import qualified Network.HTTP.Client.TLS as HTTPS
import qualified Network.HTTP.Types as HTypes
import qualified System.Posix.Types as POSIX
import qualified URI.ByteString as URIBS
data BulkAPIError = BulkAPIError
{ bulkAPIError_malformedLines :: Int
, bulkAPIError_successfulLines :: Int
, bulkAPIError_oversizedLines :: Int
, bulkAPIError_emptyLogLines :: Int
} deriving (Show, Eq)
instance A.FromJSON BulkAPIError where
parseJSON = A.withObject "BulkAPIError" $ \o -> do
malformedLines <- o A..: "malformedLines"
successfulLines <- o A..: "successfulLines"
oversizedLines <- o A..: "oversizedLines"
emptyLogLines <- o A..: "emptyLogLines"
pure $ BulkAPIError
{ bulkAPIError_malformedLines = malformedLines
, bulkAPIError_successfulLines = successfulLines
, bulkAPIError_oversizedLines = oversizedLines
, bulkAPIError_emptyLogLines = emptyLogLines
}
data LogzIOScribeConfiguration = LogzIOScribeConfiguration
{ logzIOScribeConfiguration_bufferItems :: Int
, logzIOScribeConfiguration_bufferTimeout :: Time.NominalDiffTime
, logzIOScribeConfiguration_scheme :: Scheme
, logzIOScribeConfiguration_host :: URIBS.Host
, logzIOScribeConfiguration_port :: URIBS.Port
, logzIOScribeConfiguration_token :: APIToken
, logzIOScribeConfiguration_retry :: Retry.RetryPolicyM IO
, logzIOScribeConfiguration_onError :: LoggingError -> IO ()
}
defaultLogzIOScribeConfiguration :: APIToken -> LogzIOScribeConfiguration
defaultLogzIOScribeConfiguration token = LogzIOScribeConfiguration
{ logzIOScribeConfiguration_bufferItems = 100
, logzIOScribeConfiguration_bufferTimeout = 30
, logzIOScribeConfiguration_scheme = HTTPS
, logzIOScribeConfiguration_host = usRegionHost
, logzIOScribeConfiguration_port = httpsPort
, logzIOScribeConfiguration_token = token
, logzIOScribeConfiguration_retry = defaultRetryPolicy
, logzIOScribeConfiguration_onError = const (pure ())
}
newtype APIToken = APIToken
{ apiToken :: T.Text
} deriving (Show, Eq, IsString)
data Scheme
= HTTPS
| HTTP
deriving (Show, Eq)
usRegionHost :: URIBS.Host
usRegionHost = URIBS.Host "listener.logz.io"
euRegionHost :: URIBS.Host
euRegionHost = URIBS.Host "listener-eu.logz.io"
httpsPort :: URIBS.Port
httpsPort = URIBS.Port 8071
httpPort :: URIBS.Port
httpPort = URIBS.Port 8070
defaultRetryPolicy :: (Monad m) => Retry.RetryPolicyM m
defaultRetryPolicy = Retry.exponentialBackoff 25000 `mappend` Retry.limitRetries 5
mkLogzIOScribe
:: LogzIOScribeConfiguration
-> K.PermitFunc
-> K.Verbosity
-> IO K.Scribe
mkLogzIOScribe config permitItem verbosity = do
bufferState <- STM.newTVarIO (mempty :: BulkBuffer)
readyMVar <- STM.newEmptyTMVarIO :: IO (STM.TMVar BulkBuffer)
killSwitch <- STM.newTVarIO Continue
timerRef <- STM.newTVarIO =<< STM.registerDelay itemBufferTimeoutMicros
mgr <- case scheme of
HTTPS -> HTTPS.newTlsManager
HTTP -> HTTP.newManager HTTP.defaultManagerSettings
let processLogAction :: LogAction BulkBuffer -> STM.STM ()
processLogAction logAction = case logAction of
Buffered !newBuffer -> STM.writeTVar bufferState newBuffer
FlushNow !toFlush !newBuffer -> do
STM.writeTVar bufferState newBuffer
STM.putTMVar readyMVar toFlush
let push :: (K.LogItem a) => K.Item a -> STM.STM ()
push !logItem = do
!curBuffer <- STM.readTVar bufferState
processLogAction (bufferItem itemBufferSize verbosity logItem curBuffer)
let waitShutdown :: STM.STM ()
waitShutdown = do
killStatus <- STM.readTVar killSwitch
STM.check (killStatus == ShutDown)
let waitTimer :: STM.STM ()
waitTimer = do
timer <- STM.readTVar timerRef
timerExpired <- STM.readTVar timer
STM.check timerExpired
STM.writeTVar timer False
!curBuffer <- STM.readTVar bufferState
processLogAction (forceFlush curBuffer)
let timedFlushLoop :: IO ()
timedFlushLoop = do
res <- STM.atomically $
raceAlt
waitShutdown
waitTimer
case res of
Left () -> pure ()
Right () -> timedFlushLoop
timedFlusher <- Async.async timedFlushLoop
let flushLoop :: IO ()
flushLoop = do
readyBufferOrDie <- STM.atomically $
raceAlt
(STM.takeTMVar readyMVar)
waitShutdown
case readyBufferOrDie of
Right () -> pure ()
Left readyBuffer -> do
res <- flushBuffer config mgr readyBuffer
case res of
Left ex -> onErrorSafe ex
Right () -> pure ()
!newTimer <- STM.registerDelay itemBufferTimeoutMicros
STM.atomically $ do
STM.writeTVar timerRef newTimer
flushLoop
flusher <- Async.async flushLoop
let shutdown :: IO ()
shutdown = do
STM.atomically (STM.writeTVar killSwitch ShutDown)
_ <- Async.waitCatch timedFlusher
_ <- Async.waitCatch flusher
pure ()
pure $ K.Scribe
{ K.liPush = STM.atomically . push
, K.scribeFinalizer = shutdown
, K.scribePermitItem = permitItem
}
where
itemBufferSize = logzIOScribeConfiguration_bufferItems config
itemBufferTimeoutMicros = ndtToMicros (logzIOScribeConfiguration_bufferTimeout config)
scheme = logzIOScribeConfiguration_scheme config
onErrorSafe ex = do
_ <- EX.tryAny (logzIOScribeConfiguration_onError config ex)
pure ()
ndtPicos :: Time.NominalDiffTime -> Int64
ndtPicos = round . (* picos)
where
picos :: Time.NominalDiffTime
picos = 10 ^ (9 :: Int)
ndtToMicros :: Time.NominalDiffTime -> Int
ndtToMicros t = round (((fromIntegral (ndtPicos t)) :: Double) / picosInMicro)
where
picosInMicro = 10 ^ (3 :: Int)
data KillSwitch =
Continue
| ShutDown
deriving (Eq)
raceAlt :: (Alternative m) => m a -> m b -> m (Either a b)
raceAlt a b = (Left <$> a) <|> (Right <$> b)
data LoggingError =
URIError HTTP.HttpException
| RequestError HTTP.HttpException
| PartialFailure BulkAPIError
| BadToken
| UnknownFailureResponse HTypes.Status LBS.ByteString
deriving (Show)
flushBuffer
:: LogzIOScribeConfiguration
-> HTTP.Manager
-> BulkBuffer
-> IO (Either LoggingError ())
flushBuffer config mgr bulkBuffer
| bulkBuffer_itemCount bulkBuffer <= 0 = pure (Right ())
| otherwise = do
E.runExceptT $ do
req <- E.fmapLT URIError (E.ExceptT (EX.try (configureRequest <$> HTTP.parseRequest uriStr)))
resp <- E.fmapLT RequestError $ E.ExceptT $ EX.try $
Retry.recovering retryPolicy [\_stat -> EX.Handler handleHttpException] $ \_stat ->
HTTP.httpLbs req mgr
let respLBS = HTTP.responseBody resp
let respStatus = HTTP.responseStatus resp
if HTypes.statusIsSuccessful respStatus
then pure ()
else case A.decode @BulkAPIError respLBS of
Nothing
| HTypes.statusCode respStatus == 401 -> E.throwE BadToken
| otherwise -> E.throwE (UnknownFailureResponse respStatus respLBS)
Just bulkError -> E.throwE (PartialFailure bulkError)
where
configureRequest req = req
{ HTTP.method = HTypes.methodPost
, HTTP.requestBody = HTTP.RequestBodyLBS (BB.toLazyByteString (bulkBuffer_payload bulkBuffer))
}
retryPolicy = logzIOScribeConfiguration_retry config
apiTokenBS = TE.encodeUtf8 (apiToken (logzIOScribeConfiguration_token config))
handleHttpException :: (Applicative m) => HTTP.HttpException -> m Bool
handleHttpException _ = pure True
uriStr = LBS8.unpack (BB.toLazyByteString (URIBS.serializeURIRef uri))
authority = URIBS.Authority
{ URIBS.authorityUserInfo = Nothing
, URIBS.authorityHost = logzIOScribeConfiguration_host config
, URIBS.authorityPort = Just (logzIOScribeConfiguration_port config)
}
uri = URIBS.URI
{ URIBS.uriScheme = case logzIOScribeConfiguration_scheme config of
HTTPS -> URIBS.Scheme "https"
HTTP -> URIBS.Scheme "http"
, URIBS.uriAuthority = Just authority
, URIBS.uriPath = "/"
, URIBS.uriQuery = URIBS.Query
[ ("token", apiTokenBS)
]
, URIBS.uriFragment = Nothing
}
newtype Bytes = Bytes
{ bytes :: Int64
} deriving (Show, Eq, Num, Ord, Bounded)
maxPayloadBytes :: Bytes
maxPayloadBytes = 10485760
maxLogLineLength :: Bytes
maxLogLineLength = 500000
measureJSONLine :: A.ToJSON a => a -> (BB.Builder, Bytes)
measureJSONLine a = (BB.lazyByteString lbs, Bytes (LBS.length lbs))
where
lbs = A.encode a <> "\n"
fullItemObject :: K.LogItem a => K.Verbosity -> K.Item a -> A.Object
fullItemObject verbosity item = HM.fromList
[ "app" A..= K._itemApp item
, "env" A..= K._itemEnv item
, "sev" A..= K._itemSeverity item
, "thread" A..= K.getThreadIdText (K._itemThread item)
, "host" A..= K._itemHost item
, "pid" A..= pidInt
, "data" A..= annotateKeys (K.payloadObject verbosity (K._itemPayload item))
, "message" A..= TB.toLazyText (K.unLogStr (K._itemMessage item))
, "@timestamp" A..= A.String (T.pack (Time.formatTime Time.defaultTimeLocale "%Y-%m-%dT%H:%M:%S%03QZ" (K._itemTime item)))
, "ns" A..= K._itemNamespace item
, "loc" A..= (LocJs <$> K._itemLoc item)
]
where
POSIX.CPid pidInt = K._itemProcess item
renderLineTruncated :: K.LogItem a => K.Verbosity -> K.Item a -> (BB.Builder, Bytes)
renderLineTruncated = renderLineTruncated' maxLogLineLength
renderLineTruncated'
:: K.LogItem a
=> Bytes
-> K.Verbosity
-> K.Item a
-> (BB.Builder, Bytes)
renderLineTruncated' customMaxLogLineLength verbosity item =
if fullSize <= customMaxLogLineLength
then (fullLine, fullSize)
else (fallbackLine, fallbackSize)
where
fullObject = fullItemObject verbosity item
(fullLine, fullSize) = measureJSONLine fullObject
blankObject :: A.Object
blankObject = HM.fromList
[ "message" A..= A.String ""
, "@timestamp" A..= K._itemTime item
]
(_, blankObjectSize) = measureJSONLine blankObject
messageBytesAllowed = maxLogLineLength - blankObjectSize
(fallbackLine, fallbackSize) = measureJSONLine fallbackObject
fallbackObject :: A.Object
fallbackObject = HM.fromList
[ "message" A..= A.toJSON (TL.take (bytes messageBytesAllowed) (TB.toLazyText (K.unLogStr (K._itemMessage item))))
, "@timestamp" A..= A.toJSON (K._itemTime item)
]
data BulkBuffer = BulkBuffer
{ bulkBuffer_bytesUsed :: !Bytes
, bulkBuffer_payload :: !BB.Builder
, bulkBuffer_itemCount :: !Int
}
instance Semigroup.Semigroup BulkBuffer where
(BulkBuffer bytesUsedA bufferA itemCountA) <>
(BulkBuffer bytesUsedB bufferB itemCountB) = BulkBuffer
(bytesUsedA + bytesUsedB)
(bufferA <> bufferB)
(itemCountA + itemCountB)
instance Monoid BulkBuffer where
mempty = BulkBuffer 0 mempty 0
mappend = (<>)
data LogAction buf =
Buffered
buf
| FlushNow
buf
buf
bufferItem
:: K.LogItem a
=> Int
-> K.Verbosity
-> K.Item a
-> BulkBuffer
-> LogAction BulkBuffer
bufferItem = bufferItem' maxPayloadBytes maxLogLineLength
bufferItem'
:: (K.LogItem a)
=> Bytes
-> Bytes
-> Int
-> K.Verbosity
-> K.Item a
-> BulkBuffer
-> LogAction BulkBuffer
bufferItem' customMaxPayload customMaxItem maxItems verb item bulkBuffer =
let (encodedLine, spaceNeeded) = renderLineTruncated' customMaxItem verb item
newBytesUsed = bulkBuffer_bytesUsed bulkBuffer + spaceNeeded
newItemCount = bulkBuffer_itemCount bulkBuffer + 1
in if newItemCount >= maxItems || newBytesUsed >= customMaxPayload
then FlushNow
bulkBuffer
BulkBuffer
{ bulkBuffer_bytesUsed = spaceNeeded
, bulkBuffer_payload = encodedLine
, bulkBuffer_itemCount = 1
}
else Buffered $
BulkBuffer
{ bulkBuffer_bytesUsed = newBytesUsed
, bulkBuffer_payload = bulkBuffer_payload bulkBuffer <> encodedLine
, bulkBuffer_itemCount = newItemCount
}
forceFlush :: (Monoid buf) => buf -> LogAction buf
forceFlush buf = FlushNow buf mempty
annotateValue :: A.Value -> A.Value
annotateValue (A.Object o) = A.Object (annotateKeys o)
annotateValue (A.Array a) = A.Array (annotateValue <$> a)
annotateValue x = x
annotateKeys :: A.Object -> A.Object
annotateKeys = HM.fromList . map go . HM.toList
where
go (k, A.Object o) = (k, A.Object (annotateKeys o))
go (k, A.Array a) = (k, A.Array (annotateValue <$> a))
go (k, s@(A.String _)) = (k <> stringAnn, s)
go (k, n@(A.Number sci)) = if Scientific.isFloating sci
then (k <> doubleAnn, n)
else (k <> longAnn, n)
go (k, b@(A.Bool _)) = (k <> booleanAnn, b)
go (k, A.Null) = (k <> nullAnn, A.Null)
stringAnn :: T.Text
stringAnn = "::s"
doubleAnn :: T.Text
doubleAnn = "::d"
longAnn :: T.Text
longAnn = "::l"
booleanAnn :: T.Text
booleanAnn = "::b"
nullAnn :: T.Text
nullAnn = "::n"