Safe Haskell | None |
Language | Haskell2010 |
- data KafkaCompressionCodec
- = NoCompression
- | Gzip
- | Snappy
- | Lz4
- data KafkaDebug
- data KafkaError
- data KafkaLogLevel
- newtype Timeout = Timeout {}
- newtype BrokerAddress = BrokerAddress {}
- newtype TopicName = TopicName {
- unTopicName :: Text
- data TopicType
- newtype BatchSize = BatchSize {
- unBatchSize :: Int
- newtype ClientId = ClientId {
- unClientId :: Text
- newtype Millis = Millis {}
- newtype PartitionId = PartitionId {
- unPartitionId :: Int
- newtype BrokerId = BrokerId {
- unBrokerId :: Int
- topicType :: TopicName -> TopicType
- kafkaDebugToText :: KafkaDebug -> Text
- kafkaCompressionCodecToText :: KafkaCompressionCodec -> Text
- data ConsumerRecord k v = ConsumerRecord {
- crTopic :: !TopicName
- crPartition :: !PartitionId
- crOffset :: !Offset
- crTimestamp :: !Timestamp
- crKey :: !k
- crValue :: !v
- data TopicPartition = TopicPartition {}
- data OffsetStoreMethod
- data OffsetStoreSync
- data OffsetCommit
- data Timestamp
- data SubscribedPartitions
- data PartitionOffset
- data RebalanceEvent
- data OffsetReset
- newtype Offset = Offset {}
- newtype ConsumerGroupId = ConsumerGroupId {}
- kcKafkaPtr :: KafkaConsumer -> Kafka
- kcKafkaConf :: KafkaConsumer -> KafkaConf
- crMapKey :: (k -> k') -> ConsumerRecord k v -> ConsumerRecord k' v
- crMapValue :: (v -> v') -> ConsumerRecord k v -> ConsumerRecord k v'
- crMapKV :: (k -> k') -> (v -> v') -> ConsumerRecord k v -> ConsumerRecord k' v'
- sequenceFirst :: (Bitraversable t, Applicative f) => t (f k) v -> f (t k v)
- traverseFirst :: (Bitraversable t, Applicative f) => (k -> f k') -> t k v -> f (t k' v)
- traverseFirstM :: (Bitraversable t, Applicative f, Monad m) => (k -> m (f k')) -> t k v -> m (f (t k' v))
- traverseM :: (Traversable t, Applicative f, Monad m) => (v -> m (f v')) -> t v -> m (f (t v'))
- bitraverseM :: (Bitraversable t, Applicative f, Monad m) => (k -> m (f k')) -> (v -> m (f v')) -> t k v -> m (f (t k' v'))
- data Subscription = Subscription (Set TopicName) (Map Text Text)
- topics :: [TopicName] -> Subscription
- offsetReset :: OffsetReset -> Subscription
- extraSubscriptionProps :: Map Text Text -> Subscription
- errorCallback :: HasKafkaConf k => (KafkaError -> String -> IO ()) -> k -> IO ()
- logCallback :: HasKafkaConf k => (Int -> String -> String -> IO ()) -> k -> IO ()
- statsCallback :: HasKafkaConf k => (String -> IO ()) -> k -> IO ()
- rebalanceCallback :: (KafkaConsumer -> RebalanceEvent -> IO ()) -> KafkaConf -> IO ()
- offsetCommitCallback :: (KafkaConsumer -> KafkaError -> [TopicPartition] -> IO ()) -> KafkaConf -> IO ()
- data ConsumerProperties = ConsumerProperties {
- cpProps :: Map Text Text
- cpLogLevel :: Maybe KafkaLogLevel
- cpCallbacks :: [KafkaConf -> IO ()]
- cpCallbackPollMode :: CallbackPollMode
- data CallbackPollMode
- brokersList :: [BrokerAddress] -> ConsumerProperties
- autoCommit :: Millis -> ConsumerProperties
- noAutoCommit :: ConsumerProperties
- noAutoOffsetStore :: ConsumerProperties
- groupId :: ConsumerGroupId -> ConsumerProperties
- clientId :: ClientId -> ConsumerProperties
- setCallback :: (KafkaConf -> IO ()) -> ConsumerProperties
- logLevel :: KafkaLogLevel -> ConsumerProperties
- compression :: KafkaCompressionCodec -> ConsumerProperties
- suppressDisconnectLogs :: ConsumerProperties
- extraProps :: Map Text Text -> ConsumerProperties
- extraProp :: Text -> Text -> ConsumerProperties
- debugOptions :: [KafkaDebug] -> ConsumerProperties
- queuedMaxMessagesKBytes :: Int -> ConsumerProperties
- callbackPollMode :: CallbackPollMode -> ConsumerProperties
- runConsumer :: ConsumerProperties -> Subscription -> (KafkaConsumer -> IO (Either KafkaError a)) -> IO (Either KafkaError a)
- newConsumer :: MonadIO m => ConsumerProperties -> Subscription -> m (Either KafkaError KafkaConsumer)
- assignment :: MonadIO m => KafkaConsumer -> m (Either KafkaError (Map TopicName [PartitionId]))
- subscription :: MonadIO m => KafkaConsumer -> m (Either KafkaError [(TopicName, SubscribedPartitions)])
- pausePartitions :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m KafkaError
- resumePartitions :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m KafkaError
- committed :: MonadIO m => KafkaConsumer -> Timeout -> [(TopicName, PartitionId)] -> m (Either KafkaError [TopicPartition])
- position :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m (Either KafkaError [TopicPartition])
- seek :: MonadIO m => KafkaConsumer -> Timeout -> [TopicPartition] -> m (Maybe KafkaError)
- pollMessage :: MonadIO m => KafkaConsumer -> Timeout -> m (Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
- pollConsumerEvents :: KafkaConsumer -> Maybe Timeout -> IO ()
- pollMessageBatch :: MonadIO m => KafkaConsumer -> Timeout -> BatchSize -> m [Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))]
- commitOffsetMessage :: MonadIO m => OffsetCommit -> KafkaConsumer -> ConsumerRecord k v -> m (Maybe KafkaError)
- commitAllOffsets :: MonadIO m => OffsetCommit -> KafkaConsumer -> m (Maybe KafkaError)
- commitPartitionsOffsets :: MonadIO m => OffsetCommit -> KafkaConsumer -> [TopicPartition] -> m (Maybe KafkaError)
- storeOffsets :: MonadIO m => KafkaConsumer -> [TopicPartition] -> m (Maybe KafkaError)
- storeOffsetMessage :: MonadIO m => KafkaConsumer -> ConsumerRecord k v -> m (Maybe KafkaError)
- closeConsumer :: MonadIO m => KafkaConsumer -> m (Maybe KafkaError)
- data KafkaConsumer
- data RdKafkaRespErrT
- = RdKafkaRespErrBegin
- | RdKafkaRespErrBadMsg
- | RdKafkaRespErrBadCompression
- | RdKafkaRespErrDestroy
- | RdKafkaRespErrFail
- | RdKafkaRespErrTransport
- | RdKafkaRespErrCritSysResource
- | RdKafkaRespErrResolve
- | RdKafkaRespErrMsgTimedOut
- | RdKafkaRespErrPartitionEof
- | RdKafkaRespErrUnknownPartition
- | RdKafkaRespErrFs
- | RdKafkaRespErrUnknownTopic
- | RdKafkaRespErrAllBrokersDown
- | RdKafkaRespErrInvalidArg
- | RdKafkaRespErrTimedOut
- | RdKafkaRespErrQueueFull
- | RdKafkaRespErrIsrInsuff
- | RdKafkaRespErrNodeUpdate
- | RdKafkaRespErrSsl
- | RdKafkaRespErrWaitCoord
- | RdKafkaRespErrUnknownGroup
- | RdKafkaRespErrInProgress
- | RdKafkaRespErrPrevInProgress
- | RdKafkaRespErrExistingSubscription
- | RdKafkaRespErrAssignPartitions
- | RdKafkaRespErrRevokePartitions
- | RdKafkaRespErrConflict
- | RdKafkaRespErrState
- | RdKafkaRespErrUnknownProtocol
- | RdKafkaRespErrNotImplemented
- | RdKafkaRespErrAuthentication
- | RdKafkaRespErrNoOffset
- | RdKafkaRespErrOutdated
- | RdKafkaRespErrTimedOutQueue
- | RdKafkaRespErrUnsupportedFeature
- | RdKafkaRespErrWaitCache
- | RdKafkaRespErrIntr
- | RdKafkaRespErrKeySerialization
- | RdKafkaRespErrValueSerialization
- | RdKafkaRespErrKeyDeserialization
- | RdKafkaRespErrValueDeserialization
- | RdKafkaRespErrPartial
- | RdKafkaRespErrReadOnly
- | RdKafkaRespErrNoent
- | RdKafkaRespErrUnderflow
- | RdKafkaRespErrInvalidType
- | RdKafkaRespErrRetry
- | RdKafkaRespErrPurgeQueue
- | RdKafkaRespErrPurgeInflight
- | RdKafkaRespErrFatal
- | RdKafkaRespErrInconsistent
- | RdKafkaRespErrGaplessGuarantee
- | RdKafkaRespErrMaxPollExceeded
- | RdKafkaRespErrEnd
- | RdKafkaRespErrUnknown
- | RdKafkaRespErrNoError
- | RdKafkaRespErrOffsetOutOfRange
- | RdKafkaRespErrInvalidMsg
- | RdKafkaRespErrUnknownTopicOrPart
- | RdKafkaRespErrInvalidMsgSize
- | RdKafkaRespErrLeaderNotAvailable
- | RdKafkaRespErrNotLeaderForPartition
- | RdKafkaRespErrRequestTimedOut
- | RdKafkaRespErrBrokerNotAvailable
- | RdKafkaRespErrReplicaNotAvailable
- | RdKafkaRespErrMsgSizeTooLarge
- | RdKafkaRespErrStaleCtrlEpoch
- | RdKafkaRespErrOffsetMetadataTooLarge
- | RdKafkaRespErrNetworkException
- | RdKafkaRespErrGroupLoadInProgress
- | RdKafkaRespErrGroupCoordinatorNotAvailable
- | RdKafkaRespErrNotCoordinatorForGroup
- | RdKafkaRespErrTopicException
- | RdKafkaRespErrRecordListTooLarge
- | RdKafkaRespErrNotEnoughReplicas
- | RdKafkaRespErrNotEnoughReplicasAfterAppend
- | RdKafkaRespErrInvalidRequiredAcks
- | RdKafkaRespErrIllegalGeneration
- | RdKafkaRespErrInconsistentGroupProtocol
- | RdKafkaRespErrInvalidGroupId
- | RdKafkaRespErrUnknownMemberId
- | RdKafkaRespErrInvalidSessionTimeout
- | RdKafkaRespErrRebalanceInProgress
- | RdKafkaRespErrInvalidCommitOffsetSize
- | RdKafkaRespErrTopicAuthorizationFailed
- | RdKafkaRespErrGroupAuthorizationFailed
- | RdKafkaRespErrClusterAuthorizationFailed
- | RdKafkaRespErrInvalidTimestamp
- | RdKafkaRespErrUnsupportedSaslMechanism
- | RdKafkaRespErrIllegalSaslState
- | RdKafkaRespErrUnsupportedVersion
- | RdKafkaRespErrTopicAlreadyExists
- | RdKafkaRespErrInvalidPartitions
- | RdKafkaRespErrInvalidReplicationFactor
- | RdKafkaRespErrInvalidReplicaAssignment
- | RdKafkaRespErrInvalidConfig
- | RdKafkaRespErrNotController
- | RdKafkaRespErrInvalidRequest
- | RdKafkaRespErrUnsupportedForMessageFormat
- | RdKafkaRespErrPolicyViolation
- | RdKafkaRespErrOutOfOrderSequenceNumber
- | RdKafkaRespErrDuplicateSequenceNumber
- | RdKafkaRespErrInvalidProducerEpoch
- | RdKafkaRespErrInvalidTxnState
- | RdKafkaRespErrInvalidProducerIdMapping
- | RdKafkaRespErrInvalidTransactionTimeout
- | RdKafkaRespErrConcurrentTransactions
- | RdKafkaRespErrTransactionCoordinatorFenced
- | RdKafkaRespErrTransactionalIdAuthorizationFailed
- | RdKafkaRespErrSecurityDisabled
- | RdKafkaRespErrOperationNotAttempted
- | RdKafkaRespErrKafkaStorageError
- | RdKafkaRespErrLogDirNotFound
- | RdKafkaRespErrSaslAuthenticationFailed
- | RdKafkaRespErrUnknownProducerId
- | RdKafkaRespErrReassignmentInProgress
- | RdKafkaRespErrDelegationTokenAuthDisabled
- | RdKafkaRespErrDelegationTokenNotFound
- | RdKafkaRespErrDelegationTokenOwnerMismatch
- | RdKafkaRespErrDelegationTokenRequestNotAllowed
- | RdKafkaRespErrDelegationTokenAuthorizationFailed
- | RdKafkaRespErrDelegationTokenExpired
- | RdKafkaRespErrInvalidPrincipalType
- | RdKafkaRespErrNonEmptyGroup
- | RdKafkaRespErrGroupIdNotFound
- | RdKafkaRespErrFetchSessionIdNotFound
- | RdKafkaRespErrInvalidFetchSessionEpoch
- | RdKafkaRespErrListenerNotFound
- | RdKafkaRespErrTopicDeletionDisabled
- | RdKafkaRespErrFencedLeaderEpoch
- | RdKafkaRespErrUnknownLeaderEpoch
- | RdKafkaRespErrUnsupportedCompressionType
- | RdKafkaRespErrStaleBrokerEpoch
- | RdKafkaRespErrOffsetNotAvailable
- | RdKafkaRespErrMemberIdRequired
- | RdKafkaRespErrPreferredLeaderNotAvailable
- | RdKafkaRespErrGroupMaxSizeReached
- | RdKafkaRespErrEndAll
data KafkaCompressionCodec Source #
Eq KafkaCompressionCodec Source # | |
Defined in Kafka.Types (==) :: KafkaCompressionCodec -> KafkaCompressionCodec -> Bool # (/=) :: KafkaCompressionCodec -> KafkaCompressionCodec -> Bool # | |
Show KafkaCompressionCodec Source # | |
Defined in Kafka.Types showsPrec :: Int -> KafkaCompressionCodec -> ShowS # show :: KafkaCompressionCodec -> String # showList :: [KafkaCompressionCodec] -> ShowS # | |
Generic KafkaCompressionCodec Source # | |
Defined in Kafka.Types type Rep KafkaCompressionCodec :: Type -> Type # | |
type Rep KafkaCompressionCodec Source # | |
Defined in Kafka.Types type Rep KafkaCompressionCodec = D1 (MetaData "KafkaCompressionCodec" "Kafka.Types" "hw-kafka-client-3.0.0-inplace" False) ((C1 (MetaCons "NoCompression" PrefixI False) (U1 :: Type -> Type) :+: C1 (MetaCons "Gzip" PrefixI False) (U1 :: Type -> Type)) :+: (C1 (MetaCons "Snappy" PrefixI False) (U1 :: Type -> Type) :+: C1 (MetaCons "Lz4" PrefixI False) (U1 :: Type -> Type))) |
data KafkaDebug Source #
DebugGeneric | |
DebugBroker | |
DebugTopic | |
DebugMetadata | |
DebugQueue | |
DebugMsg | |
DebugProtocol | |
DebugCgrp | |
DebugSecurity | |
DebugFetch | |
DebugFeature | |
DebugAll |
data KafkaError Source #
Any Kafka errors
KafkaError Text | |
KafkaInvalidReturnValue | |
KafkaBadSpecification Text | |
KafkaResponseError RdKafkaRespErrT | |
KafkaInvalidConfigurationValue Text | |
KafkaUnknownConfigurationKey Text | |
KafkaBadConfiguration |
data KafkaLogLevel Source #
Log levels for librdkafka.
KafkaLogEmerg | |
KafkaLogAlert | |
KafkaLogCrit | |
KafkaLogErr | |
KafkaLogWarning | |
KafkaLogNotice | |
KafkaLogInfo | |
KafkaLogDebug |
Enum KafkaLogLevel Source # | |
Defined in Kafka.Types succ :: KafkaLogLevel -> KafkaLogLevel # pred :: KafkaLogLevel -> KafkaLogLevel # toEnum :: Int -> KafkaLogLevel # fromEnum :: KafkaLogLevel -> Int # enumFrom :: KafkaLogLevel -> [KafkaLogLevel] # enumFromThen :: KafkaLogLevel -> KafkaLogLevel -> [KafkaLogLevel] # enumFromTo :: KafkaLogLevel -> KafkaLogLevel -> [KafkaLogLevel] # enumFromThenTo :: KafkaLogLevel -> KafkaLogLevel -> KafkaLogLevel -> [KafkaLogLevel] # | |
Eq KafkaLogLevel Source # | |
Defined in Kafka.Types (==) :: KafkaLogLevel -> KafkaLogLevel -> Bool # (/=) :: KafkaLogLevel -> KafkaLogLevel -> Bool # | |
Show KafkaLogLevel Source # | |
Defined in Kafka.Types showsPrec :: Int -> KafkaLogLevel -> ShowS # show :: KafkaLogLevel -> String # showList :: [KafkaLogLevel] -> ShowS # |
Timeout in milliseconds
newtype BrokerAddress Source #
Kafka broker address string (e.g. broker1:9092)
Eq BrokerAddress Source # | |
Defined in Kafka.Types (==) :: BrokerAddress -> BrokerAddress -> Bool # (/=) :: BrokerAddress -> BrokerAddress -> Bool # | |
Show BrokerAddress Source # | |
Defined in Kafka.Types showsPrec :: Int -> BrokerAddress -> ShowS # show :: BrokerAddress -> String # showList :: [BrokerAddress] -> ShowS # | |
Generic BrokerAddress Source # | |
Defined in Kafka.Types type Rep BrokerAddress :: Type -> Type # from :: BrokerAddress -> Rep BrokerAddress x # to :: Rep BrokerAddress x -> BrokerAddress # | |
type Rep BrokerAddress Source # | |
Defined in Kafka.Types type Rep BrokerAddress = D1 (MetaData "BrokerAddress" "Kafka.Types" "hw-kafka-client-3.0.0-inplace" True) (C1 (MetaCons "BrokerAddress" PrefixI True) (S1 (MetaSel (Just "unBrokerAddress") NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 Text))) |
Topic name to be consumed
Wildcard (regex) topics are supported by the librdkafka assignor:
any topic name in the topics list that is prefixed with ^ will
be regex-matched to the full list of topics in the cluster and matching
topics will be added to the subscription list.
TopicName | a simple topic name or a regex if started with ^ |
User | Normal topics that are created by user. |
System | Topics starting with "__" (e.g. __consumer_offsets) are considered "system" topics |
newtype PartitionId Source #
kafkaDebugToText :: KafkaDebug -> Text Source #
data ConsumerRecord k v Source #
Represents a received message from Kafka (i.e. used in a consumer)
ConsumerRecord | |
data TopicPartition Source #
Kafka topic partition structure
data OffsetStoreMethod Source #
Indicates the method of storing the offsets
OffsetStoreBroker | Offsets are stored in Kafka broker (preferred) |
OffsetStoreFile FilePath OffsetStoreSync | Offsets are stored in a file (and synced to disk according to the sync policy) |
data OffsetStoreSync Source #
Indicates how offsets are to be synced to disk
OffsetSyncDisable | Do not sync offsets (in Kafka: -1) |
OffsetSyncImmediate | Sync immediately after each offset commit (in Kafka: 0) |
OffsetSyncInterval Int | Sync after specified interval in millis |
data OffsetCommit Source #
Offsets commit mode
OffsetCommit | Forces consumer to block until the broker offsets commit is done |
OffsetCommitAsync | Offsets will be committed in a non-blocking way |
Eq OffsetCommit Source # | |
Defined in Kafka.Consumer.Types (==) :: OffsetCommit -> OffsetCommit -> Bool # (/=) :: OffsetCommit -> OffsetCommit -> Bool # | |
Show OffsetCommit Source # | |
Defined in Kafka.Consumer.Types showsPrec :: Int -> OffsetCommit -> ShowS # show :: OffsetCommit -> String # showList :: [OffsetCommit] -> ShowS # | |
Generic OffsetCommit Source # | |
Defined in Kafka.Consumer.Types type Rep OffsetCommit :: Type -> Type # from :: OffsetCommit -> Rep OffsetCommit x # to :: Rep OffsetCommit x -> OffsetCommit # | |
type Rep OffsetCommit Source # | |
Eq Timestamp Source # | |
Read Timestamp Source # | |
Show Timestamp Source # | |
Generic Timestamp Source # | |
type Rep Timestamp Source # | |
Defined in Kafka.Consumer.Types type Rep Timestamp = D1 (MetaData "Timestamp" "Kafka.Consumer.Types" "hw-kafka-client-3.0.0-inplace" False) (C1 (MetaCons "CreateTime" PrefixI False) (S1 (MetaSel (Nothing :: Maybe Symbol) NoSourceUnpackedness SourceStrict DecidedStrict) (Rec0 Millis)) :+: (C1 (MetaCons "LogAppendTime" PrefixI False) (S1 (MetaSel (Nothing :: Maybe Symbol) NoSourceUnpackedness SourceStrict DecidedStrict) (Rec0 Millis)) :+: C1 (MetaCons "NoTimestamp" PrefixI False) (U1 :: Type -> Type))) |
data SubscribedPartitions Source #
data PartitionOffset Source #
PartitionOffsetBeginning | |
PartitionOffsetEnd | |
PartitionOffset Int64 | |
PartitionOffsetStored | |
PartitionOffsetInvalid |
data RebalanceEvent Source #
A set of events which happen during the rebalancing process
RebalanceBeforeAssign [(TopicName, PartitionId)] | Happens before Kafka Client confirms new assignment |
RebalanceAssign [(TopicName, PartitionId)] | Happens after the new assignment is confirmed |
RebalanceBeforeRevoke [(TopicName, PartitionId)] | Happens before Kafka Client confirms partitions revocation |
RebalanceRevoke [(TopicName, PartitionId)] | Happens after the revocation is confirmed |
data OffsetReset Source #
Eq OffsetReset Source # | |
Defined in Kafka.Consumer.Types (==) :: OffsetReset -> OffsetReset -> Bool # (/=) :: OffsetReset -> OffsetReset -> Bool # | |
Show OffsetReset Source # | |
Defined in Kafka.Consumer.Types showsPrec :: Int -> OffsetReset -> ShowS # show :: OffsetReset -> String # showList :: [OffsetReset] -> ShowS # | |
Generic OffsetReset Source # | |
Defined in Kafka.Consumer.Types type Rep OffsetReset :: Type -> Type # from :: OffsetReset -> Rep OffsetReset x # to :: Rep OffsetReset x -> OffsetReset # | |
type Rep OffsetReset Source # | |
newtype ConsumerGroupId Source #
kcKafkaPtr :: KafkaConsumer -> Kafka Source #
kcKafkaConf :: KafkaConsumer -> KafkaConf Source #
crMapKey :: (k -> k') -> ConsumerRecord k v -> ConsumerRecord k' v Source #
crMapValue :: (v -> v') -> ConsumerRecord k v -> ConsumerRecord k v' Source #
crMapKV :: (k -> k') -> (v -> v') -> ConsumerRecord k v -> ConsumerRecord k' v' Source #
sequenceFirst :: (Bitraversable t, Applicative f) => t (f k) v -> f (t k v) Source #
Deprecated: Isn't concern of this library. Use 'bitraverse id pure'
traverseFirst :: (Bitraversable t, Applicative f) => (k -> f k') -> t k v -> f (t k' v) Source #
Deprecated: Isn't concern of this library. Use 'bitraverse f pure'
traverseFirstM :: (Bitraversable t, Applicative f, Monad m) => (k -> m (f k')) -> t k v -> m (f (t k' v)) Source #
Deprecated: Isn't concern of this library. Use 'bitraverse id pure $ bitraverse f pure r'
traverseM :: (Traversable t, Applicative f, Monad m) => (v -> m (f v')) -> t v -> m (f (t v')) Source #
Deprecated: Isn't concern of this library. Use 'sequenceA $ traverse f r'
bitraverseM :: (Bitraversable t, Applicative f, Monad m) => (k -> m (f k')) -> (v -> m (f v')) -> t k v -> m (f (t k' v')) Source #
Deprecated: Isn't concern of this library. Use 'bisequenceA $ bimapM f g r'
data Subscription Source #
Semigroup Subscription Source # | |
Defined in Kafka.Consumer.Subscription (<>) :: Subscription -> Subscription -> Subscription # sconcat :: NonEmpty Subscription -> Subscription # stimes :: Integral b => b -> Subscription -> Subscription # | |
Monoid Subscription Source # | |
Defined in Kafka.Consumer.Subscription mempty :: Subscription # mappend :: Subscription -> Subscription -> Subscription # mconcat :: [Subscription] -> Subscription # |
topics :: [TopicName] -> Subscription Source #
errorCallback :: HasKafkaConf k => (KafkaError -> String -> IO ()) -> k -> IO () Source #
rebalanceCallback :: (KafkaConsumer -> RebalanceEvent -> IO ()) -> KafkaConf -> IO () Source #
Sets a callback that is called when rebalance is needed.
offsetCommitCallback :: (KafkaConsumer -> KafkaError -> [TopicPartition] -> IO ()) -> KafkaConf -> IO () Source #
Sets a callback that is called when offsets are committed.
The results of automatic or manual offset commits will be scheduled
for this callback and are served by pollMessage.
If no partitions had valid offsets to commit, this callback will be called
with KafkaError == KafkaResponseError RdKafkaRespErrNoOffset,
which is not to be considered an error.
data ConsumerProperties Source #
Properties to create KafkaConsumer
ConsumerProperties | |
Semigroup ConsumerProperties Source # | |
Defined in Kafka.Consumer.ConsumerProperties (<>) :: ConsumerProperties -> ConsumerProperties -> ConsumerProperties # sconcat :: NonEmpty ConsumerProperties -> ConsumerProperties # stimes :: Integral b => b -> ConsumerProperties -> ConsumerProperties # | |
Monoid ConsumerProperties Source # | Right biased so we prefer newer properties over older ones. |
Defined in Kafka.Consumer.ConsumerProperties |
data CallbackPollMode Source #
Eq CallbackPollMode Source # | |
Defined in Kafka.Consumer.ConsumerProperties (==) :: CallbackPollMode -> CallbackPollMode -> Bool # (/=) :: CallbackPollMode -> CallbackPollMode -> Bool # | |
Show CallbackPollMode Source # | |
Defined in Kafka.Consumer.ConsumerProperties showsPrec :: Int -> CallbackPollMode -> ShowS # show :: CallbackPollMode -> String # showList :: [CallbackPollMode] -> ShowS # |
noAutoCommit :: ConsumerProperties Source #
Disables auto commit for the consumer
noAutoOffsetStore :: ConsumerProperties Source #
Disables auto offset store for the consumer
groupId :: ConsumerGroupId -> ConsumerProperties Source #
Consumer group id
setCallback :: (KafkaConf -> IO ()) -> ConsumerProperties Source #
logLevel :: KafkaLogLevel -> ConsumerProperties Source #
Sets the logging level.
Usually used together with debugOptions
to configure which logs are needed.
compression :: KafkaCompressionCodec -> ConsumerProperties Source #
Sets the compression codec for the consumer.
suppressDisconnectLogs :: ConsumerProperties Source #
Suppresses consumer disconnects logs.
It might be useful to turn this off when interacting with brokers with an aggressive connection.max.idle.ms value.
extraProps :: Map Text Text -> ConsumerProperties Source #
Any configuration options that are supported by librdkafka. The full list can be found in librdkafka's CONFIGURATION.md (https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md).
extraProp :: Text -> Text -> ConsumerProperties Source #
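A single configuration option supported by librdkafka, given as a key and a value. The full list can be found in librdkafka's CONFIGURATION.md (https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md).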
debugOptions :: [KafkaDebug] -> ConsumerProperties Source #
Sets debug features for the consumer.
Usually used together with logLevel to configure which logs are needed.
callbackPollMode :: CallbackPollMode -> ConsumerProperties Source #
Sets the callback poll mode.
The default CallbackPollModeAsync
mode handles polling rebalance
and keep alive events for you
in a background thread.
With CallbackPollModeSync
the user will poll the consumer
frequently to handle new messages as well as rebalance and keep alive events.
CallbackPollModeSync lets you simplify
hw-kafka-client's footprint and have full control over when polling
happens at the cost of having to manage this yourself.
:: ConsumerProperties | |
-> Subscription | |
-> (KafkaConsumer -> IO (Either KafkaError a)) | A callback function to poll and handle messages |
-> IO (Either KafkaError a) |
Deprecated: Use newConsumer/closeConsumer instead
Runs high-level kafka consumer.
A callback provided is expected to call pollMessage
when convenient.
newConsumer :: MonadIO m => ConsumerProperties -> Subscription -> m (Either KafkaError KafkaConsumer) Source #
assignment :: MonadIO m => KafkaConsumer -> m (Either KafkaError (Map TopicName [PartitionId])) Source #
Returns current consumer's assignment
subscription :: MonadIO m => KafkaConsumer -> m (Either KafkaError [(TopicName, SubscribedPartitions)]) Source #
Returns current consumer's subscription
pausePartitions :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m KafkaError Source #
Pauses specified partitions on the current consumer.
resumePartitions :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m KafkaError Source #
Resumes specified partitions on the current consumer.
committed :: MonadIO m => KafkaConsumer -> Timeout -> [(TopicName, PartitionId)] -> m (Either KafkaError [TopicPartition]) Source #
Retrieve committed offsets for topics+partitions.
position :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m (Either KafkaError [TopicPartition]) Source #
Retrieve current positions (last consumed message offset+1) for the current running instance of the consumer.
If the current consumer hasn't received any messages for a given partition, PartitionOffsetInvalid
is returned.
seek :: MonadIO m => KafkaConsumer -> Timeout -> [TopicPartition] -> m (Maybe KafkaError) Source #
:: MonadIO m | |
=> KafkaConsumer | |
-> Timeout | the timeout, in milliseconds |
-> m (Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))) | Left on error or timeout, right for success |
pollConsumerEvents :: KafkaConsumer -> Maybe Timeout -> IO () Source #
Polls the provided kafka consumer for events.
Events will cause application provided callbacks to be called.
The timeout argument specifies the maximum amount of time (in milliseconds) that the call will block waiting for events.
This function is called on each pollMessage
and, if runtime allows
multi threading, it is called periodically in a separate thread
to ensure the callbacks are handled ASAP.
There is no particular need to call this function manually,
except in some special cases in a single-threaded environment
where polling for events on each pollMessage
is not
frequent enough.
pollMessageBatch :: MonadIO m => KafkaConsumer -> Timeout -> BatchSize -> m [Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))] Source #
Polls up to BatchSize messages.
Unlike pollMessage
this function does not return usual "timeout" errors.
An empty batch is returned when there are no messages available.
This API is not available when the callback poll mode is CallbackPollModeSync
(i.e. when the user is responsible for polling).
commitOffsetMessage :: MonadIO m => OffsetCommit -> KafkaConsumer -> ConsumerRecord k v -> m (Maybe KafkaError) Source #
Commit message's offset on broker for the message's partition.
commitAllOffsets :: MonadIO m => OffsetCommit -> KafkaConsumer -> m (Maybe KafkaError) Source #
Commit offsets for all currently assigned partitions.
commitPartitionsOffsets :: MonadIO m => OffsetCommit -> KafkaConsumer -> [TopicPartition] -> m (Maybe KafkaError) Source #
Commit offsets for the specified partitions.
storeOffsets :: MonadIO m => KafkaConsumer -> [TopicPartition] -> m (Maybe KafkaError) Source #
Stores offsets locally
storeOffsetMessage :: MonadIO m => KafkaConsumer -> ConsumerRecord k v -> m (Maybe KafkaError) Source #
Stores message's offset locally for the message's partition.
closeConsumer :: MonadIO m => KafkaConsumer -> m (Maybe KafkaError) Source #
Closes the consumer.
data KafkaConsumer Source #
data RdKafkaRespErrT Source #
Bounded RdKafkaRespErrT Source # | |
Defined in Kafka.Internal.RdKafka | |
Enum RdKafkaRespErrT Source # | |
Defined in Kafka.Internal.RdKafka succ :: RdKafkaRespErrT -> RdKafkaRespErrT # pred :: RdKafkaRespErrT -> RdKafkaRespErrT # toEnum :: Int -> RdKafkaRespErrT # fromEnum :: RdKafkaRespErrT -> Int # enumFrom :: RdKafkaRespErrT -> [RdKafkaRespErrT] # enumFromThen :: RdKafkaRespErrT -> RdKafkaRespErrT -> [RdKafkaRespErrT] # enumFromTo :: RdKafkaRespErrT -> RdKafkaRespErrT -> [RdKafkaRespErrT] # enumFromThenTo :: RdKafkaRespErrT -> RdKafkaRespErrT -> RdKafkaRespErrT -> [RdKafkaRespErrT] # | |
Eq RdKafkaRespErrT Source # | |
Defined in Kafka.Internal.RdKafka (==) :: RdKafkaRespErrT -> RdKafkaRespErrT -> Bool # (/=) :: RdKafkaRespErrT -> RdKafkaRespErrT -> Bool # | |
Show RdKafkaRespErrT Source # | |
Defined in Kafka.Internal.RdKafka showsPrec :: Int -> RdKafkaRespErrT -> ShowS # show :: RdKafkaRespErrT -> String # showList :: [RdKafkaRespErrT] -> ShowS # |