{-# LANGUAGE ApplicativeDo #-}

module Freckle.App.Kafka.Consumer
  ( HasKafkaConsumer (..)
  , withKafkaConsumer
  , KafkaConsumerConfig (..)
  , envKafkaConsumerConfig
  , runConsumer
  ) where

import Freckle.App.Prelude

import Blammo.Logging
import Control.Lens (Lens', view)
import Data.Aeson
import Data.ByteString (ByteString)
import qualified Data.List.NonEmpty as NE
import qualified Data.Map.Strict as Map
import qualified Data.Text as T
import qualified Env
import Freckle.App.Async
import Freckle.App.Env
import Freckle.App.Exception (annotatedExceptionMessageFrom)
import Freckle.App.Kafka.Producer (envKafkaBrokerAddresses)
import Freckle.App.OpenTelemetry
import Kafka.Consumer hiding
  ( Timeout
  , closeConsumer
  , newConsumer
  , runConsumer
  , subscription
  )
import qualified Kafka.Consumer as Kafka
import UnliftIO.Exception (bracket)

-- | Settings used to build a Kafka consumer and its topic subscription.
--
-- See 'envKafkaConsumerConfig' for the environment variables that populate it.
data KafkaConsumerConfig = KafkaConsumerConfig
  { kafkaConsumerConfigBrokerAddresses :: NonEmpty BrokerAddress
  -- ^ The list of host/port pairs for establishing the initial connection
  -- to the Kafka cluster.
  --
  -- This is the @bootstrap.servers@ Kafka consumer configuration property.
  , kafkaConsumerConfigGroupId :: ConsumerGroupId
  -- ^ The consumer group id to which the consumer belongs.
  --
  -- This is the @group.id@ Kafka consumer configuration property.
  , kafkaConsumerConfigTopic :: TopicName
  -- ^ The topic name polled for messages by the Kafka consumer.
  , kafkaConsumerConfigOffsetReset :: OffsetReset
  -- ^ The offset reset parameter used when there is no initial offset in Kafka.
  --
  -- This is the @auto.offset.reset@ Kafka consumer configuration property.
  , kafkaConsumerConfigAutoCommitInterval :: Millis
  -- ^ The interval that offsets are auto-committed to Kafka.
  --
  -- This sets the @auto.commit.interval.ms@ and @enable.auto.commit@ Kafka
  -- consumer configuration properties.
  , kafkaConsumerConfigExtraSubscriptionProps :: Map Text Text
  -- ^ Extra properties attached to the consumer's subscription.
  }
  deriving stock (Show)

-- | Parse the topic to consume from the @KAFKA_TOPIC@ environment variable.
--
-- No default is supplied, and the value is validated by 'readKafkaTopic'.
envKafkaTopic :: Env.Parser Env.Error TopicName
envKafkaTopic = Env.var (eitherReader readKafkaTopic) "KAFKA_TOPIC" mempty

-- | Turn a raw string into a 'TopicName', rejecting the empty string.
readKafkaTopic :: String -> Either String TopicName
readKafkaTopic raw
  | T.null topic = Left "Kafka topics cannot be empty"
  | otherwise = Right $ TopicName topic
 where
  topic = T.pack raw

-- | Parse the offset reset policy from the @KAFKA_OFFSET_RESET@ environment
-- variable.
--
-- The value is validated by 'readKafkaOffsetReset'; the default is 'Earliest'.
envKafkaOffsetReset :: Env.Parser Env.Error OffsetReset
envKafkaOffsetReset =
  Env.var
    (eitherReader readKafkaOffsetReset)
    "KAFKA_OFFSET_RESET"
    (Env.def Earliest)

-- | Read an 'OffsetReset' from its textual form.
--
-- Only the exact strings @earliest@ and @latest@ are accepted; anything else
-- (including different capitalisation) is an error.
readKafkaOffsetReset :: String -> Either String OffsetReset
readKafkaOffsetReset = \case
  "earliest" -> Right Earliest
  "latest" -> Right Latest
  _ -> Left "Kafka offset reset must be one of earliest or latest"

-- | Parse a complete 'KafkaConsumerConfig' from the environment.
--
-- Fields are read, in order, from:
--
-- * broker addresses: whatever 'envKafkaBrokerAddresses' parses
-- * @KAFKA_CONSUMER_GROUP_ID@: must be non-empty, no default
-- * @KAFKA_TOPIC@: see 'envKafkaTopic'
-- * @KAFKA_OFFSET_RESET@: see 'envKafkaOffsetReset'
-- * @KAFKA_AUTO_COMMIT_INTERVAL@: a 'Timeout', default 5000 milliseconds
-- * @KAFKA_EXTRA_SUBSCRIPTION_PROPS@: key/value pairs, default empty
envKafkaConsumerConfig :: Env.Parser Env.Error KafkaConsumerConfig
envKafkaConsumerConfig =
  -- The parser is only combined applicatively, so the argument order below
  -- must follow the field order of 'KafkaConsumerConfig'.
  KafkaConsumerConfig
    <$> envKafkaBrokerAddresses
    <*> Env.var Env.nonempty "KAFKA_CONSUMER_GROUP_ID" mempty
    <*> envKafkaTopic
    <*> envKafkaOffsetReset
    <*> envAutoCommitInterval
    <*> envExtraSubscriptionProps
 where
  -- The variable is parsed as a 'Timeout' and normalised to milliseconds.
  envAutoCommitInterval :: Env.Parser Env.Error Millis
  envAutoCommitInterval =
    fromIntegral . timeoutMs
      <$> Env.var
        timeout
        "KAFKA_AUTO_COMMIT_INTERVAL"
        (Env.def $ TimeoutMilliseconds 5000)

  envExtraSubscriptionProps :: Env.Parser Env.Error (Map Text Text)
  envExtraSubscriptionProps =
    Env.var
      (fmap Map.fromList . keyValues)
      "KAFKA_EXTRA_SUBSCRIPTION_PROPS"
      (Env.def mempty)

-- | Environments that hold a 'KafkaConsumer'.
--
-- 'runConsumer' reads the consumer it polls through 'kafkaConsumerL'.
class HasKafkaConsumer env where
  -- | Lens onto the 'KafkaConsumer' stored in the environment.
  kafkaConsumerL :: Lens' env KafkaConsumer

-- | Build the 'ConsumerProperties' for a consumer from its configuration.
--
-- Sets the broker list, group id and auto-commit interval from the config;
-- the log level is fixed at 'KafkaLogInfo'.
consumerProps :: KafkaConsumerConfig -> ConsumerProperties
consumerProps config =
  brokersList (NE.toList $ kafkaConsumerConfigBrokerAddresses config)
    <> groupId (kafkaConsumerConfigGroupId config)
    <> autoCommit (kafkaConsumerConfigAutoCommitInterval config)
    <> logLevel KafkaLogInfo

-- | Build the 'Subscription' for a consumer from its configuration.
--
-- Subscribes to the single configured topic with the configured offset reset
-- policy and any extra subscription properties.
subscription :: KafkaConsumerConfig -> Subscription
subscription config =
  topics [kafkaConsumerConfigTopic config]
    <> offsetReset (kafkaConsumerConfigOffsetReset config)
    <> extraSubscriptionProps (kafkaConsumerConfigExtraSubscriptionProps config)

-- | Run an action with a freshly created 'KafkaConsumer'.
--
-- The consumer is created from 'consumerProps' and 'subscription', handed to
-- the action, and closed afterwards via 'bracket'. A 'KafkaError' reported
-- while creating or closing the consumer is thrown as an exception.
withKafkaConsumer
  :: (MonadUnliftIO m, HasCallStack)
  => KafkaConsumerConfig
  -> (KafkaConsumer -> m a)
  -> m a
withKafkaConsumer config = bracket openConsumer shutdownConsumer
 where
  openConsumer =
    Kafka.newConsumer (consumerProps config) (subscription config) >>= \case
      Left err -> throwM err
      Right consumer -> pure consumer

  shutdownConsumer consumer =
    Kafka.closeConsumer consumer >>= \case
      Nothing -> pure ()
      Just err -> throwM err

-- | Express a 'Timeout' as a number of milliseconds.
timeoutMs :: Timeout -> Int
timeoutMs (TimeoutSeconds seconds) = seconds * 1000
timeoutMs (TimeoutMilliseconds millis) = millis

-- | Thrown by 'runConsumer' when a message value cannot be decoded as JSON.
data KafkaMessageDecodeError = KafkaMessageDecodeError
  { input :: ByteString
  -- ^ The raw message value that failed to decode.
  , errors :: String
  -- ^ The error message reported by the JSON decoder.
  }
  deriving stock (Show)

-- | Renders the failure as a multi-line message showing the offending input
-- (decoded as UTF-8 text) and the decoder's error.
instance Exception KafkaMessageDecodeError where
  displayException (KafkaMessageDecodeError rawInput decodeErrors) =
    "Unable to decode JSON"
      <> "\n  input:  "
      <> unpack (decodeUtf8 rawInput)
      <> "\n  errors: "
      <> decodeErrors

-- | Poll the environment's 'KafkaConsumer' and feed decoded messages to a
-- handler.
--
-- Each step polls for one record (waiting up to the given 'Timeout'), decodes
-- its value as JSON and passes the result to the callback. Records without a
-- value, and polls that time out, are skipped. The step runs under
-- 'immortalCreateLogged' (presumably restarting it indefinitely; confirm in
-- "Freckle.App.Async").
--
-- 'KafkaError's and 'KafkaMessageDecodeError's raised during a step are logged
-- as errors in the @kafka@ namespace instead of propagating. Other exception
-- types, including those thrown by the callback, are not handled here.
runConsumer
  :: ( MonadMask m
     , MonadUnliftIO m
     , MonadReader env m
     , MonadLogger m
     , MonadTracer m
     , HasKafkaConsumer env
     , FromJSON a
     , HasCallStack
     )
  => Timeout
  -> (a -> m ())
  -> m ()
runConsumer pollTimeout onMessage =
  -- NOTE(review): 'withTraceIdContext' wraps the whole loop rather than each
  -- per-poll span; confirm that is the intended logging context.
  withTraceIdContext $ immortalCreateLogged $ do
    consumer <- view kafkaConsumerL
    pollOnce consumer `catches` handlers
 where
  -- The poll timeout converted to the Kafka client's millisecond 'Timeout'.
  kafkaTimeout = Kafka.Timeout $ timeoutMs pollTimeout

  -- Poll for a single record and process its value, if it has one.
  pollOnce consumer =
    inSpan "kafka.consumer" consumerSpanArguments $ do
      polled <- pollMessage consumer kafkaTimeout
      mRecord <- fromKafkaError polled
      for_ (crValue =<< mRecord) handleValue

  -- Decode one message value and hand it to the caller's callback, each in
  -- its own span.
  handleValue bytes = do
    message <-
      inSpan "kafka.consumer.message.decode" defaultSpanArguments $
        case eitherDecodeStrict bytes of
          Left err -> throwM $ KafkaMessageDecodeError bytes err
          Right decoded -> pure decoded
    inSpan "kafka.consumer.message.handle" defaultSpanArguments $
      onMessage message

  handlers =
    [ ExceptionHandler $
        logErrorNS "kafka"
          . annotatedExceptionMessageFrom @KafkaError
            (const "Error polling for message from Kafka")
    , ExceptionHandler $
        logErrorNS "kafka"
          . annotatedExceptionMessageFrom @KafkaMessageDecodeError
            (const "Could not decode message value")
    ]

-- | Interpret the result of a Kafka poll.
--
-- A successful result is returned as 'Just'. A timed-out poll
-- ('RdKafkaRespErrTimedOut') is logged at debug level and yields 'Nothing'.
-- Any other 'KafkaError' is thrown.
fromKafkaError
  :: (MonadIO m, MonadLogger m, HasCallStack)
  => Either KafkaError a
  -> m (Maybe a)
fromKafkaError = \case
  Right value -> pure $ Just value
  Left (KafkaResponseError RdKafkaRespErrTimedOut) -> do
    logDebug "Polling timeout"
    pure Nothing
  Left err -> throwM err