{-# LANGUAGE TupleSections              #-}
{-# LANGUAGE LambdaCase                 #-}

-----------------------------------------------------------------------------
-- |
-- Module to produce messages to Kafka topics.
-- 
-- Here's an example of code to produce messages to a topic:
-- 
-- @
-- import Control.Exception (bracket)
-- import Control.Monad (forM_)
-- import Data.ByteString (ByteString)
-- import Kafka.Producer
-- 
-- -- Global producer properties
-- producerProps :: 'ProducerProperties'
-- producerProps = 'brokersList' ["localhost:9092"]
--              <> 'logLevel' 'KafkaLogDebug'
-- 
-- -- Topic to send messages to
-- targetTopic :: 'TopicName'
-- targetTopic = 'TopicName' "kafka-client-example-topic"
-- 
-- -- Run an example
-- runProducerExample :: IO ()
-- runProducerExample =
--     bracket mkProducer clProducer runHandler >>= print
--     where
--       mkProducer = 'newProducer' producerProps
--       clProducer (Left _)     = pure ()
--       clProducer (Right prod) = 'closeProducer' prod
--       runHandler (Left err)   = pure $ Left err
--       runHandler (Right prod) = sendMessages prod
-- 
-- -- Example sending 2 messages and printing the response from Kafka
-- sendMessages :: 'KafkaProducer' -> IO (Either 'KafkaError' ())
-- sendMessages prod = do
--   err1 <- 'produceMessage' prod (mkMessage Nothing (Just "test from producer") )
--   forM_ err1 print
-- 
--   err2 <- 'produceMessage' prod (mkMessage (Just "key") (Just "test from producer (with key)"))
--   forM_ err2 print
-- 
--   pure $ Right ()
-- 
-- mkMessage :: Maybe ByteString -> Maybe ByteString -> 'ProducerRecord'
-- mkMessage k v = 'ProducerRecord'
--                   { 'prTopic' = targetTopic
--                   , 'prPartition' = 'UnassignedPartition'
--                   , 'prKey' = k
--                   , 'prValue' = v
--                   }
-- @
-----------------------------------------------------------------------------
module Kafka.Producer
( KafkaProducer
, module X
, runProducer
, newProducer
, produceMessage, produceMessageBatch
, produceMessage'
, flushProducer
, closeProducer
, RdKafkaRespErrT (..)
)
where

import           Control.Arrow            ((&&&))
import           Control.Exception        (bracket)
import           Control.Monad            (forM, forM_, (<=<))
import           Control.Monad.IO.Class   (MonadIO (liftIO))
import qualified Data.ByteString          as BS
import qualified Data.ByteString.Internal as BSI
import           Data.Function            (on)
import           Data.List                (groupBy, sortBy)
import           Data.Ord                 (comparing)
import qualified Data.Text                as Text
import           Foreign.ForeignPtr       (newForeignPtr_, withForeignPtr)
import           Foreign.Marshal.Array    (withArrayLen)
import           Foreign.Ptr              (Ptr, nullPtr, plusPtr)
import           Foreign.Storable         (Storable (..))
import           Foreign.StablePtr        (newStablePtr, castStablePtrToPtr)
import           Kafka.Internal.RdKafka   (RdKafkaMessageT (..), RdKafkaRespErrT (..), RdKafkaTypeT (..), destroyUnmanagedRdKafkaTopic, newRdKafkaT, newUnmanagedRdKafkaTopicT, rdKafkaOutqLen, rdKafkaProduce, rdKafkaProduceBatch, rdKafkaSetLogLevel)
import           Kafka.Internal.Setup     (Kafka (..), KafkaConf (..), KafkaProps (..), TopicConf (..), TopicProps (..), kafkaConf, topicConf, Callback(..))
import           Kafka.Internal.Shared    (pollEvents)
import           Kafka.Producer.Convert   (copyMsgFlags, handleProduceErr', producePartitionCInt, producePartitionInt)
import           Kafka.Producer.Types     (KafkaProducer (..))

import Kafka.Producer.ProducerProperties as X
import Kafka.Producer.Types              as X hiding (KafkaProducer)
import Kafka.Types                       as X

-- | Runs Kafka Producer.
-- The callback provided is expected to call 'produceMessage'
-- or/and 'produceMessageBatch' to send messages to Kafka.
{-# DEPRECATED runProducer "Use 'newProducer'/'closeProducer' instead" #-}
runProducer :: ProducerProperties
            -> (KafkaProducer -> IO (Either KafkaError a))
            -> IO (Either KafkaError a)
runProducer :: ProducerProperties
-> (KafkaProducer -> IO (Either KafkaError a))
-> IO (Either KafkaError a)
runProducer props :: ProducerProperties
props f :: KafkaProducer -> IO (Either KafkaError a)
f =
  IO (Either KafkaError KafkaProducer)
-> (Either KafkaError KafkaProducer -> IO ())
-> (Either KafkaError KafkaProducer -> IO (Either KafkaError a))
-> IO (Either KafkaError a)
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
bracket IO (Either KafkaError KafkaProducer)
mkProducer Either KafkaError KafkaProducer -> IO ()
forall (m :: * -> *) a. MonadIO m => Either a KafkaProducer -> m ()
clProducer Either KafkaError KafkaProducer -> IO (Either KafkaError a)
runHandler
  where
    mkProducer :: IO (Either KafkaError KafkaProducer)
mkProducer = ProducerProperties -> IO (Either KafkaError KafkaProducer)
forall (m :: * -> *).
MonadIO m =>
ProducerProperties -> m (Either KafkaError KafkaProducer)
newProducer ProducerProperties
props

    clProducer :: Either a KafkaProducer -> m ()
clProducer (Left _)     = () -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
    clProducer (Right prod :: KafkaProducer
prod) = KafkaProducer -> m ()
forall (m :: * -> *). MonadIO m => KafkaProducer -> m ()
closeProducer KafkaProducer
prod

    runHandler :: Either KafkaError KafkaProducer -> IO (Either KafkaError a)
runHandler (Left err :: KafkaError
err)   = Either KafkaError a -> IO (Either KafkaError a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Either KafkaError a -> IO (Either KafkaError a))
-> Either KafkaError a -> IO (Either KafkaError a)
forall a b. (a -> b) -> a -> b
$ KafkaError -> Either KafkaError a
forall a b. a -> Either a b
Left KafkaError
err
    runHandler (Right prod :: KafkaProducer
prod) = KafkaProducer -> IO (Either KafkaError a)
f KafkaProducer
prod

-- | Creates a new kafka producer
-- A newly created producer must be closed with 'closeProducer' function.
newProducer :: MonadIO m => ProducerProperties -> m (Either KafkaError KafkaProducer)
newProducer :: ProducerProperties -> m (Either KafkaError KafkaProducer)
newProducer pps :: ProducerProperties
pps = IO (Either KafkaError KafkaProducer)
-> m (Either KafkaError KafkaProducer)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either KafkaError KafkaProducer)
 -> m (Either KafkaError KafkaProducer))
-> IO (Either KafkaError KafkaProducer)
-> m (Either KafkaError KafkaProducer)
forall a b. (a -> b) -> a -> b
$ do
  kc :: KafkaConf
kc@(KafkaConf kc' :: RdKafkaConfTPtr
kc' _ _) <- KafkaProps -> IO KafkaConf
kafkaConf (Map Text Text -> KafkaProps
KafkaProps (Map Text Text -> KafkaProps) -> Map Text Text -> KafkaProps
forall a b. (a -> b) -> a -> b
$ (ProducerProperties -> Map Text Text
ppKafkaProps ProducerProperties
pps))
  TopicConf
tc <- TopicProps -> IO TopicConf
topicConf (Map Text Text -> TopicProps
TopicProps (Map Text Text -> TopicProps) -> Map Text Text -> TopicProps
forall a b. (a -> b) -> a -> b
$ (ProducerProperties -> Map Text Text
ppTopicProps ProducerProperties
pps))

  -- add default delivery report callback
  let Callback setDeliveryCallback :: KafkaConf -> IO ()
setDeliveryCallback = (DeliveryReport -> IO ()) -> Callback
deliveryCallback (IO () -> DeliveryReport -> IO ()
forall a b. a -> b -> a
const IO ()
forall a. Monoid a => a
mempty)
  KafkaConf -> IO ()
setDeliveryCallback KafkaConf
kc

  -- set callbacks
  [Callback] -> (Callback -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (ProducerProperties -> [Callback]
ppCallbacks ProducerProperties
pps) (\(Callback setCb :: KafkaConf -> IO ()
setCb) -> KafkaConf -> IO ()
setCb KafkaConf
kc)

  Either Text RdKafkaTPtr
mbKafka <- RdKafkaTypeT -> RdKafkaConfTPtr -> IO (Either Text RdKafkaTPtr)
newRdKafkaT RdKafkaTypeT
RdKafkaProducer RdKafkaConfTPtr
kc'
  case Either Text RdKafkaTPtr
mbKafka of
    Left err :: Text
err    -> Either KafkaError KafkaProducer
-> IO (Either KafkaError KafkaProducer)
forall (m :: * -> *) a. Monad m => a -> m a
return (Either KafkaError KafkaProducer
 -> IO (Either KafkaError KafkaProducer))
-> (KafkaError -> Either KafkaError KafkaProducer)
-> KafkaError
-> IO (Either KafkaError KafkaProducer)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. KafkaError -> Either KafkaError KafkaProducer
forall a b. a -> Either a b
Left (KafkaError -> IO (Either KafkaError KafkaProducer))
-> KafkaError -> IO (Either KafkaError KafkaProducer)
forall a b. (a -> b) -> a -> b
$ Text -> KafkaError
KafkaError Text
err
    Right kafka :: RdKafkaTPtr
kafka -> do
      Maybe KafkaLogLevel -> (KafkaLogLevel -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (ProducerProperties -> Maybe KafkaLogLevel
ppLogLevel ProducerProperties
pps) (RdKafkaTPtr -> Int -> IO ()
rdKafkaSetLogLevel RdKafkaTPtr
kafka (Int -> IO ()) -> (KafkaLogLevel -> Int) -> KafkaLogLevel -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. KafkaLogLevel -> Int
forall a. Enum a => a -> Int
fromEnum)
      let prod :: KafkaProducer
prod = Kafka -> KafkaConf -> TopicConf -> KafkaProducer
KafkaProducer (RdKafkaTPtr -> Kafka
Kafka RdKafkaTPtr
kafka) KafkaConf
kc TopicConf
tc
      Either KafkaError KafkaProducer
-> IO (Either KafkaError KafkaProducer)
forall (m :: * -> *) a. Monad m => a -> m a
return (KafkaProducer -> Either KafkaError KafkaProducer
forall a b. b -> Either a b
Right KafkaProducer
prod)

-- | Sends a single message.
-- Since librdkafka is backed by a queue, this function can return before messages are sent. See
-- 'flushProducer' to wait for queue to empty.
produceMessage :: MonadIO m
               => KafkaProducer
               -> ProducerRecord
               -> m (Maybe KafkaError)
produceMessage :: KafkaProducer -> ProducerRecord -> m (Maybe KafkaError)
produceMessage kp :: KafkaProducer
kp m :: ProducerRecord
m = KafkaProducer
-> ProducerRecord
-> (DeliveryReport -> IO ())
-> m (Either ImmediateError ())
forall (m :: * -> *).
MonadIO m =>
KafkaProducer
-> ProducerRecord
-> (DeliveryReport -> IO ())
-> m (Either ImmediateError ())
produceMessage' KafkaProducer
kp ProducerRecord
m (() -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure (() -> IO ()) -> (DeliveryReport -> ()) -> DeliveryReport -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. DeliveryReport -> ()
forall a. Monoid a => a
mempty) m (Either ImmediateError ())
-> (Either ImmediateError () -> m (Maybe KafkaError))
-> m (Maybe KafkaError)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Either ImmediateError () -> m (Maybe KafkaError)
adjustRes
  where
    adjustRes :: Either ImmediateError () -> m (Maybe KafkaError)
adjustRes = \case
      Right () -> Maybe KafkaError -> m (Maybe KafkaError)
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe KafkaError
forall a. Maybe a
Nothing
      Left (ImmediateError err :: KafkaError
err) -> Maybe KafkaError -> m (Maybe KafkaError)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (KafkaError -> Maybe KafkaError
forall a. a -> Maybe a
Just KafkaError
err)

-- | Sends a single message with a registered callback.
--
--   The callback can be a long running process, as it is forked by the thread
--   that handles the delivery reports.
--
produceMessage' :: MonadIO m
                => KafkaProducer
                -> ProducerRecord
                -> (DeliveryReport -> IO ())
                -> m (Either ImmediateError ())
produceMessage' :: KafkaProducer
-> ProducerRecord
-> (DeliveryReport -> IO ())
-> m (Either ImmediateError ())
produceMessage' kp :: KafkaProducer
kp@(KafkaProducer (Kafka k :: RdKafkaTPtr
k) _ (TopicConf tc :: RdKafkaTopicConfTPtr
tc)) msg :: ProducerRecord
msg cb :: DeliveryReport -> IO ()
cb = IO (Either ImmediateError ()) -> m (Either ImmediateError ())
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either ImmediateError ()) -> m (Either ImmediateError ()))
-> IO (Either ImmediateError ()) -> m (Either ImmediateError ())
forall a b. (a -> b) -> a -> b
$
  IO ()
fireCallbacks IO ()
-> IO (Either ImmediateError ()) -> IO (Either ImmediateError ())
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO (Either String RdKafkaTopicTPtr)
-> (Either String RdKafkaTopicTPtr -> IO ())
-> (Either String RdKafkaTopicTPtr
    -> IO (Either ImmediateError ()))
-> IO (Either ImmediateError ())
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
bracket (TopicName -> IO (Either String RdKafkaTopicTPtr)
mkTopic (TopicName -> IO (Either String RdKafkaTopicTPtr))
-> (ProducerRecord -> TopicName)
-> ProducerRecord
-> IO (Either String RdKafkaTopicTPtr)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ProducerRecord -> TopicName
prTopic (ProducerRecord -> IO (Either String RdKafkaTopicTPtr))
-> ProducerRecord -> IO (Either String RdKafkaTopicTPtr)
forall a b. (a -> b) -> a -> b
$ ProducerRecord
msg) Either String RdKafkaTopicTPtr -> IO ()
forall a. Either a RdKafkaTopicTPtr -> IO ()
closeTopic Either String RdKafkaTopicTPtr -> IO (Either ImmediateError ())
withTopic
  where
    fireCallbacks :: IO ()
fireCallbacks =
      KafkaProducer -> Maybe Timeout -> IO ()
forall a. HasKafka a => a -> Maybe Timeout -> IO ()
pollEvents KafkaProducer
kp (Maybe Timeout -> IO ()) -> (Int -> Maybe Timeout) -> Int -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Timeout -> Maybe Timeout
forall a. a -> Maybe a
Just (Timeout -> Maybe Timeout)
-> (Int -> Timeout) -> Int -> Maybe Timeout
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> Timeout
Timeout (Int -> IO ()) -> Int -> IO ()
forall a b. (a -> b) -> a -> b
$ 0

    mkTopic :: TopicName -> IO (Either String RdKafkaTopicTPtr)
mkTopic (TopicName tn :: Text
tn) =
      RdKafkaTPtr
-> String
-> Maybe RdKafkaTopicConfTPtr
-> IO (Either String RdKafkaTopicTPtr)
newUnmanagedRdKafkaTopicT RdKafkaTPtr
k (Text -> String
Text.unpack Text
tn) (RdKafkaTopicConfTPtr -> Maybe RdKafkaTopicConfTPtr
forall a. a -> Maybe a
Just RdKafkaTopicConfTPtr
tc)

    closeTopic :: Either a RdKafkaTopicTPtr -> IO ()
closeTopic = (a -> IO ())
-> (RdKafkaTopicTPtr -> IO ())
-> Either a RdKafkaTopicTPtr
-> IO ()
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either a -> IO ()
forall a. Monoid a => a
mempty RdKafkaTopicTPtr -> IO ()
destroyUnmanagedRdKafkaTopic

    withTopic :: Either String RdKafkaTopicTPtr -> IO (Either ImmediateError ())
withTopic (Left err :: String
err) = Either ImmediateError () -> IO (Either ImmediateError ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Either ImmediateError () -> IO (Either ImmediateError ()))
-> (String -> Either ImmediateError ())
-> String
-> IO (Either ImmediateError ())
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ImmediateError -> Either ImmediateError ()
forall a b. a -> Either a b
Left (ImmediateError -> Either ImmediateError ())
-> (String -> ImmediateError) -> String -> Either ImmediateError ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. KafkaError -> ImmediateError
ImmediateError (KafkaError -> ImmediateError)
-> (String -> KafkaError) -> String -> ImmediateError
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> KafkaError
KafkaError (Text -> KafkaError) -> (String -> Text) -> String -> KafkaError
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Text
Text.pack (String -> IO (Either ImmediateError ()))
-> String -> IO (Either ImmediateError ())
forall a b. (a -> b) -> a -> b
$ String
err
    withTopic (Right topic :: RdKafkaTopicTPtr
topic) =
      Maybe ByteString
-> (Ptr Word8 -> Int -> IO (Either ImmediateError ()))
-> IO (Either ImmediateError ())
forall a b. Maybe ByteString -> (Ptr a -> Int -> IO b) -> IO b
withBS (ProducerRecord -> Maybe ByteString
prValue ProducerRecord
msg) ((Ptr Word8 -> Int -> IO (Either ImmediateError ()))
 -> IO (Either ImmediateError ()))
-> (Ptr Word8 -> Int -> IO (Either ImmediateError ()))
-> IO (Either ImmediateError ())
forall a b. (a -> b) -> a -> b
$ \payloadPtr :: Ptr Word8
payloadPtr payloadLength :: Int
payloadLength ->
        Maybe ByteString
-> (Ptr Word8 -> Int -> IO (Either ImmediateError ()))
-> IO (Either ImmediateError ())
forall a b. Maybe ByteString -> (Ptr a -> Int -> IO b) -> IO b
withBS (ProducerRecord -> Maybe ByteString
prKey ProducerRecord
msg) ((Ptr Word8 -> Int -> IO (Either ImmediateError ()))
 -> IO (Either ImmediateError ()))
-> (Ptr Word8 -> Int -> IO (Either ImmediateError ()))
-> IO (Either ImmediateError ())
forall a b. (a -> b) -> a -> b
$ \keyPtr :: Ptr Word8
keyPtr keyLength :: Int
keyLength -> do
          StablePtr (DeliveryReport -> IO ())
callbackPtr <- (DeliveryReport -> IO ())
-> IO (StablePtr (DeliveryReport -> IO ()))
forall a. a -> IO (StablePtr a)
newStablePtr DeliveryReport -> IO ()
cb
          Either KafkaError ()
res <- Int -> IO (Either KafkaError ())
handleProduceErr' (Int -> IO (Either KafkaError ()))
-> IO Int -> IO (Either KafkaError ())
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< RdKafkaTopicTPtr
-> CInt32T
-> Int
-> Ptr Word8
-> CSize
-> Ptr Word8
-> CSize
-> Ptr ()
-> IO Int
rdKafkaProduce
            RdKafkaTopicTPtr
topic
            (ProducePartition -> CInt32T
producePartitionCInt (ProducerRecord -> ProducePartition
prPartition ProducerRecord
msg))
            Int
copyMsgFlags
            Ptr Word8
payloadPtr
            (Int -> CSize
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
payloadLength)
            Ptr Word8
keyPtr
            (Int -> CSize
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
keyLength)
            (StablePtr (DeliveryReport -> IO ()) -> Ptr ()
forall a. StablePtr a -> Ptr ()
castStablePtrToPtr StablePtr (DeliveryReport -> IO ())
callbackPtr)

          Either ImmediateError () -> IO (Either ImmediateError ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either ImmediateError () -> IO (Either ImmediateError ()))
-> Either ImmediateError () -> IO (Either ImmediateError ())
forall a b. (a -> b) -> a -> b
$ case Either KafkaError ()
res of
            Left err :: KafkaError
err -> ImmediateError -> Either ImmediateError ()
forall a b. a -> Either a b
Left (ImmediateError -> Either ImmediateError ())
-> (KafkaError -> ImmediateError)
-> KafkaError
-> Either ImmediateError ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. KafkaError -> ImmediateError
ImmediateError (KafkaError -> Either ImmediateError ())
-> KafkaError -> Either ImmediateError ()
forall a b. (a -> b) -> a -> b
$ KafkaError
err
            Right () -> () -> Either ImmediateError ()
forall a b. b -> Either a b
Right ()

-- | Sends a batch of messages.
-- Returns a list of messages which it was unable to send with corresponding errors.
-- Since librdkafka is backed by a queue, this function can return before messages are sent. See
-- 'flushProducer' to wait for queue to empty.
produceMessageBatch :: MonadIO m
                    => KafkaProducer
                    -> [ProducerRecord]
                    -> m [(ProducerRecord, KafkaError)]
                    -- ^ An empty list when the operation is successful,
                    -- otherwise a list of "failed" messages with corresponsing errors.
produceMessageBatch :: KafkaProducer
-> [ProducerRecord] -> m [(ProducerRecord, KafkaError)]
produceMessageBatch kp :: KafkaProducer
kp@(KafkaProducer (Kafka k :: RdKafkaTPtr
k) _ (TopicConf tc :: RdKafkaTopicConfTPtr
tc)) messages :: [ProducerRecord]
messages = IO [(ProducerRecord, KafkaError)]
-> m [(ProducerRecord, KafkaError)]
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO [(ProducerRecord, KafkaError)]
 -> m [(ProducerRecord, KafkaError)])
-> IO [(ProducerRecord, KafkaError)]
-> m [(ProducerRecord, KafkaError)]
forall a b. (a -> b) -> a -> b
$ do
  KafkaProducer -> Maybe Timeout -> IO ()
forall a. HasKafka a => a -> Maybe Timeout -> IO ()
pollEvents KafkaProducer
kp (Timeout -> Maybe Timeout
forall a. a -> Maybe a
Just (Timeout -> Maybe Timeout) -> Timeout -> Maybe Timeout
forall a b. (a -> b) -> a -> b
$ Int -> Timeout
Timeout 0) -- fire callbacks if any exist (handle delivery reports)
  [[(ProducerRecord, KafkaError)]] -> [(ProducerRecord, KafkaError)]
forall (t :: * -> *) a. Foldable t => t [a] -> [a]
concat ([[(ProducerRecord, KafkaError)]]
 -> [(ProducerRecord, KafkaError)])
-> IO [[(ProducerRecord, KafkaError)]]
-> IO [(ProducerRecord, KafkaError)]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [[ProducerRecord]]
-> ([ProducerRecord] -> IO [(ProducerRecord, KafkaError)])
-> IO [[(ProducerRecord, KafkaError)]]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM ([ProducerRecord] -> [[ProducerRecord]]
mkBatches [ProducerRecord]
messages) [ProducerRecord] -> IO [(ProducerRecord, KafkaError)]
sendBatch
  where
    mkSortKey :: ProducerRecord -> (TopicName, ProducePartition)
mkSortKey = ProducerRecord -> TopicName
prTopic (ProducerRecord -> TopicName)
-> (ProducerRecord -> ProducePartition)
-> ProducerRecord
-> (TopicName, ProducePartition)
forall (a :: * -> * -> *) b c c'.
Arrow a =>
a b c -> a b c' -> a b (c, c')
&&& ProducerRecord -> ProducePartition
prPartition
    mkBatches :: [ProducerRecord] -> [[ProducerRecord]]
mkBatches = (ProducerRecord -> ProducerRecord -> Bool)
-> [ProducerRecord] -> [[ProducerRecord]]
forall a. (a -> a -> Bool) -> [a] -> [[a]]
groupBy ((TopicName, ProducePartition)
-> (TopicName, ProducePartition) -> Bool
forall a. Eq a => a -> a -> Bool
(==) ((TopicName, ProducePartition)
 -> (TopicName, ProducePartition) -> Bool)
-> (ProducerRecord -> (TopicName, ProducePartition))
-> ProducerRecord
-> ProducerRecord
-> Bool
forall b c a. (b -> b -> c) -> (a -> b) -> a -> a -> c
`on` ProducerRecord -> (TopicName, ProducePartition)
mkSortKey) ([ProducerRecord] -> [[ProducerRecord]])
-> ([ProducerRecord] -> [ProducerRecord])
-> [ProducerRecord]
-> [[ProducerRecord]]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ProducerRecord -> ProducerRecord -> Ordering)
-> [ProducerRecord] -> [ProducerRecord]
forall a. (a -> a -> Ordering) -> [a] -> [a]
sortBy ((ProducerRecord -> (TopicName, ProducePartition))
-> ProducerRecord -> ProducerRecord -> Ordering
forall a b. Ord a => (b -> a) -> b -> b -> Ordering
comparing ProducerRecord -> (TopicName, ProducePartition)
mkSortKey)

    mkTopic :: TopicName -> IO (Either String RdKafkaTopicTPtr)
