{-# LANGUAGE LambdaCase #-}

module Kafka.Internal.Shared
( pollEvents
, word8PtrToBS
, kafkaRespErr
, throwOnError
, hasError
, rdKafkaErrorToEither
, kafkaErrorToEither
, kafkaErrorToMaybe
, maybeToLeft
, readHeaders
, readPayload
, readTopic
, readKey
, readTimestamp
, readBS
)
where

import           Control.Exception        (throw)
import           Control.Monad            (void)
import qualified Data.ByteString          as BS
import qualified Data.ByteString.Internal as BSI
import           Data.Text                (Text)
import qualified Data.Text                as Text
import           Data.Word                (Word8)
import           Foreign.C.Error          (Errno (..))
import           Foreign.ForeignPtr       (newForeignPtr_)
import           Foreign.Marshal.Alloc    (alloca)
import           Foreign.Ptr              (Ptr, nullPtr)
import           Foreign.Storable         (Storable (peek))
import           Kafka.Consumer.Types     (Timestamp (..))
import           Kafka.Internal.RdKafka   (RdKafkaMessageT (..), RdKafkaMessageTPtr, RdKafkaRespErrT (..), RdKafkaTimestampTypeT (..), Word8Ptr, rdKafkaErrno2err, rdKafkaMessageTimestamp, rdKafkaPoll, rdKafkaTopicName, rdKafkaHeaderGetAll, rdKafkaMessageHeaders)
import           Kafka.Internal.Setup     (HasKafka (..), Kafka (..))
import           Kafka.Types              (KafkaError (..), Millis (..), Timeout (..), Headers, headersFromList)

pollEvents :: HasKafka a => a -> Maybe Timeout -> IO ()
pollEvents :: a -> Maybe Timeout -> IO ()
pollEvents a :: a
a tm :: Maybe Timeout
tm =
  let timeout :: Int
timeout = Int -> (Timeout -> Int) -> Maybe Timeout -> Int
forall b a. b -> (a -> b) -> Maybe a -> b
maybe 0 Timeout -> Int
unTimeout Maybe Timeout
tm
      Kafka k :: RdKafkaTPtr
k = a -> Kafka
forall a. HasKafka a => a -> Kafka
getKafka a
a
  in IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (RdKafkaTPtr -> Int -> IO Int
rdKafkaPoll RdKafkaTPtr
k Int
timeout)

word8PtrToBS :: Int -> Word8Ptr -> IO BS.ByteString
word8PtrToBS :: Int -> Word8Ptr -> IO ByteString
word8PtrToBS len :: Int
len ptr :: Word8Ptr
ptr = Int -> (Word8Ptr -> IO ()) -> IO ByteString
BSI.create Int
len ((Word8Ptr -> IO ()) -> IO ByteString)
-> (Word8Ptr -> IO ()) -> IO ByteString
forall a b. (a -> b) -> a -> b
$ \bsptr :: Word8Ptr
bsptr ->
    Word8Ptr -> Word8Ptr -> Int -> IO ()
BSI.memcpy Word8Ptr
bsptr Word8Ptr
ptr Int
len

kafkaRespErr :: Errno -> KafkaError
kafkaRespErr :: Errno -> KafkaError
kafkaRespErr (Errno num :: CInt
num) = RdKafkaRespErrT -> KafkaError
KafkaResponseError (RdKafkaRespErrT -> KafkaError) -> RdKafkaRespErrT -> KafkaError
forall a b. (a -> b) -> a -> b
$ Int -> RdKafkaRespErrT
rdKafkaErrno2err (CInt -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral CInt
num)
{-# INLINE kafkaRespErr #-}

throwOnError :: IO (Maybe Text) -> IO ()
throwOnError :: IO (Maybe Text) -> IO ()
throwOnError action :: IO (Maybe Text)
action = do
    Maybe Text
m <- IO (Maybe Text)
action
    case Maybe Text
m of
        Just e :: Text
e  -> KafkaError -> IO ()
forall a e. Exception e => e -> a
throw (KafkaError -> IO ()) -> KafkaError -> IO ()
forall a b. (a -> b) -> a -> b
$ Text -> KafkaError
KafkaError Text
e
        Nothing -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()

hasError :: KafkaError -> Bool
hasError :: KafkaError -> Bool
hasError err :: KafkaError
err = case KafkaError
err of
    KafkaResponseError RdKafkaRespErrNoError -> Bool
False
    _                                        -> Bool
True
{-# INLINE hasError #-}

rdKafkaErrorToEither :: RdKafkaRespErrT -> Either KafkaError ()
rdKafkaErrorToEither :: RdKafkaRespErrT -> Either KafkaError ()
rdKafkaErrorToEither err :: RdKafkaRespErrT
err = case RdKafkaRespErrT
err of
    RdKafkaRespErrNoError -> () -> Either KafkaError ()
forall a b. b -> Either a b
Right ()
    _                     -> KafkaError -> Either KafkaError ()
forall a b. a -> Either a b
Left (RdKafkaRespErrT -> KafkaError
KafkaResponseError RdKafkaRespErrT
err)
{-# INLINE rdKafkaErrorToEither #-}

kafkaErrorToEither :: KafkaError -> Either KafkaError ()
kafkaErrorToEither :: KafkaError -> Either KafkaError ()
kafkaErrorToEither err :: KafkaError
err = case KafkaError
err of
    KafkaResponseError RdKafkaRespErrNoError -> () -> Either KafkaError ()
forall a b. b -> Either a b
Right ()
    _                                        -> KafkaError -> Either KafkaError ()
forall a b. a -> Either a b
Left KafkaError
err
{-# INLINE kafkaErrorToEither #-}

kafkaErrorToMaybe :: KafkaError -> Maybe KafkaError
kafkaErrorToMaybe :: KafkaError -> Maybe KafkaError
kafkaErrorToMaybe err :: KafkaError
err = case KafkaError
err of
    KafkaResponseError RdKafkaRespErrNoError -> Maybe KafkaError
forall a. Maybe a
Nothing
    _                                        -> KafkaError -> Maybe KafkaError
forall a. a -> Maybe a
Just KafkaError
err
{-# INLINE kafkaErrorToMaybe #-}

maybeToLeft :: Maybe a -> Either a ()
maybeToLeft :: Maybe a -> Either a ()
maybeToLeft = Either a () -> (a -> Either a ()) -> Maybe a -> Either a ()
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (() -> Either a ()
forall a b. b -> Either a b
Right ()) a -> Either a ()
forall a b. a -> Either a b
Left
{-# INLINE maybeToLeft #-}

readPayload :: RdKafkaMessageT -> IO (Maybe BS.ByteString)
readPayload :: RdKafkaMessageT -> IO (Maybe ByteString)
readPayload = (RdKafkaMessageT -> Int)
-> (RdKafkaMessageT -> Word8Ptr)
-> RdKafkaMessageT
-> IO (Maybe ByteString)
forall t.
(t -> Int) -> (t -> Word8Ptr) -> t -> IO (Maybe ByteString)
readBS RdKafkaMessageT -> Int
len'RdKafkaMessageT RdKafkaMessageT -> Word8Ptr
payload'RdKafkaMessageT

readTopic :: RdKafkaMessageT -> IO Text
readTopic :: RdKafkaMessageT -> IO Text
readTopic msg :: RdKafkaMessageT
msg = Ptr RdKafkaTopicT -> IO (ForeignPtr RdKafkaTopicT)
forall a. Ptr a -> IO (ForeignPtr a)
newForeignPtr_ (RdKafkaMessageT -> Ptr RdKafkaTopicT
topic'RdKafkaMessageT RdKafkaMessageT
msg) IO (ForeignPtr RdKafkaTopicT)
-> (ForeignPtr RdKafkaTopicT -> IO Text) -> IO Text
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= ((String -> Text) -> IO String -> IO Text
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap String -> Text
Text.pack (IO String -> IO Text)
-> (ForeignPtr RdKafkaTopicT -> IO String)
-> ForeignPtr RdKafkaTopicT
-> IO Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ForeignPtr RdKafkaTopicT -> IO String
rdKafkaTopicName)

readKey :: RdKafkaMessageT -> IO (Maybe BSI.ByteString)
readKey :: RdKafkaMessageT -> IO (Maybe ByteString)
readKey = (RdKafkaMessageT -> Int)
-> (RdKafkaMessageT -> Word8Ptr)
-> RdKafkaMessageT
-> IO (Maybe ByteString)
forall t.
(t -> Int) -> (t -> Word8Ptr) -> t -> IO (Maybe ByteString)
readBS RdKafkaMessageT -> Int
keyLen'RdKafkaMessageT RdKafkaMessageT -> Word8Ptr
key'RdKafkaMessageT

readTimestamp :: RdKafkaMessageTPtr -> IO Timestamp
readTimestamp :: RdKafkaMessageTPtr -> IO Timestamp
readTimestamp msg :: RdKafkaMessageTPtr
msg =
  (Ptr RdKafkaTimestampTypeT -> IO Timestamp) -> IO Timestamp
forall a b. Storable a => (Ptr a -> IO b) -> IO b
alloca ((Ptr RdKafkaTimestampTypeT -> IO Timestamp) -> IO Timestamp)
-> (Ptr RdKafkaTimestampTypeT -> IO Timestamp) -> IO Timestamp
forall a b. (a -> b) -> a -> b
$ \p :: Ptr RdKafkaTimestampTypeT
p -> do
    ForeignPtr RdKafkaTimestampTypeT
typeP <- Ptr RdKafkaTimestampTypeT -> IO (ForeignPtr RdKafkaTimestampTypeT)
forall a. Ptr a -> IO (ForeignPtr a)
newForeignPtr_ Ptr RdKafkaTimestampTypeT
p
    Int64
ts <- CInt64T -> Int64
forall a b. (Integral a, Num b) => a -> b
fromIntegral (CInt64T -> Int64) -> IO CInt64T -> IO Int64
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> RdKafkaMessageTPtr
-> ForeignPtr RdKafkaTimestampTypeT -> IO CInt64T
rdKafkaMessageTimestamp RdKafkaMessageTPtr
msg ForeignPtr RdKafkaTimestampTypeT
typeP
    RdKafkaTimestampTypeT
tsType <- Ptr RdKafkaTimestampTypeT -> IO RdKafkaTimestampTypeT
forall a. Storable a => Ptr a -> IO a
peek Ptr RdKafkaTimestampTypeT
p
    Timestamp -> IO Timestamp
forall (m :: * -> *) a. Monad m => a -> m a
return (Timestamp -> IO Timestamp) -> Timestamp -> IO Timestamp
forall a b. (a -> b) -> a -> b
$ case RdKafkaTimestampTypeT
tsType of
               RdKafkaTimestampCreateTime    -> Millis -> Timestamp
CreateTime (Int64 -> Millis
Millis Int64
ts)
               RdKafkaTimestampLogAppendTime -> Millis -> Timestamp
LogAppendTime (Int64 -> Millis
Millis Int64
ts)
               RdKafkaTimestampNotAvailable  -> Timestamp
NoTimestamp


readHeaders :: Ptr RdKafkaMessageT -> IO (Either RdKafkaRespErrT Headers)
readHeaders :: Ptr RdKafkaMessageT -> IO (Either RdKafkaRespErrT Headers)
readHeaders msg :: Ptr RdKafkaMessageT
msg = do
    (err :: RdKafkaRespErrT
err, headersPtr :: RdKafkaHeadersTPtr
headersPtr) <- Ptr RdKafkaMessageT -> IO (RdKafkaRespErrT, RdKafkaHeadersTPtr)
rdKafkaMessageHeaders Ptr RdKafkaMessageT
msg
    case RdKafkaRespErrT
err of
        RdKafkaRespErrNoent -> Either RdKafkaRespErrT Headers
-> IO (Either RdKafkaRespErrT Headers)
forall (m :: * -> *) a. Monad m => a -> m a
return (Either RdKafkaRespErrT Headers
 -> IO (Either RdKafkaRespErrT Headers))
-> Either RdKafkaRespErrT Headers
-> IO (Either RdKafkaRespErrT Headers)
forall a b. (a -> b) -> a -> b
$ Headers -> Either RdKafkaRespErrT Headers
forall a b. b -> Either a b
Right Headers
forall a. Monoid a => a
mempty
        RdKafkaRespErrNoError -> ([(ByteString, ByteString)] -> Headers)
-> Either RdKafkaRespErrT [(ByteString, ByteString)]
-> Either RdKafkaRespErrT Headers
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap [(ByteString, ByteString)] -> Headers
headersFromList (Either RdKafkaRespErrT [(ByteString, ByteString)]
 -> Either RdKafkaRespErrT Headers)
-> IO (Either RdKafkaRespErrT [(ByteString, ByteString)])
-> IO (Either RdKafkaRespErrT Headers)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> RdKafkaHeadersTPtr
-> IO (Either RdKafkaRespErrT [(ByteString, ByteString)])
forall a.
RdKafkaHeadersTPtr -> IO (Either a [(ByteString, ByteString)])
extractHeaders RdKafkaHeadersTPtr
headersPtr
        e :: RdKafkaRespErrT
e -> Either RdKafkaRespErrT Headers
-> IO (Either RdKafkaRespErrT Headers)
forall (m :: * -> *) a. Monad m => a -> m a
return (Either RdKafkaRespErrT Headers
 -> IO (Either RdKafkaRespErrT Headers))
-> (RdKafkaRespErrT -> Either RdKafkaRespErrT Headers)
-> RdKafkaRespErrT
-> IO (Either RdKafkaRespErrT Headers)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. RdKafkaRespErrT -> Either RdKafkaRespErrT Headers
forall a b. a -> Either a b
Left (RdKafkaRespErrT -> IO (Either RdKafkaRespErrT Headers))
-> RdKafkaRespErrT -> IO (Either RdKafkaRespErrT Headers)
forall a b. (a -> b) -> a -> b
$ RdKafkaRespErrT
e
    where extractHeaders :: RdKafkaHeadersTPtr -> IO (Either a [(ByteString, ByteString)])
extractHeaders ptHeaders :: RdKafkaHeadersTPtr
ptHeaders =
            (Ptr CString -> IO (Either a [(ByteString, ByteString)]))
-> IO (Either a [(ByteString, ByteString)])
forall a b. Storable a => (Ptr a -> IO b) -> IO b
alloca ((Ptr CString -> IO (Either a [(ByteString, ByteString)]))
 -> IO (Either a [(ByteString, ByteString)]))
-> (Ptr CString -> IO (Either a [(ByteString, ByteString)]))
-> IO (Either a [(ByteString, ByteString)])
forall a b. (a -> b) -> a -> b
$ \nptr :: Ptr CString
nptr ->
                (Ptr Word8Ptr -> IO (Either a [(ByteString, ByteString)]))
-> IO (Either a [(ByteString, ByteString)])
forall a b. Storable a => (Ptr a -> IO b) -> IO b
alloca ((Ptr Word8Ptr -> IO (Either a [(ByteString, ByteString)]))
 -> IO (Either a [(ByteString, ByteString)]))
-> (Ptr Word8Ptr -> IO (Either a [(ByteString, ByteString)]))
-> IO (Either a [(ByteString, ByteString)])
forall a b. (a -> b) -> a -> b
$ \vptr :: Ptr Word8Ptr
vptr ->
                    (Ptr CSize -> IO (Either a [(ByteString, ByteString)]))
-> IO (Either a [(ByteString, ByteString)])
forall a b. Storable a => (Ptr a -> IO b) -> IO b
alloca ((Ptr CSize -> IO (Either a [(ByteString, ByteString)]))
 -> IO (Either a [(ByteString, ByteString)]))
-> (Ptr CSize -> IO (Either a [(ByteString, ByteString)]))
-> IO (Either a [(ByteString, ByteString)])
forall a b. (a -> b) -> a -> b
$ \szptr :: Ptr CSize
szptr ->
                        let go :: [(ByteString, ByteString)]
-> CSize -> IO (Either a [(ByteString, ByteString)])
go acc :: [(ByteString, ByteString)]
acc idx :: CSize
idx = RdKafkaHeadersTPtr
-> CSize
-> Ptr CString
-> Ptr Word8Ptr
-> Ptr CSize
-> IO RdKafkaRespErrT
rdKafkaHeaderGetAll RdKafkaHeadersTPtr
ptHeaders CSize
idx Ptr CString
nptr Ptr Word8Ptr
vptr Ptr CSize
szptr IO RdKafkaRespErrT
-> (RdKafkaRespErrT -> IO (Either a [(ByteString, ByteString)]))
-> IO (Either a [(ByteString, ByteString)])
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
                                RdKafkaRespErrNoent -> Either a [(ByteString, ByteString)]
-> IO (Either a [(ByteString, ByteString)])
forall (m :: * -> *) a. Monad m => a -> m a
return (Either a [(ByteString, ByteString)]
 -> IO (Either a [(ByteString, ByteString)]))
-> Either a [(ByteString, ByteString)]
-> IO (Either a [(ByteString, ByteString)])
forall a b. (a -> b) -> a -> b
$ [(ByteString, ByteString)] -> Either a [(ByteString, ByteString)]
forall a b. b -> Either a b
Right [(ByteString, ByteString)]
acc
                                RdKafkaRespErrNoError -> do
                                    CString
cstr <- Ptr CString -> IO CString
forall a. Storable a => Ptr a -> IO a
peek Ptr CString
nptr
                                    Word8Ptr
wptr <- Ptr Word8Ptr -> IO Word8Ptr
forall a. Storable a => Ptr a -> IO a
peek Ptr Word8Ptr
vptr
                                    CSize
csize <- Ptr CSize -> IO CSize
forall a. Storable a => Ptr a -> IO a
peek Ptr CSize
szptr
                                    ByteString
hn <- CString -> IO ByteString
BS.packCString CString
cstr
                                    ByteString
hv <- Int -> Word8Ptr -> IO ByteString
word8PtrToBS (CSize -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral CSize
csize) Word8Ptr
wptr
                                    [(ByteString, ByteString)]
-> CSize -> IO (Either a [(ByteString, ByteString)])
go ((ByteString
hn, ByteString
hv) (ByteString, ByteString)
-> [(ByteString, ByteString)] -> [(ByteString, ByteString)]
forall a. a -> [a] -> [a]
: [(ByteString, ByteString)]
acc) (CSize
idx CSize -> CSize -> CSize
forall a. Num a => a -> a -> a
+ 1)
                                _ -> String -> IO (Either a [(ByteString, ByteString)])
forall a. HasCallStack => String -> a
error "Unexpected error code while extracting headers"
                        in [(ByteString, ByteString)]
-> CSize -> IO (Either a [(ByteString, ByteString)])
forall a.
[(ByteString, ByteString)]
-> CSize -> IO (Either a [(ByteString, ByteString)])
go [] 0

readBS :: (t -> Int) -> (t -> Ptr Word8) -> t -> IO (Maybe BS.ByteString)
readBS :: (t -> Int) -> (t -> Word8Ptr) -> t -> IO (Maybe ByteString)
readBS flen :: t -> Int
flen fdata :: t -> Word8Ptr
fdata s :: t
s = if t -> Word8Ptr
fdata t
s Word8Ptr -> Word8Ptr -> Bool
forall a. Eq a => a -> a -> Bool
== Word8Ptr
forall a. Ptr a
nullPtr
                        then Maybe ByteString -> IO (Maybe ByteString)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe ByteString
forall a. Maybe a
Nothing
                        else ByteString -> Maybe ByteString
forall a. a -> Maybe a
Just (ByteString -> Maybe ByteString)
-> IO ByteString -> IO (Maybe ByteString)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Int -> Word8Ptr -> IO ByteString
word8PtrToBS (t -> Int
flen t
s) (t -> Word8Ptr
fdata t
s)