module Kafka.Consumer.Convert
( offsetSyncToInt
, offsetToInt64
, int64ToOffset
, fromNativeTopicPartitionList''
, fromNativeTopicPartitionList'
, fromNativeTopicPartitionList
, toNativeTopicPartitionList
, toNativeTopicPartitionListNoDispose
, toNativeTopicPartitionList'
, topicPartitionFromMessage
, topicPartitionFromMessageForCommit
, toMap
, fromMessagePtr
, offsetCommitToBool
)
where

import           Control.Monad          ((>=>))
import qualified Data.ByteString        as BS
import           Data.Int               (Int64)
import           Data.Map.Strict        (Map, fromListWith)
import qualified Data.Set               as S
import qualified Data.Text              as Text
import           Foreign.Ptr            (Ptr, nullPtr)
import           Foreign.ForeignPtr     (withForeignPtr)
import           Foreign.Storable       (Storable(..))
import           Foreign.C.Error        (getErrno)
import           Kafka.Consumer.Types   (ConsumerRecord(..), TopicPartition(..), Offset(..), OffsetCommit(..), PartitionOffset(..), OffsetStoreSync(..))
import           Kafka.Internal.RdKafka
  ( RdKafkaRespErrT(..)
  , RdKafkaMessageT(..)
  , RdKafkaTopicPartitionListTPtr
  , RdKafkaTopicPartitionListT(..)
  , RdKafkaMessageTPtr
  , RdKafkaTopicPartitionT(..)
  , rdKafkaTopicPartitionListAdd
  , newRdKafkaTopicPartitionListT
  , rdKafkaMessageDestroy
  , rdKafkaTopicPartitionListSetOffset
  , rdKafkaTopicPartitionListNew
  , peekCText
  )
import           Kafka.Internal.Shared  (kafkaRespErr, readTopic, readKey, readPayload, readTimestamp)
import           Kafka.Types            (KafkaError(..), PartitionId(..), TopicName(..))

-- | Converts offsets sync policy to integer (the way Kafka understands it):
--
--     * @OffsetSyncDisable == -1@
--
--     * @OffsetSyncImmediate == 0@
--
--     * @OffsetSyncInterval ms == ms@
offsetSyncToInt :: OffsetStoreSync -> Int
offsetSyncToInt :: OffsetStoreSync -> Int
offsetSyncToInt sync :: OffsetStoreSync
sync =
    case OffsetStoreSync
sync of
        OffsetSyncDisable     -> -1
        OffsetSyncImmediate   -> 0
        OffsetSyncInterval ms :: Int
