{-# LANGUAGE ApplicativeDo #-}
module Freckle.App.Kafka.Consumer
( HasKafkaConsumer (..)
, withKafkaConsumer
, KafkaConsumerConfig (..)
, envKafkaConsumerConfig
, runConsumer
) where
import Freckle.App.Prelude
import Blammo.Logging
import Control.Lens (Lens', view)
import Data.Aeson
import qualified Data.List.NonEmpty as NE
import qualified Data.Map.Strict as Map
import qualified Data.Text as T
import qualified Env
import Freckle.App.Async
import Freckle.App.Env
import Freckle.App.Kafka.Producer (envKafkaBrokerAddresses)
import Freckle.App.OpenTelemetry (withTraceIdContext)
import Kafka.Consumer hiding
( Timeout
, closeConsumer
, newConsumer
, runConsumer
, subscription
)
import qualified Kafka.Consumer as Kafka
import UnliftIO.Exception (bracket, throwIO)
data KafkaConsumerConfig = KafkaConsumerConfig
{ KafkaConsumerConfig -> NonEmpty BrokerAddress
kafkaConsumerConfigBrokerAddresses :: NonEmpty BrokerAddress
, KafkaConsumerConfig -> ConsumerGroupId
kafkaConsumerConfigGroupId :: ConsumerGroupId
, KafkaConsumerConfig -> TopicName
kafkaConsumerConfigTopic :: TopicName
, KafkaConsumerConfig -> OffsetReset
kafkaConsumerConfigOffsetReset :: OffsetReset
, KafkaConsumerConfig -> Millis
kafkaConsumerConfigAutoCommitInterval :: Millis
, :: Map Text Text
}
deriving stock (Int -> KafkaConsumerConfig -> ShowS
[KafkaConsumerConfig] -> ShowS
KafkaConsumerConfig -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [KafkaConsumerConfig] -> ShowS
$cshowList :: [KafkaConsumerConfig] -> ShowS
show :: KafkaConsumerConfig -> String
$cshow :: KafkaConsumerConfig -> String
showsPrec :: Int -> KafkaConsumerConfig -> ShowS
$cshowsPrec :: Int -> KafkaConsumerConfig -> ShowS
Show)
envKafkaTopic
:: Env.Parser Env.Error TopicName
envKafkaTopic :: Parser Error TopicName
envKafkaTopic =
forall e a.
AsUnset e =>
Reader e a -> String -> Mod Var a -> Parser e a
Env.var
(forall a. (String -> Either String a) -> Reader Error a
eitherReader String -> Either String TopicName
readKafkaTopic)
String
"KAFKA_TOPIC"
forall a. Monoid a => a
mempty
readKafkaTopic :: String -> Either String TopicName
readKafkaTopic :: String -> Either String TopicName
readKafkaTopic String
t = case String -> Text
T.pack String
t of
Text
"" -> forall a b. a -> Either a b
Left String
"Kafka topics cannot be empty"
Text
x -> forall a b. b -> Either a b
Right forall a b. (a -> b) -> a -> b
$ Text -> TopicName
TopicName Text
x
envKafkaOffsetReset
:: Env.Parser Env.Error OffsetReset
envKafkaOffsetReset :: Parser Error OffsetReset
envKafkaOffsetReset =
forall e a.
AsUnset e =>
Reader e a -> String -> Mod Var a -> Parser e a
Env.var
(forall a. (String -> Either String a) -> Reader Error a
eitherReader String -> Either String OffsetReset
readKafkaOffsetReset)
String
"KAFKA_OFFSET_RESET"
forall a b. (a -> b) -> a -> b
$ forall a. a -> Mod Var a
Env.def OffsetReset
Earliest
readKafkaOffsetReset :: String -> Either String OffsetReset
readKafkaOffsetReset :: String -> Either String OffsetReset
readKafkaOffsetReset String
t = case String -> Text
T.pack String
t of
Text
"earliest" -> forall a b. b -> Either a b
Right OffsetReset
Earliest
Text
"latest" -> forall a b. b -> Either a b
Right OffsetReset
Latest
Text
_ -> forall a b. a -> Either a b
Left String
"Kafka offset reset must be one of earliest or latest"
envKafkaConsumerConfig
:: Env.Parser Env.Error KafkaConsumerConfig
envKafkaConsumerConfig :: Parser Error KafkaConsumerConfig
envKafkaConsumerConfig = do
NonEmpty BrokerAddress
brokerAddresses <- Parser Error (NonEmpty BrokerAddress)
envKafkaBrokerAddresses
ConsumerGroupId
consumerGroupId <- forall e a.
AsUnset e =>
Reader e a -> String -> Mod Var a -> Parser e a
Env.var forall e s. (AsEmpty e, IsString s) => Reader e s
Env.nonempty String
"KAFKA_CONSUMER_GROUP_ID" forall a. Monoid a => a
mempty
TopicName
kafkaTopic <- Parser Error TopicName
envKafkaTopic
OffsetReset
kafkaOffsetReset <- Parser Error OffsetReset
envKafkaOffsetReset
Millis
kafkaAutoOffsetInterval <-
forall a b. (Integral a, Num b) => a -> b
fromIntegral forall b c a. (b -> c) -> (a -> b) -> a -> c
. Timeout -> Int
timeoutMs forall (c :: * -> *) (d :: * -> *) a b.
(Functor c, Functor d) =>
(a -> b) -> c (d a) -> c (d b)
<$$> forall e a.
AsUnset e =>
Reader e a -> String -> Mod Var a -> Parser e a
Env.var Reader Error Timeout
timeout String
"KAFKA_AUTO_COMMIT_INTERVAL" forall a b. (a -> b) -> a -> b
$
forall a. a -> Mod Var a
Env.def forall a b. (a -> b) -> a -> b
$
Int -> Timeout
TimeoutMilliseconds Int
5000
Map Text Text
kafkaExtraProps <-
forall e a.
AsUnset e =>
Reader e a -> String -> Mod Var a -> Parser e a
Env.var
(forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall k a. Ord k => [(k, a)] -> Map k a
Map.fromList forall b c a. (b -> c) -> (a -> b) -> a -> c
. Reader Error [(Text, Text)]
keyValues)
String
"KAFKA_EXTRA_SUBSCRIPTION_PROPS"
(forall a. a -> Mod Var a
Env.def forall a. Monoid a => a
mempty)
pure $
NonEmpty BrokerAddress
-> ConsumerGroupId
-> TopicName
-> OffsetReset
-> Millis
-> Map Text Text
-> KafkaConsumerConfig
KafkaConsumerConfig
NonEmpty BrokerAddress
brokerAddresses
ConsumerGroupId
consumerGroupId
TopicName
kafkaTopic
OffsetReset
kafkaOffsetReset
Millis
kafkaAutoOffsetInterval
Map Text Text
kafkaExtraProps
class HasKafkaConsumer env where
kafkaConsumerL :: Lens' env KafkaConsumer
consumerProps :: KafkaConsumerConfig -> ConsumerProperties
consumerProps :: KafkaConsumerConfig -> ConsumerProperties
consumerProps KafkaConsumerConfig {NonEmpty BrokerAddress
Map Text Text
ConsumerGroupId
OffsetReset
Millis
TopicName
kafkaConsumerConfigExtraSubscriptionProps :: Map Text Text
kafkaConsumerConfigAutoCommitInterval :: Millis
kafkaConsumerConfigOffsetReset :: OffsetReset
kafkaConsumerConfigTopic :: TopicName
kafkaConsumerConfigGroupId :: ConsumerGroupId
kafkaConsumerConfigBrokerAddresses :: NonEmpty BrokerAddress
kafkaConsumerConfigExtraSubscriptionProps :: KafkaConsumerConfig -> Map Text Text
kafkaConsumerConfigAutoCommitInterval :: KafkaConsumerConfig -> Millis
kafkaConsumerConfigOffsetReset :: KafkaConsumerConfig -> OffsetReset
kafkaConsumerConfigTopic :: KafkaConsumerConfig -> TopicName
kafkaConsumerConfigGroupId :: KafkaConsumerConfig -> ConsumerGroupId
kafkaConsumerConfigBrokerAddresses :: KafkaConsumerConfig -> NonEmpty BrokerAddress
..} =
[BrokerAddress] -> ConsumerProperties
brokersList [BrokerAddress]
brokers
forall a. Semigroup a => a -> a -> a
<> ConsumerGroupId -> ConsumerProperties
groupId ConsumerGroupId
kafkaConsumerConfigGroupId
forall a. Semigroup a => a -> a -> a
<> Millis -> ConsumerProperties
autoCommit Millis
kafkaConsumerConfigAutoCommitInterval
forall a. Semigroup a => a -> a -> a
<> KafkaLogLevel -> ConsumerProperties
logLevel KafkaLogLevel
KafkaLogInfo
where
brokers :: [BrokerAddress]
brokers = forall a. NonEmpty a -> [a]
NE.toList NonEmpty BrokerAddress
kafkaConsumerConfigBrokerAddresses
subscription :: KafkaConsumerConfig -> Subscription
subscription :: KafkaConsumerConfig -> Subscription
subscription KafkaConsumerConfig {NonEmpty BrokerAddress
Map Text Text
ConsumerGroupId
OffsetReset
Millis
TopicName
kafkaConsumerConfigExtraSubscriptionProps :: Map Text Text
kafkaConsumerConfigAutoCommitInterval :: Millis
kafkaConsumerConfigOffsetReset :: OffsetReset
kafkaConsumerConfigTopic :: TopicName
kafkaConsumerConfigGroupId :: ConsumerGroupId
kafkaConsumerConfigBrokerAddresses :: NonEmpty BrokerAddress
kafkaConsumerConfigExtraSubscriptionProps :: KafkaConsumerConfig -> Map Text Text
kafkaConsumerConfigAutoCommitInterval :: KafkaConsumerConfig -> Millis
kafkaConsumerConfigOffsetReset :: KafkaConsumerConfig -> OffsetReset
kafkaConsumerConfigTopic :: KafkaConsumerConfig -> TopicName
kafkaConsumerConfigGroupId :: KafkaConsumerConfig -> ConsumerGroupId
kafkaConsumerConfigBrokerAddresses :: KafkaConsumerConfig -> NonEmpty BrokerAddress
..} =
[TopicName] -> Subscription
topics [TopicName
kafkaConsumerConfigTopic]
forall a. Semigroup a => a -> a -> a
<> OffsetReset -> Subscription
offsetReset OffsetReset
kafkaConsumerConfigOffsetReset
forall a. Semigroup a => a -> a -> a
<> Map Text Text -> Subscription
extraSubscriptionProps Map Text Text
kafkaConsumerConfigExtraSubscriptionProps
withKafkaConsumer
:: MonadUnliftIO m
=> KafkaConsumerConfig
-> (KafkaConsumer -> m a)
-> m a
withKafkaConsumer :: forall (m :: * -> *) a.
MonadUnliftIO m =>
KafkaConsumerConfig -> (KafkaConsumer -> m a) -> m a
withKafkaConsumer KafkaConsumerConfig
config = forall (m :: * -> *) a b c.
MonadUnliftIO m =>
m a -> (a -> m b) -> (a -> m c) -> m c
bracket m KafkaConsumer
newConsumer forall {m :: * -> *}. MonadIO m => KafkaConsumer -> m ()
closeConsumer
where
(ConsumerProperties
props, Subscription
sub) = (KafkaConsumerConfig -> ConsumerProperties
consumerProps forall (a :: * -> * -> *) b c c'.
Arrow a =>
a b c -> a b c' -> a b (c, c')
&&& KafkaConsumerConfig -> Subscription
subscription) KafkaConsumerConfig
config
newConsumer :: m KafkaConsumer
newConsumer = forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO forall (f :: * -> *) a. Applicative f => a -> f a
pure forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall (m :: * -> *).
MonadIO m =>
ConsumerProperties
-> Subscription -> m (Either KafkaError KafkaConsumer)
Kafka.newConsumer ConsumerProperties
props Subscription
sub
closeConsumer :: KafkaConsumer -> m ()
closeConsumer = forall b a. b -> (a -> b) -> Maybe a -> b
maybe (forall (f :: * -> *) a. Applicative f => a -> f a
pure ()) forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO forall (m :: * -> *) b c a.
Monad m =>
(b -> m c) -> (a -> m b) -> a -> m c
<=< forall (m :: * -> *).
MonadIO m =>
KafkaConsumer -> m (Maybe KafkaError)
Kafka.closeConsumer
timeoutMs :: Timeout -> Int
timeoutMs :: Timeout -> Int
timeoutMs = \case
TimeoutSeconds Int
s -> Int
s forall a. Num a => a -> a -> a
* Int
1000
TimeoutMilliseconds Int
ms -> Int
ms
runConsumer
:: ( MonadMask m
, MonadUnliftIO m
, MonadReader env m
, MonadLogger m
, HasKafkaConsumer env
, FromJSON a
)
=> Timeout
-> (a -> m ())
-> m ()
runConsumer :: forall (m :: * -> *) env a.
(MonadMask m, MonadUnliftIO m, MonadReader env m, MonadLogger m,
HasKafkaConsumer env, FromJSON a) =>
Timeout -> (a -> m ()) -> m ()
runConsumer Timeout
pollTimeout a -> m ()
onMessage =
forall (m :: * -> *) a. (MonadIO m, MonadMask m) => m a -> m a
withTraceIdContext forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
(MonadMask m, MonadUnliftIO m, MonadLogger m) =>
m () -> m a
immortalCreateLogged forall a b. (a -> b) -> a -> b
$ do
KafkaConsumer
consumer <- forall s (m :: * -> *) a. MonadReader s m => Getting a s a -> m a
view forall env. HasKafkaConsumer env => Lens' env KafkaConsumer
kafkaConsumerL
Either
KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))
eMessage <-
forall (m :: * -> *).
MonadIO m =>
KafkaConsumer
-> Timeout
-> m (Either
KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
pollMessage KafkaConsumer
consumer forall a b. (a -> b) -> a -> b
$ Int -> Timeout
Kafka.Timeout forall a b. (a -> b) -> a -> b
$ Timeout -> Int
timeoutMs Timeout
pollTimeout
case Either
KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))
eMessage of
Left (KafkaResponseError RdKafkaRespErrT
RdKafkaRespErrTimedOut) -> forall (m :: * -> *).
(HasCallStack, MonadLogger m) =>
Message -> m ()
logDebug Message
"Polling timeout"
Left KafkaError
err -> forall (m :: * -> *).
(HasCallStack, MonadLogger m) =>
Message -> m ()
logError forall a b. (a -> b) -> a -> b
$ Text
"Error polling for message from Kafka" Text -> [SeriesElem] -> Message
:# [Key
"error" forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= forall a. Show a => a -> String
show KafkaError
err]
Right ConsumerRecord {Maybe ByteString
Offset
Timestamp
PartitionId
TopicName
crTopic :: forall k v. ConsumerRecord k v -> TopicName
crPartition :: forall k v. ConsumerRecord k v -> PartitionId
crOffset :: forall k v. ConsumerRecord k v -> Offset
crTimestamp :: forall k v. ConsumerRecord k v -> Timestamp
crKey :: forall k v. ConsumerRecord k v -> k
crValue :: forall k v. ConsumerRecord k v -> v
crValue :: Maybe ByteString
crKey :: Maybe ByteString
crTimestamp :: Timestamp
crOffset :: Offset
crPartition :: PartitionId
crTopic :: TopicName
..} -> forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ Maybe ByteString
crValue forall a b. (a -> b) -> a -> b
$ \ByteString
bs ->
case forall a. FromJSON a => ByteString -> Either String a
eitherDecodeStrict ByteString
bs of
Left String
err -> forall (m :: * -> *).
(HasCallStack, MonadLogger m) =>
Message -> m ()
logError forall a b. (a -> b) -> a -> b
$ Text
"Could not decode message value" Text -> [SeriesElem] -> Message
:# [Key
"error" forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= String
err]
Right a
a -> a -> m ()
onMessage a
a