hw-kafka-client-4.0.4: Kafka bindings for Haskell
Safe HaskellNone
LanguageHaskell2010

Kafka.Producer

Description

Module to produce messages to Kafka topics.

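Here's an example of code to produce messages to a topic (the string literals used for broker addresses, topic names and message payloads rely on the OverloadedStrings extension):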

import Control.Exception (bracket)
import Control.Monad (forM_)
import Data.ByteString (ByteString)
import Kafka.Producer

-- Properties shared by the whole example: which brokers to contact
-- and which logging level to use.
producerProps :: ProducerProperties
producerProps = brokers <> logging
  where
    brokers = brokersList ["localhost:9092"]
    logging = logLevel KafkaLogDebug

-- Topic to send messages to.
-- Every record built by mkMessage is addressed to this topic.
targetTopic :: TopicName
targetTopic = TopicName "kafka-client-example-topic"

-- Run the example: create a producer, send the messages, print the outcome.
--
-- 'bracket' guarantees that a successfully created producer is closed
-- even if sending throws an exception.
runProducerExample :: IO ()
runProducerExample = do
  outcome <- bracket acquire release use
  print outcome
  where
    -- Producer creation may fail, hence the Either in the resource type.
    acquire = newProducer producerProps
    -- Nothing to close if the producer was never created.
    release = either (const (pure ())) closeProducer
    -- Pass a creation error through unchanged; otherwise send the messages.
    use     = either (pure . Left) sendMessages

-- Example sending 2 messages and printing the response from Kafka.
--
-- Both messages are always attempted. Any error returned by produceMessage
-- is printed, and the first such error is also returned as 'Left' so that the
-- caller can tell a failed run from a successful one. 'Right ()' means that
-- neither call reported an error.
--
-- Note that produceMessage can return before the message is actually sent,
-- so 'Right ()' only means no error had been reported by the time it returned.
sendMessages :: KafkaProducer -> IO (Either KafkaError ())
sendMessages prod = do
  err1 <- produceMessage prod (mkMessage Nothing (Just "test from producer"))
  forM_ err1 print

  err2 <- produceMessage prod (mkMessage (Just "key") (Just "test from producer (with key)"))
  forM_ err2 print

  -- Surface the first failure instead of unconditionally claiming success.
  pure $ case (err1, err2) of
    (Just e, _) -> Left e
    (_, Just e) -> Left e
    _           -> Right ()

-- Build a record for 'targetTopic' from an optional key and an optional value.
-- The partition is left unassigned rather than pinned to a specific number.
mkMessage :: Maybe ByteString -> Maybe ByteString -> ProducerRecord
mkMessage key payload =
  ProducerRecord
    { prTopic     = targetTopic
    , prPartition = UnassignedPartition
    , prKey       = key
    , prValue     = payload
    }
Synopsis

Documentation

data KafkaProducer Source #

The main type for Kafka message production, used e.g. to send messages.

Its constructor is intentionally not exposed; instead, one should use newProducer to acquire such a value.

data Headers Source #

Headers that might be passed along with a record

Instances

Instances details
Eq Headers Source # 
Instance details

Defined in Kafka.Types

Methods

(==) :: Headers -> Headers -> Bool #

(/=) :: Headers -> Headers -> Bool #

Read Headers Source # 
Instance details

Defined in Kafka.Types

Show Headers Source # 
Instance details

Defined in Kafka.Types

Generic Headers Source # 
Instance details

Defined in Kafka.Types

Associated Types

type Rep Headers :: Type -> Type #

Methods

from :: Headers -> Rep Headers x #

to :: Rep Headers x -> Headers #

Semigroup Headers Source # 
Instance details

Defined in Kafka.Types

Monoid Headers Source # 
Instance details

Defined in Kafka.Types

type Rep Headers Source # 
Instance details

Defined in Kafka.Types

type Rep Headers = D1 ('MetaData "Headers" "Kafka.Types" "hw-kafka-client-4.0.4-inplace" 'True) (C1 ('MetaCons "Headers" 'PrefixI 'True) (S1 ('MetaSel ('Just "unHeaders") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 [(ByteString, ByteString)])))

data KafkaCompressionCodec Source #

Compression codec used by a topic

See Kafka documentation on compression codecs

Constructors

NoCompression 
Gzip 
Snappy 
Lz4 

Instances

Instances details
Eq KafkaCompressionCodec Source # 
Instance details

Defined in Kafka.Types

Show KafkaCompressionCodec Source # 
Instance details

Defined in Kafka.Types

Generic KafkaCompressionCodec Source # 
Instance details

Defined in Kafka.Types

Associated Types

type Rep KafkaCompressionCodec :: Type -> Type #

type Rep KafkaCompressionCodec Source # 
Instance details

Defined in Kafka.Types

type Rep KafkaCompressionCodec = D1 ('MetaData "KafkaCompressionCodec" "Kafka.Types" "hw-kafka-client-4.0.4-inplace" 'False) ((C1 ('MetaCons "NoCompression" 'PrefixI 'False) (U1 :: Type -> Type) :+: C1 ('MetaCons "Gzip" 'PrefixI 'False) (U1 :: Type -> Type)) :+: (C1 ('MetaCons "Snappy" 'PrefixI 'False) (U1 :: Type -> Type) :+: C1 ('MetaCons "Lz4" 'PrefixI 'False) (U1 :: Type -> Type)))

data KafkaDebug Source #

Available librdkafka debug contexts

Instances

Instances details
Eq KafkaDebug Source # 
Instance details

Defined in Kafka.Types

Show KafkaDebug Source # 
Instance details

Defined in Kafka.Types

Generic KafkaDebug Source # 
Instance details

Defined in Kafka.Types

Associated Types

type Rep KafkaDebug :: Type -> Type #

type Rep KafkaDebug Source # 
Instance details

Defined in Kafka.Types

type Rep KafkaDebug = D1 ('MetaData "KafkaDebug" "Kafka.Types" "hw-kafka-client-4.0.4-inplace" 'False) (((C1 ('MetaCons "DebugGeneric" 'PrefixI 'False) (U1 :: Type -> Type) :+: (C1 ('MetaCons "DebugBroker" 'PrefixI 'False) (U1 :: Type -> Type) :+: C1 ('MetaCons "DebugTopic" 'PrefixI 'False) (U1 :: Type -> Type))) :+: (C1 ('MetaCons "DebugMetadata" 'PrefixI 'False) (U1 :: Type -> Type) :+: (C1 ('MetaCons "DebugQueue" 'PrefixI 'False) (U1 :: Type -> Type) :+: C1 ('MetaCons "DebugMsg" 'PrefixI 'False) (U1 :: Type -> Type)))) :+: ((C1 ('MetaCons "DebugProtocol" 'PrefixI 'False) (U1 :: Type -> Type) :+: (C1 ('MetaCons "DebugCgrp" 'PrefixI 'False) (U1 :: Type -> Type) :+: C1 ('MetaCons "DebugSecurity" 'PrefixI 'False) (U1 :: Type -> Type))) :+: (C1 ('MetaCons "DebugFetch" 'PrefixI 'False) (U1 :: Type -> Type) :+: (C1 ('MetaCons "DebugFeature" 'PrefixI 'False) (U1 :: Type -> Type) :+: C1 ('MetaCons "DebugAll" 'PrefixI 'False) (U1 :: Type -> Type)))))

data KafkaError Source #

All possible Kafka errors

Instances

Instances details
Eq KafkaError Source # 
Instance details

Defined in Kafka.Types

Show KafkaError Source # 
Instance details

Defined in Kafka.Types

Generic KafkaError Source # 
Instance details

Defined in Kafka.Types

Associated Types

type Rep KafkaError :: Type -> Type #

Exception KafkaError Source # 
Instance details

Defined in Kafka.Types

type Rep KafkaError Source # 
Instance details

Defined in Kafka.Types

type Rep KafkaError = D1 ('MetaData "KafkaError" "Kafka.Types" "hw-kafka-client-4.0.4-inplace" 'False) ((C1 ('MetaCons "KafkaError" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Text)) :+: (C1 ('MetaCons "KafkaInvalidReturnValue" 'PrefixI 'False) (U1 :: Type -> Type) :+: C1 ('MetaCons "KafkaBadSpecification" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Text)))) :+: ((C1 ('MetaCons "KafkaResponseError" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 RdKafkaRespErrT)) :+: C1 ('MetaCons "KafkaInvalidConfigurationValue" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Text))) :+: (C1 ('MetaCons "KafkaUnknownConfigurationKey" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Text)) :+: C1 ('MetaCons "KafkaBadConfiguration" 'PrefixI 'False) (U1 :: Type -> Type))))

newtype Timeout Source #

Timeout in milliseconds

Constructors

Timeout 

Fields

Instances

Instances details
Eq Timeout Source # 
Instance details

Defined in Kafka.Types

Methods

(==) :: Timeout -> Timeout -> Bool #

(/=) :: Timeout -> Timeout -> Bool #

Read Timeout Source # 
Instance details

Defined in Kafka.Types

Show Timeout Source # 
Instance details

Defined in Kafka.Types

Generic Timeout Source # 
Instance details

Defined in Kafka.Types

Associated Types

type Rep Timeout :: Type -> Type #

Methods

from :: Timeout -> Rep Timeout x #

to :: Rep Timeout x -> Timeout #

type Rep Timeout Source # 
Instance details

Defined in Kafka.Types

type Rep Timeout = D1 ('MetaData "Timeout" "Kafka.Types" "hw-kafka-client-4.0.4-inplace" 'True) (C1 ('MetaCons "Timeout" 'PrefixI 'True) (S1 ('MetaSel ('Just "unTimeout") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int)))

newtype BrokerAddress Source #

Kafka broker address string (e.g. broker1:9092)

Constructors

BrokerAddress 

Instances

Instances details
Eq BrokerAddress Source # 
Instance details

Defined in Kafka.Types

Show BrokerAddress Source # 
Instance details

Defined in Kafka.Types

IsString BrokerAddress Source # 
Instance details

Defined in Kafka.Types

Generic BrokerAddress Source # 
Instance details

Defined in Kafka.Types

Associated Types

type Rep BrokerAddress :: Type -> Type #

type Rep BrokerAddress Source # 
Instance details

Defined in Kafka.Types

type Rep BrokerAddress = D1 ('MetaData "BrokerAddress" "Kafka.Types" "hw-kafka-client-4.0.4-inplace" 'True) (C1 ('MetaCons "BrokerAddress" 'PrefixI 'True) (S1 ('MetaSel ('Just "unBrokerAddress") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Text)))

newtype TopicName Source #

Topic name to consume/produce messages

Wildcard (regex) topics are supported by the librdkafka assignor: any topic name in the topics list that is prefixed with ^ will be regex-matched to the full list of topics in the cluster and matching topics will be added to the subscription list.

Constructors

TopicName 

Fields

Instances

Instances details
Eq TopicName Source # 
Instance details

Defined in Kafka.Types

Ord TopicName Source # 
Instance details

Defined in Kafka.Types

Read TopicName Source # 
Instance details

Defined in Kafka.Types

Show TopicName Source # 
Instance details

Defined in Kafka.Types

IsString TopicName Source # 
Instance details

Defined in Kafka.Types

Generic TopicName Source # 
Instance details

Defined in Kafka.Types

Associated Types

type Rep TopicName :: Type -> Type #

type Rep TopicName Source # 
Instance details

Defined in Kafka.Types

type Rep TopicName = D1 ('MetaData "TopicName" "Kafka.Types" "hw-kafka-client-4.0.4-inplace" 'True) (C1 ('MetaCons "TopicName" 'PrefixI 'True) (S1 ('MetaSel ('Just "unTopicName") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Text)))

data TopicType Source #

Whether the topic is created by a user or by the system

Constructors

User

Normal topics that are created by user.

System

Topics starting with a double underscore "__" (__consumer_offsets, __confluent.support.metrics, etc.) are considered "system" topics

Instances

Instances details
Eq TopicType Source # 
Instance details

Defined in Kafka.Types

Ord TopicType Source # 
Instance details

Defined in Kafka.Types

Read TopicType Source # 
Instance details

Defined in Kafka.Types

Show TopicType Source # 
Instance details

Defined in Kafka.Types

Generic TopicType Source # 
Instance details

Defined in Kafka.Types

Associated Types

type Rep TopicType :: Type -> Type #

type Rep TopicType Source # 
Instance details

Defined in Kafka.Types

type Rep TopicType = D1 ('MetaData "TopicType" "Kafka.Types" "hw-kafka-client-4.0.4-inplace" 'False) (C1 ('MetaCons "User" 'PrefixI 'False) (U1 :: Type -> Type) :+: C1 ('MetaCons "System" 'PrefixI 'False) (U1 :: Type -> Type))

newtype BatchSize Source #

Batch size used for polling

Constructors

BatchSize 

Fields

Instances

Instances details
Eq BatchSize Source # 
Instance details

Defined in Kafka.Types

Num BatchSize Source # 
Instance details

Defined in Kafka.Types

Ord BatchSize Source # 
Instance details

Defined in Kafka.Types

Read BatchSize Source # 
Instance details

Defined in Kafka.Types

Show BatchSize Source # 
Instance details

Defined in Kafka.Types

Generic BatchSize Source # 
Instance details

Defined in Kafka.Types

Associated Types

type Rep BatchSize :: Type -> Type #

type Rep BatchSize Source # 
Instance details

Defined in Kafka.Types

type Rep BatchSize = D1 ('MetaData "BatchSize" "Kafka.Types" "hw-kafka-client-4.0.4-inplace" 'True) (C1 ('MetaCons "BatchSize" 'PrefixI 'True) (S1 ('MetaSel ('Just "unBatchSize") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int)))

newtype ClientId Source #

Client ID used by Kafka to better track requests

See Kafka documentation on client ID

Constructors

ClientId 

Fields

Instances

Instances details
Eq ClientId Source # 
Instance details

Defined in Kafka.Types

Ord ClientId Source # 
Instance details

Defined in Kafka.Types

Show ClientId Source # 
Instance details

Defined in Kafka.Types

IsString ClientId Source # 
Instance details

Defined in Kafka.Types

Generic ClientId Source # 
Instance details

Defined in Kafka.Types

Associated Types

type Rep ClientId :: Type -> Type #

Methods

from :: ClientId -> Rep ClientId x #

to :: Rep ClientId x -> ClientId #

type Rep ClientId Source # 
Instance details

Defined in Kafka.Types

type Rep ClientId = D1 ('MetaData "ClientId" "Kafka.Types" "hw-kafka-client-4.0.4-inplace" 'True) (C1 ('MetaCons "ClientId" 'PrefixI 'True) (S1 ('MetaSel ('Just "unClientId") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Text)))

newtype Millis Source #

A number of milliseconds, used to represent durations and timestamps

Constructors

Millis 

Fields

Instances

Instances details
Eq Millis Source # 
Instance details

Defined in Kafka.Types

Methods

(==) :: Millis -> Millis -> Bool #

(/=) :: Millis -> Millis -> Bool #

Num Millis Source # 
Instance details

Defined in Kafka.Types

Ord Millis Source # 
Instance details

Defined in Kafka.Types

Read Millis Source # 
Instance details

Defined in Kafka.Types

Show Millis Source # 
Instance details

Defined in Kafka.Types

Generic Millis Source # 
Instance details

Defined in Kafka.Types

Associated Types

type Rep Millis :: Type -> Type #

Methods

from :: Millis -> Rep Millis x #

to :: Rep Millis x -> Millis #

type Rep Millis Source # 
Instance details

Defined in Kafka.Types

type Rep Millis = D1 ('MetaData "Millis" "Kafka.Types" "hw-kafka-client-4.0.4-inplace" 'True) (C1 ('MetaCons "Millis" 'PrefixI 'True) (S1 ('MetaSel ('Just "unMillis") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int64)))

newtype PartitionId Source #

Topic partition ID

Constructors

PartitionId 

Fields

Instances

Instances details
Enum PartitionId Source # 
Instance details

Defined in Kafka.Types

Eq PartitionId Source # 
Instance details

Defined in Kafka.Types

Ord PartitionId Source # 
Instance details

Defined in Kafka.Types

Read PartitionId Source # 
Instance details

Defined in Kafka.Types

Show PartitionId Source # 
Instance details

Defined in Kafka.Types

Generic PartitionId Source # 
Instance details

Defined in Kafka.Types

Associated Types

type Rep PartitionId :: Type -> Type #

type Rep PartitionId Source # 
Instance details

Defined in Kafka.Types

type Rep PartitionId = D1 ('MetaData "PartitionId" "Kafka.Types" "hw-kafka-client-4.0.4-inplace" 'True) (C1 ('MetaCons "PartitionId" 'PrefixI 'True) (S1 ('MetaSel ('Just "unPartitionId") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int)))

newtype BrokerId Source #

Kafka broker ID

Constructors

BrokerId 

Fields

Instances

Instances details
Eq BrokerId Source # 
Instance details

Defined in Kafka.Types

Ord BrokerId Source # 
Instance details

Defined in Kafka.Types

Read BrokerId Source # 
Instance details

Defined in Kafka.Types

Show BrokerId Source # 
Instance details

Defined in Kafka.Types

Generic BrokerId Source # 
Instance details

Defined in Kafka.Types

Associated Types

type Rep BrokerId :: Type -> Type #

Methods

from :: BrokerId -> Rep BrokerId x #

to :: Rep BrokerId x -> BrokerId #

type Rep BrokerId Source # 
Instance details

Defined in Kafka.Types

type Rep BrokerId = D1 ('MetaData "BrokerId" "Kafka.Types" "hw-kafka-client-4.0.4-inplace" 'True) (C1 ('MetaCons "BrokerId" 'PrefixI 'True) (S1 ('MetaSel ('Just "unBrokerId") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int)))

topicType :: TopicName -> TopicType Source #

Deduce the type of a topic from its name, by checking if it starts with a double underscore "__"

kafkaDebugToText :: KafkaDebug -> Text Source #

Convert a KafkaDebug into its librdkafka string equivalent.

This is used internally by the library but may be useful to some developers.

kafkaCompressionCodecToText :: KafkaCompressionCodec -> Text Source #

Convert a KafkaCompressionCodec into its librdkafka string equivalent.

This is used internally by the library but may be useful to some developers.

data Callback Source #

Callbacks allow retrieving various information like error occurrences, statistics and log messages. See setCallback (Consumer) and setCallback (Producer) for more details.

data DeliveryReport Source #

The result of sending a message to the broker, useful for callbacks

Constructors

DeliverySuccess ProducerRecord Offset

The message was successfully sent at this offset

DeliveryFailure ProducerRecord KafkaError

The message could not be sent

NoMessageError KafkaError

An error occurred, but librdkafka did not attach any sent message

Instances

Instances details
Eq DeliveryReport Source # 
Instance details

Defined in Kafka.Producer.Types

Show DeliveryReport Source # 
Instance details

Defined in Kafka.Producer.Types

Generic DeliveryReport Source # 
Instance details

Defined in Kafka.Producer.Types

Associated Types

type Rep DeliveryReport :: Type -> Type #

type Rep DeliveryReport Source # 
Instance details

Defined in Kafka.Producer.Types

newtype ImmediateError Source #

Data type representing an error that is caused by pre-flight conditions not being met

Instances

Instances details
Eq ImmediateError Source # 
Instance details

Defined in Kafka.Producer.Types

Show ImmediateError Source # 
Instance details

Defined in Kafka.Producer.Types

data ProducePartition Source #

 

Constructors

SpecifiedPartition !Int

The partition number of the topic

UnassignedPartition

Let the Kafka broker decide the partition

Instances

Instances details
Eq ProducePartition Source # 
Instance details

Defined in Kafka.Producer.Types

Ord ProducePartition Source # 
Instance details

Defined in Kafka.Producer.Types

Show ProducePartition Source # 
Instance details

Defined in Kafka.Producer.Types

Generic ProducePartition Source # 
Instance details

Defined in Kafka.Producer.Types

Associated Types

type Rep ProducePartition :: Type -> Type #

type Rep ProducePartition Source # 
Instance details

Defined in Kafka.Producer.Types

type Rep ProducePartition = D1 ('MetaData "ProducePartition" "Kafka.Producer.Types" "hw-kafka-client-4.0.4-inplace" 'False) (C1 ('MetaCons "SpecifiedPartition" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'SourceUnpack 'SourceStrict 'DecidedStrict) (Rec0 Int)) :+: C1 ('MetaCons "UnassignedPartition" 'PrefixI 'False) (U1 :: Type -> Type))

data ProducerRecord Source #

Represents messages to be enqueued onto a Kafka broker (i.e. used for a producer)

errorCallback :: (KafkaError -> String -> IO ()) -> Callback Source #

Add a callback for errors.

Examples

Expand

Basic usage:

-- Register the callback as a producer property:
setCallback (errorCallback myErrorCallback)

-- Print each reported error together with its message, separated by "|".
myErrorCallback :: KafkaError -> String -> IO ()
myErrorCallback kafkaError message = print $ show kafkaError <> "|" <> message

logCallback :: (KafkaLogLevel -> String -> String -> IO ()) -> Callback Source #

Add a callback for logs.

Examples

Expand

Basic usage:

-- Register the callback as a producer property:
setCallback (logCallback myLogCallback)

-- Print each log entry as "level|facility|message".
myLogCallback :: KafkaLogLevel -> String -> String -> IO ()
myLogCallback level facility message = print $ show level <> "|" <> facility <> "|" <> message

statsCallback :: (ByteString -> IO ()) -> Callback Source #

Add a callback for stats. The passed ByteString contains a UTF-8 encoded JSON document and can e.g. be parsed using Data.Aeson.decodeStrict. For more information about the content of the JSON document see https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md.

Examples

Expand

Basic usage:

-- Register the callback as a producer property:
setCallback (statsCallback myStatsCallback)

-- Print the raw statistics document. The argument is a ByteString
-- (a UTF-8 encoded JSON document), matching the type statsCallback expects.
myStatsCallback :: ByteString -> IO ()
myStatsCallback stats = print $ show stats

deliveryCallback :: (DeliveryReport -> IO ()) -> Callback Source #

Sets the callback for delivery reports.

Note: A callback should not be a long-running process as it blocks librdkafka from continuing on the thread that handles the delivery callbacks. For callbacks to individual messages see produceMessage'.

brokersList :: [BrokerAddress] -> ProducerProperties Source #

Set the list of brokers to contact to connect to the Kafka cluster.

setCallback :: Callback -> ProducerProperties Source #

Set the producer callback.

For examples of use, see errorCallback, logCallback and statsCallback.

logLevel :: KafkaLogLevel -> ProducerProperties Source #

Sets the logging level. It is usually used together with debugOptions to configure which logs are needed.

extraProps :: Map Text Text -> ProducerProperties Source #

Any configuration options that are supported by librdkafka. The full list can be found at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

suppressDisconnectLogs :: ProducerProperties Source #

Suppresses producer disconnects logs.

It might be useful to suppress these logs when interacting with brokers with an aggressive connection.max.idle.ms value.

extraTopicProps :: Map Text Text -> ProducerProperties Source #

Any topic-level configuration options that are supported by librdkafka. The full list can be found at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

debugOptions :: [KafkaDebug] -> ProducerProperties Source #

Sets debug features for the producer. It is usually used together with logLevel.

runProducer :: ProducerProperties -> (KafkaProducer -> IO (Either KafkaError a)) -> IO (Either KafkaError a) Source #

Deprecated: Use newProducer/closeProducer instead

Runs Kafka Producer. The callback provided is expected to call produceMessage to send messages to Kafka.

newProducer :: MonadIO m => ProducerProperties -> m (Either KafkaError KafkaProducer) Source #

Creates a new Kafka producer. A newly created producer must be closed with the closeProducer function.

produceMessage :: MonadIO m => KafkaProducer -> ProducerRecord -> m (Maybe KafkaError) Source #

Sends a single message. Since librdkafka is backed by a queue, this function can return before messages are sent. See flushProducer to wait for queue to empty.

produceMessage' :: MonadIO m => KafkaProducer -> ProducerRecord -> (DeliveryReport -> IO ()) -> m (Either ImmediateError ()) Source #

Sends a single message with a registered callback.

The callback can be a long running process, as it is forked by the thread that handles the delivery reports.

flushProducer :: MonadIO m => KafkaProducer -> m () Source #

Drains the outbound queue for a producer. This function is also called automatically when the producer is closed with closeProducer to ensure that all queued messages make it to Kafka.

closeProducer :: MonadIO m => KafkaProducer -> m () Source #

Closes the producer. Will wait until the outbound queue is drained before returning control.

data RdKafkaRespErrT Source #

Constructors

RdKafkaRespErrBegin 
RdKafkaRespErrBadMsg 
RdKafkaRespErrBadCompression 
RdKafkaRespErrDestroy 
RdKafkaRespErrFail 
RdKafkaRespErrTransport 
RdKafkaRespErrCritSysResource 
RdKafkaRespErrResolve 
RdKafkaRespErrMsgTimedOut 
RdKafkaRespErrPartitionEof 
RdKafkaRespErrUnknownPartition 
RdKafkaRespErrFs 
RdKafkaRespErrUnknownTopic 
RdKafkaRespErrAllBrokersDown 
RdKafkaRespErrInvalidArg 
RdKafkaRespErrTimedOut 
RdKafkaRespErrQueueFull 
RdKafkaRespErrIsrInsuff 
RdKafkaRespErrNodeUpdate 
RdKafkaRespErrSsl 
RdKafkaRespErrWaitCoord 
RdKafkaRespErrUnknownGroup 
RdKafkaRespErrInProgress 
RdKafkaRespErrPrevInProgress 
RdKafkaRespErrExistingSubscription 
RdKafkaRespErrAssignPartitions 
RdKafkaRespErrRevokePartitions 
RdKafkaRespErrConflict 
RdKafkaRespErrState 
RdKafkaRespErrUnknownProtocol 
RdKafkaRespErrNotImplemented 
RdKafkaRespErrAuthentication 
RdKafkaRespErrNoOffset 
RdKafkaRespErrOutdated 
RdKafkaRespErrTimedOutQueue 
RdKafkaRespErrUnsupportedFeature 
RdKafkaRespErrWaitCache 
RdKafkaRespErrIntr 
RdKafkaRespErrKeySerialization 
RdKafkaRespErrValueSerialization 
RdKafkaRespErrKeyDeserialization 
RdKafkaRespErrValueDeserialization 
RdKafkaRespErrPartial 
RdKafkaRespErrReadOnly 
RdKafkaRespErrNoent 
RdKafkaRespErrUnderflow 
RdKafkaRespErrInvalidType 
RdKafkaRespErrRetry 
RdKafkaRespErrPurgeQueue 
RdKafkaRespErrPurgeInflight 
RdKafkaRespErrFatal 
RdKafkaRespErrInconsistent 
RdKafkaRespErrGaplessGuarantee 
RdKafkaRespErrMaxPollExceeded 
RdKafkaRespErrUnknownBroker 
RdKafkaRespErrNotConfigured 
RdKafkaRespErrFenced 
RdKafkaRespErrApplication 
RdKafkaRespErrAssignmentLost 
RdKafkaRespErrNoop 
RdKafkaRespErrAutoOffsetReset 
RdKafkaRespErrEnd 
RdKafkaRespErrUnknown 
RdKafkaRespErrNoError 
RdKafkaRespErrOffsetOutOfRange 
RdKafkaRespErrInvalidMsg 
RdKafkaRespErrUnknownTopicOrPart 
RdKafkaRespErrInvalidMsgSize 
RdKafkaRespErrLeaderNotAvailable 
RdKafkaRespErrNotLeaderForPartition 
RdKafkaRespErrRequestTimedOut 
RdKafkaRespErrBrokerNotAvailable 
RdKafkaRespErrReplicaNotAvailable 
RdKafkaRespErrMsgSizeTooLarge 
RdKafkaRespErrStaleCtrlEpoch 
RdKafkaRespErrOffsetMetadataTooLarge 
RdKafkaRespErrNetworkException 
RdKafkaRespErrCoordinatorLoadInProgress 
RdKafkaRespErrCoordinatorNotAvailable 
RdKafkaRespErrNotCoordinator 
RdKafkaRespErrTopicException 
RdKafkaRespErrRecordListTooLarge 
RdKafkaRespErrNotEnoughReplicas 
RdKafkaRespErrNotEnoughReplicasAfterAppend 
RdKafkaRespErrInvalidRequiredAcks 
RdKafkaRespErrIllegalGeneration 
RdKafkaRespErrInconsistentGroupProtocol 
RdKafkaRespErrInvalidGroupId 
RdKafkaRespErrUnknownMemberId 
RdKafkaRespErrInvalidSessionTimeout 
RdKafkaRespErrRebalanceInProgress 
RdKafkaRespErrInvalidCommitOffsetSize 
RdKafkaRespErrTopicAuthorizationFailed 
RdKafkaRespErrGroupAuthorizationFailed 
RdKafkaRespErrClusterAuthorizationFailed 
RdKafkaRespErrInvalidTimestamp 
RdKafkaRespErrUnsupportedSaslMechanism 
RdKafkaRespErrIllegalSaslState 
RdKafkaRespErrUnsupportedVersion 
RdKafkaRespErrTopicAlreadyExists 
RdKafkaRespErrInvalidPartitions 
RdKafkaRespErrInvalidReplicationFactor 
RdKafkaRespErrInvalidReplicaAssignment 
RdKafkaRespErrInvalidConfig 
RdKafkaRespErrNotController 
RdKafkaRespErrInvalidRequest 
RdKafkaRespErrUnsupportedForMessageFormat 
RdKafkaRespErrPolicyViolation 
RdKafkaRespErrOutOfOrderSequenceNumber 
RdKafkaRespErrDuplicateSequenceNumber 
RdKafkaRespErrInvalidProducerEpoch 
RdKafkaRespErrInvalidTxnState 
RdKafkaRespErrInvalidProducerIdMapping 
RdKafkaRespErrInvalidTransactionTimeout 
RdKafkaRespErrConcurrentTransactions 
RdKafkaRespErrTransactionCoordinatorFenced 
RdKafkaRespErrTransactionalIdAuthorizationFailed 
RdKafkaRespErrSecurityDisabled 
RdKafkaRespErrOperationNotAttempted 
RdKafkaRespErrKafkaStorageError 
RdKafkaRespErrLogDirNotFound 
RdKafkaRespErrSaslAuthenticationFailed 
RdKafkaRespErrUnknownProducerId 
RdKafkaRespErrReassignmentInProgress 
RdKafkaRespErrDelegationTokenAuthDisabled 
RdKafkaRespErrDelegationTokenNotFound 
RdKafkaRespErrDelegationTokenOwnerMismatch 
RdKafkaRespErrDelegationTokenRequestNotAllowed 
RdKafkaRespErrDelegationTokenAuthorizationFailed 
RdKafkaRespErrDelegationTokenExpired 
RdKafkaRespErrInvalidPrincipalType 
RdKafkaRespErrNonEmptyGroup 
RdKafkaRespErrGroupIdNotFound 
RdKafkaRespErrFetchSessionIdNotFound 
RdKafkaRespErrInvalidFetchSessionEpoch 
RdKafkaRespErrListenerNotFound 
RdKafkaRespErrTopicDeletionDisabled 
RdKafkaRespErrFencedLeaderEpoch 
RdKafkaRespErrUnknownLeaderEpoch 
RdKafkaRespErrUnsupportedCompressionType 
RdKafkaRespErrStaleBrokerEpoch 
RdKafkaRespErrOffsetNotAvailable 
RdKafkaRespErrMemberIdRequired 
RdKafkaRespErrPreferredLeaderNotAvailable 
RdKafkaRespErrGroupMaxSizeReached 
RdKafkaRespErrFencedInstanceId 
RdKafkaRespErrEligibleLeadersNotAvailable 
RdKafkaRespErrElectionNotNeeded 
RdKafkaRespErrNoReassignmentInProgress 
RdKafkaRespErrGroupSubscribedToTopic 
RdKafkaRespErrInvalidRecord 
RdKafkaRespErrUnstableOffsetCommit 
RdKafkaRespErrThrottlingQuotaExceeded 
RdKafkaRespErrProducerFenced 
RdKafkaRespErrResourceNotFound 
RdKafkaRespErrDuplicateResource 
RdKafkaRespErrUnacceptableCredential 
RdKafkaRespErrInconsistentVoterSet 
RdKafkaRespErrInvalidUpdateVersion 
RdKafkaRespErrFeatureUpdateFailed 
RdKafkaRespErrPrincipalDeserializationFailure 
RdKafkaRespErrEndAll