{-# LANGUAGE ApplicativeDo #-}
{-# LANGUAGE NamedFieldPuns #-}

module Freckle.App.Kafka.Producer
  ( envKafkaBrokerAddresses
  , KafkaProducerPoolConfig (..)
  , envKafkaProducerPoolConfig
  , KafkaProducerPool (..)
  , HasKafkaProducerPool (..)
  , createKafkaProducerPool
  , produceKeyedOn
  , produceKeyedOnAsync
  ) where

import Freckle.App.Prelude

import Blammo.Logging
import Control.Lens (Lens', view)
import Data.Aeson (ToJSON, encode)
import Data.ByteString.Lazy (toStrict)
import qualified Data.HashMap.Strict as HashMap
import qualified Data.List.NonEmpty as NE
import Data.Pool (Pool)
import qualified Data.Pool as Pool
import qualified Data.Text as T
import Freckle.App.Async (async)
import qualified Freckle.App.Env as Env
import Freckle.App.OpenTelemetry
import Kafka.Producer
import qualified OpenTelemetry.Trace as Trace
import Yesod.Core.Lens
import Yesod.Core.Types (HandlerData)

-- | Environment parser for the Kafka broker list.
--
-- Reads the @KAFKA_BROKER_ADDRESSES@ variable and parses it with
-- 'readKafkaBrokerAddresses'. No default is supplied.
envKafkaBrokerAddresses
  :: Env.Parser Env.Error (NonEmpty BrokerAddress)
envKafkaBrokerAddresses =
  Env.var brokerAddressesReader "KAFKA_BROKER_ADDRESSES" mempty
 where
  brokerAddressesReader = Env.eitherReader readKafkaBrokerAddresses

-- | Parse a comma-separated list of broker addresses.
--
-- Surrounding whitespace is stripped from every entry and blank entries
-- (for example those produced by a trailing or doubled comma) are dropped,
-- so an empty 'BrokerAddress' is never produced.
--
-- Returns 'Left' when no non-blank entry remains.
readKafkaBrokerAddresses :: String -> Either String (NonEmpty BrokerAddress)
readKafkaBrokerAddresses raw =
  maybe (Left "Broker Address cannot be empty") (Right . fmap BrokerAddress)
    . NE.nonEmpty
    . filter (not . T.null)
    . fmap T.strip
    . T.splitOn ","
    $ T.pack raw

-- | Sizing and lifetime settings for the pool built by
-- 'createKafkaProducerPool'.
data KafkaProducerPoolConfig = KafkaProducerPoolConfig
  { kafkaProducerPoolConfigStripes :: Int
  -- ^ The number of stripes (distinct sub-pools) to maintain.
  -- The smallest acceptable value is 1.
  , kafkaProducerPoolConfigIdleTimeout :: NominalDiffTime
  -- ^ Amount of time for which an unused resource is kept open.
  -- The smallest acceptable value is 0.5 seconds.
  --
  -- The elapsed time before destroying a resource may be a little
  -- longer than requested, as the reaper thread wakes at 1-second
  -- intervals.
  , kafkaProducerPoolConfigSize :: Int
  -- ^ Maximum number of resources to keep open per stripe.  The
  -- smallest acceptable value is 1.
  --
  -- Requests for resources will block if this limit is reached on a
  -- single stripe, even if other stripes have idle resources
  -- available.
  }
  deriving stock (Show)

-- | Same defaults as 'Database.Persist.Sql.ConnectionPoolConfig'
--
-- One stripe, an idle timeout of 600 (a 'NominalDiffTime', i.e. seconds),
-- and at most 10 resources per stripe.
defaultKafkaProducerPoolConfig :: KafkaProducerPoolConfig
defaultKafkaProducerPoolConfig =
  KafkaProducerPoolConfig
    { kafkaProducerPoolConfigStripes = 1
    , kafkaProducerPoolConfigIdleTimeout = 600
    , kafkaProducerPoolConfigSize = 10
    }

-- | Environment parser for the producer pool configuration.
--
-- Only the pool size is configurable, through @KAFKA_PRODUCER_POOL_SIZE@;
-- when the variable is unset the size from 'defaultKafkaProducerPoolConfig'
-- is used. All other fields come from 'defaultKafkaProducerPoolConfig'.
envKafkaProducerPoolConfig
  :: Env.Parser Env.Error KafkaProducerPoolConfig
envKafkaProducerPoolConfig =
  withPoolSize
    <$> Env.var Env.auto "KAFKA_PRODUCER_POOL_SIZE" (Env.def defaultPoolSize)
 where
  -- Single source of truth for the fallback, so it cannot drift from
  -- 'defaultKafkaProducerPoolConfig'.
  defaultPoolSize = kafkaProducerPoolConfigSize defaultKafkaProducerPoolConfig

  withPoolSize poolSize =
    defaultKafkaProducerPoolConfig {kafkaProducerPoolConfigSize = poolSize}

-- | The producer handle stored in an application environment (see
-- 'HasKafkaProducerPool').
data KafkaProducerPool
  = NullKafkaProducerPool
  -- ^ No pool: 'produceKeyedOn' sends nothing when it finds this value
  | KafkaProducerPool (Pool KafkaProducer)
  -- ^ A pool of 'KafkaProducer's, e.g. one built by 'createKafkaProducerPool'

-- | Environments from which a 'KafkaProducerPool' can be read.
class HasKafkaProducerPool env where
  -- | Lens onto the environment's 'KafkaProducerPool'
  kafkaProducerPoolL :: Lens' env KafkaProducerPool

-- | A Yesod 'HandlerData' has a pool whenever its @site@ does: the lens
-- goes through the handler environment ('envL') to the site ('siteL').
instance HasKafkaProducerPool site => HasKafkaProducerPool (HandlerData child site) where
  kafkaProducerPoolL = envL . siteL . kafkaProducerPoolL

-- | Build a resource pool of 'KafkaProducer's connected to the given brokers.
--
-- The pool's stripe count, idle timeout and per-stripe size are taken from
-- the 'KafkaProducerPoolConfig'. Producers are released with 'closeProducer'.
--
-- The action the pool uses to open a producer throws (via 'throwString')
-- when 'newProducer' reports a 'KafkaError'.
createKafkaProducerPool
  :: NonEmpty BrokerAddress
  -> KafkaProducerPoolConfig
  -> IO (Pool KafkaProducer)
createKafkaProducerPool addresses config =
  Pool.newPool $
    Pool.setNumStripes (Just $ kafkaProducerPoolConfigStripes config) $
      Pool.defaultPoolConfig
        openProducer
        closeProducer
        -- The pool expects the idle timeout as a 'Double'
        (realToFrac $ kafkaProducerPoolConfigIdleTimeout config)
        (kafkaProducerPoolConfigSize config)
 where
  openProducer :: IO KafkaProducer
  openProducer =
    newProducer (brokersList $ toList addresses) >>= \case
      Left err -> throwString $ "Failed to open kafka producer: " <> show err
      Right producer -> pure producer

-- | Produce a batch of JSON-encoded events to a topic.
--
-- Each value becomes one record: the record key is the JSON encoding of
-- @keyF value@, the record value is the JSON encoding of the value itself,
-- and no partition is assigned explicitly.
--
-- The whole operation runs inside a @kafka.produce@ span carrying the topic
-- name as an attribute. When the environment holds 'NullKafkaProducerPool'
-- nothing is sent. Per-record failures returned by 'produceMessageBatch' are
-- logged as an error; they are not thrown.
produceKeyedOn
  :: ( MonadUnliftIO m
     , MonadLogger m
     , MonadTracer m
     , MonadReader env m
     , HasKafkaProducerPool env
     , ToJSON key
     , ToJSON value
     )
  => TopicName
  -> NonEmpty value
  -> (value -> key)
  -> m ()
produceKeyedOn prTopic values keyF = traced $ do
  logDebugNS "kafka" $ "Producing Kafka events" :# ["events" .= values]
  view kafkaProducerPoolL >>= \case
    NullKafkaProducerPool -> pure ()
    KafkaProducerPool producerPool -> do
      failures <-
        liftIO . Pool.withResource producerPool $ \producer ->
          produceMessageBatch producer records
      unless (null failures) $
        logErrorNS "kafka" $
          "Failed to send events" :# ["errors" .= fmap (tshow . snd) failures]
 where
  records = toRecord <$> toList values

  toRecord value =
    ProducerRecord
      { prTopic
      , prPartition = UnassignedPartition
      , prKey = Just . toStrict . encode $ keyF value
      , prValue = Just . toStrict $ encode value
      }

  traced =
    inSpan
      "kafka.produce"
      producerSpanArguments
        { Trace.attributes =
            HashMap.singleton "topic" (Trace.toAttribute $ unTopicName prTopic)
        }

-- | Like 'produceKeyedOn', but started with 'async' and not waited on.
--
-- The resulting 'Async' handle is discarded, so the caller cannot observe
-- completion of the send.
produceKeyedOnAsync
  :: ( MonadMask m
     , MonadUnliftIO m
     , MonadLogger m
     , MonadTracer m
     , MonadReader env m
     , HasKafkaProducerPool env
     , ToJSON key
     , ToJSON value
     )
  => TopicName
  -> NonEmpty value
  -> (value -> key)
  -> m ()
produceKeyedOnAsync prTopic values keyF =
  void . async $ produceKeyedOn prTopic values keyF