ms -> Int
ms
{-# INLINE offsetSyncToInt #-}

offsetToInt64 :: PartitionOffset -> Int64
offsetToInt64 :: PartitionOffset -> Int64
offsetToInt64 o :: PartitionOffset
o = case PartitionOffset
o of
    PartitionOffsetBeginning -> -2
    PartitionOffsetEnd       -> -1
    PartitionOffset off :: Int64
off      -> Int64
off
    PartitionOffsetStored    -> -1000
    PartitionOffsetInvalid   -> -1001
{-# INLINE offsetToInt64 #-}

int64ToOffset :: Int64 -> PartitionOffset
int64ToOffset :: Int64 -> PartitionOffset
int64ToOffset o :: Int64
o
    | Int64
o Int64 -> Int64 -> Bool
forall a. Eq a => a -> a -> Bool
== -2    = PartitionOffset
PartitionOffsetBeginning
    | Int64
o Int64 -> Int64 -> Bool
forall a. Eq a => a -> a -> Bool
== -1    = PartitionOffset
PartitionOffsetEnd
    | Int64
o Int64 -> Int64 -> Bool
forall a. Eq a => a -> a -> Bool
== -1000 = PartitionOffset
PartitionOffsetStored
    | Int64
o Int64 -> Int64 -> Bool
forall a. Ord a => a -> a -> Bool
>= 0     = Int64 -> PartitionOffset
PartitionOffset Int64
o
    | Bool
otherwise  = PartitionOffset
PartitionOffsetInvalid
{-# INLINE int64ToOffset #-}

fromNativeTopicPartitionList'' :: RdKafkaTopicPartitionListTPtr -> IO [TopicPartition]
fromNativeTopicPartitionList'' :: RdKafkaTopicPartitionListTPtr -> IO [TopicPartition]
fromNativeTopicPartitionList'' ptr :: RdKafkaTopicPartitionListTPtr
ptr =
    RdKafkaTopicPartitionListTPtr
-> (Ptr RdKafkaTopicPartitionListT -> IO [TopicPartition])
-> IO [TopicPartition]
forall a b. ForeignPtr a -> (Ptr a -> IO b) -> IO b
withForeignPtr RdKafkaTopicPartitionListTPtr
ptr ((Ptr RdKafkaTopicPartitionListT -> IO [TopicPartition])
 -> IO [TopicPartition])
-> (Ptr RdKafkaTopicPartitionListT -> IO [TopicPartition])
-> IO [TopicPartition]
forall a b. (a -> b) -> a -> b
$ \fptr :: Ptr RdKafkaTopicPartitionListT
fptr -> Ptr RdKafkaTopicPartitionListT -> IO [TopicPartition]
fromNativeTopicPartitionList' Ptr RdKafkaTopicPartitionListT
fptr

fromNativeTopicPartitionList' :: Ptr RdKafkaTopicPartitionListT -> IO [TopicPartition]
fromNativeTopicPartitionList' :: Ptr RdKafkaTopicPartitionListT -> IO [TopicPartition]
fromNativeTopicPartitionList' ppl :: Ptr RdKafkaTopicPartitionListT
ppl =
    if Ptr RdKafkaTopicPartitionListT
ppl Ptr RdKafkaTopicPartitionListT
-> Ptr RdKafkaTopicPartitionListT -> Bool
forall a. Eq a => a -> a -> Bool
== Ptr RdKafkaTopicPartitionListT
forall a. Ptr a
nullPtr
        then [TopicPartition] -> IO [TopicPartition]
forall (m :: * -> *) a. Monad m => a -> m a
return []
        else Ptr RdKafkaTopicPartitionListT -> IO RdKafkaTopicPartitionListT
forall a. Storable a => Ptr a -> IO a
peek Ptr RdKafkaTopicPartitionListT
ppl IO RdKafkaTopicPartitionListT
-> (RdKafkaTopicPartitionListT -> IO [TopicPartition])
-> IO [TopicPartition]
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= RdKafkaTopicPartitionListT -> IO [TopicPartition]
fromNativeTopicPartitionList

fromNativeTopicPartitionList :: RdKafkaTopicPartitionListT -> IO [TopicPartition]
fromNativeTopicPartitionList :: RdKafkaTopicPartitionListT -> IO [TopicPartition]
fromNativeTopicPartitionList pl :: RdKafkaTopicPartitionListT
pl =
    let count :: Int
count = RdKafkaTopicPartitionListT -> Int
cnt'RdKafkaTopicPartitionListT RdKafkaTopicPartitionListT
pl
        elems :: Ptr RdKafkaTopicPartitionT
elems = RdKafkaTopicPartitionListT -> Ptr RdKafkaTopicPartitionT
elems'RdKafkaTopicPartitionListT RdKafkaTopicPartitionListT
pl
    in (Int -> IO TopicPartition) -> [Int] -> IO [TopicPartition]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
mapM (Ptr RdKafkaTopicPartitionT -> Int -> IO RdKafkaTopicPartitionT
forall a. Storable a => Ptr a -> Int -> IO a
peekElemOff Ptr RdKafkaTopicPartitionT
elems (Int -> IO RdKafkaTopicPartitionT)
-> (RdKafkaTopicPartitionT -> IO TopicPartition)
-> Int
-> IO TopicPartition
forall (m :: * -> *) a b c.
Monad m =>
(a -> m b) -> (b -> m c) -> a -> m c
>=> RdKafkaTopicPartitionT -> IO TopicPartition
toPart) [0..(Int -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
count Int -> Int -> Int
forall a. Num a => a -> a -> a
- 1)]
    where
        toPart :: RdKafkaTopicPartitionT -> IO TopicPartition
        toPart :: RdKafkaTopicPartitionT -> IO TopicPartition
toPart p :: RdKafkaTopicPartitionT
p = do
            Text
topic <- CString -> IO Text
peekCText (CString -> IO Text) -> CString -> IO Text
forall a b. (a -> b) -> a -> b
$ RdKafkaTopicPartitionT -> CString
topic'RdKafkaTopicPartitionT RdKafkaTopicPartitionT
p
            TopicPartition -> IO TopicPartition
forall (m :: * -> *) a. Monad m => a -> m a
return TopicPartition :: TopicName -> PartitionId -> PartitionOffset -> TopicPartition
TopicPartition {
                tpTopicName :: TopicName
tpTopicName = Text -> TopicName
TopicName Text
topic,
                tpPartition :: PartitionId
tpPartition = Int -> PartitionId
PartitionId (Int -> PartitionId) -> Int -> PartitionId
forall a b. (a -> b) -> a -> b
$ RdKafkaTopicPartitionT -> Int
partition'RdKafkaTopicPartitionT RdKafkaTopicPartitionT
p,
                tpOffset :: PartitionOffset
tpOffset    = Int64 -> PartitionOffset
int64ToOffset (Int64 -> PartitionOffset) -> Int64 -> PartitionOffset
forall a b. (a -> b) -> a -> b
$ RdKafkaTopicPartitionT -> Int64
offset'RdKafkaTopicPartitionT RdKafkaTopicPartitionT
p
            }

toNativeTopicPartitionList :: [TopicPartition] -> IO RdKafkaTopicPartitionListTPtr
toNativeTopicPartitionList :: [TopicPartition] -> IO RdKafkaTopicPartitionListTPtr
toNativeTopicPartitionList ps :: [TopicPartition]
ps = do
    RdKafkaTopicPartitionListTPtr
pl <- Int -> IO RdKafkaTopicPartitionListTPtr
newRdKafkaTopicPartitionListT ([TopicPartition] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [TopicPartition]
ps)
    (TopicPartition -> IO RdKafkaRespErrT) -> [TopicPartition] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (\p :: TopicPartition
p -> do
        let TopicName tn :: Text
tn = TopicPartition -> TopicName
tpTopicName TopicPartition
p
            (PartitionId tp :: Int
tp) = TopicPartition -> PartitionId
tpPartition TopicPartition
p
            to :: Int64
to = PartitionOffset -> Int64
offsetToInt64 (PartitionOffset -> Int64) -> PartitionOffset -> Int64
forall a b. (a -> b) -> a -> b
$ TopicPartition -> PartitionOffset
tpOffset TopicPartition
p
            tnS :: String
tnS = Text -> String
Text.unpack Text
tn 
        RdKafkaTopicPartitionTPtr
_ <- RdKafkaTopicPartitionListTPtr
-> String -> Int -> IO RdKafkaTopicPartitionTPtr
rdKafkaTopicPartitionListAdd RdKafkaTopicPartitionListTPtr
pl String
tnS Int
tp
        RdKafkaTopicPartitionListTPtr
-> String -> Int -> Int64 -> IO RdKafkaRespErrT
rdKafkaTopicPartitionListSetOffset RdKafkaTopicPartitionListTPtr
pl String
tnS Int
tp Int64
to) [TopicPartition]
ps
    RdKafkaTopicPartitionListTPtr -> IO RdKafkaTopicPartitionListTPtr
forall (m :: * -> *) a. Monad m => a -> m a
return RdKafkaTopicPartitionListTPtr
pl

toNativeTopicPartitionListNoDispose :: [TopicPartition] -> IO RdKafkaTopicPartitionListTPtr
toNativeTopicPartitionListNoDispose :: [TopicPartition] -> IO RdKafkaTopicPartitionListTPtr
toNativeTopicPartitionListNoDispose ps :: [TopicPartition]
ps = do
    RdKafkaTopicPartitionListTPtr
pl <- Int -> IO RdKafkaTopicPartitionListTPtr
rdKafkaTopicPartitionListNew ([TopicPartition] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [TopicPartition]
ps)
    (TopicPartition -> IO RdKafkaRespErrT) -> [TopicPartition] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (\p :: TopicPartition
p -> do
        let TopicName tn :: Text
tn = TopicPartition -> TopicName
tpTopicName TopicPartition
p
            (PartitionId tp :: Int
tp) = TopicPartition -> PartitionId
tpPartition TopicPartition
p
            to :: Int64
to = PartitionOffset -> Int64
offsetToInt64 (PartitionOffset -> Int64) -> PartitionOffset -> Int64
forall a b. (a -> b) -> a -> b
$ TopicPartition -> PartitionOffset
tpOffset TopicPartition
p
            tnS :: String
tnS = Text -> String
Text.unpack Text
tn 
        RdKafkaTopicPartitionTPtr
_ <- RdKafkaTopicPartitionListTPtr
-> String -> Int -> IO RdKafkaTopicPartitionTPtr
rdKafkaTopicPartitionListAdd RdKafkaTopicPartitionListTPtr
pl String
tnS Int
tp
        RdKafkaTopicPartitionListTPtr
-> String -> Int -> Int64 -> IO RdKafkaRespErrT
rdKafkaTopicPartitionListSetOffset RdKafkaTopicPartitionListTPtr
pl String
tnS Int
tp Int64
to) [TopicPartition]
ps
    RdKafkaTopicPartitionListTPtr -> IO RdKafkaTopicPartitionListTPtr
forall (m :: * -> *) a. Monad m => a -> m a
return RdKafkaTopicPartitionListTPtr
pl

toNativeTopicPartitionList' :: [(TopicName, PartitionId)] -> IO RdKafkaTopicPartitionListTPtr
toNativeTopicPartitionList' :: [(TopicName, PartitionId)] -> IO RdKafkaTopicPartitionListTPtr
toNativeTopicPartitionList' tps :: [(TopicName, PartitionId)]
tps = do
    let utps :: [(TopicName, PartitionId)]
utps = Set (TopicName, PartitionId) -> [(TopicName, PartitionId)]
forall a. Set a -> [a]
S.toList (Set (TopicName, PartitionId) -> [(TopicName, PartitionId)])
-> ([(TopicName, PartitionId)] -> Set (TopicName, PartitionId))
-> [(TopicName, PartitionId)]
-> [(TopicName, PartitionId)]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [(TopicName, PartitionId)] -> Set (TopicName, PartitionId)
forall a. Ord a => [a] -> Set a
S.fromList ([(TopicName, PartitionId)] -> [(TopicName, PartitionId)])
-> [(TopicName, PartitionId)] -> [(TopicName, PartitionId)]
forall a b. (a -> b) -> a -> b
$ [(TopicName, PartitionId)]
tps
    RdKafkaTopicPartitionListTPtr
pl <- Int -> IO RdKafkaTopicPartitionListTPtr
newRdKafkaTopicPartitionListT ([(TopicName, PartitionId)] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [(TopicName, PartitionId)]
utps)
    ((TopicName, PartitionId) -> IO RdKafkaTopicPartitionTPtr)
-> [(TopicName, PartitionId)] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (\(TopicName t :: Text
t, PartitionId p :: Int
p) -> RdKafkaTopicPartitionListTPtr
-> String -> Int -> IO RdKafkaTopicPartitionTPtr
rdKafkaTopicPartitionListAdd RdKafkaTopicPartitionListTPtr
pl (Text -> String
Text.unpack Text
t) Int
p) [(TopicName, PartitionId)]
utps
    RdKafkaTopicPartitionListTPtr -> IO RdKafkaTopicPartitionListTPtr
forall (m :: * -> *) a. Monad m => a -> m a
return RdKafkaTopicPartitionListTPtr
pl

topicPartitionFromMessage :: ConsumerRecord k v -> TopicPartition
topicPartitionFromMessage :: ConsumerRecord k v -> TopicPartition
topicPartitionFromMessage m :: ConsumerRecord k v
m =
  let (Offset moff :: Int64
moff) = ConsumerRecord k v -> Offset
forall k v. ConsumerRecord k v -> Offset
crOffset ConsumerRecord k v
m
   in TopicName -> PartitionId -> PartitionOffset -> TopicPartition
TopicPartition (ConsumerRecord k v -> TopicName
forall k v. ConsumerRecord k v -> TopicName
crTopic ConsumerRecord k v
m) (ConsumerRecord k v -> PartitionId
forall k v. ConsumerRecord k v -> PartitionId
crPartition ConsumerRecord k v
m) (Int64 -> PartitionOffset
PartitionOffset Int64
moff)

-- | Creates a topic partition message for use with the offset commit message.
-- We increment the offset by 1 here because when we commit, the offset is the position
-- the consumer reads from to process the next message.
topicPartitionFromMessageForCommit :: ConsumerRecord k v -> TopicPartition
topicPartitionFromMessageForCommit :: ConsumerRecord k v -> TopicPartition
topicPartitionFromMessageForCommit m :: ConsumerRecord k v
m =
  case ConsumerRecord k v -> TopicPartition
forall k v. ConsumerRecord k v -> TopicPartition
topicPartitionFromMessage ConsumerRecord k v
m of
    (TopicPartition t :: TopicName
t p :: PartitionId
p (PartitionOffset moff :: Int64
moff)) -> TopicName -> PartitionId -> PartitionOffset -> TopicPartition
TopicPartition TopicName
t PartitionId
p (Int64 -> PartitionOffset
PartitionOffset (Int64 -> PartitionOffset) -> Int64 -> PartitionOffset
forall a b. (a -> b) -> a -> b
$ Int64
moff Int64 -> Int64 -> Int64
forall a. Num a => a -> a -> a
+ 1)
    other :: TopicPartition
other                                       -> TopicPartition
other

toMap :: Ord k => [(k, v)] -> Map k [v]
toMap :: [(k, v)] -> Map k [v]
toMap kvs :: [(k, v)]
kvs = ([v] -> [v] -> [v]) -> [(k, [v])] -> Map k [v]
forall k a. Ord k => (a -> a -> a) -> [(k, a)] -> Map k a
fromListWith [v] -> [v] -> [v]
forall a. [a] -> [a] -> [a]
(++) [(k
k, [v
v]) | (k :: k
k, v :: v
v) <- [(k, v)]
kvs]

fromMessagePtr :: RdKafkaMessageTPtr -> IO (Either KafkaError (ConsumerRecord (Maybe BS.ByteString) (Maybe BS.ByteString)))
fromMessagePtr :: RdKafkaMessageTPtr
-> IO
     (Either
        KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
fromMessagePtr ptr :: RdKafkaMessageTPtr
ptr =
    RdKafkaMessageTPtr
-> (Ptr RdKafkaMessageT
    -> IO
         (Either
            KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))))
-> IO
     (Either
        KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
forall a b. ForeignPtr a -> (Ptr a -> IO b) -> IO b
withForeignPtr RdKafkaMessageTPtr
ptr ((Ptr RdKafkaMessageT
  -> IO
       (Either
          KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))))
 -> IO
      (Either
         KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))))
