{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TypeApplications #-}

module OpenTelemetry.ZipkinExporter where

-- Zipkin V2 protocol spec: https://github.com/openzipkin/zipkin-api/blob/master/zipkin2-api.yaml

import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Monad.IO.Class
import Data.Coerce
import qualified Data.HashMap.Strict as HM
import Data.Scientific
import qualified Data.Text as T
import qualified Jsonifier as J
import Network.HTTP.Client
import Network.HTTP.Client.TLS
import Network.HTTP.Types
import OpenTelemetry.Common
import OpenTelemetry.Debug
import OpenTelemetry.SpanContext
import System.IO.Unsafe
import Text.Printf

data ZipkinSpan = ZipkinSpan
  { ZipkinSpan -> ZipkinConfig
zsConfig :: ZipkinConfig,
    ZipkinSpan -> Span
zsSpan :: Span
  }

tagValue2text :: TagValue -> J.Json
tagValue2text :: TagValue -> Json
tagValue2text TagValue
tv = Text -> Json
J.textString (Text -> Json) -> Text -> Json
forall a b. (a -> b) -> a -> b
$ case TagValue
tv of
  (StringTagValue (TagVal Text
s)) -> Text
s
  (BoolTagValue Bool
b) -> if Bool
b then Text
"true" else Text
"false"
  (IntTagValue Int
i) -> String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Int -> String
forall a. Show a => a -> String
show Int
i
  (DoubleTagValue Double
d) -> String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Scientific -> String
forall a. Show a => a -> String
show (Double -> Scientific
forall a. RealFloat a => a -> Scientific
fromFloatDigits Double
d)

jSpan :: ZipkinConfig -> Span -> J.Json
jSpan :: ZipkinConfig -> Span -> Json
jSpan ZipkinConfig {String
[(Text, Text)]
Word
Text
zSpanQueueSize :: ZipkinConfig -> Word
zGracefulShutdownTimeoutSeconds :: ZipkinConfig -> Word
zGlobalTags :: ZipkinConfig -> [(Text, Text)]
zServiceName :: ZipkinConfig -> Text
zEndpoint :: ZipkinConfig -> String
zSpanQueueSize :: Word
zGracefulShutdownTimeoutSeconds :: Word
zGlobalTags :: [(Text, Text)]
zServiceName :: Text
zEndpoint :: String
..} s :: Span
s@(Span {[SpanEvent]
Maybe SpanId
Word32
Word64
Text
SpanContext
HashMap TagName TagValue
SpanStatus
$sel:spanNanosecondsSpentInGC:Span :: Span -> Word64
$sel:spanParentId:Span :: Span -> Maybe SpanId
$sel:spanStatus:Span :: Span -> SpanStatus
$sel:spanEvents:Span :: Span -> [SpanEvent]
$sel:spanTags:Span :: Span -> HashMap TagName TagValue
$sel:spanFinishedAt:Span :: Span -> Word64
$sel:spanStartedAt:Span :: Span -> Word64
$sel:spanDisplayThreadId:Span :: Span -> Word32
$sel:spanThreadId:Span :: Span -> Word32
$sel:spanOperation:Span :: Span -> Text
$sel:spanContext:Span :: Span -> SpanContext
spanNanosecondsSpentInGC :: Word64
spanParentId :: Maybe SpanId
spanStatus :: SpanStatus
spanEvents :: [SpanEvent]
spanTags :: HashMap TagName TagValue
spanFinishedAt :: Word64
spanStartedAt :: Word64
spanDisplayThreadId :: Word32
spanThreadId :: Word32
spanOperation :: Text
spanContext :: SpanContext
..}) =
  let TId Word64
tid = Span -> TraceId
spanTraceId Span
s
      SId Word64
sid = Span -> SpanId
spanId Span
s
      ts :: Word64
ts = Word64
spanStartedAt Word64 -> Word64 -> Word64
forall a. Integral a => a -> a -> a
`div` Word64
1000
      duration :: Word64
duration = (Word64
spanFinishedAt Word64 -> Word64 -> Word64
forall a. Num a => a -> a -> a
- Word64
spanStartedAt) Word64 -> Word64 -> Word64
forall a. Integral a => a -> a -> a
`div` Word64
1000
   in [(Text, Json)] -> Json
forall (f :: * -> *). Foldable f => f (Text, Json) -> Json
J.object ([(Text, Json)] -> Json) -> [(Text, Json)] -> Json
forall a b. (a -> b) -> a -> b
$
        [ (Text
"name", Text -> Json
J.textString Text
spanOperation),
          (Text
"traceId", Text -> Json
J.textString (Text -> Json) -> Text -> Json
forall a b. (a -> b) -> a -> b
$ String -> Text
T.pack (String -> Word64 -> String
forall r. PrintfType r => String -> r
printf String
"%016x" Word64
tid)),
          (Text
"id", Text -> Json
J.textString (Text -> Json) -> Text -> Json
forall a b. (a -> b) -> a -> b
$ String -> Text
T.pack (String -> Word64 -> String
forall r. PrintfType r => String -> r
printf String
"%016x" Word64
sid)),
          (Text
"timestamp", Word -> Json
J.wordNumber (Word -> Json) -> Word -> Json
forall a b. (a -> b) -> a -> b
$ Word64 -> Word
forall a b. (Integral a, Num b) => a -> b
fromIntegral Word64
ts),
          (Text
"duration", Word -> Json
J.wordNumber (Word -> Json) -> Word -> Json
forall a b. (a -> b) -> a -> b
$ Word64 -> Word
forall a b. (Integral a, Num b) => a -> b
fromIntegral Word64
duration),
          (Text
"localEndpoint", [(Text, Json)] -> Json
forall (f :: * -> *). Foldable f => f (Text, Json) -> Json
J.object [(Text
"serviceName", Text -> Json
J.textString Text
zServiceName)]),
          ( Text
"tags",
            [(Text, Json)] -> Json
forall (f :: * -> *). Foldable f => f (Text, Json) -> Json
J.object
              ( ((Text -> Json) -> (Text, Text) -> (Text, Json)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Text -> Json
J.textString ((Text, Text) -> (Text, Json)) -> [(Text, Text)] -> [(Text, Json)]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [(Text, Text)]
zGlobalTags)
                  [(Text, Json)] -> [(Text, Json)] -> [(Text, Json)]
forall a. Semigroup a => a -> a -> a
<> [(Text
k, TagValue -> Json
tagValue2text TagValue
v) | ((TagName Text
k), TagValue
v) <- HashMap TagName TagValue -> [(TagName, TagValue)]
forall k v. HashMap k v -> [(k, v)]
HM.toList HashMap TagName TagValue
spanTags]
              )
          ),
          ( Text
"annotations",
            [Json] -> Json
forall (f :: * -> *). Foldable f => f Json -> Json
J.array
              [ [(Text, Json)] -> Json
forall (f :: * -> *). Foldable f => f (Text, Json) -> Json
J.object
                  [ (Text
"timestamp", Word -> Json
J.wordNumber (Word -> Json) -> Word -> Json
forall a b. (a -> b) -> a -> b
$ Word64 -> Word
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Word64
t Word64 -> Word64 -> Word64
forall a. Integral a => a -> a -> a
`div` Word64
1000)),
                    (Text
"value", Text -> Json
J.textString (Text -> Json) -> Text -> Json
forall a b. (a -> b) -> a -> b
$ EventVal -> Text
coerce EventVal
v)
                  ]
                | SpanEvent Word64
t EventName
_ EventVal
v <- [SpanEvent]
spanEvents
              ]
          )
        ]
          [(Text, Json)] -> [(Text, Json)] -> [(Text, Json)]
