hw-kafka-client-2.4.0: Kafka bindings for Haskell

Safe HaskellNone
LanguageHaskell2010

Kafka.Consumer

Synopsis

Documentation

runConsumer Source #

Arguments

:: ConsumerProperties 
-> Subscription 
-> (KafkaConsumer -> IO (Either KafkaError a))

A callback function to poll and handle messages

-> IO (Either KafkaError a) 

Deprecated: Use newConsumer/closeConsumer instead

Runs a high-level Kafka consumer. The provided callback is expected to call pollMessage when convenient.

assignment :: MonadIO m => KafkaConsumer -> m (Either KafkaError (Map TopicName [PartitionId])) Source #

Returns current consumer's assignment

subscription :: MonadIO m => KafkaConsumer -> m (Either KafkaError [(TopicName, SubscribedPartitions)]) Source #

Returns current consumer's subscription

pausePartitions :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m KafkaError Source #

Pauses specified partitions on the current consumer.

resumePartitions :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m KafkaError Source #

Resumes specified partitions on the current consumer.

committed :: MonadIO m => KafkaConsumer -> Timeout -> [(TopicName, PartitionId)] -> m (Either KafkaError [TopicPartition]) Source #

Retrieve committed offsets for topics+partitions.

position :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m (Either KafkaError [TopicPartition]) Source #

Retrieve current positions (last consumed message offset+1) for the current running instance of the consumer. If the current consumer hasn't received any messages for a given partition, PartitionOffsetInvalid is returned.

pollMessage Source #

Arguments

:: MonadIO m 
=> KafkaConsumer 
-> Timeout

the timeout, in milliseconds

-> m (Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))

Left on error or timeout, Right on success

pollConsumerEvents :: KafkaConsumer -> Maybe Timeout -> IO () Source #

Polls the provided kafka consumer for events.

Events will cause application provided callbacks to be called.

The timeout_ms argument specifies the maximum amount of time (in milliseconds) that the call will block waiting for events.

This function is called on each pollMessage and, if the runtime allows multi-threading, it is also called periodically in a separate thread to ensure the callbacks are handled ASAP.

There is no particular need to call this function manually, except in special cases in a single-threaded environment where polling for events on each pollMessage is not frequent enough.

commitOffsetMessage :: MonadIO m => OffsetCommit -> KafkaConsumer -> ConsumerRecord k v -> m (Maybe KafkaError) Source #

Commit message's offset on broker for the message's partition.

commitAllOffsets :: MonadIO m => OffsetCommit -> KafkaConsumer -> m (Maybe KafkaError) Source #

Commit offsets for all currently assigned partitions.

commitPartitionsOffsets :: MonadIO m => OffsetCommit -> KafkaConsumer -> [TopicPartition] -> m (Maybe KafkaError) Source #

Commit offsets for the specified partitions.

storeOffsetMessage :: MonadIO m => KafkaConsumer -> ConsumerRecord k v -> m (Maybe KafkaError) Source #

Stores message's offset locally for the message's partition.

closeConsumer :: MonadIO m => KafkaConsumer -> m (Maybe KafkaError) Source #

Closes the consumer.

data RdKafkaRespErrT Source #

Constructors

