module Kafka.Consumer.ConsumerProperties
( module Kafka.Consumer.ConsumerProperties
, module Kafka.Consumer.Callbacks
)
where
import Control.Monad
import qualified Data.List as L
import Data.Map (Map)
import qualified Data.Map as M
import Data.Semigroup
import Kafka.Consumer.Callbacks
import Kafka.Consumer.Types
import Kafka.Types
-- | Settings used to configure a Kafka consumer.
--
-- Values of this type are combined with '<>', so a full configuration is
-- usually assembled from several small pieces.
data ConsumerProperties = ConsumerProperties
  { cpProps :: Map String String
    -- ^ String key/value configuration properties (for example
    -- @group.id@ or @bootstrap.servers@).
  , cpLogLevel :: Maybe KafkaLogLevel
    -- ^ Requested log level; 'Nothing' when none has been set.
  , callbacks :: [KafkaConf -> IO ()]
    -- ^ Actions that receive a 'KafkaConf'. NOTE(review): presumably run
    -- while the consumer is being created; confirm against the code that
    -- consumes this record.
  }
-- | Combines two property sets, giving precedence to the right operand.
--
-- * Properties: on a key clash the value from the right operand is kept.
-- * Log level: the right operand's level is used when it is a 'Just',
--   otherwise the left operand's.
-- * Callbacks: the right operand's callbacks are placed before the left
--   operand's in the resulting list.
instance Semigroup ConsumerProperties where
  ConsumerProperties leftProps leftLevel leftCbs <> ConsumerProperties rightProps rightLevel rightCbs =
    ConsumerProperties
      { -- 'M.union' is left-biased, so the right operand goes first.
        cpProps = M.union rightProps leftProps
      , cpLogLevel = rightLevel `mplus` leftLevel
      , callbacks = rightCbs ++ leftCbs
      }
-- | The identity is a property set with no properties, no log level and
-- no callbacks; 'mappend' is the 'Semigroup' operation.
instance Monoid ConsumerProperties where
  mempty =
    ConsumerProperties
      { cpProps = M.empty
      , cpLogLevel = Nothing
      , callbacks = []
      }
  mappend = (<>)
-- | Sets @bootstrap.servers@ to the given broker addresses, joined with
-- commas in the order supplied.
brokersList :: [BrokerAddress] -> ConsumerProperties
brokersList bs = extraProp "bootstrap.servers" joined
  where
    joined = L.intercalate "," (map address bs)
    address (BrokerAddress host) = host
-- | Sets @enable.auto.commit@ to @false@.
noAutoCommit :: ConsumerProperties
noAutoCommit = extraProp "enable.auto.commit" "false"
-- | Sets @group.id@ to the string wrapped by the given 'ConsumerGroupId'.
groupId :: ConsumerGroupId -> ConsumerProperties
groupId (ConsumerGroupId gid) = extraProp "group.id" gid
-- | Sets @client.id@ to the string wrapped by the given 'ClientId'.
clientId :: ClientId -> ConsumerProperties
clientId (ClientId name) = extraProp "client.id" name
-- | Builds a property set that carries only the given callback: no
-- key/value properties and no log level.
setCallback :: (KafkaConf -> IO ()) -> ConsumerProperties
setCallback cb =
  ConsumerProperties
    { cpProps = M.empty
    , cpLogLevel = Nothing
    , callbacks = [cb]
    }
-- | Builds a property set that carries only the given log level: no
-- key/value properties and no callbacks.
logLevel :: KafkaLogLevel -> ConsumerProperties
logLevel ll =
  ConsumerProperties
    { cpProps = M.empty
    , cpLogLevel = Just ll
    , callbacks = []
    }
-- | Sets @compression.codec@ to the string form of the given codec, as
-- produced by 'kafkaCompressionCodecToString'.
compression :: KafkaCompressionCodec -> ConsumerProperties
compression c = extraProp "compression.codec" codecName
  where
    codecName = kafkaCompressionCodecToString c
-- | Sets @log.connection.close@ to @false@.
suppressDisconnectLogs :: ConsumerProperties
suppressDisconnectLogs = extraProp "log.connection.close" "false"
-- | Wraps an arbitrary map of key/value properties in a
-- 'ConsumerProperties' with no log level and no callbacks.
extraProps :: Map String String -> ConsumerProperties
extraProps m =
  ConsumerProperties
    { cpProps = m
    , cpLogLevel = Nothing
    , callbacks = []
    }
{-# INLINE extraProps #-}
-- | Builds a 'ConsumerProperties' holding exactly one key/value property,
-- with no log level and no callbacks.
extraProp :: String -> String -> ConsumerProperties
extraProp k v = extraProps (M.singleton k v)
{-# INLINE extraProp #-}
-- | Sets @debug@ to the comma-separated string forms of the given debug
-- options (via 'kafkaDebugToString').
--
-- An empty list yields a property set with no properties at all, rather
-- than a @debug@ key with an empty value.
debugOptions :: [KafkaDebug] -> ConsumerProperties
debugOptions d
  | null d = extraProps M.empty
  | otherwise = extraProp "debug" contexts
  where
    contexts = L.intercalate "," (map kafkaDebugToString d)
-- | Sets @queued.max.messages.kbytes@ to the decimal rendering ('show')
-- of the given 'Int'.
queuedMaxMessagesKBytes :: Int -> ConsumerProperties
queuedMaxMessagesKBytes kBytes = extraProp "queued.max.messages.kbytes" rendered
  where
    rendered = show kBytes
{-# INLINE queuedMaxMessagesKBytes #-}