module Kafka.Worker.Settings
  ( Settings (..),
    decoder,
    MaxMsgsPerSecondPerPartition (..),
    MaxMsgsPerPartitionBufferedLocally (..),
    MaxPollIntervalMs (..),
    SkipOrNot (..),
  )
where

import qualified Environment
import qualified Kafka.Consumer as Consumer
import qualified Kafka.Settings.Internal as Internal
import qualified Observability
import qualified Prelude

-- | Everything the Kafka worker needs in order to consume and process
-- messages. Build a value of this type from environment variables with
-- 'decoder'.
data Settings = Settings
  { -- | Broker addresses. See hw-kafka's documentation for more info.
    brokerAddresses :: [Consumer.BrokerAddress],
    -- | Worker will poll Kafka for new messages. This is the timeout of
    -- such a poll.
    pollingTimeout :: Consumer.Timeout,
    -- | Used for throttling. Turn this down to give Kafka a speed limit.
    maxMsgsPerSecondPerPartition :: MaxMsgsPerSecondPerPartition,
    -- | Kafka log level (read from @KAFKA_LOG_LEVEL@ by 'decoder').
    logLevel :: Internal.KafkaLogLevel,
    -- | Observability settings, decoded with @Observability.decoder@.
    observability :: Observability.Settings,
    -- | Provides backpressure from message-workers to the queue-reader worker.
    -- Ensures that the thread responsible for pulling messages off of kafka
    -- doesn't race ahead / steal resources from the threads executing messages.
    maxMsgsPerPartitionBufferedLocally :: MaxMsgsPerPartitionBufferedLocally,
    -- | The amount of messages requested in a single poll request to Kafka.
    pollBatchSize :: Consumer.BatchSize,
    -- | Value used to set Kafka's @max.poll.interval.ms@.
    maxPollIntervalMs :: MaxPollIntervalMs,
    -- | This option provides us the possibility to skip messages on failure.
    -- Useful for testing Kafka worker. 'DoNotSkip' is a reasonable default!
    onProcessMessageSkip :: SkipOrNot
  }

-- | What the worker does with a message whose processing fails.
-- Skipping is mainly useful when testing the Kafka worker; 'DoNotSkip' is a
-- reasonable default.
data SkipOrNot
  = -- | Skip messages that fail processing.
    Skip
  | -- | Do not skip messages that fail processing.
    DoNotSkip

-- | Throttling configuration. Lower the limit to give Kafka a speed limit.
data MaxMsgsPerSecondPerPartition
  = -- | Process at most this many messages per second per partition.
    ThrottleAt Int
  | -- | Throttling is disabled.
    DontThrottle

-- | Provides backpressure from message-workers to the queue-reader worker.
-- Ensures that the thread responsible for pulling messages off of kafka
-- doesn't race ahead / steal resources from the threads executing messages.
newtype MaxMsgsPerPartitionBufferedLocally = MaxMsgsPerPartitionBufferedLocally
  { -- | Number of messages that may be queued up locally, per partition,
    -- without having been processed yet.
    unMaxMsgsPerPartitionBufferedLocally :: Int
  }

-- | Time between polling, in milliseconds. Used to set Kafka's
-- @max.poll.interval.ms@.
newtype MaxPollIntervalMs = MaxPollIntervalMs
  { -- | The wrapped number of milliseconds.
    unMaxPollIntervalMs :: Int
  }

-- | Decodes 'Settings' from environment variables.
-- Also consumes the Observability env variables (see nri-observability).
--
-- Variables read, with example values:
--
-- > KAFKA_BROKER_ADDRESSES=localhost:9092 # comma delimited list
-- > KAFKA_LOG_LEVEL=Debug
-- > KAFKA_POLLING_TIMEOUT=1000
-- > KAFKA_MAX_MESSAGES_PER_SECOND_PER_PARTITION=0 (disabled)
-- > KAFKA_MAX_POLL_INTERVAL_MS=300000
-- > KAFKA_MAX_MSGS_PER_PARTITION_BUFFERED_LOCALLY=100
-- > KAFKA_POLL_BATCH_SIZE=100
-- > KAFKA_SKIP_ON_PROCESS_MESSAGE_FAILURE=0
--
-- NOTE(review): earlier documentation also listed @KAFKA_GROUP_ID=0@, but
-- none of the decoders combined below reads a group id; confirm where (or
-- whether) that variable is consumed.
decoder :: Environment.Decoder Settings
decoder =
  -- Each 'andMap' step supplies the next argument of the 'Settings'
  -- constructor, so the order here must match the record's field order.
  Prelude.pure Settings
    |> andMap Internal.decoderBrokerAddresses
    |> andMap decoderPollingTimeout
    |> andMap decoderMaxMessagesPerSecondPerPartition
    |> andMap Internal.decoderKafkaLogLevel
    |> andMap Observability.decoder
    |> andMap decoderMaxMsgsPerPartitionBufferedLocally
    |> andMap decoderPollBatchSize
    |> andMap decoderMaxPollIntervalMs
    |> andMap decoderOnProcessMessageFailure