forall a. Semigroup a => a -> a -> a
<> ([(Text, Json)]
-> (SpanId -> [(Text, Json)]) -> Maybe SpanId -> [(Text, Json)]
forall b a. b -> (a -> b) -> Maybe a -> b
maybe [] (\(SId Word64
psid) -> [(Text
"parentId", Word -> Json
J.wordNumber (Word -> Json) -> Word -> Json
forall a b. (a -> b) -> a -> b
$ Word64 -> Word
forall a b. (Integral a, Num b) => a -> b
fromIntegral Word64
psid)]) Maybe SpanId
spanParentId)

data ZipkinConfig = ZipkinConfig
  { ZipkinConfig -> String
zEndpoint :: String,
    ZipkinConfig -> Text
zServiceName :: T.Text,
    ZipkinConfig -> [(Text, Text)]
zGlobalTags :: [(T.Text, T.Text)],
    ZipkinConfig -> Word
zGracefulShutdownTimeoutSeconds :: Word,
    ZipkinConfig -> Word
zSpanQueueSize :: Word
  }

localhostZipkinConfig :: T.Text -> ZipkinConfig
localhostZipkinConfig :: Text -> ZipkinConfig
localhostZipkinConfig Text
service =
  ZipkinConfig :: String -> Text -> [(Text, Text)] -> Word -> Word -> ZipkinConfig
