module Kafka.Consumer.Convert
  ( offsetSyncToInt
  , offsetToInt64
  , int64ToOffset
  , fromNativeTopicPartitionList''
  , fromNativeTopicPartitionList'
  , fromNativeTopicPartitionList
  , toNativeTopicPartitionList
  , toNativeTopicPartitionListNoDispose
  , toNativeTopicPartitionList'
  , topicPartitionFromMessage
  , topicPartitionFromMessageForCommit
  , toMap
  , fromMessagePtr
  , offsetCommitToBool
  ) where

import Control.Monad ((>=>))
import qualified Data.ByteString as BS
import Data.Int (Int64)
import Data.Map.Strict (Map, fromListWith)
import qualified Data.Set as S
import qualified Data.Text as Text
import Foreign.Ptr (Ptr, nullPtr)
import Foreign.ForeignPtr (withForeignPtr)
import Foreign.Storable (Storable(..))
import Foreign.C.Error (getErrno)
import Kafka.Consumer.Types
  ( ConsumerRecord(..)
  , TopicPartition(..)
  , Offset(..)
  , OffsetCommit(..)
  , PartitionOffset(..)
  , OffsetStoreSync(..)
  )
import Kafka.Internal.RdKafka
  ( RdKafkaRespErrT(..)
  , RdKafkaMessageT(..)
  , RdKafkaTopicPartitionListTPtr
  , RdKafkaTopicPartitionListT(..)
  , RdKafkaMessageTPtr
  , RdKafkaTopicPartitionT(..)
  , rdKafkaTopicPartitionListAdd
  , newRdKafkaTopicPartitionListT
  , rdKafkaMessageDestroy
  , rdKafkaTopicPartitionListSetOffset
  , rdKafkaTopicPartitionListNew
  , peekCText
  )
import Kafka.Internal.Shared (kafkaRespErr, readTopic, readKey, readPayload, readTimestamp)
import Kafka.Types (KafkaError(..), PartitionId(..), TopicName(..))

