{-# LANGUAGE OverloadedStrings #-}
module Kafka.Consumer.ConsumerProperties
( ConsumerProperties(..)
, brokersList
, noAutoCommit
, noAutoOffsetStore
, groupId
, clientId
, setCallback
, logLevel
, compression
, suppressDisconnectLogs
, extraProps
, extraProp
, debugOptions
, queuedMaxMessagesKBytes
, module X
)
where
import Control.Monad (MonadPlus(mplus))
import Data.Map (Map)
import qualified Data.Map as M
import Data.Semigroup as Sem
import Data.Text (Text)
import qualified Data.Text as Text
import Kafka.Consumer.Types (ConsumerGroupId(..))
import Kafka.Internal.Setup (KafkaConf(..))
import Kafka.Types (KafkaDebug(..), KafkaCompressionCodec(..), KafkaLogLevel(..), ClientId(..), BrokerAddress(..), kafkaDebugToText, kafkaCompressionCodecToText)
import Kafka.Consumer.Callbacks as X
data ConsumerProperties = ConsumerProperties
{ cpProps :: Map Text Text
, cpLogLevel :: Maybe KafkaLogLevel
, cpCallbacks :: [KafkaConf -> IO ()]
}
instance Sem.Semigroup ConsumerProperties where
(ConsumerProperties m1 ll1 cb1) <> (ConsumerProperties m2 ll2 cb2) =
ConsumerProperties (M.union m2 m1) (ll2 `mplus` ll1) (cb1 `mplus` cb2)
{-# INLINE (<>) #-}
instance Monoid ConsumerProperties where
mempty = ConsumerProperties
{ cpProps = M.empty
, cpLogLevel = Nothing
, cpCallbacks = []
}
{-# INLINE mempty #-}
mappend = (Sem.<>)
{-# INLINE mappend #-}
brokersList :: [BrokerAddress] -> ConsumerProperties
brokersList bs =
let bs' = Text.intercalate "," ((\(BrokerAddress x) -> x) <$> bs)
in extraProps $ M.fromList [("bootstrap.servers", bs')]
noAutoCommit :: ConsumerProperties
noAutoCommit =
extraProps $ M.fromList [("enable.auto.commit", "false")]
noAutoOffsetStore :: ConsumerProperties
noAutoOffsetStore =
extraProps $ M.fromList [("enable.auto.offset.store", "false")]
groupId :: ConsumerGroupId -> ConsumerProperties
groupId (ConsumerGroupId cid) =
extraProps $ M.fromList [("group.id", cid)]
clientId :: ClientId -> ConsumerProperties
clientId (ClientId cid) =
extraProps $ M.fromList [("client.id", cid)]
setCallback :: (KafkaConf -> IO ()) -> ConsumerProperties
setCallback cb = mempty { cpCallbacks = [cb] }
logLevel :: KafkaLogLevel -> ConsumerProperties
logLevel ll = mempty { cpLogLevel = Just ll }
compression :: KafkaCompressionCodec -> ConsumerProperties
compression c =
extraProps $ M.singleton "compression.codec" (kafkaCompressionCodecToText c)
suppressDisconnectLogs :: ConsumerProperties
suppressDisconnectLogs =
extraProps $ M.fromList [("log.connection.close", "false")]
extraProps :: Map Text Text -> ConsumerProperties
extraProps m = mempty { cpProps = m }
{-# INLINE extraProps #-}
extraProp :: Text -> Text -> ConsumerProperties
extraProp k v = mempty { cpProps = M.singleton k v }
{-# INLINE extraProp #-}
debugOptions :: [KafkaDebug] -> ConsumerProperties
debugOptions [] = extraProps M.empty
debugOptions d =
let points = Text.intercalate "," (kafkaDebugToText <$> d)
in extraProps $ M.fromList [("debug", points)]
queuedMaxMessagesKBytes :: Int -> ConsumerProperties
queuedMaxMessagesKBytes kBytes =
extraProp "queued.max.messages.kbytes" (Text.pack $ show kBytes)
{-# INLINE queuedMaxMessagesKBytes #-}