kafka-client-sync-0.1.0.1: Synchronous Kafka Client

Safe HaskellNone
LanguageHaskell2010

Kafka.Producer.Sync

Contents

Description

This module provides a synchronous interface on top of the hw-kafka-client library.

It works by using MVars managed in two different queues. Each request is sent as soon as there are no other effectively equal Kafka records in-flight. This is done in order to make sure that there is no ambiguity as to which MVar to resolve.

Currently, this implements fair sending. For all requests, the oldest pending request should be sent first.

Synopsis

Sync producer

data SyncKafkaProducer Source #

A producer for sending messages to Kafka and waiting for the DeliveryReport

newSyncProducer :: MonadIO m => ProducerProperties -> m (Either KafkaError SyncKafkaProducer) Source #

Create a new SyncKafkaProducer

Note: since this library wraps the regular hw-kafka-client, you should not set the delivery report callback yourself, as it is set internally.

Re-exports

Record datatypes

data ProducerRecord #

Instances
Eq ProducerRecord 
Instance details

Defined in Kafka.Producer.Types

Show ProducerRecord 
Instance details

Defined in Kafka.Producer.Types

Generic ProducerRecord 
Instance details

Defined in Kafka.Producer.Types

Associated Types

type Rep ProducerRecord :: Type -> Type #

type Rep ProducerRecord 
Instance details

Defined in Kafka.Producer.Types

type Rep ProducerRecord = D1 (MetaData "ProducerRecord" "Kafka.Producer.Types" "hw-kafka-client-3.0.0-f3f0ff4ee651ea9a164f922cc2bc7402c8529ca1a9367543bbc14e3ba60051d1" False) (C1 (MetaCons "ProducerRecord" PrefixI True) ((S1 (MetaSel (Just "prTopic") NoSourceUnpackedness SourceStrict DecidedStrict) (Rec0 TopicName) :*: S1 (MetaSel (Just "prPartition") NoSourceUnpackedness SourceStrict DecidedStrict) (Rec0 ProducePartition)) :*: (S1 (MetaSel (Just "prKey") NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 (Maybe ByteString)) :*: S1 (MetaSel (Just "prValue") NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 (Maybe ByteString)))))

newtype TopicName #

Constructors

TopicName 

Fields

Instances
Eq TopicName 
Instance details

Defined in Kafka.Types

Ord TopicName 
Instance details

Defined in Kafka.Types

Read TopicName 
Instance details

Defined in Kafka.Types

Show TopicName 
Instance details

Defined in Kafka.Types

Generic TopicName 
Instance details

Defined in Kafka.Types

Associated Types

type Rep TopicName :: Type -> Type #

type Rep TopicName 
Instance details

Defined in Kafka.Types

type Rep TopicName = D1 (MetaData "TopicName" "Kafka.Types" "hw-kafka-client-3.0.0-f3f0ff4ee651ea9a164f922cc2bc7402c8529ca1a9367543bbc14e3ba60051d1" True) (C1 (MetaCons "TopicName" PrefixI True) (S1 (MetaSel (Just "unTopicName") NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 Text)))

data ProducePartition #

Instances
Eq ProducePartition 
Instance details

Defined in Kafka.Producer.Types

Ord ProducePartition 
Instance details

Defined in Kafka.Producer.Types

Show ProducePartition 
Instance details

Defined in Kafka.Producer.Types

Generic ProducePartition 
Instance details

Defined in Kafka.Producer.Types

Associated Types

type Rep ProducePartition :: Type -> Type #

type Rep ProducePartition 
Instance details

Defined in Kafka.Producer.Types

type Rep ProducePartition = D1 (MetaData "ProducePartition" "Kafka.Producer.Types" "hw-kafka-client-3.0.0-f3f0ff4ee651ea9a164f922cc2bc7402c8529ca1a9367543bbc14e3ba60051d1" False) (C1 (MetaCons "SpecifiedPartition" PrefixI False) (S1 (MetaSel (Nothing :: Maybe Symbol) SourceUnpack SourceStrict DecidedUnpack) (Rec0 Int)) :+: C1 (MetaCons "UnassignedPartition" PrefixI False) (U1 :: Type -> Type))

Errors

data KafkaError #

Instances
Eq KafkaError 
Instance details

Defined in Kafka.Types

Show KafkaError 
Instance details

Defined in Kafka.Types

Generic KafkaError 
Instance details

Defined in Kafka.Types

Associated Types

type Rep KafkaError :: Type -> Type #

Exception KafkaError 
Instance details

Defined in Kafka.Types

type Rep KafkaError 
Instance details

Defined in Kafka.Types

type Rep KafkaError = D1 (MetaData "KafkaError" "Kafka.Types" "hw-kafka-client-3.0.0-f3f0ff4ee651ea9a164f922cc2bc7402c8529ca1a9367543bbc14e3ba60051d1" False) ((C1 (MetaCons "KafkaError" PrefixI False) (S1 (MetaSel (Nothing :: Maybe Symbol) NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 Text)) :+: (C1 (MetaCons "KafkaInvalidReturnValue" PrefixI False) (U1 :: Type -> Type) :+: C1 (MetaCons "KafkaBadSpecification" PrefixI False) (S1 (MetaSel (Nothing :: Maybe Symbol) NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 Text)))) :+: ((C1 (MetaCons "KafkaResponseError" PrefixI False) (S1 (MetaSel (Nothing :: Maybe Symbol) NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 RdKafkaRespErrT)) :+: C1 (MetaCons "KafkaInvalidConfigurationValue" PrefixI False) (S1 (MetaSel (Nothing :: Maybe Symbol) NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 Text))) :+: (C1 (MetaCons "KafkaUnknownConfigurationKey" PrefixI False) (S1 (MetaSel (Nothing :: Maybe Symbol) NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 Text)) :+: C1 (MetaCons "KafkaBadConfiguration" PrefixI False) (U1 :: Type -> Type))))

