{-# LANGUAGE BangPatterns #-}
module Kafka.Consumer.Callbacks
( rebalanceCallback
, offsetCommitCallback
, module X
)
where

import Control.Arrow          ((&&&))
import Control.Monad          (forM_, void)
import Foreign.ForeignPtr     (newForeignPtr_)
import Foreign.Ptr            (nullPtr)
import Kafka.Callbacks        as X
import Kafka.Consumer.Convert (fromNativeTopicPartitionList', fromNativeTopicPartitionList'')
import Kafka.Consumer.Types   (KafkaConsumer (..), RebalanceEvent (..), TopicPartition (..))
import Kafka.Internal.RdKafka
import Kafka.Internal.Setup   (HasKafka (..), HasKafkaConf (..), Kafka (..), KafkaConf (..), getRdMsgQueue, Callback (..))
import Kafka.Types            (KafkaError (..), PartitionId (..), TopicName (..))

import qualified Data.Text as Text

-- | Sets a callback that is called when rebalance is needed.
rebalanceCallback :: (KafkaConsumer -> RebalanceEvent -> IO ()) -> Callback
rebalanceCallback :: (KafkaConsumer -> RebalanceEvent -> IO ()) -> Callback
rebalanceCallback callback :: KafkaConsumer -> RebalanceEvent -> IO ()
callback =
  (KafkaConf -> IO ()) -> Callback
Callback ((KafkaConf -> IO ()) -> Callback)
-> (KafkaConf -> IO ()) -> Callback
forall a b. (a -> b) -> a -> b
$ \kc :: KafkaConf
kc@(KafkaConf con :: RdKafkaConfTPtr
con _ _) -> RdKafkaConfTPtr -> RdRebalanceCallback -> IO ()
rdKafkaConfSetRebalanceCb RdKafkaConfTPtr
con (KafkaConf -> RdRebalanceCallback
realCb KafkaConf
kc)
  where
    realCb :: KafkaConf -> RdRebalanceCallback
realCb kc :: KafkaConf
kc k :: Ptr RdKafkaT
k err :: RdKafkaRespErrT
err pl :: Ptr RdKafkaTopicPartitionListT
pl = do
      ForeignPtr RdKafkaT
k' <- Ptr RdKafkaT -> IO (ForeignPtr RdKafkaT)
forall a. Ptr a -> IO (ForeignPtr a)
newForeignPtr_ Ptr RdKafkaT
k
      ForeignPtr RdKafkaTopicPartitionListT
pls <- Ptr RdKafkaTopicPartitionListT
-> IO (ForeignPtr RdKafkaTopicPartitionListT)
forall a. Ptr a -> IO (ForeignPtr a)
newForeignPtr_ Ptr RdKafkaTopicPartitionListT
pl
      (KafkaConsumer -> RebalanceEvent -> IO ())
-> KafkaConsumer
-> KafkaError
-> ForeignPtr RdKafkaTopicPartitionListT
-> IO ()
setRebalanceCallback KafkaConsumer -> RebalanceEvent -> IO ()
callback (Kafka -> KafkaConf -> KafkaConsumer
KafkaConsumer (ForeignPtr RdKafkaT -> Kafka
Kafka ForeignPtr RdKafkaT
k') KafkaConf
kc) (RdKafkaRespErrT -> KafkaError
KafkaResponseError RdKafkaRespErrT
err) ForeignPtr RdKafkaTopicPartitionListT
pls

-- | Sets a callback that is called when rebalance is needed.
--
-- The results of automatic or manual offset commits will be scheduled
-- for this callback and is served by 'Kafka.Consumer.pollMessage'.
--
-- If no partitions had valid offsets to commit this callback will be called
-- with 'KafkaResponseError' 'RdKafkaRespErrNoOffset' which is not to be considered
-- an error.
offsetCommitCallback :: (KafkaConsumer -> KafkaError -> [TopicPartition] -> IO ()) -> Callback
offsetCommitCallback :: (KafkaConsumer -> KafkaError -> [TopicPartition] -> IO ())
-> Callback
offsetCommitCallback callback :: KafkaConsumer -> KafkaError -> [TopicPartition] -> IO ()
callback =
  (KafkaConf -> IO ()) -> Callback
Callback ((KafkaConf -> IO ()) -> Callback)
-> (KafkaConf -> IO ()) -> Callback
forall a b. (a -> b) -> a -> b
$ \kc :: KafkaConf
kc@(KafkaConf conf :: RdKafkaConfTPtr
conf _ _) -> RdKafkaConfTPtr -> RdRebalanceCallback -> IO ()
rdKafkaConfSetOffsetCommitCb RdKafkaConfTPtr
conf (KafkaConf -> RdRebalanceCallback
realCb KafkaConf
kc)
  where
    realCb :: KafkaConf -> RdRebalanceCallback
realCb kc :: KafkaConf
kc k :: Ptr RdKafkaT
k err :: RdKafkaRespErrT
err pl :: Ptr RdKafkaTopicPartitionListT
pl = do
      ForeignPtr RdKafkaT
k' <- Ptr RdKafkaT -> IO (ForeignPtr RdKafkaT)
forall a. Ptr a -> IO (ForeignPtr a)
newForeignPtr_ Ptr RdKafkaT
k
      [TopicPartition]
pls <- Ptr RdKafkaTopicPartitionListT -> IO [TopicPartition]
fromNativeTopicPartitionList' Ptr RdKafkaTopicPartitionListT
pl
      KafkaConsumer -> KafkaError -> [TopicPartition] -> IO ()
callback (Kafka -> KafkaConf -> KafkaConsumer
KafkaConsumer (ForeignPtr RdKafkaT -> Kafka
Kafka ForeignPtr RdKafkaT
k') KafkaConf
kc) (RdKafkaRespErrT -> KafkaError
KafkaResponseError RdKafkaRespErrT
err) [TopicPartition]
pls

-------------------------------------------------------------------------------
redirectPartitionQueue :: Kafka -> TopicName -> PartitionId -> RdKafkaQueueTPtr -> IO ()
redirectPartitionQueue :: Kafka -> TopicName -> PartitionId -> RdKafkaQueueTPtr -> IO ()
redirectPartitionQueue (Kafka k :: ForeignPtr RdKafkaT
k) (TopicName t :: Text
t) (PartitionId p :: Int
p) q :: RdKafkaQueueTPtr
q = do
  Maybe RdKafkaQueueTPtr
mpq <- ForeignPtr RdKafkaT -> String -> Int -> IO (Maybe RdKafkaQueueTPtr)
rdKafkaQueueGetPartition ForeignPtr RdKafkaT
k (Text -> String
Text.unpack Text
t) Int
p
  case Maybe RdKafkaQueueTPtr
mpq of
    Nothing -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
    Just pq :: RdKafkaQueueTPtr
pq -> RdKafkaQueueTPtr -> RdKafkaQueueTPtr -> IO ()
rdKafkaQueueForward RdKafkaQueueTPtr
pq RdKafkaQueueTPtr
q

setRebalanceCallback :: (KafkaConsumer -> RebalanceEvent -> IO ())
                          -> KafkaConsumer
                          -> KafkaError
                          -> RdKafkaTopicPartitionListTPtr -> IO ()
setRebalanceCallback :: (KafkaConsumer -> RebalanceEvent -> IO ())
-> KafkaConsumer
-> KafkaError
-> ForeignPtr RdKafkaTopicPartitionListT
-> IO ()
setRebalanceCallback f :: KafkaConsumer -> RebalanceEvent -> IO ()
f k :: KafkaConsumer
k e :: KafkaError
e pls :: ForeignPtr RdKafkaTopicPartitionListT
pls = do
  [TopicPartition]
ps <- ForeignPtr RdKafkaTopicPartitionListT -> IO [TopicPartition]
fromNativeTopicPartitionList'' ForeignPtr RdKafkaTopicPartitionListT
pls
  let assignment :: [(TopicName, PartitionId)]
assignment = (TopicPartition -> TopicName
tpTopicName (TopicPartition -> TopicName)
-> (TopicPartition -> PartitionId)
-> TopicPartition
-> (TopicName, PartitionId)
forall (a :: * -> * -> *) b c c'.
Arrow a =>
a b c -> a b c' -> a b (c, c')
&&& TopicPartition -> PartitionId
tpPartition) (TopicPartition -> (TopicName, PartitionId))
-> [TopicPartition] -> [(TopicName, PartitionId)]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [TopicPartition]
ps
  let (Kafka kptr :: ForeignPtr RdKafkaT
kptr) = KafkaConsumer -> Kafka
forall a. HasKafka a => a -> Kafka
getKafka KafkaConsumer
k

  case KafkaError
e of
    KafkaResponseError RdKafkaRespErrAssignPartitions -> do
        KafkaConsumer -> RebalanceEvent -> IO ()
f KafkaConsumer
k ([(TopicName, PartitionId)] -> RebalanceEvent
RebalanceBeforeAssign [(TopicName, PartitionId)]
assignment)
        IO RdKafkaRespErrT -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO RdKafkaRespErrT -> IO ()) -> IO RdKafkaRespErrT -> IO ()
forall a b. (a -> b) -> a -> b
$ ForeignPtr RdKafkaT
-> ForeignPtr RdKafkaTopicPartitionListT -> IO RdKafkaRespErrT
rdKafkaAssign ForeignPtr RdKafkaT
kptr ForeignPtr RdKafkaTopicPartitionListT
pls

        Maybe RdKafkaQueueTPtr
mbq <- KafkaConf -> IO (Maybe RdKafkaQueueTPtr)
forall k. HasKafkaConf k => k -> IO (Maybe RdKafkaQueueTPtr)
getRdMsgQueue (KafkaConf -> IO (Maybe RdKafkaQueueTPtr))
-> KafkaConf -> IO (Maybe RdKafkaQueueTPtr)
forall a b. (a -> b) -> a -> b
$ KafkaConsumer -> KafkaConf
forall a. HasKafkaConf a => a -> KafkaConf
getKafkaConf KafkaConsumer
k
        case Maybe RdKafkaQueueTPtr
mbq of
          Nothing -> () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
          Just mq :: RdKafkaQueueTPtr
mq -> do
            {- Magnus Edenhill:
                If you redirect after assign() it means some messages may be forwarded to the single consumer queue,
                so either do it before assign() or do: assign(); pause(); redirect; resume()
            -}
            IO RdKafkaRespErrT -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO RdKafkaRespErrT -> IO ()) -> IO RdKafkaRespErrT -> IO ()
forall a b. (a -> b) -> a -> b
$ ForeignPtr RdKafkaT
-> ForeignPtr RdKafkaTopicPartitionListT -> IO RdKafkaRespErrT
rdKafkaPausePartitions ForeignPtr RdKafkaT
kptr ForeignPtr RdKafkaTopicPartitionListT
pls
            [TopicPartition] -> (TopicPartition -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [TopicPartition]
ps (\tp :: TopicPartition
tp -> Kafka -> TopicName -> PartitionId -> RdKafkaQueueTPtr -> IO ()
redirectPartitionQueue (KafkaConsumer -> Kafka
forall a. HasKafka a => a -> Kafka
getKafka KafkaConsumer
k) (TopicPartition -> TopicName
tpTopicName TopicPartition
tp) (TopicPartition -> PartitionId
tpPartition TopicPartition
tp) RdKafkaQueueTPtr
mq)
            IO RdKafkaRespErrT -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO RdKafkaRespErrT -> IO ()) -> IO RdKafkaRespErrT -> IO ()
forall a b. (a -> b) -> a -> b
$ ForeignPtr RdKafkaT
-> ForeignPtr RdKafkaTopicPartitionListT -> IO RdKafkaRespErrT
rdKafkaResumePartitions ForeignPtr RdKafkaT
kptr ForeignPtr RdKafkaTopicPartitionListT
pls

        KafkaConsumer -> RebalanceEvent -> IO ()
f KafkaConsumer
k ([(TopicName, PartitionId)] -> RebalanceEvent
RebalanceAssign [(TopicName, PartitionId)]
assignment)

    KafkaResponseError RdKafkaRespErrRevokePartitions -> do
        KafkaConsumer -> RebalanceEvent -> IO ()
f KafkaConsumer
k ([(TopicName, PartitionId)] -> RebalanceEvent
RebalanceBeforeRevoke [(TopicName, PartitionId)]
assignment)
        IO RdKafkaRespErrT -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO RdKafkaRespErrT -> IO ()) -> IO RdKafkaRespErrT -> IO ()
forall a b. (a -> b) -> a -> b
$ Ptr RdKafkaTopicPartitionListT
-> IO (ForeignPtr RdKafkaTopicPartitionListT)
forall a. Ptr a -> IO (ForeignPtr a)
newForeignPtr_ Ptr RdKafkaTopicPartitionListT
forall a. Ptr a
nullPtr IO (ForeignPtr RdKafkaTopicPartitionListT)
-> (ForeignPtr RdKafkaTopicPartitionListT -> IO RdKafkaRespErrT)
-> IO RdKafkaRespErrT
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= ForeignPtr RdKafkaT
-> ForeignPtr RdKafkaTopicPartitionListT -> IO RdKafkaRespErrT
rdKafkaAssign ForeignPtr RdKafkaT
kptr
        KafkaConsumer -> RebalanceEvent -> IO ()
f KafkaConsumer
k ([(TopicName, PartitionId)] -> RebalanceEvent
RebalanceRevoke [(TopicName, PartitionId)]
assignment)
    x :: KafkaError
x -> String -> IO ()
forall a. HasCallStack => String -> a
error (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ "Rebalance: UNKNOWN response: " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> KafkaError -> String
forall a. Show a => a -> String
show KafkaError
x