mkTopic (TopicName tn :: Text
tn) = RdKafkaTPtr
-> String
-> Maybe RdKafkaTopicConfTPtr
-> IO (Either String RdKafkaTopicTPtr)
newUnmanagedRdKafkaTopicT RdKafkaTPtr
k (Text -> String
Text.unpack Text
tn) (RdKafkaTopicConfTPtr -> Maybe RdKafkaTopicConfTPtr
forall a. a -> Maybe a
Just RdKafkaTopicConfTPtr
tc)

    clTopic :: Either b RdKafkaTopicTPtr -> IO ()
clTopic = (b -> IO ())
-> (RdKafkaTopicTPtr -> IO ())
-> Either b RdKafkaTopicTPtr
-> IO ()
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either (() -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return (() -> IO ()) -> (b -> ()) -> b -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. () -> b -> ()
forall a b. a -> b -> a
const ()) RdKafkaTopicTPtr -> IO ()
destroyUnmanagedRdKafkaTopic

    sendBatch :: [ProducerRecord] -> IO [(ProducerRecord, KafkaError)]
sendBatch []    = [(ProducerRecord, KafkaError)] -> IO [(ProducerRecord, KafkaError)]
forall (m :: * -> *) a. Monad m => a -> m a
return []
    sendBatch batch :: [ProducerRecord]
