module Kafka.Producer.Callbacks
  ( deliveryErrorsCallback
  , module X
  )
where

import Foreign
import Foreign.C.Error
import Kafka.Callbacks as X
import Kafka.Internal.RdKafka
import Kafka.Internal.Setup
import Kafka.Internal.Shared
import Kafka.Types

-- | Installs a delivery-report handler on the given configuration that
-- forwards failures, and only failures, to the supplied callback.
--
-- The handler is registered through 'rdKafkaConfSetDrMsgCb' on the
-- configuration obtained from 'getRdKafkaConf'. On each invocation:
--
-- * if the message pointer is null, the current @errno@ is read and
--   converted with 'kafkaRespErr' before being passed to the callback;
--
-- * otherwise the message is read from the pointer and, when its error
--   field differs from 'RdKafkaRespErrNoError', that error is wrapped in
--   'KafkaResponseError' and passed to the callback.
--
-- Messages whose error field equals 'RdKafkaRespErrNoError' do not invoke
-- the callback.
deliveryErrorsCallback :: (KafkaError -> IO ()) -> KafkaConf -> IO ()
deliveryErrorsCallback callback kc =
  rdKafkaConfSetDrMsgCb (getRdKafkaConf kc) onDeliveryReport
  where
    -- The first argument handed to the handler is not used here.
    onDeliveryReport :: t -> Ptr RdKafkaMessageT -> IO ()
    onDeliveryReport _ msgPtr
      | msgPtr == nullPtr = do
          -- No message to inspect: fall back to errno for the error value.
          errno <- getErrno
          callback (kafkaRespErr errno)
      | otherwise = do
          msg <- peek msgPtr
          reportFailure (err'RdKafkaMessageT msg)

    -- Notify the user callback unless the code is the "no error" value.
    reportFailure respErr
      | respErr /= RdKafkaRespErrNoError = callback (KafkaResponseError respErr)
      | otherwise = pure ()