{-# LANGUAGE OverloadedStrings #-}
module Kafka.Consumer.ConsumerProperties
( ConsumerProperties(..)
, CallbackPollMode(..)
, brokersList
, autoCommit
, noAutoCommit
, noAutoOffsetStore
, groupId
, clientId
, setCallback
, logLevel
, compression
, suppressDisconnectLogs
, statisticsInterval
, extraProps
, extraProp
, debugOptions
, queuedMaxMessagesKBytes
, callbackPollMode
, module X
)
where
import Control.Monad (MonadPlus (mplus))
import Data.Map (Map)
import qualified Data.Map as M
import Data.Semigroup as Sem
import Data.Text (Text)
import qualified Data.Text as Text
import Kafka.Consumer.Types (ConsumerGroupId (..))
import Kafka.Internal.Setup (KafkaConf (..), Callback(..))
import Kafka.Types (BrokerAddress (..), ClientId (..), KafkaCompressionCodec (..), KafkaDebug (..), KafkaLogLevel (..), Millis (..), kafkaCompressionCodecToText, kafkaDebugToText)
import Kafka.Consumer.Callbacks as X
data CallbackPollMode =
CallbackPollModeSync
| CallbackPollModeAsync deriving (Show, Eq)
data ConsumerProperties = ConsumerProperties
{ cpProps :: Map Text Text
, cpLogLevel :: Maybe KafkaLogLevel
, cpCallbacks :: [Callback]
, cpCallbackPollMode :: CallbackPollMode
}
instance Sem.Semigroup ConsumerProperties where
(ConsumerProperties m1 ll1 cb1 _) <> (ConsumerProperties m2 ll2 cb2 cup2) =
ConsumerProperties (M.union m2 m1) (ll2 `mplus` ll1) (cb1 `mplus` cb2) cup2
{-# INLINE (<>) #-}
instance Monoid ConsumerProperties where
mempty = ConsumerProperties
{ cpProps = M.empty
, cpLogLevel = Nothing
, cpCallbacks = []
, cpCallbackPollMode = CallbackPollModeAsync
}
{-# INLINE mempty #-}
mappend = (Sem.<>)
{-# INLINE mappend #-}
brokersList :: [BrokerAddress] -> ConsumerProperties
brokersList bs =
let bs' = Text.intercalate "," ((\(BrokerAddress x) -> x) <$> bs)
in extraProps $ M.fromList [("bootstrap.servers", bs')]
autoCommit :: Millis -> ConsumerProperties
autoCommit (Millis ms) = extraProps $
M.fromList
[ ("enable.auto.commit", "true")
, ("auto.commit.interval.ms", Text.pack $ show ms)
]
noAutoCommit :: ConsumerProperties
noAutoCommit =
extraProps $ M.fromList [("enable.auto.commit", "false")]
noAutoOffsetStore :: ConsumerProperties
noAutoOffsetStore =
extraProps $ M.fromList [("enable.auto.offset.store", "false")]
groupId :: ConsumerGroupId -> ConsumerProperties
groupId (ConsumerGroupId cid) =
extraProps $ M.fromList [("group.id", cid)]
clientId :: ClientId -> ConsumerProperties
clientId (ClientId cid) =
extraProps $ M.fromList [("client.id", cid)]
setCallback :: Callback -> ConsumerProperties
setCallback cb = mempty { cpCallbacks = [cb] }
logLevel :: KafkaLogLevel -> ConsumerProperties
logLevel ll = mempty { cpLogLevel = Just ll }
compression :: KafkaCompressionCodec -> ConsumerProperties
compression c =
extraProps $ M.singleton "compression.codec" (kafkaCompressionCodecToText c)
suppressDisconnectLogs :: ConsumerProperties
suppressDisconnectLogs =
extraProps $ M.fromList [("log.connection.close", "false")]
statisticsInterval :: Millis -> ConsumerProperties
statisticsInterval (Millis t) =
extraProps $ M.singleton "statistics.interval.ms" (Text.pack $ show t)
extraProps :: Map Text Text -> ConsumerProperties
extraProps m = mempty { cpProps = m }
{-# INLINE extraProps #-}
extraProp :: Text -> Text -> ConsumerProperties
extraProp k v = mempty { cpProps = M.singleton k v }
{-# INLINE extraProp #-}
debugOptions :: [KafkaDebug] -> ConsumerProperties
debugOptions [] = extraProps M.empty
debugOptions d =
let points = Text.intercalate "," (kafkaDebugToText <$> d)
in extraProps $ M.fromList [("debug", points)]
queuedMaxMessagesKBytes :: Int -> ConsumerProperties
queuedMaxMessagesKBytes kBytes =
extraProp "queued.max.messages.kbytes" (Text.pack $ show kBytes)
{-# INLINE queuedMaxMessagesKBytes #-}
callbackPollMode :: CallbackPollMode -> ConsumerProperties
callbackPollMode mode = mempty { cpCallbackPollMode = mode }