{-# LANGUAGE ApplicativeDo #-}

module Freckle.App.Kafka.Consumer
  ( HasKafkaConsumer (..)
  , withKafkaConsumer
  , KafkaConsumerConfig (..)
  , envKafkaConsumerConfig
  , runConsumer
  ) where

import Freckle.App.Prelude

import Blammo.Logging
import Control.Lens (Lens', view)
import Control.Monad (forever)
import Data.Aeson
import Data.ByteString (ByteString)
import qualified Data.List.NonEmpty as NE
import qualified Data.Map.Strict as Map
import qualified Data.Text as T
import qualified Env
import Freckle.App.Async
import Freckle.App.Env
import Freckle.App.Exception (annotatedExceptionMessageFrom)
import Freckle.App.Kafka.Producer (envKafkaBrokerAddresses)
import Freckle.App.OpenTelemetry
import Kafka.Consumer hiding
  ( Timeout
  , closeConsumer
  , newConsumer
  , runConsumer
  , subscription
  )
import qualified Kafka.Consumer as Kafka
import UnliftIO.Exception (bracket)

-- | Settings used to build a Kafka consumer and its topic subscription.
--
-- See 'envKafkaConsumerConfig' for loading these from the environment.
data KafkaConsumerConfig = KafkaConsumerConfig
  { kafkaConsumerConfigBrokerAddresses :: NonEmpty BrokerAddress
  -- ^ The list of host/port pairs for establishing the initial connection
  -- to the Kafka cluster.
  --
  -- This is the `bootstrap.servers` Kafka consumer configuration property.
  , kafkaConsumerConfigGroupId :: ConsumerGroupId
  -- ^ The consumer group id to which the consumer belongs.
  --
  -- This is the `group.id` Kafka consumer configuration property.
  , kafkaConsumerConfigTopic :: TopicName
  -- ^ The topic name polled for messages by the Kafka consumer.
  , kafkaConsumerConfigOffsetReset :: OffsetReset
  -- ^ The offset reset parameter used when there is no initial offset in Kafka.
  --
  -- This is the `auto.offset.reset` Kafka consumer configuration property.
  , kafkaConsumerConfigAutoCommitInterval :: Millis
  -- ^ The interval that offsets are auto-committed to Kafka.
  --
  -- This sets the `auto.commit.interval.ms` and `enable.auto.commit` Kafka
  -- consumer configuration properties.
  , kafkaConsumerConfigExtraSubscriptionProps :: Map Text Text
  -- ^ Extra properties used to configure the Kafka consumer.
  }
  deriving stock (Show)

-- | Parse the topic to consume from the @KAFKA_TOPIC@ environment variable.
--
-- No default is supplied, and the value is validated by 'readKafkaTopic',
-- which rejects the empty string.
envKafkaTopic
  :: Env.Parser Env.Error TopicName
envKafkaTopic =
  Env.var
    (eitherReader readKafkaTopic)
    "KAFKA_TOPIC"
    mempty

-- | Convert a raw string into a 'TopicName'.
--
-- Returns 'Left' with an explanatory message when the input is empty; any
-- other input is wrapped as-is.
readKafkaTopic :: String -> Either String TopicName
readKafkaTopic t = case T.pack t of
  "" -> Left "Kafka topics cannot be empty"
  x -> Right $ TopicName x

-- | Parse the offset-reset policy from the @KAFKA_OFFSET_RESET@ environment
-- variable.
--
-- Defaults to 'Earliest' when the variable is not set. Accepted values are
-- those recognised by 'readKafkaOffsetReset'.
envKafkaOffsetReset
  :: Env.Parser Env.Error OffsetReset
envKafkaOffsetReset =
  Env.var
    (eitherReader readKafkaOffsetReset)
    "KAFKA_OFFSET_RESET"
    $ Env.def Earliest

-- | Convert a raw string into an 'OffsetReset'.
--
-- Only the exact (case-sensitive) strings @earliest@ and @latest@ are
-- accepted; anything else yields 'Left' with an explanatory message.
readKafkaOffsetReset :: String -> Either String OffsetReset
readKafkaOffsetReset t = case T.pack t of
  "earliest" -> Right Earliest
  "latest" -> Right Latest
  _ -> Left "Kafka offset reset must be one of earliest or latest"

-- | Parse a complete 'KafkaConsumerConfig' from the environment.
--
-- Variables read directly by this parser:
--
-- * @KAFKA_CONSUMER_GROUP_ID@: must be non-empty, no default
-- * @KAFKA_TOPIC@: see 'envKafkaTopic'
-- * @KAFKA_OFFSET_RESET@: see 'envKafkaOffsetReset'
-- * @KAFKA_AUTO_COMMIT_INTERVAL@: a timeout value, default 5000 milliseconds
-- * @KAFKA_EXTRA_SUBSCRIPTION_PROPS@: key/value pairs, default empty
--
-- Broker addresses come from 'envKafkaBrokerAddresses'.
--
-- NOTE: this block relies on @ApplicativeDo@. Every statement must stay
-- independent of the others' results, and bind a plain variable, so that it
-- desugars to applicative combinators.
envKafkaConsumerConfig
  :: Env.Parser Env.Error KafkaConsumerConfig
envKafkaConsumerConfig = do
  brokerAddresses <- envKafkaBrokerAddresses
  consumerGroupId <- Env.var Env.nonempty "KAFKA_CONSUMER_GROUP_ID" mempty
  kafkaTopic <- envKafkaTopic
  kafkaOffsetReset <- envKafkaOffsetReset
  -- The variable is parsed as a 'Timeout' (seconds or milliseconds) and then
  -- normalised to a millisecond count.
  autoCommitInterval <-
    fromIntegral . timeoutMs
      <$> Env.var
        timeout
        "KAFKA_AUTO_COMMIT_INTERVAL"
        (Env.def $ TimeoutMilliseconds 5000)
  kafkaExtraProps <-
    Env.var
      (fmap Map.fromList . keyValues)
      "KAFKA_EXTRA_SUBSCRIPTION_PROPS"
      (Env.def mempty)
  pure $
    KafkaConsumerConfig
      brokerAddresses
      consumerGroupId
      kafkaTopic
      kafkaOffsetReset
      autoCommitInterval
      kafkaExtraProps

-- | Environments that carry a 'KafkaConsumer'.
--
-- 'runConsumer' reads the consumer out of its reader environment through
-- this lens.
class HasKafkaConsumer env where
  -- | Lens onto the 'KafkaConsumer' held by the environment.
  kafkaConsumerL :: Lens' env KafkaConsumer

-- | Build the 'ConsumerProperties' for a consumer from its configuration.
--
-- Combines the broker list, the consumer group id and the auto-commit
-- interval, and fixes the client log level at 'KafkaLogInfo'.
consumerProps :: KafkaConsumerConfig -> ConsumerProperties
consumerProps KafkaConsumerConfig {..} =
  brokersList brokers
    <> groupId kafkaConsumerConfigGroupId
    <> autoCommit kafkaConsumerConfigAutoCommitInterval
    <> logLevel KafkaLogInfo
 where
  brokers = NE.toList kafkaConsumerConfigBrokerAddresses

-- | Build the 'Subscription' for a consumer from its configuration.
--
-- Subscribes to the single configured topic with the configured offset-reset
-- policy, plus any extra subscription properties.
subscription :: KafkaConsumerConfig -> Subscription
subscription KafkaConsumerConfig {..} =
  topics [kafkaConsumerConfigTopic]
    <> offsetReset kafkaConsumerConfigOffsetReset
    <> extraSubscriptionProps kafkaConsumerConfigExtraSubscriptionProps

-- | Run an action with a 'KafkaConsumer' built from the given configuration.
--
-- The consumer is created before the action runs and closed afterwards via
-- 'bracket', so it is also closed when the action throws. A 'KafkaError'
-- reported while creating or closing the consumer is thrown as an exception.
withKafkaConsumer
  :: (MonadUnliftIO m, HasCallStack)
  => KafkaConsumerConfig
  -> (KafkaConsumer -> m a)
  -> m a
withKafkaConsumer config = bracket newConsumer closeConsumer
 where
  -- Creation reports failure as 'Left'; surface it as an exception.
  newConsumer =
    either throwM pure
      =<< Kafka.newConsumer (consumerProps config) (subscription config)

  -- Closing reports failure as 'Just'; surface it as an exception.
  closeConsumer = maybe (pure ()) throwM <=< Kafka.closeConsumer

-- | Express a 'Timeout' as a number of milliseconds.
--
-- Seconds are multiplied by 1000; milliseconds are returned unchanged.
timeoutMs :: Timeout -> Int
timeoutMs = \case
  TimeoutSeconds s -> s * 1000
  TimeoutMilliseconds ms -> ms

-- | Thrown by 'runConsumer' when a message value cannot be decoded as JSON.
data KafkaMessageDecodeError = KafkaMessageDecodeError
  { input :: ByteString
  -- ^ The raw message value that failed to decode.
  , errors :: String
  -- ^ The error message produced by the JSON decoder.
  }
  deriving stock (Show)

-- | Renders the failure as a multi-line message showing both the offending
-- input (decoded as UTF-8 text) and the decoder's error.
instance Exception KafkaMessageDecodeError where
  displayException KafkaMessageDecodeError {..} =
    mconcat
      [ "Unable to decode JSON"
      , "\n  input:  " <> unpack (decodeUtf8 input)
      , "\n  errors: " <> errors
      ]

-- | Poll Kafka forever, decoding each message value as JSON and passing it to
-- the given handler.
--
-- On every iteration the 'KafkaConsumer' is read from the environment and
-- polled once with the given timeout, inside a @kafka.consumer@ span:
--
-- * A poll timeout yields no record (see 'fromKafkaError') and the loop
--   simply continues.
-- * Records without a value are skipped.
-- * A value is decoded with 'eitherDecodeStrict' inside a
--   @kafka.consumer.message.decode@ span; a failure is thrown as
--   'KafkaMessageDecodeError'.
-- * The decoded value is passed to the handler inside a
--   @kafka.consumer.message.handle@ span.
--
-- 'KafkaError' and 'KafkaMessageDecodeError' exceptions raised within an
-- iteration are caught and logged as errors under the @kafka@ namespace, and
-- polling then continues. Other exceptions (for example from the handler) are
-- not caught here; they propagate to 'immortalCreateLogged'.
runConsumer
  :: ( MonadMask m
     , MonadUnliftIO m
     , MonadReader env m
     , MonadLogger m
     , MonadTracer m
     , HasKafkaConsumer env
     , FromJSON a
     , HasCallStack
     )
  => Timeout
  -> (a -> m ())
  -> m ()
runConsumer pollTimeout onMessage =
  withTraceIdContext $ immortalCreateLogged $ forever $ do
    consumer <- view kafkaConsumerL

    flip catches handlers $ inSpan "kafka.consumer" consumerSpanArguments $ do
      mRecord <- fromKafkaError =<< pollMessage consumer kTimeout

      -- 'crValue' is itself optional, so this also skips value-less records.
      for_ (crValue =<< mRecord) $ \bs -> do
        a <-
          inSpan "kafka.consumer.message.decode" defaultSpanArguments $
            either (throwM . KafkaMessageDecodeError bs) pure $
              eitherDecodeStrict bs
        inSpan "kafka.consumer.message.handle" defaultSpanArguments $ onMessage a
 where
  -- The Kafka client takes its poll timeout as a millisecond count.
  kTimeout = Kafka.Timeout $ timeoutMs pollTimeout

  handlers =
    [ ExceptionHandler $
        logErrorNS "kafka"
          . annotatedExceptionMessageFrom @KafkaError
            (const "Error polling for message from Kafka")
    , ExceptionHandler $
        logErrorNS "kafka"
          . annotatedExceptionMessageFrom @KafkaMessageDecodeError
            (const "Could not decode message value")
    ]

-- | Interpret the result of a Kafka poll.
--
-- * 'Right' values are returned as 'Just'.
-- * A timed-out response ('RdKafkaRespErrTimedOut') is logged at debug level
--   and returned as 'Nothing'.
-- * Any other 'KafkaError' is thrown as an exception.
fromKafkaError
  :: (MonadIO m, MonadLogger m, HasCallStack)
  => Either KafkaError a
  -> m (Maybe a)
fromKafkaError =
  either
    ( \case
        KafkaResponseError RdKafkaRespErrTimedOut ->
          Nothing <$ logDebug "Polling timeout"
        err -> throwM err
    )
    $ pure . Just