module Kafka.Conduit.Sink
( module X
, kafkaSink, kafkaSinkAutoClose, kafkaSinkNoClose, kafkaBatchSinkNoClose
, commitOffsetsSink, flushThenCommitSink
) where
import Control.Monad (void)
import Control.Monad.IO.Class
import Control.Monad.Trans.Resource
import Data.Conduit
import qualified Data.Conduit.List as L
import Kafka.Consumer
import Kafka.Conduit.Combinators as X
import Kafka.Consumer as X (KafkaConsumer)
import Kafka.Producer as X
-- | Build a sink that sends every incoming 'ProducerRecord' through the
-- supplied 'KafkaProducer'.
--
-- The producer is handed to 'bracketP' with 'closeProducer' as the release
-- action, so closing it is tied to the lifetime of this sink.
--
-- The sink's result is 'Nothing' when upstream is exhausted without any
-- failure, or @'Just' err@ carrying the first error returned by
-- 'produceMessage'; no further records are awaited after an error.
kafkaSinkAutoClose :: MonadResource m
                   => KafkaProducer
                   -> Sink ProducerRecord m (Maybe KafkaError)
kafkaSinkAutoClose prod =
  bracketP (return prod) (void . closeProducer) loop
  where
    -- Pull the next record, finishing with 'Nothing' at end of input.
    loop producer = await >>= maybe (return Nothing) (send producer)

    -- Produce a single record: keep looping on success, stop on error.
    send producer record =
      produceMessage producer record >>= maybe (loop producer) (return . Just)
-- | Build a sink that sends every incoming 'ProducerRecord' through the
-- supplied 'KafkaProducer'. This function never closes the producer itself.
--
-- The sink's result is 'Nothing' when upstream is exhausted without any
-- failure, or @'Just' err@ carrying the first error returned by
-- 'produceMessage'; no further records are awaited after an error.
kafkaSinkNoClose :: MonadIO m
                 => KafkaProducer
                 -> Sink ProducerRecord m (Maybe KafkaError)
kafkaSinkNoClose prod = drain
  where
    -- Pull the next record, finishing with 'Nothing' at end of input.
    drain = await >>= maybe (return Nothing) publish

    -- Produce a single record: keep draining on success, stop on error.
    publish record = do
      outcome <- produceMessage prod record
      maybe drain (return . Just) outcome
-- | Build a sink that sends each incoming batch of 'ProducerRecord's through
-- the supplied 'KafkaProducer' using 'produceMessageBatch'. This function
-- never closes the producer itself.
--
-- The sink's result is the empty list when upstream is exhausted and every
-- batch came back with an empty result. Otherwise it is the first non-empty
-- list of @(record, error)@ pairs returned by 'produceMessageBatch', and no
-- further batches are awaited.
kafkaBatchSinkNoClose :: MonadIO m
                      => KafkaProducer
                      -> Sink [ProducerRecord] m [(ProducerRecord, KafkaError)]
kafkaBatchSinkNoClose prod = loop
  where
    -- Pull the next batch, finishing with @[]@ at end of input.
    loop = await >>= maybe (return []) sendBatch

    -- Produce one batch: keep looping when nothing failed, stop otherwise.
    sendBatch batch = do
      failures <- produceMessageBatch prod batch
      if null failures
        then loop
        else return failures
-- | Build a sink that creates its own producer from the given
-- 'ProducerProperties' and sends every incoming 'ProducerRecord' through it.
--
-- Producer creation ('newProducer') and disposal ('closeProducer') are paired
-- via 'bracketP'. If creation fails there is nothing to close, and the sink
-- immediately returns that error without awaiting any input.
--
-- The sink's result is 'Nothing' when upstream is exhausted without any
-- failure, or @'Just' err@ carrying either the creation error or the first
-- error returned by 'produceMessage'.
kafkaSink :: MonadResource m
          => ProducerProperties
          -> Sink ProducerRecord m (Maybe KafkaError)
kafkaSink props =
  bracketP (newProducer props) release (either (return . Just) pump)
  where
    -- Only a successfully created producer needs closing.
    release = either (const (return ())) (void . closeProducer)

    -- Forward records until input ends or producing one of them fails.
    pump producer = do
      next <- await
      case next of
        Nothing     -> return Nothing
        Just record -> do
          outcome <- produceMessage producer record
          case outcome of
            Nothing  -> pump producer
            Just err -> return (Just err)
{-# DEPRECATED commitOffsetsSink "Conceptually wrong thing to do. Does not require library support. Consider calling 'commitAllOffsets' when appropriate." #-}
-- | Sink that commits offsets on the given 'KafkaConsumer' for every element
-- it receives, discarding any commit error.
--
-- This is @commitOffsetsSink'@ with an error handler that does nothing.
commitOffsetsSink :: MonadIO m => KafkaConsumer -> Sink i m ()
commitOffsetsSink consumer = commitOffsetsSink' consumer ignoreError
  where
    ignoreError _ = pure ()
{-# DEPRECATED commitOffsetsSink' "Conceptually wrong thing to do. Does not require library support. Consider calling 'commitAllOffsets' when appropriate." #-}
-- | Sink that, for every element received from upstream (the element itself
-- is ignored), calls 'commitAllOffsets' with 'OffsetCommit' on the given
-- consumer.
--
-- When the commit returns an error it is passed to @handleError@; a
-- successful commit does nothing further.
commitOffsetsSink' :: MonadIO m => KafkaConsumer -> (KafkaError -> m ()) -> Sink i m ()
commitOffsetsSink' consumer handleError =
  L.mapM_ $ \_ ->
    commitAllOffsets OffsetCommit consumer >>= maybe (pure ()) handleError
{-# DEPRECATED flushThenCommitSink "Conceptually wrong thing to do. Does not require library support. Consider calling 'flushProducer >>= commitAllOffsets' when appropriate." #-}
-- | Sink that, for every element it receives, flushes the given producer and
-- then commits offsets on the given consumer, discarding any commit error.
--
-- This is @flushThenCommitSink'@ with an error handler that does nothing.
flushThenCommitSink :: MonadIO m => KafkaConsumer -> KafkaProducer -> Sink i m ()
flushThenCommitSink consumer producer =
  flushThenCommitSink' consumer producer ignoreError
  where
    ignoreError _ = pure ()
{-# DEPRECATED flushThenCommitSink' "Conceptually wrong thing to do. Does not require library support. Consider calling 'flushProducer >>= commitAllOffsets' when appropriate." #-}
-- | Sink that, for every element received from upstream (the element itself
-- is ignored), first calls 'flushProducer' on the given producer and then
-- 'commitAllOffsets' with 'OffsetCommit' on the given consumer.
--
-- When the commit returns an error it is passed to @handleError@; a
-- successful commit does nothing further.
flushThenCommitSink' :: MonadIO m => KafkaConsumer -> KafkaProducer -> (KafkaError -> m ()) -> Sink i m ()
flushThenCommitSink' consumer producer handleError = L.mapM_ (const step)
  where
    -- One flush-then-commit round, run once per upstream element.
    step = do
      flushProducer producer
      commitAllOffsets OffsetCommit consumer >>= maybe (pure ()) handleError