{-# LANGUAGE OverloadedStrings #-}
module OpenTelemetry.EventlogStreaming_Internal where
import Control.Concurrent (threadDelay)
import qualified Data.ByteString as B
import qualified Data.HashMap.Strict as HM
import qualified Data.IntMap as IM
import Data.List.NonEmpty as NE
import Data.Maybe
import qualified Data.Text as T
import Data.Word
import GHC.RTS.Events
import GHC.RTS.Events.Incremental
import GHC.Stack
import OpenTelemetry.Common hiding (Event, Timestamp)
import qualified OpenTelemetry.Common as OTel
import OpenTelemetry.Debug
import OpenTelemetry.Exporter
import OpenTelemetry.SpanContext
import System.IO
import qualified System.Random.SplitMix as R
import Text.Printf
work :: Timestamp -> Exporter Span -> Handle -> IO ()
work origin_timestamp exporter input = do
d_ "Starting the eventlog reader"
smgen <- R.initSMGen
go (initialState origin_timestamp smgen) decodeEventLog
d_ "no more work"
where
go s (Produce event next) = do
case evSpec event of
Shutdown {} -> do
d_ "Shutdown-like event detected"
CapDelete {} -> do
d_ "Shutdown-like event detected"
CapsetDelete {} -> do
d_ "Shutdown-like event detected"
_ -> do
let (s', sps) = processEvent event s
_ <- export exporter sps
mapM_ (d_ . ("emit " <>) . show) sps
go s' next
go s d@(Consume consume) = do
eof <- hIsEOF input
case eof of
False -> do
chunk <- B.hGetSome input 4096
if B.null chunk
then do
threadDelay 1000
go s d
else do
go s $ consume chunk
True -> do
d_ "EOF"
go _ (Done _) = do
d_ "go Done"
pure ()
go _ (Error _leftover err) = do
d_ "go Error"
d_ err
data State = S
{ originTimestamp :: Timestamp,
threadMap :: IM.IntMap ThreadId,
spanStacks :: HM.HashMap ThreadId (NonEmpty Span),
traceMap :: HM.HashMap ThreadId TraceId,
randomGen :: R.SMGen
}
deriving (Show)
initialState :: Word64 -> R.SMGen -> State
initialState timestamp = S timestamp mempty mempty mempty
processEvent :: Event -> State -> (State, [Span])
processEvent (Event ts ev m_cap) st@(S {..}) =
let now = originTimestamp + ts
m_thread_id = m_cap >>= flip IM.lookup threadMap
m_trace_id = m_thread_id >>= flip HM.lookup traceMap
in case (ev, m_cap, m_thread_id) of
(WallClockTime {sec, nsec}, _, _) -> (st {originTimestamp = sec * 1_000_000_000 + fromIntegral nsec - ts}, [])
(CreateThread new_tid, _, _) ->
case m_trace_id of
Just trace_id -> (st {traceMap = HM.insert new_tid trace_id traceMap}, [])
_ -> (st, [])
(RunThread tid, Just cap, _) ->
(st {threadMap = IM.insert cap tid threadMap}, [])
(StopThread _ tstatus, Just cap, _)
| isTerminalThreadStatus tstatus -> (st {threadMap = IM.delete cap threadMap}, [])
(StartGC, _, _) -> (pushGCSpans st now, [])
(GCStatsGHC {gen}, _, _) -> (modifyAllSpans (setTag "gen" gen) st, [])
(EndGC, _, _) -> popSpansAcrossAllThreads now st
(HeapAllocated {allocBytes}, _, Just tid) ->
(modifySpan tid (addEvent now "heap_alloc_bytes" (showT allocBytes)) st, [])
(UserMessage {msg}, _, fromMaybe 1 -> tid) -> case T.words msg of
("ot1" : "begin" : "span" : name) ->
(pushSpan tid (T.intercalate " " name) now st, [])
("ot1" : "end" : "span" : _) -> popSpan tid now st
("ot1" : "set" : "tag" : k : v) -> (modifySpan tid (setTag k (T.unwords v)) st, [])
["ot1", "set", "traceid", trace_id_text] ->
let trace_id = TId (read ("0x" <> T.unpack trace_id_text))
in ( (modifySpan tid (setTraceId trace_id) st)
{ traceMap = HM.insert tid trace_id traceMap
},
[]
)
["ot1", "set", "spanid", span_id] ->
(modifySpan tid (setSpanId (SId (read ("0x" <> T.unpack span_id)))) st, [])
["ot1", "set", "parent", trace_id_text, span_id_text] ->
let trace_id = TId (read ("0x" <> T.unpack trace_id_text))
sid = SId (read ("0x" <> T.unpack span_id_text))
in ( (modifySpan tid (setParent trace_id sid) st)
{ traceMap = HM.insert tid trace_id traceMap
},
[]
)
("ot1" : "add" : "event" : k : v) -> (modifySpan tid (addEvent now k (T.unwords v)) st, [])
("ot1" : rest) -> error $ printf "Unrecognized %s" (show rest)
_ -> (st, [])
_ -> (st, [])
setTag :: ToTagValue v => T.Text -> v -> Span -> Span
setTag k v sp =
sp
{ spanTags = HM.insert k (toTagValue v) (spanTags sp)
}
setSpanId :: SpanId -> Span -> Span
setSpanId sid sp =
sp
{ spanContext = SpanContext sid (spanTraceId sp)
}
setTraceId :: TraceId -> Span -> Span
setTraceId tid sp =
sp
{ spanContext = SpanContext (spanId sp) tid
}
setParent :: TraceId -> SpanId -> Span -> Span
setParent ptid psid sp =
sp
{ spanParentId = Just psid,
spanContext = SpanContext (spanId sp) ptid
}
addEvent :: Timestamp -> T.Text -> T.Text -> Span -> Span
addEvent ts k v sp = sp {spanEvents = new_events}
where
new_events = ev : spanEvents sp
ev = SpanEvent ts k v
modifyAllSpans :: (Span -> Span) -> State -> State
modifyAllSpans f st =
st
{ spanStacks =
fmap
(\(sp :| sps) -> (f sp :| sps))
(spanStacks st)
}
modifySpan :: HasCallStack => ThreadId -> (Span -> Span) -> State -> State
modifySpan tid f st =
st
{ spanStacks =
HM.update (\(sp :| sps) -> Just (f sp :| sps)) tid (spanStacks st)
}
pushSpan :: HasCallStack => ThreadId -> T.Text -> OTel.Timestamp -> State -> State
pushSpan tid name timestamp st = st {spanStacks = new_stacks, randomGen = new_randomGen, traceMap = new_traceMap}
where
maybe_parent = NE.head <$> HM.lookup tid (spanStacks st)
new_stacks = HM.alter f tid (spanStacks st)
f Nothing = Just $ sp :| []
f (Just sps) = Just $ cons sp sps
(sid, new_randomGen) = R.nextWord64 (randomGen st)
(new_traceMap, trace_id) = case (maybe_parent, HM.lookup tid (traceMap st)) of
(Just parent, _) -> (traceMap st, spanTraceId parent)
(_, Just trace_id') -> (traceMap st, trace_id')
_ -> let new_trace_id = TId sid in (HM.insert tid new_trace_id (traceMap st), new_trace_id)
sp =
Span
{ spanContext = SpanContext (SId sid) trace_id,
spanOperation = name,
spanStartedAt = timestamp,
spanFinishedAt = 0,
spanTags = HM.singleton "tid" (IntTagValue $ fromIntegral tid),
spanEvents = mempty,
spanStatus = OK,
spanParentId = spanId <$> maybe_parent
}
popSpan :: HasCallStack => ThreadId -> OTel.Timestamp -> State -> (State, [Span])
popSpan tid timestamp st = (st {spanStacks = new_stacks, traceMap = new_traceMap}, [sp {spanFinishedAt = timestamp}])
where
sp :| new_stack = fromMaybe (error $ printf "popSpan: missing span stack for thread %d" tid) $ HM.lookup tid (spanStacks st)
(new_traceMap, new_stacks) = case new_stack of
[] -> (HM.delete tid (traceMap st), HM.delete tid (spanStacks st))
x : xs -> (traceMap st, HM.insert tid (x :| xs) (spanStacks st))
pushGCSpans :: HasCallStack => State -> OTel.Timestamp -> State
pushGCSpans st timestamp = foldr go st tids
where
tids = HM.keys (spanStacks st)
go tid = pushSpan tid "gc" timestamp
popSpansAcrossAllThreads :: HasCallStack => OTel.Timestamp -> State -> (State, [Span])
popSpansAcrossAllThreads timestamp st = foldr go (st, []) tids
where
tids = HM.keys (spanStacks st)
go tid (st', sps) =
let (st'', sps') = popSpan tid timestamp st'
in (st'', sps' <> sps)
isTerminalThreadStatus :: ThreadStopStatus -> Bool
isTerminalThreadStatus HeapOverflow = True
isTerminalThreadStatus StackOverflow = True
isTerminalThreadStatus ThreadFinished = True
isTerminalThreadStatus _ = False
showT :: Show a => a -> T.Text
showT = T.pack . show