hw-kafka-client-1.1.4: Kafka bindings for Haskell

Safe Haskell: None
Language: Haskell2010

Kafka.Consumer


Documentation

runConsumer Source #

Arguments

:: ConsumerProperties 
-> Subscription 
-> (KafkaConsumer -> IO (Either KafkaError a))

A callback function to poll and handle messages

-> IO (Either KafkaError a) 

Runs a high-level Kafka consumer.

The provided callback is expected to call pollMessage when convenient.
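
For a concrete picture, here is a minimal sketch of wiring runConsumer together. The property and subscription combinators used below (brokersList, groupId, topics) and the BrokerAddress/ConsumerGroupId wrappers are assumed to be re-exported by Kafka.Consumer in this version; the broker address, group id, and topic name are placeholders.

  import Data.Monoid ((<>))
  import Kafka.Consumer

  -- A sketch: broker, group, and topic names below are placeholders.
  consumerProps :: ConsumerProperties
  consumerProps =
       brokersList [BrokerAddress "localhost:9092"]
    <> groupId (ConsumerGroupId "example-group")

  consumerSub :: Subscription
  consumerSub = topics [TopicName "example-topic"]

  -- runConsumer creates the consumer, runs the callback, and closes the
  -- consumer afterwards; here the callback polls one message and returns.
  runExample :: IO (Either KafkaError ())
  runExample =
    runConsumer consumerProps consumerSub $ \consumer -> do
      msg <- pollMessage consumer (Timeout 1000)
      print msg
      return (Right ())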

newConsumer :: MonadIO m => ConsumerProperties -> Subscription -> m (Either KafkaError KafkaConsumer) Source #

Creates a Kafka consumer. A new consumer MUST be closed with the closeConsumer function.
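
When more control over the lifecycle is needed than runConsumer gives, the consumer can be created and closed explicitly. A sketch using bracket, reusing the placeholder consumerProps and consumerSub from the runConsumer sketch above:

  import Control.Exception (bracket)
  import Kafka.Consumer

  -- A sketch: create the consumer, hand it to an action, and make sure
  -- closeConsumer runs even if the action throws.
  withConsumer :: (KafkaConsumer -> IO a) -> IO (Either KafkaError a)
  withConsumer action =
    bracket
      (newConsumer consumerProps consumerSub)
      (either (\_ -> return Nothing) closeConsumer)
      (either (return . Left) (fmap Right . action))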

assign :: MonadIO m => KafkaConsumer -> [TopicPartition] -> m KafkaError Source #

Assigns the specified partitions to the current consumer. Assigning an empty list unassigns the consumer from all currently assigned partitions.
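
A manual-assignment sketch; note that the TopicPartition field names and the PartitionId/PartitionOffsetBeginning constructors below are assumptions about this package's re-exports, not something documented in this section.

  -- A sketch (field and constructor names are assumptions, see above):
  -- pin the consumer to partition 0 of a placeholder topic, then later
  -- drop all assignments by passing the empty list.
  assignExample :: KafkaConsumer -> IO ()
  assignExample consumer = do
    _ <- assign consumer
           [ TopicPartition
               { tpTopicName = TopicName "example-topic"
               , tpPartition = PartitionId 0
               , tpOffset    = PartitionOffsetBeginning
               }
           ]
    -- ... consume from the pinned partition ...
    _ <- assign consumer []   -- an empty list unassigns everything
    return ()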

pollMessage Source #

Arguments

:: MonadIO m 
=> KafkaConsumer 
-> Timeout

The timeout, in milliseconds

-> m (Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))

Left on error or timeout, Right on success

Polls the next message from a subscription.
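
A polling-loop sketch, usable directly as a runConsumer callback; the one-second timeout is arbitrary.

  import Control.Monad (forever)
  import Kafka.Consumer

  -- A sketch: poll with a one-second timeout forever and print the result;
  -- a Left covers both real errors and "nothing arrived before the timeout".
  pollLoop :: KafkaConsumer -> IO (Either KafkaError ())
  pollLoop consumer = forever $ do
    msg <- pollMessage consumer (Timeout 1000)
    case msg of
      Left err     -> putStrLn ("poll: " ++ show err)
      Right record -> print record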

commitOffsetMessage :: MonadIO m => OffsetCommit -> KafkaConsumer -> ConsumerRecord k v -> m (Maybe KafkaError) Source #

Commits the message's offset on the broker for the message's partition.
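
An at-least-once sketch: process a record, then synchronously commit exactly that record's offset; processRecord is a hypothetical stand-in for application logic.

  import Data.ByteString (ByteString)
  import Kafka.Consumer

  -- Hypothetical application logic; printing stands in for real work.
  processRecord :: ConsumerRecord (Maybe ByteString) (Maybe ByteString) -> IO ()
  processRecord = print

  -- Process first, commit afterwards, so the offset only moves once the
  -- record has been handled (at-least-once delivery).
  handleRecord :: KafkaConsumer
               -> ConsumerRecord (Maybe ByteString) (Maybe ByteString)
               -> IO ()
  handleRecord consumer record = do
    processRecord record
    res <- commitOffsetMessage OffsetCommit consumer record
    case res of
      Nothing  -> return ()
      Just err -> putStrLn ("commit failed: " ++ show err)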

commitAllOffsets :: MonadIO m => OffsetCommit -> KafkaConsumer -> m (Maybe KafkaError) Source #

Commits offsets for all currently assigned partitions.

commitPartitionsOffsets :: MonadIO m => OffsetCommit -> KafkaConsumer -> [TopicPartition] -> m (Maybe KafkaError) Source #

Commits offsets for the specified partitions.
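
A short sketch contrasting the two commit modes with commitAllOffsets; committing asynchronously during normal operation and synchronously before shutdown is a common pattern.

  -- A sketch: a non-blocking commit for the steady state, a blocking one
  -- right before the consumer is closed.
  commitSketch :: KafkaConsumer -> IO ()
  commitSketch consumer = do
    _ <- commitAllOffsets OffsetCommitAsync consumer  -- returns without waiting
    -- ... later, just before closeConsumer ...
    _ <- commitAllOffsets OffsetCommit consumer       -- blocks until the broker confirms
    return ()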

closeConsumer :: MonadIO m => KafkaConsumer -> m (Maybe KafkaError) Source #

Closes the consumer.

newtype TopicName Source #

Topic name to be consumed

Wildcard (regex) topics are supported by the librdkafka assignor: any topic name in the topics list that is prefixed with ^ will be regex-matched to the full list of topics in the cluster and matching topics will be added to the subscription list.

Constructors

TopicName String

A simple topic name, or a regex if it starts with ^
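
Both constructor forms as they would appear in a subscription; the topics combinator is assumed to be re-exported by this module, and the topic names are placeholders.

  import Kafka.Consumer

  -- A literal topic, and a regex subscription matching every topic whose
  -- name starts with "logs." (the leading ^ enables regex matching).
  literalSub, regexSub :: Subscription
  literalSub = topics [TopicName "orders"]
  regexSub   = topics [TopicName "^logs\\..*"]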

data OffsetCommit Source #

Offset commit mode

Constructors

OffsetCommit

Forces the consumer to block until the offset commit on the broker is done

OffsetCommitAsync

Offsets will be committed in a non-blocking way

data ConsumerRecord k v Source #

Represents a received message from Kafka (i.e. used in a consumer)

Constructors

ConsumerRecord 

Fields

Instances

Bifunctor ConsumerRecord Source # 

Methods

bimap :: (a -> b) -> (c -> d) -> ConsumerRecord a c -> ConsumerRecord b d #

first :: (a -> b) -> ConsumerRecord a c -> ConsumerRecord b c #

second :: (b -> c) -> ConsumerRecord a b -> ConsumerRecord a c #

Bitraversable ConsumerRecord Source # 

Methods

bitraverse :: Applicative f => (a -> f c) -> (b -> f d) -> ConsumerRecord a b -> f (ConsumerRecord c d) #

Bifoldable ConsumerRecord Source # 

Methods

bifold :: Monoid m => ConsumerRecord m m -> m #

bifoldMap :: Monoid m => (a -> m) -> (b -> m) -> ConsumerRecord a b -> m #

bifoldr :: (a -> c -> c) -> (b -> c -> c) -> c -> ConsumerRecord a b -> c #

bifoldl :: (c -> a -> c) -> (c -> b -> c) -> c -> ConsumerRecord a b -> c #

Functor (ConsumerRecord k) Source # 

Methods

fmap :: (a -> b) -> ConsumerRecord k a -> ConsumerRecord k b #

(<$) :: a -> ConsumerRecord k b -> ConsumerRecord k a #

Foldable (ConsumerRecord k) Source # 

Methods

fold :: Monoid m => ConsumerRecord k m -> m #

foldMap :: Monoid m => (a -> m) -> ConsumerRecord k a -> m #

foldr :: (a -> b -> b) -> b -> ConsumerRecord k a -> b #

foldr' :: (a -> b -> b) -> b -> ConsumerRecord k a -> b #

foldl :: (b -> a -> b) -> b -> ConsumerRecord k a -> b #

foldl' :: (b -> a -> b) -> b -> ConsumerRecord k a -> b #

foldr1 :: (a -> a -> a) -> ConsumerRecord k a -> a #

foldl1 :: (a -> a -> a) -> ConsumerRecord k a -> a #

toList :: ConsumerRecord k a -> [a] #

null :: ConsumerRecord k a -> Bool #

length :: ConsumerRecord k a -> Int #

elem :: Eq a => a -> ConsumerRecord k a -> Bool #

maximum :: Ord a => ConsumerRecord k a -> a #

minimum :: Ord a => ConsumerRecord k a -> a #

sum :: Num a => ConsumerRecord k a -> a #

product :: Num a => ConsumerRecord k a -> a #

Traversable (ConsumerRecord k) Source # 

Methods

traverse :: Applicative f => (a -> f b) -> ConsumerRecord k a -> f (ConsumerRecord k b) #

sequenceA :: Applicative f => ConsumerRecord k (f a) -> f (ConsumerRecord k a) #

mapM :: Monad m => (a -> m b) -> ConsumerRecord k a -> m (ConsumerRecord k b) #

sequence :: Monad m => ConsumerRecord k (m a) -> m (ConsumerRecord k a) #

(Eq v, Eq k) => Eq (ConsumerRecord k v) Source # 
(Read v, Read k) => Read (ConsumerRecord k v) Source # 
(Show v, Show k) => Show (ConsumerRecord k v) Source # 
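
Because ConsumerRecord is a Bifunctor over its key and value (see the instances above), a raw polled record can be decoded in place. The UTF-8 decoding below is only an illustration, not part of this module.

  import           Data.Bifunctor     (bimap)
  import           Data.ByteString    (ByteString)
  import           Data.Text          (Text)
  import qualified Data.Text.Encoding as T
  import           Kafka.Consumer

  -- A sketch: map the raw (Maybe ByteString) key and value of a polled
  -- record to Maybe Text using the Bifunctor instance.
  decodeRecord :: ConsumerRecord (Maybe ByteString) (Maybe ByteString)
               -> ConsumerRecord (Maybe Text) (Maybe Text)
  decodeRecord = bimap (fmap T.decodeUtf8) (fmap T.decodeUtf8)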

data RdKafkaRespErrT Source #

Constructors

RdKafkaRespErrBegin 
RdKafkaRespErrBadMsg 
RdKafkaRespErrBadCompression 
RdKafkaRespErrDestroy 
RdKafkaRespErrFail 
RdKafkaRespErrTransport 
RdKafkaRespErrCritSysResource 
RdKafkaRespErrResolve 
RdKafkaRespErrMsgTimedOut 
RdKafkaRespErrPartitionEof 
RdKafkaRespErrUnknownPartition 
RdKafkaRespErrFs 
RdKafkaRespErrUnknownTopic 
RdKafkaRespErrAllBrokersDown 
RdKafkaRespErrInvalidArg 
RdKafkaRespErrTimedOut 
RdKafkaRespErrQueueFull 
RdKafkaRespErrIsrInsuff 
RdKafkaRespErrNodeUpdate 
RdKafkaRespErrSsl 
RdKafkaRespErrWaitCoord 
RdKafkaRespErrUnknownGroup 
RdKafkaRespErrInProgress 
RdKafkaRespErrPrevInProgress 
RdKafkaRespErrExistingSubscription 
RdKafkaRespErrAssignPartitions 
RdKafkaRespErrRevokePartitions 
RdKafkaRespErrConflict 
RdKafkaRespErrState 
RdKafkaRespErrUnknownProtocol 
RdKafkaRespErrNotImplemented 
RdKafkaRespErrAuthentication 
RdKafkaRespErrNoOffset 
RdKafkaRespErrOutdated 
RdKafkaRespErrTimedOutQueue 
RdKafkaRespErrUnsupportedFeature 
RdKafkaRespErrWaitCache 
RdKafkaRespErrEnd 
RdKafkaRespErrUnknown 
RdKafkaRespErrNoError 
RdKafkaRespErrOffsetOutOfRange 
RdKafkaRespErrInvalidMsg 
RdKafkaRespErrUnknownTopicOrPart 
RdKafkaRespErrInvalidMsgSize 
RdKafkaRespErrLeaderNotAvailable 
RdKafkaRespErrNotLeaderForPartition 
RdKafkaRespErrRequestTimedOut 
RdKafkaRespErrBrokerNotAvailable 
RdKafkaRespErrReplicaNotAvailable 
RdKafkaRespErrMsgSizeTooLarge 
RdKafkaRespErrStaleCtrlEpoch 
RdKafkaRespErrOffsetMetadataTooLarge 
RdKafkaRespErrNetworkException 
RdKafkaRespErrGroupLoadInProgress 
RdKafkaRespErrGroupCoordinatorNotAvailable 
RdKafkaRespErrNotCoordinatorForGroup 
RdKafkaRespErrTopicException 
RdKafkaRespErrRecordListTooLarge 
RdKafkaRespErrNotEnoughReplicas 
RdKafkaRespErrNotEnoughReplicasAfterAppend 
RdKafkaRespErrInvalidRequiredAcks 
RdKafkaRespErrIllegalGeneration 
RdKafkaRespErrInconsistentGroupProtocol 
RdKafkaRespErrInvalidGroupId 
RdKafkaRespErrUnknownMemberId 
RdKafkaRespErrInvalidSessionTimeout 
RdKafkaRespErrRebalanceInProgress 
RdKafkaRespErrInvalidCommitOffsetSize 
RdKafkaRespErrTopicAuthorizationFailed 
RdKafkaRespErrGroupAuthorizationFailed 
RdKafkaRespErrClusterAuthorizationFailed 
RdKafkaRespErrInvalidTimestamp 
RdKafkaRespErrUnsupportedSaslMechanism 
RdKafkaRespErrIllegalSaslState 
RdKafkaRespErrUnsupportedVersion 
RdKafkaRespErrTopicAlreadyExists 
RdKafkaRespErrInvalidPartitions 
RdKafkaRespErrInvalidReplicationFactor 
RdKafkaRespErrInvalidReplicaAssignment 
RdKafkaRespErrInvalidConfig 
RdKafkaRespErrNotController 
RdKafkaRespErrInvalidRequest 
RdKafkaRespErrUnsupportedForMessageFormat 
RdKafkaRespErrEndAll
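
These codes typically reach user code wrapped in KafkaError; for example, a timed-out pollMessage can be told apart from a real failure. The KafkaResponseError constructor used below comes from this package's Kafka.Types and is an assumption as far as this section goes.

  import Data.ByteString (ByteString)
  import Kafka.Consumer

  -- A sketch: treat "nothing arrived within the timeout" as Nothing and
  -- only surface other errors, assuming timeouts are reported as
  -- KafkaResponseError RdKafkaRespErrTimedOut.
  pollOnce :: KafkaConsumer
           -> IO (Maybe (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
  pollOnce consumer = do
    res <- pollMessage consumer (Timeout 1000)
    case res of
      Right record                                     -> return (Just record)
      Left (KafkaResponseError RdKafkaRespErrTimedOut) -> return Nothing
      Left err                                         -> ioError (userError (show err))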