-> (Ptr RdKafkaMessageT
    -> IO
         (Either
            KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))))
-> IO
     (Either
        KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
forall a b. (a -> b) -> a -> b
$ \realPtr :: Ptr RdKafkaMessageT
realPtr ->
    if Ptr RdKafkaMessageT
realPtr Ptr RdKafkaMessageT -> Ptr RdKafkaMessageT -> Bool
forall a. Eq a => a -> a -> Bool
== Ptr RdKafkaMessageT
forall a. Ptr a
nullPtr then KafkaError
-> Either
     KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))
forall a b. a -> Either a b
Left (KafkaError
 -> Either
      KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
-> (Errno -> KafkaError)
-> Errno
-> Either
     KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Errno -> KafkaError
kafkaRespErr (Errno
 -> Either
      KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
-> IO Errno
-> IO
     (Either
        KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO Errno
getErrno
    else do
        RdKafkaMessageT
s <- Ptr RdKafkaMessageT -> IO RdKafkaMessageT
forall a. Storable a => Ptr a -> IO a
peek Ptr RdKafkaMessageT
realPtr
        Either
  KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))
msg <- if RdKafkaMessageT -> RdKafkaRespErrT
err'RdKafkaMessageT RdKafkaMessageT
s RdKafkaRespErrT -> RdKafkaRespErrT -> Bool
forall a. Eq a => a -> a -> Bool
/= RdKafkaRespErrT
RdKafkaRespErrNoError
                then Either
  KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))
