Safe Haskell | None |
---|---|
Language | Haskell2010 |
Module to consume messages from Kafka topics.
Here's an example of code to consume messages from a topic:
import Control.Exception (bracket) import Control.Monad (replicateM_) import Kafka.Consumer -- Global consumer properties consumerProps ::ConsumerProperties
consumerProps =brokersList
["localhost:9092"] <>groupId
(ConsumerGroupId
"consumer_example_group") <>noAutoCommit
<>logLevel
KafkaLogInfo
-- Subscription to topics consumerSub ::Subscription
consumerSub =topics
[TopicName
"kafka-client-example-topic"] <>offsetReset
Earliest
-- Running an example runConsumerExample :: IO () runConsumerExample = do res <- bracket mkConsumer clConsumer runHandler print res where mkConsumer =newConsumer
consumerProps consumerSub clConsumer (Left err) = pure (Left err) clConsumer (Right kc) = (maybe (Right ()) Left) <$>closeConsumer
kc runHandler (Left err) = pure (Left err) runHandler (Right kc) = processMessages kc -- Example polling 10 times before stopping processMessages ::KafkaConsumer
-> IO (EitherKafkaError
()) processMessages kafka = do replicateM_ 10 $ do msg <-pollMessage
kafka (Timeout
1000) putStrLn $ "Message: " <> show msg err <-commitAllOffsets
OffsetCommit
kafka putStrLn $ "Offsets: " <> maybe "Committed." show err pure $ Right ()
Synopsis
- data KafkaConsumer
- data KafkaCompressionCodec
- = NoCompression
- | Gzip
- | Snappy
- | Lz4
- data KafkaDebug
- data KafkaError
- data KafkaLogLevel
- newtype Timeout = Timeout {}
- newtype BrokerAddress = BrokerAddress {}
- newtype TopicName = TopicName {
- unTopicName :: Text
- data TopicType
- newtype BatchSize = BatchSize {
- unBatchSize :: Int
- newtype ClientId = ClientId {
- unClientId :: Text
- newtype Millis = Millis {}
- newtype PartitionId = PartitionId {
- unPartitionId :: Int
- newtype BrokerId = BrokerId {
- unBrokerId :: Int
- topicType :: TopicName -> TopicType
- kafkaDebugToText :: KafkaDebug -> Text
- kafkaCompressionCodecToText :: KafkaCompressionCodec -> Text
- data Callback
- data ConsumerRecord k v = ConsumerRecord {
- crTopic :: !TopicName
- crPartition :: !PartitionId
- crOffset :: !Offset
- crTimestamp :: !Timestamp
- crKey :: !k
- crValue :: !v
- data TopicPartition = TopicPartition {}
- data OffsetStoreMethod
- data OffsetStoreSync
- data OffsetCommit
- data Timestamp
- data SubscribedPartitions
- data PartitionOffset
- data RebalanceEvent
- data OffsetReset
- newtype Offset = Offset {}
- newtype ConsumerGroupId = ConsumerGroupId {}
- kcKafkaPtr :: KafkaConsumer -> Kafka
- kcKafkaConf :: KafkaConsumer -> KafkaConf
- crMapKey :: (k -> k') -> ConsumerRecord k v -> ConsumerRecord k' v
- crMapValue :: (v -> v') -> ConsumerRecord k v -> ConsumerRecord k v'
- crMapKV :: (k -> k') -> (v -> v') -> ConsumerRecord k v -> ConsumerRecord k' v'
- sequenceFirst :: (Bitraversable t, Applicative f) => t (f k) v -> f (t k v)
- traverseFirst :: (Bitraversable t, Applicative f) => (k -> f k') -> t k v -> f (t k' v)
- traverseFirstM :: (Bitraversable t, Applicative f, Monad m) => (k -> m (f k')) -> t k v -> m (f (t k' v))
- traverseM :: (Traversable t, Applicative f, Monad m) => (v -> m (f v')) -> t v -> m (f (t v'))
- bitraverseM :: (Bitraversable t, Applicative f, Monad m) => (k -> m (f k')) -> (v -> m (f v')) -> t k v -> m (f (t k' v'))
- data Subscription = Subscription (Set TopicName) (Map Text Text)
- topics :: [TopicName] -> Subscription
- offsetReset :: OffsetReset -> Subscription
- extraSubscriptionProps :: Map Text Text -> Subscription
- errorCallback :: (KafkaError -> String -> IO ()) -> Callback
- logCallback :: (KafkaLogLevel -> String -> String -> IO ()) -> Callback
- statsCallback :: (ByteString -> IO ()) -> Callback
- rebalanceCallback :: (KafkaConsumer -> RebalanceEvent -> IO ()) -> Callback
- offsetCommitCallback :: (KafkaConsumer -> KafkaError -> [TopicPartition] -> IO ()) -> Callback
- data ConsumerProperties = ConsumerProperties {}
- data CallbackPollMode
- brokersList :: [BrokerAddress] -> ConsumerProperties
- autoCommit :: Millis -> ConsumerProperties
- noAutoCommit :: ConsumerProperties
- noAutoOffsetStore :: ConsumerProperties
- groupId :: ConsumerGroupId -> ConsumerProperties
- clientId :: ClientId -> ConsumerProperties
- setCallback :: Callback -> ConsumerProperties
- logLevel :: KafkaLogLevel -> ConsumerProperties
- compression :: KafkaCompressionCodec -> ConsumerProperties
- suppressDisconnectLogs :: ConsumerProperties
- statisticsInterval :: Millis -> ConsumerProperties
- extraProps :: Map Text Text -> ConsumerProperties
- extraProp :: Text -> Text -> ConsumerProperties
- debugOptions :: [KafkaDebug] -> ConsumerProperties
- queuedMaxMessagesKBytes :: Int -> ConsumerProperties
- callbackPollMode :: CallbackPollMode -> ConsumerProperties
- runConsumer :: ConsumerProperties -> Subscription -> (KafkaConsumer -> IO (Either KafkaError a)) -> IO (Either KafkaError a)
- newConsumer :: MonadIO m => ConsumerProperties -> Subscription -> m (Either KafkaError KafkaConsumer)
- assign :: MonadIO m => KafkaConsumer -> [TopicPartition] -> m (Maybe KafkaError)
- assignment :: MonadIO m => KafkaConsumer -> m (Either KafkaError (Map TopicName [PartitionId]))
- subscription :: MonadIO m => KafkaConsumer -> m (Either KafkaError [(TopicName, SubscribedPartitions)])
- pausePartitions :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m KafkaError
- resumePartitions :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m KafkaError
- committed :: MonadIO m => KafkaConsumer -> Timeout -> [(TopicName, PartitionId)] -> m (Either KafkaError [TopicPartition])
- position :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m (Either KafkaError [TopicPartition])
- seek :: MonadIO m => KafkaConsumer -> Timeout -> [TopicPartition] -> m (Maybe KafkaError)
- pollMessage :: MonadIO m => KafkaConsumer -> Timeout -> m (Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
- pollConsumerEvents :: KafkaConsumer -> Maybe Timeout -> IO ()
- pollMessageBatch :: MonadIO m => KafkaConsumer -> Timeout -> BatchSize -> m [Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))]
- commitOffsetMessage :: MonadIO m => OffsetCommit -> KafkaConsumer -> ConsumerRecord k v -> m (Maybe KafkaError)
- commitAllOffsets :: MonadIO m => OffsetCommit -> KafkaConsumer -> m (Maybe KafkaError)
- commitPartitionsOffsets :: MonadIO m => OffsetCommit -> KafkaConsumer -> [TopicPartition] -> m (Maybe KafkaError)
- storeOffsets :: MonadIO m => KafkaConsumer -> [TopicPartition] -> m (Maybe KafkaError)
- storeOffsetMessage :: MonadIO m => KafkaConsumer -> ConsumerRecord k v -> m (Maybe KafkaError)
- closeConsumer :: MonadIO m => KafkaConsumer -> m (Maybe KafkaError)
- data RdKafkaRespErrT
- = RdKafkaRespErrBegin
- | RdKafkaRespErrBadMsg
- | RdKafkaRespErrBadCompression
- | RdKafkaRespErrDestroy
- | RdKafkaRespErrFail
- | RdKafkaRespErrTransport
- | RdKafkaRespErrCritSysResource
- | RdKafkaRespErrResolve
- | RdKafkaRespErrMsgTimedOut
- | RdKafkaRespErrPartitionEof
- | RdKafkaRespErrUnknownPartition
- | RdKafkaRespErrFs
- | RdKafkaRespErrUnknownTopic
- | RdKafkaRespErrAllBrokersDown
- | RdKafkaRespErrInvalidArg
- | RdKafkaRespErrTimedOut
- | RdKafkaRespErrQueueFull
- | RdKafkaRespErrIsrInsuff
- | RdKafkaRespErrNodeUpdate
- | RdKafkaRespErrSsl
- | RdKafkaRespErrWaitCoord
- | RdKafkaRespErrUnknownGroup
- | RdKafkaRespErrInProgress
- | RdKafkaRespErrPrevInProgress
- | RdKafkaRespErrExistingSubscription
- | RdKafkaRespErrAssignPartitions
- | RdKafkaRespErrRevokePartitions
- | RdKafkaRespErrConflict
- | RdKafkaRespErrState
- | RdKafkaRespErrUnknownProtocol
- | RdKafkaRespErrNotImplemented
- | RdKafkaRespErrAuthentication
- | RdKafkaRespErrNoOffset
- | RdKafkaRespErrOutdated
- | RdKafkaRespErrTimedOutQueue
- | RdKafkaRespErrUnsupportedFeature
- | RdKafkaRespErrWaitCache
- | RdKafkaRespErrIntr
- | RdKafkaRespErrKeySerialization
- | RdKafkaRespErrValueSerialization
- | RdKafkaRespErrKeyDeserialization
- | RdKafkaRespErrValueDeserialization
- | RdKafkaRespErrPartial
- | RdKafkaRespErrReadOnly
- | RdKafkaRespErrNoent
- | RdKafkaRespErrUnderflow
- | RdKafkaRespErrInvalidType
- | RdKafkaRespErrRetry
- | RdKafkaRespErrPurgeQueue
- | RdKafkaRespErrPurgeInflight
- | RdKafkaRespErrFatal
- | RdKafkaRespErrInconsistent
- | RdKafkaRespErrGaplessGuarantee
- | RdKafkaRespErrMaxPollExceeded
- | RdKafkaRespErrUnknownBroker
- | RdKafkaRespErrNotConfigured
- | RdKafkaRespErrFenced
- | RdKafkaRespErrApplication
- | RdKafkaRespErrEnd
- | RdKafkaRespErrUnknown
- | RdKafkaRespErrNoError
- | RdKafkaRespErrOffsetOutOfRange
- | RdKafkaRespErrInvalidMsg
- | RdKafkaRespErrUnknownTopicOrPart
- | RdKafkaRespErrInvalidMsgSize
- | RdKafkaRespErrLeaderNotAvailable
- | RdKafkaRespErrNotLeaderForPartition
- | RdKafkaRespErrRequestTimedOut
- | RdKafkaRespErrBrokerNotAvailable
- | RdKafkaRespErrReplicaNotAvailable
- | RdKafkaRespErrMsgSizeTooLarge
- | RdKafkaRespErrStaleCtrlEpoch
- | RdKafkaRespErrOffsetMetadataTooLarge
- | RdKafkaRespErrNetworkException
- | RdKafkaRespErrCoordinatorLoadInProgress
- | RdKafkaRespErrCoordinatorNotAvailable
- | RdKafkaRespErrNotCoordinator
- | RdKafkaRespErrTopicException
- | RdKafkaRespErrRecordListTooLarge
- | RdKafkaRespErrNotEnoughReplicas
- | RdKafkaRespErrNotEnoughReplicasAfterAppend
- | RdKafkaRespErrInvalidRequiredAcks
- | RdKafkaRespErrIllegalGeneration
- | RdKafkaRespErrInconsistentGroupProtocol
- | RdKafkaRespErrInvalidGroupId
- | RdKafkaRespErrUnknownMemberId
- | RdKafkaRespErrInvalidSessionTimeout
- | RdKafkaRespErrRebalanceInProgress
- | RdKafkaRespErrInvalidCommitOffsetSize
- | RdKafkaRespErrTopicAuthorizationFailed
- | RdKafkaRespErrGroupAuthorizationFailed
- | RdKafkaRespErrClusterAuthorizationFailed
- | RdKafkaRespErrInvalidTimestamp
- | RdKafkaRespErrUnsupportedSaslMechanism
- | RdKafkaRespErrIllegalSaslState
- | RdKafkaRespErrUnsupportedVersion
- | RdKafkaRespErrTopicAlreadyExists
- | RdKafkaRespErrInvalidPartitions
- | RdKafkaRespErrInvalidReplicationFactor
- | RdKafkaRespErrInvalidReplicaAssignment
- | RdKafkaRespErrInvalidConfig
- | RdKafkaRespErrNotController
- | RdKafkaRespErrInvalidRequest
- | RdKafkaRespErrUnsupportedForMessageFormat
- | RdKafkaRespErrPolicyViolation
- | RdKafkaRespErrOutOfOrderSequenceNumber
- | RdKafkaRespErrDuplicateSequenceNumber
- | RdKafkaRespErrInvalidProducerEpoch
- | RdKafkaRespErrInvalidTxnState
- | RdKafkaRespErrInvalidProducerIdMapping
- | RdKafkaRespErrInvalidTransactionTimeout
- | RdKafkaRespErrConcurrentTransactions
- | RdKafkaRespErrTransactionCoordinatorFenced
- | RdKafkaRespErrTransactionalIdAuthorizationFailed
- | RdKafkaRespErrSecurityDisabled
- | RdKafkaRespErrOperationNotAttempted
- | RdKafkaRespErrKafkaStorageError
- | RdKafkaRespErrLogDirNotFound
- | RdKafkaRespErrSaslAuthenticationFailed
- | RdKafkaRespErrUnknownProducerId
- | RdKafkaRespErrReassignmentInProgress
- | RdKafkaRespErrDelegationTokenAuthDisabled
- | RdKafkaRespErrDelegationTokenNotFound
- | RdKafkaRespErrDelegationTokenOwnerMismatch
- | RdKafkaRespErrDelegationTokenRequestNotAllowed
- | RdKafkaRespErrDelegationTokenAuthorizationFailed
- | RdKafkaRespErrDelegationTokenExpired
- | RdKafkaRespErrInvalidPrincipalType
- | RdKafkaRespErrNonEmptyGroup
- | RdKafkaRespErrGroupIdNotFound
- | RdKafkaRespErrFetchSessionIdNotFound
- | RdKafkaRespErrInvalidFetchSessionEpoch
- | RdKafkaRespErrListenerNotFound
- | RdKafkaRespErrTopicDeletionDisabled
- | RdKafkaRespErrFencedLeaderEpoch
- | RdKafkaRespErrUnknownLeaderEpoch
- | RdKafkaRespErrUnsupportedCompressionType
- | RdKafkaRespErrStaleBrokerEpoch
- | RdKafkaRespErrOffsetNotAvailable
- | RdKafkaRespErrMemberIdRequired
- | RdKafkaRespErrPreferredLeaderNotAvailable
- | RdKafkaRespErrGroupMaxSizeReached
- | RdKafkaRespErrFencedInstanceId
- | RdKafkaRespErrEndAll
Documentation
data KafkaConsumer Source #
The main type for Kafka consumption, used e.g. to poll and commit messages.
Its constructor is intentionally not exposed, instead, one should use newConsumer
to acquire such a value.
data KafkaCompressionCodec Source #
Compression codec used by a topic
Instances
Eq KafkaCompressionCodec Source # | |
Defined in Kafka.Types (==) :: KafkaCompressionCodec -> KafkaCompressionCodec -> Bool # (/=) :: KafkaCompressionCodec -> KafkaCompressionCodec -> Bool # | |
Show KafkaCompressionCodec Source # | |
Defined in Kafka.Types showsPrec :: Int -> KafkaCompressionCodec -> ShowS # show :: KafkaCompressionCodec -> String # showList :: [KafkaCompressionCodec] -> ShowS # | |
Generic KafkaCompressionCodec Source # | |
Defined in Kafka.Types type Rep KafkaCompressionCodec :: Type -> Type # | |
type Rep KafkaCompressionCodec Source # | |
Defined in Kafka.Types type Rep KafkaCompressionCodec = D1 (MetaData "KafkaCompressionCodec" "Kafka.Types" "hw-kafka-client-4.0.1-inplace" False) ((C1 (MetaCons "NoCompression" PrefixI False) (U1 :: Type -> Type) :+: C1 (MetaCons "Gzip" PrefixI False) (U1 :: Type -> Type)) :+: (C1 (MetaCons "Snappy" PrefixI False) (U1 :: Type -> Type) :+: C1 (MetaCons "Lz4" PrefixI False) (U1 :: Type -> Type))) |
data KafkaDebug Source #
Available librdkafka debug contexts
DebugGeneric | |
DebugBroker | |
DebugTopic | |
DebugMetadata | |
DebugQueue | |
DebugMsg | |
DebugProtocol | |
DebugCgrp | |
DebugSecurity | |
DebugFetch | |
DebugFeature | |
DebugAll |
Instances
data KafkaError Source #
All possible Kafka errors
KafkaError Text | |
KafkaInvalidReturnValue | |
KafkaBadSpecification Text | |
KafkaResponseError RdKafkaRespErrT | |
KafkaInvalidConfigurationValue Text | |
KafkaUnknownConfigurationKey Text | |
KafkaBadConfiguration |
Instances
data KafkaLogLevel Source #
Log levels for librdkafka.
KafkaLogEmerg | |
KafkaLogAlert | |
KafkaLogCrit | |
KafkaLogErr | |
KafkaLogWarning | |
KafkaLogNotice | |
KafkaLogInfo | |
KafkaLogDebug |
Instances
Enum KafkaLogLevel Source # | |
Defined in Kafka.Types succ :: KafkaLogLevel -> KafkaLogLevel # pred :: KafkaLogLevel -> KafkaLogLevel # toEnum :: Int -> KafkaLogLevel # fromEnum :: KafkaLogLevel -> Int # enumFrom :: KafkaLogLevel -> [KafkaLogLevel] # enumFromThen :: KafkaLogLevel -> KafkaLogLevel -> [KafkaLogLevel] # enumFromTo :: KafkaLogLevel -> KafkaLogLevel -> [KafkaLogLevel] # enumFromThenTo :: KafkaLogLevel -> KafkaLogLevel -> KafkaLogLevel -> [KafkaLogLevel] # | |
Eq KafkaLogLevel Source # | |
Defined in Kafka.Types (==) :: KafkaLogLevel -> KafkaLogLevel -> Bool # (/=) :: KafkaLogLevel -> KafkaLogLevel -> Bool # | |
Show KafkaLogLevel Source # | |
Defined in Kafka.Types showsPrec :: Int -> KafkaLogLevel -> ShowS # show :: KafkaLogLevel -> String # showList :: [KafkaLogLevel] -> ShowS # |
Timeout in milliseconds
newtype BrokerAddress Source #
Kafka broker address string (e.g. broker1:9092
)
Instances
Eq BrokerAddress Source # | |
Defined in Kafka.Types (==) :: BrokerAddress -> BrokerAddress -> Bool # (/=) :: BrokerAddress -> BrokerAddress -> Bool # | |
Show BrokerAddress Source # | |
Defined in Kafka.Types showsPrec :: Int -> BrokerAddress -> ShowS # show :: BrokerAddress -> String # showList :: [BrokerAddress] -> ShowS # | |
IsString BrokerAddress Source # | |
Defined in Kafka.Types fromString :: String -> BrokerAddress # | |
Generic BrokerAddress Source # | |
Defined in Kafka.Types type Rep BrokerAddress :: Type -> Type # from :: BrokerAddress -> Rep BrokerAddress x # to :: Rep BrokerAddress x -> BrokerAddress # | |
type Rep BrokerAddress Source # | |
Defined in Kafka.Types type Rep BrokerAddress = D1 (MetaData "BrokerAddress" "Kafka.Types" "hw-kafka-client-4.0.1-inplace" True) (C1 (MetaCons "BrokerAddress" PrefixI True) (S1 (MetaSel (Just "unBrokerAddress") NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 Text))) |
Topic name to consume/produce messages
Wildcard (regex) topics are supported by the librdkafka assignor:
any topic name in the topics list that is prefixed with ^
will
be regex-matched to the full list of topics in the cluster and matching
topics will be added to the subscription list.
TopicName | |
|
Whether the topic is created by a user or by the system
User | Normal topics that are created by user. |
System | Topics starting with a double underscore "__" ( |
Batch size used for polling
Client ID used by Kafka to better track requests
A number of milliseconds, used to represent durations and timestamps
newtype PartitionId Source #
Topic partition ID
Instances
Kafka broker ID
topicType :: TopicName -> TopicType Source #
Deduce the type of a topic from its name, by checking if it starts with a double underscore "__"
kafkaDebugToText :: KafkaDebug -> Text Source #
Convert a KafkaDebug
into its librdkafka string equivalent.
This is used internally by the library but may be useful to some developers.
kafkaCompressionCodecToText :: KafkaCompressionCodec -> Text Source #
Convert a KafkaCompressionCodec
into its librdkafka string equivalent.
This is used internally by the library but may be useful to some developers.
Callbacks allow retrieving various information like error occurences, statistics
and log messages.
See setCallback
(Consumer) and setCallback
(Producer) for more details.
data ConsumerRecord k v Source #
Represents a received message from Kafka (i.e. used in a consumer)
ConsumerRecord | |
|
Instances
data TopicPartition Source #
Kafka topic partition structure
Instances
data OffsetStoreMethod Source #
Indicates the method of storing the offsets
OffsetStoreBroker | Offsets are stored in Kafka broker (preferred) |
OffsetStoreFile FilePath OffsetStoreSync | Offsets are stored in a file (and synced to disk according to the sync policy) |
Instances
data OffsetStoreSync Source #
Indicates how offsets are to be synced to disk
OffsetSyncDisable | Do not sync offsets (in Kafka: -1) |
OffsetSyncImmediate | Sync immediately after each offset commit (in Kafka: 0) |
OffsetSyncInterval Int | Sync after specified interval in millis |
Instances
data OffsetCommit Source #
Offsets commit mode
OffsetCommit | Forces consumer to block until the broker offsets commit is done |
OffsetCommitAsync | Offsets will be committed in a non-blocking way |
Instances
Eq OffsetCommit Source # | |
Defined in Kafka.Consumer.Types (==) :: OffsetCommit -> OffsetCommit -> Bool # (/=) :: OffsetCommit -> OffsetCommit -> Bool # | |
Show OffsetCommit Source # | |
Defined in Kafka.Consumer.Types showsPrec :: Int -> OffsetCommit -> ShowS # show :: OffsetCommit -> String # showList :: [OffsetCommit] -> ShowS # | |
Generic OffsetCommit Source # | |
Defined in Kafka.Consumer.Types type Rep OffsetCommit :: Type -> Type # from :: OffsetCommit -> Rep OffsetCommit x # to :: Rep OffsetCommit x -> OffsetCommit # | |
type Rep OffsetCommit Source # | |
Consumer record timestamp
Instances
Eq Timestamp Source # | |
Read Timestamp Source # | |
Show Timestamp Source # | |
Generic Timestamp Source # | |
type Rep Timestamp Source # | |
Defined in Kafka.Consumer.Types type Rep Timestamp = D1 (MetaData "Timestamp" "Kafka.Consumer.Types" "hw-kafka-client-4.0.1-inplace" False) (C1 (MetaCons "CreateTime" PrefixI False) (S1 (MetaSel (Nothing :: Maybe Symbol) NoSourceUnpackedness SourceStrict DecidedStrict) (Rec0 Millis)) :+: (C1 (MetaCons "LogAppendTime" PrefixI False) (S1 (MetaSel (Nothing :: Maybe Symbol) NoSourceUnpackedness SourceStrict DecidedStrict) (Rec0 Millis)) :+: C1 (MetaCons "NoTimestamp" PrefixI False) (U1 :: Type -> Type))) |
data SubscribedPartitions Source #
Partitions subscribed by a consumer
SubscribedPartitions [PartitionId] | Subscribe only to those partitions |
SubscribedPartitionsAll | Subscribe to all partitions |
Instances
data PartitionOffset Source #
The partition offset
PartitionOffsetBeginning | |
PartitionOffsetEnd | |
PartitionOffset Int64 | |
PartitionOffsetStored | |
PartitionOffsetInvalid |
Instances
data RebalanceEvent Source #
A set of events which happen during the rebalancing process
RebalanceBeforeAssign [(TopicName, PartitionId)] | Happens before Kafka Client confirms new assignment |
RebalanceAssign [(TopicName, PartitionId)] | Happens after the new assignment is confirmed |
RebalanceBeforeRevoke [(TopicName, PartitionId)] | Happens before Kafka Client confirms partitions rejection |
RebalanceRevoke [(TopicName, PartitionId)] | Happens after the rejection is confirmed |
Instances
data OffsetReset Source #
Where to reset the offset when there is no initial offset in Kafka
Instances
Eq OffsetReset Source # | |
Defined in Kafka.Consumer.Types (==) :: OffsetReset -> OffsetReset -> Bool # (/=) :: OffsetReset -> OffsetReset -> Bool # | |
Show OffsetReset Source # | |
Defined in Kafka.Consumer.Types showsPrec :: Int -> OffsetReset -> ShowS # show :: OffsetReset -> String # showList :: [OffsetReset] -> ShowS # | |
Generic OffsetReset Source # | |
Defined in Kafka.Consumer.Types type Rep OffsetReset :: Type -> Type # from :: OffsetReset -> Rep OffsetReset x # to :: Rep OffsetReset x -> OffsetReset # | |
type Rep OffsetReset Source # | |
A message offset in a partition
newtype ConsumerGroupId Source #
Consumer group ID. Different consumers with the same consumer group ID will get assigned different partitions of each subscribed topic.
Instances
kcKafkaPtr :: KafkaConsumer -> Kafka Source #
kcKafkaConf :: KafkaConsumer -> KafkaConf Source #
crMapKey :: (k -> k') -> ConsumerRecord k v -> ConsumerRecord k' v Source #
Deprecated: Isn't concern of this library. Use first
crMapValue :: (v -> v') -> ConsumerRecord k v -> ConsumerRecord k v' Source #
Deprecated: Isn't concern of this library. Use second
crMapKV :: (k -> k') -> (v -> v') -> ConsumerRecord k v -> ConsumerRecord k' v' Source #
Deprecated: Isn't concern of this library. Use bimap
sequenceFirst :: (Bitraversable t, Applicative f) => t (f k) v -> f (t k v) Source #
Deprecated: Isn't concern of this library. Use bitraverse
id
pure
traverseFirst :: (Bitraversable t, Applicative f) => (k -> f k') -> t k v -> f (t k' v) Source #
Deprecated: Isn't concern of this library. Use bitraverse
f pure
traverseFirstM :: (Bitraversable t, Applicative f, Monad m) => (k -> m (f k')) -> t k v -> m (f (t k' v)) Source #
Deprecated: Isn't concern of this library. Use bitraverse
id
pure
<$>
bitraverse
f pure
r
traverseM :: (Traversable t, Applicative f, Monad m) => (v -> m (f v')) -> t v -> m (f (t v')) Source #
bitraverseM :: (Bitraversable t, Applicative f, Monad m) => (k -> m (f k')) -> (v -> m (f v')) -> t k v -> m (f (t k' v')) Source #
Deprecated: Isn't concern of this library. Use bisequenceA
<$>
bimapM
f g r
data Subscription Source #
A consumer subscription to a topic.
Examples
Typically you don't call the constructor directly, but combine settings:
consumerSub ::Subscription
consumerSub =topics
[TopicName
"kafka-client-example-topic"] <>offsetReset
Earliest
<>extraSubscriptionProps
(fromList [("prop1", "value 1"), ("prop2", "value 2")])
Instances
Semigroup Subscription Source # | |
Defined in Kafka.Consumer.Subscription (<>) :: Subscription -> Subscription -> Subscription # sconcat :: NonEmpty Subscription -> Subscription # stimes :: Integral b => b -> Subscription -> Subscription # | |
Monoid Subscription Source # | |
Defined in Kafka.Consumer.Subscription mempty :: Subscription # mappend :: Subscription -> Subscription -> Subscription # mconcat :: [Subscription] -> Subscription # |
topics :: [TopicName] -> Subscription Source #
Build a subscription by giving the list of topic names only
offsetReset :: OffsetReset -> Subscription Source #
Build a subscription by giving the offset reset parameter only
extraSubscriptionProps :: Map Text Text -> Subscription Source #
Build a subscription by giving extra properties only
errorCallback :: (KafkaError -> String -> IO ()) -> Callback Source #
Add a callback for errors.
Examples
Basic usage:
'setCallback' ('errorCallback' myErrorCallback) myErrorCallback :: 'KafkaError' -> String -> IO () myErrorCallback kafkaError message = print $ show kafkaError <> "|" <> message
logCallback :: (KafkaLogLevel -> String -> String -> IO ()) -> Callback Source #
Add a callback for logs.
Examples
Basic usage:
'setCallback' ('logCallback' myLogCallback) myLogCallback :: 'KafkaLogLevel' -> String -> String -> IO () myLogCallback level facility message = print $ show level <> "|" <> facility <> "|" <> message
statsCallback :: (ByteString -> IO ()) -> Callback Source #
Add a callback for stats. The passed ByteString contains an UTF-8 encoded JSON document and can e.g. be parsed using Data.Aeson.decodeStrict. For more information about the content of the JSON document see https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md.
Examples
Basic usage:
'setCallback' ('statsCallback' myStatsCallback) myStatsCallback :: String -> IO () myStatsCallback stats = print $ show stats
rebalanceCallback :: (KafkaConsumer -> RebalanceEvent -> IO ()) -> Callback Source #
Sets a callback that is called when rebalance is needed.
offsetCommitCallback :: (KafkaConsumer -> KafkaError -> [TopicPartition] -> IO ()) -> Callback Source #
Sets a callback that is called when rebalance is needed.
The results of automatic or manual offset commits will be scheduled
for this callback and is served by pollMessage
.
If no partitions had valid offsets to commit this callback will be called
with KafkaResponseError
RdKafkaRespErrNoOffset
which is not to be considered
an error.
data ConsumerProperties Source #
Properties to create KafkaConsumer
.
Instances
Semigroup ConsumerProperties Source # | |
Defined in Kafka.Consumer.ConsumerProperties (<>) :: ConsumerProperties -> ConsumerProperties -> ConsumerProperties # sconcat :: NonEmpty ConsumerProperties -> ConsumerProperties # stimes :: Integral b => b -> ConsumerProperties -> ConsumerProperties # | |
Monoid ConsumerProperties Source # | Right biased so we prefer newer properties over older ones. |
Defined in Kafka.Consumer.ConsumerProperties |
data CallbackPollMode Source #
Whether the callback polling should be done synchronously or not.
CallbackPollModeSync | You have to poll the consumer frequently to handle new messages as well as rebalance and keep alive events. This enables lowering the footprint and having full control over when polling happens, at the cost of manually managing those events. |
CallbackPollModeAsync | Handle polling rebalance and keep alive events for you in a background thread. |
Instances
Eq CallbackPollMode Source # | |
Defined in Kafka.Consumer.ConsumerProperties (==) :: CallbackPollMode -> CallbackPollMode -> Bool # (/=) :: CallbackPollMode -> CallbackPollMode -> Bool # | |
Show CallbackPollMode Source # | |
Defined in Kafka.Consumer.ConsumerProperties showsPrec :: Int -> CallbackPollMode -> ShowS # show :: CallbackPollMode -> String # showList :: [CallbackPollMode] -> ShowS # |
brokersList :: [BrokerAddress] -> ConsumerProperties Source #
Set the list of brokers to contact to connect to the Kafka cluster.
autoCommit :: Millis -> ConsumerProperties Source #
Set the auto commit interval and enables auto commit.
noAutoCommit :: ConsumerProperties Source #
Disable auto commit for the consumer.
noAutoOffsetStore :: ConsumerProperties Source #
Disable auto offset store for the consumer.
See enable.auto.offset.store for more information.
groupId :: ConsumerGroupId -> ConsumerProperties Source #
Set the consumer group id.
clientId :: ClientId -> ConsumerProperties Source #
Set the consumer identifier.
logLevel :: KafkaLogLevel -> ConsumerProperties Source #
Set the logging level.
Usually is used with debugOptions
to configure which logs are needed.
compression :: KafkaCompressionCodec -> ConsumerProperties Source #
Set the compression.codec for the consumer.
suppressDisconnectLogs :: ConsumerProperties Source #
Suppresses consumer log.connection.close.
It might be useful to turn this off when interacting with brokers
with an aggressive connection.max.idle.ms
value.
statisticsInterval :: Millis -> ConsumerProperties Source #
Set the statistics.interval.ms for the producer.
extraProps :: Map Text Text -> ConsumerProperties Source #
Set any configuration options that are supported by librdkafka. The full list can be found here
extraProp :: Text -> Text -> ConsumerProperties Source #
Set any configuration option that is supported by librdkafka. The full list can be found here
debugOptions :: [KafkaDebug] -> ConsumerProperties Source #
callbackPollMode :: CallbackPollMode -> ConsumerProperties Source #
Set the callback poll mode. Default value is CallbackPollModeAsync
.
:: ConsumerProperties | |
-> Subscription | |
-> (KafkaConsumer -> IO (Either KafkaError a)) | A callback function to poll and handle messages |
-> IO (Either KafkaError a) |
Deprecated: Use 'newConsumer'/'closeConsumer' instead
Runs high-level kafka consumer.
A callback provided is expected to call pollMessage
when convenient.
newConsumer :: MonadIO m => ConsumerProperties -> Subscription -> m (Either KafkaError KafkaConsumer) Source #
Create a KafkaConsumer
. This consumer must be correctly released using closeConsumer
.
assign :: MonadIO m => KafkaConsumer -> [TopicPartition] -> m (Maybe KafkaError) Source #
Assigns the consumer to consume from the given topics, partitions, and offsets.
assignment :: MonadIO m => KafkaConsumer -> m (Either KafkaError (Map TopicName [PartitionId])) Source #
Returns current consumer's assignment
subscription :: MonadIO m => KafkaConsumer -> m (Either KafkaError [(TopicName, SubscribedPartitions)]) Source #
Returns current consumer's subscription
pausePartitions :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m KafkaError Source #
Pauses specified partitions on the current consumer.
resumePartitions :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m KafkaError Source #
Resumes specified partitions on the current consumer.
committed :: MonadIO m => KafkaConsumer -> Timeout -> [(TopicName, PartitionId)] -> m (Either KafkaError [TopicPartition]) Source #
Retrieve committed offsets for topics+partitions.
position :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m (Either KafkaError [TopicPartition]) Source #
Retrieve current positions (last consumed message offset+1) for the current running instance of the consumer.
If the current consumer hasn't received any messages for a given partition, PartitionOffsetInvalid
is returned.
seek :: MonadIO m => KafkaConsumer -> Timeout -> [TopicPartition] -> m (Maybe KafkaError) Source #
Seek a particular offset for each provided TopicPartition
:: MonadIO m | |
=> KafkaConsumer | |
-> Timeout | the timeout, in milliseconds |
-> m (Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))) | Left on error or timeout, right for success |
Polls a single message
pollConsumerEvents :: KafkaConsumer -> Maybe Timeout -> IO () Source #
Polls the provided kafka consumer for events.
Events will cause application provided callbacks to be called.
The Timeout
argument specifies the maximum amount of time
(in milliseconds) that the call will block waiting for events.
This function is called on each pollMessage
and, if runtime allows
multi threading, it is called periodically in a separate thread
to ensure the callbacks are handled ASAP.
There is no particular need to call this function manually
unless some special cases in a single-threaded environment
when polling for events on each pollMessage
is not
frequent enough.
pollMessageBatch :: MonadIO m => KafkaConsumer -> Timeout -> BatchSize -> m [Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))] Source #
Polls up to BatchSize
messages.
Unlike pollMessage
this function does not return usual "timeout" errors.
An empty batch is returned when there are no messages available.
This API is not available when CallbackPollMode
is set to CallbackPollModeSync
.
commitOffsetMessage :: MonadIO m => OffsetCommit -> KafkaConsumer -> ConsumerRecord k v -> m (Maybe KafkaError) Source #
Commit message's offset on broker for the message's partition.
commitAllOffsets :: MonadIO m => OffsetCommit -> KafkaConsumer -> m (Maybe KafkaError) Source #
Commit offsets for all currently assigned partitions.
commitPartitionsOffsets :: MonadIO m => OffsetCommit -> KafkaConsumer -> [TopicPartition] -> m (Maybe KafkaError) Source #
Commit offsets for all currently assigned partitions.
storeOffsets :: MonadIO m => KafkaConsumer -> [TopicPartition] -> m (Maybe KafkaError) Source #
Stores offsets locally
storeOffsetMessage :: MonadIO m => KafkaConsumer -> ConsumerRecord k v -> m (Maybe KafkaError) Source #
Stores message's offset locally for the message's partition.
closeConsumer :: MonadIO m => KafkaConsumer -> m (Maybe KafkaError) Source #
Closes the consumer.
See newConsumer
data RdKafkaRespErrT Source #
Instances
Bounded RdKafkaRespErrT Source # | |
Defined in Kafka.Internal.RdKafka | |
Enum RdKafkaRespErrT Source # | |
Defined in Kafka.Internal.RdKafka succ :: RdKafkaRespErrT -> RdKafkaRespErrT # pred :: RdKafkaRespErrT -> RdKafkaRespErrT # toEnum :: Int -> RdKafkaRespErrT # fromEnum :: RdKafkaRespErrT -> Int # enumFrom :: RdKafkaRespErrT -> [RdKafkaRespErrT] # enumFromThen :: RdKafkaRespErrT -> RdKafkaRespErrT -> [RdKafkaRespErrT] # enumFromTo :: RdKafkaRespErrT -> RdKafkaRespErrT -> [RdKafkaRespErrT] # enumFromThenTo :: RdKafkaRespErrT -> RdKafkaRespErrT -> RdKafkaRespErrT -> [RdKafkaRespErrT] # | |
Eq RdKafkaRespErrT Source # | |
Defined in Kafka.Internal.RdKafka (==) :: RdKafkaRespErrT -> RdKafkaRespErrT -> Bool # (/=) :: RdKafkaRespErrT -> RdKafkaRespErrT -> Bool # | |
Show RdKafkaRespErrT Source # | |
Defined in Kafka.Internal.RdKafka showsPrec :: Int -> RdKafkaRespErrT -> ShowS # show :: RdKafkaRespErrT -> String # showList :: [RdKafkaRespErrT] -> ShowS # |