{-# LANGUAGE RecordWildCards #-}

-----------------------------------------------------------------------------

-----------------------------------------------------------------------------

{- |
 Module      :  OpenTelemetry.Processor.Batch
 Copyright   :  (c) Ian Duncan, 2021
 License     :  BSD-3
 Description :  Performant exporting of spans in time & space-bounded batches.
 Maintainer  :  Ian Duncan
 Stability   :  experimental
 Portability :  non-portable (GHC extensions)

 This is an implementation of the Span Processor which creates batches of finished spans and passes the export-friendly span data representations to the configured Exporter.
-}
module OpenTelemetry.Processor.Batch (
  BatchTimeoutConfig (..),
  batchTimeoutConfig,
  batchProcessor,
  -- , BatchProcessorOperations
) where

import Control.Concurrent (rtsSupportsBoundThreads)
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import Control.Monad.IO.Class
import Data.HashMap.Strict (HashMap)
import qualified Data.HashMap.Strict as HashMap
import Data.IORef (atomicModifyIORef', newIORef, readIORef)
import Data.Vector (Vector)
import OpenTelemetry.Exporter (Exporter)
import qualified OpenTelemetry.Exporter as Exporter
import OpenTelemetry.Processor
import OpenTelemetry.Trace.Core
import VectorBuilder.Builder as Builder
import VectorBuilder.Vector as Builder


-- | Configurable options for batch exporting frequency and size.
--
-- See 'batchTimeoutConfig' for the default values.
data BatchTimeoutConfig = BatchTimeoutConfig
  { maxQueueSize :: Int
  -- ^ The maximum queue size. After the size is reached, spans are dropped.
  -- The default value is 1024.
  , scheduledDelayMillis :: Int
  -- ^ The delay interval in milliseconds between two consecutive exports.
  -- The default value is 5000.
  , exportTimeoutMillis :: Int
  -- ^ How long, in milliseconds, shutdown waits for the worker to finish its
  -- final export before cancelling it. The batch processor does not
  -- time-limit exports during normal operation.
  -- The default value is 30000.
  , maxExportBatchSize :: Int
  -- ^ The number of pending spans at which the worker is woken up for an
  -- early export. Because an export drains the whole queue, a single export
  -- may contain more spans than this (up to 'maxQueueSize'). It must be
  -- smaller or equal to 'maxQueueSize'. The default value is 512.
  }
  deriving (Show)


-- | Default configuration values.
--
-- * 'maxQueueSize': 1024 spans
-- * 'scheduledDelayMillis': 5000 ms
-- * 'exportTimeoutMillis': 30000 ms
-- * 'maxExportBatchSize': 512 spans
batchTimeoutConfig :: BatchTimeoutConfig
batchTimeoutConfig =
  BatchTimeoutConfig
    { maxQueueSize = 1024
    , scheduledDelayMillis = 5000
    , exportTimeoutMillis = 30000
    , maxExportBatchSize = 512
    }


-- type BatchProcessorOperations = ()

--  A multi-producer single-consumer green/blue buffer.
-- Write requests that cannot fit in the live chunk will be dropped
--
-- TODO, would be cool to use AtomicCounters for this if possible
-- data GreenBlueBuffer a = GreenBlueBuffer
--   { gbReadSection :: !(TVar Word)
--   , gbWriteGreenOrBlue :: !(TVar Word)
--   , gbPendingWrites :: !(TVar Word)
--   , gbSectionSize :: !Int
--   , gbVector :: !(M.IOVector a)
--   }

{- brainstorm: Single Word64 state sketch

  63 (high bit): green or blue
  32-62: read section
  0-31: write count
-}

{-

Green
    512       512       512       512
\|---------|---------|---------|---------|
     0         1         2         3

Blue
    512       512       512       512
\|---------|---------|---------|---------|
     0         1         2         3

The current read section denotes one chunk of length gbSize, which gets flushed
to the span exporter. Once the vector has been copied for export, gbReadSection
will be incremented.

-}

-- newGreenBlueBuffer
--   :: Int  --  Max queue size (2048)
--   -> Int  --  Export batch size (512)
--   -> IO (GreenBlueBuffer a)
-- newGreenBlueBuffer maxQueueSize batchSize = do
--   let logBase2 = finiteBitSize maxQueueSize - 1 - countLeadingZeros maxQueueSize

--   let closestFittingPowerOfTwo = 2 * if (1 `shiftL` logBase2) == maxQueueSize
--         then maxQueueSize
--         else 1 `shiftL` (logBase2 + 1)

--   readSection <- newTVarIO 0
--   writeSection <- newTVarIO 0
--   writeCount <- newTVarIO 0
--   buf <- M.new closestFittingPowerOfTwo
--   pure $ GreenBlueBuffer
--     { gbSize = maxQueueSize
--     , gbVector = buf
--     , gbReadSection = readSection
--     , gbPendingWrites = writeCount
--     }

-- isEmpty :: GreenBlueBuffer a -> STM Bool
-- isEmpty = do
--   c <- readTVar gbPendingWrites
--   pure (c == 0)

-- data InsertResult = ValueDropped | ValueInserted

-- tryInsert :: GreenBlueBuffer a -> a -> IO InsertResult
-- tryInsert GreenBlueBuffer{..} x = atomically $ do
--   c <- readTVar gbPendingWrites
--   if c == gbMaxLength
--     then pure ValueDropped
--     else do
--       greenOrBlue <- readTVar gbWriteGreenOrBlue
--       let i = c + ((M.length gbVector `shiftR` 1) `shiftL` (greenOrBlue `mod` 2))
--       M.write gbVector i x
--       writeTVar gbPendingWrites (c + 1)
--       pure ValueInserted

-- Caution, single writer means that this can't be called concurrently
-- consumeChunk :: GreenBlueBuffer a -> IO (V.Vector a)
-- consumeChunk GreenBlueBuffer{..} = atomically $ do
--   r <- readTVar gbReadSection
--   w <- readTVar gbWriteSection
--   c <- readTVar gbPendingWrites
--   when (r == w) $ do
--     modifyTVar gbWriteSection (+ 1)
--     setTVar gbPendingWrites 0
--   -- TODO slice and freeze appropriate section
-- M.slice (gbSectionSize * (r .&. gbSectionMask)

-- TODO, counters for dropped spans, exported spans

-- | Items waiting to be exported, grouped by instrumentation library, together
-- with the limits that govern when items are rejected or an export is
-- requested.
data BoundedMap a = BoundedMap
  { itemBounds :: !Int
  -- ^ Maximum number of items the map accepts; see 'push'.
  , itemMaxExportBounds :: !Int
  -- ^ Item count at which the batch processor wakes its worker for an
  -- early export.
  , itemCount :: !Int
  -- ^ Total number of items currently held, across all libraries.
  , itemMap :: !(HashMap InstrumentationLibrary (Builder.Builder a))
  -- ^ Pending items keyed by the instrumentation library of their tracer.
  -- Strict, like the other fields, so that successive 'push'es do not pile
  -- up a chain of unevaluated inserts until the next export.
  }


-- | An empty 'BoundedMap'.
--
-- The first argument is the maximum number of items accepted ('itemBounds');
-- the second is the item count that should trigger an early export
-- ('itemMaxExportBounds').
boundedMap :: Int -> Int -> BoundedMap a
boundedMap bounds exportBounds =
  BoundedMap
    { itemBounds = bounds
    , itemMaxExportBounds = exportBounds
    , itemCount = 0
    , itemMap = mempty
    }


-- | Add a finished span to the pending batch, grouped under the
-- instrumentation library of the tracer that produced it.
--
-- Returns 'Nothing' when the map already holds 'itemBounds' spans; the caller
-- is expected to drop the span in that case. The updated map is returned in
-- weak head normal form.
push :: ImmutableSpan -> BoundedMap ImmutableSpan -> Maybe (BoundedMap ImmutableSpan)
push s m
  -- Reject only once the map is actually full, so exactly 'itemBounds'
  -- spans fit (a bound of n used to admit only n - 1 spans).
  | itemCount m >= itemBounds m = Nothing
  | otherwise =
      Just $!
        m
          { itemCount = itemCount m + 1
          , itemMap =
              -- insertWith passes (new, old) to the combiner; flip it so the
              -- new span is appended and spans are exported in arrival order.
              HashMap.insertWith
                (flip (<>))
                (tracerName $ spanTracer s)
                (Builder.singleton s)
                (itemMap m)
          }


-- | Drain a 'BoundedMap'.
--
-- Returns the emptied map (its bounds are kept) together with every pending
-- item, materialised as one 'Vector' per instrumentation library. The result
-- is shaped as @(newState, result)@ for use with 'atomicModifyIORef''.
buildExport :: BoundedMap a -> (BoundedMap a, HashMap InstrumentationLibrary (Vector a))
buildExport m =
  ( m {itemCount = 0, itemMap = mempty}
  , Builder.build <$> itemMap m
  )


-- | The reason the batch processor's worker thread woke up.
data ProcessorMessage
  = -- | The 'scheduledDelayMillis' timer fired.
    ScheduledFlush
  | -- | The work signal was set: the pending batch reached its export size,
    -- a span could not be queued, or a force flush was requested.
    MaxExportFlush
  | -- | Shutdown was requested; the worker drains the queue and exits.
    Shutdown


-- note: [Unmasking Asyncs]
--
-- It is possible to create unkillable asyncs. Behold:
--
-- ```
-- a <- uninterruptibleMask_ $ do
--     async $ do
--         forever $ do
--             threadDelay 10_000
--             putStrLn "still alive"
-- cancel a
-- ```
--
-- The prior code block will never successfully cancel `a` and will be
-- blocked forever. The reason is that `cancel` sends an async exception to
-- the thread performing the action, but the `uninterruptibleMask` state is
-- inherited by the forked thread. This means that *no async exceptions*
-- can reach it, and `cancel` will therefore run forever.
--
-- This also affects `timeout`, which uses an async exception to kill the
-- running job. If the action is done in an uninterruptible masked state,
-- then the timeout will not successfully kill the running action.

{- |
 The batch processor accepts spans and places them into batches. Batching helps better compress the data and reduce the number of outgoing connections
 required to transmit the data. This processor supports both size and time based batching.

 A single worker thread performs every export. It wakes up when
 'scheduledDelayMillis' elapses, when a producer signals that the pending
 batch has reached 'maxExportBatchSize' (or that the queue was full and a span
 was dropped), when 'processorForceFlush' is called, or on shutdown. Force
 flushing only signals the worker; it does not wait for the export.

 On shutdown the worker drains the queue completely. The 'Async' returned by
 'processorShutdown' waits at most 'exportTimeoutMillis' for that, then
 cancels the worker and reports 'ShutdownTimeout'.

 NOTE: this function requires the program be compiled with the @-threaded@ GHC
 option and will throw an error if this is not the case.
-}
batchProcessor :: (MonadIO m) => BatchTimeoutConfig -> Exporter ImmutableSpan -> m Processor
batchProcessor BatchTimeoutConfig {..} exporter = liftIO $ do
  unless rtsSupportsBoundThreads $ error "The hs-opentelemetry batch processor does not work without the -threaded GHC flag!"
  batch <- newIORef $ boundedMap maxQueueSize maxExportBatchSize
  -- Set (without blocking) by producers to wake the worker for an early export.
  workSignal <- newEmptyTMVarIO
  -- Set by 'processorShutdown' to tell the worker to drain the queue and exit.
  shutdownSignal <- newEmptyTMVarIO

  let publish batchToProcess = mask_ $ do
        -- we mask async exceptions in this, so that a buggy exporter that
        -- catches async exceptions won't swallow them. since we use
        -- an interruptible mask, blocking calls can still be killed, like
        -- `threadDelay` or `putMVar` or most file I/O operations.
        --
        -- if we've received a shutdown, then we should be expecting
        -- a `cancel` anytime now.
        Exporter.exporterExport exporter batchToProcess

  -- Export batches until the queue is empty. Returns the result of the last
  -- export performed, or the given result if nothing was left to export.
  let flushQueueImmediately ret = do
        batchToProcess <- atomicModifyIORef' batch buildExport
        if null batchToProcess
          then pure ret
          else do
            ret' <- publish batchToProcess
            flushQueueImmediately ret'

  -- Block until there is a reason to export.
  let waiting = do
        delay <- registerDelay (millisToMicros scheduledDelayMillis)
        atomically $
          msum
            -- Flush every scheduled delay time, when we've reached the max export size, or when the shutdown signal is received.
            [ ScheduledFlush <$ (readTVar delay >>= check)
            , MaxExportFlush <$ takeTMVar workSignal
            , Shutdown <$ takeTMVar shutdownSignal
            ]

  -- NOTE(review): nothing here catches exceptions thrown by the exporter, so a
  -- throwing export ends this loop for good. Later spans are then only queued
  -- (and dropped once the queue is full), and shutdown reports
  -- 'ShutdownFailure'.
  let workerAction = do
        req <- waiting
        batchToProcess <- atomicModifyIORef' batch buildExport
        res <- publish batchToProcess

        -- if we were asked to shutdown, stop waiting and flush it all out
        case req of
          Shutdown -> flushQueueImmediately res
          _ -> workerAction

  -- see note [Unmasking Asyncs]
  worker <- asyncWithUnmask $ \unmask -> unmask workerAction

  pure $
    Processor
      { processorOnStart = \_ _ -> pure ()
      , processorOnEnd = \s -> do
          span_ <- readIORef s
          appendFailedOrExportNeeded <- atomicModifyIORef' batch $ \builder ->
            case push span_ builder of
              -- The queue is full, so the span is dropped; wake the worker so
              -- that it drains the queue.
              Nothing -> (builder, True)
              -- If the batch has grown to the maximum export size, prompt the worker to export it.
              Just b' -> (b', itemCount b' >= itemMaxExportBounds b')
          when appendFailedOrExportNeeded $ void $ atomically $ tryPutTMVar workSignal ()
      , processorForceFlush = void $ atomically $ tryPutTMVar workSignal ()
      , -- TODO where to call restore, if anywhere?
        processorShutdown =
          asyncWithUnmask $ \unmask -> unmask $ do
            -- we call asyncWithUnmask here because the shutdown action is
            -- likely to happen inside of a `finally` or `bracket`. the
            -- @safe-exceptions@ pattern (followed by unliftio as well)
            -- will use uninterruptibleMask in an exception cleanup. that
            -- uninterruptibleMask state would be inherited by this thread,
            -- so the bounded wait for the worker below could not be
            -- interrupted by async exceptions.
            --
            -- see note [Unmasking Asyncs]
            mask $ \_restore -> do
              -- is it a little silly that we unmask and remask? seems
              -- silly! but the `mask` here is doing an interruptible mask.
              -- which means that async exceptions can still be delivered
              -- if a process is blocking.

              -- flush remaining messages and signal the worker to shutdown.
              --
              -- `tryPutTMVar` rather than `putTMVar`: if the signal is still
              -- set by an earlier shutdown (e.g. one that timed out and
              -- cancelled the worker before it consumed the signal), a
              -- blocking put would hang here forever, before the timeout
              -- below is even armed.
              void $ atomically $ tryPutTMVar shutdownSignal ()

              -- gracefully wait for the worker to stop. we may be in
              -- a `bracket` or responding to an async exception, so we
              -- must be very careful not to wait too long. the following
              -- STM action will block, so we'll be susceptible to an async
              -- exception.
              delay <- registerDelay (millisToMicros exportTimeoutMillis)
              shutdownResult <-
                atomically $
                  msum
                    [ Just <$> waitCatchSTM worker
                    , Nothing <$ (readTVar delay >>= check)
                    ]

              -- make sure the worker comes down if we timed out.
              cancel worker
              -- TODO, not convinced we should shut down processor here

              pure $ case shutdownResult of
                Nothing -> ShutdownTimeout
                Just (Left _) -> ShutdownFailure
                Just (Right _) -> ShutdownSuccess
      }
  where
    millisToMicros :: Int -> Int
    millisToMicros = (* 1000)

{-
buffer <- newGreenBlueBuffer _ _
batchProcessorAction <- async $ forever $ do
  -- It would be nice to do an immediate send when possible
  chunk <- if (sendDelay == 0)
    then consumeChunk
    else threadDelay sendDelay >> consumeChunk
  timeout _ $ export exporter chunk
pure $ Processor
  { onStart = \_ _ -> pure ()
  , onEnd = \s -> void $ tryInsert buffer s
  , shutdown = do
      gracefullyShutdownBatchProcessor

  , forceFlush = pure ()
  }
where
  sendDelay = scheduledDelayMillis * 1_000
-}