-- | Settings needed to write to Kafka, and a decoder that reads them from
-- environment variables.
module Kafka.Settings
  ( Settings (..),
    decoder,
    BatchNumMessages,
    unBatchNumMessages,
    exampleBatchNumMessages,
  )
where

import qualified Environment
import qualified Kafka.Producer
import qualified Kafka.Settings.Internal as Internal

-- | Settings required to write to Kafka.
data Settings = Settings
  { -- | Broker addresses. See hw-kafka's documentation for more info.
    brokerAddresses :: [Kafka.Producer.BrokerAddress],
    -- | Client log level. See hw-kafka's documentation for more info.
    logLevel :: Internal.KafkaLogLevel,
    -- | Message delivery timeout. See hw-kafka's documentation for more info.
    deliveryTimeout :: Kafka.Producer.Timeout,
    -- | Number of messages to batch together before sending to Kafka.
    batchNumMessages :: BatchNumMessages
  }

-- | Number of messages to batch together before sending to Kafka.
--
-- Wraps a plain 'Int'; use 'unBatchNumMessages' to get the underlying value.
newtype BatchNumMessages = BatchNumMessages {unBatchNumMessages :: Int}

-- | Example 'BatchNumMessages' (a batch size of 1) to use in tests.
exampleBatchNumMessages :: BatchNumMessages
exampleBatchNumMessages = BatchNumMessages 1

-- | Decodes 'Settings' from environment variables. Example values:
--
-- > KAFKA_BROKER_ADDRESSES=localhost:9092 # comma delimited list
-- > KAFKA_LOG_LEVEL=Debug
-- > KAFKA_DELIVERY_TIMEOUT=120000
-- > KAFKA_BATCH_SIZE=10000
--
-- The four field decoders are combined applicatively, in the same order as
-- the fields of the 'Settings' constructor.
decoder :: Environment.Decoder Settings
decoder =
  map4
    Settings
    Internal.decoderBrokerAddresses
    Internal.decoderKafkaLogLevel
    decoderDeliveryTimeout
    decoderBatchNumMessages

-- | Decoder for the producer's delivery timeout.
--
-- Reads the @KAFKA_DELIVERY_TIMEOUT@ variable (declared with a default value
-- of @120000@), parses it as an integer and wraps it in
-- 'Kafka.Producer.Timeout'.
decoderDeliveryTimeout :: Environment.Decoder Kafka.Producer.Timeout
decoderDeliveryTimeout =
  Environment.variable
    Environment.Variable
      { Environment.name = "KAFKA_DELIVERY_TIMEOUT",
        Environment.description = "Delivery timout for producer. Aka 'delivery.timeout.ms'",
        Environment.defaultValue = "120000"
      }
    (map Kafka.Producer.Timeout Environment.int)

-- | Decoder for the producer's batch size.
--
-- Reads the @KAFKA_BATCH_SIZE@ variable (declared with a default value of
-- @10000@), parses it as an integer and wraps it in 'BatchNumMessages'.
decoderBatchNumMessages :: Environment.Decoder BatchNumMessages
decoderBatchNumMessages =
  Environment.variable
    Environment.Variable
      { Environment.name = "KAFKA_BATCH_SIZE",
        Environment.description = "Kafka Producer 'batch.num.messages'",
        Environment.defaultValue = "10000"
      }
    (map BatchNumMessages Environment.int)