| Safe Haskell | None | 
|---|---|
| Language | Haskell2010 | 
Kafka.Consumer.ConsumerProperties
Description
Module with consumer properties types and functions.
Synopsis
- data ConsumerProperties = ConsumerProperties {}
- data CallbackPollMode
- brokersList :: [BrokerAddress] -> ConsumerProperties
- autoCommit :: Millis -> ConsumerProperties
- noAutoCommit :: ConsumerProperties
- noAutoOffsetStore :: ConsumerProperties
- groupId :: ConsumerGroupId -> ConsumerProperties
- clientId :: ClientId -> ConsumerProperties
- setCallback :: Callback -> ConsumerProperties
- logLevel :: KafkaLogLevel -> ConsumerProperties
- compression :: KafkaCompressionCodec -> ConsumerProperties
- suppressDisconnectLogs :: ConsumerProperties
- statisticsInterval :: Millis -> ConsumerProperties
- extraProps :: Map Text Text -> ConsumerProperties
- extraProp :: Text -> Text -> ConsumerProperties
- debugOptions :: [KafkaDebug] -> ConsumerProperties
- queuedMaxMessagesKBytes :: Int -> ConsumerProperties
- callbackPollMode :: CallbackPollMode -> ConsumerProperties
- rebalanceCallback :: (KafkaConsumer -> RebalanceEvent -> IO ()) -> Callback
- offsetCommitCallback :: (KafkaConsumer -> KafkaError -> [TopicPartition] -> IO ()) -> Callback
- errorCallback :: (KafkaError -> String -> IO ()) -> Callback
- logCallback :: (KafkaLogLevel -> String -> String -> IO ()) -> Callback
- statsCallback :: (ByteString -> IO ()) -> Callback
- data Callback
Documentation
data ConsumerProperties Source #
Properties to create KafkaConsumer.
Constructors
| ConsumerProperties | |
| Fields | |
Instances
| Semigroup ConsumerProperties Source # | |
| Defined in Kafka.Consumer.ConsumerProperties Methods (<>) :: ConsumerProperties -> ConsumerProperties -> ConsumerProperties # sconcat :: NonEmpty ConsumerProperties -> ConsumerProperties # stimes :: Integral b => b -> ConsumerProperties -> ConsumerProperties # | |
| Monoid ConsumerProperties Source # | Right biased so we prefer newer properties over older ones. | 
| Defined in Kafka.Consumer.ConsumerProperties Methods mempty :: ConsumerProperties # mappend :: ConsumerProperties -> ConsumerProperties -> ConsumerProperties # mconcat :: [ConsumerProperties] -> ConsumerProperties # | |
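Because the Monoid instance is right biased, properties can be combined with `<>` and a later value replaces an earlier one for the same setting. The following is a minimal sketch, not taken from the library documentation. It assumes that the `Kafka.Consumer` module re-exports this module together with the `BrokerAddress`, `ConsumerGroupId` and `Millis` newtype constructors, and the broker address and group name are placeholders.

```haskell
{-# LANGUAGE OverloadedStrings #-}
-- Assumption: Kafka.Consumer re-exports ConsumerProperties and the
-- BrokerAddress, ConsumerGroupId and Millis constructors.
import Kafka.Consumer

-- Base properties shared by several consumers.
-- "localhost:9092" and "example-group" are placeholder values.
baseProps :: ConsumerProperties
baseProps =
     brokersList [BrokerAddress "localhost:9092"]
  <> groupId (ConsumerGroupId "example-group")
  <> autoCommit (Millis 5000)

-- noAutoCommit is appended on the right, so it is preferred over the
-- earlier autoCommit setting when the two are merged.
manualCommitProps :: ConsumerProperties
manualCommitProps = baseProps <> noAutoCommit
```

Keeping shared defaults in one value and appending overrides on the right keeps per-consumer differences small and easy to read.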
data CallbackPollMode Source #
Whether the callback polling should be done synchronously or not.
Constructors
| CallbackPollModeSync | You have to poll the consumer frequently to handle new messages as well as rebalance and keep alive events. This enables lowering the footprint and having full control over when polling happens, at the cost of manually managing those events. | 
| CallbackPollModeAsync | Handle polling rebalance and keep alive events for you in a background thread. | 
Instances
| Eq CallbackPollMode Source # | |
| Defined in Kafka.Consumer.ConsumerProperties Methods (==) :: CallbackPollMode -> CallbackPollMode -> Bool # (/=) :: CallbackPollMode -> CallbackPollMode -> Bool # | |
| Show CallbackPollMode Source # | |
| Defined in Kafka.Consumer.ConsumerProperties Methods showsPrec :: Int -> CallbackPollMode -> ShowS # show :: CallbackPollMode -> String # showList :: [CallbackPollMode] -> ShowS # | |
brokersList :: [BrokerAddress] -> ConsumerProperties Source #
Set the list of brokers to contact to connect to the Kafka cluster.
autoCommit :: Millis -> ConsumerProperties Source #
Set the auto commit interval and enable auto commit.
noAutoCommit :: ConsumerProperties Source #
Disable auto commit for the consumer.
noAutoOffsetStore :: ConsumerProperties Source #
Disable auto offset store for the consumer.
See enable.auto.offset.store for more information.
groupId :: ConsumerGroupId -> ConsumerProperties Source #
Set the consumer group id.
clientId :: ClientId -> ConsumerProperties Source #
Set the consumer identifier.
logLevel :: KafkaLogLevel -> ConsumerProperties Source #
Set the logging level.
 Usually used together with debugOptions to configure which logs are needed.
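As an illustration of combining the two settings, the sketch below raises the log level and enables two debug contexts. It assumes the constructor names `KafkaLogDebug`, `DebugCgrp` and `DebugFetch`, which are not listed on this page, and that `Kafka.Consumer` re-exports them.

```haskell
-- Assumption: KafkaLogDebug, DebugCgrp and DebugFetch are constructors of
-- KafkaLogLevel and KafkaDebug, re-exported by Kafka.Consumer.
import Kafka.Consumer

-- Emit debug-level logs, limited to the consumer group and fetch contexts.
verboseProps :: ConsumerProperties
verboseProps =
     logLevel KafkaLogDebug
  <> debugOptions [DebugCgrp, DebugFetch]
```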
compression :: KafkaCompressionCodec -> ConsumerProperties Source #
Set the compression.codec for the consumer.
suppressDisconnectLogs :: ConsumerProperties Source #
Suppress consumer disconnect logs by disabling log.connection.close.
This can be useful when interacting with brokers
 that have an aggressive connection.max.idle.ms value.
statisticsInterval :: Millis -> ConsumerProperties Source #
Set the statistics.interval.ms for the consumer.
extraProps :: Map Text Text -> ConsumerProperties Source #
Set any configuration options that are supported by librdkafka. The full list can be found in librdkafka's CONFIGURATION.md: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.
extraProp :: Text -> Text -> ConsumerProperties Source #
Set any configuration option that is supported by librdkafka. The full list can be found in librdkafka's CONFIGURATION.md: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.
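The sketch below shows how extraProp and extraProps might be used for settings that have no dedicated helper in this module. The keys are standard librdkafka configuration names, while the values are arbitrary illustrations. It assumes `Kafka.Consumer` re-exports both functions.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import qualified Data.Map as Map
-- Assumption: Kafka.Consumer re-exports extraProp and extraProps.
import Kafka.Consumer

-- A single option set with extraProp, and several set at once with
-- extraProps. Keys and values are passed to librdkafka as text.
tunedProps :: ConsumerProperties
tunedProps =
     extraProp "fetch.wait.max.ms" "100"
  <> extraProps (Map.fromList
       [ ("session.timeout.ms", "10000")
       , ("max.poll.interval.ms", "300000")
       ])
```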
debugOptions :: [KafkaDebug] -> ConsumerProperties Source #
callbackPollMode :: CallbackPollMode -> ConsumerProperties Source #
Set the callback poll mode. Default value is CallbackPollModeAsync.
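To illustrate the synchronous mode, the sketch below selects CallbackPollModeSync and polls in a loop, since in that mode rebalance and keep alive events are only handled when the application polls. It assumes `pollMessage`, the `Timeout` constructor and the `crValue` record field are exported by `Kafka.Consumer`. The broker address and group name are placeholders.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Control.Monad (forever)
-- Assumption: Kafka.Consumer exports pollMessage, Timeout and crValue.
import Kafka.Consumer

-- Properties selecting synchronous callback polling (placeholder values).
syncProps :: ConsumerProperties
syncProps =
     brokersList [BrokerAddress "localhost:9092"]
  <> groupId (ConsumerGroupId "example-group")
  <> callbackPollMode CallbackPollModeSync

-- Poll frequently: each call fetches a message if one is available and
-- gives the client a chance to serve pending events.
pollLoop :: KafkaConsumer -> IO ()
pollLoop consumer = forever $ do
  result <- pollMessage consumer (Timeout 1000)
  case result of
    -- A poll that times out without a message is also reported as Left.
    Left err     -> putStrLn ("poll: " <> show err)
    Right record -> print (crValue record)
```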
rebalanceCallback :: (KafkaConsumer -> RebalanceEvent -> IO ()) -> Callback Source #
Sets a callback that is called when rebalance is needed.
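A rebalance callback might simply log each phase of the rebalance. The sketch below assumes the four `RebalanceEvent` constructors shown, each carrying the affected topic partitions with a `Show` instance. These names are not listed on this page and should be checked against the `RebalanceEvent` documentation.

```haskell
-- Assumption: RebalanceEvent has these four constructors, each carrying a
-- showable list of affected partitions, re-exported by Kafka.Consumer.
import Kafka.Consumer

-- Log every phase of a rebalance; the consumer handle is not needed here.
logRebalance :: KafkaConsumer -> RebalanceEvent -> IO ()
logRebalance _ event = case event of
  RebalanceBeforeAssign ps -> putStrLn ("about to assign: " <> show ps)
  RebalanceAssign ps       -> putStrLn ("assigned: " <> show ps)
  RebalanceBeforeRevoke ps -> putStrLn ("about to revoke: " <> show ps)
  RebalanceRevoke ps       -> putStrLn ("revoked: " <> show ps)

rebalanceProps :: ConsumerProperties
rebalanceProps = setCallback (rebalanceCallback logRebalance)
```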
offsetCommitCallback :: (KafkaConsumer -> KafkaError -> [TopicPartition] -> IO ()) -> Callback Source #
Sets a callback that is called when offsets are committed.
The results of automatic or manual offset commits are scheduled
 for this callback and served by pollMessage.
If no partitions had valid offsets to commit this callback will be called
 with KafkaResponseError RdKafkaRespErrNoOffset which is not to be considered
 an error.
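The handling of RdKafkaRespErrNoOffset described above could look like the following sketch. It assumes that `Kafka.Consumer` re-exports the `RdKafkaRespErrT` constructors, that a successful commit is reported as `RdKafkaRespErrNoError`, and that `TopicPartition` has a `Show` instance.

```haskell
-- Assumptions: the RdKafkaRespErr* constructors are re-exported by
-- Kafka.Consumer, success arrives as RdKafkaRespErrNoError, and
-- TopicPartition is showable.
import Kafka.Consumer

reportCommit :: KafkaConsumer -> KafkaError -> [TopicPartition] -> IO ()
reportCommit _ err partitions = case err of
  KafkaResponseError RdKafkaRespErrNoError ->
    putStrLn ("committed: " <> show partitions)
  -- Nothing to commit: not an error, so stay quiet.
  KafkaResponseError RdKafkaRespErrNoOffset ->
    pure ()
  other ->
    putStrLn ("commit failed: " <> show other)

commitProps :: ConsumerProperties
commitProps = setCallback (offsetCommitCallback reportCommit)
```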
errorCallback :: (KafkaError -> String -> IO ()) -> Callback Source #
Add a callback for errors.
Examples
Basic usage:
setCallback (errorCallback myErrorCallback)
myErrorCallback :: KafkaError -> String -> IO ()
myErrorCallback kafkaError message = print $ show kafkaError <> "|" <> message
logCallback :: (KafkaLogLevel -> String -> String -> IO ()) -> Callback Source #
Add a callback for logs.
Examples
Basic usage:
setCallback (logCallback myLogCallback)
myLogCallback :: KafkaLogLevel -> String -> String -> IO ()
myLogCallback level facility message = print $ show level <> "|" <> facility <> "|" <> message
statsCallback :: (ByteString -> IO ()) -> Callback Source #
Add a callback for stats. The passed ByteString contains an UTF-8 encoded JSON document and can e.g. be parsed using Data.Aeson.decodeStrict. For more information about the content of the JSON document see https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md.
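Parsing the statistics document with Data.Aeson.decodeStrict, as mentioned above, might look like the sketch below. It decodes into a generic `Value` rather than a dedicated type, requires the `aeson` package, and uses an arbitrary 10 second statistics interval.

```haskell
import Data.Aeson (Value, decodeStrict)
import Data.ByteString (ByteString)
-- Assumption: Kafka.Consumer re-exports statsCallback, setCallback,
-- statisticsInterval and the Millis constructor.
import Kafka.Consumer

-- Decode the UTF-8 encoded JSON document into a generic aeson Value.
inspectStats :: ByteString -> IO ()
inspectStats raw = case decodeStrict raw :: Maybe Value of
  Nothing    -> putStrLn "could not parse statistics JSON"
  Just stats -> print stats

-- Statistics are only emitted when an interval is configured.
statsProps :: ConsumerProperties
statsProps =
     statisticsInterval (Millis 10000)
  <> setCallback (statsCallback inspectStats)
```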
Examples
Basic usage:
setCallback (statsCallback myStatsCallback)
myStatsCallback :: ByteString -> IO ()
myStatsCallback stats = print $ show stats
data Callback Source #
Callbacks allow retrieving various information like error occurrences, statistics
 and log messages.
 See setCallback (Consumer) and setCallback (Producer) for more details.
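Several callbacks can be registered on the same consumer by combining one setCallback per callback with `<>`. The sketch below wires error, log and stats callbacks together. The handler names are hypothetical, and it assumes `KafkaError` and `KafkaLogLevel` have `Show` instances.

```haskell
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as BS8
-- Assumption: Kafka.Consumer re-exports setCallback and the callback
-- constructors; KafkaError and KafkaLogLevel are showable.
import Kafka.Consumer

-- Hypothetical handlers that write everything to standard output.
onError :: KafkaError -> String -> IO ()
onError err reason = putStrLn ("error: " <> show err <> " | " <> reason)

onLog :: KafkaLogLevel -> String -> String -> IO ()
onLog level facility message =
  putStrLn (show level <> " | " <> facility <> " | " <> message)

onStats :: ByteString -> IO ()
onStats = BS8.putStrLn

-- One setCallback per callback, merged into a single set of properties.
observabilityProps :: ConsumerProperties
observabilityProps =
     setCallback (errorCallback onError)
  <> setCallback (logCallback onLog)
  <> setCallback (statsCallback onStats)
```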