Safe Haskell | None
---|---
Language | Haskell2010
Kafka.Consumer
Synopsis
- data KafkaCompressionCodec
- = NoCompression
- | Gzip
- | Snappy
- | Lz4
- data KafkaDebug
- data KafkaError
- data KafkaLogLevel
- newtype Timeout = Timeout { unTimeout :: Int }
- newtype BrokerAddress = BrokerAddress { unBrokerAddress :: Text }
- newtype TopicName = TopicName { unTopicName :: Text }
- data TopicType
- newtype BatchSize = BatchSize { unBatchSize :: Int }
- newtype ClientId = ClientId { unClientId :: Text }
- newtype Millis = Millis { unMillis :: Int64 }
- newtype PartitionId = PartitionId { unPartitionId :: Int }
- newtype BrokerId = BrokerId { unBrokerId :: Int }
- topicType :: TopicName -> TopicType
- kafkaDebugToText :: KafkaDebug -> Text
- kafkaCompressionCodecToText :: KafkaCompressionCodec -> Text
- data ConsumerRecord k v = ConsumerRecord {
- crTopic :: !TopicName
- crPartition :: !PartitionId
- crOffset :: !Offset
- crTimestamp :: !Timestamp
- crKey :: !k
- crValue :: !v
- }
- data TopicPartition = TopicPartition {
- tpTopicName :: TopicName
- tpPartition :: PartitionId
- tpOffset :: PartitionOffset
- }
- data OffsetStoreMethod
- data OffsetStoreSync
- data OffsetCommit
- data Timestamp
- data SubscribedPartitions
- data PartitionOffset
- data RebalanceEvent
- data OffsetReset
- newtype Offset = Offset { unOffset :: Int64 }
- newtype ConsumerGroupId = ConsumerGroupId { unConsumerGroupId :: Text }
- kcKafkaPtr :: KafkaConsumer -> Kafka
- kcKafkaConf :: KafkaConsumer -> KafkaConf
- crMapKey :: (k -> k') -> ConsumerRecord k v -> ConsumerRecord k' v
- crMapValue :: (v -> v') -> ConsumerRecord k v -> ConsumerRecord k v'
- crMapKV :: (k -> k') -> (v -> v') -> ConsumerRecord k v -> ConsumerRecord k' v'
- sequenceFirst :: (Bitraversable t, Applicative f) => t (f k) v -> f (t k v)
- traverseFirst :: (Bitraversable t, Applicative f) => (k -> f k') -> t k v -> f (t k' v)
- traverseFirstM :: (Bitraversable t, Applicative f, Monad m) => (k -> m (f k')) -> t k v -> m (f (t k' v))
- traverseM :: (Traversable t, Applicative f, Monad m) => (v -> m (f v')) -> t v -> m (f (t v'))
- bitraverseM :: (Bitraversable t, Applicative f, Monad m) => (k -> m (f k')) -> (v -> m (f v')) -> t k v -> m (f (t k' v'))
- data Subscription = Subscription (Set TopicName) (Map Text Text)
- topics :: [TopicName] -> Subscription
- offsetReset :: OffsetReset -> Subscription
- extraSubscriptionProps :: Map Text Text -> Subscription
- errorCallback :: HasKafkaConf k => (KafkaError -> String -> IO ()) -> k -> IO ()
- logCallback :: HasKafkaConf k => (KafkaLogLevel -> String -> String -> IO ()) -> k -> IO ()
- statsCallback :: HasKafkaConf k => (String -> IO ()) -> k -> IO ()
- rebalanceCallback :: (KafkaConsumer -> RebalanceEvent -> IO ()) -> KafkaConf -> IO ()
- offsetCommitCallback :: (KafkaConsumer -> KafkaError -> [TopicPartition] -> IO ()) -> KafkaConf -> IO ()
- data ConsumerProperties = ConsumerProperties {
- cpProps :: Map Text Text
- cpLogLevel :: Maybe KafkaLogLevel
- cpCallbacks :: [KafkaConf -> IO ()]
- cpCallbackPollMode :: CallbackPollMode
- }
- data CallbackPollMode
- brokersList :: [BrokerAddress] -> ConsumerProperties
- autoCommit :: Millis -> ConsumerProperties
- noAutoCommit :: ConsumerProperties
- noAutoOffsetStore :: ConsumerProperties
- groupId :: ConsumerGroupId -> ConsumerProperties
- clientId :: ClientId -> ConsumerProperties
- setCallback :: (KafkaConf -> IO ()) -> ConsumerProperties
- logLevel :: KafkaLogLevel -> ConsumerProperties
- compression :: KafkaCompressionCodec -> ConsumerProperties
- suppressDisconnectLogs :: ConsumerProperties
- extraProps :: Map Text Text -> ConsumerProperties
- extraProp :: Text -> Text -> ConsumerProperties
- debugOptions :: [KafkaDebug] -> ConsumerProperties
- queuedMaxMessagesKBytes :: Int -> ConsumerProperties
- callbackPollMode :: CallbackPollMode -> ConsumerProperties
- runConsumer :: ConsumerProperties -> Subscription -> (KafkaConsumer -> IO (Either KafkaError a)) -> IO (Either KafkaError a)
- newConsumer :: MonadIO m => ConsumerProperties -> Subscription -> m (Either KafkaError KafkaConsumer)
- assign :: MonadIO m => KafkaConsumer -> [TopicPartition] -> m (Maybe KafkaError)
- assignment :: MonadIO m => KafkaConsumer -> m (Either KafkaError (Map TopicName [PartitionId]))
- subscription :: MonadIO m => KafkaConsumer -> m (Either KafkaError [(TopicName, SubscribedPartitions)])
- pausePartitions :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m KafkaError
- resumePartitions :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m KafkaError
- committed :: MonadIO m => KafkaConsumer -> Timeout -> [(TopicName, PartitionId)] -> m (Either KafkaError [TopicPartition])
- position :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m (Either KafkaError [TopicPartition])
- seek :: MonadIO m => KafkaConsumer -> Timeout -> [TopicPartition] -> m (Maybe KafkaError)
- pollMessage :: MonadIO m => KafkaConsumer -> Timeout -> m (Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
- pollConsumerEvents :: KafkaConsumer -> Maybe Timeout -> IO ()
- pollMessageBatch :: MonadIO m => KafkaConsumer -> Timeout -> BatchSize -> m [Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))]
- commitOffsetMessage :: MonadIO m => OffsetCommit -> KafkaConsumer -> ConsumerRecord k v -> m (Maybe KafkaError)
- commitAllOffsets :: MonadIO m => OffsetCommit -> KafkaConsumer -> m (Maybe KafkaError)
- commitPartitionsOffsets :: MonadIO m => OffsetCommit -> KafkaConsumer -> [TopicPartition] -> m (Maybe KafkaError)
- storeOffsets :: MonadIO m => KafkaConsumer -> [TopicPartition] -> m (Maybe KafkaError)
- storeOffsetMessage :: MonadIO m => KafkaConsumer -> ConsumerRecord k v -> m (Maybe KafkaError)
- closeConsumer :: MonadIO m => KafkaConsumer -> m (Maybe KafkaError)
- data KafkaConsumer
- data RdKafkaRespErrT
- = RdKafkaRespErrBegin
- | RdKafkaRespErrBadMsg
- | RdKafkaRespErrBadCompression
- | RdKafkaRespErrDestroy
- | RdKafkaRespErrFail
- | RdKafkaRespErrTransport
- | RdKafkaRespErrCritSysResource
- | RdKafkaRespErrResolve
- | RdKafkaRespErrMsgTimedOut
- | RdKafkaRespErrPartitionEof
- | RdKafkaRespErrUnknownPartition
- | RdKafkaRespErrFs
- | RdKafkaRespErrUnknownTopic
- | RdKafkaRespErrAllBrokersDown
- | RdKafkaRespErrInvalidArg
- | RdKafkaRespErrTimedOut
- | RdKafkaRespErrQueueFull
- | RdKafkaRespErrIsrInsuff
- | RdKafkaRespErrNodeUpdate
- | RdKafkaRespErrSsl
- | RdKafkaRespErrWaitCoord
- | RdKafkaRespErrUnknownGroup
- | RdKafkaRespErrInProgress
- | RdKafkaRespErrPrevInProgress
- | RdKafkaRespErrExistingSubscription
- | RdKafkaRespErrAssignPartitions
- | RdKafkaRespErrRevokePartitions
- | RdKafkaRespErrConflict
- | RdKafkaRespErrState
- | RdKafkaRespErrUnknownProtocol
- | RdKafkaRespErrNotImplemented
- | RdKafkaRespErrAuthentication
- | RdKafkaRespErrNoOffset
- | RdKafkaRespErrOutdated
- | RdKafkaRespErrTimedOutQueue
- | RdKafkaRespErrUnsupportedFeature
- | RdKafkaRespErrWaitCache
- | RdKafkaRespErrIntr
- | RdKafkaRespErrKeySerialization
- | RdKafkaRespErrValueSerialization
- | RdKafkaRespErrKeyDeserialization
- | RdKafkaRespErrValueDeserialization
- | RdKafkaRespErrPartial
- | RdKafkaRespErrReadOnly
- | RdKafkaRespErrNoent
- | RdKafkaRespErrUnderflow
- | RdKafkaRespErrInvalidType
- | RdKafkaRespErrRetry
- | RdKafkaRespErrPurgeQueue
- | RdKafkaRespErrPurgeInflight
- | RdKafkaRespErrFatal
- | RdKafkaRespErrInconsistent
- | RdKafkaRespErrGaplessGuarantee
- | RdKafkaRespErrMaxPollExceeded
- | RdKafkaRespErrUnknownBroker
- | RdKafkaRespErrEnd
- | RdKafkaRespErrUnknown
- | RdKafkaRespErrNoError
- | RdKafkaRespErrOffsetOutOfRange
- | RdKafkaRespErrInvalidMsg
- | RdKafkaRespErrUnknownTopicOrPart
- | RdKafkaRespErrInvalidMsgSize
- | RdKafkaRespErrLeaderNotAvailable
- | RdKafkaRespErrNotLeaderForPartition
- | RdKafkaRespErrRequestTimedOut
- | RdKafkaRespErrBrokerNotAvailable
- | RdKafkaRespErrReplicaNotAvailable
- | RdKafkaRespErrMsgSizeTooLarge
- | RdKafkaRespErrStaleCtrlEpoch
- | RdKafkaRespErrOffsetMetadataTooLarge
- | RdKafkaRespErrNetworkException
- | RdKafkaRespErrCoordinatorLoadInProgress
- | RdKafkaRespErrCoordinatorNotAvailable
- | RdKafkaRespErrNotCoordinator
- | RdKafkaRespErrTopicException
- | RdKafkaRespErrRecordListTooLarge
- | RdKafkaRespErrNotEnoughReplicas
- | RdKafkaRespErrNotEnoughReplicasAfterAppend
- | RdKafkaRespErrInvalidRequiredAcks
- | RdKafkaRespErrIllegalGeneration
- | RdKafkaRespErrInconsistentGroupProtocol
- | RdKafkaRespErrInvalidGroupId
- | RdKafkaRespErrUnknownMemberId
- | RdKafkaRespErrInvalidSessionTimeout
- | RdKafkaRespErrRebalanceInProgress
- | RdKafkaRespErrInvalidCommitOffsetSize
- | RdKafkaRespErrTopicAuthorizationFailed
- | RdKafkaRespErrGroupAuthorizationFailed
- | RdKafkaRespErrClusterAuthorizationFailed
- | RdKafkaRespErrInvalidTimestamp
- | RdKafkaRespErrUnsupportedSaslMechanism
- | RdKafkaRespErrIllegalSaslState
- | RdKafkaRespErrUnsupportedVersion
- | RdKafkaRespErrTopicAlreadyExists
- | RdKafkaRespErrInvalidPartitions
- | RdKafkaRespErrInvalidReplicationFactor
- | RdKafkaRespErrInvalidReplicaAssignment
- | RdKafkaRespErrInvalidConfig
- | RdKafkaRespErrNotController
- | RdKafkaRespErrInvalidRequest
- | RdKafkaRespErrUnsupportedForMessageFormat
- | RdKafkaRespErrPolicyViolation
- | RdKafkaRespErrOutOfOrderSequenceNumber
- | RdKafkaRespErrDuplicateSequenceNumber
- | RdKafkaRespErrInvalidProducerEpoch
- | RdKafkaRespErrInvalidTxnState
- | RdKafkaRespErrInvalidProducerIdMapping
- | RdKafkaRespErrInvalidTransactionTimeout
- | RdKafkaRespErrConcurrentTransactions
- | RdKafkaRespErrTransactionCoordinatorFenced
- | RdKafkaRespErrTransactionalIdAuthorizationFailed
- | RdKafkaRespErrSecurityDisabled
- | RdKafkaRespErrOperationNotAttempted
- | RdKafkaRespErrKafkaStorageError
- | RdKafkaRespErrLogDirNotFound
- | RdKafkaRespErrSaslAuthenticationFailed
- | RdKafkaRespErrUnknownProducerId
- | RdKafkaRespErrReassignmentInProgress
- | RdKafkaRespErrDelegationTokenAuthDisabled
- | RdKafkaRespErrDelegationTokenNotFound
- | RdKafkaRespErrDelegationTokenOwnerMismatch
- | RdKafkaRespErrDelegationTokenRequestNotAllowed
- | RdKafkaRespErrDelegationTokenAuthorizationFailed
- | RdKafkaRespErrDelegationTokenExpired
- | RdKafkaRespErrInvalidPrincipalType
- | RdKafkaRespErrNonEmptyGroup
- | RdKafkaRespErrGroupIdNotFound
- | RdKafkaRespErrFetchSessionIdNotFound
- | RdKafkaRespErrInvalidFetchSessionEpoch
- | RdKafkaRespErrListenerNotFound
- | RdKafkaRespErrTopicDeletionDisabled
- | RdKafkaRespErrFencedLeaderEpoch
- | RdKafkaRespErrUnknownLeaderEpoch
- | RdKafkaRespErrUnsupportedCompressionType
- | RdKafkaRespErrStaleBrokerEpoch
- | RdKafkaRespErrOffsetNotAvailable
- | RdKafkaRespErrMemberIdRequired
- | RdKafkaRespErrPreferredLeaderNotAvailable
- | RdKafkaRespErrGroupMaxSizeReached
- | RdKafkaRespErrEndAll
Documentation
data KafkaCompressionCodec Source #
Constructors
NoCompression
Gzip
Snappy
Lz4
Instances
data KafkaDebug Source #
Constructors
DebugGeneric
DebugBroker
DebugTopic
DebugMetadata
DebugQueue
DebugMsg
DebugProtocol
DebugCgrp
DebugSecurity
DebugFetch
DebugFeature
DebugAll
Instances
data KafkaError Source #
Any Kafka errors
Constructors
KafkaError Text
KafkaInvalidReturnValue
KafkaBadSpecification Text
KafkaResponseError RdKafkaRespErrT
KafkaInvalidConfigurationValue Text
KafkaUnknownConfigurationKey Text
KafkaBadConfiguration
Instances
data KafkaLogLevel Source #
Log levels for librdkafka.
Constructors
KafkaLogEmerg
KafkaLogAlert
KafkaLogCrit
KafkaLogErr
KafkaLogWarning
KafkaLogNotice
KafkaLogInfo
KafkaLogDebug
Instances
Enum KafkaLogLevel Source # | Defined in Kafka.Types
Eq KafkaLogLevel Source # | Defined in Kafka.Types
Show KafkaLogLevel Source # | Defined in Kafka.Types
newtype Timeout Source #

Timeout in milliseconds
newtype BrokerAddress Source #
Kafka broker address string (e.g. broker1:9092)
Constructors
BrokerAddress

Fields
- unBrokerAddress :: Text
Instances
Eq BrokerAddress Source # | Defined in Kafka.Types
Show BrokerAddress Source # | Defined in Kafka.Types
Generic BrokerAddress Source # | Defined in Kafka.Types
type Rep BrokerAddress Source # | Defined in Kafka.Types: type Rep BrokerAddress = D1 (MetaData "BrokerAddress" "Kafka.Types" "hw-kafka-client-3.1.0-inplace" True) (C1 (MetaCons "BrokerAddress" PrefixI True) (S1 (MetaSel (Just "unBrokerAddress") NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 Text)))
newtype TopicName Source #

Topic name to be consumed.

Wildcard (regex) topics are supported by the librdkafka assignor: any topic name in the topics list that is prefixed with ^ will be regex-matched against the full list of topics in the cluster, and matching topics will be added to the subscription list.
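As an illustrative sketch (the topic names "events" and "^logs-.*" are made up for this example), a subscription can mix plain and regex topic names:

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Kafka.Consumer

-- Subscribes to the plain topic "events" and to every cluster topic
-- matching the regex "logs-.*" (names prefixed with '^' are regex-matched).
wildcardSubscription :: Subscription
wildcardSubscription = topics [TopicName "events", TopicName "^logs-.*"]
```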
Constructors
TopicName | a simple topic name, or a regex if started with ^

Fields
- unTopicName :: Text
Instances
Eq TopicName Source #
Ord TopicName Source #
Read TopicName Source #
Show TopicName Source #
Generic TopicName Source #
type Rep TopicName Source # | Defined in Kafka.Types
data TopicType Source #

Constructors

User | Normal topics that are created by users.
System | Topics starting with "__" (e.g. __consumer_offsets, __confluent.support.metrics) are considered "system" topics.
Instances
Eq TopicType Source #
Ord TopicType Source #
Read TopicType Source #
Show TopicType Source #
Generic TopicType Source #
type Rep TopicType Source #
newtype BatchSize Source #

Constructors

BatchSize

Fields
- unBatchSize :: Int
Instances
Eq BatchSize Source #
Num BatchSize Source #
Ord BatchSize Source #
Read BatchSize Source #
Show BatchSize Source #
Generic BatchSize Source #
type Rep BatchSize Source # | Defined in Kafka.Types
newtype ClientId Source #

Constructors

ClientId

Fields
- unClientId :: Text
newtype PartitionId Source #
Constructors

PartitionId

Fields
- unPartitionId :: Int
Instances
newtype BrokerId Source #

Constructors

BrokerId

Fields
- unBrokerId :: Int
kafkaDebugToText :: KafkaDebug -> Text Source #
data ConsumerRecord k v Source #
Represents a received message from Kafka (i.e. used in a consumer)
Constructors

ConsumerRecord

Fields
- crTopic :: !TopicName
- crPartition :: !PartitionId
- crOffset :: !Offset
- crTimestamp :: !Timestamp
- crKey :: !k
- crValue :: !v
Instances
data TopicPartition Source #
Kafka topic partition structure
Constructors

TopicPartition

Fields
- tpTopicName :: TopicName
- tpPartition :: PartitionId
- tpOffset :: PartitionOffset
Instances
data OffsetStoreMethod Source #
Indicates the method of storing the offsets
Constructors
OffsetStoreBroker | Offsets are stored in Kafka broker (preferred) |
OffsetStoreFile FilePath OffsetStoreSync | Offsets are stored in a file (and synced to disk according to the sync policy) |
Instances
data OffsetStoreSync Source #
Indicates how offsets are to be synced to disk
Constructors
OffsetSyncDisable | Do not sync offsets (in Kafka: -1) |
OffsetSyncImmediate | Sync immediately after each offset commit (in Kafka: 0) |
OffsetSyncInterval Int | Sync after specified interval in millis |
Instances
data OffsetCommit Source #
Offsets commit mode
Constructors
OffsetCommit | Forces consumer to block until the broker offsets commit is done |
OffsetCommitAsync | Offsets will be committed in a non-blocking way |
Instances
Eq OffsetCommit Source # | Defined in Kafka.Consumer.Types
Show OffsetCommit Source # | Defined in Kafka.Consumer.Types
Generic OffsetCommit Source # | Defined in Kafka.Consumer.Types
type Rep OffsetCommit Source #
data Timestamp Source #

Constructors

CreateTime !Millis
LogAppendTime !Millis
NoTimestamp
Instances
Eq Timestamp Source #
Read Timestamp Source #
Show Timestamp Source #
Generic Timestamp Source #
type Rep Timestamp Source # | Defined in Kafka.Consumer.Types: type Rep Timestamp = D1 (MetaData "Timestamp" "Kafka.Consumer.Types" "hw-kafka-client-3.1.0-inplace" False) (C1 (MetaCons "CreateTime" PrefixI False) (S1 (MetaSel (Nothing :: Maybe Symbol) NoSourceUnpackedness SourceStrict DecidedStrict) (Rec0 Millis)) :+: (C1 (MetaCons "LogAppendTime" PrefixI False) (S1 (MetaSel (Nothing :: Maybe Symbol) NoSourceUnpackedness SourceStrict DecidedStrict) (Rec0 Millis)) :+: C1 (MetaCons "NoTimestamp" PrefixI False) (U1 :: Type -> Type)))
data SubscribedPartitions Source #
Constructors
SubscribedPartitions [PartitionId]
SubscribedPartitionsAll
Instances
data PartitionOffset Source #
Constructors
PartitionOffsetBeginning
PartitionOffsetEnd
PartitionOffset Int64
PartitionOffsetStored
PartitionOffsetInvalid
Instances
data RebalanceEvent Source #
A set of events which happen during the rebalancing process
Constructors
RebalanceBeforeAssign [(TopicName, PartitionId)] | Happens before Kafka Client confirms new assignment |
RebalanceAssign [(TopicName, PartitionId)] | Happens after the new assignment is confirmed |
RebalanceBeforeRevoke [(TopicName, PartitionId)] | Happens before Kafka Client confirms partitions rejection |
RebalanceRevoke [(TopicName, PartitionId)] | Happens after the rejection is confirmed |
Instances
data OffsetReset Source #

Constructors

Earliest
Latest

Instances
Eq OffsetReset Source # | Defined in Kafka.Consumer.Types
Show OffsetReset Source # | Defined in Kafka.Consumer.Types
Generic OffsetReset Source # | Defined in Kafka.Consumer.Types
type Rep OffsetReset Source #
newtype ConsumerGroupId Source #
Constructors
ConsumerGroupId

Fields
- unConsumerGroupId :: Text
Instances
kcKafkaPtr :: KafkaConsumer -> Kafka Source #
kcKafkaConf :: KafkaConsumer -> KafkaConf Source #
crMapKey :: (k -> k') -> ConsumerRecord k v -> ConsumerRecord k' v Source #
crMapValue :: (v -> v') -> ConsumerRecord k v -> ConsumerRecord k v' Source #
crMapKV :: (k -> k') -> (v -> v') -> ConsumerRecord k v -> ConsumerRecord k' v' Source #
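For illustration, a minimal sketch (the decodeRecord name is made up here) using crMapKV to decode the raw key and value of a polled record into Text, assuming the payloads are UTF-8:

```haskell
import           Data.ByteString (ByteString)
import           Data.Text (Text)
import qualified Data.Text.Encoding as T
import           Kafka.Consumer

-- Decode both key and value, preserving Nothing for absent ones.
decodeRecord
  :: ConsumerRecord (Maybe ByteString) (Maybe ByteString)
  -> ConsumerRecord (Maybe Text) (Maybe Text)
decodeRecord = crMapKV (fmap T.decodeUtf8) (fmap T.decodeUtf8)
```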
sequenceFirst :: (Bitraversable t, Applicative f) => t (f k) v -> f (t k v) Source #
Deprecated: Isn't concern of this library. Use 'bitraverse id pure'
traverseFirst :: (Bitraversable t, Applicative f) => (k -> f k') -> t k v -> f (t k' v) Source #
Deprecated: Isn't concern of this library. Use 'bitraverse f pure'
traverseFirstM :: (Bitraversable t, Applicative f, Monad m) => (k -> m (f k')) -> t k v -> m (f (t k' v)) Source #
Deprecated: Isn't concern of this library. Use 'bitraverse id pure $ bitraverse f pure r'
traverseM :: (Traversable t, Applicative f, Monad m) => (v -> m (f v')) -> t v -> m (f (t v')) Source #
Deprecated: Isn't concern of this library. Use 'sequenceA $ traverse f r'
bitraverseM :: (Bitraversable t, Applicative f, Monad m) => (k -> m (f k')) -> (v -> m (f v')) -> t k v -> m (f (t k' v')) Source #
Deprecated: Isn't concern of this library. Use 'bisequenceA $ bimapM f g r'
data Subscription Source #
Instances
Semigroup Subscription Source # | Defined in Kafka.Consumer.Subscription
Monoid Subscription Source # | Defined in Kafka.Consumer.Subscription
topics :: [TopicName] -> Subscription Source #
errorCallback :: HasKafkaConf k => (KafkaError -> String -> IO ()) -> k -> IO () Source #
Add a callback for errors.
Examples
Basic usage:
```haskell
setCallback (errorCallback myErrorCallback)

myErrorCallback :: KafkaError -> String -> IO ()
myErrorCallback kafkaError message = print $ show kafkaError <> "|" <> message
```
logCallback :: HasKafkaConf k => (KafkaLogLevel -> String -> String -> IO ()) -> k -> IO () Source #
Add a callback for logs.
Examples
Basic usage:
```haskell
setCallback (logCallback myLogCallback)

myLogCallback :: KafkaLogLevel -> String -> String -> IO ()
myLogCallback level facility message = print $ show level <> "|" <> facility <> "|" <> message
```
statsCallback :: HasKafkaConf k => (String -> IO ()) -> k -> IO () Source #
Add a callback for stats.
Examples
Basic usage:
```haskell
setCallback (statsCallback myStatsCallback)

myStatsCallback :: String -> IO ()
myStatsCallback stats = putStrLn stats
```
rebalanceCallback :: (KafkaConsumer -> RebalanceEvent -> IO ()) -> KafkaConf -> IO () Source #
Sets a callback that is called when rebalance is needed.
offsetCommitCallback :: (KafkaConsumer -> KafkaError -> [TopicPartition] -> IO ()) -> KafkaConf -> IO () Source #
Sets a callback that is called when offsets are committed.

The results of automatic or manual offset commits are scheduled for this callback, which is served by pollMessage.

If no partitions had valid offsets to commit, this callback is called with KafkaError == KafkaResponseError RdKafkaRespErrNoOffset, which is not to be considered an error.
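A minimal sketch of wiring this callback into ConsumerProperties via setCallback (the handler body and the commitReporting name are illustrative):

```haskell
import Kafka.Consumer

-- Report commit results, treating RdKafkaRespErrNoOffset as a non-error.
commitReporting :: ConsumerProperties
commitReporting = setCallback (offsetCommitCallback onCommit)
  where
    onCommit :: KafkaConsumer -> KafkaError -> [TopicPartition] -> IO ()
    onCommit _consumer err partitions = case err of
      KafkaResponseError RdKafkaRespErrNoOffset -> pure ()  -- nothing to commit
      KafkaResponseError RdKafkaRespErrNoError  ->
        putStrLn ("Committed offsets for " <> show (length partitions) <> " partition(s)")
      other -> putStrLn ("Offset commit failed: " <> show other)
```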
data ConsumerProperties Source #
Properties to create a KafkaConsumer.
Constructors
ConsumerProperties

Fields
- cpProps :: Map Text Text
- cpLogLevel :: Maybe KafkaLogLevel
- cpCallbacks :: [KafkaConf -> IO ()]
- cpCallbackPollMode :: CallbackPollMode
Instances
Semigroup ConsumerProperties Source # | Defined in Kafka.Consumer.ConsumerProperties
Monoid ConsumerProperties Source # | Right biased so we prefer newer properties over older ones. Defined in Kafka.Consumer.ConsumerProperties
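For illustration (the broker address and group id are placeholders), properties compose with (<>); because the Monoid is right-biased, the later noAutoCommit below wins over the earlier autoCommit:

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Kafka.Consumer

-- Later properties override earlier ones on conflicting settings.
consumerProps :: ConsumerProperties
consumerProps =
     brokersList [BrokerAddress "localhost:9092"]
  <> groupId (ConsumerGroupId "example-group")
  <> autoCommit (Millis 5000)   -- overridden by noAutoCommit below
  <> noAutoCommit
  <> logLevel KafkaLogInfo
```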
data CallbackPollMode Source #
Constructors
CallbackPollModeSync
CallbackPollModeAsync
Instances
Eq CallbackPollMode Source # | Defined in Kafka.Consumer.ConsumerProperties
Show CallbackPollMode Source # | Defined in Kafka.Consumer.ConsumerProperties
noAutoCommit :: ConsumerProperties Source #
Disables auto commit for the consumer
noAutoOffsetStore :: ConsumerProperties Source #
Disables auto offset store for the consumer
groupId :: ConsumerGroupId -> ConsumerProperties Source #
Consumer group id
setCallback :: (KafkaConf -> IO ()) -> ConsumerProperties Source #
logLevel :: KafkaLogLevel -> ConsumerProperties Source #
Sets the logging level. Usually used together with debugOptions to configure which logs are needed.
compression :: KafkaCompressionCodec -> ConsumerProperties Source #
Sets the compression codec for the consumer.
suppressDisconnectLogs :: ConsumerProperties Source #
Suppresses consumer disconnects logs.
It might be useful to turn this off when interacting with brokers with an aggressive connection.max.idle.ms value.
extraProps :: Map Text Text -> ConsumerProperties Source #
Any configuration options that are supported by librdkafka. The full list can be found in librdkafka's CONFIGURATION.md.
extraProp :: Text -> Text -> ConsumerProperties Source #
Any configuration options that are supported by librdkafka. The full list can be found in librdkafka's CONFIGURATION.md.
debugOptions :: [KafkaDebug] -> ConsumerProperties Source #
Sets debug features for the consumer. Usually used together with logLevel.
callbackPollMode :: CallbackPollMode -> ConsumerProperties Source #
Sets the callback poll mode.

The default CallbackPollModeAsync mode handles polling rebalance and keep-alive events for you in a background thread.

With CallbackPollModeSync the user must poll the consumer frequently to handle new messages as well as rebalance and keep-alive events. CallbackPollModeSync lets you simplify hw-kafka-client's footprint and have full control over when polling happens, at the cost of having to manage this yourself.
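A sketch of a synchronous setup (the broker address and group id are placeholders; the application serves callbacks itself via pollConsumerEvents):

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Kafka.Consumer

-- Opt out of the background callback-polling thread.
syncModeProps :: ConsumerProperties
syncModeProps =
     brokersList [BrokerAddress "localhost:9092"]
  <> groupId (ConsumerGroupId "sync-group")
  <> callbackPollMode CallbackPollModeSync

-- Serve pending rebalance/keep-alive callbacks, blocking up to 100 ms.
serveCallbacks :: KafkaConsumer -> IO ()
serveCallbacks consumer = pollConsumerEvents consumer (Just (Timeout 100))
```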
runConsumer Source #

Arguments
:: ConsumerProperties
-> Subscription
-> (KafkaConsumer -> IO (Either KafkaError a)) | A callback function to poll and handle messages
-> IO (Either KafkaError a)
Deprecated: Use newConsumer/closeConsumer instead
Runs a high-level Kafka consumer. The provided callback is expected to call pollMessage when convenient.
newConsumer :: MonadIO m => ConsumerProperties -> Subscription -> m (Either KafkaError KafkaConsumer) Source #
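A sketch of the recommended newConsumer/closeConsumer bracket that replaces the deprecated runConsumer (the broker, group, and topic names are placeholders):

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Control.Exception (bracket)
import Kafka.Consumer

-- Create a consumer, hand it to an action, and always close it afterwards.
withKafkaConsumer :: (KafkaConsumer -> IO a) -> IO (Either KafkaError a)
withKafkaConsumer action = bracket mkConsumer clConsumer runHandler
  where
    props = brokersList [BrokerAddress "localhost:9092"]
         <> groupId (ConsumerGroupId "example-group")
    sub   = topics [TopicName "example-topic"] <> offsetReset Earliest
    mkConsumer            = newConsumer props sub
    clConsumer (Left _)   = pure ()
    clConsumer (Right kc) = () <$ closeConsumer kc
    runHandler (Left err) = pure (Left err)
    runHandler (Right kc) = Right <$> action kc
```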
assign :: MonadIO m => KafkaConsumer -> [TopicPartition] -> m (Maybe KafkaError) Source #
Assigns the consumer to consume from the given topics, partitions, and offsets.
assignment :: MonadIO m => KafkaConsumer -> m (Either KafkaError (Map TopicName [PartitionId])) Source #
Returns current consumer's assignment
subscription :: MonadIO m => KafkaConsumer -> m (Either KafkaError [(TopicName, SubscribedPartitions)]) Source #
Returns current consumer's subscription
pausePartitions :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m KafkaError Source #
Pauses specified partitions on the current consumer.
resumePartitions :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m KafkaError Source #
Resumes specified partitions on the current consumer.
committed :: MonadIO m => KafkaConsumer -> Timeout -> [(TopicName, PartitionId)] -> m (Either KafkaError [TopicPartition]) Source #
Retrieve committed offsets for topics+partitions.
position :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m (Either KafkaError [TopicPartition]) Source #
Retrieve current positions (last consumed message offset + 1) for the current running instance of the consumer. If the current consumer hasn't received any messages for a given partition, PartitionOffsetInvalid is returned.
seek :: MonadIO m => KafkaConsumer -> Timeout -> [TopicPartition] -> m (Maybe KafkaError) Source #
pollMessage Source #

Arguments
:: MonadIO m
=> KafkaConsumer
-> Timeout | the timeout, in milliseconds
-> m (Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))) | Left on error or timeout, right for success
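A sketch of a simple poll-and-commit loop (the iteration count and timeout are arbitrary, and the usual derived Show instances are assumed); a timeout comes back as Left (KafkaResponseError RdKafkaRespErrTimedOut):

```haskell
import Kafka.Consumer

-- Poll n messages, committing all assigned partitions after each record.
pollLoop :: KafkaConsumer -> Int -> IO ()
pollLoop consumer n
  | n <= 0    = pure ()
  | otherwise = do
      msg <- pollMessage consumer (Timeout 1000)
      case msg of
        Left err     -> putStrLn ("Poll error or timeout: " <> show err)
        Right record -> do
          putStrLn ("Received offset: " <> show (crOffset record))
          _ <- commitAllOffsets OffsetCommit consumer
          pure ()
      pollLoop consumer (n - 1)
```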
pollConsumerEvents :: KafkaConsumer -> Maybe Timeout -> IO () Source #
Polls the provided kafka consumer for events. Events will cause application-provided callbacks to be called.

The timeout argument specifies the maximum amount of time (in milliseconds) that the call will block waiting for events.

This function is called on each pollMessage and, if the runtime allows multi-threading, it is also called periodically in a separate thread to ensure the callbacks are handled as soon as possible. There is normally no need to call this function manually, except in special cases in a single-threaded environment where polling for events on each pollMessage is not frequent enough.
pollMessageBatch :: MonadIO m => KafkaConsumer -> Timeout -> BatchSize -> m [Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))] Source #
Polls up to BatchSize messages. Unlike pollMessage, this function does not return the usual "timeout" errors; an empty batch is returned when there are no messages available.
This API is not available when the callback poll mode is set to CallbackPollModeSync.
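A sketch of batch polling (the batch size, timeout, and pollBatchOnce name are illustrative); note that an empty list, not a timeout error, signals that nothing was available:

```haskell
import Data.Either (rights)
import Kafka.Consumer

-- Fetch up to 100 records in one call and count the successful ones.
pollBatchOnce :: KafkaConsumer -> IO ()
pollBatchOnce consumer = do
  batch <- pollMessageBatch consumer (Timeout 1000) (BatchSize 100)
  putStrLn ("Polled " <> show (length (rights batch))
         <> " records out of " <> show (length batch) <> " results")
```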
commitOffsetMessage :: MonadIO m => OffsetCommit -> KafkaConsumer -> ConsumerRecord k v -> m (Maybe KafkaError) Source #
Commit message's offset on broker for the message's partition.
commitAllOffsets :: MonadIO m => OffsetCommit -> KafkaConsumer -> m (Maybe KafkaError) Source #
Commit offsets for all currently assigned partitions.
commitPartitionsOffsets :: MonadIO m => OffsetCommit -> KafkaConsumer -> [TopicPartition] -> m (Maybe KafkaError) Source #
Commits offsets for the given partitions.
storeOffsets :: MonadIO m => KafkaConsumer -> [TopicPartition] -> m (Maybe KafkaError) Source #
Stores offsets locally
storeOffsetMessage :: MonadIO m => KafkaConsumer -> ConsumerRecord k v -> m (Maybe KafkaError) Source #
Stores message's offset locally for the message's partition.
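A sketch of the manual offset-store workflow this enables when combined with noAutoOffsetStore (the processing step and the processThenStore name are stand-ins):

```haskell
import Kafka.Consumer

-- Mark a record as processed only after handling succeeds; the stored
-- offset is then picked up by auto-commit or a later manual commit.
processThenStore :: KafkaConsumer -> ConsumerRecord k v -> IO (Maybe KafkaError)
processThenStore consumer record = do
  -- ... application-specific processing of 'record' goes here ...
  storeOffsetMessage consumer record
```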
closeConsumer :: MonadIO m => KafkaConsumer -> m (Maybe KafkaError) Source #
Closes the consumer.
data KafkaConsumer Source #
data RdKafkaRespErrT Source #
Constructors
Instances
Bounded RdKafkaRespErrT Source # | Defined in Kafka.Internal.RdKafka
Enum RdKafkaRespErrT Source # | Defined in Kafka.Internal.RdKafka
Eq RdKafkaRespErrT Source # | Defined in Kafka.Internal.RdKafka
Show RdKafkaRespErrT Source # | Defined in Kafka.Internal.RdKafka