{-# LANGUAGE TupleSections              #-}
{-# LANGUAGE LambdaCase                 #-}

-----------------------------------------------------------------------------
-- |
-- Module to produce messages to Kafka topics.
-- 
-- Here's an example of code to produce messages to a topic. The string
-- literals in it rely on the @OverloadedStrings@ extension:
-- 
-- @
-- import Control.Exception (bracket)
-- import Control.Monad (forM_)
-- import Data.ByteString (ByteString)
-- import Kafka.Producer
-- 
-- -- Global producer properties
-- producerProps :: 'ProducerProperties'
-- producerProps = 'brokersList' ["localhost:9092"]
--              <> 'logLevel' 'KafkaLogDebug'
-- 
-- -- Topic to send messages to
-- targetTopic :: 'TopicName'
-- targetTopic = 'TopicName' "kafka-client-example-topic"
-- 
-- -- Run an example
-- runProducerExample :: IO ()
-- runProducerExample =
--     bracket mkProducer clProducer runHandler >>= print
--     where
--       mkProducer = 'newProducer' producerProps
--       clProducer (Left _)     = pure ()
--       clProducer (Right prod) = 'closeProducer' prod
--       runHandler (Left err)   = pure $ Left err
--       runHandler (Right prod) = sendMessages prod
-- 
-- -- Example sending 2 messages and printing the response from Kafka
-- sendMessages :: 'KafkaProducer' -> IO (Either 'KafkaError' ())
-- sendMessages prod = do
--   err1 <- 'produceMessage' prod (mkMessage Nothing (Just "test from producer") )
--   forM_ err1 print
-- 
--   err2 <- 'produceMessage' prod (mkMessage (Just "key") (Just "test from producer (with key)"))
--   forM_ err2 print
-- 
--   pure $ Right ()
-- 
-- mkMessage :: Maybe ByteString -> Maybe ByteString -> 'ProducerRecord'
-- mkMessage k v = 'ProducerRecord'
--                   { 'prTopic' = targetTopic
--                   , 'prPartition' = 'UnassignedPartition'
--                   , 'prKey' = k
--                   , 'prValue' = v
--                   }
-- @
-----------------------------------------------------------------------------
module Kafka.Producer
( KafkaProducer
, module X
, runProducer
, newProducer
, produceMessage
, produceMessage'
, flushProducer
, closeProducer
, RdKafkaRespErrT (..)
)
where

import           Control.Exception        (bracket)
import           Control.Monad            (forM_)
import           Control.Monad.IO.Class   (MonadIO (liftIO))
import qualified Data.ByteString          as BS
import qualified Data.ByteString.Internal as BSI
import qualified Data.Text                as Text
import           Foreign.C.String         (withCString)
import           Foreign.ForeignPtr       (withForeignPtr)
import           Foreign.Marshal.Utils    (withMany)
import           Foreign.Ptr              (Ptr, nullPtr, plusPtr)
import           Foreign.StablePtr        (castStablePtrToPtr, freeStablePtr, newStablePtr)
import           Kafka.Internal.RdKafka   (RdKafkaRespErrT (..), RdKafkaTypeT (..), RdKafkaVuT(..), newRdKafkaT, rdKafkaErrorCode, rdKafkaErrorDestroy, rdKafkaOutqLen, rdKafkaMessageProduceVa, rdKafkaSetLogLevel)
import           Kafka.Internal.Setup     (Kafka (..), KafkaConf (..), KafkaProps (..), TopicProps (..), kafkaConf, topicConf, Callback(..))
import           Kafka.Internal.Shared    (pollEvents)
import           Kafka.Producer.Convert   (copyMsgFlags, handleProduceErrT, producePartitionCInt)
import           Kafka.Producer.Types     (KafkaProducer (..))

import Kafka.Producer.ProducerProperties as X
import Kafka.Producer.Types              as X hiding (KafkaProducer)
import Kafka.Types                       as X

-- | Runs an action against a freshly created Kafka producer.
--
-- The producer is created with 'newProducer' and handed to the callback,
-- which is expected to call 'produceMessage' to send messages to Kafka.
-- The whole sequence is wrapped in 'bracket', so 'closeProducer' runs
-- whether the callback returns or throws. When the producer cannot be
-- created, the creation error is returned and the callback is not invoked.
{-# DEPRECATED runProducer "Use 'newProducer'/'closeProducer' instead" #-}
runProducer :: ProducerProperties
            -> (KafkaProducer -> IO (Either KafkaError a))
            -> IO (Either KafkaError a)
runProducer props f =
  bracket acquire release use
  where
    acquire = newProducer props

    -- Nothing to release when the producer was never created.
    release = either (const (return ())) closeProducer

    -- Pass a creation error through unchanged, otherwise run the callback.
    use = either (return . Left) f

-- | Creates a new Kafka producer from the given properties.
--
-- The Kafka and topic configurations are built from 'ppKafkaProps' and
-- 'ppTopicProps'. A default, do-nothing delivery report callback is
-- installed first, followed by every callback listed in 'ppCallbacks'.
-- When the underlying handle cannot be created, the reported message is
-- returned wrapped in 'KafkaError'. On success the log level from
-- 'ppLogLevel' (if any) is applied to the new handle.
--
-- A newly created producer must be closed with the 'closeProducer' function.
newProducer :: MonadIO m => ProducerProperties -> m (Either KafkaError KafkaProducer)
newProducer pps = liftIO $ do
  kc@(KafkaConf rawConf _ _) <- kafkaConf (KafkaProps (ppKafkaProps pps))
  tc <- topicConf (TopicProps (ppTopicProps pps))

  -- Default delivery report callback first, then the user-supplied
  -- callbacks, each applied to the same configuration in list order.
  forM_ (deliveryCallback (const mempty) : ppCallbacks pps) $ \(Callback install) ->
    install kc

  newRdKafkaT RdKafkaProducer rawConf >>= \case
    Left err ->
      return (Left (KafkaError err))
    Right kafka -> do
      forM_ (ppLogLevel pps) (rdKafkaSetLogLevel kafka . fromEnum)
      return (Right (KafkaProducer (Kafka kafka) kc tc))

-- | Sends a single message, ignoring its delivery report.
--
-- This is 'produceMessage'' with a callback that does nothing. The result is
-- 'Nothing' when the message was accepted and @'Just' err@ when
-- 'produceMessage'' reported an 'ImmediateError'.
--
-- Since librdkafka is backed by a queue, this function can return before
-- messages are sent. See 'flushProducer' to wait for the queue to empty.
produceMessage :: MonadIO m
               => KafkaProducer
               -> ProducerRecord
               -> m (Maybe KafkaError)
produceMessage kp m = immediateFailure <$> produceMessage' kp m ignoreReport
  where
    -- The delivery report is of no interest to this variant.
    ignoreReport _ = pure ()

    immediateFailure (Left (ImmediateError err)) = Just err
    immediateFailure (Right ())                  = Nothing

-- | Sends a single message with a registered callback.
--
--   The callback can be a long running process, as it is forked by the thread
--   that handles the delivery reports.
--
--   Before the message is handed over, pending events are polled with a zero
--   timeout. The result is @'Left'@ with an 'ImmediateError' when the produce
--   call itself reports a failure, and @'Right' ()@ otherwise.
--
--   The callback travels with the message as a 'Foreign.StablePtr.StablePtr'.
--   When the produce call fails immediately, that pointer is released here,
--   because a rejected message is never enqueued and therefore nothing else
--   will ever receive the pointer back.
produceMessage' :: MonadIO m
                => KafkaProducer
                -> ProducerRecord
                -> (DeliveryReport -> IO ())
                -> m (Either ImmediateError ())
produceMessage' kp@(KafkaProducer (Kafka k) _ _) msg cb = liftIO $
  fireCallbacks >> produceIt
  where
    -- Non-blocking poll so that already available events get served.
    fireCallbacks =
      pollEvents kp (Just (Timeout 0))

    -- Every buffer (payload, key, headers, topic name) is only valid inside
    -- its continuation, so the produce call has to happen in the innermost one.
    produceIt =
      withBS (prValue msg) $ \payloadPtr payloadLength ->
        withBS (prKey msg) $ \keyPtr keyLength ->
          withHeaders (prHeaders msg) $ \hdrs ->
            withCString (Text.unpack . unTopicName . prTopic $ msg) $ \topicName -> do
              callbackPtr <- newStablePtr cb
              let opts =
                    [ Topic'RdKafkaVu topicName
                    , Partition'RdKafkaVu . producePartitionCInt . prPartition $ msg
                    , MsgFlags'RdKafkaVu (fromIntegral copyMsgFlags)
                    , Value'RdKafkaVu payloadPtr (fromIntegral payloadLength)
                    , Key'RdKafkaVu keyPtr (fromIntegral keyLength)
                    , Opaque'RdKafkaVu (castStablePtrToPtr callbackPtr)
                    ]

              -- The error object returned by the produce call is destroyed as
              -- soon as its code has been read.
              code <- bracket (rdKafkaMessageProduceVa k (hdrs ++ opts))
                              rdKafkaErrorDestroy
                              rdKafkaErrorCode
              res  <- handleProduceErrT code
              case res of
                Just err -> do
                  -- The message was rejected, so the opaque pointer will not
                  -- come back through a delivery report: release it now.
                  -- It is deliberately NOT released on any other path, since
                  -- an accepted message still carries the pointer.
                  freeStablePtr callbackPtr
                  pure (Left (ImmediateError err))
                Nothing ->
                  pure (Right ())

-- | Closes the producer.
-- This delegates to 'flushProducer', so control is returned only once the
-- outbound queue has been drained.
closeProducer :: MonadIO m => KafkaProducer -> m ()
closeProducer producer = flushProducer producer

-- | Drains the outbound queue for a producer.
--
-- Events are polled with a 100 timeout and the outbound queue length is
-- checked afterwards; this repeats until the queue is reported empty. One
-- last poll with a zero timeout then makes sure that all the delivery
-- reports are fired.
--
-- This function is also called automatically when the producer is closed
-- with 'closeProducer' to ensure that all queued messages make it to Kafka.
flushProducer :: MonadIO m => KafkaProducer -> m ()
flushProducer kp = liftIO drain
  where
    drain = do
      pollEvents kp (Just (Timeout 100))
      pending <- outboundQueueLength (kpKafkaPtr kp)
      if pending == 0
        then pollEvents kp (Just (Timeout 0)) -- to be sure that all the delivery reports are fired
        else drain

------------------------------------------------------------------------------------

-- | Marshals message headers into 'Header'RdKafkaVu' entries and runs the
-- given action with the resulting list.
--
-- Each header name is exposed through 'BS.useAsCString' and each value
-- through 'withBS'; the action runs nested inside all of these scopes
-- (via 'withMany').
withHeaders :: Headers -> ([RdKafkaVuT] -> IO a) -> IO a
withHeaders hds run = withMany marshalHeader (headersToList hds) run
  where
    marshalHeader (name, value) continue =
      BS.useAsCString name $ \namePtr ->
        withBS (Just value) $ \valuePtr valueLen ->
          continue (Header'RdKafkaVu namePtr valuePtr (fromIntegral valueLen))

-- | Runs an action with a pointer to the bytes of an optional 'BS.ByteString'
-- and their length.
--
-- 'Nothing' is passed on as 'nullPtr' with length 0. For @'Just' bs@ the
-- pointer is the underlying buffer advanced by the string's offset, and it
-- is handed to the action inside 'withForeignPtr'.
withBS :: Maybe BS.ByteString -> (Ptr a -> Int -> IO b) -> IO b
withBS mbs action =
  case mbs of
    Nothing -> action nullPtr 0
    Just bs ->
      case BSI.toForeignPtr bs of
        (buffer, offset, len) ->
          withForeignPtr buffer $ \base ->
            action (base `plusPtr` offset) len

-- | Reads the outbound queue length of a Kafka handle by passing the
-- wrapped pointer to 'rdKafkaOutqLen'.
outboundQueueLength :: Kafka -> IO Int
outboundQueueLength = \case
  Kafka handle -> rdKafkaOutqLen handle