Safe Haskell | None |
---|---|
Language | Haskell2010 |
Synopsis
- data ConsumerProperties = ConsumerProperties {
- cpProps :: Map Text Text
- cpLogLevel :: Maybe KafkaLogLevel
- cpCallbacks :: [KafkaConf -> IO ()]
- brokersList :: [BrokerAddress] -> ConsumerProperties
- noAutoCommit :: ConsumerProperties
- noAutoOffsetStore :: ConsumerProperties
- groupId :: ConsumerGroupId -> ConsumerProperties
- clientId :: ClientId -> ConsumerProperties
- setCallback :: (KafkaConf -> IO ()) -> ConsumerProperties
- logLevel :: KafkaLogLevel -> ConsumerProperties
- compression :: KafkaCompressionCodec -> ConsumerProperties
- suppressDisconnectLogs :: ConsumerProperties
- extraProps :: Map Text Text -> ConsumerProperties
- extraProp :: Text -> Text -> ConsumerProperties
- debugOptions :: [KafkaDebug] -> ConsumerProperties
- queuedMaxMessagesKBytes :: Int -> ConsumerProperties
- rebalanceCallback :: (KafkaConsumer -> RebalanceEvent -> IO ()) -> KafkaConf -> IO ()
- offsetCommitCallback :: (KafkaConsumer -> KafkaError -> [TopicPartition] -> IO ()) -> KafkaConf -> IO ()
- errorCallback :: HasKafkaConf k => (KafkaError -> String -> IO ()) -> k -> IO ()
- logCallback :: HasKafkaConf k => (Int -> String -> String -> IO ()) -> k -> IO ()
- statsCallback :: HasKafkaConf k => (String -> IO ()) -> k -> IO ()
Documentation
data ConsumerProperties Source #
Properties to create a KafkaConsumer.
ConsumerProperties | |
|
Instances
Semigroup ConsumerProperties Source # | |
Defined in Kafka.Consumer.ConsumerProperties (<>) :: ConsumerProperties -> ConsumerProperties -> ConsumerProperties # sconcat :: NonEmpty ConsumerProperties -> ConsumerProperties # stimes :: Integral b => b -> ConsumerProperties -> ConsumerProperties # | |
Monoid ConsumerProperties Source # | Right biased so we prefer newer properties over older ones. |
Defined in Kafka.Consumer.ConsumerProperties |
noAutoCommit :: ConsumerProperties Source #
Disables auto commit for the consumer
noAutoOffsetStore :: ConsumerProperties Source #
Disables auto offset store for the consumer
groupId :: ConsumerGroupId -> ConsumerProperties Source #
Consumer group id
setCallback :: (KafkaConf -> IO ()) -> ConsumerProperties Source #
logLevel :: KafkaLogLevel -> ConsumerProperties Source #
Sets the logging level.
Usually used together with debugOptions to configure which logs are needed.
compression :: KafkaCompressionCodec -> ConsumerProperties Source #
Sets the compression codec for the consumer.
suppressDisconnectLogs :: ConsumerProperties Source #
Suppresses consumer disconnect logs.
Suppressing these logs can be useful when interacting with brokers that have an aggressive connection.max.idle.ms value.
extraProps :: Map Text Text -> ConsumerProperties Source #
Any configuration options that are supported by librdkafka. The full list can be found at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
extraProp :: Text -> Text -> ConsumerProperties Source #
Sets a single configuration option supported by librdkafka. The full list can be found at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
debugOptions :: [KafkaDebug] -> ConsumerProperties Source #
Sets debug features for the consumer.
Usually used together with logLevel.
rebalanceCallback :: (KafkaConsumer -> RebalanceEvent -> IO ()) -> KafkaConf -> IO () Source #
Sets a callback that is called when rebalance is needed.
Callback implementations are supposed to watch for KafkaResponseError RdKafkaRespErrAssignPartitions and for KafkaResponseError RdKafkaRespErrRevokePartitions. Other error codes are not expected and would indicate something really bad happening in the system, or bugs in librdkafka itself.
A callback is expected to call assign
according to the error code it receives.
- When RdKafkaRespErrAssignPartitions happens, assign should be called with all the partitions the callback was called with. It is OK to alter partition offsets before calling assign.
- When RdKafkaRespErrRevokePartitions happens, assign should be called with an empty list of partitions.
rebalanceCallback :: (KafkaConsumer -> KafkaError -> [TopicPartition] -> IO ()) -> KafkaConf -> IO ()
offsetCommitCallback :: (KafkaConsumer -> KafkaError -> [TopicPartition] -> IO ()) -> KafkaConf -> IO () Source #
Sets a callback that is called when offsets are committed.
The results of automatic or manual offset commits are scheduled for this callback, which is served by pollMessage.
A callback is expected to call assign
according to the error code it receives.
If no partitions had valid offsets to commit, this callback will be called with KafkaError == KafkaResponseError RdKafkaRespErrNoOffset, which is not to be considered an error.
errorCallback :: HasKafkaConf k => (KafkaError -> String -> IO ()) -> k -> IO () Source #