module Kafka.Metadata
( KafkaMetadata(..), BrokerMetadata(..), TopicMetadata(..), PartitionMetadata(..)
, WatermarkOffsets(..)
, GroupMemberId(..), GroupMemberInfo(..)
, GroupProtocolType(..), GroupProtocol(..), GroupState(..)
, GroupInfo(..)
, allTopicsMetadata, topicMetadata
, watermarkOffsets, watermarkOffsets'
, partitionWatermarkOffsets
, offsetsForTime, offsetsForTime', topicOffsetsForTime
, allConsumerGroupsInfo, consumerGroupInfo
)
where
import Control.Arrow (left)
import Control.Exception (bracket)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.Bifunctor
import Data.ByteString (ByteString, pack)
import Data.Monoid ((<>))
import qualified Data.Set as S
import Foreign
import Foreign.C.String
import Kafka.Consumer.Convert
import Kafka.Consumer.Types
import Kafka.Internal.RdKafka
import Kafka.Internal.Setup
import Kafka.Internal.Shared
import Kafka.Types
-- | Cluster-level metadata as converted from the native metadata structure:
-- the brokers and topics it lists plus the id of the originating broker.
data KafkaMetadata = KafkaMetadata
  { kmBrokers :: [BrokerMetadata] -- ^ One entry per broker listed in the native metadata
  , kmTopics :: [TopicMetadata] -- ^ One entry per topic listed in the native metadata
  , kmOrigBroker :: !BrokerId -- ^ Taken from the native "orig broker id" field; presumably the broker that answered the request (confirm against librdkafka)
  } deriving (Show, Eq)
-- | Description of a single broker as found in the native metadata.
data BrokerMetadata = BrokerMetadata
  { bmBrokerId :: !BrokerId -- ^ Broker id
  , bmBrokerHost :: !String -- ^ Broker host name, read from a C string
  , bmBrokerPort :: !Int -- ^ Broker port
  } deriving (Show, Eq)
-- | Description of a single partition of a topic.
data PartitionMetadata = PartitionMetadata
  { pmPartitionId :: !PartitionId -- ^ Partition id
  , pmError :: Maybe KafkaError -- ^ Partition-level error derived from the native error code via @kafkaErrorToMaybe@ (presumably 'Nothing' when there is no error)
  , pmLeader :: !BrokerId -- ^ Broker id of the partition leader
  , pmReplicas :: [BrokerId] -- ^ Broker ids read from the native replicas array
  , pmInSyncReplicas :: [BrokerId] -- ^ Broker ids read from the native in-sync replicas array
  } deriving (Show, Eq)
-- | Description of a single topic and its partitions.
data TopicMetadata = TopicMetadata
  { tmTopicName :: !TopicName -- ^ Topic name
  , tmPartitions :: [PartitionMetadata] -- ^ One entry per partition listed for the topic
  , tmError :: Maybe KafkaError -- ^ Topic-level error derived from the native error code via @kafkaErrorToMaybe@ (presumably 'Nothing' when there is no error)
  } deriving (Show, Eq)
-- | Low and high watermark offsets of one partition of one topic.
data WatermarkOffsets = WatermarkOffsets
  { woTopicName :: !TopicName -- ^ Topic the watermarks were queried for
  , woPartitionId :: !PartitionId -- ^ Partition the watermarks were queried for
  , woLowWatermark :: !Offset -- ^ First component of the pair returned by the native watermark query
  , woHighWatermark :: !Offset -- ^ Second component of the pair returned by the native watermark query
  } deriving (Show, Eq)
-- | Identifier of a consumer group member, wrapped so it cannot be confused
-- with other plain strings.
newtype GroupMemberId = GroupMemberId String deriving (Show, Eq, Read, Ord)
-- | Description of a single member of a consumer group.
data GroupMemberInfo = GroupMemberInfo
  { gmiMemberId :: !GroupMemberId -- ^ Member id
  , gmiClientId :: !ClientId -- ^ Client id of the member
  , gmiClientHost :: !String -- ^ Client host of the member, read from a C string
  , gmiMetadata :: !ByteString -- ^ Raw member metadata bytes copied from the native struct (format not interpreted here)
  , gmiAssignment :: !ByteString -- ^ Raw member assignment bytes copied from the native struct (format not interpreted here)
  } deriving (Show, Eq)
