haskakafka-1.0.0: Kafka bindings for Haskell

Safe HaskellNone

Haskakafka

Synopsis

Documentation

fetchBrokerMetadataSource

Arguments

:: ConfigOverrides

connection overrides, see https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

-> String 
-> Int 
-> IO (Either KafkaError KafkaMetadata) 

Opens a connection with brokers and returns metadata about topics, partitions and brokers.

withKafkaConsumerSource

Arguments

:: ConfigOverrides

config overrides for kafka. See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md. Use an empty list if you don't care.

-> ConfigOverrides

config overrides for topic. See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md. Use an empty list if you don't care.

-> String

broker string, e.g. localhost:9092

-> String

topic name

-> Int

partition to consume from. Locked until the function returns.

-> KafkaOffset

where to begin consuming in the partition.

-> (Kafka -> KafkaTopic -> IO a)

your cod, fed with Kafka and KafkaTopic instances for subsequent interaction.

-> IO a

returns what your code does

Connects to Kafka broker in consumer mode for a specific partition, taking a function that is fed with Kafka and KafkaTopic instances. After receiving handles, you should be using consumeMessage and consumeMessageBatch to receive messages. This function automatically starts consuming before calling your code.

consumeMessageSource

Arguments

:: KafkaTopic 
-> Int

partition number to consume from (must match withKafkaConsumer)

-> Int

the timeout, in milliseconds (10^3 per second)

-> IO (Either KafkaError KafkaMessage)

Left on error or timeout, right for success

Consumes a single message from a Kafka topic, waiting up to a given timeout

consumeMessageBatchSource

Arguments

:: KafkaTopic 
-> Int

partition number to consume from (must match withKafkaConsumer)

-> Int

timeout in milliseconds (10^3 per second)

-> Int

maximum number of messages to return

-> IO (Either KafkaError [KafkaMessage])

Left on error, right with up to maxMessages messages on success

Consumes a batch of messages from a Kafka topic, waiting up to a given timeout. Partial results will be returned if a timeout occurs.

withKafkaProducerSource

Arguments

:: ConfigOverrides

config overrides for kafka. See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md. Use an empty list if you don't care.

-> ConfigOverrides

config overrides for topic. See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md. Use an empty list if you don't care.

-> String

broker string, e.g. localhost:9092

-> String

topic name

-> (Kafka -> KafkaTopic -> IO a)

your code, fed with Kafka and KafkaTopic instances for subsequent interaction.

-> IO a

returns what your code does

Connects to Kafka broker in producer mode for a given topic, taking a function that is fed with Kafka and KafkaTopic instances. After receiving handles you should be using produceMessage, produceKeyedMessage and produceMessageBatch to publish messages. This function drains the outbound queue automatically before returning.

produceMessageSource

Arguments

:: KafkaTopic

topic pointer

-> KafkaProducePartition

the partition to produce to. Specify KafkaUnassignedPartition if you don't care.

-> KafkaProduceMessage

the message to enqueue. This function is undefined for keyed messages.

-> IO (Maybe KafkaError) 

Produce a single unkeyed message to either a random partition or specified partition. Since librdkafka is backed by a queue, this function can return before messages are sent. See drainOutQueue to wait for queue to empty.

produceKeyedMessageSource

Arguments

:: KafkaTopic

topic pointer

-> KafkaProduceMessage

keyed message. This function is undefined for unkeyed messages.

-> IO (Maybe KafkaError)

Nothing on success, error if something went wrong.

Produce a single keyed message. Since librdkafka is backed by a queue, this function can return before messages are sent. See drainOutQueue to wait for a queue to be empty

produceMessageBatchSource

Arguments

:: KafkaTopic

topic pointer

-> KafkaProducePartition

partition to produce to. Specify KafkaUnassignedPartition if you don't care, or you have keyed messsages.

-> [KafkaProduceMessage]

list of messages to enqueue.

-> IO [(KafkaProduceMessage, KafkaError)] 

Produce a batch of messages. Since librdkafka is backed by a queue, this function can return before messages are sent. See drainOutQueue to wait for the queue to be empty.

storeOffset :: KafkaTopic -> Int -> Int -> IO (Maybe KafkaError)Source

Store a partition's offset in librdkafka's offset store. This function only needs to be called if auto.commit.enable is false. See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md for information on how to configure the offset store.

getAllMetadataSource

Arguments

:: Kafka 
-> Int

timeout in milliseconds (10^3 per second)

-> IO (Either KafkaError KafkaMetadata) 

Grabs all metadata from a given Kafka instance.

getTopicMetadataSource

Arguments

:: Kafka 
-> KafkaTopic 
-> Int

timeout in milliseconds (10^3 per second)

-> IO (Either KafkaError KafkaTopicMetadata) 

