module Kafka.Callbacks
( errorCallback
, logCallback
, statsCallback
, Callback
)
where

import Data.ByteString (ByteString)
import Kafka.Internal.RdKafka (rdKafkaConfSetErrorCb, rdKafkaConfSetLogCb, rdKafkaConfSetStatsCb)
import Kafka.Internal.Setup (getRdKafkaConf, Callback(..))
import Kafka.Types (KafkaError(..), KafkaLogLevel(..))

-- | Add a callback for errors.
--
-- ==== __Examples__
--
-- Basic usage:
--
-- > 'setCallback' ('errorCallback' myErrorCallback)
-- >
-- > myErrorCallback :: 'KafkaError' -> String -> IO ()
-- > myErrorCallback kafkaError message = print $ show kafkaError <> "|" <> message
errorCallback :: (KafkaError -> String -> IO ()) -> Callback
errorCallback :: (KafkaError -> String -> IO ()) -> Callback
errorCallback callback :: KafkaError -> String -> IO ()
callback =
  let realCb :: p -> RdKafkaRespErrT -> String -> IO ()
realCb _ err :: RdKafkaRespErrT
err = KafkaError -> String -> IO ()
callback (RdKafkaRespErrT -> KafkaError
KafkaResponseError RdKafkaRespErrT
err)
  in (KafkaConf -> IO ()) -> Callback
Callback ((KafkaConf -> IO ()) -> Callback)
-> (KafkaConf -> IO ()) -> Callback
forall a b. (a -> b) -> a -> b
$ \k :: KafkaConf
k -> RdKafkaConfTPtr -> ErrorCallback -> IO ()
rdKafkaConfSetErrorCb (KafkaConf -> RdKafkaConfTPtr
forall k. HasKafkaConf k => k -> RdKafkaConfTPtr
getRdKafkaConf KafkaConf
k) ErrorCallback
forall p. p -> RdKafkaRespErrT -> String -> IO ()
realCb

-- | Add a callback for logs.
--
-- ==== __Examples__
--
-- Basic usage:
--
-- > 'setCallback' ('logCallback' myLogCallback)
-- >
-- > myLogCallback :: 'KafkaLogLevel' -> String -> String -> IO ()
-- > myLogCallback level facility message = print $ show level <> "|" <> facility <> "|" <> message
logCallback :: (KafkaLogLevel -> String -> String -> IO ()) -> Callback
logCallback :: (KafkaLogLevel -> String -> String -> IO ()) -> Callback
logCallback callback :: KafkaLogLevel -> String -> String -> IO ()
callback =
  let realCb :: p -> Int -> String -> String -> IO ()
realCb _ = KafkaLogLevel -> String -> String -> IO ()
callback (KafkaLogLevel -> String -> String -> IO ())
-> (Int -> KafkaLogLevel) -> Int -> String -> String -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> KafkaLogLevel
forall a. Enum a => Int -> a
toEnum
  in (KafkaConf -> IO ()) -> Callback
Callback ((KafkaConf -> IO ()) -> Callback)
-> (KafkaConf -> IO ()) -> Callback
forall a b. (a -> b) -> a -> b
$ \k :: KafkaConf
k -> RdKafkaConfTPtr -> LogCallback -> IO ()
rdKafkaConfSetLogCb (KafkaConf -> RdKafkaConfTPtr
forall k. HasKafkaConf k => k -> RdKafkaConfTPtr
getRdKafkaConf KafkaConf
k) LogCallback
forall p. p -> Int -> String -> String -> IO ()
realCb

-- | Add a callback for stats. The passed ByteString contains an UTF-8 encoded JSON document and can e.g. be parsed using Data.Aeson.decodeStrict. For more information about the content of the JSON document see <https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md>.
--
-- ==== __Examples__
--
-- Basic usage:
--
-- > 'setCallback' ('statsCallback' myStatsCallback)
-- >
-- > myStatsCallback :: String -> IO ()
-- > myStatsCallback stats = print $ show stats
statsCallback :: (ByteString -> IO ()) -> Callback
statsCallback :: (ByteString -> IO ()) -> Callback
statsCallback callback :: ByteString -> IO ()
callback =
  let realCb :: p -> ByteString -> IO ()
realCb _ = ByteString -> IO ()
callback
  in (KafkaConf -> IO ()) -> Callback
Callback ((KafkaConf -> IO ()) -> Callback)
-> (KafkaConf -> IO ()) -> Callback
forall a b. (a -> b) -> a -> b
$ \k :: KafkaConf
k -> RdKafkaConfTPtr -> StatsCallback -> IO ()
rdKafkaConfSetStatsCb (KafkaConf -> RdKafkaConfTPtr
forall k. HasKafkaConf k => k -> RdKafkaConfTPtr
getRdKafkaConf KafkaConf
k) StatsCallback
forall p. p -> ByteString -> IO ()
realCb