{-# LANGUAGE OverloadedStrings #-}

-----------------------------------------------------------------------------
-- |
-- Module with consumer properties types and functions.
-----------------------------------------------------------------------------
module Kafka.Consumer.ConsumerProperties
( ConsumerProperties(..)
, CallbackPollMode(..)
, brokersList
, autoCommit
, noAutoCommit
, noAutoOffsetStore
, groupId
, clientId
, setCallback
, logLevel
, compression
, suppressDisconnectLogs
, statisticsInterval
, extraProps
, extraProp
, debugOptions
, queuedMaxMessagesKBytes
, callbackPollMode
, module X
)
where

import           Control.Monad        (MonadPlus (mplus))
import           Data.Map             (Map)
import qualified Data.Map             as M
import           Data.Semigroup       as Sem
import           Data.Text            (Text)
import qualified Data.Text            as Text
import           Kafka.Consumer.Types (ConsumerGroupId (..))
import           Kafka.Internal.Setup (KafkaConf (..), Callback(..))
import           Kafka.Types          (BrokerAddress (..), ClientId (..), KafkaCompressionCodec (..), KafkaDebug (..), KafkaLogLevel (..), Millis (..), kafkaCompressionCodecToText, kafkaDebugToText)

import Kafka.Consumer.Callbacks as X

-- | Whether the callback polling should be done synchronously or not.
data CallbackPollMode =
    -- | You have to poll the consumer frequently to handle new messages
    -- as well as rebalance and keep alive events.
    -- This enables lowering the footprint and having full control over when polling
    -- happens, at the cost of manually managing those events.
    CallbackPollModeSync
    -- | Handle polling rebalance and keep alive events for you in a background thread.
  | CallbackPollModeAsync
  deriving (Show, Eq)

-- | Properties to create 'Kafka.Consumer.Types.KafkaConsumer'.
--
-- Values are normally built from the helper functions of this module
-- and combined with '<>'.
data ConsumerProperties = ConsumerProperties
  { cpProps            :: Map Text Text
    -- ^ Raw configuration key\/value pairs (see 'extraProps').
  , cpLogLevel         :: Maybe KafkaLogLevel
    -- ^ Logging level, if one was requested (see 'logLevel').
  , cpCallbacks        :: [Callback]
    -- ^ Callbacks registered so far (see 'setCallback').
  , cpCallbackPollMode :: CallbackPollMode
    -- ^ How callbacks are polled (see 'callbackPollMode').
  }

-- | Combine two sets of properties, preferring the right-hand operand
-- whenever both sides define the same thing:
--
-- * 'cpProps': union of both maps; on a key clash the right operand's value wins.
-- * 'cpLogLevel': the right operand's level when it has one, otherwise the left's.
-- * 'cpCallbacks': the left operand's callbacks followed by the right operand's.
-- * 'cpCallbackPollMode': always the right operand's mode.
--
-- NOTE(review): the poll mode of the left operand is discarded unconditionally,
-- even when the right operand only carries the 'mempty' default. For example
-- @callbackPollMode CallbackPollModeSync <> groupId g@ ends up with
-- 'CallbackPollModeAsync'. Fixing this requires making the field optional,
-- which would change the exported record, so the behaviour is kept as is.
instance Sem.Semigroup ConsumerProperties where
  (ConsumerProperties m1 ll1 cb1 _) <> (ConsumerProperties m2 ll2 cb2 cup2) =
    -- 'M.union' is left-biased, hence the right operand's map goes first.
    ConsumerProperties (M.union m2 m1) (ll2 `mplus` ll1) (cb1 `mplus` cb2) cup2
  {-# INLINE (<>) #-}

-- | /Right biased/ so we prefer newer properties over older ones.
--
-- 'mempty' carries no configuration entries, no log level, no callbacks and
-- the 'CallbackPollModeAsync' poll mode.
--
-- NOTE(review): '<>' always takes the poll mode of its right operand, so
-- @x <> mempty@ resets a 'CallbackPollModeSync' in @x@ back to
-- 'CallbackPollModeAsync'.
instance Monoid ConsumerProperties where
  mempty = ConsumerProperties
    { cpProps             = M.empty
    , cpLogLevel          = Nothing
    , cpCallbacks         = []
    , cpCallbackPollMode  = CallbackPollModeAsync
    }
  {-# INLINE mempty #-}
  mappend = (Sem.<>)
  {-# INLINE mappend #-}

-- | Set the <https://kafka.apache.org/documentation/#bootstrap.servers list of brokers> to contact to connect to the Kafka cluster.
--
-- The addresses are joined with commas and stored under the
-- @bootstrap.servers@ key.
brokersList :: [BrokerAddress] -> ConsumerProperties
brokersList bs =
  extraProps $ M.singleton "bootstrap.servers" servers
  where
    servers = Text.intercalate "," (unBrokerAddress <$> bs)

-- | Set the <https://kafka.apache.org/documentation/#auto.commit.interval.ms auto commit interval> and enable <https://kafka.apache.org/documentation/#enable.auto.commit auto commit>.
autoCommit :: Millis -> ConsumerProperties
autoCommit (Millis ms) =
  extraProps $
    M.fromList
      [ ("enable.auto.commit", "true")
      , ("auto.commit.interval.ms", Text.pack (show ms))
      ]

-- | Disable <https://kafka.apache.org/documentation/#enable.auto.commit auto commit> for the consumer.
noAutoCommit :: ConsumerProperties
noAutoCommit =
  extraProps $ M.singleton "enable.auto.commit" "false"

-- | Disable auto offset store for the consumer.
--
-- See <https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md enable.auto.offset.store> for more information.
noAutoOffsetStore :: ConsumerProperties
noAutoOffsetStore =
  extraProps $ M.singleton "enable.auto.offset.store" "false"

-- | Set the consumer <https://kafka.apache.org/documentation/#group.id group id>.
groupId :: ConsumerGroupId -> ConsumerProperties
groupId (ConsumerGroupId cid) =
  extraProps $ M.singleton "group.id" cid

-- | Set the <https://kafka.apache.org/documentation/#client.id consumer identifier>.
clientId :: ClientId -> ConsumerProperties
clientId (ClientId cid) =
  extraProps $ M.singleton "client.id" cid

-- | Set the consumer callback.
--
-- Callbacks accumulate: combining several 'setCallback' values with '<>'
-- keeps all of them.
--
-- For examples of use, see:
--
-- * 'errorCallback'
-- * 'logCallback'
-- * 'statsCallback'
setCallback :: Callback -> ConsumerProperties
setCallback cb = mempty { cpCallbacks = [cb] }

-- | Set the logging level.
-- Usually used together with 'debugOptions' to configure which logs are needed.
logLevel :: KafkaLogLevel -> ConsumerProperties
logLevel ll = mempty { cpLogLevel = Just ll }

-- | Set the <https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md compression.codec> for the consumer.
compression :: KafkaCompressionCodec -> ConsumerProperties
compression c =
  extraProps $ M.singleton "compression.codec" (kafkaCompressionCodecToText c)

-- | Suppresses consumer <https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md log.connection.close>
-- by setting it to @false@.
--
-- It might be useful to turn this off when interacting with brokers
-- with an aggressive @connection.max.idle.ms@ value.
suppressDisconnectLogs :: ConsumerProperties
suppressDisconnectLogs =
  extraProps $ M.singleton "log.connection.close" "false"

-- | Set the <https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md statistics.interval.ms> for the consumer.
statisticsInterval :: Millis -> ConsumerProperties
statisticsInterval (Millis t) =
  extraProps $ M.singleton "statistics.interval.ms" (Text.pack (show t))

-- | Set any configuration options that are supported by /librdkafka/.
-- The full list can be found <https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md here>
extraProps :: Map Text Text -> ConsumerProperties
extraProps m = mempty { cpProps = m }
{-# INLINE extraProps #-}

-- | Set any configuration option that is supported by /librdkafka/.
-- The full list can be found <https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md here>
extraProp :: Text -> Text -> ConsumerProperties
extraProp k v = mempty { cpProps = M.singleton k v }
{-# INLINE extraProp #-}

-- | Set <https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md debug> features for the consumer.
-- Usually used together with 'logLevel'.
--
-- The options are joined with commas and stored under the @debug@ key.
-- An empty list sets no property at all.
debugOptions :: [KafkaDebug] -> ConsumerProperties
debugOptions [] = extraProps M.empty
debugOptions d =
  extraProps $ M.singleton "debug" points
  where
    points = Text.intercalate "," (kafkaDebugToText <$> d)

-- | Set <https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md queued.max.messages.kbytes>
queuedMaxMessagesKBytes :: Int -> ConsumerProperties
queuedMaxMessagesKBytes kBytes =
  extraProp "queued.max.messages.kbytes" (Text.pack (show kBytes))
{-# INLINE queuedMaxMessagesKBytes #-}

-- | Set the callback poll mode. Default value is 'CallbackPollModeAsync'.
--
-- NOTE(review): when properties are combined with '<>', the poll mode of the
-- right-most operand wins, even if that operand never set it explicitly.
-- Place this setting last in the chain to make sure it takes effect.
callbackPollMode :: CallbackPollMode -> ConsumerProperties
callbackPollMode mode = mempty { cpCallbackPollMode = mode }