module Kafka.Internal.Setup where import Kafka.Types import Kafka.Internal.RdKafka import Kafka.Internal.RdKafkaEnum import Control.Exception import Control.Monad import Foreign import Foreign.C.String -- -- Configuration -- newtype KafkaProps = KafkaProps [(String, String)] deriving (Show, Eq) newtype TopicProps = TopicProps [(String, String)] deriving (Show, Eq) newtype KafkaConf = KafkaConf RdKafkaConfTPtr deriving Show newtype TopicConf = TopicConf RdKafkaTopicConfTPtr deriving Show newTopicConf :: IO TopicConf newTopicConf = TopicConf <$> newRdKafkaTopicConfT newKafkaConf :: IO KafkaConf newKafkaConf = KafkaConf <$> newRdKafkaConfT kafkaConf :: KafkaProps -> IO KafkaConf kafkaConf overrides = do conf <- newKafkaConf setAllKafkaConfValues conf overrides return conf topicConf :: TopicProps -> IO TopicConf topicConf overrides = do conf <- newTopicConf setAllTopicConfValues conf overrides return conf checkConfSetValue :: RdKafkaConfResT -> CCharBufPointer -> IO () checkConfSetValue err charPtr = case err of RdKafkaConfOk -> return () RdKafkaConfInvalid -> do str <- peekCString charPtr throw $ KafkaInvalidConfigurationValue str RdKafkaConfUnknown -> do str <- peekCString charPtr throw $ KafkaUnknownConfigurationKey str setKafkaConfValue :: KafkaConf -> String -> String -> IO () setKafkaConfValue (KafkaConf confPtr) key value = allocaBytes nErrorBytes $ \charPtr -> do err <- rdKafkaConfSet confPtr key value charPtr (fromIntegral nErrorBytes) checkConfSetValue err charPtr setAllKafkaConfValues :: KafkaConf -> KafkaProps -> IO () setAllKafkaConfValues conf (KafkaProps props) = forM_ props $ uncurry (setKafkaConfValue conf) setTopicConfValue :: TopicConf -> String -> String -> IO () setTopicConfValue (TopicConf confPtr) key value = allocaBytes nErrorBytes $ \charPtr -> do err <- rdKafkaTopicConfSet confPtr key value charPtr (fromIntegral nErrorBytes) checkConfSetValue err charPtr setAllTopicConfValues :: TopicConf -> TopicProps -> IO () setAllTopicConfValues conf (TopicProps props) = forM_ props $ uncurry (setTopicConfValue conf)