batch = IO (Either String RdKafkaTopicTPtr)
-> (Either String RdKafkaTopicTPtr -> IO ())
-> (Either String RdKafkaTopicTPtr
    -> IO [(ProducerRecord, KafkaError)])
-> IO [(ProducerRecord, KafkaError)]
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
bracket (TopicName -> IO (Either String RdKafkaTopicTPtr)
mkTopic (TopicName -> IO (Either String RdKafkaTopicTPtr))
-> TopicName -> IO (Either String RdKafkaTopicTPtr)
forall a b. (a -> b) -> a -> b
$ ProducerRecord -> TopicName
prTopic ([ProducerRecord] -> ProducerRecord
forall a. [a] -> a
head [ProducerRecord]
batch)) Either String RdKafkaTopicTPtr -> IO ()
forall a. Either a RdKafkaTopicTPtr -> IO ()
clTopic ([ProducerRecord]
-> Either String RdKafkaTopicTPtr
-> IO [(ProducerRecord, KafkaError)]
withTopic [ProducerRecord]
batch)

    withTopic :: [ProducerRecord]
-> Either String RdKafkaTopicTPtr
-> IO [(ProducerRecord, KafkaError)]
withTopic ms :: [ProducerRecord]
ms (Left err :: String
err) = [(ProducerRecord, KafkaError)] -> IO [(ProducerRecord, KafkaError)]
forall (m :: * -> *) a. Monad m => a -> m a
return ([(ProducerRecord, KafkaError)]
 -> IO [(ProducerRecord, KafkaError)])
