Safe Haskell | None |
---|---|
Language | Haskell2010 |
Synopsis
- data ConsumerProperties = ConsumerProperties {
- cpProps :: Map Text Text
- cpLogLevel :: Maybe KafkaLogLevel
- cpCallbacks :: [KafkaConf -> IO ()]
- cpCallbackPollMode :: CallbackPollMode
- data CallbackPollMode
- brokersList :: [BrokerAddress] -> ConsumerProperties
- autoCommit :: Millis -> ConsumerProperties
- noAutoCommit :: ConsumerProperties
- noAutoOffsetStore :: ConsumerProperties
- groupId :: ConsumerGroupId -> ConsumerProperties
- clientId :: ClientId -> ConsumerProperties
- setCallback :: (KafkaConf -> IO ()) -> ConsumerProperties
- logLevel :: KafkaLogLevel -> ConsumerProperties
- compression :: KafkaCompressionCodec -> ConsumerProperties
- suppressDisconnectLogs :: ConsumerProperties
- extraProps :: Map Text Text -> ConsumerProperties
- extraProp :: Text -> Text -> ConsumerProperties
- debugOptions :: [KafkaDebug] -> ConsumerProperties
- queuedMaxMessagesKBytes :: Int -> ConsumerProperties
- callbackPollMode :: CallbackPollMode -> ConsumerProperties
- rebalanceCallback :: (KafkaConsumer -> RebalanceEvent -> IO ()) -> KafkaConf -> IO ()
- offsetCommitCallback :: (KafkaConsumer -> KafkaError -> [TopicPartition] -> IO ()) -> KafkaConf -> IO ()
- errorCallback :: HasKafkaConf k => (KafkaError -> String -> IO ()) -> k -> IO ()
- logCallback :: HasKafkaConf k => (KafkaLogLevel -> String -> String -> IO ()) -> k -> IO ()
- statsCallback :: HasKafkaConf k => (String -> IO ()) -> k -> IO ()
Documentation
data ConsumerProperties Source #
Properties to create KafkaConsumer
.
ConsumerProperties | |
|
Instances
Semigroup ConsumerProperties Source # | |
Defined in Kafka.Consumer.ConsumerProperties (<>) :: ConsumerProperties -> ConsumerProperties -> ConsumerProperties # sconcat :: NonEmpty ConsumerProperties -> ConsumerProperties # stimes :: Integral b => b -> ConsumerProperties -> ConsumerProperties # | |
Monoid ConsumerProperties Source # | Right biased so we prefer newer properties over older ones. |
Defined in Kafka.Consumer.ConsumerProperties |
data CallbackPollMode Source #
Instances
Eq CallbackPollMode Source # | |
Defined in Kafka.Consumer.ConsumerProperties (==) :: CallbackPollMode -> CallbackPollMode -> Bool # (/=) :: CallbackPollMode -> CallbackPollMode -> Bool # | |
Show CallbackPollMode Source # | |
Defined in Kafka.Consumer.ConsumerProperties showsPrec :: Int -> CallbackPollMode -> ShowS # show :: CallbackPollMode -> String # showList :: [CallbackPollMode] -> ShowS # |
noAutoCommit :: ConsumerProperties Source #
Disables auto commit for the consumer
noAutoOffsetStore :: ConsumerProperties Source #
Disables auto offset store for the consumer
groupId :: ConsumerGroupId -> ConsumerProperties Source #
Consumer group id
setCallback :: (KafkaConf -> IO ()) -> ConsumerProperties Source #
logLevel :: KafkaLogLevel -> ConsumerProperties Source #
Sets the logging level.
Usually is used with debugOptions
to configure which logs are needed.
compression :: KafkaCompressionCodec -> ConsumerProperties Source #
Sets the compression codec for the consumer.
suppressDisconnectLogs :: ConsumerProperties Source #
Suppresses consumer disconnects logs.
It might be useful to turn this off when interacting with brokers with an aggressive connection.max.idle.ms value.
extraProps :: Map Text Text -> ConsumerProperties Source #
Any configuration options that are supported by librdkafka. The full list can be found here
extraProp :: Text -> Text -> ConsumerProperties Source #
Any configuration options that are supported by librdkafka. The full list can be found here
debugOptions :: [KafkaDebug] -> ConsumerProperties Source #
Sets debug features for the consumer.
Usually is used with consumerLogLevel
.
callbackPollMode :: CallbackPollMode -> ConsumerProperties Source #
Sets the callback poll mode.
The default CallbackPollModeAsync
mode handles polling rebalance
and keep alive events for you
in a background thread.
With CallbacPollModeSync
the user will poll the consumer
frequently to handle new messages as well as rebalance and keep alive events.
CallbacPollModeSync
lets you can simplify
hw-kafka-client's footprint and have full control over when polling
happens at the cost of having to manage this yourself.
rebalanceCallback :: (KafkaConsumer -> RebalanceEvent -> IO ()) -> KafkaConf -> IO () Source #
Sets a callback that is called when rebalance is needed.
offsetCommitCallback :: (KafkaConsumer -> KafkaError -> [TopicPartition] -> IO ()) -> KafkaConf -> IO () Source #
Sets a callback that is called when rebalance is needed.
The results of automatic or manual offset commits will be scheduled
for this callback and is served by pollMessage
.
If no partitions had valid offsets to commit this callback will be called
with KafkaError
== KafkaResponseError
RdKafkaRespErrNoOffset
which is not to be considered
an error.
errorCallback :: HasKafkaConf k => (KafkaError -> String -> IO ()) -> k -> IO () Source #
Add a callback for errors.
Examples
Basic usage:
setCallback (errorCallback myErrorCallback) myErrorCallback :: KafkaError -> String -> IO () myErrorCallback kafkaError message = print $ show kafkaError <> "|" <> message
logCallback :: HasKafkaConf k => (KafkaLogLevel -> String -> String -> IO ()) -> k -> IO () Source #
Add a callback for logs.
Examples
Basic usage:
setCallback (logCallback myLogCallback) myLogCallback :: KafkaLogLevel -> String -> String -> IO () myLogCallback level facility message = print $ show level <> "|" <> facility <> "|" <> message