module Kafka.Producer.Callbacks ( deliveryCallback , module X ) where import Foreign import Foreign.C.Error import Kafka.Callbacks as X import Kafka.Consumer.Types import Kafka.Internal.RdKafka import Kafka.Internal.Setup import Kafka.Internal.Shared import Kafka.Producer.Types import Kafka.Types -- | Sets the callback for delivery reports. deliveryCallback :: (DeliveryReport -> IO ()) -> KafkaConf -> IO () deliveryCallback callback kc = rdKafkaConfSetDrMsgCb (getRdKafkaConf kc) realCb where realCb :: t -> Ptr RdKafkaMessageT -> IO () realCb _ mptr = if mptr == nullPtr then getErrno >>= (callback . NoMessageError . kafkaRespErr) else do s <- peek mptr if err'RdKafkaMessageT s /= RdKafkaRespErrNoError then mkErrorReport s >>= callback else mkSuccessReport s >>= callback mkErrorReport :: RdKafkaMessageT -> IO DeliveryReport mkErrorReport msg = do prodRec <- mkProdRec msg pure $ DeliveryFailure prodRec (KafkaResponseError (err'RdKafkaMessageT msg)) mkSuccessReport :: RdKafkaMessageT -> IO DeliveryReport mkSuccessReport msg = do prodRec <- mkProdRec msg pure $ DeliverySuccess prodRec (Offset $ offset'RdKafkaMessageT msg) mkProdRec :: RdKafkaMessageT -> IO ProducerRecord mkProdRec msg = do topic <- readTopic msg key <- readKey msg payload <- readPayload msg pure ProducerRecord { prTopic = TopicName topic , prPartition = SpecifiedPartition (partition'RdKafkaMessageT msg) , prKey = key , prValue = payload }