-> [(ProducerRecord, KafkaError)]
-> IO [(ProducerRecord, KafkaError)]
forall a b. (a -> b) -> a -> b
$ (, Text -> KafkaError
KafkaError (String -> Text
Text.pack String
err)) (ProducerRecord -> (ProducerRecord, KafkaError))
-> [ProducerRecord] -> [(ProducerRecord, KafkaError)]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [ProducerRecord]
ms
    withTopic ms :: [ProducerRecord]
ms (Right t :: RdKafkaTopicTPtr
t) = do
      let (partInt :: Int
partInt, partCInt :: CInt32T
partCInt) = (ProducePartition -> Int
producePartitionInt (ProducePartition -> Int)
-> (ProducePartition -> CInt32T)
-> ProducePartition
-> (Int, CInt32T)
forall (a :: * -> * -> *) b c c'.
Arrow a =>
a b c -> a b c' -> a b (c, c')
&&& ProducePartition -> CInt32T
producePartitionCInt) (ProducePartition -> (Int, CInt32T))
-> ProducePartition -> (Int, CInt32T)
forall a b. (a -> b) -> a -> b
$ ProducerRecord -> ProducePartition
prPartition ([ProducerRecord] -> ProducerRecord
forall a. [a] -> a
head [ProducerRecord]
ms)
      RdKafkaTopicTPtr
