{-# LANGUAGE OverloadedStrings #-}

-----------------------------------------------------------------------------
-- |
-- Module with producer properties types and functions.
-----------------------------------------------------------------------------
module Kafka.Producer.ProducerProperties
( ProducerProperties(..)
, brokersList
, setCallback
, logLevel
, compression
, topicCompression
, sendTimeout
, statisticsInterval
, extraProps
, suppressDisconnectLogs
, extraTopicProps
, debugOptions
, module Kafka.Producer.Callbacks
)
where

import           Data.Text                (Text)
import qualified Data.Text                as Text
import           Control.Monad            (MonadPlus(mplus))
import           Data.Map                 (Map)
import qualified Data.Map                 as M
import           Data.Semigroup           as Sem
import           Kafka.Internal.Setup     (KafkaConf(..), Callback(..))
import           Kafka.Types              (KafkaDebug(..), Timeout(..), KafkaCompressionCodec(..), KafkaLogLevel(..), BrokerAddress(..), kafkaDebugToText, kafkaCompressionCodecToText, Millis(..))

import           Kafka.Producer.Callbacks

-- | Properties to create 'Kafka.Producer.Types.KafkaProducer'.
--
-- Values are normally built with the setter functions exported from this
-- module and combined with '<>'.
data ProducerProperties = ProducerProperties
  { ppKafkaProps :: Map Text Text
    -- ^ Producer-level configuration entries (see 'extraProps').
  , ppTopicProps :: Map Text Text
    -- ^ Topic-level configuration entries (see 'extraTopicProps').
  , ppLogLevel   :: Maybe KafkaLogLevel
    -- ^ Log level, if one was requested (see 'logLevel').
  , ppCallbacks  :: [Callback]
    -- ^ Registered callbacks (see 'setCallback').
  }

-- | Combines two sets of properties, preferring the right-hand operand
-- whenever both sides define the same setting.
--
-- * Kafka and topic property maps are merged; on duplicate keys the value
--   from the right operand is kept.
-- * The log level of the right operand is used if present, otherwise the
--   left one.
-- * Callbacks are concatenated, left operand's callbacks first.
instance Sem.Semigroup ProducerProperties where
  (ProducerProperties k1 t1 ll1 cb1) <> (ProducerProperties k2 t2 ll2 cb2) =
    ProducerProperties
      (M.union k2 k1)     -- 'M.union' is left-biased, so passing k2 first lets it win
      (M.union t2 t1)     -- same right-preference for topic properties
      (ll2 `mplus` ll1)   -- first 'Just' wins, and the right operand is tried first
      (cb1 `mplus` cb2)   -- list concatenation: cb1 followed by cb2
  {-# INLINE (<>) #-}

-- | /Right biased/ so we prefer newer properties over older ones.
--
-- 'mempty' carries no configuration at all: both property maps are empty,
-- no log level is set and no callbacks are registered.
instance Monoid ProducerProperties where
  mempty = ProducerProperties
    { ppKafkaProps = M.empty
    , ppTopicProps = M.empty
    , ppLogLevel   = Nothing
    , ppCallbacks  = []
    }
  {-# INLINE mempty #-}
  -- Delegates to the 'Sem.Semigroup' instance so both operations agree.
  -- NOTE(review): presumably spelled out explicitly for older @base@
  -- versions where 'mappend' does not default to '<>' - confirm before removing.
  mappend = (Sem.<>)
  {-# INLINE mappend #-}

-- | Set the <https://kafka.apache.org/documentation/#bootstrap.servers list of brokers> to contact to connect to the Kafka cluster.
--
-- The broker addresses are joined with commas and stored under the
-- @bootstrap.servers@ key.
brokersList :: [BrokerAddress] -> ProducerProperties
brokersList bs =
  let bs' = Text.intercalate "," (unBrokerAddress <$> bs)
   in extraProps $ M.singleton "bootstrap.servers" bs'

-- | Set the producer callback.
--
-- The result contains only this callback; combining several such values
-- with '<>' accumulates their callbacks.
--
-- For examples of use, see:
--
-- * 'errorCallback'
-- * 'logCallback'
-- * 'statsCallback'
setCallback :: Callback -> ProducerProperties
setCallback cb = mempty { ppCallbacks = [cb] }

-- | Sets the logging level.
-- Usually used together with 'debugOptions' to configure which logs are needed.
logLevel :: KafkaLogLevel -> ProducerProperties
logLevel ll = mempty { ppLogLevel = Just ll }

-- | Set the <https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md compression.codec> for the producer.
--
-- Stored as a producer-level property; see 'topicCompression' for the
-- topic-level counterpart.
compression :: KafkaCompressionCodec -> ProducerProperties
compression c =
  extraProps $ M.singleton "compression.codec" (kafkaCompressionCodecToText c)

-- | Set the <https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md#topic-configuration-properties compression.codec> for the topic.
--
-- Stored as a topic-level property; see 'compression' for the
-- producer-level counterpart.
topicCompression :: KafkaCompressionCodec -> ProducerProperties
topicCompression c =
  extraTopicProps $ M.singleton "compression.codec" (kafkaCompressionCodecToText c)

-- | Set the <https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md#topic-configuration-properties message.timeout.ms>.
--
-- The wrapped timeout value is rendered as text and stored as a
-- topic-level property.
sendTimeout :: Timeout -> ProducerProperties
sendTimeout (Timeout t) =
  extraTopicProps $ M.singleton "message.timeout.ms" (Text.pack $ show t)

-- | Set the <https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md statistics.interval.ms> for the producer.
--
-- The wrapped millisecond value is rendered as text and stored as a
-- producer-level property.
statisticsInterval :: Millis -> ProducerProperties
statisticsInterval (Millis t) =
  extraProps $ M.singleton "statistics.interval.ms" (Text.pack $ show t)

-- | Any configuration options that are supported by /librdkafka/.
-- The full list can be found <https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md here>
--
-- The map is stored as-is in 'ppKafkaProps'; keys and values are not
-- validated here.
extraProps :: Map Text Text -> ProducerProperties
extraProps m = mempty { ppKafkaProps = m }

-- | Suppresses producer disconnects logs.
--
-- It might be useful to turn this off when interacting with brokers
-- with an aggressive connection.max.idle.ms value.
--
-- Implemented by setting @log.connection.close@ to @false@.
suppressDisconnectLogs :: ProducerProperties
suppressDisconnectLogs =
  extraProps $ M.singleton "log.connection.close" "false"

-- | Any /topic/ configuration options that are supported by /librdkafka/.
-- The full list can be found <https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md#topic-configuration-properties here>
--
-- The map is stored as-is in 'ppTopicProps'; keys and values are not
-- validated here.
extraTopicProps :: Map Text Text -> ProducerProperties
extraTopicProps m = mempty { ppTopicProps = m }

-- | Sets <https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md debug> features for the producer.
-- Usually used together with 'logLevel'.
--
-- An empty list sets no @debug@ property at all; otherwise the options are
-- joined with commas and stored under the @debug@ key.
debugOptions :: [KafkaDebug] -> ProducerProperties
-- Special-cased so that an empty list does not produce a @debug@ key with an
-- empty value.
debugOptions [] = extraProps M.empty
debugOptions d =
  let points = Text.intercalate "," (kafkaDebugToText <$> d)
   in extraProps $ M.singleton "debug" points