Safe Haskell | None |
---|---|
Language | Haskell2010 |
Module with consumer properties types and functions.
Synopsis
- data ConsumerProperties = ConsumerProperties {
- cpProps :: Map Text Text
- cpLogLevel :: Maybe KafkaLogLevel
- cpCallbacks :: [KafkaConf -> IO ()]
- cpCallbackPollMode :: CallbackPollMode
- data CallbackPollMode
- brokersList :: [BrokerAddress] -> ConsumerProperties
- autoCommit :: Millis -> ConsumerProperties
- noAutoCommit :: ConsumerProperties
- noAutoOffsetStore :: ConsumerProperties
- groupId :: ConsumerGroupId -> ConsumerProperties
- clientId :: ClientId -> ConsumerProperties
- setCallback :: (KafkaConf -> IO ()) -> ConsumerProperties
- logLevel :: KafkaLogLevel -> ConsumerProperties
- compression :: KafkaCompressionCodec -> ConsumerProperties
- suppressDisconnectLogs :: ConsumerProperties
- extraProps :: Map Text Text -> ConsumerProperties
- extraProp :: Text -> Text -> ConsumerProperties
- debugOptions :: [KafkaDebug] -> ConsumerProperties
- queuedMaxMessagesKBytes :: Int -> ConsumerProperties
- callbackPollMode :: CallbackPollMode -> ConsumerProperties
- rebalanceCallback :: (KafkaConsumer -> RebalanceEvent -> IO ()) -> KafkaConf -> IO ()
- offsetCommitCallback :: (KafkaConsumer -> KafkaError -> [TopicPartition] -> IO ()) -> KafkaConf -> IO ()
- errorCallback :: HasKafkaConf k => (KafkaError -> String -> IO ()) -> k -> IO ()
- logCallback :: HasKafkaConf k => (KafkaLogLevel -> String -> String -> IO ()) -> k -> IO ()
- statsCallback :: HasKafkaConf k => (String -> IO ()) -> k -> IO ()
Documentation
data ConsumerProperties Source #
Properties to create a KafkaConsumer.
ConsumerProperties | |
|
Instances
Semigroup ConsumerProperties Source # | |
Defined in Kafka.Consumer.ConsumerProperties (<>) :: ConsumerProperties -> ConsumerProperties -> ConsumerProperties # sconcat :: NonEmpty ConsumerProperties -> ConsumerProperties # stimes :: Integral b => b -> ConsumerProperties -> ConsumerProperties # | |
Monoid ConsumerProperties Source # | Right biased so we prefer newer properties over older ones. |
Defined in Kafka.Consumer.ConsumerProperties |
data CallbackPollMode Source #
Whether the callback polling should be done synchronously or not.
CallbackPollModeSync | You have to poll the consumer frequently to handle new messages as well as rebalance and keep alive events. This enables lowering the footprint and having full control over when polling happens, at the cost of manually managing those events. |
CallbackPollModeAsync | Handles polling of rebalance and keep-alive events for you in a background thread. |
Instances
Eq CallbackPollMode Source # | |
Defined in Kafka.Consumer.ConsumerProperties (==) :: CallbackPollMode -> CallbackPollMode -> Bool # (/=) :: CallbackPollMode -> CallbackPollMode -> Bool # | |
Show CallbackPollMode Source # | |
Defined in Kafka.Consumer.ConsumerProperties showsPrec :: Int -> CallbackPollMode -> ShowS # show :: CallbackPollMode -> String # showList :: [CallbackPollMode] -> ShowS # |
brokersList :: [BrokerAddress] -> ConsumerProperties Source #
Set the list of brokers to contact to connect to the Kafka cluster.
autoCommit :: Millis -> ConsumerProperties Source #
Set the auto commit interval and enable auto commit.
noAutoCommit :: ConsumerProperties Source #
Disable auto commit for the consumer.
noAutoOffsetStore :: ConsumerProperties Source #
Disable auto offset store for the consumer.
See enable.auto.offset.store for more information.
groupId :: ConsumerGroupId -> ConsumerProperties Source #
Set the consumer group id.
clientId :: ClientId -> ConsumerProperties Source #
Set the consumer identifier.
setCallback :: (KafkaConf -> IO ()) -> ConsumerProperties Source #
logLevel :: KafkaLogLevel -> ConsumerProperties Source #
Set the logging level.
Usually used together with debugOptions to configure which logs are needed.
compression :: KafkaCompressionCodec -> ConsumerProperties Source #
Set the compression.codec for the consumer.
suppressDisconnectLogs :: ConsumerProperties Source #
Suppresses consumer log.connection.close.
It might be useful to turn this off when interacting with brokers
with an aggressive connection.max.idle.ms value.
extraProps :: Map Text Text -> ConsumerProperties Source #
Set any configuration options that are supported by librdkafka. The full list can be found in librdkafka's CONFIGURATION.md.
extraProp :: Text -> Text -> ConsumerProperties Source #
Set any configuration option that is supported by librdkafka. The full list can be found in librdkafka's CONFIGURATION.md.
debugOptions :: [KafkaDebug] -> ConsumerProperties Source #
callbackPollMode :: CallbackPollMode -> ConsumerProperties Source #
Set the callback poll mode. The default value is CallbackPollModeAsync.
rebalanceCallback :: (KafkaConsumer -> RebalanceEvent -> IO ()) -> KafkaConf -> IO () Source #
Sets a callback that is called when rebalance is needed.
offsetCommitCallback :: (KafkaConsumer -> KafkaError -> [TopicPartition] -> IO ()) -> KafkaConf -> IO () Source #
Sets a callback that is called with the results of offset commits.
The results of automatic or manual offset commits will be scheduled
for this callback and are served by pollMessage.
If no partitions had valid offsets to commit, this callback will be called
with KafkaResponseError RdKafkaRespErrNoOffset, which is not to be considered
an error.
errorCallback :: HasKafkaConf k => (KafkaError -> String -> IO ()) -> k -> IO () Source #
Add a callback for errors.
Examples
Basic usage:
setCallback (errorCallback myErrorCallback)

myErrorCallback :: KafkaError -> String -> IO ()
myErrorCallback kafkaError message = print $ show kafkaError <> "|" <> message
logCallback :: HasKafkaConf k => (KafkaLogLevel -> String -> String -> IO ()) -> k -> IO () Source #
Add a callback for logs.
Examples
Basic usage:
setCallback (logCallback myLogCallback)

myLogCallback :: KafkaLogLevel -> String -> String -> IO ()
myLogCallback level facility message = print $ show level <> "|" <> facility <> "|" <> message