Safe Haskell | None |
---|---|
Language | Haskell2010 |
Kafka.Worker is a module for processing a Kafka log. It can be used to build a CLI that consumes and processes a user-defined message type.
Synopsis
- process :: Settings -> Text -> TopicSubscription -> IO ()
- data Settings
- decoder :: Decoder Settings
- data TopicSubscription
- subscription :: (FromJSON msg, ToJSON msg) => Text -> (msg -> Task Text ()) -> TopicSubscription
- subscriptionManageOwnOffsets :: (FromJSON msg, ToJSON msg) => Text -> CommitToKafkaAsWell -> ([Int] -> Task Text (List PartitionOffset)) -> (PartitionOffset -> msg -> Task Text SeekCmd) -> TopicSubscription
- data PartitionOffset = PartitionOffset { partitionId :: Int, offset :: Int }
- data SeekCmd = NoSeek | SeekToOffset Int
- data CommitToKafkaAsWell
Documentation
process :: Settings -> Text -> TopicSubscription -> IO () Source #
Starts the Kafka worker, which consumes and handles messages.
Settings
decoder :: Decoder Settings Source #
Decodes Settings from environment variables. Also consumes Observability environment variables (see nri-observability).

- KAFKA_BROKER_ADDRESSES=localhost:9092 (comma-delimited list)
- KAFKA_LOG_LEVEL=Debug
- KAFKA_POLLING_TIMEOUT=1000
- KAFKA_MAX_MESSAGES_PER_SECOND_PER_PARTITION=0 (disabled)
- KAFKA_MAX_POLL_INTERVAL_MS=300000
- KAFKA_MAX_MSGS_PER_PARTITION_BUFFERED_LOCALLY=100
- KAFKA_POLL_BATCH_SIZE=100
- KAFKA_SKIP_ON_PROCESS_MESSAGE_FAILURE=0
- KAFKA_GROUP_ID=0
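As a rough illustration of how settings like these can be read from the environment, here is a minimal sketch using only `base`. It is not the library's `decoder`: the `WorkerEnv` record, `fromLookup`, `splitOnComma`, and `readWorkerEnv` are hypothetical names, and using the values listed above as fallbacks is an assumption.

```haskell
import Data.Maybe (fromMaybe)
import System.Environment (lookupEnv)
import Text.Read (readMaybe)

-- Hypothetical record for illustration; NOT the library's Settings type.
data WorkerEnv = WorkerEnv
  { brokerAddresses :: [String]
  , pollingTimeoutMs :: Int
  , pollBatchSize :: Int
  }
  deriving (Show, Eq)

-- Split a comma-delimited list such as "a:9092,b:9092".
splitOnComma :: String -> [String]
splitOnComma s =
  case break (== ',') s of
    (chunk, []) -> [chunk]
    (chunk, _ : rest) -> chunk : splitOnComma rest

-- Build a WorkerEnv from any lookup function. Missing or unparseable
-- values fall back to the values listed above (assumed to be defaults).
fromLookup :: (String -> Maybe String) -> WorkerEnv
fromLookup get =
  WorkerEnv
    { brokerAddresses =
        splitOnComma (fromMaybe "localhost:9092" (get "KAFKA_BROKER_ADDRESSES"))
    , pollingTimeoutMs = intOr 1000 "KAFKA_POLLING_TIMEOUT"
    , pollBatchSize = intOr 100 "KAFKA_POLL_BATCH_SIZE"
    }
  where
    intOr def name = fromMaybe def (get name >>= readMaybe)

-- Read the three variables from the real process environment.
readWorkerEnv :: IO WorkerEnv
readWorkerEnv = do
  brokers <- lookupEnv "KAFKA_BROKER_ADDRESSES"
  timeout <- lookupEnv "KAFKA_POLLING_TIMEOUT"
  batch <- lookupEnv "KAFKA_POLL_BATCH_SIZE"
  let table =
        [ (name, value)
        | (name, Just value) <-
            [ ("KAFKA_BROKER_ADDRESSES", brokers)
            , ("KAFKA_POLLING_TIMEOUT", timeout)
            , ("KAFKA_POLL_BATCH_SIZE", batch)
            ]
        ]
  pure (fromLookup (`lookup` table))
```

Keeping the parsing in a pure `fromLookup` makes the fallback behaviour easy to check without touching the real environment.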
Subscriptions
data TopicSubscription Source #
The topics this worker should subscribe to. At the moment this library only supports subscribing to a single topic.
subscription :: (FromJSON msg, ToJSON msg) => Text -> (msg -> Task Text ()) -> TopicSubscription Source #
Create a subscription for a topic.
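    main :: IO ()
    main = do
      settings <- Environment.decode decoder
      let topicSubscription =
            subscription "the-topic" (\msg -> Debug.todo "Process your message here!")
      -- The Text argument required by process's signature is assumed here
      -- to be the consumer group id.
      process settings "the-group-id" topicSubscription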
subscriptionManageOwnOffsets :: (FromJSON msg, ToJSON msg) => Text -> CommitToKafkaAsWell -> ([Int] -> Task Text (List PartitionOffset)) -> (PartitionOffset -> msg -> Task Text SeekCmd) -> TopicSubscription Source #
Create a subscription for a topic and manage offsets for that topic yourself.
You'll need to supply a function that tells Kafka where it can read the starting offsets for a list of partitions. When your callback is passed a message, it can also tell Kafka to seek to a different offset by returning a SeekCmd.
    main :: IO ()
    main = do
      settings <- Environment.decode decoder
      let topicSubscription =
            subscriptionManageOwnOffsets
              "the-topic"
              CommitToKafkaAsWell
              (\partitions -> sql "SELECT partition, offset FROM offsets WHERE partition = %" [partitions])
              (\partitionOffset msg -> Debug.todo "Process your message here!")
      -- The Text argument required by process's signature is assumed here
      -- to be the consumer group id.
      process settings "the-group-id" topicSubscription
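The offset-lookup function passed to subscriptionManageOwnOffsets can be pictured with a pure, in-memory stand-in for the external store. This is only a sketch: `OffsetStore`, `startingOffsets`, and `saveOffset` are hypothetical names, `PartitionOffset` is redefined locally so the snippet is self-contained, and a real implementation would run in `Task Text` against a database. Omitting partitions with no stored offset is a choice made for this sketch, not documented library behaviour.

```haskell
-- Local stand-in mirroring the fields of the library's PartitionOffset.
data PartitionOffset = PartitionOffset
  { partitionId :: Int
  , offset :: Int
  }
  deriving (Show, Eq)

-- Hypothetical in-memory offset store: (partition id, stored offset).
type OffsetStore = [(Int, Int)]

-- Look up stored offsets for the requested partitions.
-- Partitions with no stored offset are left out of the result.
startingOffsets :: OffsetStore -> [Int] -> [PartitionOffset]
startingOffsets store partitions =
  [ PartitionOffset p o
  | p <- partitions
  , Just o <- [lookup p store]
  ]

-- Record a new offset for a partition, replacing any previous entry.
saveOffset :: PartitionOffset -> OffsetStore -> OffsetStore
saveOffset (PartitionOffset p o) store =
  (p, o) : filter ((/= p) . fst) store
```

For example, `startingOffsets [(0, 42), (2, 7)] [0, 1, 2]` returns entries for partitions 0 and 2 only.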
data PartitionOffset Source #
Parameters needed to write offsets to, and read offsets from, another data store.
PartitionOffset | |
- partitionId :: Int
- offset :: Int
data SeekCmd Source #
SeekCmd is the expected response of the MessageCallback. Return NoSeek if processing succeeded and the offset was correct. Return SeekToOffset with the expected offset if the offset of the processed message was wrong (this only makes sense when Kafka isn't managing the offset). The MessageCallback will throw if processing throws.
NoSeek | |
SeekToOffset Int | |
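The rule described for SeekCmd can be sketched as a small pure function. This assumes the callback already knows which offset it expects for the partition, for instance from its own offset store. `decideSeek` is a hypothetical helper, and the two types are redefined locally so the snippet compiles on its own.

```haskell
-- Local stand-ins mirroring the library's types, for illustration only.
data SeekCmd
  = NoSeek
  | SeekToOffset Int
  deriving (Show, Eq)

data PartitionOffset = PartitionOffset
  { partitionId :: Int
  , offset :: Int
  }
  deriving (Show, Eq)

-- Compare the offset we expected with the offset of the received message.
-- If they match, no seek is needed; otherwise ask to seek to the expected offset.
decideSeek :: Int -> PartitionOffset -> SeekCmd
decideSeek expected received
  | offset received == expected = NoSeek
  | otherwise = SeekToOffset expected
```

In a real callback this decision would come after message processing and be returned inside `Task Text`.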
data CommitToKafkaAsWell Source #
Commit the offset to Kafka in addition to an externally managed storage.