-> IO
     (Either
        KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
forall (m :: * -> *) a. Monad m => a -> m a
return (Either
   KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))
 -> IO
      (Either
         KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))))
-> (RdKafkaRespErrT
    -> Either
         KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
-> RdKafkaRespErrT
-> IO
     (Either
        KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. KafkaError
-> Either
     KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))
forall a b. a -> Either a b
Left (KafkaError
 -> Either
      KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
-> (RdKafkaRespErrT -> KafkaError)
-> RdKafkaRespErrT
-> Either
     KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. RdKafkaRespErrT -> KafkaError
KafkaResponseError (RdKafkaRespErrT
 -> IO
      (Either
         KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))))
-> RdKafkaRespErrT
-> IO
     (Either
        KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
forall a b. (a -> b) -> a -> b
$ RdKafkaMessageT -> RdKafkaRespErrT
err'RdKafkaMessageT RdKafkaMessageT
s
                else ConsumerRecord (Maybe ByteString) (Maybe ByteString)
-> Either
     KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))
forall a b. b -> Either a b
Right (ConsumerRecord (Maybe ByteString) (Maybe ByteString)
 -> Either
      KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
-> IO (ConsumerRecord (Maybe ByteString) (Maybe ByteString))
-> IO
     (Either
        KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> RdKafkaMessageT
-> IO (ConsumerRecord (Maybe ByteString) (Maybe ByteString))
mkRecord RdKafkaMessageT
s
        Ptr RdKafkaMessageT -> IO ()
rdKafkaMessageDestroy Ptr RdKafkaMessageT
realPtr
        Either
  KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))
