{-# LANGUAGE RankNTypes #-}
{-# OPTIONS_GHC -Wno-redundant-constraints #-}

-- | Kafka is a module for _writing_ to Kafka
--
-- See Kafka.Worker for the basic building blocks of a CLI app that will poll &
-- process kafka messages
module Kafka
  ( -- * Setup
    Internal.Handler,
    Settings.Settings,
    Settings.decoder,
    handler,

    -- * Creating messages
    Internal.Msg,
    emptyMsg,
    addPayload,
    addKey,

    -- * Sending messages
    Internal.sendAsync,
    Internal.sendSync,

    -- * Reading messages
    topic,
    payload,
    key,
  )
where

import qualified Conduit
import qualified Control.Concurrent
import qualified Control.Concurrent.Async as Async
import qualified Control.Concurrent.STM as STM
import qualified Control.Concurrent.STM.TMVar as TMVar
import qualified Control.Exception.Safe as Exception
import Control.Monad.IO.Class (liftIO)
import qualified Data.Aeson as Aeson
import qualified Data.ByteString.Lazy as ByteString.Lazy
import qualified Data.Text.Encoding
import qualified Dict
import qualified Kafka.Internal as Internal
import qualified Kafka.Producer as Producer
import qualified Kafka.Settings as Settings
import qualified Platform
import qualified Prelude

-- | Tracing span details attached to every send: the broker addresses the
-- producer was configured with and the message being sent.
data Details = Details
  { -- | Broker addresses, rendered as text.
    detailsBrokers :: List Text,
    -- | The message handed to the send function.
    detailsMsg :: Internal.Msg
  }
  deriving (Generic, Show)

instance Aeson.ToJSON Details

instance Platform.TracingSpanDetails Details

-- | Tracing span details wrapping a textual producer record.
--
-- NOTE(review): nothing in the visible part of this module constructs this
-- type; confirm whether it is still needed.
newtype DeliveryReportDetails = DeliveryReportDetails
  { deliveryReportProducerRecord :: Text
  }
  deriving (Generic, Show)

instance Aeson.ToJSON DeliveryReportDetails

instance Platform.TracingSpanDetails DeliveryReportDetails

-- | Creates a Kafka-writable message for a topic, without a payload or key.
--
-- > msg =
-- >   emptyMsg "groceries"
-- >     |> addPayload "broccoli"
-- >     |> addKey "vegetables"
emptyMsg :: Text -> Internal.Msg
emptyMsg topic' =
  Internal.Msg
    { Internal.topic = Internal.Topic topic',
      Internal.payload = Nothing,
      Internal.key = Nothing
    }

-- | Add a payload to a message.
--
-- Message payloads aren't mandatory in Kafka, so using this function really is
-- optional. A counter is an example of an application that doesn't require
-- message payloads. Just knowing an increment event took place would be enough
-- for it to work.
--
-- We ask for JSON decodability to ensure the Kafka worker can later read the
-- message.
addPayload :: (Aeson.FromJSON a, Aeson.ToJSON a) => a -> Internal.Msg -> Internal.Msg
addPayload contents msg =
  msg {Internal.payload = Just (Internal.Encodable contents)}

-- | Add a key to a message.
--
-- Kafka divides messages in a topic in different partitions. Kafka workers can
-- collaborate on a topic by each processing messages from a couple of the
-- topic's partitions. Within a partition messages will never overtake each
-- other.
--
-- By default each message is assigned to a random partition. Setting a key on
-- the message gives you more control over this process. Messages with the same
-- key are guaranteed to end up in the same partition.
--
-- Example: if each message is related to a single user and you need to ensure
-- messages for a user don't overtake each other, you can set the key to be the
-- user's id.
addKey :: Text -> Internal.Msg -> Internal.Msg
addKey key' msg = msg {Internal.key = Just (Internal.Key key')}

-- | Turn a message into the producer record that is handed to the Kafka
-- client.
--
-- The topic and key are converted as-is (the key is UTF-8 encoded). When a
-- payload is present it is wrapped together with the current request id in a
-- 'Internal.MsgWithMetaData' and JSON-encoded to a strict bytestring. The
-- partition is always left unassigned.
record :: Internal.Msg -> Task e Producer.ProducerRecord
record msg = do
  requestId <- Platform.requestId
  Task.succeed
    Producer.ProducerRecord
      { Producer.prTopic =
          Internal.topic msg
            |> Internal.unTopic
            |> Producer.TopicName,
        Producer.prPartition = Producer.UnassignedPartition,
        Producer.prKey =
          Maybe.map
            (Data.Text.Encoding.encodeUtf8 << Internal.unKey)
            (Internal.key msg),
        Producer.prValue =
          Maybe.map
            ( \payload' ->
                Internal.MsgWithMetaData
                  { Internal.metaData =
                      Internal.MetaData
                        { Internal.requestId
                        },
                    Internal.value = payload'
                  }
                  |> Aeson.encode
                  |> ByteString.Lazy.toStrict
            )
            (Internal.payload msg)
      }

