module Kafka.Internal.Shared where import Control.Concurrent (forkIO, rtsSupportsBoundThreads) import Control.Exception import Control.Monad (void, when) import qualified Data.ByteString as BS import qualified Data.ByteString.Internal as BSI import Foreign.C.Error import Kafka.Internal.CancellationToken as CToken import Kafka.Internal.RdKafka import Kafka.Internal.Setup import Kafka.Types runEventLoop :: HasKafka a => a -> CancellationToken -> Maybe Timeout -> IO () runEventLoop k ct timeout = when rtsSupportsBoundThreads $ void $ forkIO go where go = do token <- CToken.status ct case token of Running -> pollEvents k timeout >> go Cancelled -> return () pollEvents :: HasKafka a => a -> Maybe Timeout -> IO () pollEvents a tm = let timeout = maybe 0 (\(Timeout ms) -> ms) tm (Kafka k) = getKafka a in void (rdKafkaPoll k timeout) word8PtrToBS :: Int -> Word8Ptr -> IO BS.ByteString word8PtrToBS len ptr = BSI.create len $ \bsptr -> BSI.memcpy bsptr ptr len kafkaRespErr :: Errno -> KafkaError kafkaRespErr (Errno num) = KafkaResponseError $ rdKafkaErrno2err (fromIntegral num) {-# INLINE kafkaRespErr #-} throwOnError :: IO (Maybe String) -> IO () throwOnError action = do m <- action case m of Just e -> throw $ KafkaError e Nothing -> return () hasError :: KafkaError -> Bool hasError err = case err of KafkaResponseError RdKafkaRespErrNoError -> False _ -> True {-# INLINE hasError #-} rdKafkaErrorToEither :: RdKafkaRespErrT -> Either KafkaError () rdKafkaErrorToEither err = case err of RdKafkaRespErrNoError -> Right () _ -> Left (KafkaResponseError err) {-# INLINE rdKafkaErrorToEither #-} kafkaErrorToEither :: KafkaError -> Either KafkaError () kafkaErrorToEither err = case err of KafkaResponseError RdKafkaRespErrNoError -> Right () _ -> Left err {-# INLINE kafkaErrorToEither #-} kafkaErrorToMaybe :: KafkaError -> Maybe KafkaError kafkaErrorToMaybe err = case err of KafkaResponseError RdKafkaRespErrNoError -> Nothing _ -> Just err {-# INLINE kafkaErrorToMaybe #-} maybeToLeft :: Maybe a -> Either a () maybeToLeft = maybe (Right ()) Left {-# INLINE maybeToLeft #-}