{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TypeApplications #-}

-- | Span exporter that ships finished spans to a Zipkin collector as JSON
-- over HTTP.
module OpenTelemetry.ZipkinExporter where

-- Zipkin V2 protocol spec: https://github.com/openzipkin/zipkin-api/blob/master/zipkin2-api.yaml

import Control.Concurrent.Async
import Control.Concurrent.STM
import qualified Control.Exception as E
import Control.Monad.IO.Class
import Data.Aeson
import qualified Data.HashMap.Strict as HM
import Data.Scientific
import qualified Data.Text as T
import Network.HTTP.Client
import Network.HTTP.Client.TLS
import Network.HTTP.Types
import OpenTelemetry.Common
import OpenTelemetry.Debug
import OpenTelemetry.Exporter
import OpenTelemetry.SpanContext
import System.IO.Unsafe
import qualified System.Timeout as Timeout
import Text.Printf

-- | A span paired with the exporter configuration it is serialised under;
-- the configuration contributes the service name and the global tags.
data ZipkinSpan = ZipkinSpan {zsConfig :: ZipkinConfig, zsSpan :: Span}

-- | Render a tag value as text (Zipkin tags map strings to strings).
-- Doubles are rendered via 'Scientific'.
tagValue2text :: TagValue -> T.Text
tagValue2text tv = case tv of
  (StringTagValue s) -> s
  (BoolTagValue b) -> if b then "true" else "false"
  (IntTagValue i) -> T.pack $ show i
  (DoubleTagValue d) -> T.pack $ show (fromFloatDigits d)

-- | Render an identifier as a zero-padded, 16-digit lowercase hex string,
-- the form Zipkin V2 uses for @traceId@, @id@ and @parentId@.
zipkinHexId :: PrintfArg a => a -> T.Text
zipkinHexId x = T.pack (printf "%016x" x)

-- | The key/value pairs of one Zipkin V2 span object.
--
-- Both 'toJSON' and 'toEncoding' are built from this single list so the
-- two encodings cannot drift apart (previously @parentId@ was numeric in
-- one and hex in the other).
zipkinSpanFields :: ZipkinSpan -> [(T.Text, Value)]
zipkinSpanFields (ZipkinSpan ZipkinConfig {..} s@(Span {..})) =
  let TId tid = spanTraceId s
      SId sid = spanId s
      -- Global tags first, then the span's own tags.
      tags =
        [k .= v | (k, v) <- zGlobalTags]
          <> [k .= tagValue2text v | (k, v) <- HM.toList spanTags]
      -- NOTE(review): `div` 1000 presumably converts nanoseconds to the
      -- microseconds Zipkin expects — confirm the unit of the span clock.
      annotations =
        [ object ["timestamp" .= (t `div` 1000), "value" .= v]
          | SpanEvent t _ v <- spanEvents
        ]
      parent =
        maybe [] (\(SId psid) -> ["parentId" .= zipkinHexId psid]) spanParentId
   in [ "name" .= spanOperation,
        "traceId" .= zipkinHexId tid,
        "id" .= zipkinHexId sid,
        "timestamp" .= (spanStartedAt `div` 1000),
        "duration" .= ((spanFinishedAt - spanStartedAt) `div` 1000),
        "localEndpoint" .= object ["serviceName" .= zServiceName],
        "tags" .= object tags,
        "annotations" .= annotations
      ]
        <> parent

instance ToJSON ZipkinSpan where
  toJSON = object . zipkinSpanFields
  toEncoding = pairs . foldMap (uncurry (.=)) . zipkinSpanFields

-- | Settings for the Zipkin exporter.
data ZipkinConfig = ZipkinConfig
  { -- | URL that span batches are POSTed to.
    zEndpoint :: String,
    -- | Reported as @localEndpoint.serviceName@ on every span.
    zServiceName :: T.Text,
    -- | Tags attached to every exported span.
    zGlobalTags :: [(T.Text, T.Text)],
    -- | Upper bound, in seconds, on how long shutdown waits for the sender
    -- thread to flush queued spans before cancelling it.
    zGracefulShutdownTimeoutSeconds :: Word,
    -- | Capacity of the in-memory send queue; spans exported while it is
    -- full are dropped and counted in 'droppedSpanCountVar'.
    zSpanQueueSize :: Word
  }

-- | Configuration for a collector on @localhost:9411@ with the given
-- service name, no global tags, a 5 second shutdown grace period and a
-- queue of 2048 spans.
localhostZipkinConfig :: T.Text -> ZipkinConfig
localhostZipkinConfig service =
  ZipkinConfig
    { zEndpoint = "http://localhost:9411/api/v2/spans",
      zServiceName = service,
      zGlobalTags = mempty,
      zGracefulShutdownTimeoutSeconds = 5,
      zSpanQueueSize = 2048
    }

-- | Runtime state of one exporter instance.
data ZipkinClient = ZipkinClient
  { -- | The configuration the client was created with.
    zcConfig :: ZipkinConfig,
    -- | Background thread that drains 'zcSenderQueue' and uploads batches.
    zcSenderThread :: Async (),
    -- | Bounded queue of spans awaiting upload.
    zcSenderQueue :: TBQueue Span,
    -- | Set to 'True' to ask the sender thread to flush and exit.
    zcShutdownVar :: TVar Bool
  }

-- | Start a background sender thread and return an 'Exporter' feeding it.
--
-- Exporting never touches the network: spans go onto a bounded queue and
-- those that do not fit are dropped (counted in 'droppedSpanCountVar').
-- Shutdown asks the sender to flush what is queued and waits at most
-- 'zGracefulShutdownTimeoutSeconds' for it before cancelling it.
createZipkinSpanExporter :: MonadIO m => ZipkinConfig -> m (Exporter Span)
createZipkinSpanExporter cfg = liftIO do
  client <- mkClient cfg
  let queue = zcSenderQueue client
      sender = zcSenderThread client
      capacity = zSpanQueueSize (zcConfig client)
      gracePeriodMicros =
        fromIntegral (zGracefulShutdownTimeoutSeconds (zcConfig client)) * 1000000
      exportSpans sps = do
        atomically $ do
          -- The queue was created with exactly 'capacity' slots in
          -- 'mkClient', so this subtraction cannot underflow.
          population <- fromIntegral <$> lengthTBQueue queue
          let vacancy = fromIntegral (capacity - population)
              (accepted, overflow) = splitAt vacancy sps
          -- Only the spans that did not fit are counted; the increment is
          -- never negative.
          modifyTVar' droppedSpanCountVar (+ length overflow)
          mapM_ (writeTBQueue queue) accepted
        pure ExportSuccess
      shutdownSender = do
        atomically $ writeTVar (zcShutdownVar client) True
        finished <- Timeout.timeout gracePeriodMicros (wait sender)
        case finished of
          Just () -> pure ()
          -- The sender did not finish within the grace period; stop it
          -- rather than block shutdown indefinitely.
          Nothing -> cancel sender
  pure $! Exporter exportSpans shutdownSender

-- | Allocate the HTTP manager and bounded queue, and fork the sender thread.
--
-- The sender blocks (via STM 'retry') until spans are queued or shutdown is
-- requested, uploads everything currently queued as one batch, and exits
-- after the batch taken in the transaction that observed shutdown.
mkClient :: ZipkinConfig -> IO ZipkinClient
mkClient cfg@(ZipkinConfig {..}) = do
  manager <- newManager tlsManagerSettings
  q <- newTBQueueIO (fromIntegral zSpanQueueSize)
  shutdownVar <- newTVarIO False
  sender <- async $ do
    let loop = do
          (mustShutdown, sps) <- atomically $ do
            mustShutdown <- readTVar shutdownVar
            sps <- flushTBQueue q
            case (mustShutdown, sps) of
              (False, []) -> retry
              _ -> pure (mustShutdown, sps)
          case sps of
            [] -> pure ()
            _ -> reportSpans zEndpoint manager cfg sps
          dd_ "must_shutdown" mustShutdown
          if mustShutdown then pure () else loop
    loop
  pure $! ZipkinClient cfg sender q shutdownVar

-- | POST a batch of spans as a JSON array to @endpoint@.
--
-- An HTTP 200 or 202 reply adds the batch size to 'reportedSpanCountVar'.
-- Any other status, or an 'HttpException' such as a refused connection,
-- adds it to 'rejectedSpanCountVar'. HTTP exceptions are caught rather than
-- rethrown so that a single failed request cannot kill the sender thread.
reportSpans :: String -> Manager -> ZipkinConfig -> [Span] -> IO ()
reportSpans endpoint httpManager cfg sps = do
  dd_ "reportSpans" sps
  let body = encode (map (ZipkinSpan cfg) sps)
      request =
        (parseRequest_ endpoint)
          { method = "POST",
            requestBody = RequestBodyLBS body,
            requestHeaders = [("Content-Type", "application/json")]
          }
      batchSize = length sps
  result <- E.try @HttpException (httpLbs request httpManager)
  case result of
    Left err -> do
      inc batchSize rejectedSpanCountVar
      dd_ "reportSpans: HTTP exception" err
    Right resp
      | statusCode (responseStatus resp) `elem` [200, 202] -> do
        inc batchSize reportedSpanCountVar
        pure ()
      | otherwise -> do
        -- TODO(divanov): handle failures
        inc batchSize rejectedSpanCountVar
        dd_ "body" body
        dd_ "resp status" $ responseStatus resp
        dd_ "resp" $ responseBody resp

-- | Process-wide count of spans dropped because the send queue was full.
-- 'unsafePerformIO' together with NOINLINE gives a single shared 'TVar'.
droppedSpanCountVar :: TVar Int
droppedSpanCountVar = unsafePerformIO $ newTVarIO 0
{-# NOINLINE droppedSpanCountVar #-}

-- | Process-wide count of spans the collector accepted (HTTP 200 or 202).
reportedSpanCountVar :: TVar Int
reportedSpanCountVar = unsafePerformIO $ newTVarIO 0
{-# NOINLINE reportedSpanCountVar #-}

-- | Process-wide count of spans whose upload failed (other HTTP status or
-- an HTTP exception).
rejectedSpanCountVar :: TVar Int
rejectedSpanCountVar = unsafePerformIO $ newTVarIO 0
{-# NOINLINE rejectedSpanCountVar #-}