{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TypeApplications #-}
module OpenTelemetry.ZipkinExporter where
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Monad.IO.Class
import Data.Aeson
import qualified Data.HashMap.Strict as HM
import Data.Scientific
import qualified Data.Text as T
import Network.HTTP.Client
import Network.HTTP.Client.TLS
import Network.HTTP.Types
import OpenTelemetry.Common
import OpenTelemetry.Debug
import OpenTelemetry.Exporter
import OpenTelemetry.SpanContext
import System.IO.Unsafe
import Text.Printf
data ZipkinSpan = ZipkinSpan
{ zsConfig :: ZipkinConfig,
zsSpan :: Span
}
tagValue2text :: TagValue -> T.Text
tagValue2text tv = case tv of
(StringTagValue s) -> s
(BoolTagValue b) -> if b then "true" else "false"
(IntTagValue i) -> T.pack $ show i
(DoubleTagValue d) -> T.pack $ show (fromFloatDigits d)
instance ToJSON ZipkinSpan where
toJSON (ZipkinSpan ZipkinConfig {..} s@(Span {..})) =
let TId tid = spanTraceId s
SId sid = spanId s
ts = spanStartedAt `div` 1000
duration = (spanFinishedAt - spanStartedAt) `div` 1000
in object $
[ "name" .= spanOperation,
"traceId" .= T.pack (printf "%016x" tid),
"id" .= T.pack (printf "%016x" sid),
"timestamp" .= ts,
"duration" .= duration,
"localEndpoint" .= object ["serviceName" .= zServiceName],
"tags"
.= object
( [k .= v | (k, v) <- zGlobalTags]
<> [k .= tagValue2text v | (k, v) <- HM.toList spanTags]
),
"annotations"
.= [ object ["timestamp" .= (t `div` 1000), "value" .= v]
| SpanEvent t _ v <- spanEvents
]
]
<> (maybe [] (\(SId psid) -> ["parentId" .= psid]) spanParentId)
toEncoding (ZipkinSpan ZipkinConfig {..} s@(Span {..})) =
let TId tid = spanTraceId s
SId sid = spanId s
ts = spanStartedAt `div` 1000
duration = (spanFinishedAt - spanStartedAt) `div` 1000
in pairs
( "name" .= spanOperation
<> "traceId" .= T.pack (printf "%016x" tid)
<> "id" .= T.pack (printf "%016x" sid)
<> "timestamp" .= ts
<> "duration" .= duration
<> "localEndpoint" .= object ["serviceName" .= zServiceName]
<> "tags"
.= object
( [k .= v | (k, v) <- zGlobalTags]
<> [k .= tagValue2text v | (k, v) <- HM.toList spanTags]
)
<> ( maybe
mempty
(\(SId psid) -> "parentId" .= T.pack (printf "%016x" psid))
spanParentId
)
<> "annotations"
.= [ object ["timestamp" .= (t `div` 1000), "value" .= v]
| SpanEvent t _ v <- spanEvents
]
)
data ZipkinConfig = ZipkinConfig
{ zEndpoint :: String,
zServiceName :: T.Text,
zGlobalTags :: [(T.Text, T.Text)],
zGracefulShutdownTimeoutSeconds :: Word,
zSpanQueueSize :: Word
}
localhostZipkinConfig :: T.Text -> ZipkinConfig
localhostZipkinConfig service =
ZipkinConfig
{ zEndpoint = "http://localhost:9411/api/v2/spans",
zServiceName = service,
zGlobalTags = mempty,
zGracefulShutdownTimeoutSeconds = 5,
zSpanQueueSize = 2048
}
data ZipkinClient = ZipkinClient
{ zcConfig :: ZipkinConfig,
zcSenderThread :: Async (),
zcSenderQueue :: TBQueue Span,
zcShutdownVar :: TVar Bool
}
createZipkinSpanExporter :: MonadIO m => ZipkinConfig -> m (Exporter Span)
createZipkinSpanExporter cfg = liftIO do
client <- mkClient cfg
pure
$! Exporter
( \sps -> do
let q = zcSenderQueue client
atomically $ do
q_population <- fromIntegral <$> lengthTBQueue q
let q_vacancy = fromIntegral (zSpanQueueSize (zcConfig client) - q_population)
modifyTVar droppedSpanCountVar (\x -> x + length sps - q_vacancy)
mapM_
(writeTBQueue q)
(take q_vacancy sps)
pure ExportSuccess
)
( do
atomically $
writeTVar (zcShutdownVar client) True
wait (zcSenderThread client)
)
mkClient :: ZipkinConfig -> IO ZipkinClient
mkClient cfg@(ZipkinConfig {..}) = do
manager <- newManager tlsManagerSettings
q <- newTBQueueIO (fromIntegral zSpanQueueSize)
shutdown_var <- newTVarIO False
sender <- async $ do
let loop = do
(must_shutdown, sps) <- atomically $ do
must_shutdown <- readTVar shutdown_var
sps <- flushTBQueue q
case (must_shutdown, sps) of
(False, []) -> retry
_ -> pure (must_shutdown, sps)
case sps of
[] -> pure ()
_ -> reportSpans zEndpoint manager cfg sps
dd_ "must_shutdown" must_shutdown
case must_shutdown of
True -> pure ()
False -> loop
loop
pure $! ZipkinClient cfg sender q shutdown_var
reportSpans :: String -> Manager -> ZipkinConfig -> [Span] -> IO ()
reportSpans endpoint httpManager cfg sps = do
dd_ "reportSpans" sps
let body = encode (map (ZipkinSpan cfg) sps)
request =
(parseRequest_ endpoint)
{ method = "POST",
requestBody = RequestBodyLBS body,
requestHeaders = [("Content-Type", "application/json")]
}
resp <- httpLbs request httpManager
case statusCode (responseStatus resp) of
s | s `elem` [200, 202] -> do
inc 1 reportedSpanCountVar
pure ()
_ -> do
inc 1 rejectedSpanCountVar
dd_ "body" body
dd_ "resp status" $ responseStatus resp
dd_ "resp" $ responseBody resp
droppedSpanCountVar :: TVar Int
droppedSpanCountVar = unsafePerformIO $ newTVarIO 0
{-# NOINLINE droppedSpanCountVar #-}
reportedSpanCountVar :: TVar Int
reportedSpanCountVar = unsafePerformIO $ newTVarIO 0
{-# NOINLINE reportedSpanCountVar #-}
rejectedSpanCountVar :: TVar Int
rejectedSpanCountVar = unsafePerformIO $ newTVarIO 0
{-# NOINLINE rejectedSpanCountVar #-}