hw-kafka-client-2.6.0: Kafka bindings for Haskell

Safe Haskell: None
Language: Haskell2010

Kafka.Consumer

Documentation

data KafkaCompressionCodec Source #

Constructors

NoCompression 
Gzip 
Snappy 
Lz4 
Instances
Eq KafkaCompressionCodec Source # 
Instance details

Defined in Kafka.Types

Show KafkaCompressionCodec Source # 
Instance details

Defined in Kafka.Types

Generic KafkaCompressionCodec Source # 
Instance details

Defined in Kafka.Types

Associated Types

type Rep KafkaCompressionCodec :: Type -> Type #

type Rep KafkaCompressionCodec Source # 
Instance details

Defined in Kafka.Types

type Rep KafkaCompressionCodec = D1 (MetaData "KafkaCompressionCodec" "Kafka.Types" "hw-kafka-client-2.6.0-inplace" False) ((C1 (MetaCons "NoCompression" PrefixI False) (U1 :: Type -> Type) :+: C1 (MetaCons "Gzip" PrefixI False) (U1 :: Type -> Type)) :+: (C1 (MetaCons "Snappy" PrefixI False) (U1 :: Type -> Type) :+: C1 (MetaCons "Lz4" PrefixI False) (U1 :: Type -> Type)))

data KafkaDebug Source #

Instances
Eq KafkaDebug Source # 
Instance details

Defined in Kafka.Types

Show KafkaDebug Source # 
Instance details

Defined in Kafka.Types

Generic KafkaDebug Source # 
Instance details

Defined in Kafka.Types

Associated Types

type Rep KafkaDebug :: Type -> Type #

type Rep KafkaDebug Source # 
Instance details

Defined in Kafka.Types

type Rep KafkaDebug = D1 (MetaData "KafkaDebug" "Kafka.Types" "hw-kafka-client-2.6.0-inplace" False) (((C1 (MetaCons "DebugGeneric" PrefixI False) (U1 :: Type -> Type) :+: (C1 (MetaCons "DebugBroker" PrefixI False) (U1 :: Type -> Type) :+: C1 (MetaCons "DebugTopic" PrefixI False) (U1 :: Type -> Type))) :+: (C1 (MetaCons "DebugMetadata" PrefixI False) (U1 :: Type -> Type) :+: (C1 (MetaCons "DebugQueue" PrefixI False) (U1 :: Type -> Type) :+: C1 (MetaCons "DebugMsg" PrefixI False) (U1 :: Type -> Type)))) :+: ((C1 (MetaCons "DebugProtocol" PrefixI False) (U1 :: Type -> Type) :+: (C1 (MetaCons "DebugCgrp" PrefixI False) (U1 :: Type -> Type) :+: C1 (MetaCons "DebugSecurity" PrefixI False) (U1 :: Type -> Type))) :+: (C1 (MetaCons "DebugFetch" PrefixI False) (U1 :: Type -> Type) :+: (C1 (MetaCons "DebugFeature" PrefixI False) (U1 :: Type -> Type) :+: C1 (MetaCons "DebugAll" PrefixI False) (U1 :: Type -> Type)))))

data KafkaError Source #

Any Kafka error

Instances
Eq KafkaError Source # 
Instance details

Defined in Kafka.Types

Show KafkaError Source # 
Instance details

Defined in Kafka.Types

Generic KafkaError Source # 
Instance details

Defined in Kafka.Types

Associated Types

type Rep KafkaError :: Type -> Type #

Exception KafkaError Source # 
Instance details

Defined in Kafka.Types

type Rep KafkaError Source # 
Instance details

Defined in Kafka.Types

newtype Timeout Source #

Timeout in milliseconds

Constructors

Timeout 

Fields

unTimeout :: Int

Instances
Eq Timeout Source # 
Instance details

Defined in Kafka.Types

Methods

(==) :: Timeout -> Timeout -> Bool #

(/=) :: Timeout -> Timeout -> Bool #

Read Timeout Source # 
Instance details

Defined in Kafka.Types

Show Timeout Source # 
Instance details

Defined in Kafka.Types

Generic Timeout Source # 
Instance details

Defined in Kafka.Types

Associated Types

type Rep Timeout :: Type -> Type #

Methods

from :: Timeout -> Rep Timeout x #

to :: Rep Timeout x -> Timeout #

type Rep Timeout Source # 
Instance details

Defined in Kafka.Types

type Rep Timeout = D1 (MetaData "Timeout" "Kafka.Types" "hw-kafka-client-2.6.0-inplace" True) (C1 (MetaCons "Timeout" PrefixI True) (S1 (MetaSel (Just "unTimeout") NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 Int)))

newtype BrokerAddress Source #

Kafka broker address string (e.g. broker1:9092)
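A minimal sketch of wiring a broker address into consumer properties, assuming the brokersList and groupId combinators from this package and OverloadedStrings for the Text literals:

    -- Sketch: minimal consumer properties pointing at a local broker.
    props :: ConsumerProperties
    props = brokersList [BrokerAddress "localhost:9092"]
         <> groupId (ConsumerGroupId "example_group")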

Constructors

BrokerAddress

Fields

unBrokerAddress :: Text
Instances
Eq BrokerAddress Source # 
Instance details

Defined in Kafka.Types

Show BrokerAddress Source # 
Instance details

Defined in Kafka.Types

Generic BrokerAddress Source # 
Instance details

Defined in Kafka.Types

Associated Types

type Rep BrokerAddress :: Type -> Type #

type Rep BrokerAddress Source # 
Instance details

Defined in Kafka.Types

type Rep BrokerAddress = D1 (MetaData "BrokerAddress" "Kafka.Types" "hw-kafka-client-2.6.0-inplace" True) (C1 (MetaCons "BrokerAddress" PrefixI True) (S1 (MetaSel (Just "unBrokerAddress") NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 Text)))

newtype TopicName Source #

Topic name to be consumed

Wildcard (regex) topics are supported by the librdkafka assignor: any topic name in the topics list that is prefixed with ^ will be regex-matched to the full list of topics in the cluster and matching topics will be added to the subscription list.
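As a sketch, a subscription matching every topic whose name starts with "logs." might look like this (topics and offsetReset are assumed to be the Subscription combinators from this package; OverloadedStrings assumed):

    -- Sketch: the leading ^ makes librdkafka treat the name as a regex.
    sub :: Subscription
    sub = topics [TopicName "^logs\\..*"]
       <> offsetReset Earliest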

Constructors

TopicName

a simple topic name or a regex if started with ^

Fields

unTopicName :: Text

Instances
Eq TopicName Source # 
Instance details

Defined in Kafka.Types

Ord TopicName Source # 
Instance details

Defined in Kafka.Types

Read TopicName Source # 
Instance details

Defined in Kafka.Types

Show TopicName Source # 
Instance details

Defined in Kafka.Types

Generic TopicName Source # 
Instance details

Defined in Kafka.Types

Associated Types

type Rep TopicName :: Type -> Type #

type Rep TopicName Source # 
Instance details

Defined in Kafka.Types

type Rep TopicName = D1 (MetaData "TopicName" "Kafka.Types" "hw-kafka-client-2.6.0-inplace" True) (C1 (MetaCons "TopicName" PrefixI True) (S1 (MetaSel (Just "unTopicName") NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 Text)))

data TopicType Source #

Constructors

User

Normal topics that are created by the user.

System

Topics whose names start with "__" (e.g. __consumer_offsets, __confluent.support.metrics) are considered "system" topics

Instances
Eq TopicType Source # 
Instance details

Defined in Kafka.Types

Ord TopicType Source # 
Instance details

Defined in Kafka.Types

Read TopicType Source # 
Instance details

Defined in Kafka.Types

Show TopicType Source # 
Instance details

Defined in Kafka.Types

Generic TopicType Source # 
Instance details

Defined in Kafka.Types

Associated Types

type Rep TopicType :: Type -> Type #

type Rep TopicType Source # 
Instance details

Defined in Kafka.Types

type Rep TopicType = D1 (MetaData "TopicType" "Kafka.Types" "hw-kafka-client-2.6.0-inplace" False) (C1 (MetaCons "User" PrefixI False) (U1 :: Type -> Type) :+: C1 (MetaCons "System" PrefixI False) (U1 :: Type -> Type))

newtype BatchSize Source #

Constructors

BatchSize 

Fields

unBatchSize :: Int

Instances
Eq BatchSize Source # 
Instance details

Defined in Kafka.Types

Num BatchSize Source # 
Instance details

Defined in Kafka.Types

Ord BatchSize Source # 
Instance details

Defined in Kafka.Types

Read BatchSize Source # 
Instance details

Defined in Kafka.Types

Show BatchSize Source # 
Instance details

Defined in Kafka.Types

Generic BatchSize Source # 
Instance details

Defined in Kafka.Types

Associated Types

type Rep BatchSize :: Type -> Type #

type Rep BatchSize Source # 
Instance details

Defined in Kafka.Types

type Rep BatchSize = D1 (MetaData "BatchSize" "Kafka.Types" "hw-kafka-client-2.6.0-inplace" True) (C1 (MetaCons "BatchSize" PrefixI True) (S1 (MetaSel (Just "unBatchSize") NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 Int)))

newtype ClientId Source #

Constructors

ClientId 

Fields

unClientId :: Text

Instances
Eq ClientId Source # 
Instance details

Defined in Kafka.Types

Ord ClientId Source # 
Instance details

Defined in Kafka.Types

Show ClientId Source # 
Instance details

Defined in Kafka.Types

Generic ClientId Source # 
Instance details

Defined in Kafka.Types

Associated Types

type Rep ClientId :: Type -> Type #

Methods

from :: ClientId -> Rep ClientId x #

to :: Rep ClientId x -> ClientId #

type Rep ClientId Source # 
Instance details

Defined in Kafka.Types

type Rep ClientId = D1 (MetaData "ClientId" "Kafka.Types" "hw-kafka-client-2.6.0-inplace" True) (C1 (MetaCons "ClientId" PrefixI True) (S1 (MetaSel (Just "unClientId") NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 Text)))

newtype Millis Source #

Constructors

Millis 

Fields

unMillis :: Int64

Instances
Eq Millis Source # 
Instance details

Defined in Kafka.Types

Methods

(==) :: Millis -> Millis -> Bool #

(/=) :: Millis -> Millis -> Bool #

Num Millis Source # 
Instance details

Defined in Kafka.Types

Ord Millis Source # 
Instance details

Defined in Kafka.Types

Read Millis Source # 
Instance details

Defined in Kafka.Types

Show Millis Source # 
Instance details

Defined in Kafka.Types

Generic Millis Source # 
Instance details

Defined in Kafka.Types

Associated Types

type Rep Millis :: Type -> Type #

Methods

from :: Millis -> Rep Millis x #

to :: Rep Millis x -> Millis #

type Rep Millis Source # 
Instance details

Defined in Kafka.Types

type Rep Millis = D1 (MetaData "Millis" "Kafka.Types" "hw-kafka-client-2.6.0-inplace" True) (C1 (MetaCons "Millis" PrefixI True) (S1 (MetaSel (Just "unMillis") NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 Int64)))

newtype PartitionId Source #

Constructors

PartitionId 

Fields

unPartitionId :: Int

Instances
Enum PartitionId Source # 
Instance details

Defined in Kafka.Types

Eq PartitionId Source # 
Instance details

Defined in Kafka.Types

Ord PartitionId Source # 
Instance details

Defined in Kafka.Types

Read PartitionId Source # 
Instance details

Defined in Kafka.Types

Show PartitionId Source # 
Instance details

Defined in Kafka.Types

Generic PartitionId Source # 
Instance details

Defined in Kafka.Types

Associated Types

type Rep PartitionId :: Type -> Type #

type Rep PartitionId Source # 
Instance details

Defined in Kafka.Types

type Rep PartitionId = D1 (MetaData "PartitionId" "Kafka.Types" "hw-kafka-client-2.6.0-inplace" True) (C1 (MetaCons "PartitionId" PrefixI True) (S1 (MetaSel (Just "unPartitionId") NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 Int)))

newtype BrokerId Source #

Constructors

BrokerId 

Fields

unBrokerId :: Int

Instances
Eq BrokerId Source # 
Instance details

Defined in Kafka.Types

Ord BrokerId Source # 
Instance details

Defined in Kafka.Types

Read BrokerId Source # 
Instance details

Defined in Kafka.Types

Show BrokerId Source # 
Instance details

Defined in Kafka.Types

Generic BrokerId Source # 
Instance details

Defined in Kafka.Types

Associated Types

type Rep BrokerId :: Type -> Type #

Methods

from :: BrokerId -> Rep BrokerId x #

to :: Rep BrokerId x -> BrokerId #

type Rep BrokerId Source # 
Instance details

Defined in Kafka.Types

type Rep BrokerId = D1 (MetaData "BrokerId" "Kafka.Types" "hw-kafka-client-2.6.0-inplace" True) (C1 (MetaCons "BrokerId" PrefixI True) (S1 (MetaSel (Just "unBrokerId") NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 Int)))

data ConsumerRecord k v Source #

Represents a received message from Kafka (i.e. used in a consumer)

Constructors

ConsumerRecord 

Fields

Instances
Bitraversable ConsumerRecord Source # 
Instance details

Defined in Kafka.Consumer.Types

Methods

bitraverse :: Applicative f => (a -> f c) -> (b -> f d) -> ConsumerRecord a b -> f (ConsumerRecord c d) #

Bifoldable ConsumerRecord Source # 
Instance details

Defined in Kafka.Consumer.Types

Methods

bifold :: Monoid m => ConsumerRecord m m -> m #

bifoldMap :: Monoid m => (a -> m) -> (b -> m) -> ConsumerRecord a b -> m #

bifoldr :: (a -> c -> c) -> (b -> c -> c) -> c -> ConsumerRecord a b -> c #

bifoldl :: (c -> a -> c) -> (c -> b -> c) -> c -> ConsumerRecord a b -> c #

Bifunctor ConsumerRecord Source # 
Instance details

Defined in Kafka.Consumer.Types

Methods

bimap :: (a -> b) -> (c -> d) -> ConsumerRecord a c -> ConsumerRecord b d #

first :: (a -> b) -> ConsumerRecord a c -> ConsumerRecord b c #

second :: (b -> c) -> ConsumerRecord a b -> ConsumerRecord a c #

Functor (ConsumerRecord k) Source # 
Instance details

Defined in Kafka.Consumer.Types

Methods

fmap :: (a -> b) -> ConsumerRecord k a -> ConsumerRecord k b #

(<$) :: a -> ConsumerRecord k b -> ConsumerRecord k a #

Foldable (ConsumerRecord k) Source # 
Instance details

Defined in Kafka.Consumer.Types

Methods

fold :: Monoid m => ConsumerRecord k m -> m #

foldMap :: Monoid m => (a -> m) -> ConsumerRecord k a -> m #

foldr :: (a -> b -> b) -> b -> ConsumerRecord k a -> b #

foldr' :: (a -> b -> b) -> b -> ConsumerRecord k a -> b #

foldl :: (b -> a -> b) -> b -> ConsumerRecord k a -> b #

foldl' :: (b -> a -> b) -> b -> ConsumerRecord k a -> b #

foldr1 :: (a -> a -> a) -> ConsumerRecord k a -> a #

foldl1 :: (a -> a -> a) -> ConsumerRecord k a -> a #

toList :: ConsumerRecord k a -> [a] #

null :: ConsumerRecord k a -> Bool #

length :: ConsumerRecord k a -> Int #

elem :: Eq a => a -> ConsumerRecord k a -> Bool #

maximum :: Ord a => ConsumerRecord k a -> a #

minimum :: Ord a => ConsumerRecord k a -> a #

sum :: Num a => ConsumerRecord k a -> a #

product :: Num a => ConsumerRecord k a -> a #

Traversable (ConsumerRecord k) Source # 
Instance details

Defined in Kafka.Consumer.Types

Methods

traverse :: Applicative f => (a -> f b) -> ConsumerRecord k a -> f (ConsumerRecord k b) #

sequenceA :: Applicative f => ConsumerRecord k (f a) -> f (ConsumerRecord k a) #

mapM :: Monad m => (a -> m b) -> ConsumerRecord k a -> m (ConsumerRecord k b) #

sequence :: Monad m => ConsumerRecord k (m a) -> m (ConsumerRecord k a) #

(Eq k, Eq v) => Eq (ConsumerRecord k v) Source # 
Instance details

Defined in Kafka.Consumer.Types

(Read k, Read v) => Read (ConsumerRecord k v) Source # 
Instance details

Defined in Kafka.Consumer.Types

(Show k, Show v) => Show (ConsumerRecord k v) Source # 
Instance details

Defined in Kafka.Consumer.Types

Generic (ConsumerRecord k v) Source # 
Instance details

Defined in Kafka.Consumer.Types

Associated Types

type Rep (ConsumerRecord k v) :: Type -> Type #

Methods

from :: ConsumerRecord k v -> Rep (ConsumerRecord k v) x #

to :: Rep (ConsumerRecord k v) x -> ConsumerRecord k v #

type Rep (ConsumerRecord k v) Source # 
Instance details

Defined in Kafka.Consumer.Types

data TopicPartition Source #

Kafka topic partition structure

data OffsetStoreMethod Source #

Indicates the method of storing the offsets

Constructors

OffsetStoreBroker

Offsets are stored in the Kafka broker (preferred)

OffsetStoreFile FilePath OffsetStoreSync

Offsets are stored in a file (and synced to disk according to the sync policy)

data OffsetStoreSync Source #

Indicates how offsets are to be synced to disk

Constructors

OffsetSyncDisable

Do not sync offsets (in Kafka: -1)

OffsetSyncImmediate

Sync immediately after each offset commit (in Kafka: 0)

OffsetSyncInterval Int

Sync after specified interval in millis

Instances
Eq OffsetStoreSync Source # 
Instance details

Defined in Kafka.Consumer.Types

Show OffsetStoreSync Source # 
Instance details

Defined in Kafka.Consumer.Types

Generic OffsetStoreSync Source # 
Instance details

Defined in Kafka.Consumer.Types

Associated Types

type Rep OffsetStoreSync :: Type -> Type #

type Rep OffsetStoreSync Source # 
Instance details

Defined in Kafka.Consumer.Types

type Rep OffsetStoreSync = D1 (MetaData "OffsetStoreSync" "Kafka.Consumer.Types" "hw-kafka-client-2.6.0-inplace" False) (C1 (MetaCons "OffsetSyncDisable" PrefixI False) (U1 :: Type -> Type) :+: (C1 (MetaCons "OffsetSyncImmediate" PrefixI False) (U1 :: Type -> Type) :+: C1 (MetaCons "OffsetSyncInterval" PrefixI False) (S1 (MetaSel (Nothing :: Maybe Symbol) NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 Int))))

data OffsetCommit Source #

Offsets commit mode

Constructors

OffsetCommit

Forces the consumer to block until the broker offset commit completes

OffsetCommitAsync

Offsets will be committed in a non-blocking way

Instances
Eq OffsetCommit Source # 
Instance details

Defined in Kafka.Consumer.Types

Show OffsetCommit Source # 
Instance details

Defined in Kafka.Consumer.Types

Generic OffsetCommit Source # 
Instance details

Defined in Kafka.Consumer.Types

Associated Types

type Rep OffsetCommit :: Type -> Type #

type Rep OffsetCommit Source # 
Instance details

Defined in Kafka.Consumer.Types

type Rep OffsetCommit = D1 (MetaData "OffsetCommit" "Kafka.Consumer.Types" "hw-kafka-client-2.6.0-inplace" False) (C1 (MetaCons "OffsetCommit" PrefixI False) (U1 :: Type -> Type) :+: C1 (MetaCons "OffsetCommitAsync" PrefixI False) (U1 :: Type -> Type))

data Timestamp Source #

Instances
Eq Timestamp Source # 
Instance details

Defined in Kafka.Consumer.Types

Read Timestamp Source # 
Instance details

Defined in Kafka.Consumer.Types

Show Timestamp Source # 
Instance details

Defined in Kafka.Consumer.Types

Generic Timestamp Source # 
Instance details

Defined in Kafka.Consumer.Types

Associated Types

type Rep Timestamp :: Type -> Type #

type Rep Timestamp Source # 
Instance details

Defined in Kafka.Consumer.Types

type Rep Timestamp = D1 (MetaData "Timestamp" "Kafka.Consumer.Types" "hw-kafka-client-2.6.0-inplace" False) (C1 (MetaCons "CreateTime" PrefixI False) (S1 (MetaSel (Nothing :: Maybe Symbol) NoSourceUnpackedness SourceStrict DecidedStrict) (Rec0 Millis)) :+: (C1 (MetaCons "LogAppendTime" PrefixI False) (S1 (MetaSel (Nothing :: Maybe Symbol) NoSourceUnpackedness SourceStrict DecidedStrict) (Rec0 Millis)) :+: C1 (MetaCons "NoTimestamp" PrefixI False) (U1 :: Type -> Type)))

data PartitionOffset Source #

Instances
Eq PartitionOffset Source # 
Instance details

Defined in Kafka.Consumer.Types

Show PartitionOffset Source # 
Instance details

Defined in Kafka.Consumer.Types

Generic PartitionOffset Source # 
Instance details

Defined in Kafka.Consumer.Types

Associated Types

type Rep PartitionOffset :: Type -> Type #

type Rep PartitionOffset Source # 
Instance details

Defined in Kafka.Consumer.Types

type Rep PartitionOffset = D1 (MetaData "PartitionOffset" "Kafka.Consumer.Types" "hw-kafka-client-2.6.0-inplace" False) ((C1 (MetaCons "PartitionOffsetBeginning" PrefixI False) (U1 :: Type -> Type) :+: C1 (MetaCons "PartitionOffsetEnd" PrefixI False) (U1 :: Type -> Type)) :+: (C1 (MetaCons "PartitionOffset" PrefixI False) (S1 (MetaSel (Nothing :: Maybe Symbol) NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 Int64)) :+: (C1 (MetaCons "PartitionOffsetStored" PrefixI False) (U1 :: Type -> Type) :+: C1 (MetaCons "PartitionOffsetInvalid" PrefixI False) (U1 :: Type -> Type))))

data RebalanceEvent Source #

A set of events which happen during the rebalancing process

Constructors

RebalanceBeforeAssign [(TopicName, PartitionId)]

Happens before Kafka Client confirms new assignment

RebalanceAssign [(TopicName, PartitionId)]

Happens after the new assignment is confirmed

RebalanceBeforeRevoke [(TopicName, PartitionId)]

Happens before Kafka Client confirms partitions rejection

RebalanceRevoke [(TopicName, PartitionId)]

Happens after the rejection is confirmed

Instances
Eq RebalanceEvent Source # 
Instance details

Defined in Kafka.Consumer.Types

Show RebalanceEvent Source # 
Instance details

Defined in Kafka.Consumer.Types

Generic RebalanceEvent Source # 
Instance details

Defined in Kafka.Consumer.Types

Associated Types

type Rep RebalanceEvent :: Type -> Type #

type Rep RebalanceEvent Source # 
Instance details

Defined in Kafka.Consumer.Types

data OffsetReset Source #

Constructors

Earliest 
Latest 
Instances
Eq OffsetReset Source # 
Instance details

Defined in Kafka.Consumer.Types

Show OffsetReset Source # 
Instance details

Defined in Kafka.Consumer.Types

Generic OffsetReset Source # 
Instance details

Defined in Kafka.Consumer.Types

Associated Types

type Rep OffsetReset :: Type -> Type #

type Rep OffsetReset Source # 
Instance details

Defined in Kafka.Consumer.Types

type Rep OffsetReset = D1 (MetaData "OffsetReset" "Kafka.Consumer.Types" "hw-kafka-client-2.6.0-inplace" False) (C1 (MetaCons "Earliest" PrefixI False) (U1 :: Type -> Type) :+: C1 (MetaCons "Latest" PrefixI False) (U1 :: Type -> Type))

newtype Offset Source #

Constructors

Offset 

Fields

unOffset :: Int64

Instances
Eq Offset Source # 
Instance details

Defined in Kafka.Consumer.Types

Methods

(==) :: Offset -> Offset -> Bool #

(/=) :: Offset -> Offset -> Bool #

Ord Offset Source # 
Instance details

Defined in Kafka.Consumer.Types

Read Offset Source # 
Instance details

Defined in Kafka.Consumer.Types

Show Offset Source # 
Instance details

Defined in Kafka.Consumer.Types

Generic Offset Source # 
Instance details

Defined in Kafka.Consumer.Types

Associated Types

type Rep Offset :: Type -> Type #

Methods

from :: Offset -> Rep Offset x #

to :: Rep Offset x -> Offset #

type Rep Offset Source # 
Instance details

Defined in Kafka.Consumer.Types

type Rep Offset = D1 (MetaData "Offset" "Kafka.Consumer.Types" "hw-kafka-client-2.6.0-inplace" True) (C1 (MetaCons "Offset" PrefixI True) (S1 (MetaSel (Just "unOffset") NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 Int64)))

newtype ConsumerGroupId Source #

Constructors

ConsumerGroupId 

crMapKey :: (k -> k') -> ConsumerRecord k v -> ConsumerRecord k' v Source #

crMapValue :: (v -> v') -> ConsumerRecord k v -> ConsumerRecord k v' Source #

crMapKV :: (k -> k') -> (v -> v') -> ConsumerRecord k v -> ConsumerRecord k' v' Source #
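For instance, a sketch of decoding the raw key and value of a polled record from UTF-8 using crMapKV (decodeRecord is a hypothetical helper name):

    import           Data.ByteString    (ByteString)
    import           Data.Text          (Text)
    import qualified Data.Text.Encoding as T

    -- Sketch: key and value arrive as Maybe ByteString from pollMessage.
    -- Note: decodeUtf8 throws on invalid UTF-8.
    decodeRecord :: ConsumerRecord (Maybe ByteString) (Maybe ByteString)
                 -> ConsumerRecord (Maybe Text) (Maybe Text)
    decodeRecord = crMapKV (fmap T.decodeUtf8) (fmap T.decodeUtf8)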

sequenceFirst :: (Bitraversable t, Applicative f) => t (f k) v -> f (t k v) Source #

Deprecated: Not a concern of this library. Use 'bitraverse id pure'

traverseFirst :: (Bitraversable t, Applicative f) => (k -> f k') -> t k v -> f (t k' v) Source #

Deprecated: Not a concern of this library. Use 'bitraverse f pure'

traverseFirstM :: (Bitraversable t, Applicative f, Monad m) => (k -> m (f k')) -> t k v -> m (f (t k' v)) Source #

Deprecated: Not a concern of this library. Use 'bitraverse id pure $ bitraverse f pure r'

traverseM :: (Traversable t, Applicative f, Monad m) => (v -> m (f v')) -> t v -> m (f (t v')) Source #

Deprecated: Not a concern of this library. Use 'sequenceA $ traverse f r'

bitraverseM :: (Bitraversable t, Applicative f, Monad m) => (k -> m (f k')) -> (v -> m (f v')) -> t k v -> m (f (t k' v')) Source #

Deprecated: Not a concern of this library. Use 'bisequenceA $ bimapM f g r'

errorCallback :: HasKafkaConf k => (KafkaError -> String -> IO ()) -> k -> IO () Source #

logCallback :: HasKafkaConf k => (Int -> String -> String -> IO ()) -> k -> IO () Source #

statsCallback :: HasKafkaConf k => (String -> IO ()) -> k -> IO () Source #
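These callbacks are installed through setCallback (documented below). A sketch, assuming ConsumerProperties composes with <>:

    -- Sketch: log errors and librdkafka log lines to stdout.
    callbackProps :: ConsumerProperties
    callbackProps =
         setCallback (errorCallback (\err reason -> putStrLn ("Error: " <> show err <> " " <> reason)))
      <> setCallback (logCallback (\lvl fac msg -> putStrLn (show lvl <> " [" <> fac <> "] " <> msg)))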

rebalanceCallback :: (KafkaConsumer -> RebalanceEvent -> IO ()) -> KafkaConf -> IO () Source #

Sets a callback that is called when rebalance is needed.

Callback implementations are supposed to watch for KafkaResponseError RdKafkaRespErrAssignPartitions and for KafkaResponseError RdKafkaRespErrRevokePartitions. Other error codes are not expected and would indicate something really bad happening in the system, or bugs in librdkafka itself.

A callback is expected to call assign according to the error code it receives.

  • When RdKafkaRespErrAssignPartitions happens, assign should be called with all the partitions it was called with. It is OK to alter partition offsets before calling assign.
  • When RdKafkaRespErrRevokePartitions happens, assign should be called with an empty list of partitions.
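A sketch of a handler that merely logs rebalance events (printingRebalanceCallback is a hypothetical name; it is installed with setCallback, documented below):

    -- Sketch: observe assignments and revocations as they happen.
    printingRebalanceCallback :: KafkaConsumer -> RebalanceEvent -> IO ()
    printingRebalanceCallback _ e = case e of
      RebalanceBeforeAssign ps -> putStrLn ("About to assign: " <> show ps)
      RebalanceAssign       ps -> putStrLn ("Assigned: "        <> show ps)
      RebalanceBeforeRevoke ps -> putStrLn ("About to revoke: " <> show ps)
      RebalanceRevoke       ps -> putStrLn ("Revoked: "         <> show ps)

    -- Installed via consumer properties:
    -- setCallback (rebalanceCallback printingRebalanceCallback)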

offsetCommitCallback :: (KafkaConsumer -> KafkaError -> [TopicPartition] -> IO ()) -> KafkaConf -> IO () Source #

Sets a callback that is called when offsets have been committed.

The results of automatic or manual offset commits are scheduled for this callback and are served by pollMessage.

If no partitions had valid offsets to commit, this callback will be called with KafkaError == KafkaResponseError RdKafkaRespErrNoOffset, which is not to be considered an error.

noAutoCommit :: ConsumerProperties Source #

Disables auto commit for the consumer

noAutoOffsetStore :: ConsumerProperties Source #

Disables auto offset store for the consumer
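A sketch of a manual-commit configuration combining both combinators (props stands for the base properties sketched earlier); offsets then have to be committed explicitly, e.g. with commitAllOffsets below:

    -- Sketch: take full control of offset storing and committing.
    manualOffsetProps :: ConsumerProperties
    manualOffsetProps = props <> noAutoCommit <> noAutoOffsetStore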

setCallback :: (KafkaConf -> IO ()) -> ConsumerProperties Source #

logLevel :: KafkaLogLevel -> ConsumerProperties Source #

Sets the logging level. Usually used together with debugOptions to configure which logs are needed.

compression :: KafkaCompressionCodec -> ConsumerProperties Source #

Sets the compression codec for the consumer.

suppressDisconnectLogs :: ConsumerProperties Source #

Suppresses consumer disconnect logs.

It might be useful to enable this when interacting with brokers that have an aggressive connection.max.idle.ms value.

extraProps :: Map Text Text -> ConsumerProperties Source #

Any configuration options that are supported by librdkafka. The full list can be found in librdkafka's CONFIGURATION.md.

extraProp :: Text -> Text -> ConsumerProperties Source #

Any configuration options that are supported by librdkafka. The full list can be found in librdkafka's CONFIGURATION.md.
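A sketch passing raw librdkafka settings through (OverloadedStrings assumed; the keys shown are standard librdkafka configuration properties):

    import qualified Data.Map as M

    -- Sketch: tune fetching behaviour via raw librdkafka options.
    tuningProps :: ConsumerProperties
    tuningProps = extraProps (M.fromList [("max.poll.interval.ms", "600000")])
               <> extraProp "fetch.min.bytes" "1024"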

debugOptions :: [KafkaDebug] -> ConsumerProperties Source #

Sets debug features for the consumer. Usually used together with logLevel.
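A sketch of a verbose-logging configuration (KafkaLogDebug is assumed to be a KafkaLogLevel constructor from Kafka.Types):

    -- Sketch: everything librdkafka can say, at debug level.
    debugProps :: ConsumerProperties
    debugProps = logLevel KafkaLogDebug <> debugOptions [DebugAll]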

runConsumer Source #

Arguments

:: ConsumerProperties 
-> Subscription 
-> (KafkaConsumer -> IO (Either KafkaError a))

A callback function to poll and handle messages

-> IO (Either KafkaError a) 

Deprecated: Use newConsumer/closeConsumer instead

Runs a high-level Kafka consumer. The provided callback is expected to call pollMessage when convenient.
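A sketch of the recommended newConsumer/closeConsumer pattern instead (processMessages is a hypothetical message-handling loop; props and sub are as sketched earlier):

    import Control.Exception (bracket)

    runExample :: ConsumerProperties -> Subscription -> IO (Maybe KafkaError)
    runExample props sub = bracket mkConsumer clConsumer runHandler
      where
        -- newConsumer returns Either KafkaError KafkaConsumer
        mkConsumer = newConsumer props sub
        clConsumer (Left _)   = pure Nothing
        clConsumer (Right kc) = closeConsumer kc
        runHandler (Left err) = pure (Just err)
        runHandler (Right kc) = processMessages kc  -- hypothetical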

assignment :: MonadIO m => KafkaConsumer -> m (Either KafkaError (Map TopicName [PartitionId])) Source #

Returns the current consumer's assignment

subscription :: MonadIO m => KafkaConsumer -> m (Either KafkaError [(TopicName, SubscribedPartitions)]) Source #

Returns the current consumer's subscription

pausePartitions :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m KafkaError Source #

Pauses specified partitions on the current consumer.

resumePartitions :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m KafkaError Source #

Resumes specified partitions on the current consumer.

committed :: MonadIO m => KafkaConsumer -> Timeout -> [(TopicName, PartitionId)] -> m (Either KafkaError [TopicPartition]) Source #

Retrieve committed offsets for topics+partitions.

position :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m (Either KafkaError [TopicPartition]) Source #

Retrieve current positions (last consumed message offset+1) for the current running instance of the consumer. If the current consumer hasn't received any messages for a given partition, PartitionOffsetInvalid is returned.

pollMessage Source #

Arguments

:: MonadIO m 
=> KafkaConsumer 
-> Timeout

the timeout, in milliseconds

-> m (Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))

Left on error or timeout; Right on success
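A minimal polling-loop sketch built on pollMessage (pollLoop is a hypothetical name):

    import Control.Monad (forever)

    -- Sketch: poll with a 1-second timeout and print each result.
    pollLoop :: KafkaConsumer -> IO ()
    pollLoop kc = forever $ do
      msg <- pollMessage kc (Timeout 1000)
      print msg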

pollConsumerEvents :: KafkaConsumer -> Maybe Timeout -> IO () Source #

Polls the provided kafka consumer for events.

Events will cause application provided callbacks to be called.

The timeout argument specifies the maximum amount of time (in milliseconds) that the call will block waiting for events.

This function is called on each pollMessage and, if the runtime allows multithreading, it is called periodically in a separate thread to ensure the callbacks are handled ASAP.

There is no particular need to call this function manually, except in special cases in a single-threaded environment where polling for events on each pollMessage is not frequent enough.

pollMessageBatch :: MonadIO m => KafkaConsumer -> Timeout -> BatchSize -> m [Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))] Source #

Polls up to BatchSize messages. Unlike pollMessage, this function does not return the usual "timeout" errors. An empty batch is returned when there are no messages available.
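For example, a sketch that drains up to 100 messages per call (handleBatch is a hypothetical name):

    -- Sketch: an empty list simply means nothing arrived within the timeout.
    handleBatch :: KafkaConsumer -> IO ()
    handleBatch kc = do
      batch <- pollMessageBatch kc (Timeout 1000) (BatchSize 100)
      mapM_ print batch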

commitOffsetMessage :: MonadIO m => OffsetCommit -> KafkaConsumer -> ConsumerRecord k v -> m (Maybe KafkaError) Source #

Commit message's offset on broker for the message's partition.

commitAllOffsets :: MonadIO m => OffsetCommit -> KafkaConsumer -> m (Maybe KafkaError) Source #

Commit offsets for all currently assigned partitions.
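A sketch of a synchronous commit after processing (Nothing signals success; commitNow is a hypothetical name):

    -- Sketch: block until the broker acknowledges the commit.
    commitNow :: KafkaConsumer -> IO ()
    commitNow kc = do
      res <- commitAllOffsets OffsetCommit kc
      case res of
        Nothing  -> putStrLn "Committed"
        Just err -> print err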

commitPartitionsOffsets :: MonadIO m => OffsetCommit -> KafkaConsumer -> [TopicPartition] -> m (Maybe KafkaError) Source #

Commit offsets for the given partitions.

storeOffsets :: MonadIO m => KafkaConsumer -> [TopicPartition] -> m (Maybe KafkaError) Source #

Stores offsets locally

storeOffsetMessage :: MonadIO m => KafkaConsumer -> ConsumerRecord k v -> m (Maybe KafkaError) Source #

Stores message's offset locally for the message's partition.

closeConsumer :: MonadIO m => KafkaConsumer -> m (Maybe KafkaError) Source #

Closes the consumer.

data RdKafkaRespErrT Source #

Constructors

RdKafkaRespErrBegin 
RdKafkaRespErrBadMsg 
RdKafkaRespErrBadCompression 
RdKafkaRespErrDestroy 
RdKafkaRespErrFail 
RdKafkaRespErrTransport 
RdKafkaRespErrCritSysResource 
RdKafkaRespErrResolve 
RdKafkaRespErrMsgTimedOut 
RdKafkaRespErrPartitionEof 
RdKafkaRespErrUnknownPartition 
RdKafkaRespErrFs 
RdKafkaRespErrUnknownTopic 
RdKafkaRespErrAllBrokersDown 
RdKafkaRespErrInvalidArg 
RdKafkaRespErrTimedOut 
RdKafkaRespErrQueueFull 
RdKafkaRespErrIsrInsuff 
RdKafkaRespErrNodeUpdate 
RdKafkaRespErrSsl 
RdKafkaRespErrWaitCoord 
RdKafkaRespErrUnknownGroup 
RdKafkaRespErrInProgress 
RdKafkaRespErrPrevInProgress 
RdKafkaRespErrExistingSubscription 
RdKafkaRespErrAssignPartitions 
RdKafkaRespErrRevokePartitions 
RdKafkaRespErrConflict 
RdKafkaRespErrState 
RdKafkaRespErrUnknownProtocol 
RdKafkaRespErrNotImplemented 
RdKafkaRespErrAuthentication 
RdKafkaRespErrNoOffset 
RdKafkaRespErrOutdated 
RdKafkaRespErrTimedOutQueue 
RdKafkaRespErrUnsupportedFeature 
RdKafkaRespErrWaitCache 
RdKafkaRespErrIntr 
RdKafkaRespErrKeySerialization 
RdKafkaRespErrValueSerialization 
RdKafkaRespErrKeyDeserialization 
RdKafkaRespErrValueDeserialization 
RdKafkaRespErrPartial 
RdKafkaRespErrReadOnly 
RdKafkaRespErrNoent 
RdKafkaRespErrUnderflow 
RdKafkaRespErrInvalidType 
RdKafkaRespErrRetry 
RdKafkaRespErrPurgeQueue 
RdKafkaRespErrPurgeInflight 
RdKafkaRespErrFatal 
RdKafkaRespErrInconsistent 
RdKafkaRespErrGaplessGuarantee 
RdKafkaRespErrMaxPollExceeded 
RdKafkaRespErrEnd 
RdKafkaRespErrUnknown 
RdKafkaRespErrNoError 
RdKafkaRespErrOffsetOutOfRange 
RdKafkaRespErrInvalidMsg 
RdKafkaRespErrUnknownTopicOrPart 
RdKafkaRespErrInvalidMsgSize 
RdKafkaRespErrLeaderNotAvailable 
RdKafkaRespErrNotLeaderForPartition 
RdKafkaRespErrRequestTimedOut 
RdKafkaRespErrBrokerNotAvailable 
RdKafkaRespErrReplicaNotAvailable 
RdKafkaRespErrMsgSizeTooLarge 
RdKafkaRespErrStaleCtrlEpoch 
RdKafkaRespErrOffsetMetadataTooLarge 
RdKafkaRespErrNetworkException 
RdKafkaRespErrGroupLoadInProgress 
RdKafkaRespErrGroupCoordinatorNotAvailable 
RdKafkaRespErrNotCoordinatorForGroup 
RdKafkaRespErrTopicException 
RdKafkaRespErrRecordListTooLarge 
RdKafkaRespErrNotEnoughReplicas 
RdKafkaRespErrNotEnoughReplicasAfterAppend 
RdKafkaRespErrInvalidRequiredAcks 
RdKafkaRespErrIllegalGeneration 
RdKafkaRespErrInconsistentGroupProtocol 
RdKafkaRespErrInvalidGroupId 
RdKafkaRespErrUnknownMemberId 
RdKafkaRespErrInvalidSessionTimeout 
RdKafkaRespErrRebalanceInProgress 
RdKafkaRespErrInvalidCommitOffsetSize 
RdKafkaRespErrTopicAuthorizationFailed 
RdKafkaRespErrGroupAuthorizationFailed 
RdKafkaRespErrClusterAuthorizationFailed 
RdKafkaRespErrInvalidTimestamp 
RdKafkaRespErrUnsupportedSaslMechanism 
RdKafkaRespErrIllegalSaslState 
RdKafkaRespErrUnsupportedVersion 
RdKafkaRespErrTopicAlreadyExists 
RdKafkaRespErrInvalidPartitions 
RdKafkaRespErrInvalidReplicationFactor 
RdKafkaRespErrInvalidReplicaAssignment 
RdKafkaRespErrInvalidConfig 
RdKafkaRespErrNotController 
RdKafkaRespErrInvalidRequest 
RdKafkaRespErrUnsupportedForMessageFormat 
RdKafkaRespErrPolicyViolation 
RdKafkaRespErrOutOfOrderSequenceNumber 
RdKafkaRespErrDuplicateSequenceNumber 
RdKafkaRespErrInvalidProducerEpoch 
RdKafkaRespErrInvalidTxnState 
RdKafkaRespErrInvalidProducerIdMapping 
RdKafkaRespErrInvalidTransactionTimeout 
RdKafkaRespErrConcurrentTransactions 
RdKafkaRespErrTransactionCoordinatorFenced 
RdKafkaRespErrTransactionalIdAuthorizationFailed 
RdKafkaRespErrSecurityDisabled 
RdKafkaRespErrOperationNotAttempted 
RdKafkaRespErrKafkaStorageError 
RdKafkaRespErrLogDirNotFound 
RdKafkaRespErrSaslAuthenticationFailed 
RdKafkaRespErrUnknownProducerId 
RdKafkaRespErrReassignmentInProgress 
RdKafkaRespErrDelegationTokenAuthDisabled 
RdKafkaRespErrDelegationTokenNotFound 
RdKafkaRespErrDelegationTokenOwnerMismatch 
RdKafkaRespErrDelegationTokenRequestNotAllowed 
RdKafkaRespErrDelegationTokenAuthorizationFailed 
RdKafkaRespErrDelegationTokenExpired 
RdKafkaRespErrInvalidPrincipalType 
RdKafkaRespErrNonEmptyGroup 
RdKafkaRespErrGroupIdNotFound 
RdKafkaRespErrFetchSessionIdNotFound 
RdKafkaRespErrInvalidFetchSessionEpoch 
RdKafkaRespErrListenerNotFound 
RdKafkaRespErrTopicDeletionDisabled 
RdKafkaRespErrUnsupportedCompressionType 
RdKafkaRespErrEndAll