-> (Ptr RdKafkaTopicT -> IO [(ProducerRecord, KafkaError)])
-> IO [(ProducerRecord, KafkaError)]
forall a b. ForeignPtr a -> (Ptr a -> IO b) -> IO b
withForeignPtr RdKafkaTopicTPtr
t ((Ptr RdKafkaTopicT -> IO [(ProducerRecord, KafkaError)])
 -> IO [(ProducerRecord, KafkaError)])
-> (Ptr RdKafkaTopicT -> IO [(ProducerRecord, KafkaError)])
-> IO [(ProducerRecord, KafkaError)]
forall a b. (a -> b) -> a -> b
$ \topicPtr :: Ptr RdKafkaTopicT
topicPtr -> do
        [RdKafkaMessageT]
nativeMs <- [ProducerRecord]
-> (ProducerRecord -> IO RdKafkaMessageT) -> IO [RdKafkaMessageT]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM [ProducerRecord]
ms (Ptr RdKafkaTopicT -> Int -> ProducerRecord -> IO RdKafkaMessageT
toNativeMessage Ptr RdKafkaTopicT
topicPtr Int
partInt)
        [RdKafkaMessageT]
-> (Int
    -> Ptr RdKafkaMessageT -> IO [(ProducerRecord, KafkaError)])