-> IO
     (Either
        KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
forall (m :: * -> *) a. Monad m => a -> m a
return Either
  KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))
msg
    where
        mkRecord :: RdKafkaMessageT
-> IO (ConsumerRecord (Maybe ByteString) (Maybe ByteString))
mkRecord msg :: RdKafkaMessageT
msg = do
            Text
topic     <- RdKafkaMessageT -> IO Text
readTopic RdKafkaMessageT
msg
            Maybe ByteString
key       <- RdKafkaMessageT -> IO (Maybe ByteString)
readKey RdKafkaMessageT
msg
            Maybe ByteString
payload   <- RdKafkaMessageT -> IO (Maybe ByteString)
readPayload RdKafkaMessageT
msg
            Timestamp
timestamp <- RdKafkaMessageTPtr -> IO Timestamp
readTimestamp RdKafkaMessageTPtr
ptr
            ConsumerRecord (Maybe ByteString) (Maybe ByteString)
-> IO (ConsumerRecord (Maybe ByteString) (Maybe ByteString))
forall (m :: * -> *) a. Monad m => a -> m a
return $WConsumerRecord :: forall k v.
TopicName
-> PartitionId
-> Offset
-> Timestamp
-> k
-> v
-> ConsumerRecord k v
ConsumerRecord
                { crTopic :: TopicName
crTopic     = Text -> TopicName
TopicName Text
topic
                , crPartition :: PartitionId
crPartition = Int -> PartitionId
PartitionId (Int -> PartitionId) -> Int -> PartitionId
forall a b. (a -> b) -> a -> b
$ RdKafkaMessageT -> Int
partition'RdKafkaMessageT RdKafkaMessageT
msg
                , crOffset :: Offset
crOffset    = Int64 -> Offset
Offset (Int64 -> Offset) -> Int64 -> Offset
forall a b. (a -> b) -> a -> b
$ RdKafkaMessageT -> Int64
offset'RdKafkaMessageT RdKafkaMessageT
msg
                , crTimestamp :: Timestamp
crTimestamp = Timestamp
timestamp
                , crKey :: Maybe ByteString
crKey       = Maybe ByteString
key
                , crValue :: Maybe ByteString
crValue     = Maybe ByteString
payload
                }

offsetCommitToBool :: OffsetCommit -> Bool
offsetCommitToBool :: OffsetCommit -> Bool
offsetCommitToBool OffsetCommit      = Bool
False
offsetCommitToBool OffsetCommitAsync = Bool
True