{-# LANGUAGE RankNTypes #-}
{-# OPTIONS_GHC -Wno-redundant-constraints #-}
module Kafka
(
Internal.Handler,
Settings.Settings,
Settings.decoder,
handler,
Internal.Msg,
emptyMsg,
addPayload,
addKey,
Internal.sendAsync,
Internal.sendSync,
topic,
payload,
key,
)
where
import qualified Conduit
import qualified Control.Concurrent
import qualified Control.Concurrent.Async as Async
import qualified Control.Concurrent.STM as STM
import qualified Control.Concurrent.STM.TMVar as TMVar
import qualified Control.Exception.Safe as Exception
import Control.Monad.IO.Class (liftIO)
import qualified Data.Aeson as Aeson
import qualified Data.ByteString.Lazy as ByteString.Lazy
import qualified Data.Text.Encoding
import qualified Dict
import qualified Kafka.Internal as Internal
import qualified Kafka.Producer as Producer
import qualified Kafka.Settings as Settings
import qualified Platform
import qualified Prelude
data Details = Details
{ Details -> List Text
detailsBrokers :: List Text,
Details -> Msg
detailsMsg :: Internal.Msg
}
deriving ((forall x. Details -> Rep Details x)
-> (forall x. Rep Details x -> Details) -> Generic Details
forall x. Rep Details x -> Details
forall x. Details -> Rep Details x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep Details x -> Details
$cfrom :: forall x. Details -> Rep Details x
Generic, Int -> Details -> ShowS
[Details] -> ShowS
Details -> String
(Int -> Details -> ShowS)
-> (Details -> String) -> ([Details] -> ShowS) -> Show Details
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [Details] -> ShowS
$cshowList :: [Details] -> ShowS
show :: Details -> String
$cshow :: Details -> String
showsPrec :: Int -> Details -> ShowS
$cshowsPrec :: Int -> Details -> ShowS
Show)
instance Aeson.ToJSON Details
instance Platform.TracingSpanDetails Details
newtype DeliveryReportDetails = DeliveryReportDetails
{ DeliveryReportDetails -> Text
deliveryReportProducerRecord :: Text
}
deriving ((forall x. DeliveryReportDetails -> Rep DeliveryReportDetails x)
-> (forall x. Rep DeliveryReportDetails x -> DeliveryReportDetails)
-> Generic DeliveryReportDetails
forall x. Rep DeliveryReportDetails x -> DeliveryReportDetails
forall x. DeliveryReportDetails -> Rep DeliveryReportDetails x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep DeliveryReportDetails x -> DeliveryReportDetails
$cfrom :: forall x. DeliveryReportDetails -> Rep DeliveryReportDetails x
Generic, Int -> DeliveryReportDetails -> ShowS
[DeliveryReportDetails] -> ShowS
DeliveryReportDetails -> String
(Int -> DeliveryReportDetails -> ShowS)
-> (DeliveryReportDetails -> String)
-> ([DeliveryReportDetails] -> ShowS)
-> Show DeliveryReportDetails
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [DeliveryReportDetails] -> ShowS
$cshowList :: [DeliveryReportDetails] -> ShowS
show :: DeliveryReportDetails -> String
$cshow :: DeliveryReportDetails -> String
showsPrec :: Int -> DeliveryReportDetails -> ShowS
$cshowsPrec :: Int -> DeliveryReportDetails -> ShowS
Show)
instance Aeson.ToJSON DeliveryReportDetails
instance Platform.TracingSpanDetails DeliveryReportDetails
emptyMsg :: Text -> Internal.Msg
emptyMsg :: Text -> Msg
emptyMsg Text
topic' =
Msg :: Topic -> Maybe Key -> Maybe Encodable -> Msg
Internal.Msg
{ topic :: Topic
Internal.topic = Text -> Topic
Internal.Topic Text
topic',
payload :: Maybe Encodable
Internal.payload = Maybe Encodable
forall a. Maybe a
Nothing,
key :: Maybe Key
Internal.key = Maybe Key
forall a. Maybe a
Nothing
}
addPayload :: (Aeson.FromJSON a, Aeson.ToJSON a) => a -> Internal.Msg -> Internal.Msg
addPayload :: a -> Msg -> Msg
addPayload a
contents Msg
msg =
Msg
msg {payload :: Maybe Encodable
Internal.payload = (Encodable -> Maybe Encodable
forall a. a -> Maybe a
Just (a -> Encodable
forall a. (FromJSON a, ToJSON a) => a -> Encodable
Internal.Encodable a
contents))}
addKey :: Text -> Internal.Msg -> Internal.Msg
addKey :: Text -> Msg -> Msg
addKey Text
key' Msg
msg = Msg
msg {key :: Maybe Key
Internal.key = Key -> Maybe Key
forall a. a -> Maybe a
Just (Text -> Key
Internal.Key Text
key')}
record :: Internal.Msg -> Task e Producer.ProducerRecord
record :: Msg -> Task e ProducerRecord
record Msg
msg = do
Text
requestId <- Task e Text
forall e. Task e Text
Platform.requestId
ProducerRecord -> Task e ProducerRecord
forall a x. a -> Task x a
Task.succeed
ProducerRecord :: TopicName
-> ProducePartition
-> Maybe ByteString
-> Maybe ByteString
-> ProducerRecord
Producer.ProducerRecord
{ prTopic :: TopicName
Producer.prTopic =
Msg -> Topic
Internal.topic Msg
msg
Topic -> (Topic -> Text) -> Text
forall a b. a -> (a -> b) -> b
|> Topic -> Text
Internal.unTopic
Text -> (Text -> TopicName) -> TopicName
forall a b. a -> (a -> b) -> b
|> Text -> TopicName
Producer.TopicName,
prPartition :: ProducePartition
Producer.prPartition = ProducePartition
Producer.UnassignedPartition,
prKey :: Maybe ByteString
Producer.prKey =
(Key -> ByteString) -> Maybe Key -> Maybe ByteString
forall a b. (a -> b) -> Maybe a -> Maybe b
Maybe.map
(Text -> ByteString
Data.Text.Encoding.encodeUtf8 (Text -> ByteString) -> (Key -> Text) -> Key -> ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
<< Key -> Text
Internal.unKey)
(Msg -> Maybe Key
Internal.key Msg
msg),
prValue :: Maybe ByteString
Producer.prValue =
(Encodable -> ByteString) -> Maybe Encodable -> Maybe ByteString
forall a b. (a -> b) -> Maybe a -> Maybe b
Maybe.map
( \Encodable
payload' ->
MsgWithMetaData :: MetaData -> Encodable -> MsgWithMetaData
Internal.MsgWithMetaData
{ metaData :: MetaData
Internal.metaData =
MetaData :: Text -> MetaData
Internal.MetaData
{ Text
requestId :: Text
requestId :: Text
Internal.requestId
},
value :: Encodable
Internal.value = Encodable
payload'
}
MsgWithMetaData -> (MsgWithMetaData -> ByteString) -> ByteString
forall a b. a -> (a -> b) -> b
|> MsgWithMetaData -> ByteString
forall a. ToJSON a => a -> ByteString
Aeson.encode
ByteString -> (ByteString -> ByteString) -> ByteString
forall a b. a -> (a -> b) -> b
|> ByteString -> ByteString
ByteString.Lazy.toStrict
)
(Msg -> Maybe Encodable
Internal.payload Msg
msg)
}
topic :: Internal.Msg -> Text
topic :: Msg -> Text
topic Msg
msg = Topic -> Text
Internal.unTopic (Msg -> Topic
Internal.topic Msg
msg)
payload :: (Aeson.FromJSON a) => Internal.Msg -> Maybe a
payload :: Msg -> Maybe a
payload Msg
msg =
Msg -> Maybe Encodable
Internal.payload Msg
msg
Maybe Encodable -> (Maybe Encodable -> Maybe a) -> Maybe a
forall a b. a -> (a -> b) -> b
|> (Encodable -> Maybe a) -> Maybe Encodable -> Maybe a
forall a b. (a -> Maybe b) -> Maybe a -> Maybe b
Maybe.andThen (ByteString -> Maybe a
forall a. FromJSON a => ByteString -> Maybe a
Aeson.decode (ByteString -> Maybe a)
-> (Encodable -> ByteString) -> Encodable -> Maybe a
forall b c a. (b -> c) -> (a -> b) -> a -> c
<< Encodable -> ByteString
forall a. ToJSON a => a -> ByteString
Aeson.encode)
key :: Internal.Msg -> Maybe Text
key :: Msg -> Maybe Text
key Msg
msg = (Key -> Text) -> Maybe Key -> Maybe Text
forall a b. (a -> b) -> Maybe a -> Maybe b
Maybe.map Key -> Text
Internal.unKey (Msg -> Maybe Key
Internal.key Msg
msg)
handler :: Settings.Settings -> Conduit.Acquire Internal.Handler
handler :: Settings -> Acquire Handler
handler Settings
settings = do
KafkaProducer
producer <- IO KafkaProducer
-> (KafkaProducer -> IO ()) -> Acquire KafkaProducer
forall a. IO a -> (a -> IO ()) -> Acquire a
Conduit.mkAcquire (Settings -> IO KafkaProducer
mkProducer Settings
settings) KafkaProducer -> IO ()
forall (m :: * -> *). MonadIO m => KafkaProducer -> m ()
Producer.closeProducer
TMVar Terminate
_ <- IO (TMVar Terminate)
-> (TMVar Terminate -> IO ()) -> Acquire (TMVar Terminate)
forall a. IO a -> (a -> IO ()) -> Acquire a
Conduit.mkAcquire (KafkaProducer -> IO (TMVar Terminate)
forall b. KafkaProducer -> IO (TMVar b)
startPollEventLoop KafkaProducer
producer) (\TMVar Terminate
terminator -> STM () -> IO ()
forall a. STM a -> IO a
STM.atomically (TMVar Terminate -> Terminate -> STM ()
forall a. TMVar a -> a -> STM ()
TMVar.putTMVar TMVar Terminate
terminator Terminate
Terminate))
IO Handler -> Acquire Handler
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Settings -> KafkaProducer -> IO Handler
mkHandler Settings
settings KafkaProducer
producer)
data Terminate = Terminate
startPollEventLoop :: Producer.KafkaProducer -> Prelude.IO (TMVar.TMVar b)
startPollEventLoop :: KafkaProducer -> IO (TMVar b)
startPollEventLoop KafkaProducer
producer = do
TMVar b
terminator <- STM (TMVar b) -> IO (TMVar b)
forall a. STM a -> IO a
STM.atomically STM (TMVar b)
forall a. STM (TMVar a)
TMVar.newEmptyTMVar
Async ()
_ <-
IO () -> IO b -> IO ()
forall a b. IO a -> IO b -> IO ()
Async.race_
(KafkaProducer -> IO ()
pollEvents KafkaProducer
producer)
(STM b -> IO b
forall a. STM a -> IO a
STM.atomically (STM b -> IO b) -> STM b -> IO b
forall a b. (a -> b) -> a -> b
<| TMVar b -> STM b
forall a. TMVar a -> STM a
TMVar.readTMVar TMVar b
terminator)
IO () -> (IO () -> IO (Async ())) -> IO (Async ())
forall a b. a -> (a -> b) -> b
|> IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
Async.async
TMVar b -> IO (TMVar b)
forall (f :: * -> *) a. Applicative f => a -> f a
Prelude.pure TMVar b
terminator
pollEvents :: Producer.KafkaProducer -> Prelude.IO ()
pollEvents :: KafkaProducer -> IO ()
pollEvents KafkaProducer
producer = do
KafkaProducer
-> [ProducerRecord] -> IO [(ProducerRecord, KafkaError)]
forall (m :: * -> *).
MonadIO m =>
KafkaProducer
-> [ProducerRecord] -> m [(ProducerRecord, KafkaError)]
Producer.produceMessageBatch KafkaProducer
producer []
IO [(ProducerRecord, KafkaError)]
-> (IO [(ProducerRecord, KafkaError)] -> IO ()) -> IO ()
forall a b. a -> (a -> b) -> b
|> ([(ProducerRecord, KafkaError)] -> ())
-> IO [(ProducerRecord, KafkaError)] -> IO ()
forall (m :: * -> *) a value.
Functor m =>
(a -> value) -> m a -> m value
map (\[(ProducerRecord, KafkaError)]
_ -> ())
Int -> IO ()
Control.Concurrent.threadDelay Int
100_000
KafkaProducer -> IO ()
pollEvents KafkaProducer
producer
mkHandler :: Settings.Settings -> Producer.KafkaProducer -> Prelude.IO Internal.Handler
mkHandler :: Settings -> KafkaProducer -> IO Handler
mkHandler Settings
settings KafkaProducer
producer = do
Handler
doAnything <- IO Handler
Platform.doAnythingHandler
Handler -> IO Handler
forall (f :: * -> *) a. Applicative f => a -> f a
Prelude.pure
Handler :: (Task Never () -> Msg -> Task Text ())
-> (Msg -> Task Text ()) -> Handler
Internal.Handler
{ sendAsync :: Task Never () -> Msg -> Task Text ()
Internal.sendAsync = \Task Never ()
onDeliveryCallback Msg
msg' ->
Text -> Task Text () -> Task Text ()
forall e a. HasCallStack => Text -> Task e a -> Task e a
Platform.tracingSpan Text
"Async send Kafka messages" (Task Text () -> Task Text ()) -> Task Text () -> Task Text ()
forall a b. (a -> b) -> a -> b
<| do
let details :: Details
details = List Text -> Msg -> Details
Details ((BrokerAddress -> Text) -> List BrokerAddress -> List Text
forall a b. (a -> b) -> List a -> List b
List.map BrokerAddress -> Text
Producer.unBrokerAddress (Settings -> List BrokerAddress
Settings.brokerAddresses Settings
settings)) Msg
msg'
Details -> Task Text ()
forall d e. TracingSpanDetails d => d -> Task e ()
Platform.setTracingSpanDetails Details
details
KafkaProducer -> Handler -> Task Never () -> Msg -> Task Error ()
sendHelperAsync KafkaProducer
producer Handler
doAnything Task Never ()
onDeliveryCallback Msg
msg'
Task Error () -> (Task Error () -> Task Text ()) -> Task Text ()
forall a b. a -> (a -> b) -> b
|> (Error -> Text) -> Task Error () -> Task Text ()
forall x y a. (x -> y) -> Task x a -> Task y a
Task.mapError Error -> Text
Internal.errorToText,
sendSync :: Msg -> Task Text ()
Internal.sendSync = \Msg
msg' ->
Text -> Task Text () -> Task Text ()
forall e a. HasCallStack => Text -> Task e a -> Task e a
Platform.tracingSpan Text
"Sync send Kafka messages" (Task Text () -> Task Text ()) -> Task Text () -> Task Text ()
forall a b. (a -> b) -> a -> b
<| do
let details :: Details
details = List Text -> Msg -> Details
Details ((BrokerAddress -> Text) -> List BrokerAddress -> List Text
forall a b. (a -> b) -> List a -> List b
List.map BrokerAddress -> Text
Producer.unBrokerAddress (Settings -> List BrokerAddress
Settings.brokerAddresses Settings
settings)) Msg
msg'
Details -> Task Text ()
forall d e. TracingSpanDetails d => d -> Task e ()
Platform.setTracingSpanDetails Details
details
TMVar Terminate
terminator <- Handler -> STM (TMVar Terminate) -> Task Text (TMVar Terminate)
forall a e. Handler -> STM a -> Task e a
doSTM Handler
doAnything STM (TMVar Terminate)
forall a. STM (TMVar a)
TMVar.newEmptyTMVar
let onDeliveryCallback :: Task e ()
onDeliveryCallback = Handler -> STM () -> Task e ()
forall a e. Handler -> STM a -> Task e a
doSTM Handler
doAnything (TMVar Terminate -> Terminate -> STM ()
forall a. TMVar a -> a -> STM ()
TMVar.putTMVar TMVar Terminate
terminator Terminate
Terminate)
KafkaProducer -> Handler -> Task Never () -> Msg -> Task Error ()
sendHelperAsync KafkaProducer
producer Handler
doAnything Task Never ()
forall e. Task e ()
onDeliveryCallback Msg
msg'
Task Error () -> (Task Error () -> Task Text ()) -> Task Text ()
forall a b. a -> (a -> b) -> b
|> (Error -> Text) -> Task Error () -> Task Text ()
forall x y a. (x -> y) -> Task x a -> Task y a
Task.mapError Error -> Text
Internal.errorToText
Terminate
Terminate <- Handler -> STM Terminate -> Task Text Terminate
forall a e. Handler -> STM a -> Task e a
doSTM Handler
doAnything (TMVar Terminate -> STM Terminate
forall a. TMVar a -> STM a
TMVar.readTMVar TMVar Terminate
terminator)
() -> Task Text ()
forall a x. a -> Task x a
Task.succeed ()
}
doSTM :: Platform.DoAnythingHandler -> STM.STM a -> Task e a
doSTM :: Handler -> STM a -> Task e a
doSTM Handler
doAnything STM a
stm =
STM a -> IO a
forall a. STM a -> IO a
STM.atomically STM a
stm
IO a -> (IO a -> IO (Result e a)) -> IO (Result e a)
forall a b. a -> (a -> b) -> b
|> (a -> Result e a) -> IO a -> IO (Result e a)
forall (m :: * -> *) a value.
Functor m =>
(a -> value) -> m a -> m value
map a -> Result e a
forall error value. value -> Result error value
Ok
IO (Result e a) -> (IO (Result e a) -> Task e a) -> Task e a
forall a b. a -> (a -> b) -> b
|> Handler -> IO (Result e a) -> Task e a
forall e a. Handler -> IO (Result e a) -> Task e a
Platform.doAnything Handler
doAnything
mkProducer :: Settings.Settings -> Prelude.IO Producer.KafkaProducer
mkProducer :: Settings -> IO KafkaProducer
mkProducer Settings.Settings {List BrokerAddress
brokerAddresses :: List BrokerAddress
brokerAddresses :: Settings -> List BrokerAddress
Settings.brokerAddresses, Timeout
deliveryTimeout :: Settings -> Timeout
deliveryTimeout :: Timeout
Settings.deliveryTimeout, KafkaLogLevel
logLevel :: Settings -> KafkaLogLevel
logLevel :: KafkaLogLevel
Settings.logLevel, BatchNumMessages
batchNumMessages :: Settings -> BatchNumMessages
batchNumMessages :: BatchNumMessages
Settings.batchNumMessages} = do
let properties :: ProducerProperties
properties =
List BrokerAddress -> ProducerProperties
Producer.brokersList List BrokerAddress
brokerAddresses
ProducerProperties -> ProducerProperties -> ProducerProperties
forall appendable.
Semigroup appendable =>
appendable -> appendable -> appendable
++ Timeout -> ProducerProperties
Producer.sendTimeout Timeout
deliveryTimeout
ProducerProperties -> ProducerProperties -> ProducerProperties
forall appendable.
Semigroup appendable =>
appendable -> appendable -> appendable
++ KafkaLogLevel -> ProducerProperties
Producer.logLevel KafkaLogLevel
logLevel
ProducerProperties -> ProducerProperties -> ProducerProperties
forall appendable.
Semigroup appendable =>
appendable -> appendable -> appendable
++ KafkaCompressionCodec -> ProducerProperties
Producer.compression KafkaCompressionCodec
Producer.Snappy
ProducerProperties -> ProducerProperties -> ProducerProperties
forall appendable.
Semigroup appendable =>
appendable -> appendable -> appendable
++ Map Text Text -> ProducerProperties
Producer.extraProps
( List (Text, Text) -> Map Text Text
forall comparable v.
Ord comparable =>
List (comparable, v) -> Dict comparable v
Dict.fromList
[ ( Text
"batch.num.messages",
BatchNumMessages
batchNumMessages
BatchNumMessages -> (BatchNumMessages -> Int) -> Int
forall a b. a -> (a -> b) -> b
|> BatchNumMessages -> Int
Settings.unBatchNumMessages
Int -> (Int -> Text) -> Text
forall a b. a -> (a -> b) -> b
|> Int -> Text
Text.fromInt
),
(Text
"enable.idempotence", Text
"true"),
(Text
"acks", Text
"all")
]
)
Either KafkaError KafkaProducer
eitherProducer <- ProducerProperties -> IO (Either KafkaError KafkaProducer)
forall (m :: * -> *).
MonadIO m =>
ProducerProperties -> m (Either KafkaError KafkaProducer)
Producer.newProducer ProducerProperties
properties
case Either KafkaError KafkaProducer
eitherProducer of
Prelude.Left KafkaError
err ->
KafkaError -> IO KafkaProducer
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
Exception.throwIO KafkaError
err
Prelude.Right KafkaProducer
producer ->
KafkaProducer -> IO KafkaProducer
forall (f :: * -> *) a. Applicative f => a -> f a
Prelude.pure KafkaProducer
producer
sendHelperAsync ::
Producer.KafkaProducer ->
Platform.DoAnythingHandler ->
Task Never () ->
Internal.Msg ->
Task Internal.Error ()
sendHelperAsync :: KafkaProducer -> Handler -> Task Never () -> Msg -> Task Error ()
sendHelperAsync KafkaProducer
producer Handler
doAnything Task Never ()
onDeliveryCallback Msg
msg' = do
ProducerRecord
record' <- Msg -> Task Error ProducerRecord
forall e. Msg -> Task e ProducerRecord
record Msg
msg'
(SomeException -> IO (Result Error ()))
-> IO (Result Error ()) -> IO (Result Error ())
forall (m :: * -> *) a.
MonadCatch m =>
(SomeException -> m a) -> m a -> m a
Exception.handleAny
(\SomeException
exception -> Result Error () -> IO (Result Error ())
forall (f :: * -> *) a. Applicative f => a -> f a
Prelude.pure (Error -> Result Error ()
forall error value. error -> Result error value
Err (SomeException -> Error
Internal.Uncaught SomeException
exception)))
( do
Either ImmediateError ()
maybeFailedMessages <-
KafkaProducer
-> ProducerRecord
-> (DeliveryReport -> IO ())
-> IO (Either ImmediateError ())
forall (m :: * -> *).
MonadIO m =>
KafkaProducer
-> ProducerRecord
-> (DeliveryReport -> IO ())
-> m (Either ImmediateError ())
Producer.produceMessage'
KafkaProducer
producer
ProducerRecord
record'
( \DeliveryReport
deliveryReport -> do
LogHandler
log <- IO LogHandler
Platform.silentHandler
LogHandler -> Task Never () -> IO ()
forall a. LogHandler -> Task Never a -> IO a
Task.perform LogHandler
log
(Task Never () -> IO ()) -> Task Never () -> IO ()
forall a b. (a -> b) -> a -> b
<| case DeliveryReport
deliveryReport of
Producer.DeliverySuccess ProducerRecord
_producerRecord Offset
_offset -> Task Never ()
onDeliveryCallback
DeliveryReport
_ -> () -> Task Never ()
forall a x. a -> Task x a
Task.succeed ()
)
Result Error () -> IO (Result Error ())
forall (f :: * -> *) a. Applicative f => a -> f a
Prelude.pure (Result Error () -> IO (Result Error ()))
-> Result Error () -> IO (Result Error ())
forall a b. (a -> b) -> a -> b
<| case Either ImmediateError ()
maybeFailedMessages of
Prelude.Right ()
_ -> () -> Result Error ()
forall error value. value -> Result error value
Ok ()
Prelude.Left (Producer.ImmediateError KafkaError
failure) ->
Error -> Result Error ()
forall error value. error -> Result error value
Err ((ProducerRecord, KafkaError) -> Error
Internal.SendingFailed (ProducerRecord
record', KafkaError
failure))
)
IO (Result Error ())
-> (IO (Result Error ()) -> Task Error ()) -> Task Error ()
forall a b. a -> (a -> b) -> b
|> Handler -> IO (Result Error ()) -> Task Error ()
forall e a. Handler -> IO (Result e a) -> Task e a
Platform.doAnything Handler
doAnything