-- | Converts the offset sync policy to the integer Kafka understands:
--
-- * @OffsetSyncDisable == -1@
--
-- * @OffsetSyncImmediate == 0@
--
-- * @OffsetSyncInterval ms == ms@
offsetSyncToInt :: OffsetStoreSync -> Int
offsetSyncToInt OffsetSyncDisable       = -1
offsetSyncToInt OffsetSyncImmediate     = 0
offsetSyncToInt (OffsetSyncInterval ms) = ms
{-# INLINE offsetSyncToInt #-}

-- | Encodes a 'PartitionOffset' as a raw 64-bit offset.
--
-- Concrete offsets are passed through unchanged; the symbolic offsets map
-- to the sentinels @-2@ (beginning), @-1@ (end), @-1000@ (stored) and
-- @-1001@ (invalid).
offsetToInt64 :: PartitionOffset -> Int64
offsetToInt64 PartitionOffsetBeginning = -2
offsetToInt64 PartitionOffsetEnd       = -1
offsetToInt64 (PartitionOffset off)    = off
offsetToInt64 PartitionOffsetStored    = -1000
offsetToInt64 PartitionOffsetInvalid   = -1001
{-# INLINE offsetToInt64 #-}

-- | Decodes a raw 64-bit offset into a 'PartitionOffset'.
--
-- The sentinels @-2@, @-1@ and @-1000@ become the matching symbolic
-- offsets, any non-negative value becomes a concrete 'PartitionOffset',
-- and every other negative value is reported as 'PartitionOffsetInvalid'.
int64ToOffset :: Int64 -> PartitionOffset
int64ToOffset raw = case raw of
  (-2)    -> PartitionOffsetBeginning
  (-1)    -> PartitionOffsetEnd
  (-1000) -> PartitionOffsetStored
  _ | raw >= 0  -> PartitionOffset raw
    | otherwise -> PartitionOffsetInvalid
{-# INLINE int64ToOffset #-}

-- | Reads a topic partition list through its foreign pointer by
-- delegating to 'fromNativeTopicPartitionList''.
fromNativeTopicPartitionList'' :: RdKafkaTopicPartitionListTPtr -> IO [TopicPartition]
fromNativeTopicPartitionList'' ptr =
  withForeignPtr ptr fromNativeTopicPartitionList'

-- | Reads a topic partition list from a raw pointer.
--
-- A null pointer yields an empty list; otherwise the struct is peeked and
-- handed to 'fromNativeTopicPartitionList'.
fromNativeTopicPartitionList' :: Ptr RdKafkaTopicPartitionListT -> IO [TopicPartition]
fromNativeTopicPartitionList' ppl
  | ppl == nullPtr = return []
  | otherwise      = peek ppl >>= fromNativeTopicPartitionList

-- | Converts an already-peeked native list into 'TopicPartition' values,
-- reading one element per index from @0@ up to the list's count minus one.
fromNativeTopicPartitionList :: RdKafkaTopicPartitionListT -> IO [TopicPartition]
fromNativeTopicPartitionList pl =
  mapM (peekElemOff elements >=> toPart) indices
  where
    elements = elems'RdKafkaTopicPartitionListT pl

    -- Empty when the count is zero, because the range is then [0 .. -1].
    indices :: [Int]
    indices = [0 .. fromIntegral (cnt'RdKafkaTopicPartitionListT pl) - 1]

    toPart :: RdKafkaTopicPartitionT -> IO TopicPartition
    toPart native = do
      name <- peekCText (topic'RdKafkaTopicPartitionT native)
      return TopicPartition
        { tpTopicName = TopicName name
        , tpPartition = PartitionId (partition'RdKafkaTopicPartitionT native)
        , tpOffset    = int64ToOffset (offset'RdKafkaTopicPartitionT native)
        }

-- | Builds a native topic partition list (allocated with
-- 'newRdKafkaTopicPartitionListT') holding every given partition together
-- with its offset.
toNativeTopicPartitionList :: [TopicPartition] -> IO RdKafkaTopicPartitionListTPtr
toNativeTopicPartitionList ps = do
  nativeList <- newRdKafkaTopicPartitionListT (length ps)
  addPartitionsWithOffsets nativeList ps
  return nativeList

-- | Same as 'toNativeTopicPartitionList', except that the list is
-- allocated with 'rdKafkaTopicPartitionListNew'.
--
-- NOTE(review): the name suggests the result is not disposed
-- automatically; confirm against "Kafka.Internal.RdKafka".
toNativeTopicPartitionListNoDispose :: [TopicPartition] -> IO RdKafkaTopicPartitionListTPtr
toNativeTopicPartitionListNoDispose ps = do
  nativeList <- rdKafkaTopicPartitionListNew (length ps)
  addPartitionsWithOffsets nativeList ps
  return nativeList

-- Adds each partition to the native list and then sets its offset.
-- Shared by 'toNativeTopicPartitionList' and
-- 'toNativeTopicPartitionListNoDispose'.
addPartitionsWithOffsets :: RdKafkaTopicPartitionListTPtr -> [TopicPartition] -> IO ()
addPartitionsWithOffsets nativeList = mapM_ addOne
  where
    addOne part = do
      let TopicName name  = tpTopicName part
          PartitionId pid = tpPartition part
          topicStr        = Text.unpack name
      _ <- rdKafkaTopicPartitionListAdd nativeList topicStr pid
      rdKafkaTopicPartitionListSetOffset nativeList topicStr pid (offsetToInt64 (tpOffset part))

-- | Builds a native topic partition list from topic/partition pairs
-- without setting any offsets.
--
-- Duplicate pairs are removed first (via a 'S.Set'), so entries are added
-- in ascending order.
toNativeTopicPartitionList' :: [(TopicName, PartitionId)] -> IO RdKafkaTopicPartitionListTPtr
toNativeTopicPartitionList' tps = do
  nativeList <- newRdKafkaTopicPartitionListT (length distinct)
  mapM_ (addTo nativeList) distinct
  return nativeList
  where
    distinct = S.toList (S.fromList tps)
    addTo target (TopicName t, PartitionId p) =
      rdKafkaTopicPartitionListAdd target (Text.unpack t) p

-- | Describes the position of a consumed record as a 'TopicPartition'
-- carrying the record's own offset.
topicPartitionFromMessage :: ConsumerRecord k v -> TopicPartition
topicPartitionFromMessage m = TopicPartition
  { tpTopicName = crTopic m
  , tpPartition = crPartition m
  , tpOffset    = PartitionOffset rawOffset
  }
  where
    Offset rawOffset = crOffset m

-- | Creates a topic partition message for use with the offset commit message.
-- We increment the offset by 1 here because when we commit, the offset is the position
-- the consumer reads from to process the next message.
topicPartitionFromMessageForCommit :: ConsumerRecord k v -> TopicPartition
topicPartitionFromMessageForCommit m = advance (topicPartitionFromMessage m)
  where
    -- Only a concrete offset is moved forward by one; any other kind of
    -- offset is returned untouched.
    advance (TopicPartition topic part (PartitionOffset consumed)) =
      TopicPartition topic part (PartitionOffset (consumed + 1))
    advance unchanged = unchanged

-- | Groups the values of an association list by key.
--
-- Because the grouping is done with @'fromListWith' (++)@, the values of a
-- key appear in reverse input order (the last occurrence comes first).
toMap :: Ord k => [(k, v)] -> Map k [v]
toMap kvs = fromListWith (++) (map wrapValue kvs)
  where
    wrapValue (key, val) = (key, [val])

-- | Converts a native message into a 'ConsumerRecord'.
--
-- * A null message pointer yields a 'Left' built from the current @errno@.
--
-- * A message whose error code is not 'RdKafkaRespErrNoError' yields
--   @Left ('KafkaResponseError' code)@.
--
-- * Otherwise topic, key, payload and timestamp are read into a record.
--
-- Whenever the pointer is non-null the native message is destroyed before
-- returning, so it must not be used afterwards.
fromMessagePtr :: RdKafkaMessageTPtr -> IO (Either KafkaError (ConsumerRecord (Maybe BS.ByteString) (Maybe BS.ByteString)))
fromMessagePtr ptr = withForeignPtr ptr convert
  where
    convert realPtr
      | realPtr == nullPtr = Left . kafkaRespErr <$> getErrno
      | otherwise = do
          native <- peek realPtr
          -- Everything must be read out before the message is destroyed.
          result <- decode native
          rdKafkaMessageDestroy realPtr
          return result

    decode native
      | code /= RdKafkaRespErrNoError = return (Left (KafkaResponseError code))
      | otherwise                     = Right <$> mkRecord native
      where
        code = err'RdKafkaMessageT native

    -- Reads run in the order topic, key, payload, timestamp; the timestamp
    -- is read through the original foreign pointer rather than the peeked
    -- struct.
    mkRecord msg =
      build <$> readTopic msg <*> readKey msg <*> readPayload msg <*> readTimestamp ptr
      where
        build topic key payload timestamp = ConsumerRecord
          { crTopic     = TopicName topic
          , crPartition = PartitionId (partition'RdKafkaMessageT msg)
          , crOffset    = Offset (offset'RdKafkaMessageT msg)
          , crTimestamp = timestamp
          , crKey       = key
          , crValue     = payload
          }

-- | Maps the commit mode to a flag: 'False' for 'OffsetCommit' and 'True'
-- for 'OffsetCommitAsync'.
--
-- NOTE(review): presumably this is the \"async\" flag handed to the native
-- commit call; confirm at the call site.
offsetCommitToBool :: OffsetCommit -> Bool
offsetCommitToBool mode = case mode of
  OffsetCommit      -> False
  OffsetCommitAsync -> True