hw-kafka-client-4.0.3: Kafka bindings for Haskell
Safe Haskell: None
Language: Haskell2010

Kafka.Consumer

Description

Module to consume messages from Kafka topics.

Here's an example of code to consume messages from a topic:

import Control.Exception (bracket)
import Control.Monad (replicateM_)
import Kafka.Consumer

-- Global consumer properties
consumerProps :: ConsumerProperties
consumerProps = brokersList ["localhost:9092"]
             <> groupId (ConsumerGroupId "consumer_example_group")
             <> noAutoCommit
             <> logLevel KafkaLogInfo

-- Subscription to topics
consumerSub :: Subscription
consumerSub = topics [TopicName "kafka-client-example-topic"]
           <> offsetReset Earliest

-- Running an example
runConsumerExample :: IO ()
runConsumerExample = do
    res <- bracket mkConsumer clConsumer runHandler
    print res
    where
      mkConsumer = newConsumer consumerProps consumerSub
      clConsumer (Left err) = pure (Left err)
      clConsumer (Right kc) = (maybe (Right ()) Left) <$> closeConsumer kc
      runHandler (Left err) = pure (Left err)
      runHandler (Right kc) = processMessages kc

-- Example polling 10 times before stopping
processMessages :: KafkaConsumer -> IO (Either KafkaError ())
processMessages kafka = do
    replicateM_ 10 $ do
      msg <- pollMessage kafka (Timeout 1000)
      putStrLn $ "Message: " <> show msg
      err <- commitAllOffsets OffsetCommit kafka
      putStrLn $ "Offsets: " <> maybe "Committed." show err
    pure $ Right ()

Documentation

data KafkaConsumer Source #

The main type for Kafka consumption, used e.g. to poll and commit messages.

Its constructor is intentionally not exposed; instead, use newConsumer to acquire such a value.

data KafkaCompressionCodec Source #

Compression codec used by a topic

See Kafka documentation on compression codecs

Constructors

NoCompression 
Gzip 
Snappy 
Lz4 

Instances

Instances details
Eq KafkaCompressionCodec Source # 
Instance details

Defined in Kafka.Types

Show KafkaCompressionCodec Source # 
Instance details

Defined in Kafka.Types

Generic KafkaCompressionCodec Source # 
Instance details

Defined in Kafka.Types

Associated Types

type Rep KafkaCompressionCodec :: Type -> Type #

type Rep KafkaCompressionCodec Source # 
Instance details

Defined in Kafka.Types

type Rep KafkaCompressionCodec = D1 ('MetaData "KafkaCompressionCodec" "Kafka.Types" "hw-kafka-client-4.0.3-inplace" 'False) ((C1 ('MetaCons "NoCompression" 'PrefixI 'False) (U1 :: Type -> Type) :+: C1 ('MetaCons "Gzip" 'PrefixI 'False) (U1 :: Type -> Type)) :+: (C1 ('MetaCons "Snappy" 'PrefixI 'False) (U1 :: Type -> Type) :+: C1 ('MetaCons "Lz4" 'PrefixI 'False) (U1 :: Type -> Type)))

data KafkaDebug Source #

Available librdkafka debug contexts

Instances

Instances details
Eq KafkaDebug Source # 
Instance details

Defined in Kafka.Types

Show KafkaDebug Source # 
Instance details

Defined in Kafka.Types

Generic KafkaDebug Source # 
Instance details

Defined in Kafka.Types

Associated Types

type Rep KafkaDebug :: Type -> Type #

type Rep KafkaDebug Source # 
Instance details

Defined in Kafka.Types

type Rep KafkaDebug = D1 ('MetaData "KafkaDebug" "Kafka.Types" "hw-kafka-client-4.0.3-inplace" 'False) (((C1 ('MetaCons "DebugGeneric" 'PrefixI 'False) (U1 :: Type -> Type) :+: (C1 ('MetaCons "DebugBroker" 'PrefixI 'False) (U1 :: Type -> Type) :+: C1 ('MetaCons "DebugTopic" 'PrefixI 'False) (U1 :: Type -> Type))) :+: (C1 ('MetaCons "DebugMetadata" 'PrefixI 'False) (U1 :: Type -> Type) :+: (C1 ('MetaCons "DebugQueue" 'PrefixI 'False) (U1 :: Type -> Type) :+: C1 ('MetaCons "DebugMsg" 'PrefixI 'False) (U1 :: Type -> Type)))) :+: ((C1 ('MetaCons "DebugProtocol" 'PrefixI 'False) (U1 :: Type -> Type) :+: (C1 ('MetaCons "DebugCgrp" 'PrefixI 'False) (U1 :: Type -> Type) :+: C1 ('MetaCons "DebugSecurity" 'PrefixI 'False) (U1 :: Type -> Type))) :+: (C1 ('MetaCons "DebugFetch" 'PrefixI 'False) (U1 :: Type -> Type) :+: (C1 ('MetaCons "DebugFeature" 'PrefixI 'False) (U1 :: Type -> Type) :+: C1 ('MetaCons "DebugAll" 'PrefixI 'False) (U1 :: Type -> Type)))))

data KafkaError Source #

All possible Kafka errors

Instances

Instances details
Eq KafkaError Source # 
Instance details

Defined in Kafka.Types

Show KafkaError Source # 
Instance details

Defined in Kafka.Types

Generic KafkaError Source # 
Instance details

Defined in Kafka.Types

Associated Types

type Rep KafkaError :: Type -> Type #

Exception KafkaError Source # 
Instance details

Defined in Kafka.Types

type Rep KafkaError Source # 
Instance details

Defined in Kafka.Types

type Rep KafkaError = D1 ('MetaData "KafkaError" "Kafka.Types" "hw-kafka-client-4.0.3-inplace" 'False) ((C1 ('MetaCons "KafkaError" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Text)) :+: (C1 ('MetaCons "KafkaInvalidReturnValue" 'PrefixI 'False) (U1 :: Type -> Type) :+: C1 ('MetaCons "KafkaBadSpecification" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Text)))) :+: ((C1 ('MetaCons "KafkaResponseError" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 RdKafkaRespErrT)) :+: C1 ('MetaCons "KafkaInvalidConfigurationValue" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Text))) :+: (C1 ('MetaCons "KafkaUnknownConfigurationKey" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Text)) :+: C1 ('MetaCons "KafkaBadConfiguration" 'PrefixI 'False) (U1 :: Type -> Type))))

newtype Timeout Source #

Timeout in milliseconds

Constructors

Timeout 

Fields

Instances

Instances details
Eq Timeout Source # 
Instance details

Defined in Kafka.Types

Methods

(==) :: Timeout -> Timeout -> Bool #

(/=) :: Timeout -> Timeout -> Bool #

Read Timeout Source # 
Instance details

Defined in Kafka.Types

Show Timeout Source # 
Instance details

Defined in Kafka.Types

Generic Timeout Source # 
Instance details

Defined in Kafka.Types

Associated Types

type Rep Timeout :: Type -> Type #

Methods

from :: Timeout -> Rep Timeout x #

to :: Rep Timeout x -> Timeout #

type Rep Timeout Source # 
Instance details

Defined in Kafka.Types

type Rep Timeout = D1 ('MetaData "Timeout" "Kafka.Types" "hw-kafka-client-4.0.3-inplace" 'True) (C1 ('MetaCons "Timeout" 'PrefixI 'True) (S1 ('MetaSel ('Just "unTimeout") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int)))

newtype BrokerAddress Source #

Kafka broker address string (e.g. broker1:9092)

Constructors

BrokerAddress 

Instances

Instances details
Eq BrokerAddress Source # 
Instance details

Defined in Kafka.Types

Show BrokerAddress Source # 
Instance details

Defined in Kafka.Types

IsString BrokerAddress Source # 
Instance details

Defined in Kafka.Types

Generic BrokerAddress Source # 
Instance details

Defined in Kafka.Types

Associated Types

type Rep BrokerAddress :: Type -> Type #

type Rep BrokerAddress Source # 
Instance details

Defined in Kafka.Types

type Rep BrokerAddress = D1 ('MetaData "BrokerAddress" "Kafka.Types" "hw-kafka-client-4.0.3-inplace" 'True) (C1 ('MetaCons "BrokerAddress" 'PrefixI 'True) (S1 ('MetaSel ('Just "unBrokerAddress") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Text)))

newtype TopicName Source #

Topic name to consume/produce messages

Wildcard (regex) topics are supported by the librdkafka assignor: any topic name in the topics list that is prefixed with ^ will be regex-matched to the full list of topics in the cluster and matching topics will be added to the subscription list.
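
For instance, a regex-based subscription might look like this (a minimal sketch; the topic pattern is illustrative):

-- Matches every topic whose name starts with "example.":
-- the leading '^' marks the name as a regex pattern.
regexSub :: Subscription
regexSub = topics [TopicName "^example\\..*"]
        <> offsetReset Earliest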

Constructors

TopicName 

Fields

Instances

Instances details
Eq TopicName Source # 
Instance details

Defined in Kafka.Types

Ord TopicName Source # 
Instance details

Defined in Kafka.Types

Read TopicName Source # 
Instance details

Defined in Kafka.Types

Show TopicName Source # 
Instance details

Defined in Kafka.Types

IsString TopicName Source # 
Instance details

Defined in Kafka.Types

Generic TopicName Source # 
Instance details

Defined in Kafka.Types

Associated Types

type Rep TopicName :: Type -> Type #

type Rep TopicName Source # 
Instance details

Defined in Kafka.Types

type Rep TopicName = D1 ('MetaData "TopicName" "Kafka.Types" "hw-kafka-client-4.0.3-inplace" 'True) (C1 ('MetaCons "TopicName" 'PrefixI 'True) (S1 ('MetaSel ('Just "unTopicName") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Text)))

data TopicType Source #

Whether the topic is created by a user or by the system

Constructors

User

Normal topics that are created by users.

System

Topics starting with a double underscore "__" (__consumer_offsets, __confluent.support.metrics, etc.) are considered "system" topics.

Instances

Instances details
Eq TopicType Source # 
Instance details

Defined in Kafka.Types

Ord TopicType Source # 
Instance details

Defined in Kafka.Types

Read TopicType Source # 
Instance details

Defined in Kafka.Types

Show TopicType Source # 
Instance details

Defined in Kafka.Types

Generic TopicType Source # 
Instance details

Defined in Kafka.Types

Associated Types

type Rep TopicType :: Type -> Type #

type Rep TopicType Source # 
Instance details

Defined in Kafka.Types

type Rep TopicType = D1 ('MetaData "TopicType" "Kafka.Types" "hw-kafka-client-4.0.3-inplace" 'False) (C1 ('MetaCons "User" 'PrefixI 'False) (U1 :: Type -> Type) :+: C1 ('MetaCons "System" 'PrefixI 'False) (U1 :: Type -> Type))

newtype BatchSize Source #

Batch size used for polling

Constructors

BatchSize 

Fields

Instances

Instances details
Eq BatchSize Source # 
Instance details

Defined in Kafka.Types

Num BatchSize Source # 
Instance details

Defined in Kafka.Types

Ord BatchSize Source # 
Instance details

Defined in Kafka.Types

Read BatchSize Source # 
Instance details

Defined in Kafka.Types

Show BatchSize Source # 
Instance details

Defined in Kafka.Types

Generic BatchSize Source # 
Instance details

Defined in Kafka.Types

Associated Types

type Rep BatchSize :: Type -> Type #

type Rep BatchSize Source # 
Instance details

Defined in Kafka.Types

type Rep BatchSize = D1 ('MetaData "BatchSize" "Kafka.Types" "hw-kafka-client-4.0.3-inplace" 'True) (C1 ('MetaCons "BatchSize" 'PrefixI 'True) (S1 ('MetaSel ('Just "unBatchSize") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int)))

newtype ClientId Source #

Client ID used by Kafka to better track requests

See Kafka documentation on client ID

Constructors

ClientId 

Fields

Instances

Instances details
Eq ClientId Source # 
Instance details

Defined in Kafka.Types

Ord ClientId Source # 
Instance details

Defined in Kafka.Types

Show ClientId Source # 
Instance details

Defined in Kafka.Types

IsString ClientId Source # 
Instance details

Defined in Kafka.Types

Generic ClientId Source # 
Instance details

Defined in Kafka.Types

Associated Types

type Rep ClientId :: Type -> Type #

Methods

from :: ClientId -> Rep ClientId x #

to :: Rep ClientId x -> ClientId #

type Rep ClientId Source # 
Instance details

Defined in Kafka.Types

type Rep ClientId = D1 ('MetaData "ClientId" "Kafka.Types" "hw-kafka-client-4.0.3-inplace" 'True) (C1 ('MetaCons "ClientId" 'PrefixI 'True) (S1 ('MetaSel ('Just "unClientId") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Text)))

newtype Millis Source #

A number of milliseconds, used to represent durations and timestamps

Constructors

Millis 

Fields

Instances

Instances details
Eq Millis Source # 
Instance details

Defined in Kafka.Types

Methods

(==) :: Millis -> Millis -> Bool #

(/=) :: Millis -> Millis -> Bool #

Num Millis Source # 
Instance details

Defined in Kafka.Types

Ord Millis Source # 
Instance details

Defined in Kafka.Types

Read Millis Source # 
Instance details

Defined in Kafka.Types

Show Millis Source # 
Instance details

Defined in Kafka.Types

Generic Millis Source # 
Instance details

Defined in Kafka.Types

Associated Types

type Rep Millis :: Type -> Type #

Methods

from :: Millis -> Rep Millis x #

to :: Rep Millis x -> Millis #

type Rep Millis Source # 
Instance details

Defined in Kafka.Types

type Rep Millis = D1 ('MetaData "Millis" "Kafka.Types" "hw-kafka-client-4.0.3-inplace" 'True) (C1 ('MetaCons "Millis" 'PrefixI 'True) (S1 ('MetaSel ('Just "unMillis") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int64)))

newtype PartitionId Source #

Topic partition ID

Constructors

PartitionId 

Fields

Instances

Instances details
Enum PartitionId Source # 
Instance details

Defined in Kafka.Types

Eq PartitionId Source # 
Instance details

Defined in Kafka.Types

Ord PartitionId Source # 
Instance details

Defined in Kafka.Types

Read PartitionId Source # 
Instance details

Defined in Kafka.Types

Show PartitionId Source # 
Instance details

Defined in Kafka.Types

Generic PartitionId Source # 
Instance details

Defined in Kafka.Types

Associated Types

type Rep PartitionId :: Type -> Type #

type Rep PartitionId Source # 
Instance details

Defined in Kafka.Types

type Rep PartitionId = D1 ('MetaData "PartitionId" "Kafka.Types" "hw-kafka-client-4.0.3-inplace" 'True) (C1 ('MetaCons "PartitionId" 'PrefixI 'True) (S1 ('MetaSel ('Just "unPartitionId") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int)))

newtype BrokerId Source #

Kafka broker ID

Constructors

BrokerId 

Fields

Instances

Instances details
Eq BrokerId Source # 
Instance details

Defined in Kafka.Types

Ord BrokerId Source # 
Instance details

Defined in Kafka.Types

Read BrokerId Source # 
Instance details

Defined in Kafka.Types

Show BrokerId Source # 
Instance details

Defined in Kafka.Types

Generic BrokerId Source # 
Instance details

Defined in Kafka.Types

Associated Types

type Rep BrokerId :: Type -> Type #

Methods

from :: BrokerId -> Rep BrokerId x #

to :: Rep BrokerId x -> BrokerId #

type Rep BrokerId Source # 
Instance details

Defined in Kafka.Types

type Rep BrokerId = D1 ('MetaData "BrokerId" "Kafka.Types" "hw-kafka-client-4.0.3-inplace" 'True) (C1 ('MetaCons "BrokerId" 'PrefixI 'True) (S1 ('MetaSel ('Just "unBrokerId") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int)))

topicType :: TopicName -> TopicType Source #

Deduce the type of a topic from its name, by checking if it starts with a double underscore "__"
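
A quick check of the deduction (a sketch; the topic names are illustrative):

exampleTypes :: [TopicType]
exampleTypes =
  [ topicType (TopicName "__consumer_offsets") -- System
  , topicType (TopicName "orders")             -- User
  ]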

kafkaDebugToText :: KafkaDebug -> Text Source #

Convert a KafkaDebug into its librdkafka string equivalent.

This is used internally by the library but may be useful to some developers.

kafkaCompressionCodecToText :: KafkaCompressionCodec -> Text Source #

Convert a KafkaCompressionCodec into its librdkafka string equivalent.

This is used internally by the library but may be useful to some developers.
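
For example, the librdkafka names for a debug context and a compression codec (a sketch, assuming Data.Text's Text is in scope; the expected strings follow librdkafka's conventions):

debugText :: Text
debugText = kafkaDebugToText DebugBroker       -- "broker"

codecText :: Text
codecText = kafkaCompressionCodecToText Snappy -- "snappy"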

data Callback Source #

Callbacks allow retrieving various information such as error occurrences, statistics, and log messages. See setCallback (Consumer) and setCallback (Producer) for more details.

data ConsumerRecord k v Source #

Represents a received message from Kafka (i.e. used in a consumer)

Constructors

ConsumerRecord 

Fields

Instances

Instances details
Bitraversable ConsumerRecord Source # 
Instance details

Defined in Kafka.Consumer.Types

Methods

bitraverse :: Applicative f => (a -> f c) -> (b -> f d) -> ConsumerRecord a b -> f (ConsumerRecord c d) #

Bifoldable ConsumerRecord Source # 
Instance details

Defined in Kafka.Consumer.Types

Methods

bifold :: Monoid m => ConsumerRecord m m -> m #

bifoldMap :: Monoid m => (a -> m) -> (b -> m) -> ConsumerRecord a b -> m #

bifoldr :: (a -> c -> c) -> (b -> c -> c) -> c -> ConsumerRecord a b -> c #

bifoldl :: (c -> a -> c) -> (c -> b -> c) -> c -> ConsumerRecord a b -> c #

Bifunctor ConsumerRecord Source # 
Instance details

Defined in Kafka.Consumer.Types

Methods

bimap :: (a -> b) -> (c -> d) -> ConsumerRecord a c -> ConsumerRecord b d #

first :: (a -> b) -> ConsumerRecord a c -> ConsumerRecord b c #

second :: (b -> c) -> ConsumerRecord a b -> ConsumerRecord a c #

Functor (ConsumerRecord k) Source # 
Instance details

Defined in Kafka.Consumer.Types

Methods

fmap :: (a -> b) -> ConsumerRecord k a -> ConsumerRecord k b #

(<$) :: a -> ConsumerRecord k b -> ConsumerRecord k a #

Foldable (ConsumerRecord k) Source # 
Instance details

Defined in Kafka.Consumer.Types

Methods

fold :: Monoid m => ConsumerRecord k m -> m #

foldMap :: Monoid m => (a -> m) -> ConsumerRecord k a -> m #

foldMap' :: Monoid m => (a -> m) -> ConsumerRecord k a -> m #

foldr :: (a -> b -> b) -> b -> ConsumerRecord k a -> b #

foldr' :: (a -> b -> b) -> b -> ConsumerRecord k a -> b #

foldl :: (b -> a -> b) -> b -> ConsumerRecord k a -> b #

foldl' :: (b -> a -> b) -> b -> ConsumerRecord k a -> b #

foldr1 :: (a -> a -> a) -> ConsumerRecord k a -> a #

foldl1 :: (a -> a -> a) -> ConsumerRecord k a -> a #

toList :: ConsumerRecord k a -> [a] #

null :: ConsumerRecord k a -> Bool #

length :: ConsumerRecord k a -> Int #

elem :: Eq a => a -> ConsumerRecord k a -> Bool #

maximum :: Ord a => ConsumerRecord k a -> a #

minimum :: Ord a => ConsumerRecord k a -> a #

sum :: Num a => ConsumerRecord k a -> a #

product :: Num a => ConsumerRecord k a -> a #

Traversable (ConsumerRecord k) Source # 
Instance details

Defined in Kafka.Consumer.Types

Methods

traverse :: Applicative f => (a -> f b) -> ConsumerRecord k a -> f (ConsumerRecord k b) #

sequenceA :: Applicative f => ConsumerRecord k (f a) -> f (ConsumerRecord k a) #

mapM :: Monad m => (a -> m b) -> ConsumerRecord k a -> m (ConsumerRecord k b) #

sequence :: Monad m => ConsumerRecord k (m a) -> m (ConsumerRecord k a) #

(Eq k, Eq v) => Eq (ConsumerRecord k v) Source # 
Instance details

Defined in Kafka.Consumer.Types

(Read k, Read v) => Read (ConsumerRecord k v) Source # 
Instance details

Defined in Kafka.Consumer.Types

(Show k, Show v) => Show (ConsumerRecord k v) Source # 
Instance details

Defined in Kafka.Consumer.Types

Generic (ConsumerRecord k v) Source # 
Instance details

Defined in Kafka.Consumer.Types

Associated Types

type Rep (ConsumerRecord k v) :: Type -> Type #

Methods

from :: ConsumerRecord k v -> Rep (ConsumerRecord k v) x #

to :: Rep (ConsumerRecord k v) x -> ConsumerRecord k v #

type Rep (ConsumerRecord k v) Source # 
Instance details

Defined in Kafka.Consumer.Types

type Rep (ConsumerRecord k v) = D1 ('MetaData "ConsumerRecord" "Kafka.Consumer.Types" "hw-kafka-client-4.0.3-inplace" 'False) (C1 ('MetaCons "ConsumerRecord" 'PrefixI 'True) ((S1 ('MetaSel ('Just "crTopic") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 TopicName) :*: (S1 ('MetaSel ('Just "crPartition") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 PartitionId) :*: S1 ('MetaSel ('Just "crOffset") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 Offset))) :*: (S1 ('MetaSel ('Just "crTimestamp") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 Timestamp) :*: (S1 ('MetaSel ('Just "crKey") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 k) :*: S1 ('MetaSel ('Just "crValue") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 v)))))

data TopicPartition Source #

Kafka topic partition structure

Instances

Instances details
Eq TopicPartition Source # 
Instance details

Defined in Kafka.Consumer.Types

Show TopicPartition Source # 
Instance details

Defined in Kafka.Consumer.Types

Generic TopicPartition Source # 
Instance details

Defined in Kafka.Consumer.Types

Associated Types

type Rep TopicPartition :: Type -> Type #

type Rep TopicPartition Source # 
Instance details

Defined in Kafka.Consumer.Types

type Rep TopicPartition = D1 ('MetaData "TopicPartition" "Kafka.Consumer.Types" "hw-kafka-client-4.0.3-inplace" 'False) (C1 ('MetaCons "TopicPartition" 'PrefixI 'True) (S1 ('MetaSel ('Just "tpTopicName") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 TopicName) :*: (S1 ('MetaSel ('Just "tpPartition") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 PartitionId) :*: S1 ('MetaSel ('Just "tpOffset") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 PartitionOffset))))

data OffsetStoreMethod Source #

Indicates the method of storing the offsets

Constructors

OffsetStoreBroker

Offsets are stored in the Kafka broker (preferred)

OffsetStoreFile FilePath OffsetStoreSync

Offsets are stored in a file (and synced to disk according to the sync policy)

Instances

Instances details
Eq OffsetStoreMethod Source # 
Instance details

Defined in Kafka.Consumer.Types

Show OffsetStoreMethod Source # 
Instance details

Defined in Kafka.Consumer.Types

Generic OffsetStoreMethod Source # 
Instance details

Defined in Kafka.Consumer.Types

Associated Types

type Rep OffsetStoreMethod :: Type -> Type #

type Rep OffsetStoreMethod Source # 
Instance details

Defined in Kafka.Consumer.Types

type Rep OffsetStoreMethod = D1 ('MetaData "OffsetStoreMethod" "Kafka.Consumer.Types" "hw-kafka-client-4.0.3-inplace" 'False) (C1 ('MetaCons "OffsetStoreBroker" 'PrefixI 'False) (U1 :: Type -> Type) :+: C1 ('MetaCons "OffsetStoreFile" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 FilePath) :*: S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 OffsetStoreSync)))

data OffsetStoreSync Source #

Indicates how offsets are to be synced to disk

Constructors

OffsetSyncDisable

Do not sync offsets (in Kafka: -1)

OffsetSyncImmediate

Sync immediately after each offset commit (in Kafka: 0)

OffsetSyncInterval Int

Sync after the specified interval (in milliseconds)

Instances

Instances details
Eq OffsetStoreSync Source # 
Instance details

Defined in Kafka.Consumer.Types

Show OffsetStoreSync Source # 
Instance details

Defined in Kafka.Consumer.Types

Generic OffsetStoreSync Source # 
Instance details

Defined in Kafka.Consumer.Types

Associated Types

type Rep OffsetStoreSync :: Type -> Type #

type Rep OffsetStoreSync Source # 
Instance details

Defined in Kafka.Consumer.Types

type Rep OffsetStoreSync = D1 ('MetaData "OffsetStoreSync" "Kafka.Consumer.Types" "hw-kafka-client-4.0.3-inplace" 'False) (C1 ('MetaCons "OffsetSyncDisable" 'PrefixI 'False) (U1 :: Type -> Type) :+: (C1 ('MetaCons "OffsetSyncImmediate" 'PrefixI 'False) (U1 :: Type -> Type) :+: C1 ('MetaCons "OffsetSyncInterval" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int))))

data OffsetCommit Source #

Offsets commit mode

Constructors

OffsetCommit

Forces the consumer to block until the broker offset commit is done

OffsetCommitAsync

Offsets will be committed in a non-blocking way

Instances

Instances details
Eq OffsetCommit Source # 
Instance details

Defined in Kafka.Consumer.Types

Show OffsetCommit Source # 
Instance details

Defined in Kafka.Consumer.Types

Generic OffsetCommit Source # 
Instance details

Defined in Kafka.Consumer.Types

Associated Types

type Rep OffsetCommit :: Type -> Type #

type Rep OffsetCommit Source # 
Instance details

Defined in Kafka.Consumer.Types

type Rep OffsetCommit = D1 ('MetaData "OffsetCommit" "Kafka.Consumer.Types" "hw-kafka-client-4.0.3-inplace" 'False) (C1 ('MetaCons "OffsetCommit" 'PrefixI 'False) (U1 :: Type -> Type) :+: C1 ('MetaCons "OffsetCommitAsync" 'PrefixI 'False) (U1 :: Type -> Type))

data Timestamp Source #

Consumer record timestamp

Instances

Instances details
Eq Timestamp Source # 
Instance details

Defined in Kafka.Consumer.Types

Read Timestamp Source # 
Instance details

Defined in Kafka.Consumer.Types

Show Timestamp Source # 
Instance details

Defined in Kafka.Consumer.Types

Generic Timestamp Source # 
Instance details

Defined in Kafka.Consumer.Types

Associated Types

type Rep Timestamp :: Type -> Type #

type Rep Timestamp Source # 
Instance details

Defined in Kafka.Consumer.Types

type Rep Timestamp = D1 ('MetaData "Timestamp" "Kafka.Consumer.Types" "hw-kafka-client-4.0.3-inplace" 'False) (C1 ('MetaCons "CreateTime" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 Millis)) :+: (C1 ('MetaCons "LogAppendTime" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 Millis)) :+: C1 ('MetaCons "NoTimestamp" 'PrefixI 'False) (U1 :: Type -> Type)))

data SubscribedPartitions Source #

Partitions subscribed by a consumer

Constructors

SubscribedPartitions [PartitionId]

Subscribe only to those partitions

SubscribedPartitionsAll

Subscribe to all partitions

Instances

Instances details
Eq SubscribedPartitions Source # 
Instance details

Defined in Kafka.Consumer.Types

Show SubscribedPartitions Source # 
Instance details

Defined in Kafka.Consumer.Types

Generic SubscribedPartitions Source # 
Instance details

Defined in Kafka.Consumer.Types

Associated Types

type Rep SubscribedPartitions :: Type -> Type #

type Rep SubscribedPartitions Source # 
Instance details

Defined in Kafka.Consumer.Types

type Rep SubscribedPartitions = D1 ('MetaData "SubscribedPartitions" "Kafka.Consumer.Types" "hw-kafka-client-4.0.3-inplace" 'False) (C1 ('MetaCons "SubscribedPartitions" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 [PartitionId])) :+: C1 ('MetaCons "SubscribedPartitionsAll" 'PrefixI 'False) (U1 :: Type -> Type))

data PartitionOffset Source #

The partition offset

Instances

Instances details
Eq PartitionOffset Source # 
Instance details

Defined in Kafka.Consumer.Types

Show PartitionOffset Source # 
Instance details

Defined in Kafka.Consumer.Types

Generic PartitionOffset Source # 
Instance details

Defined in Kafka.Consumer.Types

Associated Types

type Rep PartitionOffset :: Type -> Type #

type Rep PartitionOffset Source # 
Instance details

Defined in Kafka.Consumer.Types

type Rep PartitionOffset = D1 ('MetaData "PartitionOffset" "Kafka.Consumer.Types" "hw-kafka-client-4.0.3-inplace" 'False) ((C1 ('MetaCons "PartitionOffsetBeginning" 'PrefixI 'False) (U1 :: Type -> Type) :+: C1 ('MetaCons "PartitionOffsetEnd" 'PrefixI 'False) (U1 :: Type -> Type)) :+: (C1 ('MetaCons "PartitionOffset" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int64)) :+: (C1 ('MetaCons "PartitionOffsetStored" 'PrefixI 'False) (U1 :: Type -> Type) :+: C1 ('MetaCons "PartitionOffsetInvalid" 'PrefixI 'False) (U1 :: Type -> Type))))

data RebalanceEvent Source #

A set of events which happen during the rebalancing process

Constructors

RebalanceBeforeAssign [(TopicName, PartitionId)]

Happens before the Kafka client confirms the new assignment

RebalanceAssign [(TopicName, PartitionId)]

Happens after the new assignment is confirmed

RebalanceBeforeRevoke [(TopicName, PartitionId)]

Happens before the Kafka client confirms the revocation of partitions

RebalanceRevoke [(TopicName, PartitionId)]

Happens after the revocation is confirmed

Instances

Instances details
Eq RebalanceEvent Source # 
Instance details

Defined in Kafka.Consumer.Types

Show RebalanceEvent Source # 
Instance details

Defined in Kafka.Consumer.Types

Generic RebalanceEvent Source # 
Instance details

Defined in Kafka.Consumer.Types

Associated Types

type Rep RebalanceEvent :: Type -> Type #

type Rep RebalanceEvent Source # 
Instance details

Defined in Kafka.Consumer.Types

data OffsetReset Source #

Where to reset the offset when there is no initial offset in Kafka

See Kafka documentation on offset reset

Constructors

Earliest 
Latest 

Instances

Instances details
Eq OffsetReset Source # 
Instance details

Defined in Kafka.Consumer.Types

Show OffsetReset Source # 
Instance details

Defined in Kafka.Consumer.Types

Generic OffsetReset Source # 
Instance details

Defined in Kafka.Consumer.Types

Associated Types

type Rep OffsetReset :: Type -> Type #

type Rep OffsetReset Source # 
Instance details

Defined in Kafka.Consumer.Types

type Rep OffsetReset = D1 ('MetaData "OffsetReset" "Kafka.Consumer.Types" "hw-kafka-client-4.0.3-inplace" 'False) (C1 ('MetaCons "Earliest" 'PrefixI 'False) (U1 :: Type -> Type) :+: C1 ('MetaCons "Latest" 'PrefixI 'False) (U1 :: Type -> Type))

newtype Offset Source #

A message offset in a partition

Constructors

Offset 

Fields

Instances

Instances details
Eq Offset Source # 
Instance details

Defined in Kafka.Consumer.Types

Methods

(==) :: Offset -> Offset -> Bool #

(/=) :: Offset -> Offset -> Bool #

Ord Offset Source # 
Instance details

Defined in Kafka.Consumer.Types

Read Offset Source # 
Instance details

Defined in Kafka.Consumer.Types

Show Offset Source # 
Instance details

Defined in Kafka.Consumer.Types

Generic Offset Source # 
Instance details

Defined in Kafka.Consumer.Types

Associated Types

type Rep Offset :: Type -> Type #

Methods

from :: Offset -> Rep Offset x #

to :: Rep Offset x -> Offset #

type Rep Offset Source # 
Instance details

Defined in Kafka.Consumer.Types

type Rep Offset = D1 ('MetaData "Offset" "Kafka.Consumer.Types" "hw-kafka-client-4.0.3-inplace" 'True) (C1 ('MetaCons "Offset" 'PrefixI 'True) (S1 ('MetaSel ('Just "unOffset") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int64)))

newtype ConsumerGroupId Source #

Consumer group ID. Different consumers with the same consumer group ID will get assigned different partitions of each subscribed topic.

See Kafka documentation on consumer group

Constructors

ConsumerGroupId 

Instances

Instances details
Eq ConsumerGroupId Source # 
Instance details

Defined in Kafka.Consumer.Types

Ord ConsumerGroupId Source # 
Instance details

Defined in Kafka.Consumer.Types

Show ConsumerGroupId Source # 
Instance details

Defined in Kafka.Consumer.Types

IsString ConsumerGroupId Source # 
Instance details

Defined in Kafka.Consumer.Types

Generic ConsumerGroupId Source # 
Instance details

Defined in Kafka.Consumer.Types

Associated Types

type Rep ConsumerGroupId :: Type -> Type #

type Rep ConsumerGroupId Source # 
Instance details

Defined in Kafka.Consumer.Types

type Rep ConsumerGroupId = D1 ('MetaData "ConsumerGroupId" "Kafka.Consumer.Types" "hw-kafka-client-4.0.3-inplace" 'True) (C1 ('MetaCons "ConsumerGroupId" 'PrefixI 'True) (S1 ('MetaSel ('Just "unConsumerGroupId") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Text)))

crMapKey :: (k -> k') -> ConsumerRecord k v -> ConsumerRecord k' v Source #

Deprecated: Isn't concern of this library. Use first

crMapValue :: (v -> v') -> ConsumerRecord k v -> ConsumerRecord k v' Source #

Deprecated: Isn't concern of this library. Use second

crMapKV :: (k -> k') -> (v -> v') -> ConsumerRecord k v -> ConsumerRecord k' v' Source #

Deprecated: Isn't concern of this library. Use bimap

sequenceFirst :: (Bitraversable t, Applicative f) => t (f k) v -> f (t k v) Source #

Deprecated: Isn't concern of this library. Use bitraverse id pure

traverseFirst :: (Bitraversable t, Applicative f) => (k -> f k') -> t k v -> f (t k' v) Source #

Deprecated: Isn't concern of this library. Use bitraverse f pure

traverseFirstM :: (Bitraversable t, Applicative f, Monad m) => (k -> m (f k')) -> t k v -> m (f (t k' v)) Source #

Deprecated: Isn't concern of this library. Use bitraverse id pure <$> bitraverse f pure r

traverseM :: (Traversable t, Applicative f, Monad m) => (v -> m (f v')) -> t v -> m (f (t v')) Source #

Deprecated: Isn't concern of this library. Use sequenceA <$> traverse f r

bitraverseM :: (Bitraversable t, Applicative f, Monad m) => (k -> m (f k')) -> (v -> m (f v')) -> t k v -> m (f (t k' v')) Source #

Deprecated: Isn't concern of this library. Use bisequenceA <$> bimapM f g r

data Subscription Source #

A consumer subscription to a topic.

Examples

Typically you don't call the constructor directly, but combine settings:

consumerSub :: Subscription
consumerSub = topics [TopicName "kafka-client-example-topic"]
           <> offsetReset Earliest
           <> extraSubscriptionProps (fromList [("prop1", "value 1"), ("prop2", "value 2")])

topics :: [TopicName] -> Subscription Source #

Build a subscription by giving the list of topic names only

offsetReset :: OffsetReset -> Subscription Source #

Build a subscription by giving the offset reset parameter only

extraSubscriptionProps :: Map Text Text -> Subscription Source #

Build a subscription by giving extra properties only

errorCallback :: (KafkaError -> String -> IO ()) -> Callback Source #

Add a callback for errors.

Examples

Basic usage:

setCallback (errorCallback myErrorCallback)

myErrorCallback :: KafkaError -> String -> IO ()
myErrorCallback kafkaError message = putStrLn $ show kafkaError <> "|" <> message

logCallback :: (KafkaLogLevel -> String -> String -> IO ()) -> Callback Source #

Add a callback for logs.

Examples

Basic usage:

setCallback (logCallback myLogCallback)

myLogCallback :: KafkaLogLevel -> String -> String -> IO ()
myLogCallback level facility message = putStrLn $ show level <> "|" <> facility <> "|" <> message

statsCallback :: (ByteString -> IO ()) -> Callback Source #

Add a callback for stats. The passed ByteString contains a UTF-8 encoded JSON document and can e.g. be parsed using Data.Aeson.decodeStrict. For more information about the content of the JSON document see https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md.

Examples

Basic usage:

setCallback (statsCallback myStatsCallback)

myStatsCallback :: ByteString -> IO ()
myStatsCallback stats = print stats

rebalanceCallback :: (KafkaConsumer -> RebalanceEvent -> IO ()) -> Callback Source #

Sets a callback that is called when rebalance is needed.
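
A minimal sketch of such a callback that merely logs assignments and revocations (the handler name is hypothetical):

printRebalance :: KafkaConsumer -> RebalanceEvent -> IO ()
printRebalance _ event = case event of
  RebalanceAssign ps -> putStrLn $ "Assigned partitions: " <> show ps
  RebalanceRevoke ps -> putStrLn $ "Revoked partitions: "  <> show ps
  _                  -> pure ()

-- Wired in via the consumer properties:
-- setCallback (rebalanceCallback printRebalance)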

offsetCommitCallback :: (KafkaConsumer -> KafkaError -> [TopicPartition] -> IO ()) -> Callback Source #

Sets a callback that is called with the result of each offset commit.

The results of automatic or manual offset commits will be scheduled for this callback and are served by pollMessage.

If no partitions had valid offsets to commit, this callback will be called with KafkaResponseError RdKafkaRespErrNoOffset, which is not to be considered an error.
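
A sketch of a commit callback that ignores the "no offset" case (the handler name is hypothetical):

printCommitResult :: KafkaConsumer -> KafkaError -> [TopicPartition] -> IO ()
printCommitResult _ err ps = case err of
  KafkaResponseError RdKafkaRespErrNoOffset ->
    pure () -- nothing had to be committed; not an error
  _ ->
    putStrLn $ "Commit of " <> show ps <> " finished with: " <> show err

-- setCallback (offsetCommitCallback printCommitResult)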

data CallbackPollMode Source #

Whether the callback polling should be done synchronously or not.

Constructors

CallbackPollModeSync

You have to poll the consumer frequently to handle new messages as well as rebalance and keep-alive events. This enables lowering the footprint and having full control over when polling happens, at the cost of manually managing those events; see the sketch after this list.

CallbackPollModeAsync

Polling for rebalance and keep-alive events is handled for you in a background thread.
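
A sketch of a synchronous-mode setup, where the application drives callback handling itself (names and values are illustrative):

syncProps :: ConsumerProperties
syncProps = brokersList ["localhost:9092"]
         <> groupId (ConsumerGroupId "sync_example_group")
         <> callbackPollMode CallbackPollModeSync

-- With CallbackPollModeSync, pollMessage serves callbacks too, but
-- quiet periods should be covered explicitly with pollConsumerEvents:
drainEvents :: KafkaConsumer -> IO ()
drainEvents kc = pollConsumerEvents kc (Just (Timeout 100))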

brokersList :: [BrokerAddress] -> ConsumerProperties Source #

Set the list of brokers to contact to connect to the Kafka cluster.

noAutoCommit :: ConsumerProperties Source #

Disable auto commit for the consumer.

noAutoOffsetStore :: ConsumerProperties Source #

Disable auto offset store for the consumer.

See enable.auto.offset.store for more information.

setCallback :: Callback -> ConsumerProperties Source #

Set the consumer callback.

For examples of use, see errorCallback, logCallback, and statsCallback.

logLevel :: KafkaLogLevel -> ConsumerProperties Source #

Set the logging level. Usually used together with debugOptions to configure which logs are needed.

suppressDisconnectLogs :: ConsumerProperties Source #

Suppresses consumer log.connection.close.

It might be useful to suppress these logs when interacting with brokers that have an aggressive connection.max.idle.ms value.

extraProps :: Map Text Text -> ConsumerProperties Source #

Set any configuration options that are supported by librdkafka. The full list can be found in librdkafka's CONFIGURATION.md (https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md).

extraProp :: Text -> Text -> ConsumerProperties Source #

Set any configuration option that is supported by librdkafka. The full list can be found in librdkafka's CONFIGURATION.md (https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md).

debugOptions :: [KafkaDebug] -> ConsumerProperties Source #

Set debug features for the consumer. Usually used together with logLevel.
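
Putting logLevel, debugOptions, and the raw-property escape hatches together (a sketch; the librdkafka property names are standard, the values only illustrative):

import qualified Data.Map as M

verboseProps :: ConsumerProperties
verboseProps = brokersList ["localhost:9092"]
            <> logLevel KafkaLogDebug
            <> debugOptions [DebugBroker, DebugFetch]
            <> extraProp "max.poll.interval.ms" "300000"
            <> extraProps (M.fromList [("fetch.min.bytes", "1")])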

callbackPollMode :: CallbackPollMode -> ConsumerProperties Source #

Set the callback poll mode. Default value is CallbackPollModeAsync.

runConsumer Source #

Arguments

:: ConsumerProperties 
-> Subscription 
-> (KafkaConsumer -> IO (Either KafkaError a))

A callback function to poll and handle messages

-> IO (Either KafkaError a) 

Deprecated: Use newConsumer/closeConsumer instead

Runs a high-level Kafka consumer. The provided callback is expected to call pollMessage when convenient.

newConsumer :: MonadIO m => ConsumerProperties -> Subscription -> m (Either KafkaError KafkaConsumer) Source #

Create a KafkaConsumer. This consumer must be correctly released using closeConsumer.

assign :: MonadIO m => KafkaConsumer -> [TopicPartition] -> m (Maybe KafkaError) Source #

Assigns the consumer to consume from the given topics, partitions, and offsets.
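
A sketch of manually assigning a single partition from the beginning (assumes the TopicPartition record constructor is in scope; names are illustrative):

assignFromStart :: KafkaConsumer -> IO (Maybe KafkaError)
assignFromStart kc = assign kc
  [ TopicPartition
      { tpTopicName = TopicName "kafka-client-example-topic"
      , tpPartition = PartitionId 0
      , tpOffset    = PartitionOffsetBeginning
      }
  ]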

assignment :: MonadIO m => KafkaConsumer -> m (Either KafkaError (Map TopicName [PartitionId])) Source #

Returns the current consumer's assignment.

subscription :: MonadIO m => KafkaConsumer -> m (Either KafkaError [(TopicName, SubscribedPartitions)]) Source #

Returns the current consumer's subscription.

pausePartitions :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m KafkaError Source #

Pauses specified partitions on the current consumer.

resumePartitions :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m KafkaError Source #

Resumes specified partitions on the current consumer.

committed :: MonadIO m => KafkaConsumer -> Timeout -> [(TopicName, PartitionId)] -> m (Either KafkaError [TopicPartition]) Source #

Retrieve committed offsets for topics+partitions.

position :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m (Either KafkaError [TopicPartition]) Source #

Retrieve current positions (last consumed message offset+1) for the current running instance of the consumer. If the current consumer hasn't received any messages for a given partition, PartitionOffsetInvalid is returned.

seek :: MonadIO m => KafkaConsumer -> Timeout -> [TopicPartition] -> m (Maybe KafkaError) Source #

Seeks to the given offset for each provided TopicPartition.
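
For example, rewinding one partition to a fixed offset (a sketch; values are illustrative):

seekBack :: KafkaConsumer -> IO (Maybe KafkaError)
seekBack kc = seek kc (Timeout 1000)
  [ TopicPartition (TopicName "kafka-client-example-topic")
                   (PartitionId 0)
                   (PartitionOffset 42)
  ]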

pollMessage Source #

Arguments

:: MonadIO m 
=> KafkaConsumer 
-> Timeout

the timeout, in milliseconds

-> m (Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))

Left on error or timeout; Right on success

Polls a single message

pollConsumerEvents :: KafkaConsumer -> Maybe Timeout -> IO () Source #

Polls the provided Kafka consumer for events.

Events will cause application provided callbacks to be called.

The Timeout argument specifies the maximum amount of time (in milliseconds) that the call will block waiting for events.

This function is called on each pollMessage and, if the runtime allows multithreading, it is also called periodically in a separate thread to ensure that callbacks are handled as soon as possible.

There is usually no need to call this function manually, except in special cases in a single-threaded environment where polling for events on each pollMessage is not frequent enough.

pollMessageBatch :: MonadIO m => KafkaConsumer -> Timeout -> BatchSize -> m [Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))] Source #

Polls up to BatchSize messages. Unlike pollMessage, this function does not return the usual "timeout" errors. An empty batch is returned when no messages are available.

This API is not available when CallbackPollMode is set to CallbackPollModeSync.
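
A sketch of a batch polling step (names and values are illustrative):

pollBatchOnce :: KafkaConsumer -> IO ()
pollBatchOnce kc = do
  msgs <- pollMessageBatch kc (Timeout 1000) (BatchSize 100)
  -- An empty list simply means nothing arrived within the timeout.
  mapM_ (either (putStrLn . ("Error: " <>) . show) (print . crOffset)) msgs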

commitOffsetMessage :: MonadIO m => OffsetCommit -> KafkaConsumer -> ConsumerRecord k v -> m (Maybe KafkaError) Source #

Commit the message's offset on the broker for the message's partition.

commitAllOffsets :: MonadIO m => OffsetCommit -> KafkaConsumer -> m (Maybe KafkaError) Source #

Commit offsets for all currently assigned partitions.

commitPartitionsOffsets :: MonadIO m => OffsetCommit -> KafkaConsumer -> [TopicPartition] -> m (Maybe KafkaError) Source #

Commit offsets for the given partitions.

storeOffsets :: MonadIO m => KafkaConsumer -> [TopicPartition] -> m (Maybe KafkaError) Source #

Stores offsets locally

storeOffsetMessage :: MonadIO m => KafkaConsumer -> ConsumerRecord k v -> m (Maybe KafkaError) Source #

Stores the message's offset locally for the message's partition.
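
A common at-least-once sketch: disable auto commit and auto offset store, store each message's offset only after processing it, and commit the stored offsets afterwards (names are illustrative; committing without explicit offsets flushes the stored offsets of the current assignment, per librdkafka semantics):

atLeastOnceProps :: ConsumerProperties
atLeastOnceProps = brokersList ["localhost:9092"]
                <> groupId (ConsumerGroupId "store_example_group")
                <> noAutoCommit
                <> noAutoOffsetStore

processAndStore :: KafkaConsumer -> ConsumerRecord k v -> IO ()
processAndStore kc msg = do
  -- ... process msg here ...
  _ <- storeOffsetMessage kc msg        -- mark as processed locally
  _ <- commitAllOffsets OffsetCommit kc -- flush stored offsets to the broker
  pure ()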

closeConsumer :: MonadIO m => KafkaConsumer -> m (Maybe KafkaError) Source #

Closes the consumer.

See newConsumer

data RdKafkaRespErrT Source #

Constructors

RdKafkaRespErrBegin 
RdKafkaRespErrBadMsg 
RdKafkaRespErrBadCompression 
RdKafkaRespErrDestroy 
RdKafkaRespErrFail 
RdKafkaRespErrTransport 
RdKafkaRespErrCritSysResource 
RdKafkaRespErrResolve 
RdKafkaRespErrMsgTimedOut 
RdKafkaRespErrPartitionEof 
RdKafkaRespErrUnknownPartition 
RdKafkaRespErrFs 
RdKafkaRespErrUnknownTopic 
RdKafkaRespErrAllBrokersDown 
RdKafkaRespErrInvalidArg 
RdKafkaRespErrTimedOut 
RdKafkaRespErrQueueFull 
RdKafkaRespErrIsrInsuff 
RdKafkaRespErrNodeUpdate 
RdKafkaRespErrSsl 
RdKafkaRespErrWaitCoord 
RdKafkaRespErrUnknownGroup 
RdKafkaRespErrInProgress 
RdKafkaRespErrPrevInProgress 
RdKafkaRespErrExistingSubscription 
RdKafkaRespErrAssignPartitions 
RdKafkaRespErrRevokePartitions 
RdKafkaRespErrConflict 
RdKafkaRespErrState 
RdKafkaRespErrUnknownProtocol 
RdKafkaRespErrNotImplemented 
RdKafkaRespErrAuthentication 
RdKafkaRespErrNoOffset 
RdKafkaRespErrOutdated 
RdKafkaRespErrTimedOutQueue 
RdKafkaRespErrUnsupportedFeature 
RdKafkaRespErrWaitCache 
RdKafkaRespErrIntr 
RdKafkaRespErrKeySerialization 
RdKafkaRespErrValueSerialization 
RdKafkaRespErrKeyDeserialization 
RdKafkaRespErrValueDeserialization 
RdKafkaRespErrPartial 
RdKafkaRespErrReadOnly 
RdKafkaRespErrNoent 
RdKafkaRespErrUnderflow 
RdKafkaRespErrInvalidType 
RdKafkaRespErrRetry 
RdKafkaRespErrPurgeQueue 
RdKafkaRespErrPurgeInflight 
RdKafkaRespErrFatal 
RdKafkaRespErrInconsistent 
RdKafkaRespErrGaplessGuarantee 
RdKafkaRespErrMaxPollExceeded 
RdKafkaRespErrUnknownBroker 
RdKafkaRespErrNotConfigured 
RdKafkaRespErrFenced 
RdKafkaRespErrApplication 
RdKafkaRespErrEnd 
RdKafkaRespErrUnknown 
RdKafkaRespErrNoError 
RdKafkaRespErrOffsetOutOfRange 
RdKafkaRespErrInvalidMsg 
RdKafkaRespErrUnknownTopicOrPart 
RdKafkaRespErrInvalidMsgSize 
RdKafkaRespErrLeaderNotAvailable 
RdKafkaRespErrNotLeaderForPartition 
RdKafkaRespErrRequestTimedOut 
RdKafkaRespErrBrokerNotAvailable 
RdKafkaRespErrReplicaNotAvailable 
RdKafkaRespErrMsgSizeTooLarge 
RdKafkaRespErrStaleCtrlEpoch 
RdKafkaRespErrOffsetMetadataTooLarge 
RdKafkaRespErrNetworkException 
RdKafkaRespErrCoordinatorLoadInProgress 
RdKafkaRespErrCoordinatorNotAvailable 
RdKafkaRespErrNotCoordinator 
RdKafkaRespErrTopicException 
RdKafkaRespErrRecordListTooLarge 
RdKafkaRespErrNotEnoughReplicas 
RdKafkaRespErrNotEnoughReplicasAfterAppend 
RdKafkaRespErrInvalidRequiredAcks 
RdKafkaRespErrIllegalGeneration 
RdKafkaRespErrInconsistentGroupProtocol 
RdKafkaRespErrInvalidGroupId 
RdKafkaRespErrUnknownMemberId 
RdKafkaRespErrInvalidSessionTimeout 
RdKafkaRespErrRebalanceInProgress 
RdKafkaRespErrInvalidCommitOffsetSize 
RdKafkaRespErrTopicAuthorizationFailed 
RdKafkaRespErrGroupAuthorizationFailed 
RdKafkaRespErrClusterAuthorizationFailed 
RdKafkaRespErrInvalidTimestamp 
RdKafkaRespErrUnsupportedSaslMechanism 
RdKafkaRespErrIllegalSaslState 
RdKafkaRespErrUnsupportedVersion 
RdKafkaRespErrTopicAlreadyExists 
RdKafkaRespErrInvalidPartitions 
RdKafkaRespErrInvalidReplicationFactor 
RdKafkaRespErrInvalidReplicaAssignment 
RdKafkaRespErrInvalidConfig 
RdKafkaRespErrNotController 
RdKafkaRespErrInvalidRequest 
RdKafkaRespErrUnsupportedForMessageFormat 
RdKafkaRespErrPolicyViolation 
RdKafkaRespErrOutOfOrderSequenceNumber 
RdKafkaRespErrDuplicateSequenceNumber 
RdKafkaRespErrInvalidProducerEpoch 
RdKafkaRespErrInvalidTxnState 
RdKafkaRespErrInvalidProducerIdMapping 
RdKafkaRespErrInvalidTransactionTimeout 
RdKafkaRespErrConcurrentTransactions 
RdKafkaRespErrTransactionCoordinatorFenced 
RdKafkaRespErrTransactionalIdAuthorizationFailed 
RdKafkaRespErrSecurityDisabled 
RdKafkaRespErrOperationNotAttempted 
RdKafkaRespErrKafkaStorageError 
RdKafkaRespErrLogDirNotFound 
RdKafkaRespErrSaslAuthenticationFailed 
RdKafkaRespErrUnknownProducerId 
RdKafkaRespErrReassignmentInProgress 
RdKafkaRespErrDelegationTokenAuthDisabled 
RdKafkaRespErrDelegationTokenNotFound 
RdKafkaRespErrDelegationTokenOwnerMismatch 
RdKafkaRespErrDelegationTokenRequestNotAllowed 
RdKafkaRespErrDelegationTokenAuthorizationFailed 
RdKafkaRespErrDelegationTokenExpired 
RdKafkaRespErrInvalidPrincipalType 
RdKafkaRespErrNonEmptyGroup 
RdKafkaRespErrGroupIdNotFound 
RdKafkaRespErrFetchSessionIdNotFound 
RdKafkaRespErrInvalidFetchSessionEpoch 
RdKafkaRespErrListenerNotFound 
RdKafkaRespErrTopicDeletionDisabled 
RdKafkaRespErrFencedLeaderEpoch 
RdKafkaRespErrUnknownLeaderEpoch 
RdKafkaRespErrUnsupportedCompressionType 
RdKafkaRespErrStaleBrokerEpoch 
RdKafkaRespErrOffsetNotAvailable 
RdKafkaRespErrMemberIdRequired 
RdKafkaRespErrPreferredLeaderNotAvailable 
RdKafkaRespErrGroupMaxSizeReached 
RdKafkaRespErrFencedInstanceId 
RdKafkaRespErrEligibleLeadersNotAvailable 
RdKafkaRespErrElectionNotNeeded 
RdKafkaRespErrNoReassignmentInProgress 
RdKafkaRespErrGroupSubscribedToTopic 
RdKafkaRespErrInvalidRecord 
RdKafkaRespErrUnstableOffsetCommit 
RdKafkaRespErrEndAll