module Kafka.Worker.Settings
( Settings (..),
decoder,
MaxMsgsPerSecondPerPartition (..),
MaxMsgsPerPartitionBufferedLocally (..),
MaxPollIntervalMs (..),
SkipOrNot (..),
)
where
import qualified Environment
import qualified Kafka.Consumer as Consumer
import qualified Kafka.Settings.Internal as Internal
import qualified Observability
import qualified Prelude
data Settings = Settings
{
Settings -> [BrokerAddress]
brokerAddresses :: [Consumer.BrokerAddress],
Settings -> Timeout
pollingTimeout :: Consumer.Timeout,
Settings -> MaxMsgsPerSecondPerPartition
maxMsgsPerSecondPerPartition :: MaxMsgsPerSecondPerPartition,
Settings -> KafkaLogLevel
logLevel :: Internal.KafkaLogLevel,
Settings -> Settings
observability :: Observability.Settings,
Settings -> MaxMsgsPerPartitionBufferedLocally
maxMsgsPerPartitionBufferedLocally :: MaxMsgsPerPartitionBufferedLocally,
Settings -> BatchSize
pollBatchSize :: Consumer.BatchSize,
Settings -> MaxPollIntervalMs
maxPollIntervalMs :: MaxPollIntervalMs,
Settings -> SkipOrNot
onProcessMessageSkip :: SkipOrNot
}
data SkipOrNot = Skip | DoNotSkip
data MaxMsgsPerSecondPerPartition = ThrottleAt Int | DontThrottle
newtype MaxMsgsPerPartitionBufferedLocally = MaxMsgsPerPartitionBufferedLocally {MaxMsgsPerPartitionBufferedLocally -> Int
unMaxMsgsPerPartitionBufferedLocally :: Int}
newtype MaxPollIntervalMs = MaxPollIntervalMs {MaxPollIntervalMs -> Int
unMaxPollIntervalMs :: Int}
decoder :: Environment.Decoder Settings
decoder :: Decoder Settings
decoder =
([BrokerAddress]
-> Timeout
-> MaxMsgsPerSecondPerPartition
-> KafkaLogLevel
-> Settings
-> MaxMsgsPerPartitionBufferedLocally
-> BatchSize
-> MaxPollIntervalMs
-> SkipOrNot
-> Settings)
-> Decoder
([BrokerAddress]
-> Timeout
-> MaxMsgsPerSecondPerPartition
-> KafkaLogLevel
-> Settings
-> MaxMsgsPerPartitionBufferedLocally
-> BatchSize
-> MaxPollIntervalMs
-> SkipOrNot
-> Settings)
forall (f :: * -> *) a. Applicative f => a -> f a
Prelude.pure [BrokerAddress]
-> Timeout
-> MaxMsgsPerSecondPerPartition
-> KafkaLogLevel
-> Settings
-> MaxMsgsPerPartitionBufferedLocally
-> BatchSize
-> MaxPollIntervalMs
-> SkipOrNot
-> Settings
Settings
Decoder
([BrokerAddress]
-> Timeout
-> MaxMsgsPerSecondPerPartition
-> KafkaLogLevel
-> Settings
-> MaxMsgsPerPartitionBufferedLocally
-> BatchSize
-> MaxPollIntervalMs
-> SkipOrNot
-> Settings)
-> (Decoder
([BrokerAddress]
-> Timeout
-> MaxMsgsPerSecondPerPartition
-> KafkaLogLevel
-> Settings
-> MaxMsgsPerPartitionBufferedLocally
-> BatchSize
-> MaxPollIntervalMs
-> SkipOrNot
-> Settings)
-> Decoder
(Timeout
-> MaxMsgsPerSecondPerPartition
-> KafkaLogLevel
-> Settings
-> MaxMsgsPerPartitionBufferedLocally
-> BatchSize
-> MaxPollIntervalMs
-> SkipOrNot
-> Settings))
-> Decoder
(Timeout
-> MaxMsgsPerSecondPerPartition
-> KafkaLogLevel
-> Settings
-> MaxMsgsPerPartitionBufferedLocally
-> BatchSize
-> MaxPollIntervalMs
-> SkipOrNot
-> Settings)
forall a b. a -> (a -> b) -> b
|> Decoder [BrokerAddress]
-> Decoder
([BrokerAddress]
-> Timeout
-> MaxMsgsPerSecondPerPartition
-> KafkaLogLevel
-> Settings
-> MaxMsgsPerPartitionBufferedLocally
-> BatchSize
-> MaxPollIntervalMs
-> SkipOrNot
-> Settings)
-> Decoder
(Timeout
-> MaxMsgsPerSecondPerPartition
-> KafkaLogLevel
-> Settings
-> MaxMsgsPerPartitionBufferedLocally
-> BatchSize
-> MaxPollIntervalMs
-> SkipOrNot
-> Settings)
forall (m :: * -> *) a b. Applicative m => m a -> m (a -> b) -> m b
andMap Decoder [BrokerAddress]
Internal.decoderBrokerAddresses
Decoder
(Timeout
-> MaxMsgsPerSecondPerPartition
-> KafkaLogLevel
-> Settings
-> MaxMsgsPerPartitionBufferedLocally
-> BatchSize
-> MaxPollIntervalMs
-> SkipOrNot
-> Settings)
-> (Decoder
(Timeout
-> MaxMsgsPerSecondPerPartition
-> KafkaLogLevel
-> Settings
-> MaxMsgsPerPartitionBufferedLocally
-> BatchSize
-> MaxPollIntervalMs
-> SkipOrNot
-> Settings)
-> Decoder
(MaxMsgsPerSecondPerPartition
-> KafkaLogLevel
-> Settings
-> MaxMsgsPerPartitionBufferedLocally
-> BatchSize
-> MaxPollIntervalMs
-> SkipOrNot
-> Settings))
-> Decoder
(MaxMsgsPerSecondPerPartition
-> KafkaLogLevel
-> Settings
-> MaxMsgsPerPartitionBufferedLocally
-> BatchSize
-> MaxPollIntervalMs
-> SkipOrNot
-> Settings)
forall a b. a -> (a -> b) -> b
|> Decoder Timeout
-> Decoder
(Timeout
-> MaxMsgsPerSecondPerPartition
-> KafkaLogLevel
-> Settings
-> MaxMsgsPerPartitionBufferedLocally
-> BatchSize
-> MaxPollIntervalMs
-> SkipOrNot
-> Settings)
-> Decoder
(MaxMsgsPerSecondPerPartition
-> KafkaLogLevel
-> Settings
-> MaxMsgsPerPartitionBufferedLocally
-> BatchSize
-> MaxPollIntervalMs
-> SkipOrNot
-> Settings)
forall (m :: * -> *) a b. Applicative m => m a -> m (a -> b) -> m b
andMap Decoder Timeout
decoderPollingTimeout
Decoder
(MaxMsgsPerSecondPerPartition
-> KafkaLogLevel
-> Settings
-> MaxMsgsPerPartitionBufferedLocally
-> BatchSize
-> MaxPollIntervalMs
-> SkipOrNot
-> Settings)
-> (Decoder
(MaxMsgsPerSecondPerPartition
-> KafkaLogLevel
-> Settings
-> MaxMsgsPerPartitionBufferedLocally
-> BatchSize
-> MaxPollIntervalMs
-> SkipOrNot
-> Settings)
-> Decoder
(KafkaLogLevel
-> Settings
-> MaxMsgsPerPartitionBufferedLocally
-> BatchSize
-> MaxPollIntervalMs
-> SkipOrNot
-> Settings))
-> Decoder
(KafkaLogLevel
-> Settings
-> MaxMsgsPerPartitionBufferedLocally
-> BatchSize
-> MaxPollIntervalMs
-> SkipOrNot
-> Settings)
forall a b. a -> (a -> b) -> b
|> Decoder MaxMsgsPerSecondPerPartition
-> Decoder
(MaxMsgsPerSecondPerPartition
-> KafkaLogLevel
-> Settings
-> MaxMsgsPerPartitionBufferedLocally
-> BatchSize
-> MaxPollIntervalMs
-> SkipOrNot
-> Settings)
-> Decoder
(KafkaLogLevel
-> Settings
-> MaxMsgsPerPartitionBufferedLocally
-> BatchSize
-> MaxPollIntervalMs
-> SkipOrNot
-> Settings)
forall (m :: * -> *) a b. Applicative m => m a -> m (a -> b) -> m b
andMap Decoder MaxMsgsPerSecondPerPartition
decoderMaxMessagesPerSecondPerPartition
Decoder
(KafkaLogLevel
-> Settings
-> MaxMsgsPerPartitionBufferedLocally
-> BatchSize
-> MaxPollIntervalMs
-> SkipOrNot
-> Settings)
-> (Decoder
(KafkaLogLevel
-> Settings
-> MaxMsgsPerPartitionBufferedLocally
-> BatchSize
-> MaxPollIntervalMs
-> SkipOrNot
-> Settings)
-> Decoder
(Settings
-> MaxMsgsPerPartitionBufferedLocally
-> BatchSize
-> MaxPollIntervalMs
-> SkipOrNot
-> Settings))
-> Decoder
(Settings
-> MaxMsgsPerPartitionBufferedLocally
-> BatchSize
-> MaxPollIntervalMs
-> SkipOrNot
-> Settings)
forall a b. a -> (a -> b) -> b
|> Decoder KafkaLogLevel
-> Decoder
(KafkaLogLevel
-> Settings
-> MaxMsgsPerPartitionBufferedLocally
-> BatchSize
-> MaxPollIntervalMs
-> SkipOrNot
-> Settings)
-> Decoder
(Settings
-> MaxMsgsPerPartitionBufferedLocally
-> BatchSize
-> MaxPollIntervalMs
-> SkipOrNot
-> Settings)
forall (m :: * -> *) a b. Applicative m => m a -> m (a -> b) -> m b
andMap Decoder KafkaLogLevel
Internal.decoderKafkaLogLevel
Decoder
(Settings
-> MaxMsgsPerPartitionBufferedLocally
-> BatchSize
-> MaxPollIntervalMs
-> SkipOrNot
-> Settings)
-> (Decoder
(Settings
-> MaxMsgsPerPartitionBufferedLocally
-> BatchSize
-> MaxPollIntervalMs
-> SkipOrNot
-> Settings)
-> Decoder
(MaxMsgsPerPartitionBufferedLocally
-> BatchSize -> MaxPollIntervalMs -> SkipOrNot -> Settings))
-> Decoder
(MaxMsgsPerPartitionBufferedLocally
-> BatchSize -> MaxPollIntervalMs -> SkipOrNot -> Settings)
forall a b. a -> (a -> b) -> b
|> Decoder Settings
-> Decoder
(Settings
-> MaxMsgsPerPartitionBufferedLocally
-> BatchSize
-> MaxPollIntervalMs
-> SkipOrNot
-> Settings)
-> Decoder
(MaxMsgsPerPartitionBufferedLocally
-> BatchSize -> MaxPollIntervalMs -> SkipOrNot -> Settings)
forall (m :: * -> *) a b. Applicative m => m a -> m (a -> b) -> m b
andMap Decoder Settings
Observability.decoder
Decoder
(MaxMsgsPerPartitionBufferedLocally
-> BatchSize -> MaxPollIntervalMs -> SkipOrNot -> Settings)
-> (Decoder
(MaxMsgsPerPartitionBufferedLocally
-> BatchSize -> MaxPollIntervalMs -> SkipOrNot -> Settings)
-> Decoder
(BatchSize -> MaxPollIntervalMs -> SkipOrNot -> Settings))
-> Decoder
(BatchSize -> MaxPollIntervalMs -> SkipOrNot -> Settings)
forall a b. a -> (a -> b) -> b
|> Decoder MaxMsgsPerPartitionBufferedLocally
-> Decoder
(MaxMsgsPerPartitionBufferedLocally
-> BatchSize -> MaxPollIntervalMs -> SkipOrNot -> Settings)
-> Decoder
(BatchSize -> MaxPollIntervalMs -> SkipOrNot -> Settings)
forall (m :: * -> *) a b. Applicative m => m a -> m (a -> b) -> m b
andMap Decoder MaxMsgsPerPartitionBufferedLocally
decoderMaxMsgsPerPartitionBufferedLocally
Decoder (BatchSize -> MaxPollIntervalMs -> SkipOrNot -> Settings)
-> (Decoder
(BatchSize -> MaxPollIntervalMs -> SkipOrNot -> Settings)
-> Decoder (MaxPollIntervalMs -> SkipOrNot -> Settings))
-> Decoder (MaxPollIntervalMs -> SkipOrNot -> Settings)
forall a b. a -> (a -> b) -> b
|> Decoder BatchSize
-> Decoder
(BatchSize -> MaxPollIntervalMs -> SkipOrNot -> Settings)
-> Decoder (MaxPollIntervalMs -> SkipOrNot -> Settings)
forall (m :: * -> *) a b. Applicative m => m a -> m (a -> b) -> m b
andMap Decoder BatchSize
decoderPollBatchSize
Decoder (MaxPollIntervalMs -> SkipOrNot -> Settings)
-> (Decoder (MaxPollIntervalMs -> SkipOrNot -> Settings)
-> Decoder (SkipOrNot -> Settings))
-> Decoder (SkipOrNot -> Settings)
forall a b. a -> (a -> b) -> b
|> Decoder MaxPollIntervalMs
-> Decoder (MaxPollIntervalMs -> SkipOrNot -> Settings)
-> Decoder (SkipOrNot -> Settings)
forall (m :: * -> *) a b. Applicative m => m a -> m (a -> b) -> m b
andMap Decoder MaxPollIntervalMs
decoderMaxPollIntervalMs
Decoder (SkipOrNot -> Settings)
-> (Decoder (SkipOrNot -> Settings) -> Decoder Settings)
-> Decoder Settings
forall a b. a -> (a -> b) -> b
|> Decoder SkipOrNot
-> Decoder (SkipOrNot -> Settings) -> Decoder Settings
forall (m :: * -> *) a b. Applicative m => m a -> m (a -> b) -> m b
andMap Decoder SkipOrNot
decoderOnProcessMessageFailure
decoderPollingTimeout :: Environment.Decoder Consumer.Timeout
decoderPollingTimeout :: Decoder Timeout
decoderPollingTimeout =
Variable -> Parser Timeout -> Decoder Timeout
forall a. Variable -> Parser a -> Decoder a
Environment.variable
Variable :: Text -> Text -> Text -> Variable
Environment.Variable
{ name :: Text
Environment.name = Text
"KAFKA_POLLING_TIMEOUT",
description :: Text
Environment.description = Text
"Polling timout for consumers",
defaultValue :: Text
Environment.defaultValue = Text
"1000"
}
((Int -> Timeout) -> Parser Int -> Parser Timeout
forall (m :: * -> *) a value.
Functor m =>
(a -> value) -> m a -> m value
map Int -> Timeout
Consumer.Timeout Parser Int
forall a. Integral a => Parser a
Environment.int)
decoderMaxMessagesPerSecondPerPartition :: Environment.Decoder MaxMsgsPerSecondPerPartition
decoderMaxMessagesPerSecondPerPartition :: Decoder MaxMsgsPerSecondPerPartition
decoderMaxMessagesPerSecondPerPartition =
Variable
-> Parser MaxMsgsPerSecondPerPartition
-> Decoder MaxMsgsPerSecondPerPartition
forall a. Variable -> Parser a -> Decoder a
Environment.variable
Variable :: Text -> Text -> Text -> Variable
Environment.Variable
{ name :: Text
Environment.name = Text
"KAFKA_MAX_MESSAGES_PER_SECOND_PER_PARTITION",
description :: Text
Environment.description = Text
"This is how we throttle workers. Sets the maximum amount of messages this worker should process per second per partition. 0 is disabled.",
defaultValue :: Text
Environment.defaultValue = Text
"0"
}
( (Int -> MaxMsgsPerSecondPerPartition)
-> Parser Int -> Parser MaxMsgsPerSecondPerPartition
forall (m :: * -> *) a value.
Functor m =>
(a -> value) -> m a -> m value
map
( \Int
maxPerSecond ->
( if Int
maxPerSecond Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0
then MaxMsgsPerSecondPerPartition
DontThrottle
else Int -> MaxMsgsPerSecondPerPartition
ThrottleAt Int
maxPerSecond
)
)
Parser Int
forall a. Integral a => Parser a
Environment.int
)
decoderMaxPollIntervalMs :: Environment.Decoder MaxPollIntervalMs
decoderMaxPollIntervalMs :: Decoder MaxPollIntervalMs
decoderMaxPollIntervalMs =
Variable -> Parser MaxPollIntervalMs -> Decoder MaxPollIntervalMs
forall a. Variable -> Parser a -> Decoder a
Environment.variable
Variable :: Text -> Text -> Text -> Variable
Environment.Variable
{ name :: Text
Environment.name = Text
"KAFKA_MAX_POLL_INTERVAL_MS",
description :: Text
Environment.description = Text
"This is used to set max.poll.interval.ms",
defaultValue :: Text
Environment.defaultValue = Text
"300000"
}
((Int -> MaxPollIntervalMs)
-> Parser Int -> Parser MaxPollIntervalMs
forall (m :: * -> *) a value.
Functor m =>
(a -> value) -> m a -> m value
map Int -> MaxPollIntervalMs
MaxPollIntervalMs Parser Int
forall a. Integral a => Parser a
Environment.int)
decoderMaxMsgsPerPartitionBufferedLocally :: Environment.Decoder MaxMsgsPerPartitionBufferedLocally
decoderMaxMsgsPerPartitionBufferedLocally :: Decoder MaxMsgsPerPartitionBufferedLocally
decoderMaxMsgsPerPartitionBufferedLocally =
Variable
-> Parser MaxMsgsPerPartitionBufferedLocally
-> Decoder MaxMsgsPerPartitionBufferedLocally
forall a. Variable -> Parser a -> Decoder a
Environment.variable
Variable :: Text -> Text -> Text -> Variable
Environment.Variable
{ name :: Text
Environment.name = Text
"KAFKA_MAX_MSGS_PER_PARTITION_BUFFERED_LOCALLY",
description :: Text
Environment.description = Text
"Pausing reading from kafka when we have this many messages queued up but not yet processed",
defaultValue :: Text
Environment.defaultValue = Text
"100"
}
((Int -> MaxMsgsPerPartitionBufferedLocally)
-> Parser Int -> Parser MaxMsgsPerPartitionBufferedLocally
forall (m :: * -> *) a value.
Functor m =>
(a -> value) -> m a -> m value
map Int -> MaxMsgsPerPartitionBufferedLocally
MaxMsgsPerPartitionBufferedLocally Parser Int
forall a. Integral a => Parser a
Environment.int)
decoderPollBatchSize :: Environment.Decoder Consumer.BatchSize
decoderPollBatchSize :: Decoder BatchSize
decoderPollBatchSize =
Variable -> Parser BatchSize -> Decoder BatchSize
forall a. Variable -> Parser a -> Decoder a
Environment.variable
Variable :: Text -> Text -> Text -> Variable
Environment.Variable
{ name :: Text
Environment.name = Text
"KAFKA_POLL_BATCH_SIZE",
description :: Text
Environment.description = Text
"The amount of messages we request in a single poll request to Kafka",
defaultValue :: Text
Environment.defaultValue = Text
"100"
}
((Int -> BatchSize) -> Parser Int -> Parser BatchSize
forall (m :: * -> *) a value.
Functor m =>
(a -> value) -> m a -> m value
map Int -> BatchSize
Consumer.BatchSize Parser Int
forall a. Integral a => Parser a
Environment.int)
decoderOnProcessMessageFailure :: Environment.Decoder SkipOrNot
decoderOnProcessMessageFailure :: Decoder SkipOrNot
decoderOnProcessMessageFailure =
Variable -> Parser SkipOrNot -> Decoder SkipOrNot
forall a. Variable -> Parser a -> Decoder a
Environment.variable
Variable :: Text -> Text -> Text -> Variable
Environment.Variable
{ name :: Text
Environment.name = Text
"KAFKA_SKIP_ON_PROCESS_MESSAGE_FAILURE",
description :: Text
Environment.description = Text
"Whether to skip message that are failing processing. 1 means on, 0 means off.",
defaultValue :: Text
Environment.defaultValue = Text
"0"
}
( Parser Integer
-> (Integer -> Result Text SkipOrNot) -> Parser SkipOrNot
forall a b. Parser a -> (a -> Result Text b) -> Parser b
Environment.custom
Parser Integer
forall a. Integral a => Parser a
Environment.int
( \Integer
int ->
if Integer
int Integer -> Integer -> Bool
forall comparable.
Ord comparable =>
comparable -> comparable -> Bool
>= Integer
1
then SkipOrNot -> Result Text SkipOrNot
forall error value. value -> Result error value
Ok SkipOrNot
Skip
else SkipOrNot -> Result Text SkipOrNot
forall error value. value -> Result error value
Ok SkipOrNot
DoNotSkip
)
)