{-# LANGUAGE TupleSections #-}
{-# LANGUAGE LambdaCase #-}
module Kafka.Producer
( KafkaProducer
, module X
, runProducer
, newProducer
, produceMessage, produceMessageBatch
, produceMessage'
, flushProducer
, closeProducer
, RdKafkaRespErrT (..)
)
where
import Control.Arrow ((&&&))
import Control.Exception (bracket)
import Control.Monad (forM, forM_, (<=<))
import Control.Monad.IO.Class (MonadIO (liftIO))
import qualified Data.ByteString as BS
import qualified Data.ByteString.Internal as BSI
import Data.Function (on)
import Data.List (groupBy, sortBy)
import Data.Ord (comparing)
import qualified Data.Text as Text
import Foreign.ForeignPtr (newForeignPtr_, withForeignPtr)
import Foreign.Marshal.Array (withArrayLen)
import Foreign.Ptr (Ptr, nullPtr, plusPtr)
import Foreign.StablePtr (newStablePtr, castStablePtrToPtr, freeStablePtr)
import Foreign.Storable (Storable (..))
import Kafka.Internal.RdKafka (RdKafkaMessageT (..), RdKafkaRespErrT (..), RdKafkaTypeT (..), destroyUnmanagedRdKafkaTopic, newRdKafkaT, newUnmanagedRdKafkaTopicT, rdKafkaOutqLen, rdKafkaProduce, rdKafkaProduceBatch, rdKafkaSetLogLevel)
import Kafka.Internal.Setup (Kafka (..), KafkaConf (..), KafkaProps (..), TopicConf (..), TopicProps (..), kafkaConf, topicConf, Callback(..))
import Kafka.Internal.Shared (pollEvents)
import Kafka.Producer.Convert (copyMsgFlags, handleProduceErr', producePartitionCInt, producePartitionInt)
import Kafka.Producer.Types (KafkaProducer (..))
import Kafka.Producer.ProducerProperties as X
import Kafka.Producer.Types as X hiding (KafkaProducer)
import Kafka.Types as X
-- | Runs an action against a freshly created producer.
--
-- The producer is created with 'newProducer', passed to @f@, and closed with
-- 'closeProducer' once @f@ finishes; 'bracket' runs the close step even if
-- @f@ throws. When the producer cannot be created, @f@ is not run and the
-- creation error is returned as 'Left'.
{-# DEPRECATED runProducer "Use 'newProducer'/'closeProducer' instead" #-}
runProducer :: ProducerProperties
            -> (KafkaProducer -> IO (Either KafkaError a))
            -> IO (Either KafkaError a)
runProducer props f = bracket acquire release use
  where
    acquire :: IO (Either KafkaError KafkaProducer)
    acquire = newProducer props

    -- Nothing to release when creation failed.
    release :: Either KafkaError KafkaProducer -> IO ()
    release = either (const (return ())) closeProducer

    -- A creation failure is passed through unchanged; otherwise run the
    -- caller's action.
    use = either (return . Left) f
-- | Creates a new Kafka producer from the given properties.
--
-- Steps, in order:
--
-- 1. build the Kafka and topic configurations from 'ppKafkaProps' and
--    'ppTopicProps';
-- 2. install a no-op delivery callback, then every callback listed in
--    'ppCallbacks' (in list order);
-- 3. create the native producer handle with 'newRdKafkaT';
-- 4. if 'ppLogLevel' is set, apply it to the new handle.
--
-- Returns 'Left' with a 'KafkaError' carrying the message reported by
-- 'newRdKafkaT' when the handle cannot be created.
newProducer :: MonadIO m => ProducerProperties -> m (Either KafkaError KafkaProducer)
newProducer pps = liftIO $ do
  kc@(KafkaConf rawConf _ _) <- kafkaConf (KafkaProps (ppKafkaProps pps))
  tc <- topicConf (TopicProps (ppTopicProps pps))

  -- The default delivery callback goes in first so that user supplied
  -- callbacks are applied after it.
  install kc (deliveryCallback (const mempty))
  forM_ (ppCallbacks pps) (install kc)

  created <- newRdKafkaT RdKafkaProducer rawConf
  either failed (ready kc tc) created
  where
    -- Applies one callback setter to the configuration.
    install conf (Callback setCb) = setCb conf

    failed err = return (Left (KafkaError err))

    ready kc tc kafka = do
      forM_ (ppLogLevel pps) $ \level ->
        rdKafkaSetLogLevel kafka (fromEnum level)
      return (Right (KafkaProducer (Kafka kafka) kc tc))
-- | Sends a single message without a per-message delivery callback.
--
-- This is 'produceMessage'' with a callback that ignores the delivery
-- report. The result is 'Nothing' when the message was handed over
-- successfully, or 'Just' the error unwrapped from the 'ImmediateError'.
produceMessage :: MonadIO m
               => KafkaProducer
               -> ProducerRecord
               -> m (Maybe KafkaError)
produceMessage kp m = toMaybeError <$> produceMessage' kp m ignoreReport
  where
    ignoreReport _ = pure ()

    toMaybeError (Right ())                  = Nothing
    toMaybeError (Left (ImmediateError err)) = Just err
-- | Sends a single message and attaches a per-message delivery callback.
--
-- Before producing, pending events are polled with a zero timeout. An
-- unmanaged topic handle is created for the record's topic and destroyed
-- again (via 'bracket') once the produce call has returned.
--
-- The callback @cb@ is wrapped in a 'StablePtr' and passed to
-- 'rdKafkaProduce' as the last (opaque) argument.
--
-- Returns 'Left' with an 'ImmediateError' when the topic handle cannot be
-- created or when 'handleProduceErr'' reports a failure for the produce
-- call; returns @Right ()@ otherwise.
produceMessage' :: MonadIO m
                => KafkaProducer
                -> ProducerRecord
                -> (DeliveryReport -> IO ())
                -> m (Either ImmediateError ())
produceMessage' kp@(KafkaProducer (Kafka k) _ (TopicConf tc)) msg cb = liftIO $
  fireCallbacks >> bracket (mkTopic . prTopic $ msg) closeTopic withTopic
  where
    -- Non-blocking poll so already queued events get served first.
    fireCallbacks =
      pollEvents kp (Just (Timeout 0))

    mkTopic (TopicName tn) =
      newUnmanagedRdKafkaTopicT k (Text.unpack tn) (Just tc)

    -- Only a successfully created topic handle needs destroying.
    closeTopic = either (const (pure ())) destroyUnmanagedRdKafkaTopic

    withTopic (Left err) =
      pure (Left (ImmediateError (KafkaError (Text.pack err))))
    withTopic (Right topic) =
      withBS (prValue msg) $ \payloadPtr payloadLength ->
        withBS (prKey msg) $ \keyPtr keyLength -> do
          callbackPtr <- newStablePtr cb
          res <- handleProduceErr' =<< rdKafkaProduce
                   topic
                   (producePartitionCInt (prPartition msg))
                   copyMsgFlags
                   payloadPtr
                   (fromIntegral payloadLength)
                   keyPtr
                   (fromIntegral keyLength)
                   (castStablePtrToPtr callbackPtr)
          case res of
            Right () -> pure (Right ())
            Left err -> do
              -- Nothing in this function releases the StablePtr, so a
              -- failed produce call used to leak it. Release it here.
              -- NOTE(review): assumes a 'Left' from handleProduceErr' means
              -- the message was NOT enqueued, so no delivery report will
              -- ever receive this pointer -- confirm against
              -- handleProduceErr' and the delivery callback.
              freeStablePtr callbackPtr
              pure (Left (ImmediateError err))
-- | Sends a batch of messages.
--
-- Pending events are polled with a zero timeout first. The records are then
-- sorted and grouped by @(topic, partition)@ and each group is handed to
-- 'rdKafkaProduceBatch' separately, using an unmanaged topic handle that is
-- destroyed (via 'bracket') when the group is done.
--
-- Returns the records that could not be handed over, each paired with its
-- error:
--
-- * if the topic handle for a group cannot be created, every record of that
--   group is returned with the creation error;
-- * if 'rdKafkaProduceBatch' accepts fewer messages than were submitted, the
--   per-message error codes are read back and every record whose code is
--   not 'RdKafkaRespErrNoError' is returned.
--
-- An empty result means no record was rejected.
produceMessageBatch :: MonadIO m
                    => KafkaProducer
                    -> [ProducerRecord]
                    -> m [(ProducerRecord, KafkaError)]
produceMessageBatch kp@(KafkaProducer (Kafka k) _ (TopicConf tc)) messages = liftIO $ do
  pollEvents kp (Just (Timeout 0))
  concat <$> forM (mkBatches messages) sendBatch
  where
    mkSortKey = prTopic &&& prPartition

    -- Sort first so that 'groupBy' sees equal keys next to each other.
    mkBatches = groupBy ((==) `on` mkSortKey) . sortBy (comparing mkSortKey)

    mkTopic (TopicName tn) =
      newUnmanagedRdKafkaTopicT k (Text.unpack tn) (Just tc)

    -- Only a successfully created topic handle needs destroying.
    clTopic = either (const (return ())) destroyUnmanagedRdKafkaTopic

    -- Every record of a batch shares topic and partition, so the first
    -- record is used to pick both.
    sendBatch []                = return []
    sendBatch batch@(first : _) =
      bracket (mkTopic (prTopic first)) clTopic (withTopic first batch)

    withTopic _ ms (Left err) =
      return $ (, KafkaError (Text.pack err)) <$> ms
    withTopic first ms (Right t) = do
      let (partInt, partCInt) =
            (producePartitionInt &&& producePartitionCInt) (prPartition first)
      withForeignPtr t $ \topicPtr -> do
        nativeMs <- forM ms (toNativeMessage topicPtr partInt)
        withArrayLen nativeMs $ \len batchPtr -> do
          batchPtrF <- newForeignPtr_ batchPtr
          numRet    <- rdKafkaProduceBatch t partCInt copyMsgFlags batchPtrF len
          if numRet == len
            then return []
            else do
              errs <- forM [0 .. len - 1]
                           (return . err'RdKafkaMessageT <=< peekElemOff batchPtr)
              -- Element i of the native array was built from element i of
              -- @ms@, so the error codes must be paired with @ms@. Pairing
              -- them with the full, unsorted input list would attribute
              -- errors to the wrong records.
              return [ (m, KafkaResponseError e)
                     | (m, e) <- zip ms errs
                     , e /= RdKafkaRespErrNoError
                     ]

    -- NOTE(review): the payload/key pointers come from 'withBS', which only
    -- holds the underlying buffer inside its callback; here they are stored
    -- in the returned record and used later by 'rdKafkaProduceBatch'.
    -- Confirm the source ByteStrings are kept alive until that call returns.
    toNativeMessage t p m =
      withBS (prValue m) $ \payloadPtr payloadLength ->
        withBS (prKey m) $ \keyPtr keyLength ->
          return RdKafkaMessageT
            { err'RdKafkaMessageT       = RdKafkaRespErrNoError
            , topic'RdKafkaMessageT     = t
            , partition'RdKafkaMessageT = p
            , len'RdKafkaMessageT       = payloadLength
            , payload'RdKafkaMessageT   = payloadPtr
            , offset'RdKafkaMessageT    = 0
            , keyLen'RdKafkaMessageT    = keyLength
            , key'RdKafkaMessageT       = keyPtr
            , opaque'RdKafkaMessageT    = nullPtr
            }
-- | Closes the producer.
--
-- As implemented here this only delegates to 'flushProducer'; no further
-- teardown is performed by this function itself.
closeProducer :: MonadIO m => KafkaProducer -> m ()
closeProducer kp = flushProducer kp
-- | Drains the producer's outbound queue.
--
-- Repeatedly polls for events with @Timeout 100@ and checks the outbound
-- queue length; once the queue is reported empty, one last poll with
-- @Timeout 0@ is issued and the function returns. It does not return while
-- the queue length stays non-zero.
flushProducer :: MonadIO m => KafkaProducer -> m ()
flushProducer kp = liftIO drain
  where
    drain :: IO ()
    drain = do
      pollEvents kp (Just (Timeout 100))
      pending <- outboundQueueLength (kpKafkaPtr kp)
      if pending /= 0
        then drain
        else pollEvents kp (Just (Timeout 0))
-- | Runs an action with a raw pointer to an optional 'BS.ByteString'.
--
-- For 'Nothing' the action receives 'nullPtr' and length @0@. For 'Just'
-- it receives a pointer to the first byte of the string (the buffer start
-- advanced by the string's offset) together with its length. The pointer is
-- obtained through 'withForeignPtr', so it should only be used while the
-- action is running.
withBS :: Maybe BS.ByteString -> (Ptr a -> Int -> IO b) -> IO b
withBS mbBytes action = case mbBytes of
  Nothing    -> action nullPtr 0
  Just bytes ->
    let (buffer, offset, size) = BSI.toForeignPtr bytes
    in  withForeignPtr buffer $ \base ->
          action (base `plusPtr` offset) size
-- | Returns the value reported by 'rdKafkaOutqLen' for the native handle
-- wrapped in the given 'Kafka'.
outboundQueueLength :: Kafka -> IO Int
outboundQueueLength kafka =
  case kafka of
    Kafka handle -> rdKafkaOutqLen handle