module Kafka.Producer.Callbacks
( deliveryCallback
, module X
)
where
import Foreign
import Foreign.C.Error
import Kafka.Callbacks as X
import Kafka.Consumer.Types
import Kafka.Internal.RdKafka
import Kafka.Internal.Setup
import Kafka.Internal.Shared
import Kafka.Producer.Types
import Kafka.Types
-- | Installs a delivery-report handler on a Kafka configuration.
--
-- The handler is registered through 'rdKafkaConfSetDrMsgCb' on the
-- configuration obtained from @kc@ via 'getRdKafkaConf'. Each time the
-- registered callback fires, the user-supplied @callback@ is run exactly
-- once with a 'DeliveryReport' describing the outcome.
deliveryCallback :: (DeliveryReport -> IO ()) -> KafkaConf -> IO ()
deliveryCallback callback kc =
  rdKafkaConfSetDrMsgCb (getRdKafkaConf kc) onDelivery
  where
    -- The first argument handed to the callback is not used here.
    onDelivery :: t -> Ptr RdKafkaMessageT -> IO ()
    onDelivery _ msgPtr
      | msgPtr == nullPtr = do
          -- There is no message to inspect, so the report is built from
          -- the current errno value instead.
          errno <- getErrno
          callback (NoMessageError (kafkaRespErr errno))
      | otherwise = do
          msg <- peek msgPtr
          report <- reportFor msg
          callback report

    -- Chooses between a failure and a success report by looking at the
    -- error field of the dereferenced message.
    reportFor :: RdKafkaMessageT -> IO DeliveryReport
    reportFor msg
      | err'RdKafkaMessageT msg /= RdKafkaRespErrNoError = mkErrorReport msg
      | otherwise = mkSuccessReport msg
-- | Builds a 'DeliveryFailure' report for a message.
--
-- The report pairs the 'ProducerRecord' produced by 'mkProdRec' with the
-- message's error field wrapped in 'KafkaResponseError'.
mkErrorReport :: RdKafkaMessageT -> IO DeliveryReport
mkErrorReport msg = toFailure <$> mkProdRec msg
  where
    failure = KafkaResponseError (err'RdKafkaMessageT msg)
    toFailure record = DeliveryFailure record failure
-- | Builds a 'DeliverySuccess' report for a message.
--
-- The report pairs the 'ProducerRecord' produced by 'mkProdRec' with the
-- message's offset field wrapped in 'Offset'.
mkSuccessReport :: RdKafkaMessageT -> IO DeliveryReport
mkSuccessReport msg = toSuccess <$> mkProdRec msg
  where
    deliveredAt = Offset (offset'RdKafkaMessageT msg)
    toSuccess record = DeliverySuccess record deliveredAt
-- | Reconstructs a 'ProducerRecord' from a message structure.
--
-- The topic, key and payload are obtained, in that order, through
-- 'readTopic', 'readKey' and 'readPayload'; the partition is taken from
-- the message's partition field and wrapped in 'SpecifiedPartition'.
mkProdRec :: RdKafkaMessageT -> IO ProducerRecord
mkProdRec msg =
  assemble <$> readTopic msg <*> readKey msg <*> readPayload msg
  where
    assemble topicName recordKey recordValue =
      ProducerRecord
        { prTopic = TopicName topicName
        , prPartition = SpecifiedPartition (partition'RdKafkaMessageT msg)
        , prKey = recordKey
        , prValue = recordValue
        }