-- | Protocol type reported for a consumer group, kept as the raw string.
newtype GroupProtocolType = GroupProtocolType String deriving (Show, Eq, Read, Ord)
-- | Protocol reported for a consumer group, kept as the raw string.
newtype GroupProtocol = GroupProtocol String deriving (Show, Eq, Read, Ord)
-- | State of a consumer group. Values are obtained by parsing the state
-- string found in the native group info (see @groupStateFromKafkaString@).
data GroupState
  = GroupPreparingRebalance -- ^ Parsed from @PreparingRebalance@
  | GroupEmpty -- ^ Parsed from @Empty@
  | GroupAwaitingSync -- ^ Parsed from @AwaitingSync@
  | GroupStable -- ^ Parsed from @Stable@
  | GroupDead -- ^ Parsed from @Dead@
  deriving (Show, Eq, Read, Ord)
-- | Description of a consumer group and its members.
data GroupInfo = GroupInfo
  { giGroup :: !ConsumerGroupId -- ^ Group id
  , giError :: Maybe KafkaError -- ^ Group-level error derived from the native error code via @kafkaErrorToMaybe@ (presumably 'Nothing' when there is no error)
  , giState :: !GroupState -- ^ Group state parsed from the native state string
  , giProtocolType :: !GroupProtocolType -- ^ Protocol type string reported for the group
  , giProtocol :: !GroupProtocol -- ^ Protocol string reported for the group
  , giMembers :: [GroupMemberInfo] -- ^ One entry per member listed in the native group info
  } deriving (Show, Eq)
-- | Request metadata without naming a specific topic.
--
-- The native call is made with its boolean flag set to @True@ (presumably
-- the "all topics" switch of librdkafka) and no topic handle. A native error
-- code is returned as 'Left' wrapped in 'KafkaResponseError'; on success the
-- native structure is converted to 'KafkaMetadata'.
allTopicsMetadata :: (MonadIO m, HasKafka k) => k -> Timeout -> m (Either KafkaError KafkaMetadata)
allTopicsMetadata k (Timeout timeout) =
  liftIO $ rdKafkaMetadata (getKafkaPtr k) True Nothing timeout >>= either failed converted
  where
    -- Native error code: wrap it, no conversion to perform.
    failed code = return (Left (KafkaResponseError code))
    -- Native metadata pointer: convert it to the Haskell representation.
    converted ptr = Right <$> fromKafkaMetadataPtr ptr
-- | Request metadata for a single topic.
--
-- An unmanaged native topic handle is created for the duration of the call
-- and destroyed afterwards via 'bracket', including when an exception is
-- raised. If the handle cannot be created, its error is returned as 'Left'
-- wrapped in 'KafkaError'; a native error code from the metadata call is
-- returned as 'Left' wrapped in 'KafkaResponseError'.
topicMetadata :: (MonadIO m, HasKafka k) => k -> Timeout -> TopicName -> m (Either KafkaError KafkaMetadata)
topicMetadata k (Timeout timeout) (TopicName tn) =
  liftIO $ bracket acquire release (either failed query)
  where
    acquire = newUnmanagedRdKafkaTopicT (getKafkaPtr k) tn Nothing
    -- Nothing to release when the handle was never created.
    release = either (const (return ())) destroyUnmanagedRdKafkaTopic
    failed err = return (Left (KafkaError err))
    query topic = do
      reply <- rdKafkaMetadata (getKafkaPtr k) False (Just topic) timeout
      traverse fromKafkaMetadataPtr (left KafkaResponseError reply)
-- | Query watermark offsets for every partition of the given topic.
--
-- The topic's metadata is fetched first. If that fails, the result is a
-- single-element list holding the error. If the metadata lists no topics the
-- result is empty. Otherwise the first listed topic is passed to
-- 'watermarkOffsets'', giving one result per partition.
watermarkOffsets :: (MonadIO m, HasKafka k) => k -> Timeout -> TopicName -> m [Either KafkaError WatermarkOffsets]
watermarkOffsets k timeout t = do
  meta <- topicMetadata k timeout t
  case meta of
    Left err -> return [Left err]
    -- Match on the list directly instead of pairing 'null' with the partial 'head'.
    Right md -> case kmTopics md of
      []       -> return []
      (tm : _) -> watermarkOffsets' k timeout tm