-> IO [(ProducerRecord, KafkaError)]
forall a b. Storable a => [a] -> (Int -> Ptr a -> IO b) -> IO b
withArrayLen [RdKafkaMessageT]
nativeMs ((Int -> Ptr RdKafkaMessageT -> IO [(ProducerRecord, KafkaError)])
 -> IO [(ProducerRecord, KafkaError)])
-> (Int
    -> Ptr RdKafkaMessageT -> IO [(ProducerRecord, KafkaError)])
-> IO [(ProducerRecord, KafkaError)]
forall a b. (a -> b) -> a -> b
$ \len :: Int
len batchPtr :: Ptr RdKafkaMessageT
batchPtr -> do
          ForeignPtr RdKafkaMessageT
batchPtrF <- Ptr RdKafkaMessageT -> IO (ForeignPtr RdKafkaMessageT)
forall a. Ptr a -> IO (ForeignPtr a)
newForeignPtr_ Ptr RdKafkaMessageT
batchPtr
          Int
numRet    <- RdKafkaTopicTPtr
-> CInt32T -> Int -> ForeignPtr RdKafkaMessageT -> Int -> IO Int
rdKafkaProduceBatch RdKafkaTopicTPtr
t CInt32T
partCInt Int
copyMsgFlags ForeignPtr RdKafkaMessageT
batchPtrF Int
len
          if Int