-- | Reads @KAFKA_POLLING_TIMEOUT@ (default @1000@) as an integer and wraps it
-- in 'Consumer.Timeout'.
--
-- NOTE(review): the unit is presumably milliseconds, as for hw-kafka's
-- @Timeout@ -- confirm against that library. The description string below
-- contains the typo "timout"; it is kept verbatim because it is runtime text.
decoderPollingTimeout :: Environment.Decoder Consumer.Timeout
decoderPollingTimeout =
  Environment.variable
    Environment.Variable
      { Environment.name = "KAFKA_POLLING_TIMEOUT",
        Environment.description = "Polling timout for consumers",
        Environment.defaultValue = "1000"
      }
    (map Consumer.Timeout Environment.int)

-- | Reads @KAFKA_MAX_MESSAGES_PER_SECOND_PER_PARTITION@ (default @0@) as an
-- integer. A value of exactly @0@ becomes 'DontThrottle'; every other value
-- is passed to 'ThrottleAt' unchanged.
--
-- NOTE(review): negative numbers are not rejected and end up in 'ThrottleAt';
-- confirm whether the throttling code tolerates that.
decoderMaxMessagesPerSecondPerPartition :: Environment.Decoder MaxMsgsPerSecondPerPartition
decoderMaxMessagesPerSecondPerPartition =
  Environment.variable
    Environment.Variable
      { Environment.name = "KAFKA_MAX_MESSAGES_PER_SECOND_PER_PARTITION",
        Environment.description = "This is how we throttle workers. Sets the maximum amount of messages this worker should process per second per partition. 0 is disabled.",
        Environment.defaultValue = "0"
      }
    (map toThrottle Environment.int)
  where
    -- 0 is the sentinel for "throttling disabled".
    toThrottle :: Int -> MaxMsgsPerSecondPerPartition
    toThrottle maxPerSecond =
      if maxPerSecond == 0
        then DontThrottle
        else ThrottleAt maxPerSecond

-- | Reads @KAFKA_MAX_POLL_INTERVAL_MS@ (default @300000@) as an integer and
-- wraps it in 'MaxPollIntervalMs'. The value is used to set Kafka's
-- @max.poll.interval.ms@.
decoderMaxPollIntervalMs :: Environment.Decoder MaxPollIntervalMs
decoderMaxPollIntervalMs =
  Environment.variable
    Environment.Variable
      { Environment.name = "KAFKA_MAX_POLL_INTERVAL_MS",
        Environment.description = "This is used to set max.poll.interval.ms",
        Environment.defaultValue = "300000"
      }
    (map MaxPollIntervalMs Environment.int)

-- | Reads @KAFKA_MAX_MSGS_PER_PARTITION_BUFFERED_LOCALLY@ (default @100@) as
-- an integer and wraps it in 'MaxMsgsPerPartitionBufferedLocally': the number
-- of queued-but-unprocessed messages at which reading from Kafka is paused.
decoderMaxMsgsPerPartitionBufferedLocally :: Environment.Decoder MaxMsgsPerPartitionBufferedLocally
decoderMaxMsgsPerPartitionBufferedLocally =
  Environment.variable
    Environment.Variable
      { Environment.name = "KAFKA_MAX_MSGS_PER_PARTITION_BUFFERED_LOCALLY",
        Environment.description = "Pausing reading from kafka when we have this many messages queued up but not yet processed",
        Environment.defaultValue = "100"
      }
    (map MaxMsgsPerPartitionBufferedLocally Environment.int)

-- | Reads @KAFKA_POLL_BATCH_SIZE@ (default @100@) as an integer and wraps it
-- in 'Consumer.BatchSize': the amount of messages requested in a single poll
-- request to Kafka.
decoderPollBatchSize :: Environment.Decoder Consumer.BatchSize
decoderPollBatchSize =
  Environment.variable
    Environment.Variable
      { Environment.name = "KAFKA_POLL_BATCH_SIZE",
        Environment.description = "The amount of messages we request in a single poll request to Kafka",
        Environment.defaultValue = "100"
      }
    (map Consumer.BatchSize Environment.int)

-- | Reads @KAFKA_SKIP_ON_PROCESS_MESSAGE_FAILURE@ (default @0@) as an integer.
-- Any value greater than or equal to @1@ yields 'Skip'; every other value
-- (including negative numbers) yields 'DoNotSkip'.
--
-- NOTE(review): this is more lenient than the variable's description
-- ("1 means on, 0 means off"): the conversion below never produces an error,
-- so values such as @2@ or @-1@ are accepted silently.
decoderOnProcessMessageFailure :: Environment.Decoder SkipOrNot
decoderOnProcessMessageFailure =
  Environment.variable
    Environment.Variable
      { Environment.name = "KAFKA_SKIP_ON_PROCESS_MESSAGE_FAILURE",
        Environment.description = "Whether to skip message that are failing processing. 1 means on, 0 means off.",
        Environment.defaultValue = "0"
      }
    ( Environment.custom
        Environment.int
        -- The lambda is deliberately left without a type annotation so the
        -- parsed number keeps whatever type it had before.
        ( \flag ->
            Ok
              ( if flag >= 1
                  then Skip
                  else DoNotSkip
              )
        )
    )