Safe Haskell | None |
---|---|
Language | Haskell2010 |
- runConsumer :: ConsumerProperties -> Subscription -> (KafkaConsumer -> IO (Either KafkaError a)) -> IO (Either KafkaError a)
- newConsumer :: MonadIO m => ConsumerProperties -> Subscription -> m (Either KafkaError KafkaConsumer)
- assignment :: MonadIO m => KafkaConsumer -> m (Either KafkaError (Map TopicName [PartitionId]))
- subscription :: MonadIO m => KafkaConsumer -> m (Either KafkaError [(TopicName, SubscribedPartitions)])
- pausePartitions :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m KafkaError
- resumePartitions :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m KafkaError
- committed :: MonadIO m => KafkaConsumer -> Timeout -> [(TopicName, PartitionId)] -> m (Either KafkaError [TopicPartition])
- position :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m (Either KafkaError [TopicPartition])
- seek :: MonadIO m => KafkaConsumer -> Timeout -> [TopicPartition] -> m (Maybe KafkaError)
- pollMessage :: MonadIO m => KafkaConsumer -> Timeout -> m (Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
- pollConsumerEvents :: KafkaConsumer -> Maybe Timeout -> IO ()
- commitOffsetMessage :: MonadIO m => OffsetCommit -> KafkaConsumer -> ConsumerRecord k v -> m (Maybe KafkaError)
- commitAllOffsets :: MonadIO m => OffsetCommit -> KafkaConsumer -> m (Maybe KafkaError)
- commitPartitionsOffsets :: MonadIO m => OffsetCommit -> KafkaConsumer -> [TopicPartition] -> m (Maybe KafkaError)
- closeConsumer :: MonadIO m => KafkaConsumer -> m (Maybe KafkaError)
- data KafkaConsumer
- data RdKafkaRespErrT
- = RdKafkaRespErrBegin
- | RdKafkaRespErrBadMsg
- | RdKafkaRespErrBadCompression
- | RdKafkaRespErrDestroy
- | RdKafkaRespErrFail
- | RdKafkaRespErrTransport
- | RdKafkaRespErrCritSysResource
- | RdKafkaRespErrResolve
- | RdKafkaRespErrMsgTimedOut
- | RdKafkaRespErrPartitionEof
- | RdKafkaRespErrUnknownPartition
- | RdKafkaRespErrFs
- | RdKafkaRespErrUnknownTopic
- | RdKafkaRespErrAllBrokersDown
- | RdKafkaRespErrInvalidArg
- | RdKafkaRespErrTimedOut
- | RdKafkaRespErrQueueFull
- | RdKafkaRespErrIsrInsuff
- | RdKafkaRespErrNodeUpdate
- | RdKafkaRespErrSsl
- | RdKafkaRespErrWaitCoord
- | RdKafkaRespErrUnknownGroup
- | RdKafkaRespErrInProgress
- | RdKafkaRespErrPrevInProgress
- | RdKafkaRespErrExistingSubscription
- | RdKafkaRespErrAssignPartitions
- | RdKafkaRespErrRevokePartitions
- | RdKafkaRespErrConflict
- | RdKafkaRespErrState
- | RdKafkaRespErrUnknownProtocol
- | RdKafkaRespErrNotImplemented
- | RdKafkaRespErrAuthentication
- | RdKafkaRespErrNoOffset
- | RdKafkaRespErrOutdated
- | RdKafkaRespErrTimedOutQueue
- | RdKafkaRespErrUnsupportedFeature
- | RdKafkaRespErrWaitCache
- | RdKafkaRespErrIntr
- | RdKafkaRespErrKeySerialization
- | RdKafkaRespErrValueSerialization
- | RdKafkaRespErrKeyDeserialization
- | RdKafkaRespErrValueDeserialization
- | RdKafkaRespErrPartial
- | RdKafkaRespErrEnd
- | RdKafkaRespErrUnknown
- | RdKafkaRespErrNoError
- | RdKafkaRespErrOffsetOutOfRange
- | RdKafkaRespErrInvalidMsg
- | RdKafkaRespErrUnknownTopicOrPart
- | RdKafkaRespErrInvalidMsgSize
- | RdKafkaRespErrLeaderNotAvailable
- | RdKafkaRespErrNotLeaderForPartition
- | RdKafkaRespErrRequestTimedOut
- | RdKafkaRespErrBrokerNotAvailable
- | RdKafkaRespErrReplicaNotAvailable
- | RdKafkaRespErrMsgSizeTooLarge
- | RdKafkaRespErrStaleCtrlEpoch
- | RdKafkaRespErrOffsetMetadataTooLarge
- | RdKafkaRespErrNetworkException
- | RdKafkaRespErrGroupLoadInProgress
- | RdKafkaRespErrGroupCoordinatorNotAvailable
- | RdKafkaRespErrNotCoordinatorForGroup
- | RdKafkaRespErrTopicException
- | RdKafkaRespErrRecordListTooLarge
- | RdKafkaRespErrNotEnoughReplicas
- | RdKafkaRespErrNotEnoughReplicasAfterAppend
- | RdKafkaRespErrInvalidRequiredAcks
- | RdKafkaRespErrIllegalGeneration
- | RdKafkaRespErrInconsistentGroupProtocol
- | RdKafkaRespErrInvalidGroupId
- | RdKafkaRespErrUnknownMemberId
- | RdKafkaRespErrInvalidSessionTimeout
- | RdKafkaRespErrRebalanceInProgress
- | RdKafkaRespErrInvalidCommitOffsetSize
- | RdKafkaRespErrTopicAuthorizationFailed
- | RdKafkaRespErrGroupAuthorizationFailed
- | RdKafkaRespErrClusterAuthorizationFailed
- | RdKafkaRespErrInvalidTimestamp
- | RdKafkaRespErrUnsupportedSaslMechanism
- | RdKafkaRespErrIllegalSaslState
- | RdKafkaRespErrUnsupportedVersion
- | RdKafkaRespErrTopicAlreadyExists
- | RdKafkaRespErrInvalidPartitions
- | RdKafkaRespErrInvalidReplicationFactor
- | RdKafkaRespErrInvalidReplicaAssignment
- | RdKafkaRespErrInvalidConfig
- | RdKafkaRespErrNotController
- | RdKafkaRespErrInvalidRequest
- | RdKafkaRespErrUnsupportedForMessageFormat
- | RdKafkaRespErrPolicyViolation
- | RdKafkaRespErrOutOfOrderSequenceNumber
- | RdKafkaRespErrDuplicateSequenceNumber
- | RdKafkaRespErrInvalidProducerEpoch
- | RdKafkaRespErrInvalidTxnState
- | RdKafkaRespErrInvalidProducerIdMapping
- | RdKafkaRespErrInvalidTransactionTimeout
- | RdKafkaRespErrConcurrentTransactions
- | RdKafkaRespErrTransactionCoordinatorFenced
- | RdKafkaRespErrTransactionalIdAuthorizationFailed
- | RdKafkaRespErrSecurityDisabled
- | RdKafkaRespErrOperationNotAttempted
- | RdKafkaRespErrEndAll
Documentation
:: ConsumerProperties | |
-> Subscription | |
-> (KafkaConsumer -> IO (Either KafkaError a)) | A callback function to poll and handle messages |
-> IO (Either KafkaError a) |
Deprecated: Use newConsumer/closeConsumer instead
Runs high-level kafka consumer.
The provided callback is expected to call pollMessage
when convenient.
newConsumer :: MonadIO m => ConsumerProperties -> Subscription -> m (Either KafkaError KafkaConsumer) Source #
assignment :: MonadIO m => KafkaConsumer -> m (Either KafkaError (Map TopicName [PartitionId])) Source #
Returns current consumer's assignment
subscription :: MonadIO m => KafkaConsumer -> m (Either KafkaError [(TopicName, SubscribedPartitions)]) Source #
Returns current consumer's subscription
pausePartitions :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m KafkaError Source #
Pauses specified partitions on the current consumer.
resumePartitions :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m KafkaError Source #
Resumes specified partitions on the current consumer.
committed :: MonadIO m => KafkaConsumer -> Timeout -> [(TopicName, PartitionId)] -> m (Either KafkaError [TopicPartition]) Source #
Retrieve committed offsets for topics+partitions.
position :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m (Either KafkaError [TopicPartition]) Source #
Retrieve current positions (last consumed message offset+1) for the current running instance of the consumer.
If the current consumer hasn't received any messages for a given partition, PartitionOffsetInvalid
is returned.
seek :: MonadIO m => KafkaConsumer -> Timeout -> [TopicPartition] -> m (Maybe KafkaError) Source #
:: MonadIO m | |
=> KafkaConsumer | |
-> Timeout | the timeout, in milliseconds |
-> m (Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))) | Left on error or timeout, Right on success |
pollConsumerEvents :: KafkaConsumer -> Maybe Timeout -> IO () Source #
Polls the provided kafka consumer for events.
Events will cause application provided callbacks to be called.
The timeout_ms argument specifies the maximum amount of time (in milliseconds) that the call will block waiting for events.
This function is called on each pollMessage
and, if the runtime allows multithreading, it is also called periodically
in a separate thread to ensure the callbacks are handled ASAP.
There is no particular need to call this function manually,
except in some special cases in a single-threaded environment
where polling for events on each pollMessage
is not frequent enough.
commitOffsetMessage :: MonadIO m => OffsetCommit -> KafkaConsumer -> ConsumerRecord k v -> m (Maybe KafkaError) Source #
Commit message's offset on broker for the message's partition.
commitAllOffsets :: MonadIO m => OffsetCommit -> KafkaConsumer -> m (Maybe KafkaError) Source #
Commit offsets for all currently assigned partitions.
commitPartitionsOffsets :: MonadIO m => OffsetCommit -> KafkaConsumer -> [TopicPartition] -> m (Maybe KafkaError) Source #
Commit offsets for the specified partitions.
closeConsumer :: MonadIO m => KafkaConsumer -> m (Maybe KafkaError) Source #
Closes the consumer.
data KafkaConsumer Source #
data RdKafkaRespErrT Source #