{-# LANGUAGE RecordWildCards #-}
module OpenTelemetry.Processor.Batch (
BatchTimeoutConfig (..),
batchTimeoutConfig,
batchProcessor,
) where
import Control.Concurrent (rtsSupportsBoundThreads)
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import Control.Monad.IO.Class
import Data.HashMap.Strict (HashMap)
import qualified Data.HashMap.Strict as HashMap
import Data.IORef (atomicModifyIORef', newIORef, readIORef)
import Data.Vector (Vector)
import OpenTelemetry.Exporter (Exporter)
import qualified OpenTelemetry.Exporter as Exporter
import OpenTelemetry.Processor
import OpenTelemetry.Trace.Core
import VectorBuilder.Builder as Builder
import VectorBuilder.Vector as Builder
data BatchTimeoutConfig = BatchTimeoutConfig
{ BatchTimeoutConfig -> Int
maxQueueSize :: Int
, BatchTimeoutConfig -> Int
scheduledDelayMillis :: Int
, BatchTimeoutConfig -> Int
exportTimeoutMillis :: Int
, BatchTimeoutConfig -> Int
maxExportBatchSize :: Int
}
deriving (Int -> BatchTimeoutConfig -> ShowS
[BatchTimeoutConfig] -> ShowS
BatchTimeoutConfig -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [BatchTimeoutConfig] -> ShowS
$cshowList :: [BatchTimeoutConfig] -> ShowS
show :: BatchTimeoutConfig -> String
$cshow :: BatchTimeoutConfig -> String
showsPrec :: Int -> BatchTimeoutConfig -> ShowS
$cshowsPrec :: Int -> BatchTimeoutConfig -> ShowS
Show)
batchTimeoutConfig :: BatchTimeoutConfig
batchTimeoutConfig :: BatchTimeoutConfig
batchTimeoutConfig =
BatchTimeoutConfig
{ maxQueueSize :: Int
maxQueueSize = Int
1024
, scheduledDelayMillis :: Int
scheduledDelayMillis = Int
5000
, exportTimeoutMillis :: Int
exportTimeoutMillis = Int
30000
, maxExportBatchSize :: Int
maxExportBatchSize = Int
512
}
data BoundedMap a = BoundedMap
{ forall a. BoundedMap a -> Int
itemBounds :: !Int
, forall a. BoundedMap a -> Int
itemMaxExportBounds :: !Int
, forall a. BoundedMap a -> Int
itemCount :: !Int
, forall a.
BoundedMap a -> HashMap InstrumentationLibrary (Builder a)
itemMap :: HashMap InstrumentationLibrary (Builder.Builder a)
}
boundedMap :: Int -> Int -> BoundedMap a
boundedMap :: forall a. Int -> Int -> BoundedMap a
boundedMap Int
bounds Int
exportBounds = forall a.
Int
-> Int
-> Int
-> HashMap InstrumentationLibrary (Builder a)
-> BoundedMap a
BoundedMap Int
bounds Int
exportBounds Int
0 forall a. Monoid a => a
mempty
push :: ImmutableSpan -> BoundedMap ImmutableSpan -> Maybe (BoundedMap ImmutableSpan)
push :: ImmutableSpan
-> BoundedMap ImmutableSpan -> Maybe (BoundedMap ImmutableSpan)
push ImmutableSpan
s BoundedMap ImmutableSpan
m =
if forall a. BoundedMap a -> Int
itemCount BoundedMap ImmutableSpan
m forall a. Num a => a -> a -> a
+ Int
1 forall a. Ord a => a -> a -> Bool
>= forall a. BoundedMap a -> Int
itemBounds BoundedMap ImmutableSpan
m
then forall a. Maybe a
Nothing
else
forall a. a -> Maybe a
Just forall a b. (a -> b) -> a -> b
$!
BoundedMap ImmutableSpan
m
{ itemCount :: Int
itemCount = forall a. BoundedMap a -> Int
itemCount BoundedMap ImmutableSpan
m forall a. Num a => a -> a -> a
+ Int
1
, itemMap :: HashMap InstrumentationLibrary (Builder ImmutableSpan)
itemMap =
forall k v.
(Eq k, Hashable k) =>
(v -> v -> v) -> k -> v -> HashMap k v -> HashMap k v
HashMap.insertWith
forall a. Semigroup a => a -> a -> a
(<>)
(Tracer -> InstrumentationLibrary
tracerName forall a b. (a -> b) -> a -> b
$ ImmutableSpan -> Tracer
spanTracer ImmutableSpan
s)
(forall element. element -> Builder element
Builder.singleton ImmutableSpan
s)
forall a b. (a -> b) -> a -> b
$ forall a.
BoundedMap a -> HashMap InstrumentationLibrary (Builder a)
itemMap BoundedMap ImmutableSpan
m
}
buildExport :: BoundedMap a -> (BoundedMap a, HashMap InstrumentationLibrary (Vector a))
buildExport :: forall a.
BoundedMap a
-> (BoundedMap a, HashMap InstrumentationLibrary (Vector a))
buildExport BoundedMap a
m =
( BoundedMap a
m {itemCount :: Int
itemCount = Int
0, itemMap :: HashMap InstrumentationLibrary (Builder a)
itemMap = forall a. Monoid a => a
mempty}
, forall (vector :: * -> *) element.
Vector vector element =>
Builder element -> vector element
Builder.build forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a.
BoundedMap a -> HashMap InstrumentationLibrary (Builder a)
itemMap BoundedMap a
m
)
data ProcessorMessage = ScheduledFlush | MaxExportFlush | Shutdown
batchProcessor :: (MonadIO m) => BatchTimeoutConfig -> Exporter ImmutableSpan -> m Processor
batchProcessor :: forall (m :: * -> *).
MonadIO m =>
BatchTimeoutConfig -> Exporter ImmutableSpan -> m Processor
batchProcessor BatchTimeoutConfig {Int
maxExportBatchSize :: Int
exportTimeoutMillis :: Int
scheduledDelayMillis :: Int
maxQueueSize :: Int
maxExportBatchSize :: BatchTimeoutConfig -> Int
exportTimeoutMillis :: BatchTimeoutConfig -> Int
scheduledDelayMillis :: BatchTimeoutConfig -> Int
maxQueueSize :: BatchTimeoutConfig -> Int
..} Exporter ImmutableSpan
exporter = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
rtsSupportsBoundThreads forall a b. (a -> b) -> a -> b
$ forall a. HasCallStack => String -> a
error String
"The hs-opentelemetry batch processor does not work without the -threaded GHC flag!"
IORef (BoundedMap ImmutableSpan)
batch <- forall a. a -> IO (IORef a)
newIORef forall a b. (a -> b) -> a -> b
$ forall a. Int -> Int -> BoundedMap a
boundedMap Int
maxQueueSize Int
maxExportBatchSize
TMVar ()
workSignal <- forall a. IO (TMVar a)
newEmptyTMVarIO
TMVar ()
shutdownSignal <- forall a. IO (TMVar a)
newEmptyTMVarIO
let publish :: HashMap InstrumentationLibrary (Vector ImmutableSpan)
-> IO ExportResult
publish HashMap InstrumentationLibrary (Vector ImmutableSpan)
batchToProcess = forall a. IO a -> IO a
mask_ forall a b. (a -> b) -> a -> b
$ do
forall a.
Exporter a
-> HashMap InstrumentationLibrary (Vector a) -> IO ExportResult
Exporter.exporterExport Exporter ImmutableSpan
exporter HashMap InstrumentationLibrary (Vector ImmutableSpan)
batchToProcess
let flushQueueImmediately :: ExportResult -> IO ExportResult
flushQueueImmediately ExportResult
ret = do
HashMap InstrumentationLibrary (Vector ImmutableSpan)
batchToProcess <- forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef' IORef (BoundedMap ImmutableSpan)
batch forall a.
BoundedMap a
-> (BoundedMap a, HashMap InstrumentationLibrary (Vector a))
buildExport
if forall (t :: * -> *) a. Foldable t => t a -> Bool
null HashMap InstrumentationLibrary (Vector ImmutableSpan)
batchToProcess
then do
forall (f :: * -> *) a. Applicative f => a -> f a
pure ExportResult
ret
else do
ExportResult
ret' <- HashMap InstrumentationLibrary (Vector ImmutableSpan)
-> IO ExportResult
publish HashMap InstrumentationLibrary (Vector ImmutableSpan)
batchToProcess
ExportResult -> IO ExportResult
flushQueueImmediately ExportResult
ret'
let waiting :: IO ProcessorMessage
waiting = do
TVar Bool
delay <- Int -> IO (TVar Bool)
registerDelay (Int -> Int
millisToMicros Int
scheduledDelayMillis)
forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
forall (t :: * -> *) (m :: * -> *) a.
(Foldable t, MonadPlus m) =>
t (m a) -> m a
msum
[ ProcessorMessage
ScheduledFlush forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ do
Bool
continue <- forall a. TVar a -> STM a
readTVar TVar Bool
delay
Bool -> STM ()
check Bool
continue
, ProcessorMessage
MaxExportFlush forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ forall a. TMVar a -> STM a
takeTMVar TMVar ()
workSignal
, ProcessorMessage
Shutdown forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ forall a. TMVar a -> STM a
takeTMVar TMVar ()
shutdownSignal
]
let workerAction :: IO ExportResult
workerAction = do
ProcessorMessage
req <- IO ProcessorMessage
waiting
HashMap InstrumentationLibrary (Vector ImmutableSpan)
batchToProcess <- forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef' IORef (BoundedMap ImmutableSpan)
batch forall a.
BoundedMap a
-> (BoundedMap a, HashMap InstrumentationLibrary (Vector a))
buildExport
ExportResult
res <- HashMap InstrumentationLibrary (Vector ImmutableSpan)
-> IO ExportResult
publish HashMap InstrumentationLibrary (Vector ImmutableSpan)
batchToProcess
case ProcessorMessage
req of
ProcessorMessage
Shutdown ->
ExportResult -> IO ExportResult
flushQueueImmediately ExportResult
res
ProcessorMessage
_ ->
IO ExportResult
workerAction
Async ExportResult
worker <- forall a. ((forall a. IO a -> IO a) -> IO a) -> IO (Async a)
asyncWithUnmask forall a b. (a -> b) -> a -> b
$ \forall a. IO a -> IO a
unmask -> forall a. IO a -> IO a
unmask IO ExportResult
workerAction
forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$
Processor
{ processorOnStart :: IORef ImmutableSpan -> Context -> IO ()
processorOnStart = \IORef ImmutableSpan
_ Context
_ -> forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
, processorOnEnd :: IORef ImmutableSpan -> IO ()
processorOnEnd = \IORef ImmutableSpan
s -> do
ImmutableSpan
span_ <- forall a. IORef a -> IO a
readIORef IORef ImmutableSpan
s
Bool
appendFailedOrExportNeeded <- forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef' IORef (BoundedMap ImmutableSpan)
batch forall a b. (a -> b) -> a -> b
$ \BoundedMap ImmutableSpan
builder ->
case ImmutableSpan
-> BoundedMap ImmutableSpan -> Maybe (BoundedMap ImmutableSpan)
push ImmutableSpan
span_ BoundedMap ImmutableSpan
builder of
Maybe (BoundedMap ImmutableSpan)
Nothing -> (BoundedMap ImmutableSpan
builder, Bool
True)
Just BoundedMap ImmutableSpan
b' ->
if forall a. BoundedMap a -> Int
itemCount BoundedMap ImmutableSpan
b' forall a. Ord a => a -> a -> Bool
>= forall a. BoundedMap a -> Int
itemMaxExportBounds BoundedMap ImmutableSpan
b'
then
(BoundedMap ImmutableSpan
b', Bool
True)
else (BoundedMap ImmutableSpan
b', Bool
False)
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
appendFailedOrExportNeeded forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TMVar a -> a -> STM Bool
tryPutTMVar TMVar ()
workSignal ()
, processorForceFlush :: IO ()
processorForceFlush = forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TMVar a -> a -> STM Bool
tryPutTMVar TMVar ()
workSignal ()
,
processorShutdown :: IO (Async ShutdownResult)
processorShutdown =
forall a. ((forall a. IO a -> IO a) -> IO a) -> IO (Async a)
asyncWithUnmask forall a b. (a -> b) -> a -> b
$ \forall a. IO a -> IO a
unmask -> forall a. IO a -> IO a
unmask forall a b. (a -> b) -> a -> b
$ do
forall b. ((forall a. IO a -> IO a) -> IO b) -> IO b
mask forall a b. (a -> b) -> a -> b
$ \forall a. IO a -> IO a
_restore -> do
forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TMVar a -> a -> STM ()
putTMVar TMVar ()
shutdownSignal ()
TVar Bool
delay <- Int -> IO (TVar Bool)
registerDelay (Int -> Int
millisToMicros Int
exportTimeoutMillis)
Maybe (Either SomeException ExportResult)
shutdownResult <-
forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$
forall (t :: * -> *) (m :: * -> *) a.
(Foldable t, MonadPlus m) =>
t (m a) -> m a
msum
[ forall a. a -> Maybe a
Just forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. Async a -> STM (Either SomeException a)
waitCatchSTM Async ExportResult
worker
, forall a. Maybe a
Nothing forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ do
Bool
shouldStop <- forall a. TVar a -> STM a
readTVar TVar Bool
delay
Bool -> STM ()
check Bool
shouldStop
]
forall a. Async a -> IO ()
cancel Async ExportResult
worker
forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$ case Maybe (Either SomeException ExportResult)
shutdownResult of
Maybe (Either SomeException ExportResult)
Nothing ->
ShutdownResult
ShutdownTimeout
Just Either SomeException ExportResult
er ->
case Either SomeException ExportResult
er of
Left SomeException
_ ->
ShutdownResult
ShutdownFailure
Right ExportResult
_ ->
ShutdownResult
ShutdownSuccess
}
where
millisToMicros :: Int -> Int
millisToMicros = (forall a. Num a => a -> a -> a
* Int
1000)