module Kafka.Internal.Setup where
import Kafka.Internal.RdKafka
import Kafka.Types
import Control.Exception
import Control.Monad
import Foreign
import Foreign.C.String
import Kafka.Internal.CancellationToken
-- | Key/value configuration overrides for a 'KafkaConf'.
--
-- Each pair is a configuration key and the value to assign to it.
-- 'setAllKafkaConfValues' applies the pairs one at a time, in list order.
newtype KafkaProps
  = KafkaProps [(String, String)]
  deriving (Show, Eq)
-- | Key/value configuration overrides for a 'TopicConf'.
--
-- Each pair is a configuration key and the value to assign to it.
-- 'setAllTopicConfValues' applies the pairs one at a time, in list order.
newtype TopicProps
  = TopicProps [(String, String)]
  deriving (Show, Eq)
-- | Creates a new topic configuration by wrapping the handle produced by
-- 'newRdKafkaTopicConfT' in the 'TopicConf' constructor.
newTopicConf :: IO TopicConf
newTopicConf = fmap TopicConf newRdKafkaTopicConfT
-- | Creates a new Kafka configuration.
--
-- The underlying handle is obtained from 'newRdKafkaConfT' first, then a
-- cancellation token is created with 'newCancellationToken'; both are
-- stored in the resulting 'KafkaConf'.
newKafkaConf :: IO KafkaConf
newKafkaConf = do
  rawConf <- newRdKafkaConfT
  token <- newCancellationToken
  return (KafkaConf rawConf token)
-- | Builds a 'KafkaConf' with the given overrides applied.
--
-- A new configuration is created with 'newKafkaConf' and every pair in
-- the 'KafkaProps' is applied through 'setAllKafkaConfValues'. Any
-- exception raised while applying an override propagates to the caller,
-- in which case no configuration is returned.
kafkaConf :: KafkaProps -> IO KafkaConf
kafkaConf overrides =
  newKafkaConf >>= \conf ->
    setAllKafkaConfValues conf overrides >> return conf
-- | Builds a 'TopicConf' with the given overrides applied.
--
-- A new configuration is created with 'newTopicConf' and every pair in
-- the 'TopicProps' is applied through 'setAllTopicConfValues'. Any
-- exception raised while applying an override propagates to the caller,
-- in which case no configuration is returned.
topicConf :: TopicProps -> IO TopicConf
topicConf overrides =
  newTopicConf >>= \conf ->
    setAllTopicConfValues conf overrides >> return conf
-- | Turns the result code of a configuration "set" call into an exception
-- when the call did not succeed.
--
-- * 'RdKafkaConfOk' -- nothing happens.
-- * 'RdKafkaConfInvalid' -- the C string at @charPtr@ is read and thrown
--   wrapped in 'KafkaInvalidConfigurationValue'.
-- * 'RdKafkaConfUnknown' -- the C string at @charPtr@ is read and thrown
--   wrapped in 'KafkaUnknownConfigurationKey'.
--
-- @charPtr@ is only dereferenced on the failure paths, where it must
-- point to a NUL-terminated string (presumably the error text written by
-- the preceding set call -- confirm against the binding).
checkConfSetValue :: RdKafkaConfResT -> CCharBufPointer -> IO ()
checkConfSetValue err charPtr = case err of
  RdKafkaConfOk ->
    return ()
  -- 'throwIO' rather than 'throw': the exception is raised when the IO
  -- action is executed, so it is correctly sequenced after the
  -- 'peekCString' read and relative to surrounding IO.
  RdKafkaConfInvalid ->
    peekCString charPtr >>= throwIO . KafkaInvalidConfigurationValue
  RdKafkaConfUnknown ->
    peekCString charPtr >>= throwIO . KafkaUnknownConfigurationKey
-- | Sets a single configuration key on a 'KafkaConf'.
--
-- A temporary buffer of 'nErrorBytes' bytes is allocated for the duration
-- of the call and passed, together with its size, to 'rdKafkaConfSet'.
-- The result code and the buffer are then handed to 'checkConfSetValue',
-- which throws if the key is unknown or the value is invalid.
setKafkaConfValue :: KafkaConf -> String -> String -> IO ()
setKafkaConfValue (KafkaConf confPtr _) key value =
  allocaBytes nErrorBytes $ \errBuf ->
    rdKafkaConfSet confPtr key value errBuf (fromIntegral nErrorBytes)
      >>= \result -> checkConfSetValue result errBuf
-- | Applies every key/value pair of the 'KafkaProps' to the given
-- 'KafkaConf', in list order, using 'setKafkaConfValue'.
--
-- Processing stops at the first pair for which 'setKafkaConfValue'
-- throws; pairs before it have already been applied.
setAllKafkaConfValues :: KafkaConf -> KafkaProps -> IO ()
setAllKafkaConfValues conf (KafkaProps props) = mapM_ applyOverride props
  where
    applyOverride = uncurry (setKafkaConfValue conf)
-- | Sets a single configuration key on a 'TopicConf'.
--
-- A temporary buffer of 'nErrorBytes' bytes is allocated for the duration
-- of the call and passed, together with its size, to
-- 'rdKafkaTopicConfSet'. The result code and the buffer are then handed
-- to 'checkConfSetValue', which throws if the key is unknown or the value
-- is invalid.
setTopicConfValue :: TopicConf -> String -> String -> IO ()
setTopicConfValue (TopicConf confPtr) key value =
  allocaBytes nErrorBytes $ \errBuf ->
    rdKafkaTopicConfSet confPtr key value errBuf (fromIntegral nErrorBytes)
      >>= \result -> checkConfSetValue result errBuf
-- | Applies every key/value pair of the 'TopicProps' to the given
-- 'TopicConf', in list order, using 'setTopicConfValue'.
--
-- Processing stops at the first pair for which 'setTopicConfValue'
-- throws; pairs before it have already been applied.
setAllTopicConfValues :: TopicConf -> TopicProps -> IO ()
setAllTopicConfValues conf (TopicProps props) = mapM_ applyOverride props
  where
    applyOverride = uncurry (setTopicConfValue conf)