ZipkinConfig
    { zEndpoint :: String
zEndpoint = String
"http://localhost:9411/api/v2/spans",
      zServiceName :: Text
zServiceName = Text
service,
      zGlobalTags :: [(Text, Text)]
zGlobalTags = [(Text, Text)]
forall a. Monoid a => a
mempty,
      zGracefulShutdownTimeoutSeconds :: Word
zGracefulShutdownTimeoutSeconds = Word
5,
      zSpanQueueSize :: Word
zSpanQueueSize = Word
2048
    }

data ZipkinClient = ZipkinClient
  { ZipkinClient -> ZipkinConfig
zcConfig :: ZipkinConfig,
    ZipkinClient -> Async ()
zcSenderThread :: Async (),
    ZipkinClient -> TBQueue Span
zcSenderQueue :: TBQueue Span,
    ZipkinClient -> TVar Bool
zcShutdownVar :: TVar Bool
  }

createZipkinSpanExporter :: MonadIO m => ZipkinConfig -> m (Exporter Span)
createZipkinSpanExporter :: ZipkinConfig -> m (Exporter Span)
createZipkinSpanExporter ZipkinConfig
cfg = IO (Exporter Span) -> m (Exporter Span)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO do
  ZipkinClient
client <- ZipkinConfig -> IO ZipkinClient
mkClient ZipkinConfig
cfg
  Exporter Span -> IO (Exporter Span)
forall (f :: * -> *) a. Applicative f => a -> f a
pure
    (Exporter Span -> IO (Exporter Span))
-> Exporter Span -> IO (Exporter Span)
forall a b. (a -> b) -> a -> b
$! ([Span] -> IO ExportResult) -> IO () -> Exporter Span
forall thing.
([thing] -> IO ExportResult) -> IO () -> Exporter thing
Exporter
      ( \[Span]
sps -> do
          let q :: TBQueue Span
q = ZipkinClient -> TBQueue Span
zcSenderQueue ZipkinClient
client
          STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
            Word
q_population <- Natural -> Word
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Natural -> Word) -> STM Natural -> STM Word
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TBQueue Span -> STM Natural
forall a. TBQueue a -> STM Natural
lengthTBQueue TBQueue Span
q
            let q_vacancy :: Int
q_vacancy = Word -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (ZipkinConfig -> Word
zSpanQueueSize (ZipkinClient -> ZipkinConfig
zcConfig ZipkinClient
client) Word -> Word -> Word
forall a. Num a => a -> a -> a
- Word
q_population)
            TVar Int -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar TVar Int
droppedSpanCountVar (\Int
x -> Int
x Int -> Int -> Int
forall a. Num a => a -> a -> a
+ [Span] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [Span]
sps Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
q_vacancy)
            (Span -> STM ()) -> [Span] -> STM ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_
              (TBQueue Span -> Span -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue Span
q)
              (Int -> [Span] -> [Span]
forall a. Int -> [a] -> [a]
take Int
q_vacancy [Span]
sps)
          ExportResult -> IO ExportResult
