| Safe Haskell | None        |
|--------------|-------------|
| Language     | Haskell2010 |
Module to produce messages to Kafka topics.
Here's an example of code to produce messages to a topic:
```haskell
{-# LANGUAGE OverloadedStrings #-}

import Control.Exception (bracket)
import Control.Monad (forM_)
import Data.ByteString (ByteString)
import Kafka.Producer

-- Global producer properties
producerProps :: ProducerProperties
producerProps = brokersList ["localhost:9092"]
             <> logLevel KafkaLogDebug

-- Topic to send messages to
targetTopic :: TopicName
targetTopic = TopicName "kafka-client-example-topic"

-- Run an example
runProducerExample :: IO ()
runProducerExample =
    bracket mkProducer clProducer runHandler >>= print
    where
      mkProducer = newProducer producerProps
      clProducer (Left _)     = pure ()
      clProducer (Right prod) = closeProducer prod
      runHandler (Left err)   = pure $ Left err
      runHandler (Right prod) = sendMessages prod

-- Example sending 2 messages and printing the response from Kafka
sendMessages :: KafkaProducer -> IO (Either KafkaError ())
sendMessages prod = do
  err1 <- produceMessage prod (mkMessage Nothing (Just "test from producer"))
  forM_ err1 print

  err2 <- produceMessage prod (mkMessage (Just "key") (Just "test from producer (with key)"))
  forM_ err2 print

  pure $ Right ()

mkMessage :: Maybe ByteString -> Maybe ByteString -> ProducerRecord
mkMessage k v = ProducerRecord
                  { prTopic     = targetTopic
                  , prPartition = UnassignedPartition
                  , prKey       = k
                  , prValue     = v
                  , prHeaders   = mempty  -- no headers attached
                  }
```
Synopsis
- data KafkaProducer
- data Headers
- data KafkaCompressionCodec
- = NoCompression
- | Gzip
- | Snappy
- | Lz4
- data KafkaDebug
- data KafkaError
- data KafkaLogLevel
- newtype Timeout = Timeout { unTimeout :: Int }
- newtype BrokerAddress = BrokerAddress { unBrokerAddress :: Text }
- newtype TopicName = TopicName { unTopicName :: Text }
- data TopicType
- newtype BatchSize = BatchSize { unBatchSize :: Int }
- newtype ClientId = ClientId { unClientId :: Text }
- newtype Millis = Millis { unMillis :: Int64 }
- newtype PartitionId = PartitionId { unPartitionId :: Int }
- newtype BrokerId = BrokerId { unBrokerId :: Int }
- topicType :: TopicName -> TopicType
- kafkaDebugToText :: KafkaDebug -> Text
- kafkaCompressionCodecToText :: KafkaCompressionCodec -> Text
- headersFromList :: [(ByteString, ByteString)] -> Headers
- headersToList :: Headers -> [(ByteString, ByteString)]
- data Callback
- data DeliveryReport
- newtype ImmediateError = ImmediateError KafkaError
- data ProducePartition
- data ProducerRecord = ProducerRecord {}
- kpKafkaPtr :: KafkaProducer -> Kafka
- kpKafkaConf :: KafkaProducer -> KafkaConf
- kpTopicConf :: KafkaProducer -> TopicConf
- errorCallback :: (KafkaError -> String -> IO ()) -> Callback
- logCallback :: (KafkaLogLevel -> String -> String -> IO ()) -> Callback
- statsCallback :: (ByteString -> IO ()) -> Callback
- deliveryCallback :: (DeliveryReport -> IO ()) -> Callback
- data ProducerProperties = ProducerProperties {}
- brokersList :: [BrokerAddress] -> ProducerProperties
- setCallback :: Callback -> ProducerProperties
- logLevel :: KafkaLogLevel -> ProducerProperties
- compression :: KafkaCompressionCodec -> ProducerProperties
- topicCompression :: KafkaCompressionCodec -> ProducerProperties
- sendTimeout :: Timeout -> ProducerProperties
- statisticsInterval :: Millis -> ProducerProperties
- extraProps :: Map Text Text -> ProducerProperties
- suppressDisconnectLogs :: ProducerProperties
- extraTopicProps :: Map Text Text -> ProducerProperties
- debugOptions :: [KafkaDebug] -> ProducerProperties
- runProducer :: ProducerProperties -> (KafkaProducer -> IO (Either KafkaError a)) -> IO (Either KafkaError a)
- newProducer :: MonadIO m => ProducerProperties -> m (Either KafkaError KafkaProducer)
- produceMessage :: MonadIO m => KafkaProducer -> ProducerRecord -> m (Maybe KafkaError)
- produceMessage' :: MonadIO m => KafkaProducer -> ProducerRecord -> (DeliveryReport -> IO ()) -> m (Either ImmediateError ())
- flushProducer :: MonadIO m => KafkaProducer -> m ()
- closeProducer :: MonadIO m => KafkaProducer -> m ()
- data RdKafkaRespErrT
- = RdKafkaRespErrBegin
- | RdKafkaRespErrBadMsg
- | RdKafkaRespErrBadCompression
- | RdKafkaRespErrDestroy
- | RdKafkaRespErrFail
- | RdKafkaRespErrTransport
- | RdKafkaRespErrCritSysResource
- | RdKafkaRespErrResolve
- | RdKafkaRespErrMsgTimedOut
- | RdKafkaRespErrPartitionEof
- | RdKafkaRespErrUnknownPartition
- | RdKafkaRespErrFs
- | RdKafkaRespErrUnknownTopic
- | RdKafkaRespErrAllBrokersDown
- | RdKafkaRespErrInvalidArg
- | RdKafkaRespErrTimedOut
- | RdKafkaRespErrQueueFull
- | RdKafkaRespErrIsrInsuff
- | RdKafkaRespErrNodeUpdate
- | RdKafkaRespErrSsl
- | RdKafkaRespErrWaitCoord
- | RdKafkaRespErrUnknownGroup
- | RdKafkaRespErrInProgress
- | RdKafkaRespErrPrevInProgress
- | RdKafkaRespErrExistingSubscription
- | RdKafkaRespErrAssignPartitions
- | RdKafkaRespErrRevokePartitions
- | RdKafkaRespErrConflict
- | RdKafkaRespErrState
- | RdKafkaRespErrUnknownProtocol
- | RdKafkaRespErrNotImplemented
- | RdKafkaRespErrAuthentication
- | RdKafkaRespErrNoOffset
- | RdKafkaRespErrOutdated
- | RdKafkaRespErrTimedOutQueue
- | RdKafkaRespErrUnsupportedFeature
- | RdKafkaRespErrWaitCache
- | RdKafkaRespErrIntr
- | RdKafkaRespErrKeySerialization
- | RdKafkaRespErrValueSerialization
- | RdKafkaRespErrKeyDeserialization
- | RdKafkaRespErrValueDeserialization
- | RdKafkaRespErrPartial
- | RdKafkaRespErrReadOnly
- | RdKafkaRespErrNoent
- | RdKafkaRespErrUnderflow
- | RdKafkaRespErrInvalidType
- | RdKafkaRespErrRetry
- | RdKafkaRespErrPurgeQueue
- | RdKafkaRespErrPurgeInflight
- | RdKafkaRespErrFatal
- | RdKafkaRespErrInconsistent
- | RdKafkaRespErrGaplessGuarantee
- | RdKafkaRespErrMaxPollExceeded
- | RdKafkaRespErrUnknownBroker
- | RdKafkaRespErrNotConfigured
- | RdKafkaRespErrFenced
- | RdKafkaRespErrApplication
- | RdKafkaRespErrAssignmentLost
- | RdKafkaRespErrNoop
- | RdKafkaRespErrAutoOffsetReset
- | RdKafkaRespErrEnd
- | RdKafkaRespErrUnknown
- | RdKafkaRespErrNoError
- | RdKafkaRespErrOffsetOutOfRange
- | RdKafkaRespErrInvalidMsg
- | RdKafkaRespErrUnknownTopicOrPart
- | RdKafkaRespErrInvalidMsgSize
- | RdKafkaRespErrLeaderNotAvailable
- | RdKafkaRespErrNotLeaderForPartition
- | RdKafkaRespErrRequestTimedOut
- | RdKafkaRespErrBrokerNotAvailable
- | RdKafkaRespErrReplicaNotAvailable
- | RdKafkaRespErrMsgSizeTooLarge
- | RdKafkaRespErrStaleCtrlEpoch
- | RdKafkaRespErrOffsetMetadataTooLarge
- | RdKafkaRespErrNetworkException
- | RdKafkaRespErrCoordinatorLoadInProgress
- | RdKafkaRespErrCoordinatorNotAvailable
- | RdKafkaRespErrNotCoordinator
- | RdKafkaRespErrTopicException
- | RdKafkaRespErrRecordListTooLarge
- | RdKafkaRespErrNotEnoughReplicas
- | RdKafkaRespErrNotEnoughReplicasAfterAppend
- | RdKafkaRespErrInvalidRequiredAcks
- | RdKafkaRespErrIllegalGeneration
- | RdKafkaRespErrInconsistentGroupProtocol
- | RdKafkaRespErrInvalidGroupId
- | RdKafkaRespErrUnknownMemberId
- | RdKafkaRespErrInvalidSessionTimeout
- | RdKafkaRespErrRebalanceInProgress
- | RdKafkaRespErrInvalidCommitOffsetSize
- | RdKafkaRespErrTopicAuthorizationFailed
- | RdKafkaRespErrGroupAuthorizationFailed
- | RdKafkaRespErrClusterAuthorizationFailed
- | RdKafkaRespErrInvalidTimestamp
- | RdKafkaRespErrUnsupportedSaslMechanism
- | RdKafkaRespErrIllegalSaslState
- | RdKafkaRespErrUnsupportedVersion
- | RdKafkaRespErrTopicAlreadyExists
- | RdKafkaRespErrInvalidPartitions
- | RdKafkaRespErrInvalidReplicationFactor
- | RdKafkaRespErrInvalidReplicaAssignment
- | RdKafkaRespErrInvalidConfig
- | RdKafkaRespErrNotController
- | RdKafkaRespErrInvalidRequest
- | RdKafkaRespErrUnsupportedForMessageFormat
- | RdKafkaRespErrPolicyViolation
- | RdKafkaRespErrOutOfOrderSequenceNumber
- | RdKafkaRespErrDuplicateSequenceNumber
- | RdKafkaRespErrInvalidProducerEpoch
- | RdKafkaRespErrInvalidTxnState
- | RdKafkaRespErrInvalidProducerIdMapping
- | RdKafkaRespErrInvalidTransactionTimeout
- | RdKafkaRespErrConcurrentTransactions
- | RdKafkaRespErrTransactionCoordinatorFenced
- | RdKafkaRespErrTransactionalIdAuthorizationFailed
- | RdKafkaRespErrSecurityDisabled
- | RdKafkaRespErrOperationNotAttempted
- | RdKafkaRespErrKafkaStorageError
- | RdKafkaRespErrLogDirNotFound
- | RdKafkaRespErrSaslAuthenticationFailed
- | RdKafkaRespErrUnknownProducerId
- | RdKafkaRespErrReassignmentInProgress
- | RdKafkaRespErrDelegationTokenAuthDisabled
- | RdKafkaRespErrDelegationTokenNotFound
- | RdKafkaRespErrDelegationTokenOwnerMismatch
- | RdKafkaRespErrDelegationTokenRequestNotAllowed
- | RdKafkaRespErrDelegationTokenAuthorizationFailed
- | RdKafkaRespErrDelegationTokenExpired
- | RdKafkaRespErrInvalidPrincipalType
- | RdKafkaRespErrNonEmptyGroup
- | RdKafkaRespErrGroupIdNotFound
- | RdKafkaRespErrFetchSessionIdNotFound
- | RdKafkaRespErrInvalidFetchSessionEpoch
- | RdKafkaRespErrListenerNotFound
- | RdKafkaRespErrTopicDeletionDisabled
- | RdKafkaRespErrFencedLeaderEpoch
- | RdKafkaRespErrUnknownLeaderEpoch
- | RdKafkaRespErrUnsupportedCompressionType
- | RdKafkaRespErrStaleBrokerEpoch
- | RdKafkaRespErrOffsetNotAvailable
- | RdKafkaRespErrMemberIdRequired
- | RdKafkaRespErrPreferredLeaderNotAvailable
- | RdKafkaRespErrGroupMaxSizeReached
- | RdKafkaRespErrFencedInstanceId
- | RdKafkaRespErrEligibleLeadersNotAvailable
- | RdKafkaRespErrElectionNotNeeded
- | RdKafkaRespErrNoReassignmentInProgress
- | RdKafkaRespErrGroupSubscribedToTopic
- | RdKafkaRespErrInvalidRecord
- | RdKafkaRespErrUnstableOffsetCommit
- | RdKafkaRespErrThrottlingQuotaExceeded
- | RdKafkaRespErrProducerFenced
- | RdKafkaRespErrResourceNotFound
- | RdKafkaRespErrDuplicateResource
- | RdKafkaRespErrUnacceptableCredential
- | RdKafkaRespErrInconsistentVoterSet
- | RdKafkaRespErrInvalidUpdateVersion
- | RdKafkaRespErrFeatureUpdateFailed
- | RdKafkaRespErrPrincipalDeserializationFailure
- | RdKafkaRespErrEndAll
Documentation
data KafkaProducer Source #
The main type for Kafka message production, used e.g. to send messages.
Its constructor is intentionally not exposed; instead, one should use newProducer to acquire such a value.
data Headers Source #

Headers that might be passed along with a record.
Instances
- Eq Headers
- Read Headers
- Show Headers
- Generic Headers
- Semigroup Headers
- Monoid Headers
- type Rep Headers
data KafkaCompressionCodec Source #
Compression codec used by a topic
Instances
- Eq KafkaCompressionCodec
- Show KafkaCompressionCodec
- Generic KafkaCompressionCodec
- type Rep KafkaCompressionCodec
data KafkaDebug Source #
Available librdkafka debug contexts
Constructors:

- DebugGeneric
- DebugBroker
- DebugTopic
- DebugMetadata
- DebugQueue
- DebugMsg
- DebugProtocol
- DebugCgrp
- DebugSecurity
- DebugFetch
- DebugFeature
- DebugAll
Instances
data KafkaError Source #
All possible Kafka errors
Constructors:

- KafkaError Text
- KafkaInvalidReturnValue
- KafkaBadSpecification Text
- KafkaResponseError RdKafkaRespErrT
- KafkaInvalidConfigurationValue Text
- KafkaUnknownConfigurationKey Text
- KafkaBadConfiguration
Instances
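Since produceMessage reports failures as KafkaError values, a common pattern is to branch on the response code. A minimal sketch (the helper name is illustrative, not part of the library):

```haskell
-- Detect a full local librdkafka queue, e.g. to back off and retry
-- after a flushProducer. `isQueueFull` is a hypothetical helper.
isQueueFull :: KafkaError -> Bool
isQueueFull (KafkaResponseError RdKafkaRespErrQueueFull) = True
isQueueFull _                                            = False
```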
data KafkaLogLevel Source #
Log levels for librdkafka.
Constructors:

- KafkaLogEmerg
- KafkaLogAlert
- KafkaLogCrit
- KafkaLogErr
- KafkaLogWarning
- KafkaLogNotice
- KafkaLogInfo
- KafkaLogDebug
Instances
- Enum KafkaLogLevel
- Eq KafkaLogLevel
- Show KafkaLogLevel
newtype Timeout Source #

Timeout in milliseconds.
newtype BrokerAddress Source #
Kafka broker address string (e.g. broker1:9092).
Instances
- Eq BrokerAddress
- Show BrokerAddress
- IsString BrokerAddress
- Generic BrokerAddress
- type Rep BrokerAddress
newtype TopicName Source #

Topic name to consume/produce messages.

Wildcard (regex) topics are supported by the librdkafka assignor: any topic name in the topics list that is prefixed with ^ will be regex-matched against the full list of topics in the cluster, and matching topics will be added to the subscription list.

Constructors:

- TopicName { unTopicName :: Text }
data TopicType Source #

Whether the topic is created by a user or by the system.

Constructors:

- User: normal topics that are created by users.
- System: topics whose names start with a double underscore "__" (e.g. __consumer_offsets) are considered system topics.
newtype BatchSize Source #

Batch size used for polling.
newtype ClientId Source #

Client ID used by Kafka to better track requests.
newtype Millis Source #

A number of milliseconds, used to represent durations and timestamps.
newtype PartitionId Source #
Topic partition ID
Instances
newtype BrokerId Source #

Kafka broker ID.
topicType :: TopicName -> TopicType Source #
Deduce the type of a topic from its name by checking whether it starts with a double underscore "__".
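For example (illustrative GHCi session; the topic names are arbitrary):

```haskell
-- >>> topicType (TopicName "__consumer_offsets")
-- System
-- >>> topicType (TopicName "orders")
-- User
```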
kafkaDebugToText :: KafkaDebug -> Text Source #
Convert a KafkaDebug
into its librdkafka string equivalent.
This is used internally by the library but may be useful to some developers.
kafkaCompressionCodecToText :: KafkaCompressionCodec -> Text Source #
Convert a KafkaCompressionCodec
into its librdkafka string equivalent.
This is used internally by the library but may be useful to some developers.
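For example (the exact strings are assumed to follow librdkafka's codec names):

```haskell
-- >>> kafkaCompressionCodecToText Gzip
-- "gzip"
-- >>> kafkaCompressionCodecToText NoCompression
-- "none"
```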
headersFromList :: [(ByteString, ByteString)] -> Headers Source #
headersToList :: Headers -> [(ByteString, ByteString)] Source #
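Since Headers is a Monoid, header sets can be built incrementally and converted back to key/value pairs. A minimal sketch (the header names, and the assumption that ordering is preserved, are illustrative):

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Kafka.Producer (Headers, headersFromList, headersToList)

-- Build headers from two sources; the Semigroup instance combines them.
requestHeaders :: Headers
requestHeaders =
     headersFromList [("correlation-id", "42")]
  <> headersFromList [("origin", "example-service")]

-- headersToList recovers the pairs:
-- >>> headersToList requestHeaders
-- [("correlation-id","42"),("origin","example-service")]
```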
data Callback Source #

Callbacks allow retrieving various information, such as error occurrences, statistics, and log messages. See setCallback (Consumer) and setCallback (Producer) for more details.
data DeliveryReport Source #
The result of sending a message to the broker, useful for callbacks
Constructors:

- DeliverySuccess ProducerRecord Offset: the message was successfully sent at this offset.
- DeliveryFailure ProducerRecord KafkaError: the message could not be sent.
- NoMessageError KafkaError: an error occurred, but librdkafka did not attach any sent message.
Instances
newtype ImmediateError Source #
Represents an error caused by pre-flight conditions not being met.
Instances
- Eq ImmediateError
- Show ImmediateError
data ProducePartition Source #
Constructors:

- SpecifiedPartition !Int: the partition number of the topic.
- UnassignedPartition: let the Kafka broker decide the partition.
Instances
data ProducerRecord Source #
Represents messages to be enqueued onto a Kafka broker (i.e. used for a producer)
Constructors:

- ProducerRecord { prTopic :: TopicName, prPartition :: ProducePartition, prKey :: Maybe ByteString, prValue :: Maybe ByteString, prHeaders :: Headers }
Instances
kpKafkaPtr :: KafkaProducer -> Kafka Source #
kpKafkaConf :: KafkaProducer -> KafkaConf Source #
kpTopicConf :: KafkaProducer -> TopicConf Source #
errorCallback :: (KafkaError -> String -> IO ()) -> Callback Source #
Add a callback for errors.
Examples
Basic usage:
```haskell
setCallback (errorCallback myErrorCallback)

myErrorCallback :: KafkaError -> String -> IO ()
myErrorCallback kafkaError message = print $ show kafkaError <> "|" <> message
```
logCallback :: (KafkaLogLevel -> String -> String -> IO ()) -> Callback Source #
Add a callback for logs.
Examples
Basic usage:
```haskell
setCallback (logCallback myLogCallback)

myLogCallback :: KafkaLogLevel -> String -> String -> IO ()
myLogCallback level facility message = print $ show level <> "|" <> facility <> "|" <> message
```
statsCallback :: (ByteString -> IO ()) -> Callback Source #
Add a callback for stats. The passed ByteString contains a UTF-8 encoded JSON document and can e.g. be parsed using Data.Aeson.decodeStrict. For more information about the content of the JSON document see https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md.
Examples
Basic usage:
```haskell
setCallback (statsCallback myStatsCallback)

myStatsCallback :: ByteString -> IO ()
myStatsCallback stats = print stats
```
deliveryCallback :: (DeliveryReport -> IO ()) -> Callback Source #
Sets the callback for delivery reports.
Note: a callback should not be a long-running process as it blocks librdkafka from continuing on the thread that handles the delivery callbacks. For callbacks to individual messages see produceMessage'.
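A sketch of a delivery-report handler, wired into the properties via setCallback (producerProps is the value from the module example; Show instances for Offset and KafkaError are assumed):

```haskell
reportHandler :: DeliveryReport -> IO ()
reportHandler (DeliverySuccess _rec offset) = putStrLn ("delivered at " <> show offset)
reportHandler (DeliveryFailure _rec err)    = putStrLn ("delivery failed: " <> show err)
reportHandler (NoMessageError err)          = putStrLn ("error without message: " <> show err)

-- Register the callback in the producer properties:
propsWithReports :: ProducerProperties
propsWithReports = producerProps <> setCallback (deliveryCallback reportHandler)
```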
data ProducerProperties Source #
Properties to create a KafkaProducer.

Constructors:

- ProducerProperties
Instances
- Semigroup ProducerProperties
- Monoid ProducerProperties: right biased, so newer properties are preferred over older ones.
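Because the Monoid instance is right-biased, a later property overrides an earlier one, which makes it easy to layer overrides on top of defaults. A minimal sketch (broker address and values are examples; OverloadedStrings assumed):

```haskell
combinedProps :: ProducerProperties
combinedProps =
     brokersList ["localhost:9092"]
  <> logLevel KafkaLogDebug
  <> logLevel KafkaLogErr        -- right-biased: KafkaLogErr wins
  <> sendTimeout (Timeout 20000) -- message.timeout.ms of 20 seconds
```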
brokersList :: [BrokerAddress] -> ProducerProperties Source #
Set the list of brokers to contact to connect to the Kafka cluster.
logLevel :: KafkaLogLevel -> ProducerProperties Source #
Sets the logging level. Usually used together with debugOptions to configure which logs are needed.
compression :: KafkaCompressionCodec -> ProducerProperties Source #
Set the compression.codec for the producer.
topicCompression :: KafkaCompressionCodec -> ProducerProperties Source #
Set the compression.codec for the topic.
sendTimeout :: Timeout -> ProducerProperties Source #
Set the message.timeout.ms.
statisticsInterval :: Millis -> ProducerProperties Source #
Set the statistics.interval.ms for the producer.
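Statistics only flow if an interval is set and a stats callback is registered. A sketch combining the two (the interval value is an example):

```haskell
import qualified Data.ByteString as BS

statsProps :: ProducerProperties
statsProps =
     statisticsInterval (Millis 10000)               -- emit stats every 10 seconds
  <> setCallback (statsCallback (print . BS.length)) -- print the JSON document's size
```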
extraProps :: Map Text Text -> ProducerProperties Source #
Any configuration options that are supported by librdkafka. The full list can be found in the librdkafka CONFIGURATION documentation.
suppressDisconnectLogs :: ProducerProperties Source #
Suppresses producer disconnect logs.
It might be useful to turn this off when interacting with brokers with an aggressive connection.max.idle.ms value.
extraTopicProps :: Map Text Text -> ProducerProperties Source #
Any *topic* configuration options that are supported by librdkafka. The full list can be found in the librdkafka CONFIGURATION documentation.
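For example, passing raw librdkafka settings through both maps (the keys are real librdkafka global and topic properties, but the values are only illustrative; OverloadedStrings assumed):

```haskell
import qualified Data.Map as M

rawProps :: ProducerProperties
rawProps =
     extraProps      (M.fromList [("linger.ms", "50"), ("acks", "all")])
  <> extraTopicProps (M.fromList [("request.timeout.ms", "30000")])
```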
debugOptions :: [KafkaDebug] -> ProducerProperties Source #
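Debug output is only emitted when the log level admits it, so debugOptions is usually paired with logLevel (see logLevel above). A minimal sketch:

```haskell
debugProps :: ProducerProperties
debugProps =
     logLevel KafkaLogDebug
  <> debugOptions [DebugBroker, DebugMsg]
```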
runProducer :: ProducerProperties -> (KafkaProducer -> IO (Either KafkaError a)) -> IO (Either KafkaError a) Source #
Deprecated: use newProducer / closeProducer instead.

Runs the Kafka producer. The callback provided is expected to call produceMessage to send messages to Kafka.
newProducer :: MonadIO m => ProducerProperties -> m (Either KafkaError KafkaProducer) Source #
Creates a new Kafka producer.

A newly created producer must be closed with the closeProducer function.
produceMessage :: MonadIO m => KafkaProducer -> ProducerRecord -> m (Maybe KafkaError) Source #
Sends a single message.
Since librdkafka is backed by a queue, this function can return before messages are sent. See flushProducer to wait for the queue to empty.
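Because sends are asynchronous, a batch producer typically enqueues everything and then flushes once. A sketch reusing mkMessage from the module example (payloads are arbitrary; OverloadedStrings assumed):

```haskell
sendBatch :: KafkaProducer -> IO ()
sendBatch prod = do
  forM_ ["one", "two", "three"] $ \payload -> do
    res <- produceMessage prod (mkMessage Nothing (Just payload))
    forM_ res print    -- report any immediate enqueueing error
  flushProducer prod   -- block until the outbound queue is drained
```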
produceMessage' :: MonadIO m => KafkaProducer -> ProducerRecord -> (DeliveryReport -> IO ()) -> m (Either ImmediateError ()) Source #
Sends a single message with a registered callback.
The callback can be a long-running process, as it is forked by the thread that handles the delivery reports.
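A sketch of sending one message with a per-message report (mkMessage is from the module example; a Show instance for DeliveryReport is assumed):

```haskell
sendWithReport :: KafkaProducer -> IO ()
sendWithReport prod = do
  res <- produceMessage' prod
           (mkMessage (Just "key") (Just "value"))
           (\report -> putStrLn ("report: " <> show report))
  case res of
    Left (ImmediateError err) -> putStrLn ("rejected before enqueueing: " <> show err)
    Right ()                  -> pure ()
```

The Left case fires only for pre-flight failures; actual delivery outcomes arrive through the callback, so flush (or close) the producer to ensure it runs.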
flushProducer :: MonadIO m => KafkaProducer -> m () Source #
Drains the outbound queue for a producer.
This function is also called automatically when the producer is closed
with closeProducer
to ensure that all queued messages make it to Kafka.
closeProducer :: MonadIO m => KafkaProducer -> m () Source #
Closes the producer. Waits until the outbound queue is drained before returning control.
data RdKafkaRespErrT Source #
Instances
- Bounded RdKafkaRespErrT
- Enum RdKafkaRespErrT
- Eq RdKafkaRespErrT
- Show RdKafkaRespErrT