{-# LANGUAGE TypeApplications #-}

module Kafka.Producer.Callbacks
( deliveryCallback
, module X
)
where

import Control.Monad          (unless, void)
import Control.Concurrent     (forkIO)
import Foreign.C.Error        (getErrno)
import Foreign.Ptr            (Ptr, nullPtr)
import Foreign.Storable       (Storable(peek))
import Foreign.StablePtr      (castPtrToStablePtr, deRefStablePtr, freeStablePtr)
import Kafka.Callbacks        as X
import Kafka.Consumer.Types   (Offset(..))
import Kafka.Internal.RdKafka (RdKafkaMessageT(..), RdKafkaRespErrT(..), rdKafkaConfSetDrMsgCb)
import Kafka.Internal.Setup   (KafkaConf(..), getRdKafkaConf)
import Kafka.Internal.Shared  (kafkaRespErr, readTopic, readKey, readPayload)
import Kafka.Producer.Types   (ProducerRecord(..), DeliveryReport(..), ProducePartition(..))
import Kafka.Types            (KafkaError(..), TopicName(..))

-- | Sets the callback for delivery reports.
--
--   /Note: A callback should not be a long-running process as it blocks
--   librdkafka from continuing on the thread that handles the delivery
--   callbacks. For callbacks to individual messages see
--   'Kafka.Producer.produceMessage\''./
--
--   For every report the global @callback@ is run first, synchronously.
--   If the message carries a non-null opaque pointer, it is interpreted as a
--   'Foreign.StablePtr.StablePtr' to a per-message callback, which is then
--   run on a freshly forked thread.
deliveryCallback :: (DeliveryReport -> IO ()) -> KafkaConf -> IO ()
deliveryCallback callback kc = rdKafkaConfSetDrMsgCb (getRdKafkaConf kc) realCb
  where
    -- Raw handler registered with librdkafka. The first argument is ignored.
    realCb :: t -> Ptr RdKafkaMessageT -> IO ()
    realCb _ mptr =
      if mptr == nullPtr
        -- No message to inspect: report the current errno instead.
        then getErrno >>= (callback . NoMessageError . kafkaRespErr)
        else do
          s <- peek mptr
          let cbPtr = opaque'RdKafkaMessageT s
          report <-
            if err'RdKafkaMessageT s /= RdKafkaRespErrNoError
              then mkErrorReport s
              else mkSuccessReport s
          callbacks cbPtr report

    -- Run the global callback, then the per-message callback (if any).
    callbacks cbPtr rep = do
      callback rep
      unless (cbPtr == nullPtr) $ do
        let stablePtr = castPtrToStablePtr cbPtr
        msgCb <- deRefStablePtr @(DeliveryReport -> IO ()) stablePtr
        -- Stable pointers are never reclaimed by the GC; without an explicit
        -- free each message with its own callback leaks a table entry and
        -- keeps the closure alive. 'msgCb' has already been extracted, so
        -- releasing the pointer here does not affect the forked call below.
        -- NOTE(review): assumes the pointer is created once per message by
        -- the producing code and not freed elsewhere -- confirm against
        -- the producer side.
        freeStablePtr stablePtr
        -- Here we fork the callback since it might be a longer action and
        -- blocking here would block librdkafka from continuing its execution
        void . forkIO $ msgCb rep

-- | Builds a 'DeliveryFailure' report from a message, wrapping the message's
--   error code in 'KafkaResponseError'.
mkErrorReport :: RdKafkaMessageT -> IO DeliveryReport
mkErrorReport msg = do
  prodRec <- mkProdRec msg
  pure $ DeliveryFailure prodRec (KafkaResponseError (err'RdKafkaMessageT msg))

-- | Builds a 'DeliverySuccess' report from a message, carrying the message's
--   offset.
mkSuccessReport :: RdKafkaMessageT -> IO DeliveryReport
mkSuccessReport msg = do
  prodRec <- mkProdRec msg
  pure $ DeliverySuccess prodRec (Offset $ offset'RdKafkaMessageT msg)

-- | Reconstructs a 'ProducerRecord' from a message: topic, key and payload
--   are read via the shared readers, and the partition is taken from the
--   message as a 'SpecifiedPartition'.
mkProdRec :: RdKafkaMessageT -> IO ProducerRecord
mkProdRec msg = do
  topic   <- readTopic msg
  key     <- readKey msg
  payload <- readPayload msg
  pure ProducerRecord
    { prTopic     = TopicName topic
    , prPartition = SpecifiedPartition (partition'RdKafkaMessageT msg)
    , prKey       = key
    , prValue     = payload
    }