forall (f :: * -> *) a. Applicative f => a -> f a
pure ExportResult
ExportSuccess
      )
      ( do
          STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$
            TVar Bool -> Bool -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar (ZipkinClient -> TVar Bool
zcShutdownVar ZipkinClient
client) Bool
True
          Async () -> IO ()
forall a. Async a -> IO a
wait (ZipkinClient -> Async ()
zcSenderThread ZipkinClient
client)
      )

mkClient :: ZipkinConfig -> IO ZipkinClient
mkClient :: ZipkinConfig -> IO ZipkinClient
mkClient cfg :: ZipkinConfig
cfg@(ZipkinConfig {String
[(Text, Text)]
Word
Text
zSpanQueueSize :: Word
zGracefulShutdownTimeoutSeconds :: Word
zGlobalTags :: [(Text, Text)]
zServiceName :: Text
zEndpoint :: String
zSpanQueueSize :: ZipkinConfig -> Word
zGracefulShutdownTimeoutSeconds :: ZipkinConfig -> Word
zGlobalTags :: ZipkinConfig -> [(Text, Text)]
zServiceName :: ZipkinConfig -> Text
zEndpoint :: ZipkinConfig -> String
..}) = do
  Manager
manager <- ManagerSettings -> IO Manager
newManager ManagerSettings
tlsManagerSettings
  TBQueue Span
q <- Natural -> IO (TBQueue Span)
forall a. Natural -> IO (TBQueue a)
newTBQueueIO (Word -> Natural
forall a b. (Integral a, Num b) => a -> b
fromIntegral Word
zSpanQueueSize)
  TVar Bool
shutdown_var <- Bool -> IO (TVar Bool)
forall a. a -> IO (TVar a)
newTVarIO Bool
False
  Async ()
sender <- IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async (IO () -> IO (Async ())) -> IO () -> IO (Async ())
forall a b. (a -> b) -> a -> b
$ do
    let loop :: IO ()
loop = do
          (Bool
must_shutdown, [Span]
sps) <- STM (Bool, [Span]) -> IO (Bool, [Span])
forall a. STM a -> IO a
atomically (STM (Bool, [Span]) -> IO (Bool, [Span]))
-> STM (Bool, [Span]) -> IO (Bool, [Span])
forall a b. (a -> b) -> a -> b
$ do
            Bool
must_shutdown <- TVar Bool -> STM Bool
forall a. TVar a -> STM a
readTVar TVar Bool
shutdown_var
            [Span]
sps <- TBQueue Span -> STM [Span]
forall a. TBQueue a -> STM [a]
flushTBQueue TBQueue Span
q
            case (Bool
must_shutdown, [Span]
sps) of
              (Bool
False, []) -> STM (Bool, [Span])
forall a. STM a
retry
              (Bool, [Span])
_ -> (Bool, [Span]) -> STM (Bool, [Span])
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Bool
must_shutdown, [Span]
sps)
          case [Span]
sps of
            [] -> () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
            [Span]
_ -> String -> Manager -> ZipkinConfig -> [Span] -> IO ()
reportSpans String
zEndpoint Manager
manager ZipkinConfig
cfg [Span]
sps
          String -> Bool -> IO ()
forall a. Show a => String -> a -> IO ()
dd_ String
"must_shutdown" Bool
must_shutdown
          case Bool
must_shutdown of
            Bool
True -> () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
            Bool
False -> IO ()
loop
    IO ()
loop
  ZipkinClient -> IO ZipkinClient
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ZipkinClient -> IO ZipkinClient)
-> ZipkinClient -> IO ZipkinClient
forall a b. (a -> b) -> a -> b
$! ZipkinConfig
-> Async () -> TBQueue Span -> TVar Bool -> ZipkinClient
ZipkinClient ZipkinConfig
cfg Async ()
sender TBQueue Span
q TVar Bool
shutdown_var

reportSpans :: String -> Manager -> ZipkinConfig -> [Span] -> IO ()
reportSpans :: String -> Manager -> ZipkinConfig -> [Span] -> IO ()
reportSpans String
endpoint Manager
httpManager ZipkinConfig
cfg [Span]
sps = do
  String -> [Span] -> IO ()
