Safe Haskell | None |
---|
- fetchBrokerMetadata :: ConfigOverrides -> String -> Int -> IO (Either KafkaError KafkaMetadata)
- withKafkaConsumer :: ConfigOverrides -> ConfigOverrides -> String -> String -> Int -> KafkaOffset -> (Kafka -> KafkaTopic -> IO a) -> IO a
- consumeMessage :: KafkaTopic -> Int -> Int -> IO (Either KafkaError KafkaMessage)
- consumeMessageBatch :: KafkaTopic -> Int -> Int -> Int -> IO (Either KafkaError [KafkaMessage])
- withKafkaProducer :: ConfigOverrides -> ConfigOverrides -> String -> String -> (Kafka -> KafkaTopic -> IO a) -> IO a
- produceMessage :: KafkaTopic -> KafkaProducePartition -> KafkaProduceMessage -> IO (Maybe KafkaError)
- produceKeyedMessage :: KafkaTopic -> KafkaProduceMessage -> IO (Maybe KafkaError)
- produceMessageBatch :: KafkaTopic -> KafkaProducePartition -> [KafkaProduceMessage] -> IO [(KafkaProduceMessage, KafkaError)]
- storeOffset :: KafkaTopic -> Int -> Int -> IO (Maybe KafkaError)
- getAllMetadata :: Kafka -> Int -> IO (Either KafkaError KafkaMetadata)
- getTopicMetadata :: Kafka -> KafkaTopic -> Int -> IO (Either KafkaError KafkaTopicMetadata)
- newKafka :: RdKafkaTypeT -> ConfigOverrides -> IO Kafka
- newKafkaTopic :: Kafka -> String -> ConfigOverrides -> IO KafkaTopic
- dumpConfFromKafka :: Kafka -> IO (Map String String)
- dumpConfFromKafkaTopic :: KafkaTopic -> IO (Map String String)
- setLogLevel :: Kafka -> KafkaLogLevel -> IO ()
- hPrintSupportedKafkaConf :: Handle -> IO ()
- hPrintKafka :: Handle -> Kafka -> IO ()
- rdKafkaVersionStr :: String
- data Kafka
- data KafkaTopic
- data KafkaOffset
- = KafkaOffsetBeginning
- | KafkaOffsetEnd
- | KafkaOffset Int64
- | KafkaOffsetStored
- data KafkaMessage = KafkaMessage {
- messagePartition :: !Int
- messageOffset :: !Int64
- messagePayload :: !ByteString
- messageKey :: Maybe ByteString
- data KafkaProduceMessage
- = KafkaProduceMessage !ByteString
- | KafkaProduceKeyedMessage !ByteString !ByteString
- data KafkaProducePartition
- data KafkaMetadata = KafkaMetadata {
- brokers :: [KafkaBrokerMetadata]
- topics :: [Either KafkaError KafkaTopicMetadata]
- data KafkaBrokerMetadata = KafkaBrokerMetadata {
- brokerId :: Int
- brokerHost :: String
- brokerPort :: Int
- data KafkaTopicMetadata = KafkaTopicMetadata {
- topicName :: String
- topicPartitions :: [Either KafkaError KafkaPartitionMetadata]
- data KafkaPartitionMetadata = KafkaPartitionMetadata {
- partitionId :: Int
- partitionLeader :: Int
- partitionReplicas :: [Int]
- partitionIsrs :: [Int]
- data KafkaLogLevel
- data KafkaError
- = KafkaError String
- | KafkaInvalidReturnValue
- | KafkaBadSpecification String
- | KafkaResponseError RdKafkaRespErrT
- | KafkaInvalidConfigurationValue String
- | KafkaUnknownConfigurationKey String
- | KakfaBadConfiguration
- data RdKafkaRespErrT
- addBrokers :: Kafka -> String -> IO ()
- startConsuming :: KafkaTopic -> Int -> KafkaOffset -> IO ()
- stopConsuming :: KafkaTopic -> Int -> IO ()
- drainOutQueue :: Kafka -> IO ()
Documentation
:: ConfigOverrides | connection overrides, see https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md |
-> String | |
-> Int | |
-> IO (Either KafkaError KafkaMetadata) |
Opens a connection with brokers and returns metadata about topics, partitions and brokers.
:: ConfigOverrides | config overrides for kafka. See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md. Use an empty list if you don't care. |
-> ConfigOverrides | config overrides for topic. See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md. Use an empty list if you don't care. |
-> String | broker string, e.g. localhost:9092 |
-> String | topic name |
-> Int | partition to consume from. Locked until the function returns. |
-> KafkaOffset | where to begin consuming in the partition. |
-> (Kafka -> KafkaTopic -> IO a) | your code, fed with |
-> IO a | returns what your code does |
Connects to a Kafka broker in consumer mode for a specific partition, taking a function
that is fed with Kafka and KafkaTopic instances. After receiving the handles, you should
use consumeMessage and consumeMessageBatch to receive messages. This function
automatically starts consuming before calling your code.
:: KafkaTopic | |
-> Int | partition number to consume from (must match |
-> Int | the timeout, in milliseconds (10^3 per second) |
-> IO (Either KafkaError KafkaMessage) | Left on error or timeout, right for success |
Consumes a single message from a Kafka topic, waiting up to a given timeout
:: KafkaTopic | |
-> Int | partition number to consume from (must match |
-> Int | timeout in milliseconds (10^3 per second) |
-> Int | maximum number of messages to return |
-> IO (Either KafkaError [KafkaMessage]) | Left on error, right with up to |
Consumes a batch of messages from a Kafka topic, waiting up to a given timeout. Partial results will be returned if a timeout occurs.
:: ConfigOverrides | config overrides for kafka. See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md. Use an empty list if you don't care. |
-> ConfigOverrides | config overrides for topic. See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md. Use an empty list if you don't care. |
-> String | broker string, e.g. localhost:9092 |
-> String | topic name |
-> (Kafka -> KafkaTopic -> IO a) | your code, fed with |
-> IO a | returns what your code does |
Connects to a Kafka broker in producer mode for a given topic, taking a function
that is fed with Kafka and KafkaTopic instances. After receiving the handles, you
should use produceMessage, produceKeyedMessage and produceMessageBatch
to publish messages. This function drains the outbound queue automatically before returning.
:: KafkaTopic | topic pointer |
-> KafkaProducePartition | the partition to produce to. Specify |
-> KafkaProduceMessage | the message to enqueue. This function is undefined for keyed messages. |
-> IO (Maybe KafkaError) |
Produce a single unkeyed message to either a random partition or a specified partition. Since
librdkafka is backed by a queue, this function can return before messages are sent. See
drainOutQueue to wait for the queue to empty.
:: KafkaTopic | topic pointer |
-> KafkaProduceMessage | keyed message. This function is undefined for unkeyed messages. |
-> IO (Maybe KafkaError) | Nothing on success, error if something went wrong. |
Produce a single keyed message. Since librdkafka is backed by a queue, this function can return
before messages are sent. See drainOutQueue to wait for the queue to be empty.
:: KafkaTopic | topic pointer |
-> KafkaProducePartition | partition to produce to. Specify |
-> [KafkaProduceMessage] | list of messages to enqueue. |
-> IO [(KafkaProduceMessage, KafkaError)] |
Produce a batch of messages. Since librdkafka is backed by a queue, this function can return
before messages are sent. See drainOutQueue to wait for the queue to be empty.
storeOffset :: KafkaTopic -> Int -> Int -> IO (Maybe KafkaError)
Store a partition's offset in librdkafka's offset store. This function only needs to be called if auto.commit.enable is false. See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md for information on how to configure the offset store.
:: Kafka | |
-> Int | timeout in milliseconds (10^3 per second) |
-> IO (Either KafkaError KafkaMetadata) |
Grabs all metadata from a given Kafka instance.
:: Kafka | |
-> KafkaTopic | |
-> Int | timeout in milliseconds (10^3 per second) |
-> IO (Either KafkaError KafkaTopicMetadata) |
Grabs topic metadata from a given Kafka topic instance.
newKafka :: RdKafkaTypeT -> ConfigOverrides -> IO Kafka
Create a Kafka object with the given configuration. Most of the time
you will not need to use this function directly
(see withKafkaProducer and withKafkaConsumer).
newKafkaTopic :: Kafka -> String -> ConfigOverrides -> IO KafkaTopic
Create a Kafka topic object with the given configuration. Most of the
time you will not need to use this function directly
(see withKafkaProducer and withKafkaConsumer).
dumpConfFromKafka :: Kafka -> IO (Map String String)
Returns a map of the current Kafka configuration.
dumpConfFromKafkaTopic :: KafkaTopic -> IO (Map String String)
Returns a map of the current topic configuration.
setLogLevel :: Kafka -> KafkaLogLevel -> IO ()
Sets library log level (noisiness) with respect to a Kafka instance.
hPrintSupportedKafkaConf :: Handle -> IO ()
Prints out all supported Kafka conf properties to a handle.
hPrintKafka :: Handle -> Kafka -> IO ()
Prints out all data associated with a specific Kafka object to a handle.
rdKafkaVersionStr :: String
data KafkaTopic Source
Main pointer to Kafka topic, which is what we consume from or produce to
data KafkaOffset Source
Starting locations for a consumer
KafkaOffsetBeginning | Start reading from the beginning of the partition |
KafkaOffsetEnd | Start reading from the end |
KafkaOffset Int64 | Start reading from a specific location within the partition |
KafkaOffsetStored | Start reading from the stored offset. See librdkafka's documentation for offset store configuration. |
data KafkaMessage Source
Represents received messages from a Kafka broker (i.e. used in a consumer)
KafkaMessage | |
|
Eq KafkaMessage | |
Read KafkaMessage | |
Show KafkaMessage | |
Typeable KafkaMessage |
data KafkaProduceMessage Source
Represents messages to be enqueued onto a Kafka broker (i.e. used for a producer)
KafkaProduceMessage !ByteString | A message without a key, assigned to |
KafkaProduceKeyedMessage !ByteString !ByteString | A message with a key, assigned to a partition based on the key |
Eq KafkaProduceMessage | |
Show KafkaProduceMessage | |
Typeable KafkaProduceMessage |
data KafkaProducePartition Source
Options for destination partition when enqueuing a message
KafkaSpecifiedPartition !Int | A specific partition in the topic |
KafkaUnassignedPartition | A random partition within the topic |
data KafkaMetadata Source
Metadata for all Kafka brokers
KafkaMetadata | |
|
Eq KafkaMetadata | |
Show KafkaMetadata | |
Typeable KafkaMetadata |
data KafkaBrokerMetadata Source
Metadata for a specific Kafka broker
KafkaBrokerMetadata | |
|
Eq KafkaBrokerMetadata | |
Show KafkaBrokerMetadata | |
Typeable KafkaBrokerMetadata |
data KafkaTopicMetadata Source
Metadata for a specific topic
KafkaTopicMetadata | |
|
Eq KafkaTopicMetadata | |
Show KafkaTopicMetadata | |
Typeable KafkaTopicMetadata |
data KafkaPartitionMetadata Source
Metadata for a specific partition
KafkaPartitionMetadata | |
|
Eq KafkaPartitionMetadata | |
Show KafkaPartitionMetadata | |
Typeable KafkaPartitionMetadata |
data KafkaLogLevel Source
Log levels for the RdKafkaLibrary used in setKafkaLogLevel
KafkaLogEmerg | |
KafkaLogAlert | |
KafkaLogCrit | |
KafkaLogErr | |
KafkaLogWarning | |
KafkaLogNotice | |
KafkaLogInfo | |
KafkaLogDebug |
Enum KafkaLogLevel |
data KafkaError Source
Any Kafka errors
Eq KafkaError | |
Show KafkaError | |
Typeable KafkaError | |
Exception KafkaError |
data RdKafkaRespErrT Source
Enum RdKafkaRespErrT | |
Eq RdKafkaRespErrT | |
Show RdKafkaRespErrT |
addBrokers :: Kafka -> String -> IO ()
Adds a broker string to a given Kafka instance. You
probably shouldn't use this directly
(see withKafkaConsumer and withKafkaProducer).
startConsuming :: KafkaTopic -> Int -> KafkaOffset -> IO ()
Starts consuming for a given topic. You probably do not need
to call this directly (it is called automatically by withKafkaConsumer),
but consumeMessage won't work without it. This function is non-blocking.
stopConsuming :: KafkaTopic -> Int -> IO ()
Stops consuming for a given topic. You probably do not need to call
this directly (it is called automatically when withKafkaConsumer terminates).
drainOutQueue :: Kafka -> IO ()
Drains the outbound queue for a producer. This function is called automatically
at the end of withKafkaProducer and usually doesn't need to be called directly.