module Kafka.Producer.Callbacks
( deliveryErrorsCallback
, module X
)
where
import Foreign
import Foreign.C.Error
import Kafka.Callbacks as X
import Kafka.Internal.RdKafka
import Kafka.Internal.Setup
import Kafka.Internal.Shared
import Kafka.Types
deliveryErrorsCallback :: (KafkaError -> IO ()) -> KafkaConf -> IO ()
deliveryErrorsCallback callback kc = rdKafkaConfSetDrMsgCb (getRdKafkaConf kc) realCb
where
realCb :: t -> Ptr RdKafkaMessageT -> IO ()
realCb _ mptr =
if mptr == nullPtr
then getErrno >>= (callback . kafkaRespErr)
else do
s <- peek mptr
if err'RdKafkaMessageT s /= RdKafkaRespErrNoError
then (callback . KafkaResponseError $ err'RdKafkaMessageT s)
else pure ()