forall a. Show a => String -> a -> IO ()
dd_ String
"reportSpans" [Span]
sps
  let body :: ByteString
body = Json -> ByteString
J.toByteString (Json -> ByteString) -> Json -> ByteString
forall a b. (a -> b) -> a -> b
$ [Json] -> Json
forall (f :: * -> *). Foldable f => f Json -> Json
J.array ((Span -> Json) -> [Span] -> [Json]
forall a b. (a -> b) -> [a] -> [b]
map (ZipkinConfig -> Span -> Json
jSpan ZipkinConfig
cfg) [Span]
sps)
      request :: Request
request =
        (String -> Request
parseRequest_ String
endpoint)
          { method :: ByteString
method = ByteString
"POST",
            requestBody :: RequestBody
requestBody = ByteString -> RequestBody
RequestBodyBS ByteString
body,
            requestHeaders :: RequestHeaders
requestHeaders = [(HeaderName
"Content-Type", ByteString
"application/json")]
          }
  Response ByteString
resp <- Request -> Manager -> IO (Response ByteString)
httpLbs Request
request Manager
httpManager
  case Status -> Int
statusCode (Response ByteString -> Status
forall body. Response body -> Status
responseStatus Response ByteString
resp) of
    Int
s | Int
s Int -> [Int] -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
`elem` [Int
200, Int
202] -> do
      Int -> TVar Int -> IO ()
inc Int
1 TVar Int
reportedSpanCountVar
      () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
    Int
_ -> do
      -- TODO(divanov): handle failures
      Int -> TVar Int -> IO ()
inc Int
1 TVar Int
rejectedSpanCountVar
      String -> ByteString -> IO ()
forall a. Show a => String -> a -> IO ()
dd_ String
"body" ByteString
body
      String -> Status -> IO ()
forall a. Show a => String -> a -> IO ()
dd_ String
"resp status" (Status -> IO ()) -> Status -> IO ()
forall a b. (a -> b) -> a -> b
$ Response ByteString -> Status
forall body. Response body -> Status
responseStatus Response ByteString
resp
      String -> ByteString -> IO ()
forall a. Show a => String -> a -> IO ()
dd_ String
"resp" (ByteString -> IO ()) -> ByteString -> IO ()
forall a b. (a -> b) -> a -> b
$ Response ByteString -> ByteString
forall body. Response body -> body
responseBody Response ByteString
resp

droppedSpanCountVar :: TVar Int
droppedSpanCountVar :: TVar Int
droppedSpanCountVar = IO (TVar Int) -> TVar Int
forall a. IO a -> a
unsafePerformIO (IO (TVar Int) -> TVar Int) -> IO (TVar Int) -> TVar Int
forall a b. (a -> b) -> a -> b
$ Int -> IO (TVar Int)
forall a. a -> IO (TVar a)
newTVarIO Int
0
{-# NOINLINE droppedSpanCountVar #-}

reportedSpanCountVar :: TVar Int
reportedSpanCountVar :: TVar Int
reportedSpanCountVar = IO (TVar Int) -> TVar Int
forall a. IO a -> a
unsafePerformIO (IO (TVar Int) -> TVar Int) -> IO (TVar Int) -> TVar Int
forall a b. (a -> b) -> a -> b
$ Int -> IO (TVar Int)
forall a. a -> IO (TVar a)
newTVarIO Int
0
{-# NOINLINE reportedSpanCountVar #-}

rejectedSpanCountVar :: TVar Int
rejectedSpanCountVar :: TVar Int
rejectedSpanCountVar = IO (TVar Int) -> TVar Int
forall a. IO a -> a
unsafePerformIO (IO (TVar Int) -> TVar Int) -> IO (TVar Int) -> TVar Int
forall a b. (a -> b) -> a -> b
$ Int -> IO (TVar Int)
forall a. a -> IO (TVar a)
newTVarIO Int
0
{-# NOINLINE rejectedSpanCountVar #-}