nri-kafka-0.1.0.4: Functions for working with Kafka
Safe Haskell: None
Language: Haskell2010

Kafka.Worker

Description

Kafka.Worker is a module for processing a Kafka log. It can be used to build a CLI that consumes and processes messages of a user-defined type.

Documentation

process :: Settings -> Text -> TopicSubscription -> IO ()

Starts the Kafka worker, which then handles incoming messages.

Settings

data Settings

Settings required to process Kafka messages.

decoder :: Decoder Settings

Decodes Settings from environment variables. Also consumes the Observability environment variables (see nri-observability).

KAFKA_BROKER_ADDRESSES=localhost:9092 # comma-delimited list
KAFKA_LOG_LEVEL=Debug
KAFKA_POLLING_TIMEOUT=1000
KAFKA_MAX_MESSAGES_PER_SECOND_PER_PARTITION=0 # 0 = disabled
KAFKA_MAX_POLL_INTERVAL_MS=300000
KAFKA_MAX_MSGS_PER_PARTITION_BUFFERED_LOCALLY=100
KAFKA_POLL_BATCH_SIZE=100
KAFKA_SKIP_ON_PROCESS_MESSAGE_FAILURE=0
KAFKA_GROUP_ID=0
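KAFKA_BROKER_ADDRESSES holds a comma-delimited list, so one value can name several brokers. The sketch below shows one way such a value could be split into individual addresses. It uses only base, and splitBrokers, splitOn and trim are hypothetical helpers written for illustration; this is not how the library's decoder is implemented.

```haskell
import Data.Char (isSpace)
import Data.List (dropWhileEnd)

-- Hypothetical helper (not part of nri-kafka): split a comma-delimited
-- value such as "host1:9092,host2:9092" into separate broker addresses,
-- trimming whitespace and dropping empty entries.
splitBrokers :: String -> [String]
splitBrokers raw = filter (not . null) (map trim (splitOn ',' raw))

-- Split a string on every occurrence of the separator character.
splitOn :: Char -> String -> [String]
splitOn sep s =
  case break (== sep) s of
    (chunk, []) -> [chunk]
    (chunk, _ : rest) -> chunk : splitOn sep rest

-- Remove leading and trailing whitespace.
trim :: String -> String
trim = dropWhileEnd isSpace . dropWhile isSpace
```

With these definitions, splitBrokers "localhost:9092" yields a single-element list, and an empty value yields an empty list.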

Subscriptions

data TopicSubscription

The topics this worker should subscribe to. At the moment this library only supports subscribing to a single topic.

subscription :: (FromJSON msg, ToJSON msg) => Text -> (msg -> Task Text ()) -> TopicSubscription

Create a subscription for a topic.

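main :: IO ()
main = do
  settings <- Environment.decode decoder
  -- Use a distinct name so the binding does not shadow `subscription` itself.
  let topicSubscription =
        subscription
          "the-topic"
          (\msg -> Debug.todo "Process your message here!")
  -- `process` also takes a Text argument; "the-group-id" is a placeholder,
  -- on the assumption that this argument is the consumer group id.
  process settings "the-group-id" topicSubscription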

subscriptionManageOwnOffsets :: (FromJSON msg, ToJSON msg) => Text -> CommitToKafkaAsWell -> ([Int] -> Task Text (List PartitionOffset)) -> (PartitionOffset -> msg -> Task Text SeekCmd) -> TopicSubscription

Create a subscription for a topic and manage offsets for that topic yourself.

You'll need to tell Kafka where it can read starting offsets. When passed a message you can also tell Kafka to seek to a different offset.

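main :: IO ()
main = do
  settings <- Environment.decode decoder
  let topicSubscription =
        subscriptionManageOwnOffsets
          "the-topic"
          CommitToKafkaAsWell
          -- Look up the stored starting offsets for the given partitions.
          (\partitions ->
             sql
               "SELECT partition, offset FROM offsets WHERE partition = %"
               [partitions] )
          -- The callback receives the message's PartitionOffset and the message.
          (\partitionOffset msg -> Debug.todo "Process your message here!")
  -- "the-group-id" is a placeholder, on the assumption that the Text
  -- argument of `process` is the consumer group id.
  process settings "the-group-id" topicSubscription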
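To make the bookkeeping behind self-managed offsets more concrete, here is a minimal in-memory sketch of an offsets table like the one queried above. Everything in it is an assumption made for illustration: StoredOffset, startingOffsets and recordProcessed are hypothetical names, the store is a plain list rather than a database, and storing "the next offset to read" (rather than the last one processed) is just one possible convention.

```haskell
-- Hypothetical stand-in for one row of an external offsets table.
-- This is not the library's PartitionOffset type.
data StoredOffset = StoredOffset
  { partitionId :: Int
  , nextOffset :: Int -- assumed convention: the next offset to read
  }
  deriving (Eq, Show)

-- Look up starting offsets for the partitions assigned to this worker.
-- Partitions without a stored row are omitted from the result.
startingOffsets :: [StoredOffset] -> [Int] -> [StoredOffset]
startingOffsets store partitions =
  filter (\row -> partitionId row `elem` partitions) store

-- Record that the message at `offset` on `partition` was processed by
-- replacing that partition's row with the following offset.
recordProcessed :: Int -> Int -> [StoredOffset] -> [StoredOffset]
recordProcessed partition offset store =
  StoredOffset partition (offset + 1)
    : filter (\row -> partitionId row /= partition) store
```

In a real worker the lookup would run in the first callback passed to subscriptionManageOwnOffsets, and the update would happen while processing each message, ideally in the same transaction as the message's own side effects.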

data PartitionOffset

Parameters needed to write offsets to, and read them from, another data store.

Constructors

PartitionOffset 

Fields

data SeekCmd

SeekCmd is the expected response of the MessageCallback. Return NoSeek if processing succeeded and the offset was correct. Return SeekToOffset with the expected offset if the offset of the processed message was wrong (this only makes sense when Kafka isn't managing the offset). The MessageCallback will throw if processing throws.

Constructors

NoSeek 
SeekToOffset Int 
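The choice between the two constructors can be sketched as a small pure function. The block below declares a local SeekCmd that only mirrors the constructors listed above so that it compiles on its own; in real code the type comes from Kafka.Worker. decideSeek is a hypothetical helper, and the assumption is that you already know which offset you expected next from your own store.

```haskell
-- Local stand-in mirroring the documented constructors, declared here
-- only so the sketch is self-contained.
data SeekCmd
  = NoSeek
  | SeekToOffset Int
  deriving (Eq, Show)

-- Hypothetical helper: compare the offset expected next (from your own
-- store) with the offset of the message that was delivered.
decideSeek :: Int -> Int -> SeekCmd
decideSeek expectedOffset messageOffset
  | messageOffset == expectedOffset = NoSeek
  | otherwise = SeekToOffset expectedOffset
```

A message callback could run its processing first and then return the result of decideSeek, so that a mismatch asks the worker to seek back to the offset the external store expects.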

data CommitToKafkaAsWell

Commit the offset to Kafka in addition to an externally managed storage.