module Kafka.Consumer.Convert
where
import Control.Monad
import qualified Data.ByteString as BS
import Foreign
import Foreign.C.Error
import Foreign.C.String
import Kafka.Consumer.Types
import Kafka.Types
import Kafka.Internal.RdKafka
import Kafka.Internal.RdKafkaEnum
import Kafka.Internal.Shared
offsetSyncToInt :: OffsetStoreSync -> Int
offsetSyncToInt sync =
case sync of
OffsetSyncDisable -> -1
OffsetSyncImmediate -> 0
OffsetSyncInterval ms -> ms
{-# INLINE offsetSyncToInt #-}
offsetToInt64 :: PartitionOffset -> Int64
offsetToInt64 o = case o of
PartitionOffsetBeginning -> -2
PartitionOffsetEnd -> -1
PartitionOffset off -> off
PartitionOffsetStored -> -1000
PartitionOffsetInvalid -> -1001
{-# INLINE offsetToInt64 #-}
int64ToOffset :: Int64 -> PartitionOffset
int64ToOffset o
| o == -2 = PartitionOffsetBeginning
| o == -1 = PartitionOffsetEnd
| o == -1000 = PartitionOffsetStored
| o >= 0 = PartitionOffset o
| otherwise = PartitionOffsetInvalid
{-# INLINE int64ToOffset #-}
fromNativeTopicPartitionList' :: Ptr RdKafkaTopicPartitionListT -> IO [TopicPartition]
fromNativeTopicPartitionList' ppl = peek ppl >>= fromNativeTopicPartitionList
fromNativeTopicPartitionList :: RdKafkaTopicPartitionListT -> IO [TopicPartition]
fromNativeTopicPartitionList pl =
let count = cnt'RdKafkaTopicPartitionListT pl
elems = elems'RdKafkaTopicPartitionListT pl
in mapM (peekElemOff elems >=> toPart) [0..(fromIntegral count - 1)]
where
toPart :: RdKafkaTopicPartitionT -> IO TopicPartition
toPart p = do
topic <- peekCString $ topic'RdKafkaTopicPartitionT p
return TopicPartition {
tpTopicName = TopicName topic,
tpPartition = PartitionId $ partition'RdKafkaTopicPartitionT p,
tpOffset = int64ToOffset $ offset'RdKafkaTopicPartitionT p
}
toNativeTopicPartitionList :: [TopicPartition] -> IO RdKafkaTopicPartitionListTPtr
toNativeTopicPartitionList ps = do
pl <- newRdKafkaTopicPartitionListT (length ps)
mapM_ (\p -> do
let TopicName tn = tpTopicName p
(PartitionId tp) = tpPartition p
to = offsetToInt64 $ tpOffset p
_ <- rdKafkaTopicPartitionListAdd pl tn tp
rdKafkaTopicPartitionListSetOffset pl tn tp to) ps
return pl
topicPartitionFromMessage :: ConsumerRecord k v -> TopicPartition
topicPartitionFromMessage m =
let (Offset moff) = crOffset m
in TopicPartition (crTopic m) (crPartition m) (PartitionOffset moff)
fromMessagePtr :: RdKafkaMessageTPtr -> IO (Either KafkaError (ConsumerRecord (Maybe BS.ByteString) (Maybe BS.ByteString)))
fromMessagePtr ptr =
withForeignPtr ptr $ \realPtr ->
if realPtr == nullPtr then (Left . kafkaRespErr) <$> getErrno
else do
s <- peek realPtr
msg <- if err'RdKafkaMessageT s /= RdKafkaRespErrNoError
then return . Left . KafkaResponseError $ err'RdKafkaMessageT s
else Right <$> fromMessageStorable s
rdKafkaMessageDestroy realPtr
return msg
fromMessageStorable :: RdKafkaMessageT -> IO (ConsumerRecord (Maybe BS.ByteString) (Maybe BS.ByteString))
fromMessageStorable s = do
topic <- newForeignPtr_ (topic'RdKafkaMessageT s) >>= rdKafkaTopicName
payload <- if payload'RdKafkaMessageT s == nullPtr
then return Nothing
else Just <$> word8PtrToBS (len'RdKafkaMessageT s) (payload'RdKafkaMessageT s)
key <- if key'RdKafkaMessageT s == nullPtr
then return Nothing
else Just <$> word8PtrToBS (keyLen'RdKafkaMessageT s) (key'RdKafkaMessageT s)
return $ ConsumerRecord
(TopicName topic)
(PartitionId $ partition'RdKafkaMessageT s)
(Offset $ offset'RdKafkaMessageT s)
key
payload
offsetCommitToBool :: OffsetCommit -> Bool
offsetCommitToBool OffsetCommit = False
offsetCommitToBool OffsetCommitAsync = True