module Kafka.Consumer.Convert

where

import           Control.Exception      (finally)
import           Control.Monad
import qualified Data.ByteString        as BS
import           Data.Map.Strict        (Map, fromListWith)
import qualified Data.Set               as S
import           Foreign
import           Foreign.C.Error
import           Foreign.C.String
import           Kafka.Consumer.Types
import           Kafka.Internal.RdKafka
import           Kafka.Internal.Shared
import           Kafka.Types

-- | Maps an offset store sync policy onto the integer form Kafka understands:
--
--     * 'OffsetSyncDisable' becomes @-1@
--
--     * 'OffsetSyncImmediate' becomes @0@
--
--     * @'OffsetSyncInterval' ms@ becomes @ms@
offsetSyncToInt :: OffsetStoreSync -> Int
offsetSyncToInt OffsetSyncDisable       = -1
offsetSyncToInt OffsetSyncImmediate     = 0
offsetSyncToInt (OffsetSyncInterval ms) = ms
{-# INLINE offsetSyncToInt #-}

-- | Encodes a 'PartitionOffset' as an 'Int64'.
--
-- A concrete @'PartitionOffset' off@ is passed through unchanged; every
-- symbolic offset is mapped to a fixed negative sentinel value.
offsetToInt64 :: PartitionOffset -> Int64
offsetToInt64 PartitionOffsetBeginning = -2
offsetToInt64 PartitionOffsetEnd       = -1
offsetToInt64 (PartitionOffset off)    = off
offsetToInt64 PartitionOffsetStored    = -1000
offsetToInt64 PartitionOffsetInvalid   = -1001
{-# INLINE offsetToInt64 #-}

-- | Decodes an 'Int64' into a 'PartitionOffset'.
--
-- The sentinels @-2@, @-1@ and @-1000@ map to the beginning, end and stored
-- offsets respectively. Any non-negative value is a concrete offset, and
-- every other negative value is reported as 'PartitionOffsetInvalid'.
int64ToOffset :: Int64 -> PartitionOffset
int64ToOffset raw = case raw of
    (-2)    -> PartitionOffsetBeginning
    (-1)    -> PartitionOffsetEnd
    (-1000) -> PartitionOffsetStored
    _ | raw >= 0  -> PartitionOffset raw
      | otherwise -> PartitionOffsetInvalid
{-# INLINE int64ToOffset #-}

-- | Reads the topic partitions out of a native list held behind a foreign
-- pointer, delegating to @fromNativeTopicPartitionList'@ on the raw pointer.
fromNativeTopicPartitionList'' :: RdKafkaTopicPartitionListTPtr -> IO [TopicPartition]
fromNativeTopicPartitionList'' ptr = withForeignPtr ptr fromNativeTopicPartitionList'

-- | Reads the topic partitions out of a raw native list pointer.
--
-- A null pointer yields an empty list; otherwise the list structure is
-- peeked and converted with @fromNativeTopicPartitionList@.
fromNativeTopicPartitionList' :: Ptr RdKafkaTopicPartitionListT -> IO [TopicPartition]
fromNativeTopicPartitionList' ppl
    | ppl == nullPtr = return []
    | otherwise      = peek ppl >>= fromNativeTopicPartitionList

-- | Converts an already-peeked native topic partition list into Haskell
-- 'TopicPartition' values, reading each element in index order.
fromNativeTopicPartitionList :: RdKafkaTopicPartitionListT -> IO [TopicPartition]
fromNativeTopicPartitionList pl =
    forM indices $ \i -> peekElemOff entries i >>= convert
    where
        entries = elems'RdKafkaTopicPartitionListT pl
        total   = fromIntegral (cnt'RdKafkaTopicPartitionListT pl)
        -- Empty when the list reports no elements, so nothing is peeked.
        indices = [0 .. total - 1]

        -- Copies the topic name out of native memory and wraps the
        -- partition and offset fields of a single element.
        convert :: RdKafkaTopicPartitionT -> IO TopicPartition
        convert raw = do
            name <- peekCString (topic'RdKafkaTopicPartitionT raw)
            return TopicPartition
                { tpTopicName = TopicName name
                , tpPartition = PartitionId (partition'RdKafkaTopicPartitionT raw)
                , tpOffset    = int64ToOffset (offset'RdKafkaTopicPartitionT raw)
                }

-- | Builds a native topic partition list from the given partitions.
--
-- Each entry is added to the list and then has its offset set to the
-- 'Int64' encoding of its 'tpOffset'.
toNativeTopicPartitionList :: [TopicPartition] -> IO RdKafkaTopicPartitionListTPtr
toNativeTopicPartitionList ps = do
    nativeList <- newRdKafkaTopicPartitionListT (length ps)
    forM_ ps $ \part -> do
        let TopicName name  = tpTopicName part
            PartitionId pid = tpPartition part
            rawOffset       = offsetToInt64 (tpOffset part)
        _ <- rdKafkaTopicPartitionListAdd nativeList name pid
        rdKafkaTopicPartitionListSetOffset nativeList name pid rawOffset
    return nativeList

-- | Builds a native topic partition list from topic/partition pairs.
--
-- Duplicate pairs are dropped by going through a 'S.Set', so the entries are
-- added in ascending order. No offsets are set on the entries.
toNativeTopicPartitionList' :: [(TopicName, PartitionId)] -> IO RdKafkaTopicPartitionListTPtr
toNativeTopicPartitionList' tps = do
    nativeList <- newRdKafkaTopicPartitionListT (S.size uniquePairs)
    forM_ (S.toList uniquePairs) $ \(TopicName t, PartitionId p) ->
        rdKafkaTopicPartitionListAdd nativeList t p
    return nativeList
    where
        uniquePairs = S.fromList tps

-- | Builds the 'TopicPartition' that identifies where a consumer record came
-- from, using the record's own offset as a concrete 'PartitionOffset'.
topicPartitionFromMessage :: ConsumerRecord k v -> TopicPartition
topicPartitionFromMessage m =
    TopicPartition
        { tpTopicName = crTopic m
        , tpPartition = crPartition m
        , tpOffset    = PartitionOffset rawOffset
        }
  where
    Offset rawOffset = crOffset m

-- | Creates a topic partition for use with the offset commit message.
--
-- The offset is the record's offset plus one: a committed offset is the
-- position the consumer reads from to process the next message.
--
-- The offset is taken straight from 'crOffset' rather than by pattern
-- matching on the result of 'topicPartitionFromMessage', so this function
-- stays total regardless of which 'PartitionOffset' constructor that helper
-- produces.
topicPartitionFromMessageForCommit :: ConsumerRecord k v -> TopicPartition
topicPartitionFromMessageForCommit m =
    (topicPartitionFromMessage m) { tpOffset = PartitionOffset (moff + 1) }
  where
    Offset moff = crOffset m

-- | Groups a list of key/value pairs into a map from each key to all of its
-- values.
--
-- Values for a repeated key are accumulated with @(++)@ through
-- 'fromListWith', which places later values in front of earlier ones.
toMap :: Ord k => [(k, v)] -> Map k [v]
toMap kvs = fromListWith (++) (map wrapValue kvs)
  where
    wrapValue (k, v) = (k, [v])

-- | Converts a native message into a 'ConsumerRecord'.
--
-- Returns 'Left' when the underlying pointer is null (the error is built
-- from the current @errno@) or when the message's error field is anything
-- other than 'RdKafkaRespErrNoError'. Otherwise the topic, key, payload and
-- timestamp are read from the message and returned as 'Right'.
--
-- A non-null message is always passed to 'rdKafkaMessageDestroy' before this
-- function returns, including when reading the message throws an exception.
fromMessagePtr :: RdKafkaMessageTPtr -> IO (Either KafkaError (ConsumerRecord (Maybe BS.ByteString) (Maybe BS.ByteString)))
fromMessagePtr ptr =
    withForeignPtr ptr $ \realPtr ->
    if realPtr == nullPtr
        then Left . kafkaRespErr <$> getErrno
        -- 'finally' guarantees the native message is released even if
        -- decoding it is interrupted by an exception.
        else readMessage realPtr `finally` rdKafkaMessageDestroy realPtr
    where
        -- Peeks the message structure and either reports its error code or
        -- decodes it into a record.
        readMessage realPtr = do
            s <- peek realPtr
            if err'RdKafkaMessageT s /= RdKafkaRespErrNoError
                then return . Left . KafkaResponseError $ err'RdKafkaMessageT s
                else Right <$> mkRecord s

        mkRecord msg = do
            topic     <- readTopic msg
            key       <- readKey msg
            payload   <- readPayload msg
            timestamp <- readTimestamp ptr
            return ConsumerRecord
                { crTopic     = TopicName topic
                , crPartition = PartitionId $ partition'RdKafkaMessageT msg
                , crOffset    = Offset $ offset'RdKafkaMessageT msg
                , crTimestamp = timestamp
                , crKey       = key
                , crValue     = payload
                }

        -- The topic handle is wrapped without a finalizer; it is only used
        -- to look up the topic name.
        readTopic msg = newForeignPtr_ (topic'RdKafkaMessageT msg) >>= rdKafkaTopicName
        readPayload = readBS len'RdKafkaMessageT payload'RdKafkaMessageT
        readKey = readBS keyLen'RdKafkaMessageT key'RdKafkaMessageT

        -- The timestamp type is written into a temporary buffer; the call's
        -- return value is the timestamp itself, wrapped as 'Millis'.
        readTimestamp msg =
            alloca $ \p -> do
                typeP <- newForeignPtr_ p
                ts <- fromIntegral <$> rdKafkaMessageTimestamp msg typeP
                tsType <- peek p
                return $ case tsType of
                    RdKafkaTimestampCreateTime    -> CreateTime (Millis ts)
                    RdKafkaTimestampLogAppendTime -> LogAppendTime (Millis ts)
                    RdKafkaTimestampNotAvailable  -> NoTimestamp

        -- A null data pointer means the field is absent.
        readBS flen fdata s = if fdata s == nullPtr
                        then return Nothing
                        else Just <$> word8PtrToBS (flen s) (fdata s)


-- | Converts an 'OffsetCommit' mode to a 'Bool': 'False' for 'OffsetCommit'
-- and 'True' for 'OffsetCommitAsync'.
offsetCommitToBool :: OffsetCommit -> Bool
offsetCommitToBool mode = case mode of
    OffsetCommit      -> False
    OffsetCommitAsync -> True