-- | The topic of a message, unwrapped to plain text. This function might
-- sometimes be useful in tests.
topic :: Internal.Msg -> Text
topic msg = Internal.unTopic (Internal.topic msg)

-- | The payload of a message. This function might sometimes be useful in tests.
--
-- The stored payload is JSON-encoded and then decoded into the requested type,
-- so the result is 'Nothing' both when the message has no payload and when
-- the payload does not decode as @a@.
payload :: (Aeson.FromJSON a) => Internal.Msg -> Maybe a
payload msg =
  Internal.payload msg
    |> Maybe.andThen (Aeson.decode << Aeson.encode)

-- | The key of a message, if one was set. This function might sometimes be
-- useful in tests.
key :: Internal.Msg -> Maybe Text
key msg = Maybe.map Internal.unKey (Internal.key msg)

-- | Function for creating a Kafka handler.
--
-- Acquires, in order: a producer (closed again on release) and a background
-- poll loop for that producer (stopped on release by filling the loop's
-- terminator 'TMVar.TMVar'). The handler itself is then built from the
-- settings and the producer.
--
-- See 'Kafka.Settings' for potential customizations.
handler :: Settings.Settings -> Conduit.Acquire Internal.Handler
handler settings = do
  producer <- Conduit.mkAcquire (mkProducer settings) Producer.closeProducer
  _ <-
    Conduit.mkAcquire
      (startPollEventLoop producer)
      (\terminator -> STM.atomically (TMVar.putTMVar terminator Terminate))
  liftIO (mkHandler settings producer)

-- | Signal value placed in a 'TMVar.TMVar' to wake up whoever is waiting on
-- it: it is used both to stop the poll event loop and to mark a message as
-- delivered for synchronous sends.
data Terminate = Terminate

-- | By default events only get polled right before sending a record to kafka.
-- This means that the deliveryCallback only gets fired on the next call to produceMessage'.
-- We want to be informed about delivery status as soon as possible though.
--
-- Starts 'pollEvents' in a background thread, raced against a blocking read
-- of a fresh, empty 'TMVar.TMVar'. That variable is returned to the caller:
-- putting a value into it ends the race and thereby stops the polling.
startPollEventLoop :: Producer.KafkaProducer -> Prelude.IO (TMVar.TMVar b)
startPollEventLoop producer = do
  terminator <- STM.atomically TMVar.newEmptyTMVar
  -- The async handle is discarded on purpose; the loop is controlled solely
  -- through the terminator.
  _ <-
    Async.race_
      (pollEvents producer)
      (STM.atomically <| TMVar.readTMVar terminator)
      |> Async.async
  Prelude.pure terminator

-- | We use a little trick here to poll events, by sending an empty message batch.
-- This will call the internal pollEvent function in hw-kafka-client.
--
-- Loops forever: poll, sleep 100ms, repeat. It only stops when the thread
-- running it is cancelled.
pollEvents :: Producer.KafkaProducer -> Prelude.IO ()
pollEvents producer = do
  -- The (always empty) list of failed records is of no interest.
  Producer.produceMessageBatch producer []
    |> map (\_ -> ())
  Control.Concurrent.threadDelay 100_000 {- 100ms -}
  pollEvents producer

