module Kafka.Internal.Setup where
import Kafka.Internal.RdKafka
import Kafka.Types
import Control.Exception
import Control.Monad
import Data.IORef
import Foreign
import Foreign.C.String
import Kafka.Internal.CancellationToken
-- | Key/value pairs used as overrides for a Kafka configuration.
-- Each pair is applied in list order by 'setAllKafkaConfValues'.
newtype KafkaProps
  = KafkaProps [(String, String)]
  deriving (Show, Eq)
-- | Key/value pairs used as overrides for a topic configuration.
-- Each pair is applied in list order by 'setAllTopicConfValues'.
newtype TopicProps
  = TopicProps [(String, String)]
  deriving (Show, Eq)
-- | Wrapper around the raw 'RdKafkaTPtr' handle.
newtype Kafka
  = Kafka RdKafkaTPtr
  deriving (Show)
-- | Kafka configuration bundle: the raw configuration pointer together
-- with a mutable slot for an optional queue pointer and a cancellation token.
data KafkaConf
  = KafkaConf
      RdKafkaConfTPtr                   -- ^ raw configuration pointer
      (IORef (Maybe RdKafkaQueueTPtr))  -- ^ optional queue; 'newKafkaConf' starts it at 'Nothing'
      CancellationToken                 -- ^ cancellation token created alongside the configuration
-- | Wrapper around the raw 'RdKafkaTopicConfTPtr' topic configuration pointer.
newtype TopicConf
  = TopicConf RdKafkaTopicConfTPtr
  deriving (Show)
-- | Types from which a 'Kafka' handle can be obtained.
class HasKafka k where
  -- | Extract the 'Kafka' handle from the given value.
  getKafka :: k -> Kafka
-- | Types from which a 'KafkaConf' can be obtained.
class HasKafkaConf k where
  -- | Extract the 'KafkaConf' from the given value.
  getKafkaConf :: k -> KafkaConf
-- | Types from which a 'TopicConf' can be obtained.
class HasTopicConf t where
  -- | Extract the 'TopicConf' from the given value.
  getTopicConf :: t -> TopicConf
