module Kafka.Consumer.ConsumerProperties where -- import Control.Monad import Data.Map (Map) import Kafka.Types import Kafka.Consumer.Types import qualified Data.Map as M import qualified Data.List as L -- | Properties to create 'KafkaConsumer'. data ConsumerProperties = ConsumerProperties { cpProps :: Map String String , cpRebalanceCallback :: Maybe ReballanceCallback , cpOffsetsCallback :: Maybe OffsetsCommitCallback , cpLogLevel :: Maybe KafkaLogLevel } instance Monoid ConsumerProperties where mempty = ConsumerProperties M.empty Nothing Nothing Nothing mappend (ConsumerProperties m1 rb1 oc1 ll1) (ConsumerProperties m2 rb2 oc2 ll2) = ConsumerProperties (M.union m1 m2) (rb2 `mplus` rb1) (oc2 `mplus` oc1) (ll2 `mplus` ll1) consumerBrokersList :: [BrokerAddress] -> ConsumerProperties consumerBrokersList bs = let bs' = L.intercalate "," ((\(BrokerAddress x) -> x) <$> bs) in extraConsumerProps $ M.fromList [("bootstrap.servers", bs')] -- | Disables auto commit for the consumer noAutoCommit :: ConsumerProperties noAutoCommit = extraConsumerProps $ M.fromList [("enable.auto.commit", "false")] -- | Consumer group id groupId :: ConsumerGroupId -> ConsumerProperties groupId (ConsumerGroupId cid) = extraConsumerProps $ M.fromList [("group.id", cid)] clientId :: ClientId -> ConsumerProperties clientId (ClientId cid) = extraConsumerProps $ M.fromList [("client.id", cid)] -- | Sets a callback that is called when rebalance is needed. -- -- Callback implementations suppose to watch for 'KafkaResponseError' 'RdKafkaRespErrAssignPartitions' and -- for 'KafkaResponseError' 'RdKafkaRespErrRevokePartitions'. Other error codes are not expected and would indicate -- something really bad happening in a system, or bugs in @librdkafka@ itself. -- -- A callback is expected to call 'assign' according to the error code it receives. -- -- * When 'RdKafkaRespErrAssignPartitions' happens 'assign' should be called with all the partitions it was called with. -- It is OK to alter partitions offsets before calling 'assign'. -- -- * When 'RdKafkaRespErrRevokePartitions' happens 'assign' should be called with an empty list of partitions. reballanceCallback :: ReballanceCallback -> ConsumerProperties reballanceCallback cb = ConsumerProperties M.empty (Just cb) Nothing Nothing -- | Sets offset commit callback for use with consumer groups. -- -- The results of automatic or manual offset commits will be scheduled -- for this callback and is served by `pollMessage`. -- -- A callback is expected to call 'assign' according to the error code it receives. -- -- If no partitions had valid offsets to commit this callback will be called -- with `KafkaError` == `KafkaResponseError` `RdKafkaRespErrNoOffset` which is not to be considered -- an error. offsetsCommitCallback :: OffsetsCommitCallback -> ConsumerProperties offsetsCommitCallback cb = ConsumerProperties M.empty Nothing (Just cb) Nothing -- | Sets the logging level. -- Usually is used with 'consumerDebug' to configure which logs are needed. consumerLogLevel :: KafkaLogLevel -> ConsumerProperties consumerLogLevel ll = ConsumerProperties M.empty Nothing Nothing (Just ll) -- | Sets the compression codec for the consumer. consumerCompression :: KafkaCompressionCodec -> ConsumerProperties consumerCompression c = extraConsumerProps $ M.singleton "compression.codec" (kafkaCompressionCodecToString c) -- | Suppresses consumer disconnects logs. -- -- It might be useful to turn this off when interacting with brokers -- with an aggressive connection.max.idle.ms value. consumerSuppressDisconnectLogs :: ConsumerProperties consumerSuppressDisconnectLogs = extraConsumerProps $ M.fromList [("log.connection.close", "false")] -- | Any configuration options that are supported by /librdkafka/. -- The full list can be found extraConsumerProps :: Map String String -> ConsumerProperties extraConsumerProps m = ConsumerProperties m Nothing Nothing Nothing {-# INLINE extraConsumerProps #-} -- | Any configuration options that are supported by /librdkafka/. -- The full list can be found extraConsumerProp :: String -> String -> ConsumerProperties extraConsumerProp k v = ConsumerProperties (M.singleton k v) Nothing Nothing Nothing {-# INLINE extraConsumerProp #-} -- | Sets debug features for the consumer. -- Usually is used with 'consumerLogLevel'. consumerDebug :: [KafkaDebug] -> ConsumerProperties consumerDebug [] = extraConsumerProps M.empty consumerDebug d = let points = L.intercalate "," (kafkaDebugToString <$> d) in extraConsumerProps $ M.fromList [("debug", points)] consumerQueuedMaxMessagesKBytes :: Int -> ConsumerProperties consumerQueuedMaxMessagesKBytes kBytes = extraConsumerProp "queued.max.messages.kbytes" (show kBytes) {-# INLINE consumerQueuedMaxMessagesKBytes #-}