-- | Build the 'Internal.Handler' record for a producer.
--
-- Both send functions run inside a tracing span that records the broker
-- addresses and the message, and report failures as 'Text'.
--
-- * @sendAsync@ hands the message to the producer and returns; the supplied
--   callback is passed on to 'sendHelperAsync'.
-- * @sendSync@ does the same, but uses a callback that fills a fresh
--   'TMVar.TMVar' and then blocks until that variable is filled.
--
-- NOTE(review): 'sendHelperAsync' only runs the delivery callback for
-- @DeliverySuccess@, so @sendSync@ appears to keep waiting when a delivery
-- report signals failure -- confirm whether that is intended or guarded
-- elsewhere.
mkHandler :: Settings.Settings -> Producer.KafkaProducer -> Prelude.IO Internal.Handler
mkHandler settings producer = do
  doAnything <- Platform.doAnythingHandler
  Prelude.pure
    Internal.Handler
      { Internal.sendAsync = \onDeliveryCallback msg' ->
          Platform.tracingSpan "Async send Kafka messages" <| do
            let details = Details (List.map Producer.unBrokerAddress (Settings.brokerAddresses settings)) msg'
            Platform.setTracingSpanDetails details
            sendHelperAsync producer doAnything onDeliveryCallback msg'
              |> Task.mapError Internal.errorToText,
        Internal.sendSync = \msg' ->
          Platform.tracingSpan "Sync send Kafka messages" <| do
            let details = Details (List.map Producer.unBrokerAddress (Settings.brokerAddresses settings)) msg'
            Platform.setTracingSpanDetails details
            -- Empty until the delivery callback fires.
            terminator <- doSTM doAnything TMVar.newEmptyTMVar
            let onDeliveryCallback = doSTM doAnything (TMVar.putTMVar terminator Terminate)
            sendHelperAsync producer doAnything onDeliveryCallback msg'
              |> Task.mapError Internal.errorToText
            -- Block until the callback above has signalled delivery.
            Terminate <- doSTM doAnything (TMVar.readTMVar terminator)
            Task.succeed ()
      }

-- | Run an STM transaction atomically as a 'Task'. The resulting task always
-- wraps the transaction's result in 'Ok'; it never produces an 'Err'.
doSTM :: Platform.DoAnythingHandler -> STM.STM a -> Task e a
doSTM doAnything stm =
  STM.atomically stm
    |> map Ok
    |> Platform.doAnything doAnything

-- | Create a Kafka producer from the settings.
--
-- The producer is configured with the broker list, send timeout and log level
-- from the settings, Snappy compression, the configured
-- @batch.num.messages@, and idempotent delivery (@enable.idempotence=true@,
-- @acks=all@).
--
-- Throws the 'Producer.KafkaError' reported by the client when the producer
-- cannot be created.
mkProducer :: Settings.Settings -> Prelude.IO Producer.KafkaProducer
mkProducer
  Settings.Settings
    { Settings.brokerAddresses,
      Settings.deliveryTimeout,
      Settings.logLevel,
      Settings.batchNumMessages
    } = do
    let properties =
          Producer.brokersList brokerAddresses
            ++ Producer.sendTimeout deliveryTimeout
            ++ Producer.logLevel logLevel
            ++ Producer.compression Producer.Snappy
            ++ Producer.extraProps
              ( Dict.fromList
                  [ ( "batch.num.messages",
                      batchNumMessages
                        |> Settings.unBatchNumMessages
                        |> Text.fromInt
                    ),
                    -- Enable idempotent producers
                    -- See https://www.cloudkarafka.com/blog/apache-kafka-idempotent-producer-avoiding-message-duplication.html for reference
                    ("enable.idempotence", "true"),
                    ("acks", "all")
                  ]
              )
    eitherProducer <- Producer.newProducer properties
    case eitherProducer of
      Prelude.Left err ->
        -- We create the handler as part of starting the application. Throwing
        -- means that if there's a problem with the settings the application will
        -- fail immediately upon start. It won't result in runtime errors during
        -- operation.
        Exception.throwIO err
      Prelude.Right producer ->
        Prelude.pure producer

-- | Hand a message to the producer without waiting for delivery.
--
-- The message is first converted with 'record'. The outcome is:
--
-- * @Ok ()@ when the producer accepted the record;
-- * @Err (Internal.SendingFailed ...)@ when the producer rejected it
--   immediately;
-- * @Err (Internal.Uncaught ...)@ when producing threw a synchronous
--   exception.
--
-- The delivery-report callback runs @onDeliveryCallback@ (with a silent log
-- handler) only for @DeliverySuccess@; every other delivery report is
-- ignored.
sendHelperAsync ::
  Producer.KafkaProducer ->
  Platform.DoAnythingHandler ->
  Task Never () ->
  Internal.Msg ->
  Task Internal.Error ()
sendHelperAsync producer doAnything onDeliveryCallback msg' = do
  record' <- record msg'
  Exception.handleAny
    (\exception -> Prelude.pure (Err (Internal.Uncaught exception)))
    ( do
        maybeFailedMessages <-
          Producer.produceMessage'
            producer
            record'
            ( \deliveryReport -> do
                log <- Platform.silentHandler
                Task.perform log
                  <| case deliveryReport of
                    Producer.DeliverySuccess _producerRecord _offset -> onDeliveryCallback
                    _ -> Task.succeed ()
            )
        Prelude.pure
          <| case maybeFailedMessages of
            Prelude.Right _ -> Ok ()
            Prelude.Left (Producer.ImmediateError failure) ->
              Err (Internal.SendingFailed (record', failure))
    )
    |> Platform.doAnything doAnything