{-# LANGUAGE TupleSections #-}
module Kafka.Consumer
( module X
, runConsumer
, newConsumer
, assignment, subscription
, pausePartitions, resumePartitions
, committed, position, seek
, pollMessage, pollConsumerEvents
, pollMessageBatch
, commitOffsetMessage, commitAllOffsets, commitPartitionsOffsets
, storeOffsets, storeOffsetMessage
, closeConsumer
, KafkaConsumer
, RdKafkaRespErrT (..)
)
where
import Control.Arrow
import Control.Concurrent (forkIO, rtsSupportsBoundThreads)
import Control.Exception
import Control.Monad (forM_, void, when)
import Control.Monad.IO.Class
import Control.Monad.Trans.Except
import Data.Bifunctor
import qualified Data.ByteString as BS
import Data.IORef
import qualified Data.Map as M
import Data.Maybe (fromMaybe)
import Data.Monoid ((<>))
import Foreign hiding (void)
import Kafka.Consumer.Convert
import Kafka.Consumer.Types
import Kafka.Internal.CancellationToken as CToken
import Kafka.Internal.RdKafka
import Kafka.Internal.Setup
import Kafka.Internal.Shared
import Kafka.Consumer.ConsumerProperties as X
import Kafka.Consumer.Subscription as X
import Kafka.Consumer.Types as X hiding (KafkaConsumer)
import Kafka.Types as X
{-# DEPRECATED runConsumer "Use newConsumer/closeConsumer instead" #-}
runConsumer :: ConsumerProperties
-> Subscription
-> (KafkaConsumer -> IO (Either KafkaError a))
-> IO (Either KafkaError a)
runConsumer cp sub f =
bracket mkConsumer clConsumer runHandler
where
mkConsumer = newConsumer cp sub
clConsumer (Left err) = return (Left err)
clConsumer (Right kc) = maybeToLeft <$> closeConsumer kc
runHandler (Left err) = return (Left err)
runHandler (Right kc) = f kc
newConsumer :: MonadIO m
=> ConsumerProperties
-> Subscription
-> m (Either KafkaError KafkaConsumer)
newConsumer props (Subscription ts tp) = liftIO $ do
let cp = setCallback (rebalanceCallback (\_ _ -> return ())) <> props
kc@(KafkaConf kc' qref ct) <- newConsumerConf cp
tp' <- topicConf (TopicProps $ M.toList tp)
_ <- setDefaultTopicConf kc tp'
rdk <- newRdKafkaT RdKafkaConsumer kc'
case rdk of
Left err -> return . Left $ KafkaError err
Right rdk' -> do
msgq <- rdKafkaQueueNew rdk'
writeIORef qref (Just msgq)
let kafka = KafkaConsumer (Kafka rdk') kc
redErr <- redirectCallbacksPoll kafka
case redErr of
Just err -> closeConsumer kafka >> return (Left err)
Nothing -> do
forM_ (cpLogLevel cp) (setConsumerLogLevel kafka)
sub <- subscribe kafka ts
case sub of
Nothing -> runConsumerLoop kafka ct (Just $ Timeout 100) >> return (Right kafka)
Just err -> closeConsumer kafka >> return (Left err)
pollMessage :: MonadIO m
=> KafkaConsumer
-> Timeout
-> m (Either KafkaError (ConsumerRecord (Maybe BS.ByteString) (Maybe BS.ByteString)))
pollMessage c@(KafkaConsumer _ (KafkaConf _ qr _)) (Timeout ms) = liftIO $ do
pollConsumerEvents c Nothing
mbq <- readIORef qr
case mbq of
Nothing -> return . Left $ KafkaBadSpecification "Messages queue is not configured, internal error, fatal."
Just q -> rdKafkaConsumeQueue q (fromIntegral ms) >>= fromMessagePtr
pollMessageBatch :: MonadIO m
=> KafkaConsumer
-> Timeout
-> BatchSize
-> m [Either KafkaError (ConsumerRecord (Maybe BS.ByteString) (Maybe BS.ByteString))]
pollMessageBatch c@(KafkaConsumer _ (KafkaConf _ qr _)) (Timeout ms) (BatchSize b) = liftIO $ do
pollConsumerEvents c Nothing
mbq <- readIORef qr
case mbq of
Nothing -> return [Left $ KafkaBadSpecification "Messages queue is not configured, internal error, fatal."]
Just q -> rdKafkaConsumeBatchQueue q ms b >>= traverse fromMessagePtr
commitOffsetMessage :: MonadIO m
=> OffsetCommit
-> KafkaConsumer
-> ConsumerRecord k v
-> m (Maybe KafkaError)
commitOffsetMessage o k m =
liftIO $ toNativeTopicPartitionList [topicPartitionFromMessageForCommit m] >>= commitOffsets o k
storeOffsetMessage :: MonadIO m
=> KafkaConsumer
-> ConsumerRecord k v
-> m (Maybe KafkaError)
storeOffsetMessage k m =
liftIO $ toNativeTopicPartitionListNoDispose [topicPartitionFromMessageForCommit m] >>= commitOffsetsStore k
storeOffsets :: MonadIO m
=> KafkaConsumer
-> [TopicPartition]
-> m (Maybe KafkaError)
storeOffsets k ps =
liftIO $ toNativeTopicPartitionListNoDispose ps >>= commitOffsetsStore k
commitAllOffsets :: MonadIO m
=> OffsetCommit
-> KafkaConsumer
-> m (Maybe KafkaError)
commitAllOffsets o k =
liftIO $ newForeignPtr_ nullPtr >>= commitOffsets o k
commitPartitionsOffsets :: MonadIO m
=> OffsetCommit
-> KafkaConsumer
-> [TopicPartition]
-> m (Maybe KafkaError)
commitPartitionsOffsets o k ps =
liftIO $ toNativeTopicPartitionList ps >>= commitOffsets o k
assignment :: MonadIO m => KafkaConsumer -> m (Either KafkaError (M.Map TopicName [PartitionId]))
assignment (KafkaConsumer (Kafka k) _) = liftIO $ do
tpl <- rdKafkaAssignment k
tps <- traverse fromNativeTopicPartitionList'' (left KafkaResponseError tpl)
return $ tpMap <$> tps
where
tpMap ts = toMap $ (tpTopicName &&& tpPartition) <$> ts
subscription :: MonadIO m => KafkaConsumer -> m (Either KafkaError [(TopicName, SubscribedPartitions)])
subscription (KafkaConsumer (Kafka k) _) = liftIO $ do
tpl <- rdKafkaSubscription k
tps <- traverse fromNativeTopicPartitionList'' (left KafkaResponseError tpl)
return $ toSub <$> tps
where
toSub ts = M.toList $ subParts <$> tpMap ts
tpMap ts = toMap $ (tpTopicName &&& tpPartition) <$> ts
subParts [PartitionId (-1)] = SubscribedPartitionsAll
subParts ps = SubscribedPartitions ps
pausePartitions :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m KafkaError
pausePartitions (KafkaConsumer (Kafka k) _) ps = liftIO $ do
pl <- newRdKafkaTopicPartitionListT (length ps)
mapM_ (\(TopicName topicName, PartitionId partitionId) -> rdKafkaTopicPartitionListAdd pl topicName partitionId) ps
KafkaResponseError <$> rdKafkaPausePartitions k pl
resumePartitions :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m KafkaError
resumePartitions (KafkaConsumer (Kafka k) _) ps = liftIO $ do
pl <- newRdKafkaTopicPartitionListT (length ps)
mapM_ (\(TopicName topicName, PartitionId partitionId) -> rdKafkaTopicPartitionListAdd pl topicName partitionId) ps
KafkaResponseError <$> rdKafkaResumePartitions k pl
seek :: MonadIO m => KafkaConsumer -> Timeout -> [TopicPartition] -> m (Maybe KafkaError)
seek (KafkaConsumer (Kafka k) _) (Timeout timeout) tps = liftIO $
either Just (const Nothing) <$> seekAll
where
seekAll = runExceptT $ do
tr <- traverse (ExceptT . topicPair) tps
void $ traverse (\(kt, p, o) -> ExceptT (rdSeek kt p o)) tr
return ()
rdSeek kt (PartitionId p) o =
rdKafkaErrorToEither <$> rdKafkaSeek kt (fromIntegral p) (offsetToInt64 o) timeout
topicPair tp = do
let (TopicName tn) = tpTopicName tp
nt <- newRdKafkaTopicT k tn Nothing
return $ bimap KafkaError (,tpPartition tp, tpOffset tp) nt
committed :: MonadIO m => KafkaConsumer -> Timeout -> [(TopicName, PartitionId)] -> m (Either KafkaError [TopicPartition])
committed (KafkaConsumer (Kafka k) _) (Timeout timeout) tps = liftIO $ do
ntps <- toNativeTopicPartitionList' tps
res <- rdKafkaCommitted k ntps timeout
case res of
RdKafkaRespErrNoError -> Right <$> fromNativeTopicPartitionList'' ntps
err -> return $ Left (KafkaResponseError err)
position :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m (Either KafkaError [TopicPartition])
position (KafkaConsumer (Kafka k) _) tps = liftIO $ do
ntps <- toNativeTopicPartitionList' tps
res <- rdKafkaPosition k ntps
case res of
RdKafkaRespErrNoError -> Right <$> fromNativeTopicPartitionList'' ntps
err -> return $ Left (KafkaResponseError err)
pollConsumerEvents :: KafkaConsumer -> Maybe Timeout -> IO ()
pollConsumerEvents k timeout =
let (Timeout tm) = fromMaybe (Timeout 0) timeout
in void $ rdKafkaConsumerPoll (getRdKafka k) tm
closeConsumer :: MonadIO m => KafkaConsumer -> m (Maybe KafkaError)
closeConsumer (KafkaConsumer (Kafka k) (KafkaConf _ qr ct)) = liftIO $ do
CToken.cancel ct
mbq <- readIORef qr
void $ traverse rdKafkaQueueDestroy mbq
kafkaErrorToMaybe . KafkaResponseError <$> rdKafkaConsumerClose k
newConsumerConf :: ConsumerProperties -> IO KafkaConf
newConsumerConf ConsumerProperties {cpProps = m, cpCallbacks = cbs} = do
conf <- kafkaConf (KafkaProps $ M.toList m)
forM_ cbs (\setCb -> setCb conf)
return conf
subscribe :: KafkaConsumer -> [TopicName] -> IO (Maybe KafkaError)
subscribe (KafkaConsumer (Kafka k) _) ts = do
pl <- newRdKafkaTopicPartitionListT (length ts)
mapM_ (\(TopicName t) -> rdKafkaTopicPartitionListAdd pl t (-1)) ts
res <- KafkaResponseError <$> rdKafkaSubscribe k pl
return $ kafkaErrorToMaybe res
setDefaultTopicConf :: KafkaConf -> TopicConf -> IO ()
setDefaultTopicConf (KafkaConf kc _ _) (TopicConf tc) =
rdKafkaTopicConfDup tc >>= rdKafkaConfSetDefaultTopicConf kc
commitOffsets :: OffsetCommit -> KafkaConsumer -> RdKafkaTopicPartitionListTPtr -> IO (Maybe KafkaError)
commitOffsets o (KafkaConsumer (Kafka k) _) pl =
kafkaErrorToMaybe . KafkaResponseError <$> rdKafkaCommit k pl (offsetCommitToBool o)
commitOffsetsStore :: KafkaConsumer -> RdKafkaTopicPartitionListTPtr -> IO (Maybe KafkaError)
commitOffsetsStore (KafkaConsumer (Kafka k) _) pl =
kafkaErrorToMaybe . KafkaResponseError <$> rdKafkaOffsetsStore k pl
setConsumerLogLevel :: KafkaConsumer -> KafkaLogLevel -> IO ()
setConsumerLogLevel (KafkaConsumer (Kafka k) _) level =
liftIO $ rdKafkaSetLogLevel k (fromEnum level)
redirectCallbacksPoll :: KafkaConsumer -> IO (Maybe KafkaError)
redirectCallbacksPoll (KafkaConsumer (Kafka k) _) =
kafkaErrorToMaybe . KafkaResponseError <$> rdKafkaPollSetConsumer k
runConsumerLoop :: KafkaConsumer -> CancellationToken -> Maybe Timeout -> IO ()
runConsumerLoop k ct timeout =
when rtsSupportsBoundThreads $ void $ forkIO go
where
go = do
token <- CToken.status ct
case token of
Running -> pollConsumerEvents k timeout >> go
Cancelled -> return ()