hw-kafka-client-4.0.3: Kafka bindings for Haskell
Safe Haskell: None
Language: Haskell2010

Kafka.Consumer.ConsumerProperties

Description

Module with consumer properties types and functions.

Documentation

data CallbackPollMode

Whether the callback polling should be done synchronously or not.

Constructors

CallbackPollModeSync

You must poll the consumer frequently to handle new messages as well as rebalance and keep-alive events. This lowers the footprint and gives you full control over when polling happens, at the cost of managing those events manually.

CallbackPollModeAsync

Polling for rebalance and keep-alive events is handled for you in a background thread.

brokersList :: [BrokerAddress] -> ConsumerProperties

Set the list of brokers to contact to connect to the Kafka cluster.

noAutoCommit :: ConsumerProperties

Disable auto commit for the consumer.

noAutoOffsetStore :: ConsumerProperties

Disable auto offset store for the consumer.

See enable.auto.offset.store for more information.
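As a minimal sketch of how these properties fit together: ConsumerProperties values can be combined with `<>`. The broker address below is a placeholder, and importing everything from Kafka.Consumer (with OverloadedStrings for the BrokerAddress literal) is an assumption about how the module is typically used.

```haskell
{-# LANGUAGE OverloadedStrings #-}

import Kafka.Consumer

-- Properties for a consumer that commits and stores offsets manually.
-- "localhost:9092" is a hypothetical address; replace it with your own brokers.
manualCommitProps :: ConsumerProperties
manualCommitProps =
     brokersList [BrokerAddress "localhost:9092"]
  <> noAutoCommit
  <> noAutoOffsetStore
```

With both automatic mechanisms disabled, the application is responsible for storing and committing offsets itself.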

setCallback :: Callback -> ConsumerProperties

Set the consumer callback.

For examples of use, see errorCallback, logCallback and statsCallback.
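Several callbacks can be registered on the same consumer by combining multiple setCallback properties with `<>`. The sketch below is illustrative only; formatLog, onError, onLog and callbackProps are hypothetical names, not part of the library.

```haskell
import Kafka.Consumer

-- Pure formatter, kept separate from the IO action so it is easy to test.
formatLog :: KafkaLogLevel -> String -> String -> String
formatLog level facility message =
  show level <> " [" <> facility <> "] " <> message

-- Error callback: prints the error together with the reason text.
onError :: KafkaError -> String -> IO ()
onError err reason =
  putStrLn ("kafka error: " <> show err <> " (" <> reason <> ")")

-- Log callback: prints one formatted line per log message.
onLog :: KafkaLogLevel -> String -> String -> IO ()
onLog level facility message = putStrLn (formatLog level facility message)

-- Each setCallback contributes one callback; (<>) keeps them all.
callbackProps :: ConsumerProperties
callbackProps =
     setCallback (errorCallback onError)
  <> setCallback (logCallback onLog)
```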

logLevel :: KafkaLogLevel -> ConsumerProperties

Set the logging level. Usually used together with debugOptions to configure which logs are needed.

suppressDisconnectLogs :: ConsumerProperties

Suppresses the consumer's disconnect logs (the log.connection.close setting).

This can be useful when interacting with brokers that have an aggressive connection.max.idle.ms value.

extraProps :: Map Text Text -> ConsumerProperties

Set any configuration options that are supported by librdkafka. The full list can be found in librdkafka's CONFIGURATION.md.

extraProp :: Text -> Text -> ConsumerProperties

Set any configuration option that is supported by librdkafka. The full list can be found in librdkafka's CONFIGURATION.md.
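A short sketch of both functions, assuming OverloadedStrings for the Text literals. The property names are standard librdkafka settings, but the values are arbitrary examples rather than recommendations, and tunedProps is a hypothetical name.

```haskell
{-# LANGUAGE OverloadedStrings #-}

import qualified Data.Map as Map
import Kafka.Consumer

-- extraProp sets a single key; extraProps sets several at once.
-- All values are passed to librdkafka as text.
tunedProps :: ConsumerProperties
tunedProps =
     extraProp "session.timeout.ms" "10000"
  <> extraProps (Map.fromList
       [ ("fetch.wait.max.ms", "100")
       , ("max.poll.interval.ms", "300000")
       ])
```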

debugOptions :: [KafkaDebug] -> ConsumerProperties

Set debug features for the consumer. Usually used together with logLevel.
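For illustration, a sketch that raises the log level and enables two debug contexts. The chosen contexts (consumer group and fetch) are just an example, and verboseProps is a hypothetical name.

```haskell
import Kafka.Consumer

-- Debug-level logging, restricted to consumer group and fetch activity.
verboseProps :: ConsumerProperties
verboseProps =
     logLevel KafkaLogDebug
  <> debugOptions [DebugCgrp, DebugFetch]
```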

callbackPollMode :: CallbackPollMode -> ConsumerProperties

Set the callback poll mode. Default value is CallbackPollModeAsync.
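A minimal sketch of opting out of the background polling thread; syncPollProps is a hypothetical name.

```haskell
import Kafka.Consumer

-- Switch from the default asynchronous mode to synchronous polling.
syncPollProps :: ConsumerProperties
syncPollProps = callbackPollMode CallbackPollModeSync
```

In this mode the application has to poll the consumer frequently itself, as described for CallbackPollModeSync.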

rebalanceCallback :: (KafkaConsumer -> RebalanceEvent -> IO ()) -> Callback

Sets a callback that is called when a rebalance is needed.
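A sketch of a rebalance callback that only logs what happened. It assumes the four RebalanceEvent constructors exported by the library, each carrying the affected topic/partition pairs; describeRebalance and rebalanceProps are hypothetical names.

```haskell
import Kafka.Consumer

-- Turn a rebalance event into a short human-readable description.
describeRebalance :: RebalanceEvent -> String
describeRebalance event = case event of
  RebalanceBeforeAssign ps -> "about to assign " <> countOf ps
  RebalanceAssign ps       -> "assigned " <> countOf ps
  RebalanceBeforeRevoke ps -> "about to revoke " <> countOf ps
  RebalanceRevoke ps       -> "revoked " <> countOf ps
  where
    countOf ps = show (length ps) <> " partition(s)"

-- Register the callback; the consumer handle is not needed for logging.
rebalanceProps :: ConsumerProperties
rebalanceProps =
  setCallback (rebalanceCallback (\_consumer event ->
    putStrLn (describeRebalance event)))
```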

offsetCommitCallback :: (KafkaConsumer -> KafkaError -> [TopicPartition] -> IO ()) -> Callback

Sets a callback that is called when offsets are committed.

The results of automatic or manual offset commits are scheduled for this callback, which is served by pollMessage.

If no partitions had valid offsets to commit, this callback is called with KafkaResponseError RdKafkaRespErrNoOffset, which is not to be considered an error.
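The special case above can be handled by pattern matching on the error, as in this sketch. It assumes that a successful commit is reported as KafkaResponseError RdKafkaRespErrNoError (librdkafka's "no error" code) and that the RdKafkaRespErrT constructors are in scope via Kafka.Consumer; commitOutcome and commitProps are hypothetical names.

```haskell
import Kafka.Consumer

-- Classify the result passed to the offset commit callback.
commitOutcome :: KafkaError -> [TopicPartition] -> String
-- No valid offsets to commit: documented as not being an error.
commitOutcome (KafkaResponseError RdKafkaRespErrNoOffset) _ =
  "nothing to commit"
-- Assumption: success arrives wrapped as the "no error" response code.
commitOutcome (KafkaResponseError RdKafkaRespErrNoError) ps =
  "committed " <> show (length ps) <> " partition(s)"
-- Anything else is treated as a genuine failure.
commitOutcome err _ =
  "commit failed: " <> show err

commitProps :: ConsumerProperties
commitProps =
  setCallback (offsetCommitCallback (\_consumer err ps ->
    putStrLn (commitOutcome err ps)))
```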

errorCallback :: (KafkaError -> String -> IO ()) -> Callback

Add a callback for errors.

Examples

Basic usage:

setCallback (errorCallback myErrorCallback)

-- Prints the error and its reason, separated by "|".
myErrorCallback :: KafkaError -> String -> IO ()
myErrorCallback kafkaError message = putStrLn $ show kafkaError <> "|" <> message

logCallback :: (KafkaLogLevel -> String -> String -> IO ()) -> Callback

Add a callback for logs.

Examples

Basic usage:

setCallback (logCallback myLogCallback)

-- Prints the level, facility and message, separated by "|".
myLogCallback :: KafkaLogLevel -> String -> String -> IO ()
myLogCallback level facility message = putStrLn $ show level <> "|" <> facility <> "|" <> message

statsCallback :: (ByteString -> IO ()) -> Callback

Add a callback for stats. The passed ByteString contains a UTF-8 encoded JSON document that can be parsed with, for example, Data.Aeson.decodeStrict. For more information about the content of the JSON document, see https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md.

Examples

Basic usage:

setCallback (statsCallback myStatsCallback)

-- The callback receives the raw JSON document as a ByteString.
myStatsCallback :: ByteString -> IO ()
myStatsCallback stats = print stats
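A sketch of decoding the statistics document with aeson instead of printing the raw bytes. It assumes the aeson package is available, and it enables statistics through the librdkafka statistics.interval.ms property (emission is disabled while that property is 0); the 5000 ms interval is an arbitrary choice, and describeStats and statsProps are hypothetical names.

```haskell
{-# LANGUAGE OverloadedStrings #-}

import Data.Aeson (Value, decodeStrict)
import Data.ByteString (ByteString)
import Kafka.Consumer

-- Decode the payload into a generic JSON Value and describe the result.
describeStats :: ByteString -> String
describeStats raw = case (decodeStrict raw :: Maybe Value) of
  Nothing    -> "unparseable statistics payload"
  Just value -> "statistics: " <> show value

-- Ask librdkafka to emit statistics every 5 seconds and register the callback.
statsProps :: ConsumerProperties
statsProps =
     extraProp "statistics.interval.ms" "5000"
  <> setCallback (statsCallback (putStrLn . describeStats))
```

Decoding into a record with a FromJSON instance instead of Value would give typed access to individual fields; which fields exist is described in STATISTICS.md.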

data Callback

Callbacks allow retrieving information such as error occurrences, statistics and log messages. See setCallback (Consumer) and setCallback (Producer) for more details.