{-# LANGUAGE RecordWildCards #-}
module OpenTelemetry.Processor.Batch (
  BatchTimeoutConfig (..),
  batchTimeoutConfig,
  batchProcessor,
  
) where
import Control.Concurrent (rtsSupportsBoundThreads)
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import Control.Monad.IO.Class
import Data.HashMap.Strict (HashMap)
import qualified Data.HashMap.Strict as HashMap
import Data.IORef (atomicModifyIORef', newIORef, readIORef)
import Data.Vector (Vector)
import OpenTelemetry.Exporter (Exporter)
import qualified OpenTelemetry.Exporter as Exporter
import OpenTelemetry.Processor
import OpenTelemetry.Trace.Core
import VectorBuilder.Builder as Builder
import VectorBuilder.Vector as Builder
-- | Tunable limits and timings for the batch span processor.
--
-- See 'batchTimeoutConfig' for a ready-made set of values.
data BatchTimeoutConfig = BatchTimeoutConfig
  { maxQueueSize :: Int
  -- ^ Upper bound on the number of finished spans buffered between exports.
  -- Spans that arrive once the buffer is full are dropped.
  , scheduledDelayMillis :: Int
  -- ^ Maximum time, in milliseconds, the background worker waits for a
  -- flush or shutdown signal before exporting on its own.
  , exportTimeoutMillis :: Int
  -- ^ Time budget, in milliseconds. In this module it bounds how long
  -- shutdown waits for the background worker to finish before reporting
  -- a timeout.
  , maxExportBatchSize :: Int
  -- ^ Buffered-span count at which the worker is woken up to export
  -- early instead of waiting for the scheduled delay.
  }
  deriving (Show)
-- | Default batch processor settings: a 1024-span buffer, a 5000 ms
-- scheduled delay, a 30000 ms timeout and an early-export threshold of
-- 512 spans.
batchTimeoutConfig :: BatchTimeoutConfig
batchTimeoutConfig =
  BatchTimeoutConfig
    { maxQueueSize = 1024
    , scheduledDelayMillis = 5000
    , exportTimeoutMillis = 30000
    , maxExportBatchSize = 512
    }
-- | A size-limited buffer of items grouped by the 'InstrumentationLibrary'
-- that produced them.
data BoundedMap a = BoundedMap
  { itemBounds :: !Int
  -- ^ Capacity limit checked by 'push' before accepting a new item.
  , itemMaxExportBounds :: !Int
  -- ^ Item count at which callers treat the buffer as ready to export.
  , itemCount :: !Int
  -- ^ Number of items currently stored across all keys of 'itemMap'.
  , itemMap :: HashMap InstrumentationLibrary (Builder.Builder a)
  -- ^ Buffered items, accumulated in one vector builder per library.
  }
-- | Create an empty 'BoundedMap'.
--
-- The first argument becomes 'itemBounds' (the capacity limit) and the
-- second becomes 'itemMaxExportBounds' (the export threshold). The item
-- count starts at zero and the map starts empty.
boundedMap :: Int -> Int -> BoundedMap a
boundedMap bounds exportBounds =
  BoundedMap
    { itemBounds = bounds
    , itemMaxExportBounds = exportBounds
    , itemCount = 0
    , itemMap = mempty
    }
-- | Try to add a finished span to the buffer.
--
-- Returns 'Nothing' when the buffer already holds 'itemBounds' spans, in
-- which case the caller keeps the old buffer and the span is dropped.
-- Otherwise returns the updated buffer, forced to weak head normal form,
-- with the span filed under the instrumentation library of its tracer.
--
-- The capacity check rejects only when the buffer is already full, so a
-- buffer with bound @n@ holds exactly @n@ spans. Comparing
-- @itemCount + 1 >= itemBounds@ would cap it at @n - 1@ and make a bound
-- of 1 reject every span.
push :: ImmutableSpan -> BoundedMap ImmutableSpan -> Maybe (BoundedMap ImmutableSpan)
push s m
  | itemCount m >= itemBounds m = Nothing
  | otherwise =
      Just $!
        m
          { itemCount = itemCount m + 1
          , itemMap =
              -- insertWith applies the function as (new <> old), so within
              -- one library the most recently pushed span comes first.
              HashMap.insertWith
                (<>)
                (tracerName $ spanTracer s)
                (Builder.singleton s)
                (itemMap m)
          }
-- | Drain the buffer for export.
--
-- Returns a pair of:
--
-- * the same buffer with its item count reset to zero and its map emptied
--   (both bounds are kept), and
-- * every buffered item, materialised as one 'Vector' per instrumentation
--   library.
--
-- The pair shape matches what 'atomicModifyIORef'' expects, so the drain
-- and the reset happen in a single atomic step when used that way.
buildExport :: BoundedMap a -> (BoundedMap a, HashMap InstrumentationLibrary (Vector a))
buildExport m = (drained, pending)
  where
    drained = m {itemCount = 0, itemMap = mempty}
    pending = fmap Builder.build (itemMap m)
-- | Reason the background worker woke up from waiting.
data ProcessorMessage
  = -- | The scheduled delay elapsed without any other signal.
    ScheduledFlush
  | -- | The work signal was raised.
    MaxExportFlush
  | -- | The shutdown signal was raised.
    Shutdown
-- | Build a span 'Processor' that buffers finished spans and hands them to
-- the given exporter from a background worker thread.
--
-- The worker exports the buffered spans whenever one of these happens:
--
-- * 'scheduledDelayMillis' elapses,
-- * the work signal is raised (the buffer reached 'maxExportBatchSize', a
--   span had to be dropped because the buffer was full, or a force flush
--   was requested), or
-- * shutdown is requested, in which case it keeps exporting until the
--   buffer is empty and then terminates.
--
-- Requires the threaded RTS: the returned action fails with 'error' when
-- 'rtsSupportsBoundThreads' is 'False'.
batchProcessor :: (MonadIO m) => BatchTimeoutConfig -> Exporter ImmutableSpan -> m Processor
batchProcessor BatchTimeoutConfig {..} exporter = liftIO $ do
  unless rtsSupportsBoundThreads $
    error "The hs-opentelemetry batch processor does not work without the -threaded GHC flag!"

  -- Shared state: the span buffer plus two one-slot signals for the worker.
  batch <- newIORef $ boundedMap maxQueueSize maxExportBatchSize
  workSignal <- newEmptyTMVarIO
  shutdownSignal <- newEmptyTMVarIO

  -- Run the exporter with asynchronous exceptions masked, so a batch that
  -- has already been drained from the buffer is not abandoned half-way by
  -- a cancellation arriving at a non-interruptible point.
  let publish batchToProcess =
        mask_ $
          Exporter.exporterExport exporter batchToProcess

  -- Keep draining and exporting until a drain comes back empty; yields the
  -- result of the last export performed (or the seed when there was none).
  let flushQueueImmediately ret = do
        batchToProcess <- atomicModifyIORef' batch buildExport
        if null batchToProcess
          then pure ret
          else do
            ret' <- publish batchToProcess
            flushQueueImmediately ret'

  -- Block until the scheduled delay fires or one of the signals is raised.
  -- 'msum' tries the alternatives in list order within one transaction.
  let waiting = do
        delay <- registerDelay (millisToMicros scheduledDelayMillis)
        atomically $
          msum
            [ ScheduledFlush <$ do
                continue <- readTVar delay
                check continue
            , MaxExportFlush <$ takeTMVar workSignal
            , Shutdown <$ takeTMVar shutdownSignal
            ]

  -- Worker loop: wait, drain, export, then either loop again or, on
  -- shutdown, flush whatever was added in the meantime and stop.
  -- NOTE(review): the exporter is invoked even when the drained batch is
  -- empty; confirm exporters tolerate an empty map.
  let workerAction = do
        req <- waiting
        batchToProcess <- atomicModifyIORef' batch buildExport
        res <- publish batchToProcess
        case req of
          Shutdown -> flushQueueImmediately res
          _ -> workerAction

  worker <- asyncWithUnmask $ \unmask -> unmask workerAction

  pure $
    Processor
      { processorOnStart = \_ _ -> pure ()
      , processorOnEnd = \s -> do
          span_ <- readIORef s
          -- True when the span could not be stored (buffer full) or when
          -- storing it brought the buffer up to the export threshold; in
          -- both cases the worker should export now.
          appendFailedOrExportNeeded <- atomicModifyIORef' batch $ \builder ->
            case push span_ builder of
              Nothing -> (builder, True)
              Just b' -> (b', itemCount b' >= itemMaxExportBounds b')
          when appendFailedOrExportNeeded $
            void $
              atomically $
                tryPutTMVar workSignal ()
      , -- Only raises the work signal; it does not wait for the export to
        -- complete.
        processorForceFlush = void $ atomically $ tryPutTMVar workSignal ()
      , processorShutdown =
          asyncWithUnmask $ \unmask -> unmask $
            mask $ \_restore -> do
              -- Non-blocking put: if a previous shutdown request is still
              -- pending (for example because the worker has already
              -- exited and will never take it), a blocking put would hang
              -- this thread forever.
              void $ atomically $ tryPutTMVar shutdownSignal ()

              -- Wait for the worker to finish, but no longer than
              -- 'exportTimeoutMillis'.
              delay <- registerDelay (millisToMicros exportTimeoutMillis)
              shutdownResult <-
                atomically $
                  msum
                    [ Just <$> waitCatchSTM worker
                    , Nothing <$ do
                        shouldStop <- readTVar delay
                        check shouldStop
                    ]

              -- Make sure the worker is stopped if the wait timed out.
              cancel worker

              pure $ case shutdownResult of
                Nothing -> ShutdownTimeout
                Just (Left _) -> ShutdownFailure
                Just (Right _) -> ShutdownSuccess
      }
  where
    -- 'registerDelay' takes microseconds; the configuration is in
    -- milliseconds.
    millisToMicros :: Int -> Int
    millisToMicros = (* 1000)