{-# LANGUAGE LambdaCase        #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TupleSections     #-}

-----------------------------------------------------------------------------
-- |
-- Module to consume messages from Kafka topics.
-- 
-- Here's an example of code to consume messages from a topic:
-- 
-- @
-- import Control.Exception (bracket)
-- import Control.Monad (replicateM_)
-- import Kafka.Consumer
-- 
-- -- Global consumer properties
-- consumerProps :: 'ConsumerProperties'
-- consumerProps = 'brokersList' ["localhost:9092"]
--              <> 'groupId' ('ConsumerGroupId' "consumer_example_group")
--              <> 'noAutoCommit'
--              <> 'logLevel' 'KafkaLogInfo'
-- 
-- -- Subscription to topics
-- consumerSub :: 'Subscription'
-- consumerSub = 'topics' ['TopicName' "kafka-client-example-topic"]
--            <> 'offsetReset' 'Earliest'
-- 
-- -- Running an example
-- runConsumerExample :: IO ()
-- runConsumerExample = do
--     res <- bracket mkConsumer clConsumer runHandler
--     print res
--     where
--       mkConsumer = 'newConsumer' consumerProps consumerSub
--       clConsumer (Left err) = pure (Left err)
--       clConsumer (Right kc) = (maybe (Right ()) Left) \<$\> 'closeConsumer' kc
--       runHandler (Left err) = pure (Left err)
--       runHandler (Right kc) = processMessages kc
-- 
-- -- Example polling 10 times before stopping
-- processMessages :: 'KafkaConsumer' -> IO (Either 'KafkaError' ())
-- processMessages kafka = do
--     replicateM_ 10 $ do
--       msg <- 'pollMessage' kafka ('Timeout' 1000)
--       putStrLn $ "Message: " <> show msg
--       err <- 'commitAllOffsets' 'OffsetCommit' kafka
--       putStrLn $ "Offsets: " <> maybe "Committed." show err
--     pure $ Right ()
-- @
-----------------------------------------------------------------------------
module Kafka.Consumer
( KafkaConsumer
, module X
, runConsumer
, newConsumer
, assign, assignment, subscription
, pausePartitions, resumePartitions
, committed, position, seek
, pollMessage, pollConsumerEvents
, pollMessageBatch
, commitOffsetMessage, commitAllOffsets, commitPartitionsOffsets
, storeOffsets, storeOffsetMessage
, closeConsumer
-- ReExport Types
, RdKafkaRespErrT (..)
)
where

import           Control.Arrow              (left, (&&&))
import           Control.Concurrent         (forkIO, modifyMVar, rtsSupportsBoundThreads, withMVar)
import           Control.Exception          (bracket)
import           Control.Monad              (forM_, void, when)
import           Control.Monad.IO.Class     (MonadIO (liftIO))
import           Control.Monad.Trans.Except (ExceptT (ExceptT), runExceptT)
import           Data.Bifunctor             (bimap, first)
import qualified Data.ByteString            as BS
import           Data.IORef                 (readIORef, writeIORef)
import qualified Data.Map                   as M
import           Data.Maybe                 (fromMaybe)
import           Data.Monoid                ((<>))
import           Data.Set                   (Set)
import qualified Data.Set                   as Set
import qualified Data.Text                  as Text
import           Foreign                    hiding (void)
import           Kafka.Consumer.Convert     (fromMessagePtr, fromNativeTopicPartitionList'', offsetCommitToBool, offsetToInt64, toMap, toNativeTopicPartitionList, toNativeTopicPartitionList', toNativeTopicPartitionListNoDispose, topicPartitionFromMessageForCommit)
import           Kafka.Consumer.Types       (KafkaConsumer (..))
import           Kafka.Internal.RdKafka     (RdKafkaRespErrT (..), RdKafkaTopicPartitionListTPtr, RdKafkaTypeT (..), newRdKafkaT, newRdKafkaTopicPartitionListT, newRdKafkaTopicT, rdKafkaAssign, rdKafkaAssignment, rdKafkaCommit, rdKafkaCommitted, rdKafkaConfSetDefaultTopicConf, rdKafkaConsumeBatchQueue, rdKafkaConsumeQueue, rdKafkaConsumerClose, rdKafkaConsumerPoll, rdKafkaOffsetsStore, rdKafkaPausePartitions, rdKafkaPollSetConsumer, rdKafkaPosition, rdKafkaQueueDestroy, rdKafkaQueueNew, rdKafkaResumePartitions, rdKafkaSeek, rdKafkaSetLogLevel, rdKafkaSubscribe, rdKafkaSubscription, rdKafkaTopicConfDup, rdKafkaTopicPartitionListAdd)
import           Kafka.Internal.Setup       (CallbackPollStatus (..), Kafka (..), KafkaConf (..), KafkaProps (..), TopicConf (..), TopicProps (..), getKafkaConf, getRdKafka, kafkaConf, topicConf, Callback(..))
import           Kafka.Internal.Shared      (kafkaErrorToMaybe, maybeToLeft, rdKafkaErrorToEither)

import Kafka.Consumer.ConsumerProperties as X
import Kafka.Consumer.Subscription       as X
import Kafka.Consumer.Types              as X hiding (KafkaConsumer)
import Kafka.Types                       as X

-- | Runs a high-level Kafka consumer.
--
-- A consumer is created with 'newConsumer', handed to the callback and then
-- closed with 'closeConsumer'. The three steps are wired through 'bracket',
-- so the close step also runs when the callback throws an exception.
-- The callback provided is expected to call 'pollMessage' when convenient.
--
-- When the consumer cannot be created the callback is never invoked and the
-- creation error is returned as 'Left'.
{-# DEPRECATED runConsumer "Use 'newConsumer'/'closeConsumer' instead" #-}
runConsumer :: ConsumerProperties
            -> Subscription
            -> (KafkaConsumer -> IO (Either KafkaError a))  -- ^ A callback function to poll and handle messages
            -> IO (Either KafkaError a)
runConsumer cp sub f =
  bracket mkConsumer clConsumer runHandler
  where
    mkConsumer = newConsumer cp sub

    -- Release step: there is nothing to close when creation failed.
    clConsumer (Left err) = return (Left err)
    clConsumer (Right kc) = maybeToLeft <$> closeConsumer kc

    -- Use step: propagate the creation error or run the user callback.
    runHandler (Left err) = return (Left err)
    runHandler (Right kc) = f kc

-- | Create a 'KafkaConsumer'. This consumer must be correctly released using 'closeConsumer'.
--
-- The steps performed are:
--
-- 1. When the callback poll mode is 'CallbackPollModeAsync', a no-op
--    rebalance callback is added to the supplied properties.
-- 2. The consumer configuration and the default topic configuration are
--    built from the properties and the subscription's topic properties.
-- 3. The native consumer handle is created; a failure here is returned as
--    'Left' wrapped in 'KafkaError'.
-- 4. In async mode a message queue is created and stored in the
--    configuration's queue reference (this is what 'pollMessage' and
--    'pollMessageBatch' later read).
-- 5. Callback polling is redirected, the log level (if any) is applied and
--    the consumer subscribes to the requested topics.
-- 6. In async mode the background consumer loop is started.
--
-- If redirecting callbacks or subscribing fails, the half-initialised
-- consumer is closed before the error is returned.
newConsumer :: MonadIO m
            => ConsumerProperties
            -> Subscription
            -> m (Either KafkaError KafkaConsumer)
newConsumer props (Subscription ts tp) = liftIO $ do
  let cp = case cpCallbackPollMode props of
            CallbackPollModeAsync -> setCallback (rebalanceCallback (\_ _ -> return ())) <> props
            CallbackPollModeSync  -> props
  kc@(KafkaConf kc' qref _) <- newConsumerConf cp
  tp' <- topicConf (TopicProps tp)
  _   <- setDefaultTopicConf kc tp'
  rdk <- newRdKafkaT RdKafkaConsumer kc'
  case rdk of
    Left err   -> return . Left $ KafkaError err
    Right rdk' -> do
      -- Async mode consumes through a dedicated queue; remember it in the conf.
      when (cpCallbackPollMode props == CallbackPollModeAsync) $ do
        msgq <- rdKafkaQueueNew rdk'
        writeIORef qref (Just msgq)
      let kafka = KafkaConsumer (Kafka rdk') kc
      redErr <- redirectCallbacksPoll kafka
      case redErr of
        Just err -> closeConsumer kafka >> return (Left err)
        Nothing  -> do
          forM_ (cpLogLevel cp) (setConsumerLogLevel kafka)
          subErr <- subscribe kafka ts
          case subErr of
            Nothing  -> do
              -- Async mode: start the background loop with a 100 ms timeout.
              when (cpCallbackPollMode props == CallbackPollModeAsync) $
                runConsumerLoop kafka (Just $ Timeout 100)
              return (Right kafka)
            Just err -> closeConsumer kafka >> return (Left err)

-- | Polls a single message.
--
-- When the consumer's configuration holds a message queue, the message is
-- consumed from that queue; otherwise the consumer handle is polled
-- directly. In both cases the native message is converted with
-- 'fromMessagePtr'.
pollMessage :: MonadIO m
            => KafkaConsumer
            -> Timeout -- ^ the timeout, in milliseconds
            -> m (Either KafkaError (ConsumerRecord (Maybe BS.ByteString) (Maybe BS.ByteString))) -- ^ Left on error or timeout, right for success
pollMessage c@(KafkaConsumer _ (KafkaConf _ qr _)) (Timeout ms) = liftIO $ do
  mbq <- readIORef qr
  case mbq of
    Nothing -> rdKafkaConsumerPoll (getRdKafka c) ms >>= fromMessagePtr
    Just q  -> rdKafkaConsumeQueue q (fromIntegral ms) >>= fromMessagePtr

-- | Polls up to 'BatchSize' messages.
-- Unlike 'pollMessage' this function does not return usual "timeout" errors.
-- An empty batch is returned when there are no messages available.
--
-- Consumer events are polled first (without a timeout), then the batch is
-- read from the consumer's message queue.
--
-- This API is not available when 'CallbackPollMode' is set to 'CallbackPollModeSync':
-- no message queue exists in that case, and a single-element list holding a
-- 'KafkaBadSpecification' error is returned instead.
pollMessageBatch :: MonadIO m
                 => KafkaConsumer
                 -> Timeout
                 -> BatchSize
                 -> m [Either KafkaError (ConsumerRecord (Maybe BS.ByteString) (Maybe BS.ByteString))]
pollMessageBatch c@(KafkaConsumer _ (KafkaConf _ qr _)) (Timeout ms) (BatchSize b) = liftIO $ do
  pollConsumerEvents c Nothing
  mbq <- readIORef qr
  case mbq of
    Nothing -> return [Left $ KafkaBadSpecification "Calling pollMessageBatch while CallbackPollMode is set to CallbackPollModeSync."]
    Just q  -> rdKafkaConsumeBatchQueue q ms b >>= traverse fromMessagePtr

-- | Commit message's offset on broker for the message's partition.
--
-- The record is turned into a single-element topic partition list via
-- 'topicPartitionFromMessageForCommit' and handed to the commit routine
-- together with the requested 'OffsetCommit' mode.
-- Returns 'Nothing' on success.
commitOffsetMessage :: MonadIO m
                    => OffsetCommit
                    -> KafkaConsumer
                    -> ConsumerRecord k v
                    -> m (Maybe KafkaError)
commitOffsetMessage o k m =
  liftIO $ toNativeTopicPartitionList [topicPartitionFromMessageForCommit m] >>= commitOffsets o k

-- | Stores message's offset locally for the message's partition.
--
-- The record is turned into a single-element topic partition list via
-- 'topicPartitionFromMessageForCommit' (built with the non-disposing list
-- constructor) and handed to the offset store routine.
-- Returns 'Nothing' on success.
storeOffsetMessage :: MonadIO m
                   => KafkaConsumer
                   -> ConsumerRecord k v
                   -> m (Maybe KafkaError)
storeOffsetMessage k m =
  liftIO $ toNativeTopicPartitionListNoDispose [topicPartitionFromMessageForCommit m] >>= commitOffsetsStore k

-- | Stores offsets locally for the given topic partitions.
--
-- The partitions are converted to a native list (built with the
-- non-disposing list constructor) and handed to the offset store routine.
-- Returns 'Nothing' on success.
storeOffsets :: MonadIO m
             => KafkaConsumer
             -> [TopicPartition]
             -> m (Maybe KafkaError)
storeOffsets k ps =
  liftIO $ toNativeTopicPartitionListNoDispose ps >>= commitOffsetsStore k

-- | Commit offsets for all currently assigned partitions.
--
-- No explicit partition list is built: a null topic partition list pointer
-- is passed to the commit routine instead.
-- Returns 'Nothing' on success.
commitAllOffsets :: MonadIO m
                 => OffsetCommit
                 -> KafkaConsumer
                 -> m (Maybe KafkaError)
commitAllOffsets o k =
  liftIO $ newForeignPtr_ nullPtr >>= commitOffsets o k

-- | Commit offsets for the specified partitions.
--
-- The partitions are converted to a native topic partition list and handed
-- to the commit routine together with the requested 'OffsetCommit' mode.
-- Returns 'Nothing' on success.
commitPartitionsOffsets :: MonadIO m
                        => OffsetCommit
                        -> KafkaConsumer
                        -> [TopicPartition]
                        -> m (Maybe KafkaError)
commitPartitionsOffsets o k ps =
  liftIO $ toNativeTopicPartitionList ps >>= commitOffsets o k

-- | Assigns the consumer to consume from the given topics, partitions,
-- and offsets.
--
-- The native response code is wrapped in 'KafkaResponseError' and passed
-- through 'kafkaErrorToMaybe'; 'Nothing' means success.
assign :: MonadIO m => KafkaConsumer -> [TopicPartition] -> m (Maybe KafkaError)
assign (KafkaConsumer (Kafka k) _) ps = liftIO $ do
  tps <- toNativeTopicPartitionList ps
  kafkaErrorToMaybe . KafkaResponseError <$> rdKafkaAssign k tps

-- | Returns current consumer's assignment.
--
-- The result maps every assigned topic to the list of its assigned
-- partitions. A native error code is returned as 'Left' wrapped in
-- 'KafkaResponseError'.
assignment :: MonadIO m => KafkaConsumer -> m (Either KafkaError (M.Map TopicName [PartitionId]))
assignment (KafkaConsumer (Kafka k) _) = liftIO $ do
  tpl <- rdKafkaAssignment k
  tps <- traverse fromNativeTopicPartitionList'' (left KafkaResponseError tpl)
  return $ tpMap <$> tps
  where
    -- Group partition ids by topic name.
    tpMap ts = toMap $ (tpTopicName &&& tpPartition) <$> ts

-- | Returns current consumer's subscription.
--
-- The result lists every subscribed topic together with its subscribed
-- partitions. A topic whose only partition entry is @PartitionId (-1)@ is
-- reported as 'SubscribedPartitionsAll'. A native error code is returned as
-- 'Left' wrapped in 'KafkaResponseError'.
subscription :: MonadIO m => KafkaConsumer -> m (Either KafkaError [(TopicName, SubscribedPartitions)])
subscription (KafkaConsumer (Kafka k) _) = liftIO $ do
  tpl <- rdKafkaSubscription k
  tps <- traverse fromNativeTopicPartitionList'' (left KafkaResponseError tpl)
  return $ toSub <$> tps
  where
    toSub ts = M.toList $ subParts <$> tpMap ts
    -- Group partition ids by topic name.
    tpMap ts = toMap $ (tpTopicName &&& tpPartition) <$> ts
    -- A lone (-1) partition id stands for "all partitions".
    subParts [PartitionId (-1)] = SubscribedPartitionsAll
    subParts ps                 = SubscribedPartitions ps
-- | Pauses the given topic partitions on the current consumer.
--
-- A native partition list is filled with one entry per
-- @(topic, partition)@ pair and handed to 'rdKafkaPausePartitions'.
--
-- The native response code is always returned wrapped in
-- 'KafkaResponseError', whatever its value, so callers have to
-- inspect the wrapped code to tell success from failure.
pausePartitions :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m KafkaError
pausePartitions (KafkaConsumer (Kafka k) _) ps = liftIO $ do
  partitionList <- newRdKafkaTopicPartitionListT (length ps)
  forM_ ps $ \(TopicName name, PartitionId pid) ->
    rdKafkaTopicPartitionListAdd partitionList (Text.unpack name) pid
  respCode <- rdKafkaPausePartitions k partitionList
  pure (KafkaResponseError respCode)

-- | Resumes the given topic partitions on the current consumer.
--
-- A native partition list is filled with one entry per
-- @(topic, partition)@ pair and handed to 'rdKafkaResumePartitions'.
--
-- The native response code is always returned wrapped in
-- 'KafkaResponseError', whatever its value, so callers have to
-- inspect the wrapped code to tell success from failure.
resumePartitions :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m KafkaError
resumePartitions (KafkaConsumer (Kafka k) _) ps = liftIO $ do
  partitionList <- newRdKafkaTopicPartitionListT (length ps)
  forM_ ps $ \(TopicName name, PartitionId pid) ->
    rdKafkaTopicPartitionListAdd partitionList (Text.unpack name) pid
  respCode <- rdKafkaResumePartitions k partitionList
  pure (KafkaResponseError respCode)

-- | Seeks to the offset carried by each provided 'TopicPartition'.
--
-- The work happens in two passes:
--
-- 1. a topic handle is created for every entry; if creating any handle
--    fails, that failure is returned and no seek is issued at all;
-- 2. the seeks are issued in list order, each with the given 'Timeout',
--    stopping at the first one that reports an error.
--
-- Returns 'Nothing' when every seek succeeded, otherwise the first error.
seek :: MonadIO m => KafkaConsumer -> Timeout -> [TopicPartition] -> m (Maybe KafkaError)
seek (KafkaConsumer (Kafka k) _) (Timeout timeout) tps = liftIO $ do
  outcome <- runExceptT (mapM resolve tps >>= mapM_ seekOne)
  pure $ case outcome of
    Left err -> Just err
    Right () -> Nothing
  where
    -- Create the topic handle for one entry, keeping its partition and offset.
    -- A handle-creation failure message is converted into a 'KafkaError'.
    resolve tp = ExceptT $ do
      let TopicName tn = tpTopicName tp
      created <- newRdKafkaTopicT k (Text.unpack tn) Nothing
      pure $ case created of
        Left msg -> Left (KafkaError (Text.pack msg))
        Right kt -> Right (kt, tpPartition tp, tpOffset tp)

    -- Issue the native seek for one already-resolved entry.
    seekOne (kt, PartitionId p, o) = ExceptT $
      rdKafkaErrorToEither <$> rdKafkaSeek kt (fromIntegral p) (offsetToInt64 o) timeout

-- | Retrieves the committed offsets for the given topic partitions.
--
-- The pairs are converted to a native partition list which is passed to
-- 'rdKafkaCommitted' together with the 'Timeout'. When the call reports
-- no error the list is read back as 'TopicPartition's; any other response
-- code is returned as a 'KafkaResponseError'.
committed :: MonadIO m => KafkaConsumer -> Timeout -> [(TopicName, PartitionId)] -> m (Either KafkaError [TopicPartition])
committed (KafkaConsumer (Kafka k) _) (Timeout timeout) tps = liftIO $ do
  ntps <- toNativeTopicPartitionList' tps
  rdKafkaCommitted k ntps timeout >>= \case
    RdKafkaRespErrNoError -> Right <$> fromNativeTopicPartitionList'' ntps
    failure               -> pure (Left (KafkaResponseError failure))

-- | Retrieves the current positions (last consumed message offset + 1)
-- of this running consumer instance for the given topic partitions.
--
-- If the consumer hasn't received any messages for a given partition,
-- 'PartitionOffsetInvalid' is returned for it.
--
-- Any response code other than "no error" from 'rdKafkaPosition' is
-- returned as a 'KafkaResponseError'.
position :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m (Either KafkaError [TopicPartition])
position (KafkaConsumer (Kafka k) _) tps = liftIO $ do
  ntps <- toNativeTopicPartitionList' tps
  rdKafkaPosition k ntps >>= \case
    RdKafkaRespErrNoError -> Right <$> fromNativeTopicPartitionList'' ntps
    failure               -> pure (Left (KafkaResponseError failure))

-- | Polls the provided kafka consumer for events.
--
-- Polling causes application-provided callbacks to be invoked.
--
-- The 'Timeout' argument is the maximum amount of time (in milliseconds)
-- the call will block waiting for events.
--
-- This function is called on each 'pollMessage' and, if the runtime
-- allows multi threading, it is also called periodically in a separate
-- thread so that callbacks are handled ASAP.
--
-- Calling it manually is only needed in special cases in a
-- single-threaded environment, when polling for events on each
-- 'pollMessage' is not frequent enough.
pollConsumerEvents :: KafkaConsumer -> Maybe Timeout -> IO ()
pollConsumerEvents k timeout = do
  -- The poll status reported by the guard is of no interest to callers.
  _ <- withCallbackPollEnabled k (pollConsumerEvents' k timeout)
  pure ()

-- | Closes the consumer.
--
-- The callback poll status is switched to 'CallbackPollDisabled' as part
-- of closing. Returns 'Nothing' when the native close reports no error.
--
-- See 'newConsumer'
closeConsumer :: MonadIO m => KafkaConsumer -> m (Maybe KafkaError)
closeConsumer (KafkaConsumer (Kafka k) (KafkaConf _ qr statusVar)) = liftIO $
  -- Closing the consumer raises callbacks, so the status variable is held
  -- for the whole operation to keep the async poll loop from running
  -- them at the same time.
  modifyMVar statusVar $ \_ -> do
    -- librdkafka asks that the reference to the background queue be
    -- released (rd_kafka_queue_destroy) before the client instance
    -- is destroyed.
    mbQueue <- readIORef qr
    mapM_ rdKafkaQueueDestroy mbQueue
    closeCode <- rdKafkaConsumerClose k
    pure (CallbackPollDisabled, kafkaErrorToMaybe (KafkaResponseError closeCode))
-----------------------------------------------------------------------------
-- Builds a 'KafkaConf' from the consumer's property map, then lets every
-- registered callback install itself on that configuration.
newConsumerConf :: ConsumerProperties -> IO KafkaConf
newConsumerConf ConsumerProperties {cpProps = props, cpCallbacks = callbacks} = do
  conf <- kafkaConf (KafkaProps props)
  mapM_ (\(Callback install) -> install conf) callbacks
  pure conf

-- | Subscribes the consumer to the given set of topics.
--
-- Wildcard (regex) topics are supported by the /librdkafka/ assignor:
-- any topic name in the list that is prefixed with @^@ is regex-matched
-- against the full list of topics in the cluster, and the matching
-- topics are added to the subscription list.
--
-- Every topic is added to the native list with partition @-1@.
-- Returns 'Nothing' when the subscription call reports no error.
subscribe :: KafkaConsumer -> Set TopicName -> IO (Maybe KafkaError)
subscribe (KafkaConsumer (Kafka k) _) ts = do
  partitionList <- newRdKafkaTopicPartitionListT (length ts)
  forM_ (Set.toList ts) $ \(TopicName name) ->
    rdKafkaTopicPartitionListAdd partitionList (Text.unpack name) (-1)
  kafkaErrorToMaybe . KafkaResponseError <$> rdKafkaSubscribe k partitionList

-- Installs a duplicate of the given topic configuration as the default
-- topic configuration of the Kafka configuration.
setDefaultTopicConf :: KafkaConf -> TopicConf -> IO ()
setDefaultTopicConf (KafkaConf kc _ _) (TopicConf tc) = do
  tcCopy <- rdKafkaTopicConfDup tc
  rdKafkaConfSetDefaultTopicConf kc tcCopy

-- Commits the offsets in the native partition list; the 'OffsetCommit'
-- mode is translated to the boolean flag expected by 'rdKafkaCommit'.
-- Returns 'Nothing' when the commit reports no error.
commitOffsets :: OffsetCommit -> KafkaConsumer -> RdKafkaTopicPartitionListTPtr -> IO (Maybe KafkaError)
commitOffsets o (KafkaConsumer (Kafka k) _) pl = do
  respCode <- rdKafkaCommit k pl (offsetCommitToBool o)
  pure (kafkaErrorToMaybe (KafkaResponseError respCode))

-- Passes the offsets in the native partition list to
-- 'rdKafkaOffsetsStore'. Returns 'Nothing' when it reports no error.
commitOffsetsStore :: KafkaConsumer -> RdKafkaTopicPartitionListTPtr -> IO (Maybe KafkaError)
commitOffsetsStore (KafkaConsumer (Kafka k) _) pl = do
  respCode <- rdKafkaOffsetsStore k pl
  pure (kafkaErrorToMaybe (KafkaResponseError respCode))

-- Sets the log level of the consumer's native handle; the level is
-- passed down as its 'Enum' index.
setConsumerLogLevel :: KafkaConsumer -> KafkaLogLevel -> IO ()
setConsumerLogLevel (KafkaConsumer (Kafka k) _) level =
  -- Already in 'IO', so no lifting is needed here.
  rdKafkaSetLogLevel k (fromEnum level)

-- Calls 'rdKafkaPollSetConsumer' on the consumer's native handle.
-- Returns 'Nothing' when the call reports no error.
redirectCallbacksPoll :: KafkaConsumer -> IO (Maybe KafkaError)
redirectCallbacksPoll (KafkaConsumer (Kafka k) _) = do
  respCode <- rdKafkaPollSetConsumer k
  pure (kafkaErrorToMaybe (KafkaResponseError respCode))

-- Starts a background thread that keeps polling the consumer for events.
--
-- Nothing is started unless the runtime supports bound threads. The loop
-- repeats for as long as callback polling is reported as enabled and
-- ends the first time it is reported as disabled.
runConsumerLoop :: KafkaConsumer -> Maybe Timeout -> IO ()
runConsumerLoop k timeout
  | rtsSupportsBoundThreads = void (forkIO loop)
  | otherwise               = pure ()
  where
    loop =
      withCallbackPollEnabled k (pollConsumerEvents' k timeout) >>= \case
        CallbackPollEnabled  -> loop
        CallbackPollDisabled -> pure ()

-- Runs the action only while callback polling is enabled, holding the
-- consumer's poll-status variable for the duration of the action.
--
-- Returns the status that was observed: 'CallbackPollEnabled' after the
-- action has run, or 'CallbackPollDisabled' when it was skipped.
withCallbackPollEnabled :: KafkaConsumer -> IO () -> IO CallbackPollStatus
withCallbackPollEnabled k f =
  withMVar (kcfgCallbackPollStatus (getKafkaConf k)) $ \status -> do
    case status of
      CallbackPollEnabled  -> f
      CallbackPollDisabled -> pure ()
    pure status

-- Polls the consumer's native handle once and discards whatever the poll
-- returns. A missing timeout is treated as 0.
pollConsumerEvents' :: KafkaConsumer -> Maybe Timeout -> IO ()
pollConsumerEvents' k timeout =
  void (rdKafkaConsumerPoll (getRdKafka k) tm)
  where
    tm = maybe 0 (\(Timeout ms) -> ms) timeout