{-# LANGUAGE RecordWildCards #-}
-----------------------------------------------------------------------------
-- |
-- Module      :  OpenTelemetry.Processor.Batch
-- Copyright   :  (c) Ian Duncan, 2021
-- License     :  BSD-3
-- Description :  Performant exporting of spans in time & space-bounded batches.
-- Maintainer  :  Ian Duncan
-- Stability   :  experimental
-- Portability :  non-portable (GHC extensions)
--
-- This is an implementation of the Span Processor that creates batches of finished spans and passes the export-friendly span data representations to the configured Exporter.
--
-----------------------------------------------------------------------------
module OpenTelemetry.Processor.Batch
  ( BatchTimeoutConfig(..)
  , batchTimeoutConfig
  , batchProcessor
  -- , BatchProcessorOperations
  ) where
import Control.Monad.IO.Class
import OpenTelemetry.Processor
import OpenTelemetry.Exporter (Exporter)
import qualified OpenTelemetry.Exporter as Exporter
import VectorBuilder.Builder as Builder
import VectorBuilder.Vector as Builder
import Data.IORef (atomicModifyIORef', readIORef, newIORef)
import Control.Concurrent.Async
import Control.Concurrent.MVar (newEmptyMVar, takeMVar, tryPutMVar)
import Control.Monad
import System.Timeout
import Control.Exception
import Data.HashMap.Strict (HashMap)
import OpenTelemetry.Trace.Core
import qualified Data.HashMap.Strict as HashMap
import Data.Vector (Vector)

-- | Configurable options for batch exporting frequency and size.
data BatchTimeoutConfig = BatchTimeoutConfig
  { maxQueueSize :: Int
  -- ^ The maximum queue size. After the size is reached, spans are dropped.
  , scheduledDelayMillis :: Int
  -- ^ The delay interval in milliseconds between two consecutive exports.
  -- The default value is 5000.
  , exportTimeoutMillis :: Int
  -- ^ How long the export can run before it is cancelled.
  -- The default value is 30000.
  , maxExportBatchSize :: Int
  -- ^ The maximum batch size of every export. It must be
  -- smaller or equal to 'maxQueueSize'. The default value is 512.
  } deriving (Show)

-- | Default configuration values: a queue of 1024 spans, an export every
-- 5000 ms, a 30000 ms timeout, and batches of at most 512 spans.
batchTimeoutConfig :: BatchTimeoutConfig
batchTimeoutConfig =
  BatchTimeoutConfig
    { maxQueueSize = 1024
    , scheduledDelayMillis = 5000
    , exportTimeoutMillis = 30000
    , maxExportBatchSize = 512
    }

-- type BatchProcessorOperations = ()

--  A multi-producer single-consumer green/blue buffer.
-- Write requests that cannot fit in the live chunk will be dropped
--
-- TODO, would be cool to use AtomicCounters for this if possible
-- data GreenBlueBuffer a = GreenBlueBuffer
--   { gbReadSection :: !(TVar Word)
--   , gbWriteGreenOrBlue :: !(TVar Word)
--   , gbPendingWrites :: !(TVar Word)
--   , gbSectionSize :: !Int
--   , gbVector :: !(M.IOVector a)
--   }


{- brainstorm: Single Word64 state sketch


  63 (high bit): green or blue
  32-62: read section
  0-32: write count
-}

{-

Green
    512       512       512       512
|---------|---------|---------|---------|
     0         1         2         3

Blue
    512       512       512       512
|---------|---------|---------|---------|
     0         1         2         3

The current read section denotes one chunk of length gbSize, which gets flushed
to the span exporter. Once the vector has been copied for export, gbReadSection
will be incremented.

-}

-- newGreenBlueBuffer 
--   :: Int  --  Max queue size (2048)
--   -> Int  --  Export batch size (512)
--   -> IO (GreenBlueBuffer a)
-- newGreenBlueBuffer maxQueueSize batchSize = do
--   let logBase2 = finiteBitSize maxQueueSize - 1 - countLeadingZeros maxQueueSize

--   let closestFittingPowerOfTwo = 2 * if (1 `shiftL` logBase2) == maxQueueSize 
--         then maxQueueSize 
--         else 1 `shiftL` (logBase2 + 1)

--   readSection <- newTVarIO 0
--   writeSection <- newTVarIO 0
--   writeCount <- newTVarIO 0
--   buf <- M.new closestFittingPowerOfTwo
--   pure $ GreenBlueBuffer
--     { gbSize = maxQueueSize
--     , gbVector = buf
--     , gbReadSection = readSection
--     , gbPendingWrites = writeCount
--     }

-- isEmpty :: GreenBlueBuffer a -> STM Bool
-- isEmpty = do
--   c <- readTVar gbPendingWrites
--   pure (c == 0)

-- data InsertResult = ValueDropped | ValueInserted

-- tryInsert :: GreenBlueBuffer a -> a -> IO InsertResult
-- tryInsert GreenBlueBuffer{..} x = atomically $ do
--   c <- readTVar gbPendingWrites
--   if c == gbMaxLength
--     then pure ValueDropped
--     else do
--       greenOrBlue <- readTVar gbWriteGreenOrBlue
--       let i = c + ((M.length gbVector `shiftR` 1) `shiftL` (greenOrBlue `mod` 2))
--       M.write gbVector i x
--       writeTVar gbPendingWrites (c + 1)
--       pure ValueInserted

-- Caution, single writer means that this can't be called concurrently
-- consumeChunk :: GreenBlueBuffer a -> IO (V.Vector a)
-- consumeChunk GreenBlueBuffer{..} = atomically $ do
--   r <- readTVar gbReadSection
  --   w <- readTVar gbWriteSection
  --   c <- readTVar gbPendingWrites
  --   when (r == w) $ do
  --     modifyTVar gbWriteSection (+ 1)
  --     setTVar gbPendingWrites 0
  --   -- TODO slice and freeze appropriate section
    -- M.slice (gbSectionSize * (r .&. gbSectionMask) 

-- TODO, counters for dropped spans, exported spans

-- | A size-limited accumulator of items grouped by the
-- 'InstrumentationLibrary' they belong to.
--
-- All fields are strict. 'itemMap' in particular is rewritten on every
-- insertion, and a lazy field there would pile up one unevaluated
-- @insertWith@ per item until the map is finally consumed.
data BoundedMap a = BoundedMap
  { itemBounds :: !Int
  -- ^ Upper limit on the number of items that may be held.
  , itemCount :: !Int
  -- ^ Number of items currently held across all groups.
  , itemMap :: !(HashMap InstrumentationLibrary (Builder.Builder a))
  -- ^ The held items, as one vector builder per instrumentation library.
  }

-- | Create an empty 'BoundedMap' whose 'itemBounds' is the given limit.
boundedMap :: Int -> BoundedMap a
boundedMap bounds =
  BoundedMap
    { itemBounds = bounds
    , itemCount = 0
    , itemMap = HashMap.empty
    }

-- | Try to add a finished span to the buffer, grouped under the
-- instrumentation library ('tracerName') of the tracer that produced it.
--
-- Returns 'Nothing' when the buffer already holds 'itemBounds' spans, in
-- which case the caller is expected to drop the span. Otherwise returns the
-- updated buffer with 'itemCount' incremented.
--
-- The capacity check compares the /current/ count against the bound, so a
-- buffer created with bound @n@ accepts exactly @n@ spans. (Comparing
-- @itemCount + 1@ instead would reject the span that fills the buffer and
-- make a bound of 1 accept nothing.)
push :: ImmutableSpan -> BoundedMap ImmutableSpan -> Maybe (BoundedMap ImmutableSpan)
push s m
  | itemCount m >= itemBounds m = Nothing
  | otherwise =
      -- Force the updated record so the strict counter is evaluated now
      -- rather than when the buffer is next inspected.
      Just $! m
        { itemCount = itemCount m + 1
        , itemMap =
            -- 'HashMap.insertWith' combines as @new <> old@, so the newest
            -- span ends up first within its group.
            HashMap.insertWith
              (<>)
              (tracerName $ spanTracer s)
              (Builder.singleton s)
              (itemMap m)
        }

-- | Drain a 'BoundedMap'.
--
-- Returns the emptied buffer (same 'itemBounds', zero items) paired with the
-- previously held items, where every per-library builder has been turned
-- into a 'Vector'. The shape matches what 'atomicModifyIORef'' expects, so
-- the swap and the extraction happen in one atomic step.
buildExport :: BoundedMap a -> (BoundedMap a, HashMap InstrumentationLibrary (Vector a))
buildExport pending = (drained, exported)
  where
    drained = pending { itemCount = 0, itemMap = mempty }
    exported = fmap Builder.build (itemMap pending)

-- |
-- The batch processor accepts spans and places them into batches. Batching helps better compress the data and reduce the number of outgoing connections
-- required to transmit the data. This processor supports both size and time based batching.
--
-- Finished spans are appended to a shared buffer bounded by 'maxQueueSize'.
-- A background worker drains the buffer and passes its contents to the
-- exporter whenever it is signalled, or after 'scheduledDelayMillis' has
-- elapsed without a signal.
--
-- * @processorOnStart@ does nothing.
-- * @processorOnEnd@ appends the span; if the buffer is full the span is
--   dropped and the worker is signalled so the buffer gets drained.
-- * @processorForceFlush@ only signals the worker; it does not wait for the
--   export to finish.
-- * @processorShutdown@ stops the worker and then exports whatever is still
--   buffered, all within 'exportTimeoutMillis'. It yields 'ShutdownFailure'
--   if that deadline passes or the final export throws, and
--   'ShutdownSuccess' otherwise.
--
-- 'maxExportBatchSize' is not consulted by this implementation.
batchProcessor :: MonadIO m => BatchTimeoutConfig -> Exporter ImmutableSpan -> m Processor
batchProcessor BatchTimeoutConfig{..} exporter = liftIO $ do
  batch <- newIORef $ boundedMap maxQueueSize
  -- Holds at most one pending wake-up for the worker; extra signals are
  -- discarded by 'tryPutMVar'.
  workSignal <- newEmptyMVar

  -- Atomically swap the buffered spans for an empty buffer and hand them to
  -- the exporter. Shared by the worker loop and the shutdown path.
  let exportPending = do
        batchToProcess <- atomicModifyIORef' batch buildExport
        Exporter.exporterExport exporter batchToProcess

  worker <- async $ forever $ do
    -- Wait for a signal, but no longer than the scheduled delay.
    void $ timeout (millisToMicros scheduledDelayMillis) $ takeMVar workSignal
    exportPending

  pure $ Processor
    { processorOnStart = \_ _ -> pure ()
    , processorOnEnd = \s -> do
        span_ <- readIORef s
        appendFailed <- atomicModifyIORef' batch $ \builder ->
          case push span_ builder of
            -- Buffer full: keep it unchanged; the span is dropped.
            Nothing -> (builder, True)
            Just b' -> (b', False)
        -- A full buffer is the size-based trigger for an early export.
        when appendFailed $ void $ tryPutMVar workSignal ()

    , processorForceFlush = void $ tryPutMVar workSignal ()
    -- TODO where to call restore, if anywhere?
    , processorShutdown = async $ mask $ \_restore -> do
        outcome <- try $ timeout (millisToMicros exportTimeoutMillis) $ do
          cancel worker
          -- Export spans that arrived after the worker's last drain so they
          -- are not lost on shutdown. The export result itself is not
          -- inspected here.
          -- NOTE(review): a batch the worker had already taken out of the
          -- buffer when it was cancelled is not retried.
          void exportPending
        -- TODO, not convinced we should shut down processor here

        pure $ case (outcome :: Either SomeException (Maybe ())) of
          Right (Just ()) -> ShutdownSuccess
          -- Timed out, or the final export raised an exception.
          _ -> ShutdownFailure
    }
  where
    -- 'timeout' takes microseconds; the configuration is in milliseconds.
    millisToMicros :: Int -> Int
    millisToMicros = (* 1000)
  {-
  buffer <- newGreenBlueBuffer _ _
  batchProcessorAction <- async $ forever $ do
    -- It would be nice to do an immediate send when possible
    chunk <- if (sendDelay == 0) 
      else consumeChunk
      then threadDelay sendDelay >> consumeChunk
    timeout _ $ export exporter chunk
  pure $ Processor
    { onStart = \_ _ -> pure ()
    , onEnd = \s -> void $ tryInsert buffer s
    , shutdown = do
        gracefullyShutdownBatchProcessor
        

    , forceFlush = pure ()
    }
  where
    sendDelay = scheduledDelayMilis * 1_000
  -}