-- | A 'KafkaConf' is its own configuration, so the accessor is the identity.
instance HasKafkaConf KafkaConf where
  {-# INLINE getKafkaConf #-}
  getKafkaConf = id
-- | A 'Kafka' handle is its own handle, so the accessor is the identity.
instance HasKafka Kafka where
  {-# INLINE getKafka #-}
  getKafka = id
-- | A 'TopicConf' is its own topic configuration, so the accessor is the identity.
instance HasTopicConf TopicConf where
  {-# INLINE getTopicConf #-}
  getTopicConf = id
-- | Unwrap the raw 'RdKafkaTPtr' from anything that carries a 'Kafka' handle.
getRdKafka :: HasKafka k => k -> RdKafkaTPtr
getRdKafka k =
  case getKafka k of
    Kafka rawPtr -> rawPtr
{-# INLINE getRdKafka #-}
-- | Extract the raw 'RdKafkaConfTPtr' (the first field of 'KafkaConf')
-- from anything that carries a 'KafkaConf'.
getRdKafkaConf :: HasKafkaConf k => k -> RdKafkaConfTPtr
getRdKafkaConf k = confPtr
  where
    KafkaConf confPtr _ _ = getKafkaConf k
{-# INLINE getRdKafkaConf #-}
-- | Read the current contents of the queue slot stored in the 'KafkaConf'.
-- Returns whatever the 'IORef' holds at the time of the call.
getRdMsgQueue :: HasKafkaConf k => k -> IO (Maybe RdKafkaQueueTPtr)
getRdMsgQueue k = readIORef queueRef
  where
    KafkaConf _ queueRef _ = getKafkaConf k
-- | Unwrap the raw 'RdKafkaTopicConfTPtr' from anything that carries a 'TopicConf'.
getRdTopicConf :: HasTopicConf t => t -> RdKafkaTopicConfTPtr
getRdTopicConf t =
  case getTopicConf t of
    TopicConf rawPtr -> rawPtr
{-# INLINE getRdTopicConf #-}
-- | Create a fresh 'TopicConf' by wrapping the result of 'newRdKafkaTopicConfT'.
newTopicConf :: IO TopicConf
newTopicConf = do
  rawConf <- newRdKafkaTopicConfT
  return (TopicConf rawConf)
-- | Create a fresh 'KafkaConf': a new raw configuration, an empty
-- ('Nothing') queue slot and a new cancellation token, created in that order.
newKafkaConf :: IO KafkaConf
newKafkaConf = do
  rawConf  <- newRdKafkaConfT
  queueRef <- newIORef Nothing
  token    <- newCancellationToken
  return (KafkaConf rawConf queueRef token)
-- | Build a new 'KafkaConf' and apply every override from the given
-- 'KafkaProps' to it before returning it.
--
-- Any exception raised while applying an override propagates to the caller.
kafkaConf :: KafkaProps -> IO KafkaConf
kafkaConf overrides =
  newKafkaConf >>= \fresh ->
    setAllKafkaConfValues fresh overrides >> return fresh
-- | Build a new 'TopicConf' and apply every override from the given
-- 'TopicProps' to it before returning it.
--
-- Any exception raised while applying an override propagates to the caller.
topicConf :: TopicProps -> IO TopicConf
topicConf overrides =
  newTopicConf >>= \fresh ->
    setAllTopicConfValues fresh overrides >> return fresh
-- | Interpret the result code of a configuration \"set\" call.
--
-- * 'RdKafkaConfOk' does nothing.
-- * 'RdKafkaConfInvalid' throws 'KafkaInvalidConfigurationValue'.
-- * 'RdKafkaConfUnknown' throws 'KafkaUnknownConfigurationKey'.
--
-- In the failing cases the exception payload is the C string read from
-- @charPtr@ (the error buffer the callers in this module hand to the
-- underlying set function; it is read with 'peekCString', so it must be
-- NUL-terminated).
--
-- Exceptions are raised with 'throwIO' rather than 'throw' so that they are
-- sequenced as part of the 'IO' action instead of depending on when the
-- action value happens to be evaluated.
checkConfSetValue :: RdKafkaConfResT -> CCharBufPointer -> IO ()
checkConfSetValue err charPtr = case err of
  RdKafkaConfOk      -> return ()
  RdKafkaConfInvalid -> failWith KafkaInvalidConfigurationValue
  RdKafkaConfUnknown -> failWith KafkaUnknownConfigurationKey
  where
    -- Read the message out of the error buffer and throw it wrapped in
    -- the given error constructor.
    failWith mkError = peekCString charPtr >>= throwIO . mkError
-- | Set a single @key@ / @value@ pair on the given 'KafkaConf'.
--
-- A temporary buffer of 'nErrorBytes' bytes is allocated for the duration
-- of the call and passed to 'rdKafkaConfSet' together with its size; the
-- result code and the buffer are then handed to 'checkConfSetValue', which
-- throws when the set was not accepted.
setKafkaConfValue :: KafkaConf -> String -> String -> IO ()
setKafkaConfValue (KafkaConf confPtr _ _) key value =
  allocaBytes nErrorBytes $ \errBuf ->
    rdKafkaConfSet confPtr key value errBuf (fromIntegral nErrorBytes)
      >>= \result -> checkConfSetValue result errBuf
-- | Apply every key/value pair of the 'KafkaProps' to the configuration,
-- in list order, via 'setKafkaConfValue'. An empty list is a no-op.
setAllKafkaConfValues :: KafkaConf -> KafkaProps -> IO ()
setAllKafkaConfValues conf (KafkaProps props) = mapM_ applyOne props
  where
    applyOne = uncurry (setKafkaConfValue conf)
-- | Set a single @key@ / @value@ pair on the given 'TopicConf'.
--
-- A temporary buffer of 'nErrorBytes' bytes is allocated for the duration
-- of the call and passed to 'rdKafkaTopicConfSet' together with its size;
-- the result code and the buffer are then handed to 'checkConfSetValue',
-- which throws when the set was not accepted.
setTopicConfValue :: TopicConf -> String -> String -> IO ()
setTopicConfValue (TopicConf confPtr) key value =
  allocaBytes nErrorBytes $ \errBuf ->
    rdKafkaTopicConfSet confPtr key value errBuf (fromIntegral nErrorBytes)
      >>= \result -> checkConfSetValue result errBuf
-- | Apply every key/value pair of the 'TopicProps' to the topic
-- configuration, in list order, via 'setTopicConfValue'. An empty list is
-- a no-op.
setAllTopicConfValues :: TopicConf -> TopicProps -> IO ()
setAllTopicConfValues conf (TopicProps props) = mapM_ applyOne props
  where
    applyOne = uncurry (setTopicConfValue conf)