{-# LANGUAGE ApplicativeDo #-}
{-# LANGUAGE NamedFieldPuns #-}
module Freckle.App.Kafka.Producer
( envKafkaBrokerAddresses
, KafkaProducerPoolConfig (..)
, envKafkaProducerPoolConfig
, KafkaProducerPool (..)
, HasKafkaProducerPool (..)
, createKafkaProducerPool
, produceKeyedOn
, produceKeyedOnAsync
) where
import Freckle.App.Prelude
import Blammo.Logging
import Control.Lens (Lens', view)
import Data.Aeson (ToJSON, encode)
import Data.ByteString.Lazy (toStrict)
import qualified Data.HashMap.Strict as HashMap
import qualified Data.List.NonEmpty as NE
import Data.Pool (Pool)
import qualified Data.Pool as Pool
import qualified Data.Text as T
import Freckle.App.Async (async)
import qualified Freckle.App.Env as Env
import Freckle.App.OpenTelemetry
import Kafka.Producer
import qualified OpenTelemetry.Trace as Trace
import UnliftIO (withRunInIO)
import Yesod.Core.Lens
import Yesod.Core.Types (HandlerData)
-- | Environment parser for the Kafka brokers to connect to.
--
-- Reads @KAFKA_BROKER_ADDRESSES@ and hands the raw value to
-- 'readKafkaBrokerAddresses'. No default value is supplied.
envKafkaBrokerAddresses
  :: Env.Parser Env.Error (NonEmpty BrokerAddress)
envKafkaBrokerAddresses =
  Env.var brokerAddressesReader "KAFKA_BROKER_ADDRESSES" mempty
 where
  brokerAddressesReader = Env.eitherReader readKafkaBrokerAddresses
-- | Parse a comma-separated list of broker addresses.
--
-- Whitespace around each address is stripped and empty segments are dropped,
-- so inputs such as @"a:9092, b:9092"@ or @"a:9092,"@ produce only usable
-- addresses instead of a 'BrokerAddress' that is blank or carries stray
-- spaces.
--
-- Returns 'Left' when no non-empty address remains (for example @""@ or
-- @","@).
readKafkaBrokerAddresses :: String -> Either String (NonEmpty BrokerAddress)
readKafkaBrokerAddresses raw =
  case NE.nonEmpty addresses of
    Just xs -> Right $ BrokerAddress <$> xs
    Nothing -> Left "Broker Address cannot be empty"
 where
  -- Normalize every segment before deciding whether anything usable is left
  addresses = filter (not . T.null) $ T.strip <$> T.splitOn "," (T.pack raw)
-- | Settings for the pool of Kafka producers built by
-- 'createKafkaProducerPool'.
data KafkaProducerPoolConfig = KafkaProducerPoolConfig
  { kafkaProducerPoolConfigStripes :: Int
  -- ^ Number of stripes, handed to 'Pool.setNumStripes'
  , kafkaProducerPoolConfigIdleTimeout :: NominalDiffTime
  -- ^ Idle timeout, converted to a 'Double' for 'Pool.defaultPoolConfig'
  , kafkaProducerPoolConfigSize :: Int
  -- ^ Size argument handed to 'Pool.defaultPoolConfig'
  }
  deriving stock (Show)
-- | Baseline pool settings: one stripe, an idle timeout of 600, and a size
-- of 10.
defaultKafkaProducerPoolConfig :: KafkaProducerPoolConfig
defaultKafkaProducerPoolConfig =
  KafkaProducerPoolConfig
    { kafkaProducerPoolConfigStripes = 1
    , kafkaProducerPoolConfigIdleTimeout = 600
    , kafkaProducerPoolConfigSize = 10
    }
-- | Environment parser for the producer pool settings.
--
-- Only the pool size is configurable, through @KAFKA_PRODUCER_POOL_SIZE@
-- (defaulting to 10); every other field comes from
-- 'defaultKafkaProducerPoolConfig'.
envKafkaProducerPoolConfig
  :: Env.Parser Env.Error KafkaProducerPoolConfig
envKafkaProducerPoolConfig =
  withPoolSize <$> Env.var Env.auto "KAFKA_PRODUCER_POOL_SIZE" (Env.def 10)
 where
  withPoolSize size =
    defaultKafkaProducerPoolConfig {kafkaProducerPoolConfigSize = size}
-- | Where 'produceKeyedOn' sends its events.
data KafkaProducerPool
  = -- | No pool is available; 'produceKeyedOn' sends nothing
    NullKafkaProducerPool
  | -- | A pool of producers to borrow from when sending
    KafkaProducerPool (Pool KafkaProducer)
-- | Environments that carry a 'KafkaProducerPool'.
class HasKafkaProducerPool env where
  -- | Lens onto the pool stored in @env@
  kafkaProducerPoolL :: Lens' env KafkaProducerPool
-- | A Yesod handler's data exposes the pool held by its @site@, reached
-- through 'envL' and then 'siteL'.
instance
  HasKafkaProducerPool site
  => HasKafkaProducerPool (HandlerData child site)
  where
  kafkaProducerPoolL = envL . siteL . kafkaProducerPoolL
-- | Build a pool of Kafka producers connected to the given brokers.
--
-- Each pooled producer is opened with 'newProducer' against the full broker
-- list and released with 'closeProducer'. If opening fails, the
-- 'KafkaError' is rethrown through 'throwString'.
createKafkaProducerPool
  :: NonEmpty BrokerAddress
  -> KafkaProducerPoolConfig
  -> IO (Pool KafkaProducer)
createKafkaProducerPool addresses config = Pool.newPool poolConfig
 where
  poolConfig =
    Pool.setNumStripes (Just $ kafkaProducerPoolConfigStripes config) $
      Pool.defaultPoolConfig
        openProducer
        closeProducer
        (realToFrac $ kafkaProducerPoolConfigIdleTimeout config)
        (kafkaProducerPoolConfigSize config)

  openProducer = do
    result <- newProducer $ brokersList $ toList addresses
    case result of
      Left err -> throwString $ "Failed to open kafka producer: " <> show err
      Right producer -> pure producer
-- | Send every value to the given topic, keyed by the supplied function.
--
-- Both the key and the value are JSON-encoded. The whole operation runs
-- inside a @kafka.produce@ span. A producer is borrowed from the pool found
-- in the environment; with 'NullKafkaProducerPool' nothing is sent. A
-- 'KafkaError' returned for an individual message is logged and the
-- remaining values are still attempted.
produceKeyedOn
  :: ( MonadUnliftIO m
     , MonadLogger m
     , MonadTracer m
     , MonadReader env m
     , HasKafkaProducerPool env
     , ToJSON key
     , ToJSON value
     )
  => TopicName
  -> NonEmpty value
  -> (value -> key)
  -> m ()
produceKeyedOn prTopic values keyF = traced $ do
  logDebugNS "kafka" $ "Producing Kafka events" :# ["events" .= values]
  pool <- view kafkaProducerPoolL
  case pool of
    NullKafkaProducerPool -> pure ()
    KafkaProducerPool producerPool ->
      withRunInIO $ \runInIO ->
        Pool.withResource producerPool $ \producer ->
          for_ values $ \value -> do
            mError <- produceMessage producer $ toRecord value
            -- Log in the caller's monad; a failure does not stop the loop
            for_ mError $ \err ->
              runInIO $
                logErrorNS "kafka" $
                  "Failed to send event" :# ["error" .= tshow err]
 where
  toRecord value =
    ProducerRecord
      { prTopic
      , prPartition = UnassignedPartition
      , prKey = Just . toStrict . encode $ keyF value
      , prValue = Just . toStrict $ encode value
      , prHeaders = mempty
      }

  traced = inSpan "kafka.produce" spanArguments

  spanArguments =
    producerSpanArguments
      { Trace.attributes =
          HashMap.fromList
            [ ("service.name", "kafka")
            , ("topic", Trace.toAttribute $ unTopicName prTopic)
            ]
      }
-- | Like 'produceKeyedOn', but started through 'async'.
--
-- The resulting 'Async' handle is discarded, so the caller neither waits
-- for the send nor sees its result.
produceKeyedOnAsync
  :: ( MonadMask m
     , MonadUnliftIO m
     , MonadLogger m
     , MonadTracer m
     , MonadReader env m
     , HasKafkaProducerPool env
     , ToJSON key
     , ToJSON value
     )
  => TopicName
  -> NonEmpty value
  -> (value -> key)
  -> m ()
produceKeyedOnAsync prTopic values keyF =
  void $ async $ produceKeyedOn prTopic values keyF