{-# LANGUAGE ApplicativeDo #-}

module Freckle.App.Kafka.Consumer
  ( HasKafkaConsumer (..)
  , withKafkaConsumer
  , KafkaConsumerConfig (..)
  , envKafkaConsumerConfig
  , runConsumer
  ) where

import Freckle.App.Prelude

import Blammo.Logging
import Control.Lens (Lens', view)
import Data.Aeson
import qualified Data.List.NonEmpty as NE
import qualified Data.Map.Strict as Map
import qualified Data.Text as T
import qualified Env
import Freckle.App.Async
import Freckle.App.Env
import Freckle.App.Kafka.Producer (envKafkaBrokerAddresses)
import Freckle.App.OpenTelemetry (withTraceIdContext)
import Kafka.Consumer hiding
  ( Timeout
  , closeConsumer
  , newConsumer
  , runConsumer
  , subscription
  )
import qualified Kafka.Consumer as Kafka
import UnliftIO.Exception (bracket, throwIO)

-- | Settings used to build the properties and the subscription of a Kafka
-- consumer.
data KafkaConsumerConfig = KafkaConsumerConfig
  { kafkaConsumerConfigBrokerAddresses :: NonEmpty BrokerAddress
  -- ^ Host/port pairs used to make the initial connection to the Kafka
  -- cluster.
  --
  -- Corresponds to the `bootstrap.servers` Kafka consumer configuration
  -- property.
  , kafkaConsumerConfigGroupId :: ConsumerGroupId
  -- ^ Identifier of the consumer group this consumer joins.
  --
  -- Corresponds to the `group.id` Kafka consumer configuration property.
  , kafkaConsumerConfigTopic :: TopicName
  -- ^ Name of the topic the consumer polls for messages.
  , kafkaConsumerConfigOffsetReset :: OffsetReset
  -- ^ Where to start reading when Kafka holds no initial offset.
  --
  -- Corresponds to the `auto.offset.reset` Kafka consumer configuration
  -- property.
  , kafkaConsumerConfigAutoCommitInterval :: Millis
  -- ^ How often offsets are auto-committed to Kafka.
  --
  -- Drives both the `auto.commit.interval.ms` and `enable.auto.commit` Kafka
  -- consumer configuration properties.
  , kafkaConsumerConfigExtraSubscriptionProps :: Map Text Text
  -- ^ Additional properties applied when configuring the Kafka consumer.
  }
  deriving stock (Show)

-- | Parse the topic to consume from the @KAFKA_TOPIC@ environment variable.
--
-- No default is supplied; the raw value is validated by 'readKafkaTopic'.
envKafkaTopic :: Env.Parser Env.Error TopicName
envKafkaTopic = Env.var topicReader "KAFKA_TOPIC" mempty
 where
  topicReader = eitherReader readKafkaTopic

-- | Turn a raw string into a 'TopicName'.
--
-- An empty string is rejected with an error message; any other input is
-- packed into 'Text' and wrapped unchanged.
readKafkaTopic :: String -> Either String TopicName
readKafkaTopic raw
  | null raw = Left "Kafka topics cannot be empty"
  | otherwise = Right (TopicName (T.pack raw))

-- | Parse the offset-reset policy from the @KAFKA_OFFSET_RESET@ environment
-- variable.
--
-- Falls back to 'Earliest' as the default; a supplied value is validated by
-- 'readKafkaOffsetReset'.
envKafkaOffsetReset :: Env.Parser Env.Error OffsetReset
envKafkaOffsetReset =
  Env.var offsetResetReader "KAFKA_OFFSET_RESET" (Env.def Earliest)
 where
  offsetResetReader = eitherReader readKafkaOffsetReset

-- | Turn a raw string into an 'OffsetReset'.
--
-- Only the exact strings @earliest@ and @latest@ are accepted; anything else
-- yields an error message.
readKafkaOffsetReset :: String -> Either String OffsetReset
readKafkaOffsetReset = \case
  "earliest" -> Right Earliest
  "latest" -> Right Latest
  _ -> Left "Kafka offset reset must be one of earliest or latest"

-- | Parse a complete 'KafkaConsumerConfig' from the environment.
--
-- Besides the broker addresses (parsed by 'envKafkaBrokerAddresses'), the
-- following variables are read:
--
-- * @KAFKA_CONSUMER_GROUP_ID@: non-empty, no default
-- * @KAFKA_TOPIC@: see 'envKafkaTopic'
-- * @KAFKA_OFFSET_RESET@: see 'envKafkaOffsetReset'
-- * @KAFKA_AUTO_COMMIT_INTERVAL@: parsed with 'timeout', defaults to 5000
--   milliseconds
-- * @KAFKA_EXTRA_SUBSCRIPTION_PROPS@: parsed with 'keyValues', defaults to
--   no extra properties
envKafkaConsumerConfig :: Env.Parser Env.Error KafkaConsumerConfig
envKafkaConsumerConfig = do
  brokerAddrs <- envKafkaBrokerAddresses
  consumerGroup <- Env.var Env.nonempty "KAFKA_CONSUMER_GROUP_ID" mempty
  subscribedTopic <- envKafkaTopic
  resetPolicy <- envKafkaOffsetReset
  -- The variable is read as a 'Timeout' and normalised to milliseconds.
  commitInterval <-
    fromIntegral . timeoutMs
      <$> Env.var
        timeout
        "KAFKA_AUTO_COMMIT_INTERVAL"
        (Env.def $ TimeoutMilliseconds 5000)
  extraProps <-
    Env.var
      (fmap Map.fromList . keyValues)
      "KAFKA_EXTRA_SUBSCRIPTION_PROPS"
      (Env.def mempty)
  pure $
    KafkaConsumerConfig
      { kafkaConsumerConfigBrokerAddresses = brokerAddrs
      , kafkaConsumerConfigGroupId = consumerGroup
      , kafkaConsumerConfigTopic = subscribedTopic
      , kafkaConsumerConfigOffsetReset = resetPolicy
      , kafkaConsumerConfigAutoCommitInterval = commitInterval
      , kafkaConsumerConfigExtraSubscriptionProps = extraProps
      }

-- | Environments that carry a 'KafkaConsumer'.
class HasKafkaConsumer env where
  -- | Lens onto the 'KafkaConsumer' stored in @env@.
  kafkaConsumerL :: Lens' env KafkaConsumer

-- | Build the 'ConsumerProperties' for a configuration.
--
-- Combines the broker list, the consumer group id and the auto-commit
-- interval from the config; the log level is always 'KafkaLogInfo'.
consumerProps :: KafkaConsumerConfig -> ConsumerProperties
consumerProps config =
  brokersList (NE.toList (kafkaConsumerConfigBrokerAddresses config))
    <> groupId (kafkaConsumerConfigGroupId config)
    <> autoCommit (kafkaConsumerConfigAutoCommitInterval config)
    <> logLevel KafkaLogInfo

-- | Build the 'Subscription' for a configuration.
--
-- Subscribes to the single configured topic and applies the configured
-- offset-reset policy and extra subscription properties.
subscription :: KafkaConsumerConfig -> Subscription
subscription config =
  topics [kafkaConsumerConfigTopic config]
    <> offsetReset (kafkaConsumerConfigOffsetReset config)
    <> extraSubscriptionProps (kafkaConsumerConfigExtraSubscriptionProps config)

-- | Run an action with a freshly created 'KafkaConsumer'.
--
-- The consumer is created from 'consumerProps' and 'subscription' and is
-- closed through 'bracket' once the action is done. A 'KafkaError' reported
-- while creating or closing the consumer is thrown as an exception.
withKafkaConsumer
  :: MonadUnliftIO m
  => KafkaConsumerConfig
  -> (KafkaConsumer -> m a)
  -> m a
withKafkaConsumer config = bracket acquire release
 where
  acquire =
    Kafka.newConsumer (consumerProps config) (subscription config) >>= \case
      Left err -> throwIO err
      Right consumer -> pure consumer

  release consumer =
    Kafka.closeConsumer consumer >>= \case
      Nothing -> pure ()
      Just err -> throwIO err

-- | Express a 'Timeout' as a number of milliseconds.
timeoutMs :: Timeout -> Int
timeoutMs (TimeoutSeconds secs) = 1000 * secs
timeoutMs (TimeoutMilliseconds millis) = millis

-- | Poll the environment's 'KafkaConsumer' and hand decoded messages to a
-- callback.
--
-- Each poll waits up to the given 'Timeout'. The outcome is handled as
-- follows:
--
-- * a poll timeout is logged at debug level
-- * any other 'KafkaError' is logged at error level
-- * a record without a value is skipped
-- * a value that fails JSON decoding is logged at error level
-- * a successfully decoded value is passed to the callback
--
-- The polling action runs under 'withTraceIdContext' and
-- 'immortalCreateLogged'.
-- NOTE(review): presumably 'immortalCreateLogged' re-runs the action
-- indefinitely; confirm against "Freckle.App.Async".
runConsumer
  :: ( MonadMask m
     , MonadUnliftIO m
     , MonadReader env m
     , MonadLogger m
     , HasKafkaConsumer env
     , FromJSON a
     )
  => Timeout
  -> (a -> m ())
  -> m ()
runConsumer pollTimeout onMessage =
  withTraceIdContext $ immortalCreateLogged pollOnce
 where
  kafkaTimeout = Kafka.Timeout (timeoutMs pollTimeout)

  pollOnce = do
    consumer <- view kafkaConsumerL
    pollMessage consumer kafkaTimeout >>= \case
      Left (KafkaResponseError RdKafkaRespErrTimedOut) ->
        logDebug "Polling timeout"
      Left err ->
        logError $
          "Error polling for message from Kafka" :# ["error" .= show err]
      Right record -> for_ (crValue record) handleValue

  handleValue bytes =
    either logDecodeFailure onMessage (eitherDecodeStrict bytes)

  logDecodeFailure reason =
    logError $ "Could not decode message value" :# ["error" .= reason]