Producer configuration

Configuration helpers

Set brokers for producer

Set log-level for producer

Set compression level for producer

Set topic compression for producer

Set send timeout for producer

Set extra properties for producer

Suppress disconnect log lines

Configure extra topic properties

Add KafkaDebug options

Other datatypes

newtype BrokerAddress #

Constructors

BrokerAddress 
Instances
Eq BrokerAddress 
Instance details

Defined in Kafka.Types

Show BrokerAddress 
Instance details

Defined in Kafka.Types

Generic BrokerAddress 
Instance details

Defined in Kafka.Types

Associated Types

type Rep BrokerAddress :: Type -> Type #

type Rep BrokerAddress 
Instance details

Defined in Kafka.Types

type Rep BrokerAddress = D1 (MetaData "BrokerAddress" "Kafka.Types" "hw-kafka-client-3.0.0-f3f0ff4ee651ea9a164f922cc2bc7402c8529ca1a9367543bbc14e3ba60051d1" True) (C1 (MetaCons "BrokerAddress" PrefixI True) (S1 (MetaSel (Just "unBrokerAddress") NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 Text)))

data KafkaCompressionCodec #

Constructors

NoCompression 
Gzip 
Snappy 
Lz4 
Instances
Eq KafkaCompressionCodec 
Instance details

Defined in Kafka.Types

Show KafkaCompressionCodec 
Instance details

Defined in Kafka.Types

Generic KafkaCompressionCodec 
Instance details

Defined in Kafka.Types

Associated Types

type Rep KafkaCompressionCodec :: Type -> Type #

type Rep KafkaCompressionCodec 
Instance details

Defined in Kafka.Types

type Rep KafkaCompressionCodec = D1 (MetaData "KafkaCompressionCodec" "Kafka.Types" "hw-kafka-client-3.0.0-f3f0ff4ee651ea9a164f922cc2bc7402c8529ca1a9367543bbc14e3ba60051d1" False) ((C1 (MetaCons "NoCompression" PrefixI False) (U1 :: Type -> Type) :+: C1 (MetaCons "Gzip" PrefixI False) (U1 :: Type -> Type)) :+: (C1 (MetaCons "Snappy" PrefixI False) (U1 :: Type -> Type) :+: C1 (MetaCons "Lz4" PrefixI False) (U1 :: Type -> Type)))

data KafkaDebug #

Instances
Eq KafkaDebug 
Instance details

Defined in Kafka.Types

Show KafkaDebug 
Instance details

Defined in Kafka.Types

Generic KafkaDebug 
Instance details

Defined in Kafka.Types

Associated Types

type Rep KafkaDebug :: Type -> Type #

type Rep KafkaDebug 
Instance details

Defined in Kafka.Types

type Rep KafkaDebug = D1 (MetaData "KafkaDebug" "Kafka.Types" "hw-kafka-client-3.0.0-f3f0ff4ee651ea9a164f922cc2bc7402c8529ca1a9367543bbc14e3ba60051d1" False) (((C1 (MetaCons "DebugGeneric" PrefixI False) (U1 :: Type -> Type) :+: (C1 (MetaCons "DebugBroker" PrefixI False) (U1 :: Type -> Type) :+: C1 (MetaCons "DebugTopic" PrefixI False) (U1 :: Type -> Type))) :+: (C1 (MetaCons "DebugMetadata" PrefixI False) (U1 :: Type -> Type) :+: (C1 (MetaCons "DebugQueue" PrefixI False) (U1 :: Type -> Type) :+: C1 (MetaCons "DebugMsg" PrefixI False) (U1 :: Type -> Type)))) :+: ((C1 (MetaCons "DebugProtocol" PrefixI False) (U1 :: Type -> Type) :+: (C1 (MetaCons "DebugCgrp" PrefixI False) (U1 :: Type -> Type) :+: C1 (MetaCons "DebugSecurity" PrefixI False) (U1 :: Type -> Type))) :+: (C1 (MetaCons "DebugFetch" PrefixI False) (U1 :: Type -> Type) :+: (C1 (MetaCons "DebugFeature" PrefixI False) (U1 :: Type -> Type) :+: C1 (MetaCons "DebugAll" PrefixI False) (U1 :: Type -> Type)))))

newtype Timeout #

Constructors

Timeout 

Fields

Instances
Eq Timeout 
Instance details

Defined in Kafka.Types

Methods

(==) :: Timeout -> Timeout -> Bool #

(/=) :: Timeout -> Timeout -> Bool #

Read Timeout 
Instance details

Defined in Kafka.Types

Show Timeout 
Instance details

Defined in Kafka.Types

Generic Timeout 
Instance details

Defined in Kafka.Types

Associated Types

type Rep Timeout :: Type -> Type #

Methods

from :: Timeout -> Rep Timeout x #

to :: Rep Timeout x -> Timeout #

type Rep Timeout 
Instance details

Defined in Kafka.Types

type Rep Timeout = D1 (MetaData "Timeout" "Kafka.Types" "hw-kafka-client-3.0.0-f3f0ff4ee651ea9a164f922cc2bc7402c8529ca1a9367543bbc14e3ba60051d1" True) (C1 (MetaCons "Timeout" PrefixI True) (S1 (MetaSel (Just "unTimeout") NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 Int)))