{-# LANGUAGE DeriveGeneric     #-}
{-# LANGUAGE OverloadedStrings #-}

-----------------------------------------------------------------------------
-- |
-- Module with metadata types and functions.
-----------------------------------------------------------------------------
module Kafka.Metadata
( KafkaMetadata(..), BrokerMetadata(..), TopicMetadata(..), PartitionMetadata(..)
, WatermarkOffsets(..)
, GroupMemberId(..), GroupMemberInfo(..)
, GroupProtocolType(..), GroupProtocol(..), GroupState(..)
, GroupInfo(..)
, allTopicsMetadata, topicMetadata
, watermarkOffsets, watermarkOffsets'
, partitionWatermarkOffsets
, offsetsForTime, offsetsForTime', topicOffsetsForTime
, allConsumerGroupsInfo, consumerGroupInfo
)
where

import Control.Arrow          (left)
import Control.Exception      (bracket)
import Control.Monad.IO.Class (MonadIO (liftIO))
import Data.Bifunctor         (bimap)
import Data.ByteString        (ByteString, pack)
import Data.Text              (Text)
import Foreign                (Storable (peek), peekArray, withForeignPtr)
import GHC.Generics           (Generic)
import Kafka.Consumer.Convert (fromNativeTopicPartitionList'', toNativeTopicPartitionList)
import Kafka.Consumer.Types   (ConsumerGroupId (..), Offset (..), PartitionOffset (..), TopicPartition (..))
import Kafka.Internal.RdKafka (RdKafkaGroupInfoT (..), RdKafkaGroupListT (..), RdKafkaGroupListTPtr, RdKafkaGroupMemberInfoT (..), RdKafkaMetadataBrokerT (..), RdKafkaMetadataPartitionT (..), RdKafkaMetadataT (..), RdKafkaMetadataTPtr, RdKafkaMetadataTopicT (..), RdKafkaRespErrT (..), RdKafkaTPtr, destroyUnmanagedRdKafkaTopic, newUnmanagedRdKafkaTopicT, peekCAText, rdKafkaListGroups, rdKafkaMetadata, rdKafkaOffsetsForTimes, rdKafkaQueryWatermarkOffsets)
import Kafka.Internal.Setup   (HasKafka (..), Kafka (..))
import Kafka.Internal.Shared  (kafkaErrorToMaybe)
import Kafka.Types            (BrokerId (..), ClientId (..), KafkaError (..), Millis (..), PartitionId (..), Timeout (..), TopicName (..))

import qualified Data.Set  as S
import qualified Data.Text as Text

-- | Metadata describing a Kafka cluster: its brokers, its topics and the
-- id of the originating broker.
data KafkaMetadata = KafkaMetadata
  { kmBrokers    :: [BrokerMetadata] -- ^ Metadata for the brokers
  , kmTopics     :: [TopicMetadata]  -- ^ Metadata for the topics
  , kmOrigBroker :: !BrokerId        -- ^ Originating broker id (presumably the broker that served the request -- TODO confirm)
  } deriving (Show, Eq, Generic)

-- | Metadata describing a single broker.
data BrokerMetadata = BrokerMetadata
  { bmBrokerId   :: !BrokerId -- ^ Broker id
  , bmBrokerHost :: !Text     -- ^ Broker host
  , bmBrokerPort :: !Int      -- ^ Broker port
  } deriving (Show, Eq, Generic)

-- | Metadata describing a single partition of a topic.
data PartitionMetadata = PartitionMetadata
  { pmPartitionId    :: !PartitionId     -- ^ Partition id
  , pmError          :: Maybe KafkaError -- ^ Error attached to this partition, if any
  , pmLeader         :: !BrokerId        -- ^ Id of the leader broker
  , pmReplicas       :: [BrokerId]       -- ^ Ids of the replica brokers
  , pmInSyncReplicas :: [BrokerId]       -- ^ Ids of the in-sync replica brokers
  } deriving (Show, Eq, Generic)

-- | Metadata describing a single topic and its partitions.
data TopicMetadata = TopicMetadata
  { tmTopicName  :: !TopicName           -- ^ Topic name
  , tmPartitions :: [PartitionMetadata]  -- ^ Metadata for the topic's partitions
  , tmError      :: Maybe KafkaError     -- ^ Error attached to this topic, if any
  } deriving (Show, Eq, Generic)

-- | Low and high watermark offsets of one partition of a topic.
data WatermarkOffsets = WatermarkOffsets
  { woTopicName     :: !TopicName   -- ^ Topic the offsets belong to
  , woPartitionId   :: !PartitionId -- ^ Partition the offsets belong to
  , woLowWatermark  :: !Offset      -- ^ Low (oldest/beginning) offset
  , woHighWatermark :: !Offset      -- ^ High (newest/end) offset
  } deriving (Show, Eq, Generic)

-- | Identifier of a consumer group member, wrapping its textual form.
newtype GroupMemberId = GroupMemberId Text
  deriving (Show, Eq, Read, Ord)
-- | Information about a single member of a consumer group.
data GroupMemberInfo = GroupMemberInfo
  { gmiMemberId   :: !GroupMemberId -- ^ Member id
  , gmiClientId   :: !ClientId      -- ^ Client id of the member
  , gmiClientHost :: !Text          -- ^ Client host of the member
  , gmiMetadata   :: !ByteString    -- ^ Member metadata as raw bytes (format not visible here -- TODO confirm)
  , gmiAssignment :: !ByteString    -- ^ Member assignment as raw bytes (format not visible here -- TODO confirm)
  } deriving (Show, Eq, Generic)

-- | Protocol type of a consumer group, wrapping its textual form.
newtype GroupProtocolType = GroupProtocolType Text
  deriving (Show, Eq, Read, Ord, Generic)
-- | Protocol of a consumer group, wrapping its textual form.
newtype GroupProtocol = GroupProtocol Text
  deriving (Show, Eq, Read, Ord, Generic)
-- | State of a consumer group.
--
-- The derived 'Ord' instance follows the declaration order of the
-- constructors below.
data GroupState
  = GroupPreparingRebalance       -- ^ Group is preparing to rebalance
  | GroupEmpty                    -- ^ Group has no more members, but lingers until all offsets have expired
  | GroupAwaitingSync             -- ^ Group is awaiting state assignment from the leader
  | GroupStable                   -- ^ Group is stable
  | GroupDead                     -- ^ Group has no more members and its metadata is being removed
  deriving (Show, Eq, Read, Ord, Generic)

-- | Information about a consumer group and its members.
data GroupInfo = GroupInfo
  { giGroup        :: !ConsumerGroupId    -- ^ Consumer group id
  , giError        :: Maybe KafkaError    -- ^ Error attached to this group, if any
  , giState        :: !GroupState         -- ^ Current state of the group
  , giProtocolType :: !GroupProtocolType  -- ^ Protocol type of the group
  , giProtocol     :: !GroupProtocol      -- ^ Protocol of the group
  , giMembers      :: [GroupMemberInfo]   -- ^ Members of the group
  } deriving (Show, Eq, Generic)

-- | Returns metadata for all topics in the cluster
--
-- Calls 'rdKafkaMetadata' on the handle obtained from 'getKafkaPtr' with no
-- specific topic ('Nothing') and the given timeout. A native error code is
-- wrapped in 'KafkaResponseError'; a successful result is converted with
-- @fromKafkaMetadataPtr@.
allTopicsMetadata :: (MonadIO m, HasKafka k) => k -> Timeout -> m (Either KafkaError KafkaMetadata)
allTopicsMetadata k (Timeout timeout) = liftIO $ do
  -- NOTE(review): the 'True' flag presumably asks for all topics -- confirm
  -- against the 'rdKafkaMetadata' binding.
  meta <- rdKafkaMetadata (getKafkaPtr k) True Nothing timeout
  traverse fromKafkaMetadataPtr (left KafkaResponseError meta)

-- | Returns metadata only for specified topic
--
-- An unmanaged topic handle is created for the topic name and released via
-- 'bracket', so it is destroyed even if the metadata request throws.
-- If the handle cannot be created, the error string is returned as a
-- 'KafkaError'; a native error code from 'rdKafkaMetadata' is wrapped in
-- 'KafkaResponseError'.
topicMetadata :: (MonadIO m, HasKafka k) => k -> Timeout -> TopicName -> m (Either KafkaError KafkaMetadata)
topicMetadata k (Timeout timeout) (TopicName tn) = liftIO $
  bracket mkTopic clTopic $ \mbt -> case mbt of
    Left err -> return (Left $ KafkaError (Text.pack err))
    Right t -> do
      meta <- rdKafkaMetadata (getKafkaPtr k) False (Just t) timeout
      traverse fromKafkaMetadataPtr (left KafkaResponseError meta)
  where
    -- Acquire: create the unmanaged topic handle (no topic config supplied).
    mkTopic = newUnmanagedRdKafkaTopicT (getKafkaPtr k) (Text.unpack tn) Nothing
    -- Release: nothing to free when creation failed, otherwise destroy the handle.
    clTopic = either (return . const ()) destroyUnmanagedRdKafkaTopic

-- | Query broker for low (oldest/beginning) and high (newest/end) offsets for a given topic.
--
-- The topic's metadata is fetched first with 'topicMetadata'; the watermarks
-- are then queried for every partition of the first topic entry in that
-- metadata (see 'watermarkOffsets'').
--
-- * If the metadata request fails, the result is a single-element list
--   holding that error.
-- * If the metadata contains no topic entries, the result is an empty list.
watermarkOffsets :: (MonadIO m, HasKafka k) => k -> Timeout -> TopicName -> m [Either KafkaError WatermarkOffsets]
watermarkOffsets k timeout t = do
  meta <- topicMetadata k timeout t
  case meta of
    Left err -> return [Left err]
    -- Match on the list directly rather than guarding a partial 'head'
    -- with 'null'.
    Right tm -> case kmTopics tm of
      []        -> return []
      (tmd : _) -> watermarkOffsets' k timeout tmd

-- | Query broker for low (oldest/beginning) and high (newest/end) offsets for
-- every partition listed in the given topic metadata.
--
-- Each partition in 'tmPartitions' is queried separately through
-- 'partitionWatermarkOffsets' with the same timeout, so the result holds one
-- entry per partition, in the order the partitions appear in the metadata.
-- A failure for one partition does not prevent the others from being queried.
watermarkOffsets' :: (MonadIO m, HasKafka k) => k -> Timeout -> TopicMetadata -> m [Either KafkaError WatermarkOffsets]
watermarkOffsets' k timeout tm =
  let pids = pmPartitionId <$> tmPartitions tm
  in liftIO $ traverse (partitionWatermarkOffsets k timeout (tmTopicName tm)) pids

-- | Query broker for low (oldest/beginning) and high (newest/end) offsets for a specific partition
--
-- Calls 'rdKafkaQueryWatermarkOffsets' for the given topic and partition.
-- A native error code is wrapped in 'KafkaResponseError'; on success the
-- returned (low, high) pair is packaged into a 'WatermarkOffsets' together
-- with the topic name and partition id that were queried.
partitionWatermarkOffsets :: (MonadIO m, HasKafka k) => k -> Timeout -> TopicName -> PartitionId -> m (Either KafkaError WatermarkOffsets)
partitionWatermarkOffsets k (Timeout timeout) (TopicName t) (PartitionId p) = liftIO $ do
  offs <- rdKafkaQueryWatermarkOffsets (getKafkaPtr k) (Text.unpack t) p timeout
  return $ bimap KafkaResponseError toWatermark offs
  where
    -- First component is the low watermark, second is the high watermark.
    toWatermark (l, h) = WatermarkOffsets (TopicName t) (PartitionId p) (Offset l) (Offset h)

-- | Look up the offsets for the given topic by timestamp.
--
-- The returned offset for each partition is the earliest offset whose
-- timestamp is greater than or equal to the given timestamp in the
-- corresponding partition.
--
-- The topic's metadata is fetched first with 'topicMetadata'; every
-- (topic, partition) pair found in it is then passed to 'offsetsForTime'.
-- A metadata failure is returned as 'Left' without querying any offsets.
topicOffsetsForTime :: (MonadIO m, HasKafka k) => k -> Timeout -> Millis -> TopicName -> m (Either KafkaError [TopicPartition])
topicOffsetsForTime k timeout timestamp topic = do
  meta <- topicMetadata k timeout topic
  case meta of
    Left err    -> return $ Left err
    Right meta' ->
      let tps = [ (tmTopicName t, pmPartitionId p)
                | t <- kmTopics meta'
                , p <- tmPartitions t
                ]
      in offsetsForTime k timeout timestamp tps

-- | Look up the offsets for the given metadata by timestamp.
--
-- The returned offset for each partition is the earliest offset whose
-- timestamp is greater than or equal to the given timestamp in the
-- corresponding partition.
--
-- Every partition listed in the supplied 'TopicMetadata' is paired with the
-- topic's name and handed to 'offsetsForTime'.
offsetsForTime' :: (MonadIO m, HasKafka k) => k -> Timeout -> Millis -> TopicMetadata -> m (Either KafkaError [TopicPartition])
offsetsForTime' k timeout timestamp t =
    let tps = [(tmTopicName t, pmPartitionId p) | p <- tmPartitions t]
    in offsetsForTime k timeout timestamp tps

-- | Look up the offsets for the given partitions by timestamp.
--
-- The returned offset for each partition is the earliest offset whose
-- timestamp is greater than or equal to the given timestamp in the
-- corresponding partition.
--
-- Duplicate (topic, partition) pairs in the input are collapsed before the
-- request is built. A native error code other than 'RdKafkaRespErrNoError'
-- is returned as 'Left' wrapped in 'KafkaResponseError'; on success the
-- native partition list is converted back with
-- 'fromNativeTopicPartitionList'''.
offsetsForTime :: (MonadIO m, HasKafka k) => k -> Timeout -> Millis -> [(TopicName, PartitionId)] -> m (Either KafkaError [TopicPartition])
offsetsForTime k (Timeout timeout) (Millis t) tps = liftIO $ do
  ntps <- toNativeTopicPartitionList $ mkTopicPartition <$> uniqueTps
  res  <- rdKafkaOffsetsForTimes (getKafkaPtr k) ntps timeout
  case res of
    RdKafkaRespErrNoError -> Right <$> fromNativeTopicPartitionList'' ntps
    err                   -> return $ Left (KafkaResponseError err)
  where
    -- Round-trip through a Set to drop duplicate pairs.
    uniqueTps = S.toList . S.fromList $ tps
    -- rd_kafka_offsets_for_times reuses `offset` to specify timestamp :(
    mkTopicPartition (tn, p) = TopicPartition tn p (PartitionOffset t)

-- | List and describe all consumer groups in cluster.
--
-- Calls 'rdKafkaListGroups' without a group filter ('Nothing'). A native
-- error code is returned as 'Left' wrapped in 'KafkaResponseError';
-- otherwise the native group list is converted into 'GroupInfo' values.
allConsumerGroupsInfo :: (MonadIO m, HasKafka k) => k -> Timeout -> m (Either KafkaError [GroupInfo])
allConsumerGroupsInfo k (Timeout t) = liftIO $ do
  res <- rdKafkaListGroups (getKafkaPtr k) Nothing t
  traverse fromGroupInfoListPtr (left KafkaResponseError res)

-- | Describe a given consumer group.
--
-- Calls 'rdKafkaListGroups' with the group id as filter. A native error code
-- is returned as 'Left' wrapped in 'KafkaResponseError'; otherwise the native
-- group list is converted into 'GroupInfo' values. The result is a list
-- because that is what the underlying group-list structure holds.
consumerGroupInfo :: (MonadIO m, HasKafka k) => k -> Timeout -> ConsumerGroupId -> m (Either KafkaError [GroupInfo])
consumerGroupInfo k (Timeout timeout) (ConsumerGroupId gn) = liftIO $ do
  res <- rdKafkaListGroups (getKafkaPtr k) (Just (Text.unpack gn)) timeout
  traverse fromGroupInfoListPtr (left KafkaResponseError res)

-------------------------------------------------------------------------------
-- Extract the native client pointer wrapped inside the 'Kafka' value that
-- any 'HasKafka' instance provides.
getKafkaPtr :: HasKafka k => k -> RdKafkaTPtr
getKafkaPtr k = let (Kafka k') = getKafka k in k'
{-# INLINE getKafkaPtr #-}


-- Read the native group list behind the foreign pointer and convert each of
-- its 'groupCnt'RdKafkaGroupListT' entries into a 'GroupInfo'.
-- All reads happen inside 'withForeignPtr', i.e. while the pointer is kept alive.
fromGroupInfoListPtr :: RdKafkaGroupListTPtr -> IO [GroupInfo]
fromGroupInfoListPtr ptr =
  withForeignPtr ptr $ \realPtr -> do
    gl   <- peek realPtr
    pgis <- peekArray (groupCnt'RdKafkaGroupListT gl) (groups'RdKafkaGroupListT gl)
    traverse fromGroupInfoPtr pgis

-- Convert one native group description into a 'GroupInfo'.
--
-- The C strings (group id, state, protocol type, protocol) are copied into
-- 'Text', the member array is read and converted element by element, and the
-- native error code is turned into a 'Maybe KafkaError' via 'kafkaErrorToMaybe'.
-- The state string is interpreted with 'groupStateFromKafkaString'.
-- Broker information is currently not read (see the commented-out lines).
fromGroupInfoPtr :: RdKafkaGroupInfoT -> IO GroupInfo
fromGroupInfoPtr gi = do
  --bmd <- peek (broker'RdKafkaGroupInfoT gi) -- >>= fromBrokerMetadataPtr
  --xxx <- fromBrokerMetadataPtr bmd
  cid <- peekCAText $ group'RdKafkaGroupInfoT gi
  stt <- peekCAText $ state'RdKafkaGroupInfoT gi
  prt <- peekCAText $ protocolType'RdKafkaGroupInfoT gi
  pr  <- peekCAText $ protocol'RdKafkaGroupInfoT gi
  mbs <- peekArray (memberCnt'RdKafkaGroupInfoT gi) (members'RdKafkaGroupInfoT gi)
  mbl <- mapM fromGroupMemberInfoPtr mbs
  return GroupInfo
    { -- giBroker = bmd
      giGroup         = ConsumerGroupId cid
    , giError         = kafkaErrorToMaybe $ KafkaResponseError (err'RdKafkaGroupInfoT gi)
    , giState         = groupStateFromKafkaString stt
    , giProtocolType  = GroupProtocolType prt
    , giProtocol      = GroupProtocol pr
    , giMembers       = mbl
    }

-- Convert one native group-member description into a 'GroupMemberInfo'.
--
-- Member id, client id and client host are copied from C strings into 'Text'.
-- The metadata and assignment blobs are read as raw bytes using their
-- accompanying size fields and packed into strict 'ByteString's; their
-- contents are not interpreted here.
fromGroupMemberInfoPtr :: RdKafkaGroupMemberInfoT -> IO GroupMemberInfo
fromGroupMemberInfoPtr mi = do
  mid <- peekCAText $ memberId'RdKafkaGroupMemberInfoT mi
  cid <- peekCAText $ clientId'RdKafkaGroupMemberInfoT mi
  hst <- peekCAText $ clientHost'RdKafkaGroupMemberInfoT mi
  mtd <- peekArray (memberMetadataSize'RdKafkaGroupMemberInfoT mi) (memberMetadata'RdKafkaGroupMemberInfoT mi)
  ass <- peekArray (memberAssignmentSize'RdKafkaGroupMemberInfoT mi) (memberAssignment'RdKafkaGroupMemberInfoT mi)
  return GroupMemberInfo
    { gmiMemberId   = GroupMemberId mid
    , gmiClientId   = ClientId cid
    , gmiClientHost = hst
    , gmiMetadata   = pack mtd
    , gmiAssignment = pack ass
    }

-- Convert one native topic metadata record into a 'TopicMetadata'.
--
-- The topic name is copied from its C string, the partition array is read
-- using the record's partition count and converted element by element, and
-- the native error code is turned into a 'Maybe KafkaError' via
-- 'kafkaErrorToMaybe'.
fromTopicMetadataPtr :: RdKafkaMetadataTopicT -> IO TopicMetadata
fromTopicMetadataPtr tm = do
  tnm <- peekCAText (topic'RdKafkaMetadataTopicT tm)
  pts <- peekArray (partitionCnt'RdKafkaMetadataTopicT tm) (partitions'RdKafkaMetadataTopicT tm)
  pms <- mapM fromPartitionMetadataPtr pts
  return TopicMetadata
    { tmTopicName   = TopicName tnm
    , tmPartitions  = pms
    , tmError       = kafkaErrorToMaybe $ KafkaResponseError (err'RdKafkaMetadataTopicT tm)
    }

-- Convert one native partition metadata record into a 'PartitionMetadata'.
--
-- The replica and in-sync-replica arrays are read using their respective
-- count fields and each native 32-bit id is widened into a 'BrokerId'.
-- The native error code is turned into a 'Maybe KafkaError' via
-- 'kafkaErrorToMaybe'.
fromPartitionMetadataPtr :: RdKafkaMetadataPartitionT -> IO PartitionMetadata
fromPartitionMetadataPtr pm = do
  reps <- peekArray (replicaCnt'RdKafkaMetadataPartitionT pm) (replicas'RdKafkaMetadataPartitionT pm)
  isrs <- peekArray (isrCnt'RdKafkaMetadataPartitionT pm) (isrs'RdKafkaMetadataPartitionT pm)
  return PartitionMetadata
    { pmPartitionId     = PartitionId (id'RdKafkaMetadataPartitionT pm)
    , pmError           = kafkaErrorToMaybe $ KafkaResponseError (err'RdKafkaMetadataPartitionT pm)
    , pmLeader          = BrokerId (leader'RdKafkaMetadataPartitionT pm)
    , pmReplicas        = BrokerId . fromIntegral <$> reps
    , pmInSyncReplicas  = BrokerId . fromIntegral <$> isrs
    }


-- Convert one native broker metadata record into a 'BrokerMetadata':
-- the host is copied from its C string, id and port are taken as-is.
fromBrokerMetadataPtr :: RdKafkaMetadataBrokerT -> IO BrokerMetadata
fromBrokerMetadataPtr bm = do
    host <- peekCAText (host'RdKafkaMetadataBrokerT bm)
    return BrokerMetadata
      { bmBrokerId   = BrokerId (id'RdKafkaMetadataBrokerT bm)
      , bmBrokerHost = host
      , bmBrokerPort = port'RdKafkaMetadataBrokerT bm
      }


-- Read the native metadata structure behind the foreign pointer and convert
-- it into a 'KafkaMetadata'.
--
-- Broker and topic arrays are read using their respective count fields and
-- converted element by element; the originating broker id is widened from
-- its native 32-bit representation. All reads happen inside
-- 'withForeignPtr', i.e. while the pointer is kept alive.
fromKafkaMetadataPtr :: RdKafkaMetadataTPtr -> IO KafkaMetadata
fromKafkaMetadataPtr ptr =
  withForeignPtr ptr $ \realPtr -> do
    km    <- peek realPtr
    pbms  <- peekArray (brokerCnt'RdKafkaMetadataT km) (brokers'RdKafkaMetadataT km)
    bms   <- mapM fromBrokerMetadataPtr pbms
    ptms  <- peekArray (topicCnt'RdKafkaMetadataT km) (topics'RdKafkaMetadataT km)
    tms   <- mapM fromTopicMetadataPtr ptms
    return KafkaMetadata
      { kmBrokers    = bms
      , kmTopics     = tms
      , kmOrigBroker = BrokerId $ fromIntegral (origBrokerId'RdKafkaMetadataT km)
      }

-- Translate the group state name reported by the broker into a 'GroupState'.
--
-- NOTE: this function is partial. A state name that is not listed below
-- raises a runtime 'error', because 'GroupState' has no constructor for
-- unknown values.
groupStateFromKafkaString :: Text -> GroupState
groupStateFromKafkaString s = case s of
  "PreparingRebalance"  -> GroupPreparingRebalance
  "AwaitingSync"        -> GroupAwaitingSync
  -- Newer brokers report the former "AwaitingSync" state under the name
  -- "CompletingRebalance"; treat both spellings as the same state instead
  -- of crashing on the newer one.
  "CompletingRebalance" -> GroupAwaitingSync
  "Stable"              -> GroupStable
  "Dead"                -> GroupDead
  "Empty"               -> GroupEmpty
  _                     -> error $ "Unknown group state: " <> Text.unpack s