hw-kafka-client: Kafka bindings for Haskell

Apache Kafka bindings backed by the librdkafka C library.

Features include:

  • Consumer groups: auto-rebalancing consumers

  • Producing and consuming keyed and keyless messages

  • Producing messages in batches


Flags

Manual Flags

Name      Description            Default
examples  Also compile examples  Disabled

Use -f <flag> to enable a flag, or -f -<flag> to disable that flag.

Versions: 1.0.0, 1.1.0, 1.1.1, 1.1.2, 1.1.3, 1.1.4, 2.0.0, 2.0.1, 2.0.2, 2.0.3, 2.0.4, 2.1.0, 2.1.1, 2.1.2, 2.1.3, 2.2.0, 2.3.0, 2.3.1, 2.3.2, 2.3.3, 2.4.0, 2.4.1, 2.4.3, 2.4.4, 2.5.0, 2.6.0, 2.6.1, 3.0.0, 3.1.0, 3.1.1, 3.1.2, 4.0.0, 4.0.1, 4.0.2, 4.0.3, 4.0.4, 5.0.0, 5.3.0
Dependencies: base (>=4.6 && <5), bifunctors, bytestring, containers, hw-kafka-client, semigroups, text, transformers, unix
License: MIT
Author: Alexey Raga <alexey.raga@gmail.com>
Maintainer: Alexey Raga <alexey.raga@gmail.com>
Category: Database
Home page: https://github.com/haskell-works/hw-kafka-client
Bug tracker: https://github.com/haskell-works/hw-kafka-client/issues
Source repo (head): git clone git://github.com/haskell-works/hw-kafka-client.git
Uploaded: by alexeyraga at 2020-04-14T05:07:50Z
Distributions: LTSHaskell:5.3.0, NixOS:5.3.0, Stackage:5.3.0
Reverse Dependencies: 7 direct, 1 indirect
Executables: kafka-client-example
Downloads: 20214 total (141 in the last 30 days)
Rating: 2.25 (votes: 4) [estimated by Bayesian average]
Status Docs uploaded by user [build log]
All reported builds failed as of 2020-04-14 [all 3 reports]

Readme for hw-kafka-client-3.1.0

hw-kafka-client

Kafka bindings for Haskell backed by the librdkafka C library.

Credits

This project is inspired by Haskakafka, which unfortunately doesn't seem to be actively maintained.

Ecosystem

The HaskellWorks Kafka ecosystem is described here: https://github.com/haskell-works/hw-kafka

Consumer

High-level consumers are supported by librdkafka starting from version 0.9. They provide an abstraction for consuming messages from multiple partitions and topics. They also address scalability (up to the number of partitions) by providing automatic rebalancing: when a new consumer joins a consumer group, the consumers in the group attempt to "rebalance" the load by reassigning partitions among themselves.
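To make the rebalancing idea concrete, here is a small, library-independent sketch of dealing partitions out to the members of a group. It is only an illustration: the names (ConsumerName, Partition, assignRoundRobin) are hypothetical and not part of hw-kafka-client, and the round-robin rule is an assumption chosen for simplicity, not a description of the assignment strategy librdkafka or the broker actually uses.

```haskell
-- Hypothetical names for illustration only; not hw-kafka-client types.
type ConsumerName = String
type Partition    = Int

-- | Deal partitions out to consumers in round-robin order.
-- Every consumer appears in the result, possibly with no partitions.
assignRoundRobin :: [ConsumerName] -> [Partition] -> [(ConsumerName, [Partition])]
assignRoundRobin consumers partitions =
    [ (name, [p | (owner, p) <- dealt, owner == i])
    | (i, name) <- indexed
    ]
  where
    -- Number the consumers so that duplicate names cannot collide.
    indexed = zip [0 :: Int ..] consumers
    -- Pair each partition with the index of the consumer that owns it.
    dealt   = zip (cycle (map fst indexed)) partitions
```

Under this rule, consumers "a" and "b" sharing partitions 0 to 3 get [0,2] and [1,3]. When "c" joins, the assignment becomes [0,3], [1] and [2]. With more consumers than partitions, the surplus consumers receive an empty list, which is why scaling out only helps up to the number of partitions.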

Example:

$ stack build --flag hw-kafka-client:examples

or

$ stack build --exec kafka-client-example --flag hw-kafka-client:examples

A working consumer example can be found here: ConsumerExample.hs
To run an example, compile with the examples flag as shown above.

import Control.Exception (bracket)
import Data.Monoid ((<>))
import Kafka
import Kafka.Consumer

-- Global consumer properties
consumerProps :: ConsumerProperties
consumerProps = brokersList [BrokerAddress "localhost:9092"]
             <> groupId (ConsumerGroupId "consumer_example_group")
             <> noAutoCommit
             <> logLevel KafkaLogInfo

-- Subscription to topics
consumerSub :: Subscription
consumerSub = topics [TopicName "kafka-client-example-topic"]
           <> offsetReset Earliest

-- Running an example
runConsumerExample :: IO ()
runConsumerExample = do
    res <- bracket mkConsumer clConsumer runHandler
    print res
    where
      mkConsumer = newConsumer consumerProps consumerSub
      clConsumer (Left err) = return (Left err)
      clConsumer (Right kc) = (maybe (Right ()) Left) <$> closeConsumer kc
      runHandler (Left err) = return (Left err)
      runHandler (Right kc) = processMessages kc

-------------------------------------------------------------------
-- Poll for a message eleven times, committing offsets after each poll
processMessages :: KafkaConsumer -> IO (Either KafkaError ())
processMessages kafka = do
    mapM_ (\_ -> do
                   -- Wait up to 1000 ms for the next message
                   msg1 <- pollMessage kafka (Timeout 1000)
                   putStrLn $ "Message: " <> show msg1
                   -- Commit explicitly, because noAutoCommit is set above
                   err <- commitAllOffsets OffsetCommit kafka
                   putStrLn $ "Offsets: " <> maybe "Committed." show err
          ) [0 .. 10]
    return $ Right ()

Producer

The kafka-client producer supports sending messages to multiple topics. The target topic name is part of each message sent by produceMessage.

A working producer example can be found here: ProducerExample.hs

Delivery reports

The Kafka producer maintains its own internal queue for outgoing messages. Calling produceMessage does not mean that the message has actually been written to Kafka. It only means that the message has been put on that outgoing queue and that the producer will (eventually) push it to Kafka.

However, it is not always possible for the producer to send messages to Kafka. Network problems or an offline Kafka cluster can prevent it from doing so.

When a message cannot be sent to Kafka for some time (see the message.timeout.ms configuration option), the message is dropped from the outgoing queue and a delivery report indicating an error is raised.

It is possible to configure hw-kafka-client to set an infinite message timeout so the message is never dropped from the queue:

producerProps :: ProducerProperties
producerProps = brokersList [BrokerAddress "localhost:9092"]
             <> sendTimeout (Timeout 0)           -- for librdkafka "0" means "infinite" (see https://github.com/edenhill/librdkafka/issues/2015)

Delivery reports provide a way to detect when the producer has problems sending messages to Kafka.

Currently hw-kafka-client only supports delivery error callbacks:

producerProps :: ProducerProperties
producerProps = brokersList [BrokerAddress "localhost:9092"]
             <> setCallback (deliveryCallback print)

In the example above, when the producer cannot deliver a message to Kafka, the error is printed (and the message is dropped).
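A callback does not have to be print. The sketch below shows one possible handler that distinguishes the kinds of delivery report. It assumes the DeliveryReport constructors used later in this README (DeliverySuccess, DeliveryFailure, NoMessageError) and that their payloads have Show instances. The names handleReport and producerPropsWithReports are hypothetical, and the handler is not part of the library.

```haskell
import Data.Monoid ((<>))
import Kafka.Producer

-- Hypothetical handler: log each delivery report in a readable form.
-- Assumes Offset and KafkaError have Show instances.
handleReport :: DeliveryReport -> IO ()
handleReport report = case report of
  DeliverySuccess _ offset -> putStrLn ("Delivered at offset: " <> show offset)
  DeliveryFailure _ err    -> putStrLn ("Delivery failed: " <> show err)
  NoMessageError err       -> putStrLn ("Delivery error without a message: " <> show err)

-- Same properties as above, with the custom handler instead of print.
producerPropsWithReports :: ProducerProperties
producerPropsWithReports = brokersList [BrokerAddress "localhost:9092"]
                        <> setCallback (deliveryCallback handleReport)
```

Keep the handler cheap, for example by handing the report to a queue or a logger, so that the callback returns quickly.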

Example

{-# LANGUAGE OverloadedStrings #-}
import Control.Exception (bracket)
import Control.Monad (forM_)
import Data.ByteString (ByteString)
import Data.Monoid ((<>))
import Kafka.Producer

-- Global producer properties
producerProps :: ProducerProperties
producerProps = brokersList [BrokerAddress "localhost:9092"]
             <> logLevel KafkaLogDebug

-- Topic to send messages to
targetTopic :: TopicName
targetTopic = TopicName "kafka-client-example-topic"

-- Run an example
runProducerExample :: IO ()
runProducerExample =
    bracket mkProducer clProducer runHandler >>= print
    where
      mkProducer = newProducer producerProps
      clProducer (Left _)     = return ()
      clProducer (Right prod) = closeProducer prod
      runHandler (Left err)   = return $ Left err
      runHandler (Right prod) = sendMessages prod

sendMessages :: KafkaProducer -> IO (Either KafkaError ())
sendMessages prod = do
  err1 <- produceMessage prod (mkMessage Nothing (Just "test from producer") )
  forM_ err1 print

  err2 <- produceMessage prod (mkMessage (Just "key") (Just "test from producer (with key)"))
  forM_ err2 print

  return $ Right ()

mkMessage :: Maybe ByteString -> Maybe ByteString -> ProducerRecord
mkMessage k v = ProducerRecord
                  { prTopic = targetTopic
                  , prPartition = UnassignedPartition
                  , prKey = k
                  , prValue = v
                  }

Synchronous sending of messages

Because of the asynchronous nature of librdkafka, there is no API to provide synchronous production of messages. It is, however, possible to combine the delivery reports feature with that of callbacks. This can be done using the Kafka.Producer.produceMessage' function.

produceMessage' :: MonadIO m
                => KafkaProducer
                -> ProducerRecord
                -> (DeliveryReport -> IO ())
                -> m (Either ImmediateError ())

Using this function, you can provide a callback that is invoked with the produced message's delivery report. With a little help from MVars or similar, you can create a synchronous-like interface.

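{-# LANGUAGE LambdaCase #-}
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Kafka.Producer
-- Depending on the library version, Offset may also need to be
-- imported from Kafka.Consumer.

sendMessageSync :: MonadIO m
                => KafkaProducer
                -> ProducerRecord
                -> m (Either KafkaError Offset)
sendMessageSync producer record = liftIO $ do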
  -- Create an empty MVar:
  var <- newEmptyMVar

  -- Produce the message and use the callback to put the delivery report in the
  -- MVar:
  res <- produceMessage' producer record (putMVar var)

  case res of
    Left (ImmediateError err) ->
      pure (Left err)
    Right () -> do
      -- Flush producer queue to make sure you don't get stuck waiting for the
      -- message to send:
      flushProducer producer

      -- Wait for the message's delivery report and map accordingly:
      takeMVar var >>= return . \case
        DeliverySuccess _ offset -> Right offset
        DeliveryFailure _ err    -> Left err
        NoMessageError err       -> Left err

Note: this is a semi-naive solution, as it waits forever (or until librdkafka times out). Make sure that your configuration reflects the behavior you want from this functionality.
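If waiting forever is not acceptable, one option is to bound the wait on the application side. The sketch below uses only base (System.Timeout and Control.Concurrent.MVar). The helper name takeMVarWithin is hypothetical and not part of hw-kafka-client.

```haskell
import Control.Concurrent.MVar
import System.Timeout (timeout)

-- | Wait for a value to appear in the MVar for at most the given
-- number of microseconds. Returns Nothing if the deadline passes first.
takeMVarWithin :: Int -> MVar a -> IO (Maybe a)
takeMVarWithin micros var = timeout micros (takeMVar var)
```

In sendMessageSync, takeMVar var could be replaced with takeMVarWithin applied to a deadline of your choosing, mapping Nothing to whatever error value suits your application. A timeout here only means that no delivery report arrived in time: the message may still be delivered later.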

Installation

Installing librdkafka

Although librdkafka is available on many platforms, most of the distribution packages are too old to support kafka-client. As such, we suggest you install it from source:

    git clone https://github.com/edenhill/librdkafka
    cd librdkafka
    ./configure
    make && make install

Sometimes it is helpful to specify the OpenSSL library and include paths explicitly:

    LDFLAGS=-L/usr/local/opt/openssl/lib CPPFLAGS=-I/usr/local/opt/openssl/include ./configure

If you are using Stack with Nix, don't forget to declare rdkafka as an extra package:

# stack.yaml
nix:
  enable: true
  packages:
    - rdkafka

Installing Kafka

The full Kafka guide is at http://kafka.apache.org/documentation.html#quickstart

Alternatively, docker-compose can be used to run Kafka locally inside a Docker container. To run Kafka inside Docker:

$ docker-compose up