Grabs topic metadata from a given Kafka topic instance

newKafka :: RdKafkaTypeT -> ConfigOverrides -> IO KafkaSource

Create kafka object with the given configuration. Most of the time you will not need to use this function directly (see withKafkaProducer and withKafkaConsumer).

newKafkaTopic :: Kafka -> String -> ConfigOverrides -> IO KafkaTopicSource

Create a kafka topic object with the given configuration. Most of the time you will not need to use this function directly (see withKafkaProducer and withKafkaConsumer)

dumpConfFromKafka :: Kafka -> IO (Map String String)Source

Returns a map of the current kafka configuration

dumpConfFromKafkaTopic :: KafkaTopic -> IO (Map String String)Source

Returns a map of the current topic configuration

setLogLevel :: Kafka -> KafkaLogLevel -> IO ()Source

Sets library log level (noisiness) with respect to a kafka instance

hPrintSupportedKafkaConf :: Handle -> IO ()Source

Prints out all supported Kafka conf properties to a handle

hPrintKafka :: Handle -> Kafka -> IO ()Source

Prints out all data associated with a specific kafka object to a handle

data Kafka Source

Main pointer to Kafka object, which contains our brokers

data KafkaTopic Source

Main pointer to Kafka topic, which is what we consume from or produce to

data KafkaOffset Source

Starting locations for a consumer

Constructors

KafkaOffsetBeginning

Start reading from the beginning of the partition

KafkaOffsetEnd

Start reading from the end

KafkaOffset Int64

Start reading from a specific location within the partition

KafkaOffsetStored

Start reading from the stored offset. See librdkafka's documentation for offset store configuration.

data KafkaMessage Source

Represents received messages from a Kafka broker (i.e. used in a consumer)

Constructors

KafkaMessage 

Fields

messagePartition :: !Int

Kafka partition this message was received from

messageOffset :: !Int64

Offset within the messagePartition Kafka partition

messagePayload :: !ByteString

Contents of the message, as a ByteString

messageKey :: Maybe ByteString

Optional key of the message. Nothing when the message was enqueued without a key

Instances

data KafkaProduceMessage Source

Represents messages to be enqueued onto a Kafka broker (i.e. used for a producer)

Constructors

KafkaProduceMessage !ByteString

A message without a key, assigned to KafkaSpecifiedPartition or KafkaUnassignedPartition

KafkaProduceKeyedMessage !ByteString !ByteString

A message with a key, assigned to a partition based on the key

data KafkaProducePartition Source

Options for destination partition when enqueuing a message

Constructors

KafkaSpecifiedPartition !Int

A specific partition in the topic

KafkaUnassignedPartition

A random partition within the topic

data KafkaMetadata Source

Metadata for all Kafka brokers

Constructors

KafkaMetadata 

Fields

brokers :: [KafkaBrokerMetadata]

Broker metadata

topics :: [Either KafkaError KafkaTopicMetadata]

topic metadata

Instances

data KafkaBrokerMetadata Source

Metadata for a specific Kafka broker

Constructors

KafkaBrokerMetadata 

Fields

brokerId :: Int

broker identifier

brokerHost :: String

hostname for the broker

brokerPort :: Int

port for the broker

data KafkaTopicMetadata Source

Metadata for a specific topic

Constructors

KafkaTopicMetadata 

Fields

topicName :: String

name of the topic

topicPartitions :: [Either KafkaError KafkaPartitionMetadata]

partition metadata

data KafkaPartitionMetadata Source

Metadata for a specific partition

Constructors

KafkaPartitionMetadata 

Fields

partitionId :: Int

identifier for the partition

partitionLeader :: Int

broker leading this partition

partitionReplicas :: [Int]

replicas of the leader

partitionIsrs :: [Int]

In-sync replica set, see http://kafka.apache.org/documentation.html

data KafkaLogLevel Source

Log levels for the RdKafkaLibrary used in setKafkaLogLevel

Instances

addBrokers :: Kafka -> String -> IO ()Source

Adds a broker string to a given kafka instance. You probably shouldn't use this directly (see withKafkaConsumer and withKafkaProducer)

startConsuming :: KafkaTopic -> Int -> KafkaOffset -> IO ()Source

Starts consuming for a given topic. You probably do not need to call this directly (it is called automatically by withKafkaConsumer) but consumeMessage won't work without it. This function is non-blocking.

stopConsuming :: KafkaTopic -> Int -> IO ()Source

Stops consuming for a given topic. You probably do not need to call this directly (it is called automatically when withKafkaConsumer terminates).

drainOutQueue :: Kafka -> IO ()Source

Drains the outbound queue for a producer. This function is called automatically at the end of withKafkaProducer and usually doesn't need to be called directly.