{-# LANGUAGE RecordWildCards #-}
module OpenTelemetry.Processor.Batch
( BatchTimeoutConfig(..)
, batchTimeoutConfig
, batchProcessor
) where
import Control.Monad.IO.Class
import OpenTelemetry.Processor
import OpenTelemetry.Exporter (Exporter)
import qualified OpenTelemetry.Exporter as Exporter
import VectorBuilder.Builder as Builder
import VectorBuilder.Vector as Builder
import Data.IORef (atomicModifyIORef', readIORef, newIORef)
import Control.Concurrent.Async
import Control.Concurrent.MVar (newEmptyMVar, takeMVar, tryPutMVar)
import Control.Monad
import Control.Monad.Trans.Except
import System.Timeout
import Control.Exception
import Data.HashMap.Strict (HashMap)
import OpenTelemetry.Trace.Core
import qualified Data.HashMap.Strict as HashMap
import Data.Vector (Vector)
data BatchTimeoutConfig = BatchTimeoutConfig
{ BatchTimeoutConfig -> Int
maxQueueSize :: Int
, BatchTimeoutConfig -> Int
scheduledDelayMillis :: Int
, BatchTimeoutConfig -> Int
exportTimeoutMillis :: Int
, BatchTimeoutConfig -> Int
maxExportBatchSize :: Int
} deriving (Int -> BatchTimeoutConfig -> ShowS
[BatchTimeoutConfig] -> ShowS
BatchTimeoutConfig -> String
(Int -> BatchTimeoutConfig -> ShowS)
-> (BatchTimeoutConfig -> String)
-> ([BatchTimeoutConfig] -> ShowS)
-> Show BatchTimeoutConfig
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [BatchTimeoutConfig] -> ShowS
$cshowList :: [BatchTimeoutConfig] -> ShowS
show :: BatchTimeoutConfig -> String
$cshow :: BatchTimeoutConfig -> String
showsPrec :: Int -> BatchTimeoutConfig -> ShowS
$cshowsPrec :: Int -> BatchTimeoutConfig -> ShowS
Show)
batchTimeoutConfig :: BatchTimeoutConfig
batchTimeoutConfig :: BatchTimeoutConfig
batchTimeoutConfig = BatchTimeoutConfig :: Int -> Int -> Int -> Int -> BatchTimeoutConfig
BatchTimeoutConfig
{ maxQueueSize :: Int
maxQueueSize = Int
1024
, scheduledDelayMillis :: Int
scheduledDelayMillis = Int
5000
, exportTimeoutMillis :: Int
exportTimeoutMillis = Int
30000
, maxExportBatchSize :: Int
maxExportBatchSize = Int
512
}
data BoundedMap a = BoundedMap
{ BoundedMap a -> Int
itemBounds :: !Int
, BoundedMap a -> Int
itemCount :: !Int
, BoundedMap a -> HashMap InstrumentationLibrary (Builder a)
itemMap :: HashMap InstrumentationLibrary (Builder.Builder a)
}
boundedMap :: Int -> BoundedMap a
boundedMap :: Int -> BoundedMap a
boundedMap Int
bounds = Int
-> Int
-> HashMap InstrumentationLibrary (Builder a)
-> BoundedMap a
forall a.
Int
-> Int
-> HashMap InstrumentationLibrary (Builder a)
-> BoundedMap a
BoundedMap Int
bounds Int
0 HashMap InstrumentationLibrary (Builder a)
forall a. Monoid a => a
mempty
push :: ImmutableSpan -> BoundedMap ImmutableSpan -> Maybe (BoundedMap ImmutableSpan)
push :: ImmutableSpan
-> BoundedMap ImmutableSpan -> Maybe (BoundedMap ImmutableSpan)
push ImmutableSpan
s BoundedMap ImmutableSpan
m = if BoundedMap ImmutableSpan -> Int
forall a. BoundedMap a -> Int
itemCount BoundedMap ImmutableSpan
m Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1 Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= BoundedMap ImmutableSpan -> Int
forall a. BoundedMap a -> Int
itemBounds BoundedMap ImmutableSpan
m
then Maybe (BoundedMap ImmutableSpan)
forall a. Maybe a
Nothing
else BoundedMap ImmutableSpan -> Maybe (BoundedMap ImmutableSpan)
forall a. a -> Maybe a
Just (BoundedMap ImmutableSpan -> Maybe (BoundedMap ImmutableSpan))
-> BoundedMap ImmutableSpan -> Maybe (BoundedMap ImmutableSpan)
forall a b. (a -> b) -> a -> b
$! BoundedMap ImmutableSpan
m
{ itemCount :: Int
itemCount = BoundedMap ImmutableSpan -> Int
forall a. BoundedMap a -> Int
itemCount BoundedMap ImmutableSpan
m Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1
, itemMap :: HashMap InstrumentationLibrary (Builder ImmutableSpan)
itemMap = (Builder ImmutableSpan
-> Builder ImmutableSpan -> Builder ImmutableSpan)
-> InstrumentationLibrary
-> Builder ImmutableSpan
-> HashMap InstrumentationLibrary (Builder ImmutableSpan)
-> HashMap InstrumentationLibrary (Builder ImmutableSpan)
forall k v.
(Eq k, Hashable k) =>
(v -> v -> v) -> k -> v -> HashMap k v -> HashMap k v
HashMap.insertWith
Builder ImmutableSpan
-> Builder ImmutableSpan -> Builder ImmutableSpan
forall a. Semigroup a => a -> a -> a
(<>)
(Tracer -> InstrumentationLibrary
tracerName (Tracer -> InstrumentationLibrary)
-> Tracer -> InstrumentationLibrary
forall a b. (a -> b) -> a -> b
$ ImmutableSpan -> Tracer
spanTracer ImmutableSpan
s)
(ImmutableSpan -> Builder ImmutableSpan
forall element. element -> Builder element
Builder.singleton ImmutableSpan
s) (HashMap InstrumentationLibrary (Builder ImmutableSpan)
-> HashMap InstrumentationLibrary (Builder ImmutableSpan))
-> HashMap InstrumentationLibrary (Builder ImmutableSpan)
-> HashMap InstrumentationLibrary (Builder ImmutableSpan)
forall a b. (a -> b) -> a -> b
$
BoundedMap ImmutableSpan
-> HashMap InstrumentationLibrary (Builder ImmutableSpan)
forall a.
BoundedMap a -> HashMap InstrumentationLibrary (Builder a)
itemMap BoundedMap ImmutableSpan
m
}
buildExport :: BoundedMap a -> (BoundedMap a, HashMap InstrumentationLibrary (Vector a))
buildExport :: BoundedMap a
-> (BoundedMap a, HashMap InstrumentationLibrary (Vector a))
buildExport BoundedMap a
m =
( BoundedMap a
m { itemCount :: Int
itemCount = Int
0, itemMap :: HashMap InstrumentationLibrary (Builder a)
itemMap = HashMap InstrumentationLibrary (Builder a)
forall a. Monoid a => a
mempty }
, Builder a -> Vector a
forall (vector :: * -> *) element.
Vector vector element =>
Builder element -> vector element
Builder.build (Builder a -> Vector a)
-> HashMap InstrumentationLibrary (Builder a)
-> HashMap InstrumentationLibrary (Vector a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> BoundedMap a -> HashMap InstrumentationLibrary (Builder a)
forall a.
BoundedMap a -> HashMap InstrumentationLibrary (Builder a)
itemMap BoundedMap a
m
)
loop :: Monad m => ExceptT e m a -> m e
loop :: ExceptT e m a -> m e
loop = (Either e e -> e) -> m (Either e e) -> m e
forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
liftM ((e -> e) -> (e -> e) -> Either e e -> e
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either e -> e
forall a. a -> a
id e -> e
forall a. a -> a
id) (m (Either e e) -> m e)
-> (ExceptT e m a -> m (Either e e)) -> ExceptT e m a -> m e
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ExceptT e m e -> m (Either e e)
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (ExceptT e m e -> m (Either e e))
-> (ExceptT e m a -> ExceptT e m e)
-> ExceptT e m a
-> m (Either e e)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ExceptT e m a -> ExceptT e m e
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever
data ProcessorMessage = Flush | Shutdown
batchProcessor :: MonadIO m => BatchTimeoutConfig -> Exporter ImmutableSpan -> m Processor
batchProcessor :: BatchTimeoutConfig -> Exporter ImmutableSpan -> m Processor
batchProcessor BatchTimeoutConfig{Int
maxExportBatchSize :: Int
exportTimeoutMillis :: Int
scheduledDelayMillis :: Int
maxQueueSize :: Int
maxExportBatchSize :: BatchTimeoutConfig -> Int
exportTimeoutMillis :: BatchTimeoutConfig -> Int
scheduledDelayMillis :: BatchTimeoutConfig -> Int
maxQueueSize :: BatchTimeoutConfig -> Int
..} Exporter ImmutableSpan
exporter = IO Processor -> m Processor
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Processor -> m Processor) -> IO Processor -> m Processor
forall a b. (a -> b) -> a -> b
$ do
IORef (BoundedMap ImmutableSpan)
batch <- BoundedMap ImmutableSpan -> IO (IORef (BoundedMap ImmutableSpan))
forall a. a -> IO (IORef a)
newIORef (BoundedMap ImmutableSpan -> IO (IORef (BoundedMap ImmutableSpan)))
-> BoundedMap ImmutableSpan
-> IO (IORef (BoundedMap ImmutableSpan))
forall a b. (a -> b) -> a -> b
$ Int -> BoundedMap ImmutableSpan
forall a. Int -> BoundedMap a
boundedMap Int
maxQueueSize
MVar ProcessorMessage
workSignal <- IO (MVar ProcessorMessage)
forall a. IO (MVar a)
newEmptyMVar
Async ExportResult
worker <- IO ExportResult -> IO (Async ExportResult)
forall a. IO a -> IO (Async a)
async (IO ExportResult -> IO (Async ExportResult))
-> IO ExportResult -> IO (Async ExportResult)
forall a b. (a -> b) -> a -> b
$ ExceptT ExportResult IO () -> IO ExportResult
forall (m :: * -> *) e a. Monad m => ExceptT e m a -> m e
loop (ExceptT ExportResult IO () -> IO ExportResult)
-> ExceptT ExportResult IO () -> IO ExportResult
forall a b. (a -> b) -> a -> b
$ do
Maybe ProcessorMessage
req <- IO (Maybe ProcessorMessage)
-> ExceptT ExportResult IO (Maybe ProcessorMessage)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe ProcessorMessage)
-> ExceptT ExportResult IO (Maybe ProcessorMessage))
-> IO (Maybe ProcessorMessage)
-> ExceptT ExportResult IO (Maybe ProcessorMessage)
forall a b. (a -> b) -> a -> b
$ Int -> IO ProcessorMessage -> IO (Maybe ProcessorMessage)
forall a. Int -> IO a -> IO (Maybe a)
timeout (Int -> Int
millisToMicros Int
scheduledDelayMillis)
(IO ProcessorMessage -> IO (Maybe ProcessorMessage))
-> IO ProcessorMessage -> IO (Maybe ProcessorMessage)
forall a b. (a -> b) -> a -> b
$ MVar ProcessorMessage -> IO ProcessorMessage
forall a. MVar a -> IO a
takeMVar MVar ProcessorMessage
workSignal
HashMap InstrumentationLibrary (Vector ImmutableSpan)
batchToProcess <- IO (HashMap InstrumentationLibrary (Vector ImmutableSpan))
-> ExceptT
ExportResult
IO
(HashMap InstrumentationLibrary (Vector ImmutableSpan))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (HashMap InstrumentationLibrary (Vector ImmutableSpan))
-> ExceptT
ExportResult
IO
(HashMap InstrumentationLibrary (Vector ImmutableSpan)))
-> IO (HashMap InstrumentationLibrary (Vector ImmutableSpan))
-> ExceptT
ExportResult
IO
(HashMap InstrumentationLibrary (Vector ImmutableSpan))
forall a b. (a -> b) -> a -> b
$ IORef (BoundedMap ImmutableSpan)
-> (BoundedMap ImmutableSpan
-> (BoundedMap ImmutableSpan,
HashMap InstrumentationLibrary (Vector ImmutableSpan)))
-> IO (HashMap InstrumentationLibrary (Vector ImmutableSpan))
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef' IORef (BoundedMap ImmutableSpan)
batch BoundedMap ImmutableSpan
-> (BoundedMap ImmutableSpan,
HashMap InstrumentationLibrary (Vector ImmutableSpan))
forall a.
BoundedMap a
-> (BoundedMap a, HashMap InstrumentationLibrary (Vector a))
buildExport
ExportResult
res <- IO ExportResult -> ExceptT ExportResult IO ExportResult
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ExportResult -> ExceptT ExportResult IO ExportResult)
-> IO ExportResult -> ExceptT ExportResult IO ExportResult
forall a b. (a -> b) -> a -> b
$ Exporter ImmutableSpan
-> HashMap InstrumentationLibrary (Vector ImmutableSpan)
-> IO ExportResult
forall a.
Exporter a
-> HashMap InstrumentationLibrary (Vector a) -> IO ExportResult
Exporter.exporterExport Exporter ImmutableSpan
exporter HashMap InstrumentationLibrary (Vector ImmutableSpan)
batchToProcess
case Maybe ProcessorMessage
req of
Just ProcessorMessage
Shutdown -> ExportResult -> ExceptT ExportResult IO ()
forall (m :: * -> *) e a. Monad m => e -> ExceptT e m a
throwE ExportResult
res
Maybe ProcessorMessage
_ -> () -> ExceptT ExportResult IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
Processor -> IO Processor
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Processor -> IO Processor) -> Processor -> IO Processor
forall a b. (a -> b) -> a -> b
$ Processor :: (IORef ImmutableSpan -> Context -> IO ())
-> (IORef ImmutableSpan -> IO ())
-> IO (Async ShutdownResult)
-> IO ()
-> Processor
Processor
{ processorOnStart :: IORef ImmutableSpan -> Context -> IO ()
processorOnStart = \IORef ImmutableSpan
_ Context
_ -> () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
, processorOnEnd :: IORef ImmutableSpan -> IO ()
processorOnEnd = \IORef ImmutableSpan
s -> do
ImmutableSpan
span_ <- IORef ImmutableSpan -> IO ImmutableSpan
forall a. IORef a -> IO a
readIORef IORef ImmutableSpan
s
Bool
appendFailed <- IORef (BoundedMap ImmutableSpan)
-> (BoundedMap ImmutableSpan -> (BoundedMap ImmutableSpan, Bool))
-> IO Bool
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef' IORef (BoundedMap ImmutableSpan)
batch ((BoundedMap ImmutableSpan -> (BoundedMap ImmutableSpan, Bool))
-> IO Bool)
-> (BoundedMap ImmutableSpan -> (BoundedMap ImmutableSpan, Bool))
-> IO Bool
forall a b. (a -> b) -> a -> b
$ \BoundedMap ImmutableSpan
builder ->
case ImmutableSpan
-> BoundedMap ImmutableSpan -> Maybe (BoundedMap ImmutableSpan)
push ImmutableSpan
span_ BoundedMap ImmutableSpan
builder of
Maybe (BoundedMap ImmutableSpan)
Nothing -> (BoundedMap ImmutableSpan
builder, Bool
True)
Just BoundedMap ImmutableSpan
b' -> (BoundedMap ImmutableSpan
b', Bool
False)
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
appendFailed (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ IO Bool -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Bool -> IO ()) -> IO Bool -> IO ()
forall a b. (a -> b) -> a -> b
$ MVar ProcessorMessage -> ProcessorMessage -> IO Bool
forall a. MVar a -> a -> IO Bool
tryPutMVar MVar ProcessorMessage
workSignal ProcessorMessage
Flush
, processorForceFlush :: IO ()
processorForceFlush = IO Bool -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Bool -> IO ()) -> IO Bool -> IO ()
forall a b. (a -> b) -> a -> b
$ MVar ProcessorMessage -> ProcessorMessage -> IO Bool
forall a. MVar a -> a -> IO Bool
tryPutMVar MVar ProcessorMessage
workSignal ProcessorMessage
Flush
, processorShutdown :: IO (Async ShutdownResult)
processorShutdown = IO ShutdownResult -> IO (Async ShutdownResult)
forall a. IO a -> IO (Async a)
async (IO ShutdownResult -> IO (Async ShutdownResult))
-> IO ShutdownResult -> IO (Async ShutdownResult)
forall a b. (a -> b) -> a -> b
$ ((forall a. IO a -> IO a) -> IO ShutdownResult)
-> IO ShutdownResult
forall b. ((forall a. IO a -> IO a) -> IO b) -> IO b
mask (((forall a. IO a -> IO a) -> IO ShutdownResult)
-> IO ShutdownResult)
-> ((forall a. IO a -> IO a) -> IO ShutdownResult)
-> IO ShutdownResult
forall a b. (a -> b) -> a -> b
$ \forall a. IO a -> IO a
_restore -> do
IO Bool -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Bool -> IO ()) -> IO Bool -> IO ()
forall a b. (a -> b) -> a -> b
$ MVar ProcessorMessage -> ProcessorMessage -> IO Bool
forall a. MVar a -> a -> IO Bool
tryPutMVar MVar ProcessorMessage
workSignal ProcessorMessage
Shutdown
Maybe ExportResult
shutdownResult <- Int -> IO ExportResult -> IO (Maybe ExportResult)
forall a. Int -> IO a -> IO (Maybe a)
timeout (Int -> Int
millisToMicros Int
exportTimeoutMillis) (IO ExportResult -> IO (Maybe ExportResult))
-> IO ExportResult -> IO (Maybe ExportResult)
forall a b. (a -> b) -> a -> b
$
Async ExportResult -> IO ExportResult
forall a. Async a -> IO a
wait Async ExportResult
worker
Async ExportResult -> IO ()
forall a. Async a -> IO ()
uninterruptibleCancel Async ExportResult
worker
case Maybe ExportResult
shutdownResult of
Maybe ExportResult
Nothing -> ShutdownResult -> IO ShutdownResult
forall (f :: * -> *) a. Applicative f => a -> f a
pure ShutdownResult
ShutdownFailure
Just ExportResult
_ -> ShutdownResult -> IO ShutdownResult
forall (f :: * -> *) a. Applicative f => a -> f a
pure ShutdownResult
ShutdownSuccess
}
where
millisToMicros :: Int -> Int
millisToMicros = (Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
1000)