| Safe Haskell | None |
|---|---|
| Language | Haskell2010 |
Kafka.Producer
Description
Module to produce messages to Kafka topics.
Here's an example of code to produce messages to a topic:
import Control.Exception (bracket) import Control.Monad (forM_) import Data.ByteString (ByteString) import Kafka.Producer -- Global producer properties producerProps ::ProducerPropertiesproducerProps =brokersList["localhost:9092"] <>logLevelKafkaLogDebug-- Topic to send messages to targetTopic ::TopicNametargetTopic =TopicName"kafka-client-example-topic" -- Run an example runProducerExample :: IO () runProducerExample = bracket mkProducer clProducer runHandler >>= print where mkProducer =newProducerproducerProps clProducer (Left _) = pure () clProducer (Right prod) =closeProducerprod runHandler (Left err) = pure $ Left err runHandler (Right prod) = sendMessages prod -- Example sending 2 messages and printing the response from Kafka sendMessages ::KafkaProducer-> IO (EitherKafkaError()) sendMessages prod = do err1 <-produceMessageprod (mkMessage Nothing (Just "test from producer") ) forM_ err1 print err2 <-produceMessageprod (mkMessage (Just "key") (Just "test from producer (with key)")) forM_ err2 print pure $ Right () mkMessage :: Maybe ByteString -> Maybe ByteString ->ProducerRecordmkMessage k v =ProducerRecord{prTopic= targetTopic ,prPartition=UnassignedPartition,prKey= k ,prValue= v }
Synopsis
- data KafkaProducer
- data KafkaCompressionCodec
- = NoCompression
- | Gzip
- | Snappy
- | Lz4
- data KafkaDebug
- data KafkaError
- data KafkaLogLevel
- newtype Timeout = Timeout {}
- newtype BrokerAddress = BrokerAddress {}
- newtype TopicName = TopicName {
- unTopicName :: Text
- data TopicType
- newtype BatchSize = BatchSize {
- unBatchSize :: Int
- newtype ClientId = ClientId {
- unClientId :: Text
- newtype Millis = Millis {}
- newtype PartitionId = PartitionId {
- unPartitionId :: Int
- newtype BrokerId = BrokerId {
- unBrokerId :: Int
- topicType :: TopicName -> TopicType
- kafkaDebugToText :: KafkaDebug -> Text
- kafkaCompressionCodecToText :: KafkaCompressionCodec -> Text
- data Callback
- data DeliveryReport
- newtype ImmediateError = ImmediateError KafkaError
- data ProducePartition
- data ProducerRecord = ProducerRecord {}
- kpKafkaPtr :: KafkaProducer -> Kafka
- kpKafkaConf :: KafkaProducer -> KafkaConf
- kpTopicConf :: KafkaProducer -> TopicConf
- errorCallback :: (KafkaError -> String -> IO ()) -> Callback
- logCallback :: (KafkaLogLevel -> String -> String -> IO ()) -> Callback
- statsCallback :: (ByteString -> IO ()) -> Callback
- deliveryCallback :: (DeliveryReport -> IO ()) -> Callback
- data ProducerProperties = ProducerProperties {}
- brokersList :: [BrokerAddress] -> ProducerProperties
- setCallback :: Callback -> ProducerProperties
- logLevel :: KafkaLogLevel -> ProducerProperties
- compression :: KafkaCompressionCodec -> ProducerProperties
- topicCompression :: KafkaCompressionCodec -> ProducerProperties
- sendTimeout :: Timeout -> ProducerProperties
- statisticsInterval :: Millis -> ProducerProperties
- extraProps :: Map Text Text -> ProducerProperties
- suppressDisconnectLogs :: ProducerProperties
- extraTopicProps :: Map Text Text -> ProducerProperties
- debugOptions :: [KafkaDebug] -> ProducerProperties
- runProducer :: ProducerProperties -> (KafkaProducer -> IO (Either KafkaError a)) -> IO (Either KafkaError a)
- newProducer :: MonadIO m => ProducerProperties -> m (Either KafkaError KafkaProducer)
- produceMessage :: MonadIO m => KafkaProducer -> ProducerRecord -> m (Maybe KafkaError)
- produceMessageBatch :: MonadIO m => KafkaProducer -> [ProducerRecord] -> m [(ProducerRecord, KafkaError)]
- produceMessage' :: MonadIO m => KafkaProducer -> ProducerRecord -> (DeliveryReport -> IO ()) -> m (Either ImmediateError ())
- flushProducer :: MonadIO m => KafkaProducer -> m ()
- closeProducer :: MonadIO m => KafkaProducer -> m ()
- data RdKafkaRespErrT
- = RdKafkaRespErrBegin
- | RdKafkaRespErrBadMsg
- | RdKafkaRespErrBadCompression
- | RdKafkaRespErrDestroy
- | RdKafkaRespErrFail
- | RdKafkaRespErrTransport
- | RdKafkaRespErrCritSysResource
- | RdKafkaRespErrResolve
- | RdKafkaRespErrMsgTimedOut
- | RdKafkaRespErrPartitionEof
- | RdKafkaRespErrUnknownPartition
- | RdKafkaRespErrFs
- | RdKafkaRespErrUnknownTopic
- | RdKafkaRespErrAllBrokersDown
- | RdKafkaRespErrInvalidArg
- | RdKafkaRespErrTimedOut
- | RdKafkaRespErrQueueFull
- | RdKafkaRespErrIsrInsuff
- | RdKafkaRespErrNodeUpdate
- | RdKafkaRespErrSsl
- | RdKafkaRespErrWaitCoord
- | RdKafkaRespErrUnknownGroup
- | RdKafkaRespErrInProgress
- | RdKafkaRespErrPrevInProgress
- | RdKafkaRespErrExistingSubscription
- | RdKafkaRespErrAssignPartitions
- | RdKafkaRespErrRevokePartitions
- | RdKafkaRespErrConflict
- | RdKafkaRespErrState
- | RdKafkaRespErrUnknownProtocol
- | RdKafkaRespErrNotImplemented
- | RdKafkaRespErrAuthentication
- | RdKafkaRespErrNoOffset
- | RdKafkaRespErrOutdated
- | RdKafkaRespErrTimedOutQueue
- | RdKafkaRespErrUnsupportedFeature
- | RdKafkaRespErrWaitCache
- | RdKafkaRespErrIntr
- | RdKafkaRespErrKeySerialization
- | RdKafkaRespErrValueSerialization
- | RdKafkaRespErrKeyDeserialization
- | RdKafkaRespErrValueDeserialization
- | RdKafkaRespErrPartial
- | RdKafkaRespErrReadOnly
- | RdKafkaRespErrNoent
- | RdKafkaRespErrUnderflow
- | RdKafkaRespErrInvalidType
- | RdKafkaRespErrRetry
- | RdKafkaRespErrPurgeQueue
- | RdKafkaRespErrPurgeInflight
- | RdKafkaRespErrFatal
- | RdKafkaRespErrInconsistent
- | RdKafkaRespErrGaplessGuarantee
- | RdKafkaRespErrMaxPollExceeded
- | RdKafkaRespErrUnknownBroker
- | RdKafkaRespErrNotConfigured
- | RdKafkaRespErrFenced
- | RdKafkaRespErrApplication
- | RdKafkaRespErrEnd
- | RdKafkaRespErrUnknown
- | RdKafkaRespErrNoError
- | RdKafkaRespErrOffsetOutOfRange
- | RdKafkaRespErrInvalidMsg
- | RdKafkaRespErrUnknownTopicOrPart
- | RdKafkaRespErrInvalidMsgSize
- | RdKafkaRespErrLeaderNotAvailable
- | RdKafkaRespErrNotLeaderForPartition
- | RdKafkaRespErrRequestTimedOut
- | RdKafkaRespErrBrokerNotAvailable
- | RdKafkaRespErrReplicaNotAvailable
- | RdKafkaRespErrMsgSizeTooLarge
- | RdKafkaRespErrStaleCtrlEpoch
- | RdKafkaRespErrOffsetMetadataTooLarge
- | RdKafkaRespErrNetworkException
- | RdKafkaRespErrCoordinatorLoadInProgress
- | RdKafkaRespErrCoordinatorNotAvailable
- | RdKafkaRespErrNotCoordinator
- | RdKafkaRespErrTopicException
- | RdKafkaRespErrRecordListTooLarge
- | RdKafkaRespErrNotEnoughReplicas
- | RdKafkaRespErrNotEnoughReplicasAfterAppend
- | RdKafkaRespErrInvalidRequiredAcks
- | RdKafkaRespErrIllegalGeneration
- | RdKafkaRespErrInconsistentGroupProtocol
- | RdKafkaRespErrInvalidGroupId
- | RdKafkaRespErrUnknownMemberId
- | RdKafkaRespErrInvalidSessionTimeout
- | RdKafkaRespErrRebalanceInProgress
- | RdKafkaRespErrInvalidCommitOffsetSize
- | RdKafkaRespErrTopicAuthorizationFailed
- | RdKafkaRespErrGroupAuthorizationFailed
- | RdKafkaRespErrClusterAuthorizationFailed
- | RdKafkaRespErrInvalidTimestamp
- | RdKafkaRespErrUnsupportedSaslMechanism
- | RdKafkaRespErrIllegalSaslState
- | RdKafkaRespErrUnsupportedVersion
- | RdKafkaRespErrTopicAlreadyExists
- | RdKafkaRespErrInvalidPartitions
- | RdKafkaRespErrInvalidReplicationFactor
- | RdKafkaRespErrInvalidReplicaAssignment
- | RdKafkaRespErrInvalidConfig
- | RdKafkaRespErrNotController
- | RdKafkaRespErrInvalidRequest
- | RdKafkaRespErrUnsupportedForMessageFormat
- | RdKafkaRespErrPolicyViolation
- | RdKafkaRespErrOutOfOrderSequenceNumber
- | RdKafkaRespErrDuplicateSequenceNumber
- | RdKafkaRespErrInvalidProducerEpoch
- | RdKafkaRespErrInvalidTxnState
- | RdKafkaRespErrInvalidProducerIdMapping
- | RdKafkaRespErrInvalidTransactionTimeout
- | RdKafkaRespErrConcurrentTransactions
- | RdKafkaRespErrTransactionCoordinatorFenced
- | RdKafkaRespErrTransactionalIdAuthorizationFailed
- | RdKafkaRespErrSecurityDisabled
- | RdKafkaRespErrOperationNotAttempted
- | RdKafkaRespErrKafkaStorageError
- | RdKafkaRespErrLogDirNotFound
- | RdKafkaRespErrSaslAuthenticationFailed
- | RdKafkaRespErrUnknownProducerId
- | RdKafkaRespErrReassignmentInProgress
- | RdKafkaRespErrDelegationTokenAuthDisabled
- | RdKafkaRespErrDelegationTokenNotFound
- | RdKafkaRespErrDelegationTokenOwnerMismatch
- | RdKafkaRespErrDelegationTokenRequestNotAllowed
- | RdKafkaRespErrDelegationTokenAuthorizationFailed
- | RdKafkaRespErrDelegationTokenExpired
- | RdKafkaRespErrInvalidPrincipalType
- | RdKafkaRespErrNonEmptyGroup
- | RdKafkaRespErrGroupIdNotFound
- | RdKafkaRespErrFetchSessionIdNotFound
- | RdKafkaRespErrInvalidFetchSessionEpoch
- | RdKafkaRespErrListenerNotFound
- | RdKafkaRespErrTopicDeletionDisabled
- | RdKafkaRespErrFencedLeaderEpoch
- | RdKafkaRespErrUnknownLeaderEpoch
- | RdKafkaRespErrUnsupportedCompressionType
- | RdKafkaRespErrStaleBrokerEpoch
- | RdKafkaRespErrOffsetNotAvailable
- | RdKafkaRespErrMemberIdRequired
- | RdKafkaRespErrPreferredLeaderNotAvailable
- | RdKafkaRespErrGroupMaxSizeReached
- | RdKafkaRespErrFencedInstanceId
- | RdKafkaRespErrEligibleLeadersNotAvailable
- | RdKafkaRespErrElectionNotNeeded
- | RdKafkaRespErrNoReassignmentInProgress
- | RdKafkaRespErrGroupSubscribedToTopic
- | RdKafkaRespErrInvalidRecord
- | RdKafkaRespErrUnstableOffsetCommit
- | RdKafkaRespErrEndAll
Documentation
data KafkaProducer Source #
The main type for Kafka message production, used e.g. to send messages.
Its constructor is intentionally not exposed, instead, one should used newProducer to acquire such a value.
data KafkaCompressionCodec Source #
Compression codec used by a topic
Constructors
| NoCompression | |
| Gzip | |
| Snappy | |
| Lz4 |
Instances
data KafkaDebug Source #
Available librdkafka debug contexts
Constructors
| DebugGeneric | |
| DebugBroker | |
| DebugTopic | |
| DebugMetadata | |
| DebugQueue | |
| DebugMsg | |
| DebugProtocol | |
| DebugCgrp | |
| DebugSecurity | |
| DebugFetch | |
| DebugFeature | |
| DebugAll |
Instances
data KafkaError Source #
All possible Kafka errors
Constructors
| KafkaError Text | |
| KafkaInvalidReturnValue | |
| KafkaBadSpecification Text | |
| KafkaResponseError RdKafkaRespErrT | |
| KafkaInvalidConfigurationValue Text | |
| KafkaUnknownConfigurationKey Text | |
| KafkaBadConfiguration |
Instances
data KafkaLogLevel Source #
Log levels for librdkafka.
Constructors
| KafkaLogEmerg | |
| KafkaLogAlert | |
| KafkaLogCrit | |
| KafkaLogErr | |
| KafkaLogWarning | |
| KafkaLogNotice | |
| KafkaLogInfo | |
| KafkaLogDebug |
Instances
| Enum KafkaLogLevel Source # | |
Defined in Kafka.Types Methods succ :: KafkaLogLevel -> KafkaLogLevel # pred :: KafkaLogLevel -> KafkaLogLevel # toEnum :: Int -> KafkaLogLevel # fromEnum :: KafkaLogLevel -> Int # enumFrom :: KafkaLogLevel -> [KafkaLogLevel] # enumFromThen :: KafkaLogLevel -> KafkaLogLevel -> [KafkaLogLevel] # enumFromTo :: KafkaLogLevel -> KafkaLogLevel -> [KafkaLogLevel] # enumFromThenTo :: KafkaLogLevel -> KafkaLogLevel -> KafkaLogLevel -> [KafkaLogLevel] # | |
| Eq KafkaLogLevel Source # | |
Defined in Kafka.Types Methods (==) :: KafkaLogLevel -> KafkaLogLevel -> Bool # (/=) :: KafkaLogLevel -> KafkaLogLevel -> Bool # | |
| Show KafkaLogLevel Source # | |
Defined in Kafka.Types Methods showsPrec :: Int -> KafkaLogLevel -> ShowS # show :: KafkaLogLevel -> String # showList :: [KafkaLogLevel] -> ShowS # | |
Timeout in milliseconds
newtype BrokerAddress Source #
Kafka broker address string (e.g. broker1:9092)
Constructors
| BrokerAddress | |
Fields | |
Instances
| Eq BrokerAddress Source # | |
Defined in Kafka.Types Methods (==) :: BrokerAddress -> BrokerAddress -> Bool # (/=) :: BrokerAddress -> BrokerAddress -> Bool # | |
| Show BrokerAddress Source # | |
Defined in Kafka.Types Methods showsPrec :: Int -> BrokerAddress -> ShowS # show :: BrokerAddress -> String # showList :: [BrokerAddress] -> ShowS # | |
| IsString BrokerAddress Source # | |
Defined in Kafka.Types Methods fromString :: String -> BrokerAddress # | |
| Generic BrokerAddress Source # | |
Defined in Kafka.Types Associated Types type Rep BrokerAddress :: Type -> Type # | |
| type Rep BrokerAddress Source # | |
Defined in Kafka.Types type Rep BrokerAddress = D1 ('MetaData "BrokerAddress" "Kafka.Types" "hw-kafka-client-4.0.3-inplace" 'True) (C1 ('MetaCons "BrokerAddress" 'PrefixI 'True) (S1 ('MetaSel ('Just "unBrokerAddress") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Text))) | |
Topic name to consume/produce messages
Wildcard (regex) topics are supported by the librdkafka assignor:
any topic name in the topics list that is prefixed with ^ will
be regex-matched to the full list of topics in the cluster and matching
topics will be added to the subscription list.
Constructors
| TopicName | |
Fields
| |
Instances
| Eq TopicName Source # | |
| Ord TopicName Source # | |
| Read TopicName Source # | |
| Show TopicName Source # | |
| IsString TopicName Source # | |
Defined in Kafka.Types Methods fromString :: String -> TopicName # | |
| Generic TopicName Source # | |
| type Rep TopicName Source # | |
Defined in Kafka.Types | |
Whether the topic is created by a user or by the system
Constructors
| User | Normal topics that are created by user. |
| System | Topics starting with a double underscore "__" ( |
Instances
| Eq TopicType Source # | |
| Ord TopicType Source # | |
| Read TopicType Source # | |
| Show TopicType Source # | |
| Generic TopicType Source # | |
| type Rep TopicType Source # | |
Batch size used for polling
Constructors
| BatchSize | |
Fields
| |
Instances
| Eq BatchSize Source # | |
| Num BatchSize Source # | |
| Ord BatchSize Source # | |
| Read BatchSize Source # | |
| Show BatchSize Source # | |
| Generic BatchSize Source # | |
| type Rep BatchSize Source # | |
Defined in Kafka.Types | |
Client ID used by Kafka to better track requests
Constructors
| ClientId | |
Fields
| |
A number of milliseconds, used to represent durations and timestamps
newtype PartitionId Source #
Topic partition ID
Constructors
| PartitionId | |
Fields
| |
Instances
Kafka broker ID
Constructors
| BrokerId | |
Fields
| |
topicType :: TopicName -> TopicType Source #
Deduce the type of a topic from its name, by checking if it starts with a double underscore "__"
kafkaDebugToText :: KafkaDebug -> Text Source #
Convert a KafkaDebug into its librdkafka string equivalent.
This is used internally by the library but may be useful to some developers.
kafkaCompressionCodecToText :: KafkaCompressionCodec -> Text Source #
Convert a KafkaCompressionCodec into its librdkafka string equivalent.
This is used internally by the library but may be useful to some developers.
Callbacks allow retrieving various information like error occurences, statistics
and log messages.
See setCallback (Consumer) and setCallback (Producer) for more details.
data DeliveryReport Source #
The result of sending a message to the broker, useful for callbacks
Constructors
| DeliverySuccess ProducerRecord Offset | The message was successfully sent at this offset |
| DeliveryFailure ProducerRecord KafkaError | The message could not be sent |
| NoMessageError KafkaError | An error occurred, but librdkafka did not attach any sent message |
Instances
newtype ImmediateError Source #
Data type representing an error that is caused by pre-flight conditions not being met
Constructors
| ImmediateError KafkaError |
Instances
| Eq ImmediateError Source # | |
Defined in Kafka.Producer.Types Methods (==) :: ImmediateError -> ImmediateError -> Bool # (/=) :: ImmediateError -> ImmediateError -> Bool # | |
| Show ImmediateError Source # | |
Defined in Kafka.Producer.Types Methods showsPrec :: Int -> ImmediateError -> ShowS # show :: ImmediateError -> String # showList :: [ImmediateError] -> ShowS # | |
data ProducePartition Source #
Constructors
| SpecifiedPartition !Int | The partition number of the topic |
| UnassignedPartition | Let the Kafka broker decide the partition |
Instances
data ProducerRecord Source #
Represents messages to be enqueued onto a Kafka broker (i.e. used for a producer)
Constructors
| ProducerRecord | |
Fields
| |
Instances
kpKafkaPtr :: KafkaProducer -> Kafka Source #
kpKafkaConf :: KafkaProducer -> KafkaConf Source #
kpTopicConf :: KafkaProducer -> TopicConf Source #
errorCallback :: (KafkaError -> String -> IO ()) -> Callback Source #
Add a callback for errors.
Examples
Basic usage:
'setCallback' ('errorCallback' myErrorCallback)
myErrorCallback :: 'KafkaError' -> String -> IO ()
myErrorCallback kafkaError message = print $ show kafkaError <> "|" <> messagelogCallback :: (KafkaLogLevel -> String -> String -> IO ()) -> Callback Source #
Add a callback for logs.
Examples
Basic usage:
'setCallback' ('logCallback' myLogCallback)
myLogCallback :: 'KafkaLogLevel' -> String -> String -> IO ()
myLogCallback level facility message = print $ show level <> "|" <> facility <> "|" <> messagestatsCallback :: (ByteString -> IO ()) -> Callback Source #
Add a callback for stats. The passed ByteString contains an UTF-8 encoded JSON document and can e.g. be parsed using Data.Aeson.decodeStrict. For more information about the content of the JSON document see https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md.
Examples
Basic usage:
'setCallback' ('statsCallback' myStatsCallback)
myStatsCallback :: String -> IO ()
myStatsCallback stats = print $ show statsdeliveryCallback :: (DeliveryReport -> IO ()) -> Callback Source #
Sets the callback for delivery reports.
/Note: A callback should not be a long-running process as it blocks librdkafka from continuing on the thread that handles the delivery callbacks. For callbacks to individual messsages see 'Kafka.Producer.produceMessage''./
data ProducerProperties Source #
Properties to create KafkaProducer.
Constructors
| ProducerProperties | |
Fields
| |
Instances
| Semigroup ProducerProperties Source # | |
Defined in Kafka.Producer.ProducerProperties Methods (<>) :: ProducerProperties -> ProducerProperties -> ProducerProperties # sconcat :: NonEmpty ProducerProperties -> ProducerProperties # stimes :: Integral b => b -> ProducerProperties -> ProducerProperties # | |
| Monoid ProducerProperties Source # | Right biased so we prefer newer properties over older ones. |
Defined in Kafka.Producer.ProducerProperties Methods mempty :: ProducerProperties # mappend :: ProducerProperties -> ProducerProperties -> ProducerProperties # mconcat :: [ProducerProperties] -> ProducerProperties # | |
brokersList :: [BrokerAddress] -> ProducerProperties Source #
Set the list of brokers to contact to connect to the Kafka cluster.
logLevel :: KafkaLogLevel -> ProducerProperties Source #
Sets the logging level.
Usually is used with debugOptions to configure which logs are needed.
compression :: KafkaCompressionCodec -> ProducerProperties Source #
Set the compression.codec for the producer.
topicCompression :: KafkaCompressionCodec -> ProducerProperties Source #
Set the compression.codec for the topic.
sendTimeout :: Timeout -> ProducerProperties Source #
Set the message.timeout.ms.
statisticsInterval :: Millis -> ProducerProperties Source #
Set the statistics.interval.ms for the producer.
extraProps :: Map Text Text -> ProducerProperties Source #
Any configuration options that are supported by librdkafka. The full list can be found here
suppressDisconnectLogs :: ProducerProperties Source #
Suppresses producer disconnects logs.
It might be useful to turn this off when interacting with brokers with an aggressive connection.max.idle.ms value.
extraTopicProps :: Map Text Text -> ProducerProperties Source #
Any *topic* configuration options that are supported by librdkafka. The full list can be found here
debugOptions :: [KafkaDebug] -> ProducerProperties Source #
runProducer :: ProducerProperties -> (KafkaProducer -> IO (Either KafkaError a)) -> IO (Either KafkaError a) Source #
Deprecated: Use newProducer/closeProducer instead
Runs Kafka Producer.
The callback provided is expected to call produceMessage
or/and produceMessageBatch to send messages to Kafka.
newProducer :: MonadIO m => ProducerProperties -> m (Either KafkaError KafkaProducer) Source #
Creates a new kafka producer
A newly created producer must be closed with closeProducer function.
produceMessage :: MonadIO m => KafkaProducer -> ProducerRecord -> m (Maybe KafkaError) Source #
Sends a single message.
Since librdkafka is backed by a queue, this function can return before messages are sent. See
flushProducer to wait for queue to empty.
Arguments
| :: MonadIO m | |
| => KafkaProducer | |
| -> [ProducerRecord] | |
| -> m [(ProducerRecord, KafkaError)] | An empty list when the operation is successful, otherwise a list of "failed" messages with corresponsing errors. |
Sends a batch of messages.
Returns a list of messages which it was unable to send with corresponding errors.
Since librdkafka is backed by a queue, this function can return before messages are sent. See
flushProducer to wait for queue to empty.
produceMessage' :: MonadIO m => KafkaProducer -> ProducerRecord -> (DeliveryReport -> IO ()) -> m (Either ImmediateError ()) Source #
Sends a single message with a registered callback.
The callback can be a long running process, as it is forked by the thread that handles the delivery reports.
flushProducer :: MonadIO m => KafkaProducer -> m () Source #
Drains the outbound queue for a producer.
This function is also called automatically when the producer is closed
with closeProducer to ensure that all queued messages make it to Kafka.
closeProducer :: MonadIO m => KafkaProducer -> m () Source #
Closes the producer. Will wait until the outbound queue is drained before returning the control.
data RdKafkaRespErrT Source #
Constructors
Instances
| Bounded RdKafkaRespErrT Source # | |
Defined in Kafka.Internal.RdKafka | |
| Enum RdKafkaRespErrT Source # | |
Defined in Kafka.Internal.RdKafka Methods succ :: RdKafkaRespErrT -> RdKafkaRespErrT # pred :: RdKafkaRespErrT -> RdKafkaRespErrT # toEnum :: Int -> RdKafkaRespErrT # fromEnum :: RdKafkaRespErrT -> Int # enumFrom :: RdKafkaRespErrT -> [RdKafkaRespErrT] # enumFromThen :: RdKafkaRespErrT -> RdKafkaRespErrT -> [RdKafkaRespErrT] # enumFromTo :: RdKafkaRespErrT -> RdKafkaRespErrT -> [RdKafkaRespErrT] # enumFromThenTo :: RdKafkaRespErrT -> RdKafkaRespErrT -> RdKafkaRespErrT -> [RdKafkaRespErrT] # | |
| Eq RdKafkaRespErrT Source # | |
Defined in Kafka.Internal.RdKafka Methods (==) :: RdKafkaRespErrT -> RdKafkaRespErrT -> Bool # (/=) :: RdKafkaRespErrT -> RdKafkaRespErrT -> Bool # | |
| Show RdKafkaRespErrT Source # | |
Defined in Kafka.Internal.RdKafka Methods showsPrec :: Int -> RdKafkaRespErrT -> ShowS # show :: RdKafkaRespErrT -> String # showList :: [RdKafkaRespErrT] -> ShowS # | |