{-# LANGUAGE DeriveDataTypeable #-} {-# LANGUAGE DeriveGeneric #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE OverloadedStrings #-} ----------------------------------------------------------------------------- -- | -- Module holding types shared by consumer and producer modules. ----------------------------------------------------------------------------- module Kafka.Types ( BrokerId(..) , PartitionId(..) , Millis(..) , ClientId(..) , BatchSize(..) , TopicName(..) , BrokerAddress(..) , Timeout(..) , KafkaLogLevel(..) , KafkaError(..) , KafkaDebug(..) , KafkaCompressionCodec(..) , TopicType(..) , topicType , kafkaDebugToText , kafkaCompressionCodecToText ) where import Control.Exception (Exception (..)) import Data.Int (Int64) import Data.String (IsString) import Data.Text (Text, isPrefixOf) import Data.Typeable (Typeable) import GHC.Generics (Generic) import Kafka.Internal.RdKafka (RdKafkaRespErrT, rdKafkaErr2name, rdKafkaErr2str) -- | Kafka broker ID newtype BrokerId = BrokerId { unBrokerId :: Int } deriving (Show, Eq, Ord, Read, Generic) -- | Topic partition ID newtype PartitionId = PartitionId { unPartitionId :: Int } deriving (Show, Eq, Read, Ord, Enum, Generic) -- | A number of milliseconds, used to represent durations and timestamps newtype Millis = Millis { unMillis :: Int64 } deriving (Show, Read, Eq, Ord, Num, Generic) -- | Client ID used by Kafka to better track requests -- -- See newtype ClientId = ClientId { unClientId :: Text } deriving (Show, Eq, IsString, Ord, Generic) -- | Batch size used for polling newtype BatchSize = BatchSize { unBatchSize :: Int } deriving (Show, Read, Eq, Ord, Num, Generic) -- | Whether the topic is created by a user or by the system data TopicType = User -- ^ Normal topics that are created by user. | System -- ^ Topics starting with a double underscore "\__" (@__consumer_offsets@, @__confluent.support.metrics@, etc.) are considered "system" topics deriving (Show, Read, Eq, Ord, Generic) -- | Topic name to consume/produce messages -- -- Wildcard (regex) topics are supported by the /librdkafka/ assignor: -- any topic name in the topics list that is prefixed with @^@ will -- be regex-matched to the full list of topics in the cluster and matching -- topics will be added to the subscription list. newtype TopicName = TopicName { unTopicName :: Text -- ^ a simple topic name or a regex if started with @^@ } deriving (Show, Eq, Ord, IsString, Read, Generic) -- | Deduce the type of a topic from its name, by checking if it starts with a double underscore "\__" topicType :: TopicName -> TopicType topicType (TopicName tn) = if "__" `isPrefixOf` tn then System else User {-# INLINE topicType #-} -- | Kafka broker address string (e.g. @broker1:9092@) newtype BrokerAddress = BrokerAddress { unBrokerAddress :: Text } deriving (Show, Eq, IsString, Generic) -- | Timeout in milliseconds newtype Timeout = Timeout { unTimeout :: Int } deriving (Show, Eq, Read, Generic) -- | Log levels for /librdkafka/. data KafkaLogLevel = KafkaLogEmerg | KafkaLogAlert | KafkaLogCrit | KafkaLogErr | KafkaLogWarning | KafkaLogNotice | KafkaLogInfo | KafkaLogDebug deriving (Show, Enum, Eq) -- | All possible Kafka errors data KafkaError = KafkaError Text | KafkaInvalidReturnValue | KafkaBadSpecification Text | KafkaResponseError RdKafkaRespErrT | KafkaInvalidConfigurationValue Text | KafkaUnknownConfigurationKey Text | KafkaBadConfiguration deriving (Eq, Show, Typeable, Generic) instance Exception KafkaError where displayException (KafkaResponseError err) = "[" ++ rdKafkaErr2name err ++ "] " ++ rdKafkaErr2str err displayException err = show err -- | Available /librdkafka/ debug contexts data KafkaDebug = DebugGeneric | DebugBroker | DebugTopic | DebugMetadata | DebugQueue | DebugMsg | DebugProtocol | DebugCgrp | DebugSecurity | DebugFetch | DebugFeature | DebugAll deriving (Eq, Show, Typeable, Generic) -- | Convert a 'KafkaDebug' into its /librdkafka/ string equivalent. -- -- This is used internally by the library but may be useful to some developers. kafkaDebugToText :: KafkaDebug -> Text kafkaDebugToText d = case d of DebugGeneric -> "generic" DebugBroker -> "broker" DebugTopic -> "topic" DebugMetadata -> "metadata" DebugQueue -> "queue" DebugMsg -> "msg" DebugProtocol -> "protocol" DebugCgrp -> "cgrp" DebugSecurity -> "security" DebugFetch -> "fetch" DebugFeature -> "feature" DebugAll -> "all" -- | Compression codec used by a topic -- -- See data KafkaCompressionCodec = NoCompression | Gzip | Snappy | Lz4 deriving (Eq, Show, Typeable, Generic) -- | Convert a 'KafkaCompressionCodec' into its /librdkafka/ string equivalent. -- -- This is used internally by the library but may be useful to some developers. kafkaCompressionCodecToText :: KafkaCompressionCodec -> Text kafkaCompressionCodecToText c = case c of NoCompression -> "none" Gzip -> "gzip" Snappy -> "snappy" Lz4 -> "lz4"