-- | Query watermark offsets for every partition listed in the given topic
-- metadata, in the order the partitions appear there. Each partition is
-- queried separately, so the result holds one 'Either' per partition.
watermarkOffsets' :: (MonadIO m, HasKafka k) => k -> Timeout -> TopicMetadata -> m [Either KafkaError WatermarkOffsets]
watermarkOffsets' k timeout tm = liftIO $ mapM queryPartition (tmPartitions tm)
  where
    queryPartition part =
      partitionWatermarkOffsets k timeout (tmTopicName tm) (pmPartitionId part)
-- | Query the watermark offsets of a single partition.
--
-- A native error code is returned as 'Left' wrapped in 'KafkaResponseError';
-- on success the returned pair is stored as the low and high watermark, in
-- that order.
partitionWatermarkOffsets :: (MonadIO m, HasKafka k) => k -> Timeout -> TopicName -> PartitionId -> m (Either KafkaError WatermarkOffsets)
partitionWatermarkOffsets k (Timeout timeout) (TopicName t) (PartitionId p) =
  liftIO $ either failed marks <$> rdKafkaQueryWatermarkOffsets (getKafkaPtr k) t p timeout
  where
    failed code = Left (KafkaResponseError code)
    marks (lo, hi) =
      Right (WatermarkOffsets (TopicName t) (PartitionId p) (Offset lo) (Offset hi))
-- | Look up offsets for the given timestamp across all partitions of a topic.
--
-- The topic's metadata is fetched first; a failure there is returned
-- unchanged. Otherwise every (topic, partition) pair found in the metadata
-- is passed to 'offsetsForTime'.
topicOffsetsForTime :: (MonadIO m, HasKafka k) => k -> Timeout -> Millis -> TopicName -> m (Either KafkaError [TopicPartition])
topicOffsetsForTime k timeout timestamp topic =
  topicMetadata k timeout topic >>= either (return . Left) lookupOffsets
  where
    lookupOffsets md =
      offsetsForTime k timeout timestamp
        [ (tmTopicName tm, pmPartitionId pm)
        | tm <- kmTopics md
        , pm <- tmPartitions tm
        ]
-- | Look up offsets for the given timestamp across all partitions listed in
-- already-fetched topic metadata. Delegates to 'offsetsForTime' with one
-- (topic, partition) pair per listed partition.
offsetsForTime' :: (MonadIO m, HasKafka k) => k -> Timeout -> Millis -> TopicMetadata -> m (Either KafkaError [TopicPartition])
offsetsForTime' k timeout timestamp t =
  offsetsForTime k timeout timestamp (map pairWithTopic (tmPartitions t))
  where
    pairWithTopic part = (tmTopicName t, pmPartitionId part)
-- | Look up offsets for the given timestamp for a set of partitions.
--
-- The input pairs are de-duplicated (and thereby sorted) through a 'S.Set'
-- before the native list is built; each entry carries the timestamp in its
-- offset slot. On success the native list is read back after the call; any
-- other response code is returned as 'Left' wrapped in 'KafkaResponseError'.
offsetsForTime :: (MonadIO m, HasKafka k) => k -> Timeout -> Millis -> [(TopicName, PartitionId)] -> m (Either KafkaError [TopicPartition])
offsetsForTime k (Timeout timeout) (Millis t) tps = liftIO $ do
  nativeList <- toNativeTopicPartitionList requests
  status <- rdKafkaOffsetsForTimes (getKafkaPtr k) nativeList timeout
  case status of
    RdKafkaRespErrNoError -> fmap Right (fromNativeTopicPartitionList'' nativeList)
    failure               -> return (Left (KafkaResponseError failure))
  where
    requests =
      [ TopicPartition tn p (PartitionOffset t)
      | (tn, p) <- S.toList (S.fromList tps)
      ]
-- | List consumer groups without restricting the request to a group name.
--
-- A native error code is returned as 'Left' wrapped in 'KafkaResponseError';
-- on success the native group list is converted to 'GroupInfo' values.
allConsumerGroupsInfo :: (MonadIO m, HasKafka k) => k -> Timeout -> m (Either KafkaError [GroupInfo])
allConsumerGroupsInfo k (Timeout t) =
  liftIO $ rdKafkaListGroups (getKafkaPtr k) Nothing t >>= either failed converted
  where
    failed code = return (Left (KafkaResponseError code))
    converted ptr = Right <$> fromGroupInfoListPtr ptr
