{-# LANGUAGE BangPatterns #-}
module Kafka.Consumer.Callbacks
( rebalanceCallback
, offsetCommitCallback
, module X
)
where
import Control.Arrow ((&&&))
import Control.Monad (forM_, void)
import Data.Monoid ((<>))
import Foreign hiding (void)
import Kafka.Callbacks as X
import Kafka.Consumer.Convert
import Kafka.Consumer.Types
import Kafka.Internal.RdKafka
import Kafka.Internal.Setup
import Kafka.Internal.Shared
import Kafka.Types
import Control.Concurrent (threadDelay)
-- | Installs a rebalance handler on the given consumer configuration.
--
-- The native handler receives raw pointers for the Kafka handle and the
-- partition list. Both are wrapped with 'newForeignPtr_' (so no finalizer is
-- attached here) and passed, together with the user @callback@, to
-- 'setRebalanceCallback'.
rebalanceCallback :: (KafkaConsumer -> RebalanceEvent -> IO ()) -> KafkaConf -> IO ()
rebalanceCallback callback kc@(KafkaConf conf _ _) =
  rdKafkaConfSetRebalanceCb conf nativeHandler
  where
    nativeHandler rawKafka respErr rawPartitions = do
      kafkaPtr      <- newForeignPtr_ rawKafka
      partitionsPtr <- newForeignPtr_ rawPartitions
      let consumer = KafkaConsumer (Kafka kafkaPtr) kc
      setRebalanceCallback callback consumer (KafkaResponseError respErr) partitionsPtr
-- | Installs an offset-commit handler on the given consumer configuration.
--
-- The native handler wraps the raw Kafka handle with 'newForeignPtr_' (no
-- finalizer attached here), reads the partition list through
-- 'fromNativeTopicPartitionList'', and then invokes the user @callback@ with
-- the consumer, the response error wrapped in 'KafkaResponseError', and the
-- partitions.
offsetCommitCallback :: (KafkaConsumer -> KafkaError -> [TopicPartition] -> IO ()) -> KafkaConf -> IO ()
offsetCommitCallback callback kc@(KafkaConf conf _ _) =
  rdKafkaConfSetOffsetCommitCb conf nativeHandler
  where
    nativeHandler rawKafka respErr rawPartitions = do
      kafkaPtr <- newForeignPtr_ rawKafka
      let consumer = KafkaConsumer (Kafka kafkaPtr) kc
      fromNativeTopicPartitionList' rawPartitions
        >>= callback consumer (KafkaResponseError respErr)
-- | Forwards the queue of a single topic partition to the queue @q@.
--
-- The partition queue is looked up with 'rdKafkaQueueGetPartition'; when the
-- lookup yields 'Nothing' this is a no-op, otherwise the queue found is
-- handed to 'rdKafkaQueueForward' together with @q@.
redirectPartitionQueue :: Kafka -> TopicName -> PartitionId -> RdKafkaQueueTPtr -> IO ()
redirectPartitionQueue (Kafka kafkaPtr) (TopicName topic) (PartitionId partition) q =
  rdKafkaQueueGetPartition kafkaPtr topic partition
    >>= maybe (return ()) forwardTo
  where
    forwardTo partitionQueue = rdKafkaQueueForward partitionQueue q
-- | Handles one native rebalance notification and drives the user callback @f@.
--
-- Arguments: the user callback @f@, the consumer @k@, the error value @e@
-- carried by the notification, and the native partition list @pls@.
--
-- * On 'RdKafkaRespErrAssignPartitions': partition queues are redirected to
--   the consumer's message queue (when 'getRdMsgQueue' yields one), @f@ is
--   called with 'RebalanceBeforeAssign', the native list is assigned via
--   'assign'', and @f@ is called with 'RebalanceAssign'.
-- * On 'RdKafkaRespErrRevokePartitions': @f@ is called with
--   'RebalanceBeforeRevoke', 'assign' is called with an empty list, and @f@
--   is called with 'RebalanceRevoke'.
-- * Any other value is reported through 'error'.
--
-- The results of 'assign' and 'assign'' are discarded with 'void'.
setRebalanceCallback :: (KafkaConsumer -> RebalanceEvent -> IO ())
-> KafkaConsumer
-> KafkaError
-> RdKafkaTopicPartitionListTPtr -> IO ()
setRebalanceCallback f k e pls = do
-- Read the partitions out of the native list once; both branches use them.
ps <- fromNativeTopicPartitionList'' pls
-- Pair each partition's topic name with its partition id for the events.
let assignment = (tpTopicName &&& tpPartition) <$> ps
case e of
KafkaResponseError RdKafkaRespErrAssignPartitions -> do
mbq <- getRdMsgQueue $ getKafkaConf k
case mbq of
Nothing -> pure ()
Just mq -> do
-- Forward every partition's queue into the consumer's message queue.
forM_ ps (\tp -> redirectPartitionQueue (getKafka k) (tpTopicName tp) (tpPartition tp) mq)
-- Pauses for 1000000 microseconds (one second).
-- NOTE(review): the reason for this delay is not evident from this file, and
-- with the indentation lost it is unclear whether it belongs to the
-- 'Just mq' branch or runs unconditionally - confirm against the original.
threadDelay 1000000
f k (RebalanceBeforeAssign assignment)
-- Assign using the native pointer that was received, not the converted list.
void $ assign' k pls
f k (RebalanceAssign assignment)
KafkaResponseError RdKafkaRespErrRevokePartitions -> do
f k (RebalanceBeforeRevoke assignment)
-- An empty list makes 'assign' pass a null partition list to 'rdKafkaAssign'.
void $ assign k []
f k (RebalanceRevoke assignment)
-- Anything else is unexpected here and raises via 'error'.
x -> error $ "Rebalance: UNKNOWN response: " <> show x
-- | Calls 'rdKafkaAssign' for the consumer with the given partitions.
--
-- An empty list is passed to 'rdKafkaAssign' as a null pointer wrapped with
-- 'newForeignPtr_'; a non-empty list is first converted with
-- 'toNativeTopicPartitionList'. (Presumably the null list clears the current
-- assignment in librdkafka - confirm against the @rd_kafka_assign@ docs.)
--
-- The response is wrapped in 'KafkaResponseError' and mapped through
-- 'kafkaErrorToMaybe'.
assign :: KafkaConsumer -> [TopicPartition] -> IO (Maybe KafkaError)
assign (KafkaConsumer (Kafka kafkaPtr) _) ps = do
  nativeList <- mkNativeList
  respErr    <- rdKafkaAssign kafkaPtr nativeList
  return (kafkaErrorToMaybe (KafkaResponseError respErr))
  where
    mkNativeList
      | null ps   = newForeignPtr_ nullPtr
      | otherwise = toNativeTopicPartitionList ps
-- | Variant of 'assign' that takes an already-native partition list pointer
-- and passes it to 'rdKafkaAssign' unchanged.
--
-- The response is wrapped in 'KafkaResponseError' and mapped through
-- 'kafkaErrorToMaybe'.
assign' :: KafkaConsumer -> RdKafkaTopicPartitionListTPtr -> IO (Maybe KafkaError)
assign' (KafkaConsumer (Kafka kafkaPtr) _) pls = do
  respErr <- rdKafkaAssign kafkaPtr pls
  pure (kafkaErrorToMaybe (KafkaResponseError respErr))