{-# LANGUAGE DeriveGeneric     #-}
{-# LANGUAGE OverloadedStrings #-}

module Kafka.Metadata
( KafkaMetadata(..), BrokerMetadata(..), TopicMetadata(..), PartitionMetadata(..)
, WatermarkOffsets(..)
, GroupMemberId(..), GroupMemberInfo(..)
, GroupProtocolType(..), GroupProtocol(..), GroupState(..)
, GroupInfo(..)
, allTopicsMetadata, topicMetadata
, watermarkOffsets, watermarkOffsets'
, partitionWatermarkOffsets
, offsetsForTime, offsetsForTime', topicOffsetsForTime
, allConsumerGroupsInfo, consumerGroupInfo
)
where

import Control.Arrow          (left)
import Control.Exception      (bracket)
import Control.Monad.IO.Class (MonadIO (liftIO))
import Data.Bifunctor         (bimap)
import Data.ByteString        (ByteString, pack)
import Data.Monoid            ((<>))
import Data.Text              (Text)
import Foreign                (Storable (peek), peekArray, withForeignPtr)
import GHC.Generics           (Generic)

import Kafka.Consumer.Convert (fromNativeTopicPartitionList'', toNativeTopicPartitionList)
import Kafka.Consumer.Types   (ConsumerGroupId (..), Offset (..), PartitionOffset (..), TopicPartition (..))
import Kafka.Internal.RdKafka (RdKafkaGroupInfoT (..), RdKafkaGroupListT (..), RdKafkaGroupListTPtr,
                               RdKafkaGroupMemberInfoT (..), RdKafkaMetadataBrokerT (..),
                               RdKafkaMetadataPartitionT (..), RdKafkaMetadataT (..), RdKafkaMetadataTPtr,
                               RdKafkaMetadataTopicT (..), RdKafkaRespErrT (..), RdKafkaTPtr,
                               destroyUnmanagedRdKafkaTopic, newUnmanagedRdKafkaTopicT, peekCAText,
                               rdKafkaListGroups, rdKafkaMetadata, rdKafkaOffsetsForTimes,
                               rdKafkaQueryWatermarkOffsets)
import Kafka.Internal.Setup   (HasKafka (..), Kafka (..))
import Kafka.Internal.Shared  (kafkaErrorToMaybe)
import Kafka.Types            (BrokerId (..), ClientId (..), KafkaError (..), Millis (..),
                               PartitionId (..), Timeout (..), TopicName (..))

import qualified Data.Set  as S
import qualified Data.Text as Text

-- | Cluster-wide metadata: all brokers, all topics, and the broker that
-- served the metadata request.
data KafkaMetadata = KafkaMetadata
  { kmBrokers    :: [BrokerMetadata]
  , kmTopics     :: [TopicMetadata]
  , kmOrigBroker :: !BrokerId
  } deriving (Show, Eq, Generic)

data BrokerMetadata = BrokerMetadata
  { bmBrokerId   :: !BrokerId
  , bmBrokerHost :: !Text
  , bmBrokerPort :: !Int
  } deriving (Show, Eq, Generic)

data PartitionMetadata = PartitionMetadata
  { pmPartitionId    :: !PartitionId
  , pmError          :: Maybe KafkaError
  , pmLeader         :: !BrokerId
  , pmReplicas       :: [BrokerId]
  , pmInSyncReplicas :: [BrokerId]
  } deriving (Show, Eq, Generic)

data TopicMetadata = TopicMetadata
  { tmTopicName  :: !TopicName
  , tmPartitions :: [PartitionMetadata]
  , tmError      :: Maybe KafkaError
  } deriving (Show, Eq, Generic)

-- | Low (oldest/beginning) and high (newest/end) watermark offsets of a
-- single partition.
data WatermarkOffsets = WatermarkOffsets
  { woTopicName     :: !TopicName
  , woPartitionId   :: !PartitionId
  , woLowWatermark  :: !Offset
  , woHighWatermark :: !Offset
  } deriving (Show, Eq, Generic)

newtype GroupMemberId = GroupMemberId Text deriving (Show, Eq, Read, Ord)

data GroupMemberInfo = GroupMemberInfo
  { gmiMemberId   :: !GroupMemberId
  , gmiClientId   :: !ClientId
  , gmiClientHost :: !Text
  , gmiMetadata   :: !ByteString
  , gmiAssignment :: !ByteString
  } deriving (Show, Eq, Generic)

newtype GroupProtocolType = GroupProtocolType Text deriving (Show, Eq, Read, Ord, Generic)
newtype GroupProtocol     = GroupProtocol Text deriving (Show, Eq, Read, Ord, Generic)

data GroupState =
    GroupPreparingRebalance -- ^ Group is preparing to rebalance
  | GroupEmpty              -- ^ Group has no more members, but lingers until all offsets have expired
  | GroupAwaitingSync       -- ^ Group is awaiting state assignment from the leader
  | GroupStable             -- ^ Group is stable
  | GroupDead               -- ^ Group has no more members and its metadata is being removed
  deriving (Show, Eq, Read, Ord, Generic)

data GroupInfo = GroupInfo
  { giGroup        :: !ConsumerGroupId
  , giError        :: Maybe KafkaError
  , giState        :: !GroupState
  , giProtocolType :: !GroupProtocolType
  , giProtocol     :: !GroupProtocol
  , giMembers      :: [GroupMemberInfo]
  } deriving (Show, Eq, Generic)
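-- A minimal usage sketch (not part of the library API): a hypothetical helper
-- that prints the partition count of every topic in the cluster using
-- 'allTopicsMetadata' defined below. It assumes a 'HasKafka' handle named
-- @kafka@ (e.g. a consumer created elsewhere).
--
-- > printPartitionCounts :: HasKafka k => k -> IO ()
-- > printPartitionCounts kafka = do
-- >   emeta <- allTopicsMetadata kafka (Timeout 10000)   -- 10 second timeout
-- >   case emeta of
-- >     Left err   -> print err
-- >     Right meta -> mapM_ (putStrLn . describe) (kmTopics meta)
-- >   where
-- >     describe t = show (tmTopicName t) <> ": "
-- >               <> show (length (tmPartitions t)) <> " partition(s)"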
-- | Returns metadata for all topics in the cluster.
allTopicsMetadata :: (MonadIO m, HasKafka k) => k -> Timeout -> m (Either KafkaError KafkaMetadata)
allTopicsMetadata k (Timeout timeout) = liftIO $ do
  meta <- rdKafkaMetadata (getKafkaPtr k) True Nothing timeout
  traverse fromKafkaMetadataPtr (left KafkaResponseError meta)

-- | Returns metadata only for the specified topic.
topicMetadata :: (MonadIO m, HasKafka k) => k -> Timeout -> TopicName -> m (Either KafkaError KafkaMetadata)
topicMetadata k (Timeout timeout) (TopicName tn) = liftIO $
  bracket mkTopic clTopic $ \mbt -> case mbt of
    Left err -> return (Left $ KafkaError (Text.pack err))
    Right t  -> do
      meta <- rdKafkaMetadata (getKafkaPtr k) False (Just t) timeout
      traverse fromKafkaMetadataPtr (left KafkaResponseError meta)
  where
    mkTopic = newUnmanagedRdKafkaTopicT (getKafkaPtr k) (Text.unpack tn) Nothing
    clTopic = either (return . const ()) destroyUnmanagedRdKafkaTopic

-- | Query the broker for the low (oldest/beginning) and high (newest/end)
-- offsets of every partition of a given topic.
watermarkOffsets :: (MonadIO m, HasKafka k) => k -> Timeout -> TopicName -> m [Either KafkaError WatermarkOffsets]
watermarkOffsets k timeout t = do
  meta <- topicMetadata k timeout t
  case meta of
    Left err -> return [Left err]
    Right tm -> if null (kmTopics tm)
                  then return []
                  else watermarkOffsets' k timeout (head $ kmTopics tm)

-- | Query the broker for the low (oldest/beginning) and high (newest/end)
-- offsets of every partition of a topic described by an already fetched
-- 'TopicMetadata'.
watermarkOffsets' :: (MonadIO m, HasKafka k) => k -> Timeout -> TopicMetadata -> m [Either KafkaError WatermarkOffsets]
watermarkOffsets' k timeout tm =
  let pids = pmPartitionId <$> tmPartitions tm
   in liftIO $ traverse (partitionWatermarkOffsets k timeout (tmTopicName tm)) pids

-- | Query the broker for the low (oldest/beginning) and high (newest/end)
-- offsets of a specific partition.
partitionWatermarkOffsets :: (MonadIO m, HasKafka k) => k -> Timeout -> TopicName -> PartitionId -> m (Either KafkaError WatermarkOffsets)
partitionWatermarkOffsets k (Timeout timeout) (TopicName t) (PartitionId p) = liftIO $ do
  offs <- rdKafkaQueryWatermarkOffsets (getKafkaPtr k) (Text.unpack t) p timeout
  return $ bimap KafkaResponseError toWatermark offs
  where
    toWatermark (l, h) = WatermarkOffsets (TopicName t) (PartitionId p) (Offset l) (Offset h)
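-- A rough usage sketch (not part of the library API): a hypothetical helper
-- that estimates how many messages are currently stored in a topic by summing
-- @high - low@ over all partitions. It assumes a 'HasKafka' handle named
-- @kafka@, that 'Offset' wraps an 'Int64' (import "Data.Int"), and it simply
-- skips partitions whose watermarks could not be queried.
--
-- > approxMessageCount :: HasKafka k => k -> Timeout -> TopicName -> IO Int64
-- > approxMessageCount kafka timeout topic = do
-- >   -- one 'Either' per partition of the topic
-- >   offs <- watermarkOffsets kafka timeout topic
-- >   pure $ sum [ hi - lo | Right (WatermarkOffsets _ _ (Offset lo) (Offset hi)) <- offs ]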
-- | Look up the offsets for the given topic by timestamp.
--
-- The returned offset for each partition is the earliest offset whose
-- timestamp is greater than or equal to the given timestamp in the
-- corresponding partition.
topicOffsetsForTime :: (MonadIO m, HasKafka k) => k -> Timeout -> Millis -> TopicName -> m (Either KafkaError [TopicPartition])
topicOffsetsForTime k timeout timestamp topic = do
  meta <- topicMetadata k timeout topic
  case meta of
    Left err    -> return $ Left err
    Right meta' ->
      let tps = [(tmTopicName t, pmPartitionId p) | t <- kmTopics meta', p <- tmPartitions t]
       in offsetsForTime k timeout timestamp tps

-- | Look up the offsets for the given metadata by timestamp.
--
-- The returned offset for each partition is the earliest offset whose
-- timestamp is greater than or equal to the given timestamp in the
-- corresponding partition.
offsetsForTime' :: (MonadIO m, HasKafka k) => k -> Timeout -> Millis -> TopicMetadata -> m (Either KafkaError [TopicPartition])
offsetsForTime' k timeout timestamp t =
  let tps = [(tmTopicName t, pmPartitionId p) | p <- tmPartitions t]
   in offsetsForTime k timeout timestamp tps

-- | Look up the offsets for the given partitions by timestamp.
--
-- The returned offset for each partition is the earliest offset whose
-- timestamp is greater than or equal to the given timestamp in the
-- corresponding partition.
offsetsForTime :: (MonadIO m, HasKafka k) => k -> Timeout -> Millis -> [(TopicName, PartitionId)] -> m (Either KafkaError [TopicPartition])
offsetsForTime k (Timeout timeout) (Millis t) tps = liftIO $ do
  ntps <- toNativeTopicPartitionList $ mkTopicPartition <$> uniqueTps
  res  <- rdKafkaOffsetsForTimes (getKafkaPtr k) ntps timeout
  case res of
    RdKafkaRespErrNoError -> Right <$> fromNativeTopicPartitionList'' ntps
    err                   -> return $ Left (KafkaResponseError err)
  where
    uniqueTps = S.toList . S.fromList $ tps
    -- rd_kafka_offsets_for_times reuses `offset` to specify timestamp :(
    mkTopicPartition (tn, p) = TopicPartition tn p (PartitionOffset t)

-- | List and describe all consumer groups in the cluster.
allConsumerGroupsInfo :: (MonadIO m, HasKafka k) => k -> Timeout -> m (Either KafkaError [GroupInfo])
allConsumerGroupsInfo k (Timeout t) = liftIO $ do
  res <- rdKafkaListGroups (getKafkaPtr k) Nothing t
  traverse fromGroupInfoListPtr (left KafkaResponseError res)

-- | Describe a given consumer group.
consumerGroupInfo :: (MonadIO m, HasKafka k) => k -> Timeout -> ConsumerGroupId -> m (Either KafkaError [GroupInfo])
consumerGroupInfo k (Timeout timeout) (ConsumerGroupId gn) = liftIO $ do
  res <- rdKafkaListGroups (getKafkaPtr k) (Just (Text.unpack gn)) timeout
  traverse fromGroupInfoListPtr (left KafkaResponseError res)
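-- A minimal usage sketch (not part of the library API): a hypothetical helper
-- that lists every consumer group in the cluster together with its state and
-- member count. It assumes a 'HasKafka' handle named @kafka@.
--
-- > printConsumerGroups :: HasKafka k => k -> IO ()
-- > printConsumerGroups kafka = do
-- >   egroups <- allConsumerGroupsInfo kafka (Timeout 10000)
-- >   case egroups of
-- >     Left err     -> print err
-- >     Right groups -> mapM_ (putStrLn . describe) groups
-- >   where
-- >     describe g = show (giGroup g) <> " [" <> show (giState g) <> "], "
-- >               <> show (length (giMembers g)) <> " member(s)"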
-------------------------------------------------------------------------------

getKafkaPtr :: HasKafka k => k -> RdKafkaTPtr
getKafkaPtr k = let (Kafka k') = getKafka k in k'
{-# INLINE getKafkaPtr #-}

fromGroupInfoListPtr :: RdKafkaGroupListTPtr -> IO [GroupInfo]
fromGroupInfoListPtr ptr = withForeignPtr ptr $ \realPtr -> do
  gl   <- peek realPtr
  pgis <- peekArray (groupCnt'RdKafkaGroupListT gl) (groups'RdKafkaGroupListT gl)
  traverse fromGroupInfoPtr pgis

fromGroupInfoPtr :: RdKafkaGroupInfoT -> IO GroupInfo
fromGroupInfoPtr gi = do
  --bmd <- peek (broker'RdKafkaGroupInfoT gi) -- >>= fromBrokerMetadataPtr
  --xxx <- fromBrokerMetadataPtr bmd
  cid <- peekCAText $ group'RdKafkaGroupInfoT gi
  stt <- peekCAText $ state'RdKafkaGroupInfoT gi
  prt <- peekCAText $ protocolType'RdKafkaGroupInfoT gi
  pr  <- peekCAText $ protocol'RdKafkaGroupInfoT gi
  mbs <- peekArray (memberCnt'RdKafkaGroupInfoT gi) (members'RdKafkaGroupInfoT gi)
  mbl <- mapM fromGroupMemberInfoPtr mbs
  return GroupInfo
    { --giBroker     = bmd
      giGroup        = ConsumerGroupId cid
    , giError        = kafkaErrorToMaybe $ KafkaResponseError (err'RdKafkaGroupInfoT gi)
    , giState        = groupStateFromKafkaString stt
    , giProtocolType = GroupProtocolType prt
    , giProtocol     = GroupProtocol pr
    , giMembers      = mbl
    }

fromGroupMemberInfoPtr :: RdKafkaGroupMemberInfoT -> IO GroupMemberInfo
fromGroupMemberInfoPtr mi = do
  mid <- peekCAText $ memberId'RdKafkaGroupMemberInfoT mi
  cid <- peekCAText $ clientId'RdKafkaGroupMemberInfoT mi
  hst <- peekCAText $ clientHost'RdKafkaGroupMemberInfoT mi
  mtd <- peekArray (memberMetadataSize'RdKafkaGroupMemberInfoT mi) (memberMetadata'RdKafkaGroupMemberInfoT mi)
  ass <- peekArray (memberAssignmentSize'RdKafkaGroupMemberInfoT mi) (memberAssignment'RdKafkaGroupMemberInfoT mi)
  return GroupMemberInfo
    { gmiMemberId   = GroupMemberId mid
    , gmiClientId   = ClientId cid
    , gmiClientHost = hst
    , gmiMetadata   = pack mtd
    , gmiAssignment = pack ass
    }

fromTopicMetadataPtr :: RdKafkaMetadataTopicT -> IO TopicMetadata
fromTopicMetadataPtr tm = do
  tnm <- peekCAText (topic'RdKafkaMetadataTopicT tm)
  pts <- peekArray (partitionCnt'RdKafkaMetadataTopicT tm) (partitions'RdKafkaMetadataTopicT tm)
  pms <- mapM fromPartitionMetadataPtr pts
  return TopicMetadata
    { tmTopicName  = TopicName tnm
    , tmPartitions = pms
    , tmError      = kafkaErrorToMaybe $ KafkaResponseError (err'RdKafkaMetadataTopicT tm)
    }

fromPartitionMetadataPtr :: RdKafkaMetadataPartitionT -> IO PartitionMetadata
fromPartitionMetadataPtr pm = do
  reps <- peekArray (replicaCnt'RdKafkaMetadataPartitionT pm) (replicas'RdKafkaMetadataPartitionT pm)
  isrs <- peekArray (isrCnt'RdKafkaMetadataPartitionT pm) (isrs'RdKafkaMetadataPartitionT pm)
  return PartitionMetadata
    { pmPartitionId    = PartitionId (id'RdKafkaMetadataPartitionT pm)
    , pmError          = kafkaErrorToMaybe $ KafkaResponseError (err'RdKafkaMetadataPartitionT pm)
    , pmLeader         = BrokerId (leader'RdKafkaMetadataPartitionT pm)
    , pmReplicas       = (BrokerId . fromIntegral) <$> reps
    , pmInSyncReplicas = (BrokerId . fromIntegral) <$> isrs
    }

fromBrokerMetadataPtr :: RdKafkaMetadataBrokerT -> IO BrokerMetadata
fromBrokerMetadataPtr bm = do
  host <- peekCAText (host'RdKafkaMetadataBrokerT bm)
  return BrokerMetadata
    { bmBrokerId   = BrokerId (id'RdKafkaMetadataBrokerT bm)
    , bmBrokerHost = host
    , bmBrokerPort = port'RdKafkaMetadataBrokerT bm
    }

fromKafkaMetadataPtr :: RdKafkaMetadataTPtr -> IO KafkaMetadata
fromKafkaMetadataPtr ptr = withForeignPtr ptr $ \realPtr -> do
  km   <- peek realPtr
  pbms <- peekArray (brokerCnt'RdKafkaMetadataT km) (brokers'RdKafkaMetadataT km)
  bms  <- mapM fromBrokerMetadataPtr pbms
  ptms <- peekArray (topicCnt'RdKafkaMetadataT km) (topics'RdKafkaMetadataT km)
  tms  <- mapM fromTopicMetadataPtr ptms
  return KafkaMetadata
    { kmBrokers    = bms
    , kmTopics     = tms
    , kmOrigBroker = BrokerId $ fromIntegral (origBrokerId'RdKafkaMetadataT km)
    }

groupStateFromKafkaString :: Text -> GroupState
groupStateFromKafkaString s = case s of
  "PreparingRebalance" -> GroupPreparingRebalance
  "AwaitingSync"       -> GroupAwaitingSync
  "Stable"             -> GroupStable
  "Dead"               -> GroupDead
  "Empty"              -> GroupEmpty
  _                    -> error $ "Unknown group state: " <> Text.unpack s