{-# LANGUAGE ApplicativeDo #-}
{-# LANGUAGE NamedFieldPuns #-}

module Freckle.App.Kafka.Producer
  ( envKafkaBrokerAddresses
  , KafkaProducerPoolConfig (..)
  , envKafkaProducerPoolConfig
  , KafkaProducerPool (..)
  , HasKafkaProducerPool (..)
  , createKafkaProducerPool
  , produceKeyedOn
  , produceKeyedOnAsync
  ) where

import Freckle.App.Prelude

import Blammo.Logging
import Control.Lens (Lens', view)
import Data.Aeson (ToJSON, encode)
import Data.ByteString.Lazy (toStrict)
import qualified Data.List.NonEmpty as NE
import Data.Pool (Pool)
import qualified Data.Pool as Pool
import qualified Data.Text as T
import Freckle.App.Async (async)
import qualified Freckle.App.Env as Env
import Kafka.Producer
import UnliftIO.Exception (throwString)
import Yesod.Core.Lens
import Yesod.Core.Types (HandlerData)

envKafkaBrokerAddresses
  :: Env.Parser Env.Error (NonEmpty BrokerAddress)
envKafkaBrokerAddresses :: Parser Error (NonEmpty BrokerAddress)
envKafkaBrokerAddresses =
  forall e a.
AsUnset e =>
Reader e a -> String -> Mod Var a -> Parser e a
Env.var
    (forall a. (String -> Either String a) -> Reader Error a
Env.eitherReader String -> Either String (NonEmpty BrokerAddress)
readKafkaBrokerAddresses)
    String
"KAFKA_BROKER_ADDRESSES"
    forall a. Monoid a => a
mempty

readKafkaBrokerAddresses :: String -> Either String (NonEmpty BrokerAddress)
readKafkaBrokerAddresses :: String -> Either String (NonEmpty BrokerAddress)
readKafkaBrokerAddresses String
t = case forall a. [a] -> Maybe (NonEmpty a)
NE.nonEmpty forall a b. (a -> b) -> a -> b
$ Text -> Text -> [Text]
T.splitOn Text
"," forall a b. (a -> b) -> a -> b
$ String -> Text
T.pack String
t of
  Just xs :: NonEmpty Text
xs@(Text
x NE.:| [Text]
_)
    | Text
x forall a. Eq a => a -> a -> Bool
/= Text
"" -> forall a b. b -> Either a b
Right forall a b. (a -> b) -> a -> b
$ Text -> BrokerAddress
BrokerAddress forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> NonEmpty Text
xs
  Maybe (NonEmpty Text)
_ -> forall a b. a -> Either a b
Left String
"Broker Address cannot be empty"

data KafkaProducerPoolConfig = KafkaProducerPoolConfig
  { KafkaProducerPoolConfig -> Int
kafkaProducerPoolConfigStripes :: Int
  -- ^ The number of stripes (distinct sub-pools) to maintain.
  -- The smallest acceptable value is 1.
  , KafkaProducerPoolConfig -> NominalDiffTime
kafkaProducerPoolConfigIdleTimeout :: NominalDiffTime
  -- ^ Amount of time for which an unused resource is kept open.
  -- The smallest acceptable value is 0.5 seconds.
  --
  -- The elapsed time before destroying a resource may be a little
  -- longer than requested, as the reaper thread wakes at 1-second
  -- intervals.
  , KafkaProducerPoolConfig -> Int
kafkaProducerPoolConfigSize :: Int
  -- ^ Maximum number of resources to keep open per stripe.  The
  -- smallest acceptable value is 1.
  --
  -- Requests for resources will block if this limit is reached on a
  -- single stripe, even if other stripes have idle resources
  -- available.
  }
  deriving stock (Int -> KafkaProducerPoolConfig -> ShowS
[KafkaProducerPoolConfig] -> ShowS
KafkaProducerPoolConfig -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [KafkaProducerPoolConfig] -> ShowS
$cshowList :: [KafkaProducerPoolConfig] -> ShowS
show :: KafkaProducerPoolConfig -> String
$cshow :: KafkaProducerPoolConfig -> String
showsPrec :: Int -> KafkaProducerPoolConfig -> ShowS
$cshowsPrec :: Int -> KafkaProducerPoolConfig -> ShowS
Show)

-- | Same defaults as 'Database.Persist.Sql.ConnectionPoolConfig'
defaultKafkaProducerPoolConfig :: KafkaProducerPoolConfig
defaultKafkaProducerPoolConfig :: KafkaProducerPoolConfig
defaultKafkaProducerPoolConfig = Int -> NominalDiffTime -> Int -> KafkaProducerPoolConfig
KafkaProducerPoolConfig Int
1 NominalDiffTime
600 Int
10

envKafkaProducerPoolConfig
  :: Env.Parser Env.Error KafkaProducerPoolConfig
envKafkaProducerPoolConfig :: Parser Error KafkaProducerPoolConfig
envKafkaProducerPoolConfig = do
  Int
poolSize <- forall e a.
AsUnset e =>
Reader e a -> String -> Mod Var a -> Parser e a
Env.var forall e a. (AsUnread e, Read a) => Reader e a
Env.auto String
"KAFKA_PRODUCER_POOL_SIZE" forall a b. (a -> b) -> a -> b
$ forall a. a -> Mod Var a
Env.def Int
10
  pure $ KafkaProducerPoolConfig
defaultKafkaProducerPoolConfig {kafkaProducerPoolConfigSize :: Int
kafkaProducerPoolConfigSize = Int
poolSize}

data KafkaProducerPool
  = NullKafkaProducerPool
  | KafkaProducerPool (Pool KafkaProducer)

class HasKafkaProducerPool env where
  kafkaProducerPoolL :: Lens' env KafkaProducerPool

instance HasKafkaProducerPool site => HasKafkaProducerPool (HandlerData child site) where
  kafkaProducerPoolL :: Lens' (HandlerData child site) KafkaProducerPool
kafkaProducerPoolL = forall child site.
Lens' (HandlerData child site) (RunHandlerEnv child site)
envL forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall child site. Lens' (RunHandlerEnv child site) site
siteL forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall env. HasKafkaProducerPool env => Lens' env KafkaProducerPool
kafkaProducerPoolL

createKafkaProducerPool
  :: NonEmpty BrokerAddress
  -> KafkaProducerPoolConfig
  -> IO (Pool KafkaProducer)
createKafkaProducerPool :: NonEmpty BrokerAddress
-> KafkaProducerPoolConfig -> IO (Pool KafkaProducer)
createKafkaProducerPool NonEmpty BrokerAddress
addresses KafkaProducerPoolConfig {Int
NominalDiffTime
kafkaProducerPoolConfigSize :: Int
kafkaProducerPoolConfigIdleTimeout :: NominalDiffTime
kafkaProducerPoolConfigStripes :: Int
kafkaProducerPoolConfigSize :: KafkaProducerPoolConfig -> Int
kafkaProducerPoolConfigIdleTimeout :: KafkaProducerPoolConfig -> NominalDiffTime
kafkaProducerPoolConfigStripes :: KafkaProducerPoolConfig -> Int
..} =
  forall a. PoolConfig a -> IO (Pool a)
Pool.newPool forall a b. (a -> b) -> a -> b
$
    forall a. Maybe Int -> PoolConfig a -> PoolConfig a
Pool.setNumStripes (forall a. a -> Maybe a
Just Int
kafkaProducerPoolConfigStripes) forall a b. (a -> b) -> a -> b
$
      forall a. IO a -> (a -> IO ()) -> Double -> Int -> PoolConfig a
Pool.defaultPoolConfig
        IO KafkaProducer
mkProducer
        forall (m :: * -> *). MonadIO m => KafkaProducer -> m ()
closeProducer
        (forall a b. (Real a, Fractional b) => a -> b
realToFrac NominalDiffTime
kafkaProducerPoolConfigIdleTimeout)
        Int
kafkaProducerPoolConfigSize
 where
  mkProducer :: IO KafkaProducer
mkProducer =
    forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either forall {m :: * -> *} {a} {a}. (MonadIO m, Show a) => a -> m a
throw forall (f :: * -> *) a. Applicative f => a -> f a
pure forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall (m :: * -> *).
MonadIO m =>
ProducerProperties -> m (Either KafkaError KafkaProducer)
newProducer ([BrokerAddress] -> ProducerProperties
brokersList forall a b. (a -> b) -> a -> b
$ forall (t :: * -> *) a. Foldable t => t a -> [a]
toList NonEmpty BrokerAddress
addresses)
  throw :: a -> m a
throw a
err = forall (m :: * -> *) a. (MonadIO m, HasCallStack) => String -> m a
throwString forall a b. (a -> b) -> a -> b
$ String
"Failed to open kafka producer: " forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> String
show a
err

produceKeyedOn
  :: ( MonadUnliftIO m
     , MonadLogger m
     , MonadReader env m
     , HasKafkaProducerPool env
     , ToJSON key
     , ToJSON value
     )
  => TopicName
  -> NonEmpty value
  -> (value -> key)
  -> m ()
produceKeyedOn :: forall (m :: * -> *) env key value.
(MonadUnliftIO m, MonadLogger m, MonadReader env m,
 HasKafkaProducerPool env, ToJSON key, ToJSON value) =>
TopicName -> NonEmpty value -> (value -> key) -> m ()
produceKeyedOn TopicName
prTopic NonEmpty value
values value -> key
keyF = do
  forall (m :: * -> *).
(HasCallStack, MonadLogger m) =>
Text -> Message -> m ()
logDebugNS Text
"kafka" forall a b. (a -> b) -> a -> b
$ Text
"Producing Kafka events" Text -> [SeriesElem] -> Message
:# [Key
"events" forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= NonEmpty value
values]
  forall s (m :: * -> *) a. MonadReader s m => Getting a s a -> m a
view forall env. HasKafkaProducerPool env => Lens' env KafkaProducerPool
kafkaProducerPoolL forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
    KafkaProducerPool
NullKafkaProducerPool -> forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
    KafkaProducerPool Pool KafkaProducer
producerPool -> do
      [(ProducerRecord, KafkaError)]
errors <-
        forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$
          forall a r. Pool a -> (a -> IO r) -> IO r
Pool.withResource Pool KafkaProducer
producerPool forall a b. (a -> b) -> a -> b
$ \KafkaProducer
producer ->
            forall (m :: * -> *).
MonadIO m =>
KafkaProducer
-> [ProducerRecord] -> m [(ProducerRecord, KafkaError)]
produceMessageBatch KafkaProducer
producer forall a b. (a -> b) -> a -> b
$
              forall (t :: * -> *) a. Foldable t => t a -> [a]
toList forall a b. (a -> b) -> a -> b
$
                value -> ProducerRecord
mkProducerRecord forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> NonEmpty value
values
      forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (forall (t :: * -> *) a. Foldable t => t a -> Bool
null [(ProducerRecord, KafkaError)]
errors) forall a b. (a -> b) -> a -> b
$
        forall (m :: * -> *).
(HasCallStack, MonadLogger m) =>
Text -> Message -> m ()
logErrorNS Text
"kafka" forall a b. (a -> b) -> a -> b
$
          Text
"Failed to send events" Text -> [SeriesElem] -> Message
:# [Key
"errors" forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (forall a. Show a => a -> Text
tshow forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a b. (a, b) -> b
snd) [(ProducerRecord, KafkaError)]
errors]
 where
  mkProducerRecord :: value -> ProducerRecord
mkProducerRecord value
value =
    ProducerRecord
      { TopicName
prTopic :: TopicName
prTopic :: TopicName
prTopic
      , prPartition :: ProducePartition
prPartition = ProducePartition
UnassignedPartition
      , prKey :: Maybe ByteString
prKey = forall a. a -> Maybe a
Just forall a b. (a -> b) -> a -> b
$ ByteString -> ByteString
toStrict forall a b. (a -> b) -> a -> b
$ forall a. ToJSON a => a -> ByteString
encode forall a b. (a -> b) -> a -> b
$ value -> key
keyF value
value
      , prValue :: Maybe ByteString
prValue =
          forall a. a -> Maybe a
Just forall a b. (a -> b) -> a -> b
$
            ByteString -> ByteString
toStrict forall a b. (a -> b) -> a -> b
$
              forall a. ToJSON a => a -> ByteString
encode value
value
      }

produceKeyedOnAsync
  :: ( MonadMask m
     , MonadUnliftIO m
     , MonadLogger m
     , MonadReader env m
     , HasKafkaProducerPool env
     , ToJSON key
     , ToJSON value
     )
  => TopicName
  -> NonEmpty value
  -> (value -> key)
  -> m ()
produceKeyedOnAsync :: forall (m :: * -> *) env key value.
(MonadMask m, MonadUnliftIO m, MonadLogger m, MonadReader env m,
 HasKafkaProducerPool env, ToJSON key, ToJSON value) =>
TopicName -> NonEmpty value -> (value -> key) -> m ()
produceKeyedOnAsync TopicName
prTopic NonEmpty value
values = forall (f :: * -> *) a. Functor f => f a -> f ()
void forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a.
(MonadMask m, MonadUnliftIO m) =>
m a -> m (Async a)
async forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) env key value.
(MonadUnliftIO m, MonadLogger m, MonadReader env m,
 HasKafkaProducerPool env, ToJSON key, ToJSON value) =>
TopicName -> NonEmpty value -> (value -> key) -> m ()
produceKeyedOn TopicName
prTopic NonEmpty value
values