{-# LANGUAGE DeriveDataTypeable         #-}
{-# LANGUAGE DeriveGeneric              #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE OverloadedStrings          #-}
module Kafka.Types
( BrokerId(..)
, PartitionId(..)
, Millis(..)
, ClientId(..)
, BatchSize(..)
, TopicName(..)
, BrokerAddress(..)
, Timeout(..)
, KafkaLogLevel(..)
, KafkaError(..)
, KafkaDebug(..)
, KafkaCompressionCodec(..)
, TopicType(..)
, topicType
, kafkaDebugToText
, kafkaCompressionCodecToText
)
where

import Control.Exception      (Exception (..))
import Data.Int               (Int64)
import Data.Text              (Text, isPrefixOf)
import Data.Typeable          (Typeable)
import GHC.Generics           (Generic)
import Kafka.Internal.RdKafka (RdKafkaRespErrT, rdKafkaErr2name, rdKafkaErr2str)

newtype BrokerId = BrokerId { unBrokerId :: Int } deriving (Show, Eq, Ord, Read, Generic)

newtype PartitionId = PartitionId { unPartitionId :: Int } deriving (Show, Eq, Read, Ord, Enum, Generic)
newtype Millis      = Millis { unMillis :: Int64 } deriving (Show, Read, Eq, Ord, Num, Generic)
newtype ClientId    = ClientId { unClientId :: Text } deriving (Show, Eq, Ord, Generic)
newtype BatchSize   = BatchSize { unBatchSize :: Int } deriving (Show, Read, Eq, Ord, Num, Generic)

data TopicType =
    User    -- ^ Normal topics that are created by user.
  | System  -- ^ Topics starting with "__" (__consumer_offsets, __confluent.support.metrics) are considered "system" topics
  deriving (Show, Read, Eq, Ord, Generic)

-- | Topic name to be consumed
--
-- Wildcard (regex) topics are supported by the /librdkafka/ assignor:
-- any topic name in the topics list that is prefixed with @^@ will
-- be regex-matched to the full list of topics in the cluster and matching
-- topics will be added to the subscription list.
newtype TopicName =
    TopicName { unTopicName :: Text } -- ^ a simple topic name or a regex if started with @^@
    deriving (Show, Eq, Ord, Read, Generic)

topicType :: TopicName -> TopicType
topicType (TopicName tn) =
  if "__" `isPrefixOf` tn then System else User
{-# INLINE topicType #-}

-- | Kafka broker address string (e.g. @broker1:9092@)
newtype BrokerAddress = BrokerAddress { unBrokerAddress :: Text } deriving (Show, Eq, Generic)

-- | Timeout in milliseconds
newtype Timeout = Timeout { unTimeout :: Int } deriving (Show, Eq, Read, Generic)

-- | Log levels for /librdkafka/.
data KafkaLogLevel =
  KafkaLogEmerg | KafkaLogAlert | KafkaLogCrit | KafkaLogErr | KafkaLogWarning |
  KafkaLogNotice | KafkaLogInfo | KafkaLogDebug
  deriving (Show, Enum, Eq)

--
-- | Any Kafka errors
data KafkaError =
    KafkaError Text
  | KafkaInvalidReturnValue
  | KafkaBadSpecification Text
  | KafkaResponseError RdKafkaRespErrT
  | KafkaInvalidConfigurationValue Text
  | KafkaUnknownConfigurationKey Text
  | KafkaBadConfiguration
    deriving (Eq, Show, Typeable, Generic)

instance Exception KafkaError where
  displayException (KafkaResponseError err) =
    "[" ++ rdKafkaErr2name err ++ "] " ++ rdKafkaErr2str err
  displayException err = show err

data KafkaDebug =
    DebugGeneric
  | DebugBroker
  | DebugTopic
  | DebugMetadata
  | DebugQueue
  | DebugMsg
  | DebugProtocol
  | DebugCgrp
  | DebugSecurity
  | DebugFetch
  | DebugFeature
  | DebugAll
  deriving (Eq, Show, Typeable, Generic)

kafkaDebugToText :: KafkaDebug -> Text
kafkaDebugToText d = case d of
  DebugGeneric  -> "generic"
  DebugBroker   -> "broker"
  DebugTopic    -> "topic"
  DebugMetadata -> "metadata"
  DebugQueue    -> "queue"
  DebugMsg      -> "msg"
  DebugProtocol -> "protocol"
  DebugCgrp     -> "cgrp"
  DebugSecurity -> "security"
  DebugFetch    -> "fetch"
  DebugFeature  -> "feature"
  DebugAll      -> "all"

data KafkaCompressionCodec =
    NoCompression
  | Gzip
  | Snappy
  | Lz4
  deriving (Eq, Show, Typeable, Generic)

kafkaCompressionCodecToText :: KafkaCompressionCodec -> Text
kafkaCompressionCodecToText c = case c of
  NoCompression -> "none"
  Gzip          -> "gzip"
  Snappy        -> "snappy"
  Lz4           -> "lz4"