-- | List consumer group information for the named group.
--
-- A native error code is returned as 'Left' wrapped in 'KafkaResponseError';
-- on success the native group list is converted to 'GroupInfo' values.
consumerGroupInfo :: (MonadIO m, HasKafka k) => k -> Timeout -> ConsumerGroupId -> m (Either KafkaError [GroupInfo])
consumerGroupInfo k (Timeout timeout) (ConsumerGroupId gn) =
  liftIO $ rdKafkaListGroups (getKafkaPtr k) (Just gn) timeout >>= either failed converted
  where
    failed code = return (Left (KafkaResponseError code))
    converted ptr = Right <$> fromGroupInfoListPtr ptr
-- | Extract the native client pointer from anything that carries a 'Kafka'
-- handle.
getKafkaPtr :: HasKafka k => k -> RdKafkaTPtr
getKafkaPtr k = case getKafka k of
  Kafka ptr -> ptr
{-# INLINE getKafkaPtr #-}
-- | Convert a native group list into 'GroupInfo' values.
--
-- The foreign pointer is held for the whole conversion: the list header is
-- read, then as many group entries as its count field states, and each entry
-- is converted in order.
fromGroupInfoListPtr :: RdKafkaGroupListTPtr -> IO [GroupInfo]
fromGroupInfoListPtr ptr = withForeignPtr ptr $ \rawPtr -> do
  groupList <- peek rawPtr
  let count = groupCnt'RdKafkaGroupListT groupList
      entries = groups'RdKafkaGroupListT groupList
  peekArray count entries >>= mapM fromGroupInfoPtr
-- | Convert one native group info record into a 'GroupInfo'.
--
-- The group name, state, protocol type and protocol are read as C strings,
-- in that order, followed by the member array, whose entries are converted
-- one by one. The state string is parsed with 'groupStateFromKafkaString'.
fromGroupInfoPtr :: RdKafkaGroupInfoT -> IO GroupInfo
fromGroupInfoPtr gi =
  build
    <$> peekCAString (group'RdKafkaGroupInfoT gi)
    <*> peekCAString (state'RdKafkaGroupInfoT gi)
    <*> peekCAString (protocolType'RdKafkaGroupInfoT gi)
    <*> peekCAString (protocol'RdKafkaGroupInfoT gi)
    <*> (peekArray (memberCnt'RdKafkaGroupInfoT gi) (members'RdKafkaGroupInfoT gi)
           >>= mapM fromGroupMemberInfoPtr)
  where
    build groupName stateName protoType proto members = GroupInfo
      { giGroup = ConsumerGroupId groupName
      , giError = kafkaErrorToMaybe (KafkaResponseError (err'RdKafkaGroupInfoT gi))
      , giState = groupStateFromKafkaString stateName
      , giProtocolType = GroupProtocolType protoType
      , giProtocol = GroupProtocol proto
      , giMembers = members
      }
-- | Convert one native group member record into a 'GroupMemberInfo'.
--
-- Member id, client id and client host are read as C strings. The metadata
-- and assignment buffers are copied into 'ByteString's using the sizes
-- stored next to them in the native record.
fromGroupMemberInfoPtr :: RdKafkaGroupMemberInfoT -> IO GroupMemberInfo
fromGroupMemberInfoPtr mi = do
  memberName <- peekCAString (memberId'RdKafkaGroupMemberInfoT mi)
  clientName <- peekCAString (clientId'RdKafkaGroupMemberInfoT mi)
  hostName <- peekCAString (clientHost'RdKafkaGroupMemberInfoT mi)
  metaBytes <-
    peekArray
      (memberMetadataSize'RdKafkaGroupMemberInfoT mi)
      (memberMetadata'RdKafkaGroupMemberInfoT mi)
  assignBytes <-
    peekArray
      (memberAssignmentSize'RdKafkaGroupMemberInfoT mi)
      (memberAssignment'RdKafkaGroupMemberInfoT mi)
  return GroupMemberInfo
    { gmiMemberId = GroupMemberId memberName
    , gmiClientId = ClientId clientName
    , gmiClientHost = hostName
    , gmiMetadata = pack metaBytes
    , gmiAssignment = pack assignBytes
    }
-- | Convert one native topic metadata record into a 'TopicMetadata'.
--
-- The topic name is read as a C string, then as many partition records as
-- the native partition count states are read and converted in order.
fromTopicMetadataPtr :: RdKafkaMetadataTopicT -> IO TopicMetadata
fromTopicMetadataPtr tm = do
  name <- peekCAString (topic'RdKafkaMetadataTopicT tm)
  partitions <-
    peekArray (partitionCnt'RdKafkaMetadataTopicT tm) (partitions'RdKafkaMetadataTopicT tm)
      >>= mapM fromPartitionMetadataPtr
  return TopicMetadata
    { tmTopicName = TopicName name
    , tmPartitions = partitions
    , tmError = kafkaErrorToMaybe (KafkaResponseError (err'RdKafkaMetadataTopicT tm))
    }
-- | Convert one native partition metadata record into a 'PartitionMetadata'.
--
-- The replica and in-sync replica arrays are read using the counts stored in
-- the native record, and every element is converted to a 'BrokerId'.
fromPartitionMetadataPtr :: RdKafkaMetadataPartitionT -> IO PartitionMetadata
fromPartitionMetadataPtr pm = do
  replicaIds <-
    peekArray (replicaCnt'RdKafkaMetadataPartitionT pm) (replicas'RdKafkaMetadataPartitionT pm)
  inSyncIds <-
    peekArray (isrCnt'RdKafkaMetadataPartitionT pm) (isrs'RdKafkaMetadataPartitionT pm)
  return PartitionMetadata
    { pmPartitionId = PartitionId (id'RdKafkaMetadataPartitionT pm)
    , pmError = kafkaErrorToMaybe (KafkaResponseError (err'RdKafkaMetadataPartitionT pm))
    , pmLeader = BrokerId (leader'RdKafkaMetadataPartitionT pm)
    , pmReplicas = map (BrokerId . fromIntegral) replicaIds
    , pmInSyncReplicas = map (BrokerId . fromIntegral) inSyncIds
    }
-- | Convert one native broker metadata record into a 'BrokerMetadata'.
-- Only the host name needs an 'IO' read (it is a C string); id and port are
-- taken straight from the record.
fromBrokerMetadataPtr :: RdKafkaMetadataBrokerT -> IO BrokerMetadata
fromBrokerMetadataPtr bm = withHost <$> peekCAString (host'RdKafkaMetadataBrokerT bm)
  where
    withHost hostName = BrokerMetadata
      { bmBrokerId = BrokerId (id'RdKafkaMetadataBrokerT bm)
      , bmBrokerHost = hostName
      , bmBrokerPort = port'RdKafkaMetadataBrokerT bm
      }
-- | Convert the native metadata structure into 'KafkaMetadata'.
--
-- The foreign pointer is held for the whole conversion. Brokers are read and
-- converted first, then topics, each using the count stored in the native
-- header.
fromKafkaMetadataPtr :: RdKafkaMetadataTPtr -> IO KafkaMetadata
fromKafkaMetadataPtr ptr = withForeignPtr ptr $ \rawPtr -> do
  km <- peek rawPtr
  brokers <-
    peekArray (brokerCnt'RdKafkaMetadataT km) (brokers'RdKafkaMetadataT km)
      >>= mapM fromBrokerMetadataPtr
  topics <-
    peekArray (topicCnt'RdKafkaMetadataT km) (topics'RdKafkaMetadataT km)
      >>= mapM fromTopicMetadataPtr
  return KafkaMetadata
    { kmBrokers = brokers
    , kmTopics = topics
    , kmOrigBroker = BrokerId (fromIntegral (origBrokerId'RdKafkaMetadataT km))
    }
-- | Parse the group state string reported by the broker.
--
-- @CompletingRebalance@ is the name newer Kafka brokers use for the state
-- older brokers call @AwaitingSync@; both are mapped to 'GroupAwaitingSync'
-- so that listing groups against a newer broker does not crash.
--
-- Any other unrecognised string still raises 'error', because 'GroupState'
-- has no constructor for an unknown state.
groupStateFromKafkaString :: String -> GroupState
groupStateFromKafkaString s = case s of
  "PreparingRebalance"  -> GroupPreparingRebalance
  "AwaitingSync"        -> GroupAwaitingSync
  -- Renamed form of "AwaitingSync" reported by newer brokers.
  "CompletingRebalance" -> GroupAwaitingSync
  "Stable"              -> GroupStable
  "Dead"                -> GroupDead
  "Empty"               -> GroupEmpty
  _                     -> error $ "Unknown group state: " <> s