{-# LANGUAGE TupleSections #-}
module Kafka.Consumer
( module X
, runConsumer
, newConsumer
, assign
, pollMessage
, commitOffsetMessage
, commitAllOffsets
, closeConsumer
, TopicName (..)
, CIT.OffsetCommit (..)
, CIT.PartitionOffset (..)
, CIT.TopicPartition (..)
, CIT.ConsumerRecord (..)
, RDE.RdKafkaRespErrT (..)
)
where
import Control.Exception
import Control.Monad (forM_)
import Control.Monad.IO.Class
import qualified Data.ByteString as BS
import qualified Data.Map as M
import Foreign hiding (void)
import Kafka.Consumer.Convert
import Kafka.Internal.RdKafka
import Kafka.Internal.RdKafkaEnum
import Kafka.Internal.Setup
import Kafka.Internal.Shared
import qualified Kafka.Consumer.Types as CIT
import qualified Kafka.Internal.RdKafkaEnum as RDE
import Kafka.Types as X
import Kafka.Consumer.Types as X
import Kafka.Consumer.Subscription as X
import Kafka.Consumer.ConsumerProperties as X
-- | Create a consumer, run an action against it and close the consumer
-- afterwards.
--
-- The consumer is created with 'newConsumer' and released through
-- 'bracket', so 'closeConsumer' is called whether the action returns
-- normally or throws.  If the consumer cannot be created, the creation
-- error is returned and the action is never run.
--
-- Note that 'bracket' discards the value produced by its release step, so
-- an error reported by 'closeConsumer' is not reflected in the result.
runConsumer
  :: ConsumerProperties
  -> Subscription
  -> (KafkaConsumer -> IO (Either KafkaError a))
  -> IO (Either KafkaError a)
runConsumer props subscription action =
  bracket acquire release use
  where
    acquire = newConsumer props subscription

    -- Nothing to close when creation failed; otherwise close the consumer.
    release = either (return . Left) (fmap maybeToLeft . closeConsumer)

    -- Propagate a creation failure, or hand the consumer to the caller.
    use = either (return . Left) action
-- | Create a new consumer and subscribe it to the topics of the given
-- 'Subscription'.
--
-- Steps, in order:
--
-- 1. build the native configuration from the 'ConsumerProperties';
-- 2. build a topic configuration from the subscription's topic properties
--    and install it as the default topic configuration;
-- 3. create the native consumer handle;
-- 4. apply the log level from the properties, if there is one;
-- 5. subscribe to the topics.
--
-- Returns 'Left' when the handle cannot be created or when subscribing
-- fails.  In the latter case the freshly created consumer is closed before
-- the subscription error is returned.
newConsumer
  :: MonadIO m
  => ConsumerProperties
  -> Subscription
  -> m (Either KafkaError KafkaConsumer)
newConsumer props (Subscription topics topicProps) = liftIO $ do
  KafkaConf conf <- newConsumerConf props
  topicConf (TopicProps (M.toList topicProps)) >>= setDefaultTopicConf conf
  created <- mapLeft KafkaError <$> newRdKafkaT RdKafkaConsumer conf
  case created of
    Left err -> return (Left err)
    Right rk -> subscribeOrClose (KafkaConsumer rk conf)
  where
    -- Configure logging, then subscribe; close the consumer again if the
    -- subscription is rejected (the result of that close is ignored).
    subscribeOrClose consumer = do
      forM_ (cpLogLevel props) (setConsumerLogLevel consumer)
      outcome <- subscribe consumer topics
      case outcome of
        Just err -> closeConsumer consumer >> return (Left err)
        Nothing -> return (Right consumer)
-- | Poll the consumer once and convert the outcome.
--
-- The wrapped 'Timeout' value is converted with 'fromIntegral' and passed
-- to the native poll call; the returned message pointer is then turned
-- into either a 'KafkaError' or a 'ConsumerRecord' by 'fromMessagePtr'.
pollMessage
  :: MonadIO m
  => KafkaConsumer
  -> Timeout
  -> m (Either KafkaError (ConsumerRecord (Maybe BS.ByteString) (Maybe BS.ByteString)))
pollMessage (KafkaConsumer rk _) (Timeout timeout) = liftIO $ do
  msgPtr <- rdKafkaConsumerPoll rk (fromIntegral timeout)
  fromMessagePtr msgPtr
-- | Commit the offset of a single consumed record.
--
-- The record is converted to a topic partition with
-- 'topicPartitionFromMessage', wrapped in a one-element native partition
-- list and committed using the given 'OffsetCommit' mode.
--
-- Returns 'Nothing' on success, or the error reported by the commit.
commitOffsetMessage
  :: MonadIO m
  => OffsetCommit
  -> KafkaConsumer
  -> ConsumerRecord k v
  -> m (Maybe KafkaError)
commitOffsetMessage mode consumer record = liftIO $ do
  partitions <- toNativeTopicPartitionList [topicPartitionFromMessage record]
  commitOffsets mode consumer partitions
-- | Commit offsets without naming any partition explicitly.
--
-- A null partition list is handed to the commit call.
-- NOTE(review): per librdkafka's @rd_kafka_commit@, a NULL list is
-- expected to mean "the current assignment" -- confirm against the
-- librdkafka version in use.
--
-- Returns 'Nothing' on success, or the error reported by the commit.
commitAllOffsets
  :: MonadIO m
  => OffsetCommit
  -> KafkaConsumer
  -> m (Maybe KafkaError)
commitAllOffsets mode consumer = liftIO $ do
  -- 'newForeignPtr_' attaches no finalizer, which is what we want for null.
  noPartitions <- newForeignPtr_ nullPtr
  commitOffsets mode consumer noPartitions
-- | Assign the given partitions to the consumer.
--
-- An empty list is passed to the native call as a null pointer; a
-- non-empty list is converted with 'toNativeTopicPartitionList'.
-- NOTE(review): a NULL list presumably clears the current assignment
-- (see librdkafka's @rd_kafka_assign@) -- confirm.
--
-- Unlike the other operations in this module, the response code is always
-- returned wrapped in 'KafkaResponseError' rather than as a 'Maybe', so the
-- caller has to inspect the code to tell success from failure.
assign :: MonadIO m => KafkaConsumer -> [TopicPartition] -> m KafkaError
assign (KafkaConsumer rk _) partitions = liftIO $ do
  nativeList <- toNative partitions
  KafkaResponseError <$> rdKafkaAssign rk nativeList
  where
    toNative [] = newForeignPtr_ nullPtr
    toNative ps = toNativeTopicPartitionList ps
-- | Close the consumer.
--
-- The response code of the native close call is wrapped in
-- 'KafkaResponseError' and passed through 'kafkaErrorToMaybe', so the
-- result is 'Nothing' when that helper does not consider the code an
-- error.
closeConsumer :: MonadIO m => KafkaConsumer -> m (Maybe KafkaError)
closeConsumer (KafkaConsumer rk _) = liftIO $ do
  code <- rdKafkaConsumerClose rk
  return (kafkaErrorToMaybe (KafkaResponseError code))
-- Build the native configuration for a consumer: create it from the
-- key/value properties, then register the rebalance and offset-commit
-- callbacks when the 'ConsumerProperties' carry them.  The fourth field of
-- 'ConsumerProperties' is not used here.
newConsumerConf :: ConsumerProperties -> IO KafkaConf
newConsumerConf (ConsumerProperties kafkaProps rebalanceCb commitCb _) = do
  conf <- kafkaConf (KafkaProps (M.toList kafkaProps))
  forM_ rebalanceCb (installRebalance conf)
  forM_ commitCb (installCommit conf)
  return conf
  where
    installRebalance conf (ReballanceCallback cb) = setRebalanceCallback conf cb
    installCommit conf (OffsetsCommitCallback cb) = setOffsetCommitCallback conf cb
-- Register a rebalance callback on the configuration.
--
-- The user callback works with high-level values, so the raw arguments of
-- the native callback are adapted first: the raw handle is wrapped together
-- with the configuration into a 'KafkaConsumer', the response code becomes
-- a 'KafkaResponseError' and the native partition list is converted to
-- @['TopicPartition']@.  The trailing opaque pointer is ignored.
setRebalanceCallback
  :: KafkaConf
  -> (KafkaConsumer -> KafkaError -> [TopicPartition] -> IO ())
  -> IO ()
setRebalanceCallback (KafkaConf confPtr) userCb =
  rdKafkaConfSetRebalanceCb confPtr nativeCb
  where
    nativeCb
      :: Ptr RdKafkaT
      -> RdKafkaRespErrT
      -> Ptr RdKafkaTopicPartitionListT
      -> Ptr Word8
      -> IO ()
    nativeCb rawHandle code rawList _ = do
      -- 'newForeignPtr_' attaches no finalizer to the wrapped handle.
      consumer <- flip KafkaConsumer confPtr <$> newForeignPtr_ rawHandle
      partitions <- fromNativeTopicPartitionList' rawList
      userCb consumer (KafkaResponseError code) partitions
-- Register an offset-commit callback on the configuration.
--
-- The native callback arguments are translated before the user callback is
-- invoked: raw handle plus configuration become a 'KafkaConsumer', the
-- response code is wrapped in 'KafkaResponseError', and the native
-- partition list is converted to @['TopicPartition']@.  The trailing opaque
-- pointer is ignored.
setOffsetCommitCallback
  :: KafkaConf
  -> (KafkaConsumer -> KafkaError -> [TopicPartition] -> IO ())
  -> IO ()
setOffsetCommitCallback (KafkaConf confPtr) userCb =
  rdKafkaConfSetOffsetCommitCb confPtr adapt
  where
    adapt
      :: Ptr RdKafkaT
      -> RdKafkaRespErrT
      -> Ptr RdKafkaTopicPartitionListT
      -> Ptr Word8
      -> IO ()
    adapt rawHandle code rawList _ = do
      -- No finalizer is attached to the handle by 'newForeignPtr_'.
      handle' <- newForeignPtr_ rawHandle
      partitions <- fromNativeTopicPartitionList' rawList
      let consumer = KafkaConsumer handle' confPtr
          failure = KafkaResponseError code
      userCb consumer failure partitions
-- Subscribe the consumer to a list of topics.
--
-- A native partition list sized to the number of topics is allocated, each
-- topic name is added to it with partition @-1@, and the list is passed to
-- the native subscribe call.  Returns 'Nothing' on success or the error
-- reported by the subscription.
subscribe :: KafkaConsumer -> [TopicName] -> IO (Maybe KafkaError)
subscribe (KafkaConsumer rk _) topics = do
  list <- newRdKafkaTopicPartitionListT (length topics)
  forM_ topics $ \(TopicName name) ->
    rdKafkaTopicPartitionListAdd list name (-1)
  kafkaErrorToMaybe . KafkaResponseError <$> rdKafkaSubscribe rk list
-- Install a topic configuration as the default for the given configuration.
-- The topic configuration is duplicated first and the duplicate is what
-- gets installed, so the value passed in is not itself handed over.
setDefaultTopicConf :: RdKafkaConfTPtr -> TopicConf -> IO ()
setDefaultTopicConf conf (TopicConf topicCfg) = do
  copy <- rdKafkaTopicConfDup topicCfg
  rdKafkaConfSetDefaultTopicConf conf copy
-- Commit the offsets described by a native partition list.
--
-- The 'OffsetCommit' mode is converted to the boolean flag expected by the
-- native commit call via 'offsetCommitToBool'.  The response code is
-- wrapped in 'KafkaResponseError' and reduced with 'kafkaErrorToMaybe'.
commitOffsets
  :: OffsetCommit
  -> KafkaConsumer
  -> RdKafkaTopicPartitionListTPtr
  -> IO (Maybe KafkaError)
commitOffsets mode (KafkaConsumer rk _) partitions = do
  code <- rdKafkaCommit rk partitions (offsetCommitToBool mode)
  return (kafkaErrorToMaybe (KafkaResponseError code))
-- Set the log level on the consumer's native handle.  The level is passed
-- to the native call as its 'fromEnum' value.
setConsumerLogLevel :: KafkaConsumer -> KafkaLogLevel -> IO ()
setConsumerLogLevel (KafkaConsumer rk _) =
  rdKafkaSetLogLevel rk . fromEnum