The hw-kafka-client package

Tags: library, mit, program, test

Apache Kafka bindings backed by the librdkafka C library.

Features include consumer groups with automatic rebalancing, and producing and consuming of both keyed and keyless messages.



Properties

Versions 1.0.0, 1.1.0, 1.1.1, 1.1.2
Dependencies base (>=4.6 && <5), bifunctors, bytestring, containers, hw-kafka-client, temporary, transformers, unix
License MIT
Author Alexey Raga <alexey.raga@gmail.com>
Maintainer Alexey Raga <alexey.raga@gmail.com>
Category Database
Home page https://github.com/haskell-works/hw-kafka-client
Bug tracker https://github.com/haskell-works/hw-kafka-client/issues
Source repository head: git clone git://github.com/haskell-works/hw-kafka-client.git
Uploaded Tue Feb 28 12:56:02 UTC 2017 by alexeyraga
Distributions NixOS:1.1.2
Downloads 89 total (21 in the last 30 days)
Votes 0
Status Docs uploaded by user
Build status unknown [no reports yet]


Readme for hw-kafka-client-1.1.2

hw-kafka-client


Kafka bindings for Haskell backed by the librdkafka C library.

Credits

This project is inspired by Haskakafka, which unfortunately does not seem to be actively maintained.

Ecosystem

The HaskellWorks Kafka ecosystem is described here: https://github.com/haskell-works/hw-kafka

Consumer

High-level consumers are supported by librdkafka starting from version 0.9.
High-level consumers provide an abstraction for consuming messages from multiple partitions and topics. They also address scalability (up to the number of partitions) by providing automatic rebalancing: when a new consumer joins a consumer group, the set of consumers attempts to "rebalance" the load so that partitions are assigned across the group's members.
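Because a single subscription can cover several topics, the partitions of all of those topics are balanced across the group. A minimal sketch using the same combinators as the example below ("orders" and "payments" are made-up topic names):

{-# LANGUAGE OverloadedStrings #-}

import Data.Monoid ((<>))
import Kafka
import Kafka.Consumer

-- Hypothetical subscription spanning two topics; partitions of both
-- topics are distributed across the members of the consumer group.
multiTopicSub :: Subscription
multiTopicSub = topics [TopicName "orders", TopicName "payments"]
             <> offsetReset Earliest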

Example:

A working consumer example can be found here: ConsumerExample.hs

{-# LANGUAGE OverloadedStrings #-}

import Data.Monoid ((<>))
import Kafka
import Kafka.Consumer

-- Global consumer properties
consumerProps :: ConsumerProperties
consumerProps = consumerBrokersList [BrokerAddress "localhost:9092"]
             <> groupId (ConsumerGroupId "consumer_example_group")
             <> noAutoCommit
             <> consumerDebug [DebugAll]

-- Subscription to topics
consumerSub :: Subscription
consumerSub = topics [TopicName "kafka-client-example-topic"]
           <> offsetReset Earliest

-- Running an example
runConsumerExample :: IO ()
runConsumerExample = do
    res <- runConsumer consumerProps consumerSub processMessages
    print res

-------------------------------------------------------------------
processMessages :: KafkaConsumer -> IO (Either KafkaError ())
processMessages kafka = do
    mapM_ (\_ -> do
                   msg1 <- pollMessage kafka (Timeout 1000)
                   putStrLn $ "Message: " <> show msg1
                   err <- commitAllOffsets kafka OffsetCommit
                   putStrLn $ "Offsets: " <> maybe "Committed." show err
          ) [0 .. 10]
    return $ Right ()
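
The example above polls a fixed number of times; a long-running consumer would typically keep polling until the process is shut down. A minimal sketch of such a loop (the function name is made up, and it reuses only the calls and imports from the example above, plus forever):

import Control.Monad (forever)  -- in addition to the imports above

-- Hypothetical endless variant of processMessages: poll and commit
-- until the process is stopped externally. It can be passed to
-- runConsumer in place of processMessages.
pollForever :: KafkaConsumer -> IO (Either KafkaError ())
pollForever kafka = forever $ do
    msg <- pollMessage kafka (Timeout 1000)
    putStrLn $ "Message: " <> show msg
    err <- commitAllOffsets kafka OffsetCommit
    putStrLn $ "Offsets: " <> maybe "Committed." show err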

Producer

The hw-kafka-client producer supports sending messages to multiple topics. The target topic name is part of each message that is sent by produceMessage.

A working producer example can be found here: ProducerExample.hs

Example

{-# LANGUAGE OverloadedStrings #-}

import Control.Monad (forM_)
import Kafka
import Kafka.Producer

-- Global producer properties
producerProps :: ProducerProperties
producerProps = producerBrokersList [BrokerAddress "localhost:9092"]

-- Topic to send messages to
targetTopic :: TopicName
targetTopic = TopicName "kafka-client-example-topic"

-- Run an example
runProducerExample :: IO ()
runProducerExample = do
    res <- runProducer producerProps sendMessages
    print res

sendMessages :: KafkaProducer -> IO (Either KafkaError ())
sendMessages prod = do
  err1 <- produceMessage prod ProducerRecord
                                { prTopic = targetTopic
                                , prPartition = UnassignedPartition
                                , prKey = Nothing
                                , prValue = Just "test from producer"
                                }
  forM_ err1 print

  err2 <- produceMessage prod ProducerRecord
                                { prTopic = targetTopic
                                , prPartition = UnassignedPartition
                                , prKey = Just "key"
                                , prValue = Just "test from producer (with key)"
                                }
  forM_ err2 print

  return $ Right ()
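
Since the target topic travels with every record, a small helper can remove the repetition when producing several payloads to the same topic. This is only a sketch: mkRecord is a made-up name, and it assumes prValue carries a ByteString payload, as the literals above suggest.

import Data.ByteString (ByteString)  -- in addition to the imports above

-- Hypothetical helper: build a keyless record for targetTopic using only
-- the ProducerRecord fields shown in the example above.
mkRecord :: ByteString -> ProducerRecord
mkRecord payload = ProducerRecord
  { prTopic     = targetTopic
  , prPartition = UnassignedPartition
  , prKey       = Nothing
  , prValue     = Just payload
  }

-- e.g. produceMessage prod (mkRecord "another payload")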

Installation

Installing librdkafka

Although librdkafka is available on many platforms, most distribution packages are too old to support hw-kafka-client. As such, we suggest installing it from source:

git clone https://github.com/edenhill/librdkafka
cd librdkafka
./configure
make && sudo make install

Sometimes it is helpful to point configure at the OpenSSL headers and libraries explicitly:

LDFLAGS=-L/usr/local/opt/openssl/lib CPPFLAGS=-I/usr/local/opt/openssl/include ./configure

Installing Kafka

The full Kafka guide is at http://kafka.apache.org/documentation.html#quickstart

Alternatively, docker-compose can be used to run Kafka locally inside a Docker container:

$ docker-compose up