Safe Haskell | None |
---|---|
Language | Haskell2010 |
Module to consume messages from Kafka topics.
Here's an example of code to consume messages from a topic:
import Control.Exception (bracket) import Data.Monoid ((<>)) import Kafka import Kafka.Consumer -- Global consumer properties consumerProps ::ConsumerProperties
consumerProps =brokersList
[BrokerAddress
"localhost:9092"] <>groupId
(ConsumerGroupId
"consumer_example_group") <>noAutoCommit
<>logLevel
KafkaLogInfo
-- Subscription to topics consumerSub ::Subscription
consumerSub =topics
[TopicName
"kafka-client-example-topic"] <>offsetReset
Earliest
-- Running an example runConsumerExample :: IO () runConsumerExample = do res <- bracket mkConsumer clConsumer runHandler print res where mkConsumer =newConsumer
consumerProps consumerSub clConsumer (Left err) = return (Left err) clConsumer (Right kc) = (maybe (Right ()) Left) <$>closeConsumer
kc runHandler (Left err) = return (Left err) runHandler (Right kc) = processMessages kc -- Example polling 10 times before stopping processMessages ::KafkaConsumer
-> IO (EitherKafkaError
()) processMessages kafka = do mapM_ (_ -> do msg1 <-pollMessage
kafka (Timeout
1000) putStrLn $ "Message: " <> show msg1 err <-commitAllOffsets
OffsetCommit
kafka putStrLn $ "Offsets: " <> maybe "Committed." show err ) [0 .. 10] return $ Right ()
Synopsis
- data KafkaConsumer
- data KafkaCompressionCodec
- = NoCompression
- | Gzip
- | Snappy
- | Lz4
- data KafkaDebug
- data KafkaError
- data KafkaLogLevel
- newtype Timeout = Timeout {}
- newtype BrokerAddress = BrokerAddress {}
- newtype TopicName = TopicName {
- unTopicName :: Text
- data TopicType
- newtype BatchSize = BatchSize {
- unBatchSize :: Int
- newtype ClientId = ClientId {
- unClientId :: Text
- newtype Millis = Millis {}
- newtype PartitionId = PartitionId {
- unPartitionId :: Int
- newtype BrokerId = BrokerId {
- unBrokerId :: Int
- topicType :: TopicName -> TopicType
- kafkaDebugToText :: KafkaDebug -> Text
- kafkaCompressionCodecToText :: KafkaCompressionCodec -> Text
- data ConsumerRecord k v = ConsumerRecord {
- crTopic :: !TopicName
- crPartition :: !PartitionId
- crOffset :: !Offset
- crTimestamp :: !Timestamp
- crKey :: !k
- crValue :: !v
- data TopicPartition = TopicPartition {}
- data OffsetStoreMethod
- data OffsetStoreSync
- data OffsetCommit
- data Timestamp
- data SubscribedPartitions
- data PartitionOffset
- data RebalanceEvent
- data OffsetReset
- newtype Offset = Offset {}
- newtype ConsumerGroupId = ConsumerGroupId {}
- kcKafkaPtr :: KafkaConsumer -> Kafka
- kcKafkaConf :: KafkaConsumer -> KafkaConf
- crMapKey :: (k -> k') -> ConsumerRecord k v -> ConsumerRecord k' v
- crMapValue :: (v -> v') -> ConsumerRecord k v -> ConsumerRecord k v'
- crMapKV :: (k -> k') -> (v -> v') -> ConsumerRecord k v -> ConsumerRecord k' v'
- sequenceFirst :: (Bitraversable t, Applicative f) => t (f k) v -> f (t k v)
- traverseFirst :: (Bitraversable t, Applicative f) => (k -> f k') -> t k v -> f (t k' v)
- traverseFirstM :: (Bitraversable t, Applicative f, Monad m) => (k -> m (f k')) -> t k v -> m (f (t k' v))
- traverseM :: (Traversable t, Applicative f, Monad m) => (v -> m (f v')) -> t v -> m (f (t v'))
- bitraverseM :: (Bitraversable t, Applicative f, Monad m) => (k -> m (f k')) -> (v -> m (f v')) -> t k v -> m (f (t k' v'))
- data Subscription = Subscription (Set TopicName) (Map Text Text)
- topics :: [TopicName] -> Subscription
- offsetReset :: OffsetReset -> Subscription
- extraSubscriptionProps :: Map Text Text -> Subscription
- errorCallback :: HasKafkaConf k => (KafkaError -> String -> IO ()) -> k -> IO ()
- logCallback :: HasKafkaConf k => (KafkaLogLevel -> String -> String -> IO ()) -> k -> IO ()
- statsCallback :: HasKafkaConf k => (String -> IO ()) -> k -> IO ()
- rebalanceCallback :: (KafkaConsumer -> RebalanceEvent -> IO ()) -> KafkaConf -> IO ()
- offsetCommitCallback :: (KafkaConsumer -> KafkaError -> [TopicPartition] -> IO ()) -> KafkaConf -> IO ()
- data ConsumerProperties = ConsumerProperties {
- cpProps :: Map Text Text
- cpLogLevel :: Maybe KafkaLogLevel
- cpCallbacks :: [KafkaConf -> IO ()]
- cpCallbackPollMode :: CallbackPollMode
- data CallbackPollMode
- brokersList :: [BrokerAddress] -> ConsumerProperties
- autoCommit :: Millis -> ConsumerProperties
- noAutoCommit :: ConsumerProperties
- noAutoOffsetStore :: ConsumerProperties
- groupId :: ConsumerGroupId -> ConsumerProperties
- clientId :: ClientId -> ConsumerProperties
- setCallback :: (KafkaConf -> IO ()) -> ConsumerProperties
- logLevel :: KafkaLogLevel -> ConsumerProperties
- compression :: KafkaCompressionCodec -> ConsumerProperties
- suppressDisconnectLogs :: ConsumerProperties
- extraProps :: Map Text Text -> ConsumerProperties
- extraProp :: Text -> Text -> ConsumerProperties
- debugOptions :: [KafkaDebug] -> ConsumerProperties
- queuedMaxMessagesKBytes :: Int -> ConsumerProperties
- callbackPollMode :: CallbackPollMode -> ConsumerProperties
- runConsumer :: ConsumerProperties -> Subscription -> (KafkaConsumer -> IO (Either KafkaError a)) -> IO (Either KafkaError a)
- newConsumer :: MonadIO m => ConsumerProperties -> Subscription -> m (Either KafkaError KafkaConsumer)
- assign :: MonadIO m => KafkaConsumer -> [TopicPartition] -> m (Maybe KafkaError)
- assignment :: MonadIO m => KafkaConsumer -> m (Either KafkaError (Map TopicName [PartitionId]))
- subscription :: MonadIO m => KafkaConsumer -> m (Either KafkaError [(TopicName, SubscribedPartitions)])
- pausePartitions :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m KafkaError
- resumePartitions :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m KafkaError
- committed :: MonadIO m => KafkaConsumer -> Timeout -> [(TopicName, PartitionId)] -> m (Either KafkaError [TopicPartition])
- position :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m (Either KafkaError [TopicPartition])
- seek :: MonadIO m => KafkaConsumer -> Timeout -> [TopicPartition] -> m (Maybe KafkaError)
- pollMessage :: MonadIO m => KafkaConsumer -> Timeout -> m (Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
- pollConsumerEvents :: KafkaConsumer -> Maybe Timeout -> IO ()
- pollMessageBatch :: MonadIO m => KafkaConsumer -> Timeout -> BatchSize -> m [Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))]
- commitOffsetMessage :: MonadIO m => OffsetCommit -> KafkaConsumer -> ConsumerRecord k v -> m (Maybe KafkaError)
- commitAllOffsets :: MonadIO m => OffsetCommit -> KafkaConsumer -> m (Maybe KafkaError)
- commitPartitionsOffsets :: MonadIO m => OffsetCommit -> KafkaConsumer -> [TopicPartition] -> m (Maybe KafkaError)
- storeOffsets :: MonadIO m => KafkaConsumer -> [TopicPartition] -> m (Maybe KafkaError)
- storeOffsetMessage :: MonadIO m => KafkaConsumer -> ConsumerRecord k v -> m (Maybe KafkaError)
- closeConsumer :: MonadIO m => KafkaConsumer -> m (Maybe KafkaError)
- data RdKafkaRespErrT
- = RdKafkaRespErrBegin
- | RdKafkaRespErrBadMsg
- | RdKafkaRespErrBadCompression
- | RdKafkaRespErrDestroy
- | RdKafkaRespErrFail
- | RdKafkaRespErrTransport
- | RdKafkaRespErrCritSysResource
- | RdKafkaRespErrResolve
- | RdKafkaRespErrMsgTimedOut
- | RdKafkaRespErrPartitionEof
- | RdKafkaRespErrUnknownPartition
- | RdKafkaRespErrFs
- | RdKafkaRespErrUnknownTopic
- | RdKafkaRespErrAllBrokersDown
- | RdKafkaRespErrInvalidArg
- | RdKafkaRespErrTimedOut
- | RdKafkaRespErrQueueFull
- | RdKafkaRespErrIsrInsuff
- | RdKafkaRespErrNodeUpdate
- | RdKafkaRespErrSsl
- | RdKafkaRespErrWaitCoord
- | RdKafkaRespErrUnknownGroup
- | RdKafkaRespErrInProgress
- | RdKafkaRespErrPrevInProgress
- | RdKafkaRespErrExistingSubscription
- | RdKafkaRespErrAssignPartitions
- | RdKafkaRespErrRevokePartitions
- | RdKafkaRespErrConflict
- | RdKafkaRespErrState
- | RdKafkaRespErrUnknownProtocol
- | RdKafkaRespErrNotImplemented
- | RdKafkaRespErrAuthentication
- | RdKafkaRespErrNoOffset
- | RdKafkaRespErrOutdated
- | RdKafkaRespErrTimedOutQueue
- | RdKafkaRespErrUnsupportedFeature
- | RdKafkaRespErrWaitCache
- | RdKafkaRespErrIntr
- | RdKafkaRespErrKeySerialization
- | RdKafkaRespErrValueSerialization
- | RdKafkaRespErrKeyDeserialization
- | RdKafkaRespErrValueDeserialization
- | RdKafkaRespErrPartial
- | RdKafkaRespErrReadOnly
- | RdKafkaRespErrNoent
- | RdKafkaRespErrUnderflow
- | RdKafkaRespErrInvalidType
- | RdKafkaRespErrRetry
- | RdKafkaRespErrPurgeQueue
- | RdKafkaRespErrPurgeInflight
- | RdKafkaRespErrFatal
- | RdKafkaRespErrInconsistent
- | RdKafkaRespErrGaplessGuarantee
- | RdKafkaRespErrMaxPollExceeded
- | RdKafkaRespErrUnknownBroker
- | RdKafkaRespErrEnd
- | RdKafkaRespErrUnknown
- | RdKafkaRespErrNoError
- | RdKafkaRespErrOffsetOutOfRange
- | RdKafkaRespErrInvalidMsg
- | RdKafkaRespErrUnknownTopicOrPart
- | RdKafkaRespErrInvalidMsgSize
- | RdKafkaRespErrLeaderNotAvailable
- | RdKafkaRespErrNotLeaderForPartition
- | RdKafkaRespErrRequestTimedOut
- | RdKafkaRespErrBrokerNotAvailable
- | RdKafkaRespErrReplicaNotAvailable
- | RdKafkaRespErrMsgSizeTooLarge
- | RdKafkaRespErrStaleCtrlEpoch
- | RdKafkaRespErrOffsetMetadataTooLarge
- | RdKafkaRespErrNetworkException
- | RdKafkaRespErrCoordinatorLoadInProgress
- | RdKafkaRespErrCoordinatorNotAvailable
- | RdKafkaRespErrNotCoordinator
- | RdKafkaRespErrTopicException
- | RdKafkaRespErrRecordListTooLarge
- | RdKafkaRespErrNotEnoughReplicas
- | RdKafkaRespErrNotEnoughReplicasAfterAppend
- | RdKafkaRespErrInvalidRequiredAcks
- | RdKafkaRespErrIllegalGeneration
- | RdKafkaRespErrInconsistentGroupProtocol
- | RdKafkaRespErrInvalidGroupId
- | RdKafkaRespErrUnknownMemberId
- | RdKafkaRespErrInvalidSessionTimeout
- | RdKafkaRespErrRebalanceInProgress
- | RdKafkaRespErrInvalidCommitOffsetSize
- | RdKafkaRespErrTopicAuthorizationFailed
- | RdKafkaRespErrGroupAuthorizationFailed
- | RdKafkaRespErrClusterAuthorizationFailed
- | RdKafkaRespErrInvalidTimestamp
- | RdKafkaRespErrUnsupportedSaslMechanism
- | RdKafkaRespErrIllegalSaslState
- | RdKafkaRespErrUnsupportedVersion
- | RdKafkaRespErrTopicAlreadyExists
- | RdKafkaRespErrInvalidPartitions
- | RdKafkaRespErrInvalidReplicationFactor
- | RdKafkaRespErrInvalidReplicaAssignment
- | RdKafkaRespErrInvalidConfig
- | RdKafkaRespErrNotController
- | RdKafkaRespErrInvalidRequest
- | RdKafkaRespErrUnsupportedForMessageFormat
- | RdKafkaRespErrPolicyViolation
- | RdKafkaRespErrOutOfOrderSequenceNumber
- | RdKafkaRespErrDuplicateSequenceNumber
- | RdKafkaRespErrInvalidProducerEpoch
- | RdKafkaRespErrInvalidTxnState
- | RdKafkaRespErrInvalidProducerIdMapping
- | RdKafkaRespErrInvalidTransactionTimeout
- | RdKafkaRespErrConcurrentTransactions
- | RdKafkaRespErrTransactionCoordinatorFenced
- | RdKafkaRespErrTransactionalIdAuthorizationFailed
- | RdKafkaRespErrSecurityDisabled
- | RdKafkaRespErrOperationNotAttempted
- | RdKafkaRespErrKafkaStorageError
- | RdKafkaRespErrLogDirNotFound
- | RdKafkaRespErrSaslAuthenticationFailed
- | RdKafkaRespErrUnknownProducerId
- | RdKafkaRespErrReassignmentInProgress
- | RdKafkaRespErrDelegationTokenAuthDisabled
- | RdKafkaRespErrDelegationTokenNotFound
- | RdKafkaRespErrDelegationTokenOwnerMismatch
- | RdKafkaRespErrDelegationTokenRequestNotAllowed
- | RdKafkaRespErrDelegationTokenAuthorizationFailed
- | RdKafkaRespErrDelegationTokenExpired
- | RdKafkaRespErrInvalidPrincipalType
- | RdKafkaRespErrNonEmptyGroup
- | RdKafkaRespErrGroupIdNotFound
- | RdKafkaRespErrFetchSessionIdNotFound
- | RdKafkaRespErrInvalidFetchSessionEpoch
- | RdKafkaRespErrListenerNotFound
- | RdKafkaRespErrTopicDeletionDisabled
- | RdKafkaRespErrFencedLeaderEpoch
- | RdKafkaRespErrUnknownLeaderEpoch
- | RdKafkaRespErrUnsupportedCompressionType
- | RdKafkaRespErrStaleBrokerEpoch
- | RdKafkaRespErrOffsetNotAvailable
- | RdKafkaRespErrMemberIdRequired
- | RdKafkaRespErrPreferredLeaderNotAvailable
- | RdKafkaRespErrGroupMaxSizeReached
- | RdKafkaRespErrEndAll
Documentation
data KafkaConsumer Source #
The main type for Kafka consumption, used e.g. to poll and commit messages.
Its constructor is intentionally not exposed, instead, one should used newConsumer
to acquire such a value.
data KafkaCompressionCodec Source #
Compression codec used by a topic
Instances
Eq KafkaCompressionCodec Source # | |
Defined in Kafka.Types (==) :: KafkaCompressionCodec -> KafkaCompressionCodec -> Bool # (/=) :: KafkaCompressionCodec -> KafkaCompressionCodec -> Bool # | |
Show KafkaCompressionCodec Source # | |
Defined in Kafka.Types showsPrec :: Int -> KafkaCompressionCodec -> ShowS # show :: KafkaCompressionCodec -> String # showList :: [KafkaCompressionCodec] -> ShowS # | |
Generic KafkaCompressionCodec Source # | |
Defined in Kafka.Types type Rep KafkaCompressionCodec :: Type -> Type # | |
type Rep KafkaCompressionCodec Source # | |
Defined in Kafka.Types type Rep KafkaCompressionCodec = D1 (MetaData "KafkaCompressionCodec" "Kafka.Types" "hw-kafka-client-3.1.1-inplace" False) ((C1 (MetaCons "NoCompression" PrefixI False) (U1 :: Type -> Type) :+: C1 (MetaCons "Gzip" PrefixI False) (U1 :: Type -> Type)) :+: (C1 (MetaCons "Snappy" PrefixI False) (U1 :: Type -> Type) :+: C1 (MetaCons "Lz4" PrefixI False) (U1 :: Type -> Type))) |
data KafkaDebug Source #
Available librdkafka debug contexts
DebugGeneric | |
DebugBroker | |
DebugTopic | |
DebugMetadata | |
DebugQueue | |
DebugMsg | |
DebugProtocol | |
DebugCgrp | |
DebugSecurity | |
DebugFetch | |
DebugFeature | |
DebugAll |
Instances
data KafkaError Source #
All possible Kafka errors
KafkaError Text | |
KafkaInvalidReturnValue | |
KafkaBadSpecification Text | |
KafkaResponseError RdKafkaRespErrT | |
KafkaInvalidConfigurationValue Text | |
KafkaUnknownConfigurationKey Text | |
KafkaBadConfiguration |
Instances
data KafkaLogLevel Source #
Log levels for librdkafka.
KafkaLogEmerg | |
KafkaLogAlert | |
KafkaLogCrit | |
KafkaLogErr | |
KafkaLogWarning | |
KafkaLogNotice | |
KafkaLogInfo | |
KafkaLogDebug |
Instances
Enum KafkaLogLevel Source # | |
Defined in Kafka.Types succ :: KafkaLogLevel -> KafkaLogLevel # pred :: KafkaLogLevel -> KafkaLogLevel # toEnum :: Int -> KafkaLogLevel # fromEnum :: KafkaLogLevel -> Int # enumFrom :: KafkaLogLevel -> [KafkaLogLevel] # enumFromThen :: KafkaLogLevel -> KafkaLogLevel -> [KafkaLogLevel] # enumFromTo :: KafkaLogLevel -> KafkaLogLevel -> [KafkaLogLevel] # enumFromThenTo :: KafkaLogLevel -> KafkaLogLevel -> KafkaLogLevel -> [KafkaLogLevel] # | |
Eq KafkaLogLevel Source # | |
Defined in Kafka.Types (==) :: KafkaLogLevel -> KafkaLogLevel -> Bool # (/=) :: KafkaLogLevel -> KafkaLogLevel -> Bool # | |
Show KafkaLogLevel Source # | |
Defined in Kafka.Types showsPrec :: Int -> KafkaLogLevel -> ShowS # show :: KafkaLogLevel -> String # showList :: [KafkaLogLevel] -> ShowS # |
Timeout in milliseconds
newtype BrokerAddress Source #
Kafka broker address string (e.g. broker1:9092
)
Instances
Eq BrokerAddress Source # | |
Defined in Kafka.Types (==) :: BrokerAddress -> BrokerAddress -> Bool # (/=) :: BrokerAddress -> BrokerAddress -> Bool # | |
Show BrokerAddress Source # | |
Defined in Kafka.Types showsPrec :: Int -> BrokerAddress -> ShowS # show :: BrokerAddress -> String # showList :: [BrokerAddress] -> ShowS # | |
Generic BrokerAddress Source # | |
Defined in Kafka.Types type Rep BrokerAddress :: Type -> Type # from :: BrokerAddress -> Rep BrokerAddress x # to :: Rep BrokerAddress x -> BrokerAddress # | |
type Rep BrokerAddress Source # | |
Defined in Kafka.Types type Rep BrokerAddress = D1 (MetaData "BrokerAddress" "Kafka.Types" "hw-kafka-client-3.1.1-inplace" True) (C1 (MetaCons "BrokerAddress" PrefixI True) (S1 (MetaSel (Just "unBrokerAddress") NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 Text))) |
Topic name to be consumed
Wildcard (regex) topics are supported by the librdkafka assignor:
any topic name in the topics list that is prefixed with ^
will
be regex-matched to the full list of topics in the cluster and matching
topics will be added to the subscription list.
TopicName | a simple topic name or a regex if started with |
|
Whether the topic is created by a user or by the system
User | Normal topics that are created by user. |
System | Topics starting with a double underscore "__" ( |
Batch size used for polling
Client ID used by Kafka to better track requests
A number of milliseconds, used to represent durations and timestamps
newtype PartitionId Source #
Topic partition ID
Instances
Kafka broker ID
topicType :: TopicName -> TopicType Source #
Deduce the type of a topic from its name, by checking if it starts with a double underscore "__"
kafkaDebugToText :: KafkaDebug -> Text Source #
Convert a KafkaDebug
into its librdkafka string equivalent.
This is used internally by the library but may be useful to some developers.
kafkaCompressionCodecToText :: KafkaCompressionCodec -> Text Source #
Convert a KafkaCompressionCodec
into its librdkafka string equivalent.
This is used internally by the library but may be useful to some developers.
data ConsumerRecord k v Source #
Represents a received message from Kafka (i.e. used in a consumer)
ConsumerRecord | |
|
Instances
data TopicPartition Source #
Kafka topic partition structure
Instances
data OffsetStoreMethod Source #
Indicates the method of storing the offsets
OffsetStoreBroker | Offsets are stored in Kafka broker (preferred) |
OffsetStoreFile FilePath OffsetStoreSync | Offsets are stored in a file (and synced to disk according to the sync policy) |
Instances
data OffsetStoreSync Source #
Indicates how offsets are to be synced to disk
OffsetSyncDisable | Do not sync offsets (in Kafka: -1) |
OffsetSyncImmediate | Sync immediately after each offset commit (in Kafka: 0) |
OffsetSyncInterval Int | Sync after specified interval in millis |
Instances
data OffsetCommit Source #
Offsets commit mode
OffsetCommit | Forces consumer to block until the broker offsets commit is done |
OffsetCommitAsync | Offsets will be committed in a non-blocking way |
Instances
Eq OffsetCommit Source # | |
Defined in Kafka.Consumer.Types (==) :: OffsetCommit -> OffsetCommit -> Bool # (/=) :: OffsetCommit -> OffsetCommit -> Bool # | |
Show OffsetCommit Source # | |
Defined in Kafka.Consumer.Types showsPrec :: Int -> OffsetCommit -> ShowS # show :: OffsetCommit -> String # showList :: [OffsetCommit] -> ShowS # | |
Generic OffsetCommit Source # | |
Defined in Kafka.Consumer.Types type Rep OffsetCommit :: Type -> Type # from :: OffsetCommit -> Rep OffsetCommit x # to :: Rep OffsetCommit x -> OffsetCommit # | |
type Rep OffsetCommit Source # | |
Consumer record timestamp
Instances
Eq Timestamp Source # | |
Read Timestamp Source # | |
Show Timestamp Source # | |
Generic Timestamp Source # | |
type Rep Timestamp Source # | |
Defined in Kafka.Consumer.Types type Rep Timestamp = D1 (MetaData "Timestamp" "Kafka.Consumer.Types" "hw-kafka-client-3.1.1-inplace" False) (C1 (MetaCons "CreateTime" PrefixI False) (S1 (MetaSel (Nothing :: Maybe Symbol) NoSourceUnpackedness SourceStrict DecidedStrict) (Rec0 Millis)) :+: (C1 (MetaCons "LogAppendTime" PrefixI False) (S1 (MetaSel (Nothing :: Maybe Symbol) NoSourceUnpackedness SourceStrict DecidedStrict) (Rec0 Millis)) :+: C1 (MetaCons "NoTimestamp" PrefixI False) (U1 :: Type -> Type))) |
data SubscribedPartitions Source #
Partitions subscribed by a consumer
SubscribedPartitions [PartitionId] | Subscribe only to those partitions |
SubscribedPartitionsAll | Subscribe to all partitions |
Instances
data PartitionOffset Source #
The partition offset
PartitionOffsetBeginning | |
PartitionOffsetEnd | |
PartitionOffset Int64 | |
PartitionOffsetStored | |
PartitionOffsetInvalid |
Instances
data RebalanceEvent Source #
A set of events which happen during the rebalancing process
RebalanceBeforeAssign [(TopicName, PartitionId)] | Happens before Kafka Client confirms new assignment |
RebalanceAssign [(TopicName, PartitionId)] | Happens after the new assignment is confirmed |
RebalanceBeforeRevoke [(TopicName, PartitionId)] | Happens before Kafka Client confirms partitions rejection |
RebalanceRevoke [(TopicName, PartitionId)] | Happens after the rejection is confirmed |
Instances
data OffsetReset Source #
Where to reset the offset when there is no initial offset in Kafka
Instances
Eq OffsetReset Source # | |
Defined in Kafka.Consumer.Types (==) :: OffsetReset -> OffsetReset -> Bool # (/=) :: OffsetReset -> OffsetReset -> Bool # | |
Show OffsetReset Source # | |
Defined in Kafka.Consumer.Types showsPrec :: Int -> OffsetReset -> ShowS # show :: OffsetReset -> String # showList :: [OffsetReset] -> ShowS # | |
Generic OffsetReset Source # | |
Defined in Kafka.Consumer.Types type Rep OffsetReset :: Type -> Type # from :: OffsetReset -> Rep OffsetReset x # to :: Rep OffsetReset x -> OffsetReset # | |
type Rep OffsetReset Source # | |
A message offset in a partition
newtype ConsumerGroupId Source #
Consumer group ID. Different consumers with the same consumer group ID will get assigned different partitions of each subscribed topic.
Instances
kcKafkaPtr :: KafkaConsumer -> Kafka Source #
kcKafkaConf :: KafkaConsumer -> KafkaConf Source #
crMapKey :: (k -> k') -> ConsumerRecord k v -> ConsumerRecord k' v Source #
Deprecated: Isn't concern of this library. Use first
crMapValue :: (v -> v') -> ConsumerRecord k v -> ConsumerRecord k v' Source #
Deprecated: Isn't concern of this library. Use second
crMapKV :: (k -> k') -> (v -> v') -> ConsumerRecord k v -> ConsumerRecord k' v' Source #
Deprecated: Isn't concern of this library. Use bimap
sequenceFirst :: (Bitraversable t, Applicative f) => t (f k) v -> f (t k v) Source #
Deprecated: Isn't concern of this library. Use bitraverse
id
pure
traverseFirst :: (Bitraversable t, Applicative f) => (k -> f k') -> t k v -> f (t k' v) Source #
Deprecated: Isn't concern of this library. Use bitraverse
f pure
traverseFirstM :: (Bitraversable t, Applicative f, Monad m) => (k -> m (f k')) -> t k v -> m (f (t k' v)) Source #
Deprecated: Isn't concern of this library. Use bitraverse
id
pure
<$>
bitraverse
f pure
r
traverseM :: (Traversable t, Applicative f, Monad m) => (v -> m (f v')) -> t v -> m (f (t v')) Source #
bitraverseM :: (Bitraversable t, Applicative f, Monad m) => (k -> m (f k')) -> (v -> m (f v')) -> t k v -> m (f (t k' v')) Source #
data Subscription Source #
A consumer subscription to a topic.
Examples
Typically you don't call the constructor directly, but combine settings:
consumerSub ::Subscription
consumerSub =topics
[TopicName
"kafka-client-example-topic"] <>offsetReset
Earliest
<>extraSubscriptionProps
(fromList [("prop1", "value 1"), ("prop2", "value 2")])
Instances
Semigroup Subscription Source # | |
Defined in Kafka.Consumer.Subscription (<>) :: Subscription -> Subscription -> Subscription # sconcat :: NonEmpty Subscription -> Subscription # stimes :: Integral b => b -> Subscription -> Subscription # | |
Monoid Subscription Source # | |
Defined in Kafka.Consumer.Subscription mempty :: Subscription # mappend :: Subscription -> Subscription -> Subscription # mconcat :: [Subscription] -> Subscription # |
topics :: [TopicName] -> Subscription Source #
Build a subscription by giving the list of topic names only
offsetReset :: OffsetReset -> Subscription Source #
Build a subscription by giving the offset reset parameter only
extraSubscriptionProps :: Map Text Text -> Subscription Source #
Build a subscription by giving extra properties only
errorCallback :: HasKafkaConf k => (KafkaError -> String -> IO ()) -> k -> IO () Source #
Add a callback for errors.
Examples
Basic usage:
setCallback (errorCallback myErrorCallback) myErrorCallback :: KafkaError -> String -> IO () myErrorCallback kafkaError message = print $ show kafkaError <> "|" <> message
logCallback :: HasKafkaConf k => (KafkaLogLevel -> String -> String -> IO ()) -> k -> IO () Source #
Add a callback for logs.
Examples
Basic usage:
setCallback (logCallback myLogCallback) myLogCallback :: KafkaLogLevel -> String -> String -> IO () myLogCallback level facility message = print $ show level <> "|" <> facility <> "|" <> message
statsCallback :: HasKafkaConf k => (String -> IO ()) -> k -> IO () Source #
Add a callback for stats.
Examples
Basic usage:
setCallback (statsCallback myStatsCallback) myStatsCallback :: String -> IO () myStatsCallback stats = print $ show stats
rebalanceCallback :: (KafkaConsumer -> RebalanceEvent -> IO ()) -> KafkaConf -> IO () Source #
Sets a callback that is called when rebalance is needed.
offsetCommitCallback :: (KafkaConsumer -> KafkaError -> [TopicPartition] -> IO ()) -> KafkaConf -> IO () Source #
Sets a callback that is called when rebalance is needed.
The results of automatic or manual offset commits will be scheduled
for this callback and is served by pollMessage
.
If no partitions had valid offsets to commit this callback will be called
with KafkaError
== KafkaResponseError
RdKafkaRespErrNoOffset
which is not to be considered
an error.
data ConsumerProperties Source #
Properties to create KafkaConsumer
.
ConsumerProperties | |
|
Instances
Semigroup ConsumerProperties Source # | |
Defined in Kafka.Consumer.ConsumerProperties (<>) :: ConsumerProperties -> ConsumerProperties -> ConsumerProperties # sconcat :: NonEmpty ConsumerProperties -> ConsumerProperties # stimes :: Integral b => b -> ConsumerProperties -> ConsumerProperties # | |
Monoid ConsumerProperties Source # | Right biased so we prefer newer properties over older ones. |
Defined in Kafka.Consumer.ConsumerProperties |
data CallbackPollMode Source #
Instances
Eq CallbackPollMode Source # | |
Defined in Kafka.Consumer.ConsumerProperties (==) :: CallbackPollMode -> CallbackPollMode -> Bool # (/=) :: CallbackPollMode -> CallbackPollMode -> Bool # | |
Show CallbackPollMode Source # | |
Defined in Kafka.Consumer.ConsumerProperties showsPrec :: Int -> CallbackPollMode -> ShowS # show :: CallbackPollMode -> String # showList :: [CallbackPollMode] -> ShowS # |
noAutoCommit :: ConsumerProperties Source #
Disables auto commit for the consumer
noAutoOffsetStore :: ConsumerProperties Source #
Disables auto offset store for the consumer
groupId :: ConsumerGroupId -> ConsumerProperties Source #
Consumer group id
setCallback :: (KafkaConf -> IO ()) -> ConsumerProperties Source #
logLevel :: KafkaLogLevel -> ConsumerProperties Source #
Sets the logging level.
Usually is used with debugOptions
to configure which logs are needed.
compression :: KafkaCompressionCodec -> ConsumerProperties Source #
Sets the compression codec for the consumer.
suppressDisconnectLogs :: ConsumerProperties Source #
Suppresses consumer disconnects logs.
It might be useful to turn this off when interacting with brokers with an aggressive connection.max.idle.ms value.
extraProps :: Map Text Text -> ConsumerProperties Source #
Any configuration options that are supported by librdkafka. The full list can be found here
extraProp :: Text -> Text -> ConsumerProperties Source #
Any configuration options that are supported by librdkafka. The full list can be found here
debugOptions :: [KafkaDebug] -> ConsumerProperties Source #
Sets debug features for the consumer.
Usually is used with consumerLogLevel
.
callbackPollMode :: CallbackPollMode -> ConsumerProperties Source #
Sets the callback poll mode.
The default CallbackPollModeAsync
mode handles polling rebalance
and keep alive events for you
in a background thread.
With CallbacPollModeSync
the user will poll the consumer
frequently to handle new messages as well as rebalance and keep alive events.
CallbacPollModeSync
lets you can simplify
hw-kafka-client's footprint and have full control over when polling
happens at the cost of having to manage this yourself.
:: ConsumerProperties | |
-> Subscription | |
-> (KafkaConsumer -> IO (Either KafkaError a)) | A callback function to poll and handle messages |
-> IO (Either KafkaError a) |
Deprecated: Use 'newConsumer'/'closeConsumer' instead
Runs high-level kafka consumer.
A callback provided is expected to call pollMessage
when convenient.
newConsumer :: MonadIO m => ConsumerProperties -> Subscription -> m (Either KafkaError KafkaConsumer) Source #
Create a KafkaConsumer
. This consumer must be correctly released using closeConsumer
.
assign :: MonadIO m => KafkaConsumer -> [TopicPartition] -> m (Maybe KafkaError) Source #
Assigns the consumer to consume from the given topics, partitions, and offsets.
assignment :: MonadIO m => KafkaConsumer -> m (Either KafkaError (Map TopicName [PartitionId])) Source #
Returns current consumer's assignment
subscription :: MonadIO m => KafkaConsumer -> m (Either KafkaError [(TopicName, SubscribedPartitions)]) Source #
Returns current consumer's subscription
pausePartitions :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m KafkaError Source #
Pauses specified partitions on the current consumer.
resumePartitions :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m KafkaError Source #
Resumes specified partitions on the current consumer.
committed :: MonadIO m => KafkaConsumer -> Timeout -> [(TopicName, PartitionId)] -> m (Either KafkaError [TopicPartition]) Source #
Retrieve committed offsets for topics+partitions.
position :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m (Either KafkaError [TopicPartition]) Source #
Retrieve current positions (last consumed message offset+1) for the current running instance of the consumer.
If the current consumer hasn't received any messages for a given partition, PartitionOffsetInvalid
is returned.
seek :: MonadIO m => KafkaConsumer -> Timeout -> [TopicPartition] -> m (Maybe KafkaError) Source #
Seek a particular offset for each provided TopicPartition
:: MonadIO m | |
=> KafkaConsumer | |
-> Timeout | the timeout, in milliseconds |
-> m (Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))) | Left on error or timeout, right for success |
Polls a single message
pollConsumerEvents :: KafkaConsumer -> Maybe Timeout -> IO () Source #
Polls the provided kafka consumer for events.
Events will cause application provided callbacks to be called.
The Timeout
argument specifies the maximum amount of time
(in milliseconds) that the call will block waiting for events.
This function is called on each pollMessage
and, if runtime allows
multi threading, it is called periodically in a separate thread
to ensure the callbacks are handled ASAP.
There is no particular need to call this function manually
unless some special cases in a single-threaded environment
when polling for events on each pollMessage
is not
frequent enough.
pollMessageBatch :: MonadIO m => KafkaConsumer -> Timeout -> BatchSize -> m [Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))] Source #
Polls up to BatchSize
messages.
Unlike pollMessage
this function does not return usual "timeout" errors.
An empty batch is returned when there are no messages available.
This API is not available when userPolls
is set.
commitOffsetMessage :: MonadIO m => OffsetCommit -> KafkaConsumer -> ConsumerRecord k v -> m (Maybe KafkaError) Source #
Commit message's offset on broker for the message's partition.
commitAllOffsets :: MonadIO m => OffsetCommit -> KafkaConsumer -> m (Maybe KafkaError) Source #
Commit offsets for all currently assigned partitions.
commitPartitionsOffsets :: MonadIO m => OffsetCommit -> KafkaConsumer -> [TopicPartition] -> m (Maybe KafkaError) Source #
Commit offsets for all currently assigned partitions.
storeOffsets :: MonadIO m => KafkaConsumer -> [TopicPartition] -> m (Maybe KafkaError) Source #
Stores offsets locally
storeOffsetMessage :: MonadIO m => KafkaConsumer -> ConsumerRecord k v -> m (Maybe KafkaError) Source #
Stores message's offset locally for the message's partition.
closeConsumer :: MonadIO m => KafkaConsumer -> m (Maybe KafkaError) Source #
Closes the consumer.
See newConsumer
data RdKafkaRespErrT Source #
Instances
Bounded RdKafkaRespErrT Source # | |
Defined in Kafka.Internal.RdKafka | |
Enum RdKafkaRespErrT Source # | |
Defined in Kafka.Internal.RdKafka succ :: RdKafkaRespErrT -> RdKafkaRespErrT # pred :: RdKafkaRespErrT -> RdKafkaRespErrT # toEnum :: Int -> RdKafkaRespErrT # fromEnum :: RdKafkaRespErrT -> Int # enumFrom :: RdKafkaRespErrT -> [RdKafkaRespErrT] # enumFromThen :: RdKafkaRespErrT -> RdKafkaRespErrT -> [RdKafkaRespErrT] # enumFromTo :: RdKafkaRespErrT -> RdKafkaRespErrT -> [RdKafkaRespErrT] # enumFromThenTo :: RdKafkaRespErrT -> RdKafkaRespErrT -> RdKafkaRespErrT -> [RdKafkaRespErrT] # | |
Eq RdKafkaRespErrT Source # | |
Defined in Kafka.Internal.RdKafka (==) :: RdKafkaRespErrT -> RdKafkaRespErrT -> Bool # (/=) :: RdKafkaRespErrT -> RdKafkaRespErrT -> Bool # | |
Show RdKafkaRespErrT Source # | |
Defined in Kafka.Internal.RdKafka showsPrec :: Int -> RdKafkaRespErrT -> ShowS # show :: RdKafkaRespErrT -> String # showList :: [RdKafkaRespErrT] -> ShowS # |