module Kafka.Producer.ProducerProperties
( module Kafka.Producer.ProducerProperties
, module Kafka.Callbacks
)
where
import Control.Monad
import qualified Data.List as L
import Data.Map (Map)
import qualified Data.Map as M
import qualified Data.Semigroup as Sem

import Kafka.Callbacks
import Kafka.Types
-- | Accumulated configuration for a Kafka producer.
--
-- Values of this type are normally built from the small combinators in this
-- module and combined through the 'Monoid' instance.
data ProducerProperties = ProducerProperties
  { ppKafkaProps :: Map String String
    -- ^ Producer-level properties, stored as raw key/value strings.
  , ppTopicProps :: Map String String
    -- ^ Topic-level properties, stored as raw key/value strings.
  , ppLogLevel   :: Maybe KafkaLogLevel
    -- ^ Requested log level; 'Nothing' when none has been set.
  , callbacks    :: [KafkaConf -> IO ()]
    -- ^ Actions that each receive a 'KafkaConf'.
    -- NOTE(review): presumably run while the producer is being configured;
    -- confirm against the code that consumes this field.
  }
-- | Combines two property sets field by field.
--
-- * Kafka and topic properties are merged with 'M.union', which is
--   left-biased: on a duplicate key the value of the left operand is kept.
-- * The log level of the right operand is preferred; the left one is only
--   used when the right operand has none.
-- * Callbacks of the right operand are placed before those of the left one.
--
-- NOTE(review): the maps are left-biased while the log level is right-biased.
-- The original behaviour is preserved here; confirm that this mix is intended
-- before relying on override order.
--
-- This instance is required because 'Sem.Semigroup' is a superclass of
-- 'Monoid' since base-4.11 (GHC 8.4).
instance Sem.Semigroup ProducerProperties where
  (ProducerProperties k1 t1 ll1 cb1) <> (ProducerProperties k2 t2 ll2 cb2) =
    ProducerProperties
      (M.union k1 k2)
      (M.union t1 t2)
      (ll2 `mplus` ll1)
      (cb2 `mplus` cb1)

-- | 'mempty' carries no properties, no log level and no callbacks;
-- 'mappend' is the 'Sem.Semigroup' append defined above.
instance Monoid ProducerProperties where
  mempty = ProducerProperties M.empty M.empty Nothing []
  mappend = (Sem.<>)
-- | Sets the @bootstrap.servers@ property to the given broker addresses,
-- joined with commas in the order supplied.
brokersList :: [BrokerAddress] -> ProducerProperties
brokersList addrs = extraProps (M.singleton "bootstrap.servers" joined)
  where
    -- Unwrap every address and glue them into one comma-separated string.
    joined = L.intercalate "," (map unwrap addrs)
    unwrap (BrokerAddress host) = host
-- | Builds properties that contain only the given callback; the property
-- maps are empty and no log level is set.
setCallback :: (KafkaConf -> IO ()) -> ProducerProperties
setCallback cb = ProducerProperties
  { ppKafkaProps = M.empty
  , ppTopicProps = M.empty
  , ppLogLevel   = Nothing
  , callbacks    = [cb]
  }
-- | Builds properties that only carry the given log level; the property
-- maps are empty and there are no callbacks.
logLevel :: KafkaLogLevel -> ProducerProperties
logLevel level = ProducerProperties
  { ppKafkaProps = M.empty
  , ppTopicProps = M.empty
  , ppLogLevel   = Just level
  , callbacks    = []
  }
-- | Sets the producer-level @compression.codec@ property to the string
-- form of the given codec.
compression :: KafkaCompressionCodec -> ProducerProperties
compression =
  extraProps
    . M.singleton "compression.codec"
    . kafkaCompressionCodecToString
-- | Sets the topic-level @compression.codec@ property to the string form
-- of the given codec.
topicCompression :: KafkaCompressionCodec -> ProducerProperties
topicCompression codec = extraTopicProps codecProp
  where
    codecProp =
      M.singleton "compression.codec" (kafkaCompressionCodecToString codec)
-- | Wraps an arbitrary map of producer-level properties; topic properties,
-- log level and callbacks are left empty.
extraProps :: Map String String -> ProducerProperties
extraProps props = ProducerProperties
  { ppKafkaProps = props
  , ppTopicProps = M.empty
  , ppLogLevel   = Nothing
  , callbacks    = []
  }
-- | Sets the producer-level @log.connection.close@ property to @false@.
--
-- NOTE(review): going by the name, this is meant to silence log lines about
-- closed broker connections; confirm against the librdkafka configuration
-- reference.
suppressDisconnectLogs :: ProducerProperties
suppressDisconnectLogs = extraProps disconnectLogging
  where
    disconnectLogging = M.singleton "log.connection.close" "false"
-- | Wraps an arbitrary map of topic-level properties; producer properties,
-- log level and callbacks are left empty.
extraTopicProps :: Map String String -> ProducerProperties
extraTopicProps props = ProducerProperties
  { ppKafkaProps = M.empty
  , ppTopicProps = props
  , ppLogLevel   = Nothing
  , callbacks    = []
  }
-- | Sets the producer-level @debug@ property to the comma-separated string
-- forms of the given debug contexts.
--
-- An empty list yields no properties at all, so @debug@ is not set.
debugOptions :: [KafkaDebug] -> ProducerProperties
debugOptions contexts
  | null contexts = extraProps M.empty
  | otherwise     = extraProps (M.singleton "debug" joined)
  where
    joined = L.intercalate "," (map kafkaDebugToString contexts)