hw-kafka-client-3.1.0: Kafka bindings for Haskell

Safe Haskell: None
Language: Haskell2010

Kafka.Consumer.ConsumerProperties

Documentation

noAutoCommit :: ConsumerProperties

Disables auto commit for the consumer

noAutoOffsetStore :: ConsumerProperties

Disables auto offset store for the consumer
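
A minimal sketch of using both properties together for fully manual offset management. It assumes that the umbrella module Kafka.Consumer re-exports this module and that ConsumerProperties values are combined with `<>`; the name manualOffsetProps is hypothetical.

```haskell
import Kafka.Consumer

-- Hypothetical name: properties for a consumer that neither commits
-- nor stores offsets automatically, leaving both to the application.
manualOffsetProps :: ConsumerProperties
manualOffsetProps = noAutoCommit <> noAutoOffsetStore
```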

setCallback :: (KafkaConf -> IO ()) -> ConsumerProperties

logLevel :: KafkaLogLevel -> ConsumerProperties

Sets the logging level. Usually used with debugOptions to configure which logs are needed.

compression :: KafkaCompressionCodec -> ConsumerProperties

Sets the compression codec for the consumer.

suppressDisconnectLogs :: ConsumerProperties

Suppresses consumer disconnect logs.

Suppressing these logs can be useful when interacting with brokers that have an aggressive connection.max.idle.ms value.

extraProps :: Map Text Text -> ConsumerProperties

Sets any configuration options that are supported by librdkafka. The full list can be found in librdkafka's CONFIGURATION.md.

extraProp :: Text -> Text -> ConsumerProperties

Sets a single configuration option that is supported by librdkafka. The full list can be found in librdkafka's CONFIGURATION.md.
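
A short sketch of both functions, assuming OverloadedStrings for the Text literals and that Kafka.Consumer re-exports this module. The names sessionTimeout and fetchTuning are hypothetical, and the option values are illustrative rather than recommended settings.

```haskell
{-# LANGUAGE OverloadedStrings #-}

import qualified Data.Map as Map
import Kafka.Consumer

-- Hypothetical name: a single librdkafka option passed as key and value.
sessionTimeout :: ConsumerProperties
sessionTimeout = extraProp "session.timeout.ms" "10000"

-- Hypothetical name: several librdkafka options passed as one map.
fetchTuning :: ConsumerProperties
fetchTuning = extraProps $ Map.fromList
  [ ("fetch.wait.max.ms", "100")
  , ("fetch.min.bytes", "1024")
  ]
```

Both keys and values are plain text, so numeric settings are written as strings.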

debugOptions :: [KafkaDebug] -> ConsumerProperties

Sets debug features for the consumer. Usually used with logLevel.
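
As a sketch of how the log level and debug features are typically paired, assuming Kafka.Consumer re-exports this module along with the KafkaLogLevel and KafkaDebug constructors, and that ConsumerProperties values combine with `<>`. The name verboseProps is hypothetical.

```haskell
import Kafka.Consumer

-- Hypothetical name: debug-level logging restricted to the
-- consumer group and fetch debug contexts.
verboseProps :: ConsumerProperties
verboseProps = logLevel KafkaLogDebug <> debugOptions [DebugCgrp, DebugFetch]
```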

callbackPollMode :: CallbackPollMode -> ConsumerProperties

Sets the callback poll mode.

The default CallbackPollModeAsync mode handles polling for rebalance and keep-alive events for you in a background thread.

With CallbackPollModeSync, the user must poll the consumer frequently to handle new messages as well as rebalance and keep-alive events. CallbackPollModeSync lets you reduce hw-kafka-client's footprint and gives you full control over when polling happens, at the cost of having to manage this yourself.
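
A minimal sketch of opting into synchronous polling, assuming Kafka.Consumer re-exports this module and the CallbackPollMode constructors. The name syncPollProps is hypothetical.

```haskell
import Kafka.Consumer

-- Hypothetical name: no background polling thread is used, so the
-- application itself must poll the consumer frequently.
syncPollProps :: ConsumerProperties
syncPollProps = callbackPollMode CallbackPollModeSync
```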

rebalanceCallback :: (KafkaConsumer -> RebalanceEvent -> IO ()) -> KafkaConf -> IO ()

Sets a callback that is called when rebalance is needed.
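
A sketch of a rebalance callback that only logs each event. It assumes Kafka.Consumer re-exports this module and that RebalanceEvent has the four constructors matched here, each carrying a list of partitions. The names onRebalance and rebalanceProps are hypothetical.

```haskell
import Kafka.Consumer

-- Hypothetical handler: report how many partitions each event involves.
onRebalance :: KafkaConsumer -> RebalanceEvent -> IO ()
onRebalance _ event = case event of
  RebalanceBeforeAssign ps -> report "about to assign" ps
  RebalanceAssign ps       -> report "assigned" ps
  RebalanceBeforeRevoke ps -> report "about to revoke" ps
  RebalanceRevoke ps       -> report "revoked" ps
  where
    report label ps =
      putStrLn (label <> " " <> show (length ps) <> " partition(s)")

-- Register the handler through setCallback.
rebalanceProps :: ConsumerProperties
rebalanceProps = setCallback (rebalanceCallback onRebalance)
```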

offsetCommitCallback :: (KafkaConsumer -> KafkaError -> [TopicPartition] -> IO ()) -> KafkaConf -> IO ()

Sets a callback that is called with the results of offset commits.

The results of automatic or manual offset commits are scheduled for this callback and served by pollMessage.

If no partitions had valid offsets to commit, this callback is called with KafkaError == KafkaResponseError RdKafkaRespErrNoOffset, which is not to be considered an error.
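
A sketch of a commit callback that treats the RdKafkaRespErrNoOffset case as a no-op. It assumes Kafka.Consumer re-exports this module together with the RdKafkaRespErrT constructors, and that a successful commit is reported as KafkaResponseError RdKafkaRespErrNoError. The names onOffsetCommit and commitProps are hypothetical.

```haskell
import Kafka.Consumer

-- Hypothetical handler: log commit results, ignoring the "no offset" case.
onOffsetCommit :: KafkaConsumer -> KafkaError -> [TopicPartition] -> IO ()
onOffsetCommit _ err partitions = case err of
  -- Nothing to commit; not considered an error.
  KafkaResponseError RdKafkaRespErrNoOffset -> pure ()
  -- Assumption: success is reported with the "no error" code.
  KafkaResponseError RdKafkaRespErrNoError ->
    putStrLn ("committed offsets for " <> show (length partitions) <> " partition(s)")
  -- Anything else is a genuine failure.
  other -> putStrLn ("offset commit failed: " <> show other)

-- Register the handler through setCallback.
commitProps :: ConsumerProperties
commitProps = setCallback (offsetCommitCallback onOffsetCommit)
```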

errorCallback :: HasKafkaConf k => (KafkaError -> String -> IO ()) -> k -> IO ()

Adds a callback for errors.

Examples

Basic usage:

setCallback (errorCallback myErrorCallback)

myErrorCallback :: KafkaError -> String -> IO ()
myErrorCallback kafkaError message = putStrLn $ show kafkaError <> "|" <> message

logCallback :: HasKafkaConf k => (KafkaLogLevel -> String -> String -> IO ()) -> k -> IO ()

Adds a callback for logs.

Examples

Basic usage:

setCallback (logCallback myLogCallback)

myLogCallback :: KafkaLogLevel -> String -> String -> IO ()
myLogCallback level facility message = putStrLn $ show level <> "|" <> facility <> "|" <> message

statsCallback :: HasKafkaConf k => (String -> IO ()) -> k -> IO ()

Adds a callback for stats.

Examples

Basic usage:

setCallback (statsCallback myStatsCallback)

myStatsCallback :: String -> IO ()
myStatsCallback stats = putStrLn stats