numRet Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
len then [(ProducerRecord, KafkaError)] -> IO [(ProducerRecord, KafkaError)]
forall (m :: * -> *) a. Monad m => a -> m a
return []
          else do
            [RdKafkaRespErrT]
errs <- (Int -> IO RdKafkaRespErrT) -> [Int] -> IO [RdKafkaRespErrT]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
mapM (RdKafkaRespErrT -> IO RdKafkaRespErrT
forall (m :: * -> *) a. Monad m => a -> m a
return (RdKafkaRespErrT -> IO RdKafkaRespErrT)
-> (RdKafkaMessageT -> RdKafkaRespErrT)
-> RdKafkaMessageT
-> IO RdKafkaRespErrT
forall b c a. (b -> c) -> (a -> b) -> a -> c
. RdKafkaMessageT -> RdKafkaRespErrT
err'RdKafkaMessageT (RdKafkaMessageT -> IO RdKafkaRespErrT)
-> (Int -> IO RdKafkaMessageT) -> Int -> IO RdKafkaRespErrT
forall (m :: * -> *) b c a.
Monad m =>
(b -> m c) -> (a -> m b) -> a -> m c
<=< Ptr RdKafkaMessageT -> Int -> IO RdKafkaMessageT
forall a. Storable a => Ptr a -> Int -> IO a
peekElemOff Ptr RdKafkaMessageT
batchPtr)
                         [0..(Int -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> Int) -> Int -> Int
forall a b. (a -> b) -> a -> b
$ Int
len Int -> Int -> Int
forall a. Num a => a -> a -> a
- 1)]
            [(ProducerRecord, KafkaError)] -> IO [(ProducerRecord, KafkaError)]
forall (m :: * -> *) a. Monad m => a -> m a
return [(ProducerRecord
m, RdKafkaRespErrT -> KafkaError
KafkaResponseError RdKafkaRespErrT
e) | (m :: ProducerRecord
m, e :: RdKafkaRespErrT
e) <- [ProducerRecord]
-> [RdKafkaRespErrT] -> [(ProducerRecord, RdKafkaRespErrT)]
forall a b. [a] -> [b] -> [(a, b)]
zip [ProducerRecord]
messages [RdKafkaRespErrT]
errs, RdKafkaRespErrT
e RdKafkaRespErrT -> RdKafkaRespErrT -> Bool
forall a. Eq a => a -> a -> Bool
/= RdKafkaRespErrT
RdKafkaRespErrNoError]

    toNativeMessage :: Ptr RdKafkaTopicT -> Int -> ProducerRecord -> IO RdKafkaMessageT
toNativeMessage t :: Ptr RdKafkaTopicT
t p :: Int
p m :: ProducerRecord
m =
      Maybe ByteString
-> (Ptr Word8 -> Int -> IO RdKafkaMessageT) -> IO RdKafkaMessageT
forall a b. Maybe ByteString -> (Ptr a -> Int -> IO b) -> IO b
withBS (ProducerRecord -> Maybe ByteString
prValue ProducerRecord
m) ((Ptr Word8 -> Int -> IO RdKafkaMessageT) -> IO RdKafkaMessageT)
-> (Ptr Word8 -> Int -> IO RdKafkaMessageT) -> IO RdKafkaMessageT
forall a b. (a -> b) -> a -> b
$ \payloadPtr :: Ptr Word8
payloadPtr payloadLength :: Int
payloadLength ->
        Maybe ByteString
-> (Ptr Word8 -> Int -> IO RdKafkaMessageT) -> IO RdKafkaMessageT
forall a b. Maybe ByteString -> (Ptr a -> Int -> IO b) -> IO b
withBS (ProducerRecord -> Maybe ByteString
prKey ProducerRecord
m) ((Ptr Word8 -> Int -> IO RdKafkaMessageT) -> IO RdKafkaMessageT)
-> (Ptr Word8 -> Int -> IO RdKafkaMessageT) -> IO RdKafkaMessageT
forall a b. (a -> b) -> a -> b
$ \keyPtr :: Ptr Word8
keyPtr keyLength :: Int
keyLength ->
          RdKafkaMessageT -> IO RdKafkaMessageT
