module Kafka.Metadata
( KafkaMetadata(..), BrokerMetadata(..), TopicMetadata(..), PartitionMetadata(..)
, WatermarkOffsets(..)
, GroupMemberId(..), GroupMemberInfo(..)
, GroupProtocolType(..), GroupProtocol(..), GroupState(..)
, GroupInfo(..)
, allTopicsMetadata, topicMetadata
, watermarkOffsets, watermarkOffsets'
, partitionWatermarkOffsets
, offsetsForTime, offsetsForTime', topicOffsetsForTime
, allConsumerGroupsInfo, consumerGroupInfo
)
where

import           Control.Arrow          (left)
import           Control.Exception      (bracket)
import           Control.Monad.IO.Class (MonadIO, liftIO)
import           Data.Bifunctor
import           Data.ByteString        (ByteString, pack)
import           Data.Monoid            ((<>))
import qualified Data.Set               as S
import           Foreign
import           Foreign.C.String
import           Kafka.Consumer.Convert
import           Kafka.Consumer.Types
import           Kafka.Internal.RdKafka
import           Kafka.Internal.Setup
import           Kafka.Internal.Shared
import           Kafka.Types

data KafkaMetadata = KafkaMetadata
  { kmBrokers    :: [BrokerMetadata]
  , kmTopics     :: [TopicMetadata]
  , kmOrigBroker :: !BrokerId
  } deriving (Show, Eq)

data BrokerMetadata = BrokerMetadata
  { bmBrokerId   :: !BrokerId
  , bmBrokerHost :: !String
  , bmBrokerPort :: !Int
  } deriving (Show, Eq)

data PartitionMetadata = PartitionMetadata
  { pmPartitionId    :: !PartitionId
  , pmError          :: Maybe KafkaError
  , pmLeader         :: !BrokerId
  , pmReplicas       :: [BrokerId]
  , pmInSyncReplicas :: [BrokerId]
  } deriving (Show, Eq)

data TopicMetadata = TopicMetadata
  { tmTopicName  :: !TopicName
  , tmPartitions :: [PartitionMetadata]
  , tmError      :: Maybe KafkaError
  } deriving (Show, Eq)

data WatermarkOffsets = WatermarkOffsets
  { woTopicName     :: !TopicName
  , woPartitionId   :: !PartitionId
  , woLowWatermark  :: !Offset
  , woHighWatermark :: !Offset
  } deriving (Show, Eq)

newtype GroupMemberId = GroupMemberId String deriving (Show, Eq, Read, Ord)
data GroupMemberInfo = GroupMemberInfo
  { gmiMemberId   :: !GroupMemberId
  , gmiClientId   :: !ClientId
  , gmiClientHost :: !String
  , gmiMetadata   :: !ByteString
  , gmiAssignment :: !ByteString
  } deriving (Show, Eq)

newtype GroupProtocolType = GroupProtocolType String deriving (Show, Eq, Read, Ord)
newtype GroupProtocol = GroupProtocol String deriving (Show, Eq, Read, Ord)
data GroupState
  = GroupPreparingRebalance       -- ^ Group is preparing to rebalance
  | GroupEmpty                    -- ^ Group has no more members, but lingers until all offsets have expired
  | GroupAwaitingSync             -- ^ Group is awaiting state assignment from the leader
  | GroupStable                   -- ^ Group is stable
  | GroupDead                     -- ^ Group has no more members and its metadata is being removed
  deriving (Show, Eq, Read, Ord)

data GroupInfo = GroupInfo
  { giGroup        :: !ConsumerGroupId
  , giError        :: Maybe KafkaError
  , giState        :: !GroupState
  , giProtocolType :: !GroupProtocolType
  , giProtocol     :: !GroupProtocol
  , giMembers      :: [GroupMemberInfo]
  } deriving (Show, Eq)

-- | Returns metadata for all topics in the cluster.
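--
-- A minimal usage sketch; @consumer@ is a placeholder for any value with a
-- 'HasKafka' instance (e.g. an open consumer) and is not defined in this module:
--
-- > -- 'consumer' is assumed to be in scope; the timeout is in milliseconds
-- > meta <- allTopicsMetadata consumer (Timeout 5000)
-- > either print (mapM_ (print . tmTopicName) . kmTopics) meta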
allTopicsMetadata :: (MonadIO m, HasKafka k) => k -> Timeout -> m (Either KafkaError KafkaMetadata)
allTopicsMetadata k (Timeout timeout) = liftIO $ do
  meta <- rdKafkaMetadata (getKafkaPtr k) True Nothing timeout
  traverse fromKafkaMetadataPtr (left KafkaResponseError meta)

-- | Returns metadata only for the specified topic.
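--
-- A sketch, assuming the same placeholder @consumer@ as above and that the
-- topic exists in the cluster:
--
-- > -- "example-topic" is an illustrative topic name
-- > res <- topicMetadata consumer (Timeout 5000) (TopicName "example-topic")
-- > either print (mapM_ print . kmTopics) res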
topicMetadata :: (MonadIO m, HasKafka k) => k -> Timeout -> TopicName -> m (Either KafkaError KafkaMetadata)
topicMetadata k (Timeout timeout) (TopicName tn) = liftIO $
  bracket mkTopic clTopic $ \mbt -> case mbt of
    Left err -> return (Left $ KafkaError err)
    Right t -> do
      meta <- rdKafkaMetadata (getKafkaPtr k) False (Just t) timeout
      traverse fromKafkaMetadataPtr (left KafkaResponseError meta)
  where
    mkTopic = newUnmanagedRdKafkaTopicT (getKafkaPtr k) tn Nothing
    clTopic = either (return . const ()) destroyUnmanagedRdKafkaTopic

-- | Queries the broker for the low (oldest/beginning) and high (newest/end)
-- offsets of every partition of the given topic.
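--
-- A sketch (placeholders as above):
--
-- > offs <- watermarkOffsets consumer (Timeout 5000) (TopicName "example-topic")
-- > mapM_ (either print print) offs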
watermarkOffsets :: (MonadIO m, HasKafka k) => k -> Timeout -> TopicName -> m [Either KafkaError WatermarkOffsets]
watermarkOffsets k timeout t = do
  meta <- topicMetadata k timeout t
  case meta of
    Left err -> return [Left err]
    Right tm -> case kmTopics tm of
                  []      -> return []
                  (tm':_) -> watermarkOffsets' k timeout tm'

-- | Queries the broker for the low (oldest/beginning) and high (newest/end)
-- offsets of every partition described by the given topic metadata.
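--
-- A sketch that reuses already fetched metadata (error handling elided,
-- placeholders as above):
--
-- > Right meta <- topicMetadata consumer (Timeout 5000) (TopicName "example-topic")
-- > case kmTopics meta of
-- >   []     -> putStrLn "topic not found"
-- >   (tm:_) -> watermarkOffsets' consumer (Timeout 5000) tm >>= mapM_ (either print print)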
watermarkOffsets' :: (MonadIO m, HasKafka k) => k -> Timeout -> TopicMetadata -> m [Either KafkaError WatermarkOffsets]
watermarkOffsets' k timeout tm =
  let pids = pmPartitionId <$> tmPartitions tm
  in liftIO $ traverse (partitionWatermarkOffsets k timeout (tmTopicName tm)) pids

-- | Queries the broker for the low (oldest/beginning) and high (newest/end)
-- offsets of a specific partition.
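--
-- A sketch (topic name and partition number are placeholders):
--
-- > res <- partitionWatermarkOffsets consumer (Timeout 5000) (TopicName "example-topic") (PartitionId 0)
-- > print res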
partitionWatermarkOffsets :: (MonadIO m, HasKafka k) => k -> Timeout -> TopicName -> PartitionId -> m (Either KafkaError WatermarkOffsets)
partitionWatermarkOffsets k (Timeout timeout) (TopicName t) (PartitionId p) = liftIO $ do
  offs <- rdKafkaQueryWatermarkOffsets (getKafkaPtr k) t p timeout
  return $ bimap KafkaResponseError toWatermark offs
  where
    toWatermark (l, h) = WatermarkOffsets (TopicName t) (PartitionId p) (Offset l) (Offset h)

-- | Looks up the offsets for the given topic by timestamp.
--
-- The returned offset for each partition is the earliest offset whose
-- timestamp is greater than or equal to the given timestamp in the
-- corresponding partition.
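--
-- A sketch; the timestamp below (2020-01-01 00:00:00 UTC in milliseconds) is
-- illustrative only:
--
-- > res <- topicOffsetsForTime consumer (Timeout 5000) (Millis 1577836800000) (TopicName "example-topic")
-- > either print (mapM_ print) res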
topicOffsetsForTime :: (MonadIO m, HasKafka k) => k -> Timeout -> Millis -> TopicName -> m (Either KafkaError [TopicPartition])
topicOffsetsForTime k timeout timestamp topic = do
  meta <- topicMetadata k timeout topic
  case meta of
    Left err -> return $ Left err
    Right meta' ->
      let tps = [(tmTopicName t, pmPartitionId p) | t <- kmTopics meta', p <- tmPartitions t]
      in offsetsForTime k timeout timestamp tps

-- | Looks up the offsets for all partitions of the given topic metadata by timestamp.
--
-- The returned offset for each partition is the earliest offset whose
-- timestamp is greater than or equal to the given timestamp in the
-- corresponding partition.
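--
-- A sketch built on previously fetched metadata (error handling elided,
-- placeholders as above):
--
-- > Right meta <- topicMetadata consumer (Timeout 5000) (TopicName "example-topic")
-- > case kmTopics meta of
-- >   []     -> putStrLn "topic not found"
-- >   (tm:_) -> do
-- >     res <- offsetsForTime' consumer (Timeout 5000) (Millis 1577836800000) tm
-- >     either print (mapM_ print) res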
offsetsForTime' :: (MonadIO m, HasKafka k) => k -> Timeout -> Millis -> TopicMetadata -> m (Either KafkaError [TopicPartition])
offsetsForTime' k timeout timestamp t =
    let tps = [(tmTopicName t, pmPartitionId p) | p <- tmPartitions t]
    in offsetsForTime k timeout timestamp tps

-- | Looks up the offsets for the given partitions by timestamp.
--
-- The returned offset for each partition is the earliest offset whose
-- timestamp is greater than or equal to the given timestamp in the
-- corresponding partition.
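--
-- A sketch querying two partitions of one (illustrative) topic:
--
-- > let tps = [ (TopicName "example-topic", PartitionId 0)
-- >           , (TopicName "example-topic", PartitionId 1) ]
-- > res <- offsetsForTime consumer (Timeout 5000) (Millis 1577836800000) tps
-- > either print (mapM_ print) res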
offsetsForTime :: (MonadIO m, HasKafka k) => k -> Timeout -> Millis -> [(TopicName, PartitionId)] -> m (Either KafkaError [TopicPartition])
offsetsForTime k (Timeout timeout) (Millis t) tps = liftIO $ do
  ntps <- toNativeTopicPartitionList $ mkTopicPartition <$> uniqueTps
  res <- rdKafkaOffsetsForTimes (getKafkaPtr k) ntps timeout
  case res of
    RdKafkaRespErrNoError -> Right <$> fromNativeTopicPartitionList'' ntps
    err                   -> return $ Left (KafkaResponseError err)
  where
    uniqueTps = S.toList . S.fromList $ tps
    -- rd_kafka_offsets_for_times reuses `offset` to specify timestamp :(
    mkTopicPartition (tn, p) = TopicPartition tn p (PartitionOffset t)

-- | Lists and describes all consumer groups in the cluster.
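--
-- A sketch (same placeholder @consumer@ as above):
--
-- > res <- allConsumerGroupsInfo consumer (Timeout 5000)
-- > either print (mapM_ print) res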
allConsumerGroupsInfo :: (MonadIO m, HasKafka k) => k -> Timeout -> m (Either KafkaError [GroupInfo])
allConsumerGroupsInfo k (Timeout t) = liftIO $ do
  res <- rdKafkaListGroups (getKafkaPtr k) Nothing t
  traverse fromGroupInfoListPtr (left KafkaResponseError res)

-- | Describes the given consumer group.
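--
-- A sketch; the group name is a placeholder:
--
-- > res <- consumerGroupInfo consumer (Timeout 5000) (ConsumerGroupId "example-group")
-- > either print (mapM_ print) res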
consumerGroupInfo :: (MonadIO m, HasKafka k) => k -> Timeout -> ConsumerGroupId -> m (Either KafkaError [GroupInfo])
consumerGroupInfo k (Timeout timeout) (ConsumerGroupId gn) = liftIO $ do
  res <- rdKafkaListGroups (getKafkaPtr k) (Just gn) timeout
  traverse fromGroupInfoListPtr (left KafkaResponseError res)

-------------------------------------------------------------------------------
getKafkaPtr :: HasKafka k => k -> RdKafkaTPtr
getKafkaPtr k = let (Kafka k') = getKafka k in k'
{-# INLINE getKafkaPtr #-}


fromGroupInfoListPtr :: RdKafkaGroupListTPtr -> IO [GroupInfo]
fromGroupInfoListPtr ptr =
  withForeignPtr ptr $ \realPtr -> do
    gl   <- peek realPtr
    pgis <- peekArray (groupCnt'RdKafkaGroupListT gl) (groups'RdKafkaGroupListT gl)
    traverse fromGroupInfoPtr pgis

fromGroupInfoPtr :: RdKafkaGroupInfoT -> IO GroupInfo
fromGroupInfoPtr gi = do
  cid <- peekCAString $ group'RdKafkaGroupInfoT gi
  stt <- peekCAString $ state'RdKafkaGroupInfoT gi
  prt <- peekCAString $ protocolType'RdKafkaGroupInfoT gi
  pr  <- peekCAString $ protocol'RdKafkaGroupInfoT gi
  mbs <- peekArray (memberCnt'RdKafkaGroupInfoT gi) (members'RdKafkaGroupInfoT gi)
  mbl <- mapM fromGroupMemberInfoPtr mbs
  return GroupInfo
    { giGroup        = ConsumerGroupId cid
    , giError        = kafkaErrorToMaybe $ KafkaResponseError (err'RdKafkaGroupInfoT gi)
    , giState        = groupStateFromKafkaString stt
    , giProtocolType = GroupProtocolType prt
    , giProtocol     = GroupProtocol pr
    , giMembers      = mbl
    }

fromGroupMemberInfoPtr :: RdKafkaGroupMemberInfoT -> IO GroupMemberInfo
fromGroupMemberInfoPtr mi = do
  mid <- peekCAString $ memberId'RdKafkaGroupMemberInfoT mi
  cid <- peekCAString $ clientId'RdKafkaGroupMemberInfoT mi
  hst <- peekCAString $ clientHost'RdKafkaGroupMemberInfoT mi
  mtd <- peekArray (memberMetadataSize'RdKafkaGroupMemberInfoT mi) (memberMetadata'RdKafkaGroupMemberInfoT mi)
  ass <- peekArray (memberAssignmentSize'RdKafkaGroupMemberInfoT mi) (memberAssignment'RdKafkaGroupMemberInfoT mi)
  return GroupMemberInfo
    { gmiMemberId   = GroupMemberId mid
    , gmiClientId   = ClientId cid
    , gmiClientHost = hst
    , gmiMetadata   = pack mtd
    , gmiAssignment = pack ass
    }

fromTopicMetadataPtr :: RdKafkaMetadataTopicT -> IO TopicMetadata
fromTopicMetadataPtr tm = do
  tnm <- peekCAString (topic'RdKafkaMetadataTopicT tm)
  pts <- peekArray (partitionCnt'RdKafkaMetadataTopicT tm) (partitions'RdKafkaMetadataTopicT tm)
  pms <- mapM fromPartitionMetadataPtr pts
  return TopicMetadata
    { tmTopicName   = TopicName tnm
    , tmPartitions  = pms
    , tmError       = kafkaErrorToMaybe $ KafkaResponseError (err'RdKafkaMetadataTopicT tm)
    }

fromPartitionMetadataPtr :: RdKafkaMetadataPartitionT -> IO PartitionMetadata
fromPartitionMetadataPtr pm = do
  reps <- peekArray (replicaCnt'RdKafkaMetadataPartitionT pm) (replicas'RdKafkaMetadataPartitionT pm)
  isrs <- peekArray (isrCnt'RdKafkaMetadataPartitionT pm) (isrs'RdKafkaMetadataPartitionT pm)
  return PartitionMetadata
    { pmPartitionId     = PartitionId (id'RdKafkaMetadataPartitionT pm)
    , pmError           = kafkaErrorToMaybe $ KafkaResponseError (err'RdKafkaMetadataPartitionT pm)
    , pmLeader          = BrokerId (leader'RdKafkaMetadataPartitionT pm)
    , pmReplicas        = (BrokerId . fromIntegral) <$> reps
    , pmInSyncReplicas  = (BrokerId . fromIntegral) <$> isrs
    }


fromBrokerMetadataPtr :: RdKafkaMetadataBrokerT -> IO BrokerMetadata
fromBrokerMetadataPtr bm = do
    host <- peekCAString (host'RdKafkaMetadataBrokerT bm)
    return BrokerMetadata
      { bmBrokerId   = BrokerId (id'RdKafkaMetadataBrokerT bm)
      , bmBrokerHost = host
      , bmBrokerPort = port'RdKafkaMetadataBrokerT bm
      }


fromKafkaMetadataPtr :: RdKafkaMetadataTPtr -> IO KafkaMetadata
fromKafkaMetadataPtr ptr =
  withForeignPtr ptr $ \realPtr -> do
    km    <- peek realPtr
    pbms  <- peekArray (brokerCnt'RdKafkaMetadataT km) (brokers'RdKafkaMetadataT km)
    bms   <- mapM fromBrokerMetadataPtr pbms
    ptms  <- peekArray (topicCnt'RdKafkaMetadataT km) (topics'RdKafkaMetadataT km)
    tms   <- mapM fromTopicMetadataPtr ptms
    return KafkaMetadata
      { kmBrokers = bms
      , kmTopics  = tms
      , kmOrigBroker = BrokerId $ fromIntegral (origBrokerId'RdKafkaMetadataT km)
      }

groupStateFromKafkaString :: String -> GroupState
groupStateFromKafkaString s = case s of
  "PreparingRebalance" -> GroupPreparingRebalance
  "AwaitingSync"       -> GroupAwaitingSync
  "Stable"             -> GroupStable
  "Dead"               -> GroupDead
  "Empty"              -> GroupEmpty
  _                    -> error $ "Unknown group state: " <> s