{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE LambdaCase #-}
module Kafka.Producer.Callbacks
( deliveryCallback
, module X
)
where

import           Control.Monad          (void)
import           Control.Exception      (bracket)
import           Control.Concurrent     (forkIO)
import           Foreign.C.Error        (getErrno)
import           Foreign.Ptr            (Ptr, nullPtr)
import           Foreign.Storable       (Storable(peek))
import           Foreign.StablePtr      (castPtrToStablePtr, deRefStablePtr, freeStablePtr)
import           Kafka.Callbacks        as X
import           Kafka.Consumer.Types   (Offset(..))
import           Kafka.Internal.RdKafka (RdKafkaMessageT(..), RdKafkaRespErrT(..), rdKafkaConfSetDrMsgCb)
import           Kafka.Internal.Setup   (getRdKafkaConf, Callback(..))
import           Kafka.Internal.Shared  (kafkaRespErr, readTopic, readKey, readPayload, readHeaders)
import           Kafka.Producer.Types   (ProducerRecord(..), DeliveryReport(..), ProducePartition(..))
import           Kafka.Types            (KafkaError(..), TopicName(..))
import Data.Either (fromRight)

-- | Sets the callback for delivery reports.
--
--   /Note: A callback should not be a long-running process as it blocks
--   librdkafka from continuing on the thread that handles the delivery
--   callbacks. For callbacks to individual messsages see
--   'Kafka.Producer.produceMessage\''./
--
deliveryCallback :: (DeliveryReport -> IO ()) -> Callback
deliveryCallback :: (DeliveryReport -> IO ()) -> Callback
deliveryCallback callback :: DeliveryReport -> IO ()
callback = (KafkaConf -> IO ()) -> Callback
Callback ((KafkaConf -> IO ()) -> Callback)
-> (KafkaConf -> IO ()) -> Callback
forall a b. (a -> b) -> a -> b
$ \kc :: KafkaConf
kc -> RdKafkaConfTPtr -> DeliveryCallback -> IO ()
rdKafkaConfSetDrMsgCb (KafkaConf -> RdKafkaConfTPtr
forall k. HasKafkaConf k => k -> RdKafkaConfTPtr
getRdKafkaConf KafkaConf
kc) DeliveryCallback
forall t. t -> Ptr RdKafkaMessageT -> IO ()
realCb
  where
    realCb :: t -> Ptr RdKafkaMessageT -> IO ()
    realCb :: t -> Ptr RdKafkaMessageT -> IO ()
realCb _ mptr :: Ptr RdKafkaMessageT
mptr =
      if Ptr RdKafkaMessageT
mptr Ptr RdKafkaMessageT -> Ptr RdKafkaMessageT -> Bool
forall a. Eq a => a -> a -> Bool
== Ptr RdKafkaMessageT
forall a. Ptr a
nullPtr
        then IO Errno
getErrno IO Errno -> (Errno -> IO ()) -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (DeliveryReport -> IO ()
callback (DeliveryReport -> IO ())
-> (Errno -> DeliveryReport) -> Errno -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. KafkaError -> DeliveryReport
NoMessageError (KafkaError -> DeliveryReport)
-> (Errno -> KafkaError) -> Errno -> DeliveryReport
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Errno -> KafkaError
kafkaRespErr)
        else do
          RdKafkaMessageT
s <- Ptr RdKafkaMessageT -> IO RdKafkaMessageT
forall a. Storable a => Ptr a -> IO a
peek Ptr RdKafkaMessageT
mptr
          ProducerRecord
prodRec <- Ptr RdKafkaMessageT -> IO ProducerRecord
mkProdRec Ptr RdKafkaMessageT
mptr
          let cbPtr :: Ptr ()
cbPtr = RdKafkaMessageT -> Ptr ()
opaque'RdKafkaMessageT RdKafkaMessageT
s
          Ptr () -> DeliveryReport -> IO ()
callbacks Ptr ()
cbPtr (DeliveryReport -> IO ()) -> DeliveryReport -> IO ()
forall a b. (a -> b) -> a -> b
$ 
            if RdKafkaMessageT -> RdKafkaRespErrT
err'RdKafkaMessageT RdKafkaMessageT
s RdKafkaRespErrT -> RdKafkaRespErrT -> Bool
forall a. Eq a => a -> a -> Bool
/= RdKafkaRespErrT
RdKafkaRespErrNoError
              then RdKafkaMessageT -> ProducerRecord -> DeliveryReport
mkErrorReport RdKafkaMessageT
s ProducerRecord
prodRec  
              else RdKafkaMessageT -> ProducerRecord -> DeliveryReport
mkSuccessReport RdKafkaMessageT
s ProducerRecord
prodRec

    callbacks :: Ptr () -> DeliveryReport -> IO ()
callbacks cbPtr :: Ptr ()
cbPtr rep :: DeliveryReport
rep = do
      DeliveryReport -> IO ()
callback DeliveryReport
rep
      if Ptr ()
cbPtr Ptr () -> Ptr () -> Bool
forall a. Eq a => a -> a -> Bool
== Ptr ()
forall a. Ptr a
nullPtr then
        () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
      else IO (StablePtr (DeliveryReport -> IO ()))
-> (StablePtr (DeliveryReport -> IO ()) -> IO ())
-> (StablePtr (DeliveryReport -> IO ()) -> IO ())
-> IO ()
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
bracket (StablePtr (DeliveryReport -> IO ())
-> IO (StablePtr (DeliveryReport -> IO ()))
forall (f :: * -> *) a. Applicative f => a -> f a
pure (StablePtr (DeliveryReport -> IO ())
 -> IO (StablePtr (DeliveryReport -> IO ())))
-> StablePtr (DeliveryReport -> IO ())
-> IO (StablePtr (DeliveryReport -> IO ()))
forall a b. (a -> b) -> a -> b
$ Ptr () -> StablePtr (DeliveryReport -> IO ())
forall a. Ptr () -> StablePtr a
castPtrToStablePtr Ptr ()
cbPtr) StablePtr (DeliveryReport -> IO ()) -> IO ()
forall a. StablePtr a -> IO ()
freeStablePtr ((StablePtr (DeliveryReport -> IO ()) -> IO ()) -> IO ())
-> (StablePtr (DeliveryReport -> IO ()) -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \stablePtr :: StablePtr (DeliveryReport -> IO ())
stablePtr -> do
        DeliveryReport -> IO ()
msgCb <- StablePtr (DeliveryReport -> IO ()) -> IO (DeliveryReport -> IO ())
forall a. StablePtr a -> IO a
deRefStablePtr @(DeliveryReport -> IO ()) StablePtr (DeliveryReport -> IO ())
stablePtr
        -- Here we fork the callback since it might be a longer action and
        -- blocking here would block librdkafka from continuing its execution
        IO ThreadId -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO ThreadId -> IO ()) -> (IO () -> IO ThreadId) -> IO () -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO () -> IO ThreadId
forkIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ DeliveryReport -> IO ()
msgCb DeliveryReport
rep

mkErrorReport :: RdKafkaMessageT -> ProducerRecord -> DeliveryReport
mkErrorReport :: RdKafkaMessageT -> ProducerRecord -> DeliveryReport
mkErrorReport msg :: RdKafkaMessageT
msg prodRec :: ProducerRecord
prodRec = ProducerRecord -> KafkaError -> DeliveryReport
DeliveryFailure ProducerRecord
prodRec (RdKafkaRespErrT -> KafkaError
KafkaResponseError (RdKafkaMessageT -> RdKafkaRespErrT
err'RdKafkaMessageT RdKafkaMessageT
msg))

mkSuccessReport :: RdKafkaMessageT -> ProducerRecord -> DeliveryReport
mkSuccessReport :: RdKafkaMessageT -> ProducerRecord -> DeliveryReport
mkSuccessReport msg :: RdKafkaMessageT
msg prodRec :: ProducerRecord
prodRec = ProducerRecord -> Offset -> DeliveryReport
DeliverySuccess ProducerRecord
prodRec (Int64 -> Offset
Offset (Int64 -> Offset) -> Int64 -> Offset
forall a b. (a -> b) -> a -> b
$ RdKafkaMessageT -> Int64
offset'RdKafkaMessageT RdKafkaMessageT
msg)

mkProdRec :: Ptr RdKafkaMessageT -> IO ProducerRecord
mkProdRec :: Ptr RdKafkaMessageT -> IO ProducerRecord
mkProdRec pmsg :: Ptr RdKafkaMessageT
pmsg = do
  RdKafkaMessageT
msg         <- Ptr RdKafkaMessageT -> IO RdKafkaMessageT
forall a. Storable a => Ptr a -> IO a
peek Ptr RdKafkaMessageT
pmsg  
  Text
topic       <- RdKafkaMessageT -> IO Text
readTopic RdKafkaMessageT
msg
  Maybe ByteString
key         <- RdKafkaMessageT -> IO (Maybe ByteString)
readKey RdKafkaMessageT
msg
  Maybe ByteString
payload     <- RdKafkaMessageT -> IO (Maybe ByteString)
readPayload RdKafkaMessageT
msg
  ((Headers -> ProducerRecord) -> IO Headers -> IO ProducerRecord)
-> IO Headers -> (Headers -> ProducerRecord) -> IO ProducerRecord
forall a b c. (a -> b -> c) -> b -> a -> c
flip (Headers -> ProducerRecord) -> IO Headers -> IO ProducerRecord
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Headers -> Either RdKafkaRespErrT Headers -> Headers
forall b a. b -> Either a b -> b
fromRight Headers
forall a. Monoid a => a
mempty (Either RdKafkaRespErrT Headers -> Headers)
-> IO (Either RdKafkaRespErrT Headers) -> IO Headers
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Ptr RdKafkaMessageT -> IO (Either RdKafkaRespErrT Headers)
readHeaders Ptr RdKafkaMessageT
pmsg) ((Headers -> ProducerRecord) -> IO ProducerRecord)
-> (Headers -> ProducerRecord) -> IO ProducerRecord
forall a b. (a -> b) -> a -> b
$ \headers :: Headers
headers -> 
    $WProducerRecord :: TopicName
-> ProducePartition
-> Maybe ByteString
-> Maybe ByteString
-> Headers
-> ProducerRecord
ProducerRecord
      { prTopic :: TopicName
prTopic = Text -> TopicName
TopicName Text
topic
      , prPartition :: ProducePartition
prPartition = Int -> ProducePartition
SpecifiedPartition (RdKafkaMessageT -> Int
partition'RdKafkaMessageT RdKafkaMessageT
msg)
      , prKey :: Maybe ByteString
prKey = Maybe ByteString
key
      , prValue :: Maybe ByteString
prValue = Maybe ByteString
payload
      , prHeaders :: Headers
prHeaders = Headers
headers
      }