{-# LANGUAGE ApplicativeDo #-}
module Freckle.App.Kafka.Consumer
( HasKafkaConsumer (..)
, withKafkaConsumer
, KafkaConsumerConfig (..)
, envKafkaConsumerConfig
, runConsumer
) where
import Freckle.App.Prelude
import Blammo.Logging
import Control.Lens (Lens', view)
import Data.Aeson
import Data.ByteString (ByteString)
import qualified Data.List.NonEmpty as NE
import qualified Data.Map.Strict as Map
import qualified Data.Text as T
import qualified Env
import Freckle.App.Async
import Freckle.App.Env
import Freckle.App.Exception (annotatedExceptionMessageFrom)
import Freckle.App.Kafka.Producer (envKafkaBrokerAddresses)
import Freckle.App.OpenTelemetry
import Kafka.Consumer hiding
( Timeout
, closeConsumer
, newConsumer
, runConsumer
, subscription
)
import qualified Kafka.Consumer as Kafka
import UnliftIO.Exception (bracket)
-- | Settings used to build a Kafka consumer and its topic subscription.
--
-- 'envKafkaConsumerConfig' fills this record from environment variables;
-- 'consumerProps' and 'subscription' turn it into the values handed to
-- 'Kafka.newConsumer'.
data KafkaConsumerConfig = KafkaConsumerConfig
  { kafkaConsumerConfigBrokerAddresses :: NonEmpty BrokerAddress
  -- ^ Brokers passed to 'brokersList' when building the consumer properties
  , kafkaConsumerConfigGroupId :: ConsumerGroupId
  -- ^ Consumer group id, read from @KAFKA_CONSUMER_GROUP_ID@
  , kafkaConsumerConfigTopic :: TopicName
  -- ^ The single topic subscribed to, read from @KAFKA_TOPIC@
  , kafkaConsumerConfigOffsetReset :: OffsetReset
  -- ^ Offset-reset setting, read from @KAFKA_OFFSET_RESET@
  -- (defaults to 'Earliest')
  , kafkaConsumerConfigAutoCommitInterval :: Millis
  -- ^ Interval passed to 'autoCommit', read from
  -- @KAFKA_AUTO_COMMIT_INTERVAL@ (defaults to 5000 milliseconds)
  , kafkaConsumerConfigExtraSubscriptionProps :: Map Text Text
  -- ^ Extra properties passed to 'extraSubscriptionProps', read from
  -- @KAFKA_EXTRA_SUBSCRIPTION_PROPS@ (defaults to empty)
  }
  deriving stock (Show)
-- | Parse the topic name from the @KAFKA_TOPIC@ environment variable.
--
-- The raw string is validated by 'readKafkaTopic'; no modifiers (and so no
-- default) are supplied.
envKafkaTopic :: Env.Parser Env.Error TopicName
envKafkaTopic = Env.var topicReader "KAFKA_TOPIC" mempty
 where
  topicReader = eitherReader readKafkaTopic
-- | Wrap a string as a 'TopicName', rejecting the empty string.
readKafkaTopic :: String -> Either String TopicName
readKafkaTopic raw
  | T.null name = Left "Kafka topics cannot be empty"
  | otherwise = Right (TopicName name)
 where
  name = T.pack raw
-- | Parse the offset-reset setting from @KAFKA_OFFSET_RESET@.
--
-- The value is validated by 'readKafkaOffsetReset'; the default is
-- 'Earliest'.
envKafkaOffsetReset :: Env.Parser Env.Error OffsetReset
envKafkaOffsetReset =
  Env.var offsetResetReader "KAFKA_OFFSET_RESET" (Env.def Earliest)
 where
  offsetResetReader = eitherReader readKafkaOffsetReset
-- | Read an 'OffsetReset' from its exact lowercase spelling.
--
-- Only @earliest@ and @latest@ are accepted; any other input is an error.
readKafkaOffsetReset :: String -> Either String OffsetReset
readKafkaOffsetReset "earliest" = Right Earliest
readKafkaOffsetReset "latest" = Right Latest
readKafkaOffsetReset _ =
  Left "Kafka offset reset must be one of earliest or latest"
-- | Assemble a 'KafkaConsumerConfig' from environment variables.
--
-- Fields are parsed in constructor order: broker addresses (via
-- 'envKafkaBrokerAddresses'), @KAFKA_CONSUMER_GROUP_ID@, @KAFKA_TOPIC@,
-- @KAFKA_OFFSET_RESET@, @KAFKA_AUTO_COMMIT_INTERVAL@ and
-- @KAFKA_EXTRA_SUBSCRIPTION_PROPS@.
envKafkaConsumerConfig :: Env.Parser Env.Error KafkaConsumerConfig
envKafkaConsumerConfig =
  KafkaConsumerConfig
    <$> envKafkaBrokerAddresses
    <*> Env.var Env.nonempty "KAFKA_CONSUMER_GROUP_ID" mempty
    <*> envKafkaTopic
    <*> envKafkaOffsetReset
    <*> commitInterval
    <*> extraProps
 where
  -- Parsed as a 'Timeout' and converted to milliseconds; defaults to 5000 ms.
  commitInterval :: Env.Parser Env.Error Millis
  commitInterval =
    fromIntegral . timeoutMs
      <$> Env.var
        timeout
        "KAFKA_AUTO_COMMIT_INTERVAL"
        (Env.def (TimeoutMilliseconds 5000))

  -- Key/value pairs collected into a map; defaults to the empty map.
  extraProps :: Env.Parser Env.Error (Map Text Text)
  extraProps =
    Env.var
      (fmap Map.fromList . keyValues)
      "KAFKA_EXTRA_SUBSCRIPTION_PROPS"
      (Env.def mempty)
-- | Environments that expose a 'KafkaConsumer' through a lens.
--
-- 'runConsumer' uses this to fetch the consumer from the reader environment.
class HasKafkaConsumer env where
  -- | Lens onto the 'KafkaConsumer' held in @env@.
  kafkaConsumerL :: Lens' env KafkaConsumer
-- | Build the 'ConsumerProperties' for a config.
--
-- Combines the broker list, group id and auto-commit interval from the
-- config with a fixed 'KafkaLogInfo' log level.
consumerProps :: KafkaConsumerConfig -> ConsumerProperties
consumerProps config =
  mconcat
    [ brokersList (NE.toList (kafkaConsumerConfigBrokerAddresses config))
    , groupId (kafkaConsumerConfigGroupId config)
    , autoCommit (kafkaConsumerConfigAutoCommitInterval config)
    , logLevel KafkaLogInfo
    ]
-- | Build the 'Subscription' for a config.
--
-- Subscribes to the single configured topic and applies the configured
-- offset-reset setting and extra subscription properties.
subscription :: KafkaConsumerConfig -> Subscription
subscription config =
  mconcat
    [ topics [kafkaConsumerConfigTopic config]
    , offsetReset (kafkaConsumerConfigOffsetReset config)
    , extraSubscriptionProps (kafkaConsumerConfigExtraSubscriptionProps config)
    ]
-- | Run an action with a 'KafkaConsumer' built from the given config.
--
-- The consumer is created and closed inside a 'bracket'. A 'KafkaError'
-- returned while creating or closing the consumer is rethrown with
-- 'throwM'.
withKafkaConsumer
  :: (MonadUnliftIO m, HasCallStack)
  => KafkaConsumerConfig
  -> (KafkaConsumer -> m a)
  -> m a
withKafkaConsumer config = bracket acquire release
 where
  acquire = do
    result <- Kafka.newConsumer (consumerProps config) (subscription config)
    case result of
      Left err -> throwM err
      Right consumer -> pure consumer

  release consumer =
    Kafka.closeConsumer consumer >>= \case
      Nothing -> pure ()
      Just err -> throwM err
-- | Express a 'Timeout' as a number of milliseconds.
timeoutMs :: Timeout -> Int
timeoutMs (TimeoutSeconds seconds) = seconds * 1000
timeoutMs (TimeoutMilliseconds millis) = millis
-- | Thrown by 'runConsumer' when a record value cannot be decoded as JSON.
data KafkaMessageDecodeError = KafkaMessageDecodeError
  { input :: ByteString
  -- ^ The raw record value that failed to decode
  , errors :: String
  -- ^ The error message produced by the JSON decoder
  }
  deriving stock (Show)
-- | Renders the offending input (decoded as UTF-8 text) and the decoder's
-- error message on separate lines.
instance Exception KafkaMessageDecodeError where
  displayException KafkaMessageDecodeError {input = rawInput, errors = decodeErrors} =
    "Unable to decode JSON"
      <> "\n input: "
      <> unpack (decodeUtf8 rawInput)
      <> "\n errors: "
      <> decodeErrors
-- | Poll the environment's Kafka consumer and pass decoded messages to a
-- callback.
--
-- Each iteration polls once with the given timeout, inside a
-- @kafka.consumer@ span. If a record with a value is returned, the value is
-- decoded from JSON (span @kafka.consumer.message.decode@) and handed to
-- @onMessage@ (span @kafka.consumer.message.handle@). A poll that yields no
-- record, or a record without a value, does nothing.
--
-- 'KafkaError' and 'KafkaMessageDecodeError' exceptions are caught and
-- logged under the @kafka@ namespace rather than propagated.
--
-- NOTE(review): the body is wrapped in 'immortalCreateLogged', which
-- presumably re-runs it indefinitely; confirm against "Freckle.App.Async".
runConsumer
  :: ( MonadMask m
     , MonadUnliftIO m
     , MonadReader env m
     , MonadLogger m
     , MonadTracer m
     , HasKafkaConsumer env
     , FromJSON a
     , HasCallStack
     )
  => Timeout
  -> (a -> m ())
  -> m ()
runConsumer pollTimeout onMessage =
  withTraceIdContext $
    immortalCreateLogged $ do
      consumer <- view kafkaConsumerL
      pollOnce consumer `catches` handlers
 where
  -- The caller's timeout converted to the Kafka client's millisecond type.
  kafkaTimeout = Kafka.Timeout (timeoutMs pollTimeout)

  -- Poll for one record and process its value, if any.
  pollOnce consumer =
    inSpan "kafka.consumer" consumerSpanArguments $ do
      polled <- pollMessage consumer kafkaTimeout
      mRecord <- fromKafkaError polled
      for_ (mRecord >>= crValue) handleValue

  -- Decode a raw record value and pass the result to the callback.
  handleValue payload = do
    message <-
      inSpan "kafka.consumer.message.decode" defaultSpanArguments $
        case eitherDecodeStrict payload of
          Left err -> throwM (KafkaMessageDecodeError payload err)
          Right value -> pure value
    inSpan "kafka.consumer.message.handle" defaultSpanArguments $
      onMessage message

  -- Log, rather than propagate, polling and decoding failures.
  handlers =
    [ ExceptionHandler $ \ex ->
        logErrorNS "kafka" $
          annotatedExceptionMessageFrom @KafkaError
            (const "Error polling for message from Kafka")
            ex
    , ExceptionHandler $ \ex ->
        logErrorNS "kafka" $
          annotatedExceptionMessageFrom @KafkaMessageDecodeError
            (const "Could not decode message value")
            ex
    ]
-- | Interpret the result of a poll.
--
-- A successful result is returned in 'Just'. A timed-out response
-- ('RdKafkaRespErrTimedOut') is logged at debug level and becomes
-- 'Nothing'. Any other 'KafkaError' is thrown with 'throwM'.
fromKafkaError
  :: (MonadIO m, MonadLogger m, HasCallStack)
  => Either KafkaError a
  -> m (Maybe a)
fromKafkaError = \case
  Right value -> pure (Just value)
  Left (KafkaResponseError RdKafkaRespErrTimedOut) -> do
    logDebug "Polling timeout"
    pure Nothing
  Left err -> throwM err