{-# LANGUAGE BangPatterns #-}
module Kafka.Consumer.Callbacks
( rebalanceCallback
, offsetCommitCallback
, module X
)
where
import Control.Arrow ((&&&))
import Control.Monad (forM_, void)
import Foreign.ForeignPtr (newForeignPtr_)
import Foreign.Ptr (nullPtr)
import Kafka.Callbacks as X
import Kafka.Consumer.Convert (fromNativeTopicPartitionList', fromNativeTopicPartitionList'')
import Kafka.Consumer.Types (KafkaConsumer (..), RebalanceEvent (..), TopicPartition (..))
import Kafka.Internal.RdKafka
import Kafka.Internal.Setup (HasKafka (..), HasKafkaConf (..), Kafka (..), KafkaConf (..), getRdMsgQueue, Callback (..))
import Kafka.Types (KafkaError (..), PartitionId (..), TopicName (..))
import qualified Data.Text as Text
-- | Builds a 'Callback' that, when applied to a 'KafkaConf', registers a
-- native rebalance handler through 'rdKafkaConfSetRebalanceCb'.
--
-- The user-supplied function is given the 'KafkaConsumer' reconstructed from
-- the pointers passed to the native handler, together with the
-- 'RebalanceEvent' produced by 'setRebalanceCallback'.
rebalanceCallback :: (KafkaConsumer -> RebalanceEvent -> IO ()) -> Callback
rebalanceCallback callback = Callback register
  where
    -- Install the handler on the native configuration held by the 'KafkaConf'.
    register conf@(KafkaConf nativeConf _ _) =
      rdKafkaConfSetRebalanceCb nativeConf (onRebalance conf)

    -- The native handler receives raw pointers. They are wrapped with
    -- 'newForeignPtr_', i.e. without any finalizer attached
    -- (presumably because the native library keeps ownership -- TODO confirm).
    onRebalance conf rawKafka errCode rawPartitions = do
      kafkaPtr      <- newForeignPtr_ rawKafka
      partitionsPtr <- newForeignPtr_ rawPartitions
      let consumer = KafkaConsumer (Kafka kafkaPtr) conf
      setRebalanceCallback callback consumer (KafkaResponseError errCode) partitionsPtr
-- | Builds a 'Callback' that, when applied to a 'KafkaConf', registers a
-- native offset-commit handler through 'rdKafkaConfSetOffsetCommitCb'.
--
-- The user-supplied function is given the 'KafkaConsumer' reconstructed from
-- the native handle, the response code wrapped in 'KafkaResponseError', and
-- the partition list converted by 'fromNativeTopicPartitionList''.
offsetCommitCallback :: (KafkaConsumer -> KafkaError -> [TopicPartition] -> IO ()) -> Callback
offsetCommitCallback callback = Callback register
  where
    -- Install the handler on the native configuration held by the 'KafkaConf'.
    register conf@(KafkaConf nativeConf _ _) =
      rdKafkaConfSetOffsetCommitCb nativeConf (onCommit conf)

    -- The raw client handle is wrapped with 'newForeignPtr_' (no finalizer);
    -- the native partition list is converted before the user code runs.
    onCommit conf rawKafka errCode rawPartitions = do
      kafkaPtr   <- newForeignPtr_ rawKafka
      partitions <- fromNativeTopicPartitionList' rawPartitions
      let consumer = KafkaConsumer (Kafka kafkaPtr) conf
      callback consumer (KafkaResponseError errCode) partitions
-- | Looks up the queue of one topic partition and forwards it to the given
-- target queue.
--
-- When 'rdKafkaQueueGetPartition' yields 'Nothing' for the topic/partition
-- pair, nothing is forwarded and the action simply returns.
redirectPartitionQueue :: Kafka -> TopicName -> PartitionId -> RdKafkaQueueTPtr -> IO ()
redirectPartitionQueue (Kafka kafkaPtr) (TopicName topic) (PartitionId partition) target =
  rdKafkaQueueGetPartition kafkaPtr (Text.unpack topic) partition
    >>= maybe (return ()) (`rdKafkaQueueForward` target)
-- | Handles one rebalance notification: performs the native assignment
-- bookkeeping and reports each phase to the user-supplied function.
--
-- * On 'RdKafkaRespErrAssignPartitions' the user function is called with
--   'RebalanceBeforeAssign', the partition list is assigned, partition queues
--   are forwarded to the consumer's message queue when one is configured, and
--   the user function is called again with 'RebalanceAssign'.
-- * On 'RdKafkaRespErrRevokePartitions' the user function is called with
--   'RebalanceBeforeRevoke', a null partition list is assigned, and the user
--   function is called again with 'RebalanceRevoke'.
-- * Any other value is rejected with 'error'.
--
-- The results of the native assign, pause and resume calls are discarded.
setRebalanceCallback :: (KafkaConsumer -> RebalanceEvent -> IO ())
                     -> KafkaConsumer
                     -> KafkaError
                     -> RdKafkaTopicPartitionListTPtr -> IO ()
setRebalanceCallback f k e pls = do
  ps <- fromNativeTopicPartitionList'' pls
  let assignment = fmap (tpTopicName &&& tpPartition) ps
  case e of
    KafkaResponseError RdKafkaRespErrAssignPartitions -> onAssign ps assignment
    KafkaResponseError RdKafkaRespErrRevokePartitions -> onRevoke assignment
    x -> error $ "Rebalance: UNKNOWN response: " <> show x
  where
    -- Client handle of the consumer, both wrapped and unwrapped.
    kafka@(Kafka kptr) = getKafka k

    -- Assignment phase: notify, assign, optionally forward queues, notify.
    onAssign ps assignment = do
      f k (RebalanceBeforeAssign assignment)
      void $ rdKafkaAssign kptr pls
      getRdMsgQueue (getKafkaConf k) >>= maybe (pure ()) (forwardAll ps)
      f k (RebalanceAssign assignment)

    -- Forward every assigned partition's queue to the message queue.
    -- The partitions are paused while forwarding is set up and resumed
    -- afterwards (presumably so nothing is delivered through another queue
    -- in the meantime -- TODO confirm against the librdkafka documentation).
    forwardAll ps mq = do
      void $ rdKafkaPausePartitions kptr pls
      forM_ ps $ \tp ->
        redirectPartitionQueue kafka (tpTopicName tp) (tpPartition tp) mq
      void $ rdKafkaResumePartitions kptr pls

    -- Revocation phase: notify, assign a null partition list, notify.
    onRevoke assignment = do
      f k (RebalanceBeforeRevoke assignment)
      noPartitions <- newForeignPtr_ nullPtr
      void $ rdKafkaAssign kptr noPartitions
      f k (RebalanceRevoke assignment)