module Kafka.Consumer.ConsumerProperties
where
import Control.Monad
import Data.Map (Map)
import Kafka.Types
import Kafka.Consumer.Types
import qualified Data.Map as M
import qualified Data.List as L
data ConsumerProperties = ConsumerProperties
{ cpProps :: Map String String
, cpRebalanceCallback :: Maybe ReballanceCallback
, cpOffsetsCallback :: Maybe OffsetsCommitCallback
, cpLogLevel :: Maybe KafkaLogLevel
}
instance Monoid ConsumerProperties where
mempty = ConsumerProperties M.empty Nothing Nothing Nothing
mappend (ConsumerProperties m1 rb1 oc1 ll1) (ConsumerProperties m2 rb2 oc2 ll2) =
ConsumerProperties (M.union m1 m2) (rb2 `mplus` rb1) (oc2 `mplus` oc1) (ll2 `mplus` ll1)
consumerBrokersList :: [BrokerAddress] -> ConsumerProperties
consumerBrokersList bs =
let bs' = L.intercalate "," ((\(BrokerAddress x) -> x) <$> bs)
in extraConsumerProps $ M.fromList [("bootstrap.servers", bs')]
noAutoCommit :: ConsumerProperties
noAutoCommit =
extraConsumerProps $ M.fromList [("enable.auto.commit", "false")]
groupId :: ConsumerGroupId -> ConsumerProperties
groupId (ConsumerGroupId cid) =
extraConsumerProps $ M.fromList [("group.id", cid)]
clientId :: ClientId -> ConsumerProperties
clientId (ClientId cid) =
extraConsumerProps $ M.fromList [("client.id", cid)]
reballanceCallback :: ReballanceCallback -> ConsumerProperties
reballanceCallback cb = ConsumerProperties M.empty (Just cb) Nothing Nothing
offsetsCommitCallback :: OffsetsCommitCallback -> ConsumerProperties
offsetsCommitCallback cb = ConsumerProperties M.empty Nothing (Just cb) Nothing
consumerLogLevel :: KafkaLogLevel -> ConsumerProperties
consumerLogLevel ll = ConsumerProperties M.empty Nothing Nothing (Just ll)
consumerCompression :: KafkaCompressionCodec -> ConsumerProperties
consumerCompression c =
extraConsumerProps $ M.singleton "compression.codec" (kafkaCompressionCodecToString c)
consumerSuppressDisconnectLogs :: ConsumerProperties
consumerSuppressDisconnectLogs =
extraConsumerProps $ M.fromList [("log.connection.close", "false")]
extraConsumerProps :: Map String String -> ConsumerProperties
extraConsumerProps m = ConsumerProperties m Nothing Nothing Nothing
{-# INLINE extraConsumerProps #-}
extraConsumerProp :: String -> String -> ConsumerProperties
extraConsumerProp k v = ConsumerProperties (M.singleton k v) Nothing Nothing Nothing
{-# INLINE extraConsumerProp #-}
consumerDebug :: [KafkaDebug] -> ConsumerProperties
consumerDebug [] = extraConsumerProps M.empty
consumerDebug d =
let points = L.intercalate "," (kafkaDebugToString <$> d)
in extraConsumerProps $ M.fromList [("debug", points)]
consumerQueuedMaxMessagesKBytes :: Int -> ConsumerProperties
consumerQueuedMaxMessagesKBytes kBytes =
extraConsumerProp "queued.max.messages.kbytes" (show kBytes)
{-# INLINE consumerQueuedMaxMessagesKBytes #-}