{-# LANGUAGE TupleSections #-}
module Kafka.Consumer
( module X
, runConsumer
, newConsumer
, assign, assignment, subscription
, pausePartitions, resumePartitions
, committed, position, seek
, pollMessage
, commitOffsetMessage, commitAllOffsets, commitPartitionsOffsets
, closeConsumer
, KafkaConsumer
, RdKafkaRespErrT (..)
)
where
import Control.Arrow
import Control.Exception
import Control.Monad (forM_)
import Control.Monad.IO.Class
import Control.Monad.Trans.Except
import Data.Bifunctor
import qualified Data.ByteString as BS
import qualified Data.Map as M
import Foreign hiding (void)
import Kafka.Consumer.Convert
import Kafka.Consumer.Types
import Kafka.Internal.CancellationToken as CToken
import Kafka.Internal.RdKafka
import Kafka.Internal.Setup
import Kafka.Internal.Shared
import qualified Kafka.Consumer.Types as CIT
import Kafka.Consumer.ConsumerProperties as X
import Kafka.Consumer.Subscription as X
import Kafka.Consumer.Types as X hiding (KafkaConsumer)
import Kafka.Types as X
{-# DEPRECATED runConsumer "Use 'newConsumer'/'closeConsumer' instead" #-}
runConsumer :: ConsumerProperties
-> Subscription
-> (KafkaConsumer -> IO (Either KafkaError a))
-> IO (Either KafkaError a)
runConsumer cp sub f =
bracket mkConsumer clConsumer runHandler
where
mkConsumer = newConsumer cp sub
clConsumer (Left err) = return (Left err)
clConsumer (Right kc) = maybeToLeft <$> closeConsumer kc
runHandler (Left err) = return (Left err)
runHandler (Right kc) = f kc
newConsumer :: MonadIO m
=> ConsumerProperties
-> Subscription
-> m (Either KafkaError KafkaConsumer)
newConsumer cp (Subscription ts tp) = liftIO $ do
kc@(KafkaConf kc' ct) <- newConsumerConf cp
tp' <- topicConf (TopicProps $ M.toList tp)
_ <- setDefaultTopicConf kc tp'
rdk <- bimap KafkaError Kafka <$> newRdKafkaT RdKafkaConsumer kc'
case flip KafkaConsumer kc <$> rdk of
Left err -> return $ Left err
Right kafka -> do
forM_ (cpLogLevel cp) (setConsumerLogLevel kafka)
sub <- subscribe kafka ts
case sub of
Nothing -> runEventLoop kafka ct (Just $ Timeout 100) >> return (Right kafka)
Just err -> closeConsumer kafka >> return (Left err)
pollMessage :: MonadIO m
=> KafkaConsumer
-> Timeout
-> m (Either KafkaError (ConsumerRecord (Maybe BS.ByteString) (Maybe BS.ByteString)))
pollMessage c@(KafkaConsumer (Kafka k) _) (Timeout ms) =
liftIO $ pollEvents c Nothing >> rdKafkaConsumerPoll k (fromIntegral ms) >>= fromMessagePtr
commitOffsetMessage :: MonadIO m
=> OffsetCommit
-> KafkaConsumer
-> ConsumerRecord k v
-> m (Maybe KafkaError)
commitOffsetMessage o k m =
liftIO $ toNativeTopicPartitionList [topicPartitionFromMessage m] >>= commitOffsets o k
commitAllOffsets :: MonadIO m
=> OffsetCommit
-> KafkaConsumer
-> m (Maybe KafkaError)
commitAllOffsets o k =
liftIO $ newForeignPtr_ nullPtr >>= commitOffsets o k
commitPartitionsOffsets :: MonadIO m
=> OffsetCommit
-> KafkaConsumer
-> [TopicPartition]
-> m (Maybe KafkaError)
commitPartitionsOffsets o k ps =
liftIO $ toNativeTopicPartitionList ps >>= commitOffsets o k
assign :: MonadIO m => KafkaConsumer -> [TopicPartition] -> m KafkaError
assign (KafkaConsumer (Kafka k) _) ps =
let pl = if null ps
then newForeignPtr_ nullPtr
else toNativeTopicPartitionList ps
in liftIO $ KafkaResponseError <$> (pl >>= rdKafkaAssign k)
assignment :: MonadIO m => KafkaConsumer -> m (Either KafkaError (M.Map TopicName [PartitionId]))
assignment (KafkaConsumer (Kafka k) _) = liftIO $ do
tpl <- rdKafkaAssignment k
tps <- traverse fromNativeTopicPartitionList'' (left KafkaResponseError tpl)
return $ tpMap <$> tps
where
tpMap ts = toMap $ (tpTopicName &&& tpPartition) <$> ts
subscription :: MonadIO m => KafkaConsumer -> m (Either KafkaError [(TopicName, SubscribedPartitions)])
subscription (KafkaConsumer (Kafka k) _) = liftIO $ do
tpl <- rdKafkaSubscription k
tps <- traverse fromNativeTopicPartitionList'' (left KafkaResponseError tpl)
return $ toSub <$> tps
where
toSub ts = M.toList $ subParts <$> tpMap ts
tpMap ts = toMap $ (tpTopicName &&& tpPartition) <$> ts
subParts [PartitionId (-1)] = SubscribedPartitionsAll
subParts ps = SubscribedPartitions ps
pausePartitions :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m KafkaError
pausePartitions (KafkaConsumer (Kafka k) _) ps = liftIO $ do
pl <- newRdKafkaTopicPartitionListT (length ps)
mapM_ (\(TopicName topicName, PartitionId partitionId) -> rdKafkaTopicPartitionListAdd pl topicName partitionId) ps
KafkaResponseError <$> rdKafkaPausePartitions k pl
resumePartitions :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m KafkaError
resumePartitions (KafkaConsumer (Kafka k) _) ps = liftIO $ do
pl <- newRdKafkaTopicPartitionListT (length ps)
mapM_ (\(TopicName topicName, PartitionId partitionId) -> rdKafkaTopicPartitionListAdd pl topicName partitionId) ps
KafkaResponseError <$> rdKafkaResumePartitions k pl
seek :: MonadIO m => KafkaConsumer -> Timeout -> [TopicPartition] -> m (Maybe KafkaError)
seek (KafkaConsumer (Kafka k) _) (Timeout timeout) tps = liftIO $
either Just (const Nothing) <$> seekAll
where
seekAll = runExceptT $ do
tr <- traverse (ExceptT . topicPair) tps
_ <- traverse (\(kt, p, o) -> ExceptT (rdSeek kt p o)) tr
return ()
rdSeek kt (PartitionId p) o = do
res <- rdKafkaSeek kt (fromIntegral p) (offsetToInt64 o) timeout
return $ rdKafkaErrorToEither res
topicPair tp = do
let (TopicName tn) = tpTopicName tp
nt <- newRdKafkaTopicT k tn Nothing
return $ bimap KafkaError (,tpPartition tp, tpOffset tp) nt
committed :: MonadIO m => KafkaConsumer -> Timeout -> [(TopicName, PartitionId)] -> m (Either KafkaError [TopicPartition])
committed (KafkaConsumer (Kafka k) _) (Timeout timeout) tps = liftIO $ do
ntps <- toNativeTopicPartitionList' tps
res <- rdKafkaCommitted k ntps timeout
case res of
RdKafkaRespErrNoError -> Right <$> fromNativeTopicPartitionList'' ntps
err -> return $ Left (KafkaResponseError err)
position :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m (Either KafkaError [TopicPartition])
position (KafkaConsumer (Kafka k) _) tps = liftIO $ do
ntps <- toNativeTopicPartitionList' tps
res <- rdKafkaPosition k ntps
case res of
RdKafkaRespErrNoError -> Right <$> fromNativeTopicPartitionList'' ntps
err -> return $ Left (KafkaResponseError err)
closeConsumer :: MonadIO m => KafkaConsumer -> m (Maybe KafkaError)
closeConsumer (KafkaConsumer (Kafka k) (KafkaConf _ ct)) =
liftIO $ CToken.cancel ct >> (kafkaErrorToMaybe . KafkaResponseError) <$> rdKafkaConsumerClose k
newConsumerConf :: ConsumerProperties -> IO KafkaConf
newConsumerConf (ConsumerProperties m _ cbs) = do
conf <- kafkaConf (KafkaProps $ M.toList m)
forM_ cbs (\setCb -> setCb conf)
return conf
subscribe :: KafkaConsumer -> [TopicName] -> IO (Maybe KafkaError)
subscribe (KafkaConsumer (Kafka k) _) ts = do
pl <- newRdKafkaTopicPartitionListT (length ts)
mapM_ (\(TopicName t) -> rdKafkaTopicPartitionListAdd pl t (-1)) ts
res <- KafkaResponseError <$> rdKafkaSubscribe k pl
return $ kafkaErrorToMaybe res
setDefaultTopicConf :: KafkaConf -> TopicConf -> IO ()
setDefaultTopicConf (KafkaConf kc _) (TopicConf tc) =
rdKafkaTopicConfDup tc >>= rdKafkaConfSetDefaultTopicConf kc
commitOffsets :: OffsetCommit -> KafkaConsumer -> RdKafkaTopicPartitionListTPtr -> IO (Maybe KafkaError)
commitOffsets o (KafkaConsumer (Kafka k) _) pl =
(kafkaErrorToMaybe . KafkaResponseError) <$> rdKafkaCommit k pl (offsetCommitToBool o)
setConsumerLogLevel :: KafkaConsumer -> KafkaLogLevel -> IO ()
setConsumerLogLevel (KafkaConsumer (Kafka k) _) level =
liftIO $ rdKafkaSetLogLevel k (fromEnum level)