RdKafkaRespErrBegin 
RdKafkaRespErrBadMsg 
RdKafkaRespErrBadCompression 
RdKafkaRespErrDestroy 
RdKafkaRespErrFail 
RdKafkaRespErrTransport 
RdKafkaRespErrCritSysResource 
RdKafkaRespErrResolve 
RdKafkaRespErrMsgTimedOut 
RdKafkaRespErrPartitionEof 
RdKafkaRespErrUnknownPartition 
RdKafkaRespErrFs 
RdKafkaRespErrUnknownTopic 
RdKafkaRespErrAllBrokersDown 
RdKafkaRespErrInvalidArg 
RdKafkaRespErrTimedOut 
RdKafkaRespErrQueueFull 
RdKafkaRespErrIsrInsuff 
RdKafkaRespErrNodeUpdate 
RdKafkaRespErrSsl 
RdKafkaRespErrWaitCoord 
RdKafkaRespErrUnknownGroup 
RdKafkaRespErrInProgress 
RdKafkaRespErrPrevInProgress 
RdKafkaRespErrExistingSubscription 
RdKafkaRespErrAssignPartitions 
RdKafkaRespErrRevokePartitions 
RdKafkaRespErrConflict 
RdKafkaRespErrState 
RdKafkaRespErrUnknownProtocol 
RdKafkaRespErrNotImplemented 
RdKafkaRespErrAuthentication 
RdKafkaRespErrNoOffset 
RdKafkaRespErrOutdated 
RdKafkaRespErrTimedOutQueue 
RdKafkaRespErrUnsupportedFeature 
RdKafkaRespErrWaitCache 
RdKafkaRespErrIntr 
RdKafkaRespErrKeySerialization 
RdKafkaRespErrValueSerialization 
RdKafkaRespErrKeyDeserialization 
RdKafkaRespErrValueDeserialization 
RdKafkaRespErrPartial 
RdKafkaRespErrEnd 
RdKafkaRespErrUnknown 
RdKafkaRespErrNoError 
RdKafkaRespErrOffsetOutOfRange 
RdKafkaRespErrInvalidMsg 
RdKafkaRespErrUnknownTopicOrPart 
RdKafkaRespErrInvalidMsgSize 
RdKafkaRespErrLeaderNotAvailable 
RdKafkaRespErrNotLeaderForPartition 
RdKafkaRespErrRequestTimedOut 
RdKafkaRespErrBrokerNotAvailable 
RdKafkaRespErrReplicaNotAvailable 
RdKafkaRespErrMsgSizeTooLarge 
RdKafkaRespErrStaleCtrlEpoch 
RdKafkaRespErrOffsetMetadataTooLarge 
RdKafkaRespErrNetworkException 
RdKafkaRespErrGroupLoadInProgress 
RdKafkaRespErrGroupCoordinatorNotAvailable 
RdKafkaRespErrNotCoordinatorForGroup 
RdKafkaRespErrTopicException 
RdKafkaRespErrRecordListTooLarge 
RdKafkaRespErrNotEnoughReplicas 
RdKafkaRespErrNotEnoughReplicasAfterAppend 
RdKafkaRespErrInvalidRequiredAcks 
RdKafkaRespErrIllegalGeneration 
RdKafkaRespErrInconsistentGroupProtocol 
RdKafkaRespErrInvalidGroupId 
RdKafkaRespErrUnknownMemberId 
RdKafkaRespErrInvalidSessionTimeout 
RdKafkaRespErrRebalanceInProgress 
RdKafkaRespErrInvalidCommitOffsetSize 
RdKafkaRespErrTopicAuthorizationFailed 
RdKafkaRespErrGroupAuthorizationFailed 
RdKafkaRespErrClusterAuthorizationFailed 
RdKafkaRespErrInvalidTimestamp 
RdKafkaRespErrUnsupportedSaslMechanism 
RdKafkaRespErrIllegalSaslState 
RdKafkaRespErrUnsupportedVersion 
RdKafkaRespErrTopicAlreadyExists 
RdKafkaRespErrInvalidPartitions 
RdKafkaRespErrInvalidReplicationFactor 
RdKafkaRespErrInvalidReplicaAssignment 
RdKafkaRespErrInvalidConfig 
RdKafkaRespErrNotController 
RdKafkaRespErrInvalidRequest 
RdKafkaRespErrUnsupportedForMessageFormat 
RdKafkaRespErrPolicyViolation 
RdKafkaRespErrOutOfOrderSequenceNumber 
RdKafkaRespErrDuplicateSequenceNumber 
RdKafkaRespErrInvalidProducerEpoch 
RdKafkaRespErrInvalidTxnState 
RdKafkaRespErrInvalidProducerIdMapping 
RdKafkaRespErrInvalidTransactionTimeout 
RdKafkaRespErrConcurrentTransactions 
RdKafkaRespErrTransactionCoordinatorFenced 
RdKafkaRespErrTransactionalIdAuthorizationFailed 
RdKafkaRespErrSecurityDisabled 
RdKafkaRespErrOperationNotAttempted 
RdKafkaRespErrEndAll