forall (m :: * -> *) a. Monad m => a -> m a
return RdKafkaMessageT :: RdKafkaRespErrT
-> Ptr RdKafkaTopicT
-> Int
-> Int
-> Int
-> Int64
-> Ptr Word8
-> Ptr Word8
-> Ptr ()
-> RdKafkaMessageT
RdKafkaMessageT
            { err'RdKafkaMessageT :: RdKafkaRespErrT
err'RdKafkaMessageT       = RdKafkaRespErrT
RdKafkaRespErrNoError
            , topic'RdKafkaMessageT :: Ptr RdKafkaTopicT
topic'RdKafkaMessageT     = Ptr RdKafkaTopicT
t
            , partition'RdKafkaMessageT :: Int
partition'RdKafkaMessageT = Int
p
            , len'RdKafkaMessageT :: Int
len'RdKafkaMessageT       = Int
payloadLength
            , payload'RdKafkaMessageT :: Ptr Word8
payload'RdKafkaMessageT   = Ptr Word8
payloadPtr
            , offset'RdKafkaMessageT :: Int64
offset'RdKafkaMessageT    = 0
            , keyLen'RdKafkaMessageT :: Int
keyLen'RdKafkaMessageT    = Int
keyLength
            , key'RdKafkaMessageT :: Ptr Word8
key'RdKafkaMessageT       = Ptr Word8
keyPtr
            , opaque'RdKafkaMessageT :: Ptr ()
opaque'RdKafkaMessageT    = Ptr ()
forall a. Ptr a
nullPtr
            }

-- | Closes the producer.
-- Will wait until the outbound queue is drained before returning the control.
closeProducer :: MonadIO m => KafkaProducer -> m ()
closeProducer :: KafkaProducer -> m ()
closeProducer = KafkaProducer -> m ()
forall (m :: * -> *). MonadIO m => KafkaProducer -> m ()
flushProducer

-- | Drains the outbound queue for a producer.
--  This function is also called automatically when the producer is closed
-- with 'closeProducer' to ensure that all queued messages make it to Kafka.
flushProducer :: MonadIO m => KafkaProducer -> m ()
flushProducer :: KafkaProducer -> m ()
flushProducer kp :: KafkaProducer
kp = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
    KafkaProducer -> Maybe Timeout -> IO ()
forall a. HasKafka a => a -> Maybe Timeout -> IO ()
pollEvents KafkaProducer
kp (Timeout -> Maybe Timeout
forall a. a -> Maybe a
Just (Timeout -> Maybe Timeout) -> Timeout -> Maybe Timeout
forall a b. (a -> b) -> a -> b
$ Int -> Timeout
Timeout 100)
    Int
l <- Kafka -> IO Int
outboundQueueLength (KafkaProducer -> Kafka
kpKafkaPtr KafkaProducer
kp)
    if (Int
l Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== 0)
      then KafkaProducer -> Maybe Timeout -> IO ()
forall a. HasKafka a => a -> Maybe Timeout -> IO ()
pollEvents KafkaProducer
kp (Timeout -> Maybe Timeout
forall a. a -> Maybe a
Just (Timeout -> Maybe Timeout) -> Timeout -> Maybe Timeout
forall a b. (a -> b) -> a -> b
$ Int -> Timeout
Timeout 0) -- to be sure that all the delivery reports are fired
      else KafkaProducer -> IO ()
forall (m :: * -> *). MonadIO m => KafkaProducer -> m ()
flushProducer KafkaProducer
kp

------------------------------------------------------------------------------------

withBS :: Maybe BS.ByteString -> (Ptr a -> Int -> IO b) -> IO b
withBS :: Maybe ByteString -> (Ptr a -> Int -> IO b) -> IO b
withBS Nothing f :: Ptr a -> Int -> IO b
f = Ptr a -> Int -> IO b
f Ptr a
forall a. Ptr a
nullPtr 0
withBS (Just bs :: ByteString
bs) f :: Ptr a -> Int -> IO b
f =
    let (d :: ForeignPtr Word8
d, o :: Int
o, l :: Int
l) = ByteString -> (ForeignPtr Word8, Int, Int)
BSI.toForeignPtr ByteString
bs
    in  ForeignPtr Word8 -> (Ptr Word8 -> IO b) -> IO b
forall a b. ForeignPtr a -> (Ptr a -> IO b) -> IO b
withForeignPtr ForeignPtr Word8
d ((Ptr Word8 -> IO b) -> IO b) -> (Ptr Word8 -> IO b) -> IO b
forall a b. (a -> b) -> a -> b
$ \p :: Ptr Word8
p -> Ptr a -> Int -> IO b
f (Ptr Word8
p Ptr Word8 -> Int -> Ptr a
forall a b. Ptr a -> Int -> Ptr b
`plusPtr` Int
o) Int
l

outboundQueueLength :: Kafka -> IO Int
outboundQueueLength :: Kafka -> IO Int
outboundQueueLength (Kafka k :: RdKafkaTPtr
k) = RdKafkaTPtr -> IO Int
rdKafkaOutqLen RdKafkaTPtr
k