module Kafka.Internal.Setup
( KafkaProps(..)
, TopicProps(..)
, Kafka(..)
, KafkaConf(..)
, TopicConf(..)
, HasKafka(..)
, HasKafkaConf(..)
, HasTopicConf(..)
, Callback(..)
, CallbackPollStatus(..)
, getRdKafka
, getRdKafkaConf
, getRdMsgQueue
, getRdTopicConf
, newTopicConf
, newKafkaConf
, kafkaConf
, topicConf
, checkConfSetValue
, setKafkaConfValue
, setAllKafkaConfValues
, setTopicConfValue
, setAllTopicConfValues
)
where

import Kafka.Internal.RdKafka (CCharBufPointer, RdKafkaConfResT (..), RdKafkaConfTPtr, RdKafkaQueueTPtr, RdKafkaTPtr, RdKafkaTopicConfTPtr, nErrorBytes, newRdKafkaConfT, newRdKafkaTopicConfT, rdKafkaConfSet, rdKafkaTopicConfSet)
import Kafka.Types            (KafkaError (..))

import Control.Concurrent.MVar (MVar, newMVar)
import Control.Exception       (throw)
import Data.IORef              (IORef, newIORef, readIORef)
import Data.Map                (Map)
import Data.Text               (Text)
import Foreign.C.String        (peekCString)
import Foreign.Marshal.Alloc   (allocaBytes)

import qualified Data.Map  as Map
import qualified Data.Text as Text

--
-- Configuration
--
newtype KafkaProps = KafkaProps (Map Text Text) deriving (Int -> KafkaProps -> ShowS
[KafkaProps] -> ShowS
KafkaProps -> String
(Int -> KafkaProps -> ShowS)
-> (KafkaProps -> String)
-> ([KafkaProps] -> ShowS)
-> Show KafkaProps
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [KafkaProps] -> ShowS
$cshowList :: [KafkaProps] -> ShowS
show :: KafkaProps -> String
$cshow :: KafkaProps -> String
showsPrec :: Int -> KafkaProps -> ShowS
$cshowsPrec :: Int -> KafkaProps -> ShowS
Show, KafkaProps -> KafkaProps -> Bool
(KafkaProps -> KafkaProps -> Bool)
-> (KafkaProps -> KafkaProps -> Bool) -> Eq KafkaProps
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: KafkaProps -> KafkaProps -> Bool
$c/= :: KafkaProps -> KafkaProps -> Bool
== :: KafkaProps -> KafkaProps -> Bool
$c== :: KafkaProps -> KafkaProps -> Bool
Eq)
newtype TopicProps = TopicProps (Map Text Text) deriving (Int -> TopicProps -> ShowS
[TopicProps] -> ShowS
TopicProps -> String
(Int -> TopicProps -> ShowS)
-> (TopicProps -> String)
-> ([TopicProps] -> ShowS)
-> Show TopicProps
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [TopicProps] -> ShowS
$cshowList :: [TopicProps] -> ShowS
show :: TopicProps -> String
$cshow :: TopicProps -> String
showsPrec :: Int -> TopicProps -> ShowS
$cshowsPrec :: Int -> TopicProps -> ShowS
Show, TopicProps -> TopicProps -> Bool
(TopicProps -> TopicProps -> Bool)
-> (TopicProps -> TopicProps -> Bool) -> Eq TopicProps
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: TopicProps -> TopicProps -> Bool
$c/= :: TopicProps -> TopicProps -> Bool
== :: TopicProps -> TopicProps -> Bool
$c== :: TopicProps -> TopicProps -> Bool
Eq)
newtype Kafka      = Kafka RdKafkaTPtr deriving Int -> Kafka -> ShowS
[Kafka] -> ShowS
Kafka -> String
(Int -> Kafka -> ShowS)
-> (Kafka -> String) -> ([Kafka] -> ShowS) -> Show Kafka
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [Kafka] -> ShowS
$cshowList :: [Kafka] -> ShowS
show :: Kafka -> String
$cshow :: Kafka -> String
showsPrec :: Int -> Kafka -> ShowS
$cshowsPrec :: Int -> Kafka -> ShowS
Show
newtype TopicConf  = TopicConf RdKafkaTopicConfTPtr deriving Int -> TopicConf -> ShowS
[TopicConf] -> ShowS
TopicConf -> String
(Int -> TopicConf -> ShowS)
-> (TopicConf -> String)
-> ([TopicConf] -> ShowS)
-> Show TopicConf
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [TopicConf] -> ShowS
$cshowList :: [TopicConf] -> ShowS
show :: TopicConf -> String
$cshow :: TopicConf -> String
showsPrec :: Int -> TopicConf -> ShowS
$cshowsPrec :: Int -> TopicConf -> ShowS
Show

-- | Callbacks allow retrieving various information like error occurences, statistics
-- and log messages.
-- See `Kafka.Consumer.setCallback` (Consumer) and `Kafka.Producer.setCallback` (Producer) for more details.
newtype Callback = Callback (KafkaConf -> IO ())

data CallbackPollStatus = CallbackPollEnabled | CallbackPollDisabled deriving (Int -> CallbackPollStatus -> ShowS
[CallbackPollStatus] -> ShowS
CallbackPollStatus -> String
(Int -> CallbackPollStatus -> ShowS)
-> (CallbackPollStatus -> String)
-> ([CallbackPollStatus] -> ShowS)
-> Show CallbackPollStatus
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [CallbackPollStatus] -> ShowS
$cshowList :: [CallbackPollStatus] -> ShowS
show :: CallbackPollStatus -> String
$cshow :: CallbackPollStatus -> String
showsPrec :: Int -> CallbackPollStatus -> ShowS
$cshowsPrec :: Int -> CallbackPollStatus -> ShowS
Show, CallbackPollStatus -> CallbackPollStatus -> Bool
(CallbackPollStatus -> CallbackPollStatus -> Bool)
-> (CallbackPollStatus -> CallbackPollStatus -> Bool)
-> Eq CallbackPollStatus
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: CallbackPollStatus -> CallbackPollStatus -> Bool
$c/= :: CallbackPollStatus -> CallbackPollStatus -> Bool
== :: CallbackPollStatus -> CallbackPollStatus -> Bool
$c== :: CallbackPollStatus -> CallbackPollStatus -> Bool
Eq)

data KafkaConf         = KafkaConf
  { KafkaConf -> RdKafkaConfTPtr
kcfgKafkaConfPtr       :: RdKafkaConfTPtr
    -- ^ A pointer to a native Kafka configuration

  , KafkaConf -> IORef (Maybe RdKafkaQueueTPtr)
kcfgMessagesQueue      :: IORef (Maybe RdKafkaQueueTPtr)
    -- ^ A queue for messages

  , KafkaConf -> MVar CallbackPollStatus
kcfgCallbackPollStatus :: MVar CallbackPollStatus
    -- ^ A mutex to prevent handling callbacks from multiple threads
    -- which can be dangerous in some cases.
  }

class HasKafka a where
  getKafka :: a -> Kafka

class HasKafkaConf a where
  getKafkaConf :: a -> KafkaConf

class HasTopicConf a where
  getTopicConf :: a -> TopicConf

instance HasKafkaConf KafkaConf where
  getKafkaConf :: KafkaConf -> KafkaConf
getKafkaConf = KafkaConf -> KafkaConf
forall a. a -> a
id
  {-# INLINE getKafkaConf #-}

instance HasKafka Kafka where
  getKafka :: Kafka -> Kafka
getKafka = Kafka -> Kafka
forall a. a -> a
id
  {-# INLINE getKafka #-}

instance HasTopicConf TopicConf where
  getTopicConf :: TopicConf -> TopicConf
getTopicConf = TopicConf -> TopicConf
forall a. a -> a
id
  {-# INLINE getTopicConf #-}

getRdKafka :: HasKafka k => k -> RdKafkaTPtr
getRdKafka :: k -> RdKafkaTPtr
getRdKafka k :: k
k = let (Kafka k' :: RdKafkaTPtr
k') = k -> Kafka
forall a. HasKafka a => a -> Kafka
getKafka k
k in RdKafkaTPtr
k'
{-# INLINE getRdKafka #-}

getRdKafkaConf :: HasKafkaConf k => k -> RdKafkaConfTPtr
getRdKafkaConf :: k -> RdKafkaConfTPtr
getRdKafkaConf k :: k
k = let (KafkaConf k' :: RdKafkaConfTPtr
k' _ _) = k -> KafkaConf
forall a. HasKafkaConf a => a -> KafkaConf
getKafkaConf k
k in RdKafkaConfTPtr
k'
{-# INLINE getRdKafkaConf #-}

getRdMsgQueue :: HasKafkaConf k => k -> IO (Maybe RdKafkaQueueTPtr)
getRdMsgQueue :: k -> IO (Maybe RdKafkaQueueTPtr)
getRdMsgQueue k :: k
k =
  let (KafkaConf _ rq :: IORef (Maybe RdKafkaQueueTPtr)
rq _) = k -> KafkaConf
forall a. HasKafkaConf a => a -> KafkaConf
getKafkaConf k
k
  in IORef (Maybe RdKafkaQueueTPtr) -> IO (Maybe RdKafkaQueueTPtr)
forall a. IORef a -> IO a
readIORef IORef (Maybe RdKafkaQueueTPtr)
rq

getRdTopicConf :: HasTopicConf t => t -> RdKafkaTopicConfTPtr
getRdTopicConf :: t -> RdKafkaTopicConfTPtr
getRdTopicConf t :: t
t = let (TopicConf t' :: RdKafkaTopicConfTPtr
t') = t -> TopicConf
forall a. HasTopicConf a => a -> TopicConf
getTopicConf t
t in RdKafkaTopicConfTPtr
t'
{-# INLINE getRdTopicConf #-}

newTopicConf :: IO TopicConf
newTopicConf :: IO TopicConf
newTopicConf = RdKafkaTopicConfTPtr -> TopicConf
TopicConf (RdKafkaTopicConfTPtr -> TopicConf)
-> IO RdKafkaTopicConfTPtr -> IO TopicConf
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO RdKafkaTopicConfTPtr
newRdKafkaTopicConfT

newKafkaConf :: IO KafkaConf
newKafkaConf :: IO KafkaConf
newKafkaConf = RdKafkaConfTPtr
-> IORef (Maybe RdKafkaQueueTPtr)
-> MVar CallbackPollStatus
-> KafkaConf
KafkaConf (RdKafkaConfTPtr
 -> IORef (Maybe RdKafkaQueueTPtr)
 -> MVar CallbackPollStatus
 -> KafkaConf)
-> IO RdKafkaConfTPtr
-> IO
     (IORef (Maybe RdKafkaQueueTPtr)
      -> MVar CallbackPollStatus -> KafkaConf)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO RdKafkaConfTPtr
newRdKafkaConfT IO
  (IORef (Maybe RdKafkaQueueTPtr)
   -> MVar CallbackPollStatus -> KafkaConf)
-> IO (IORef (Maybe RdKafkaQueueTPtr))
-> IO (MVar CallbackPollStatus -> KafkaConf)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Maybe RdKafkaQueueTPtr -> IO (IORef (Maybe RdKafkaQueueTPtr))
forall a. a -> IO (IORef a)
newIORef Maybe RdKafkaQueueTPtr
forall a. Maybe a
Nothing IO (MVar CallbackPollStatus -> KafkaConf)
-> IO (MVar CallbackPollStatus) -> IO KafkaConf
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> CallbackPollStatus -> IO (MVar CallbackPollStatus)
forall a. a -> IO (MVar a)
newMVar CallbackPollStatus
CallbackPollEnabled

kafkaConf :: KafkaProps -> IO KafkaConf
kafkaConf :: KafkaProps -> IO KafkaConf
kafkaConf overrides :: KafkaProps
overrides = do
  KafkaConf
conf <- IO KafkaConf
newKafkaConf
  KafkaConf -> KafkaProps -> IO ()
setAllKafkaConfValues KafkaConf
conf KafkaProps
overrides
  KafkaConf -> IO KafkaConf
forall (m :: * -> *) a. Monad m => a -> m a
return KafkaConf
conf

topicConf :: TopicProps -> IO TopicConf
topicConf :: TopicProps -> IO TopicConf
topicConf overrides :: TopicProps
overrides = do
  TopicConf
conf <- IO TopicConf
newTopicConf
  TopicConf -> TopicProps -> IO ()
setAllTopicConfValues TopicConf
conf TopicProps
overrides
  TopicConf -> IO TopicConf
forall (m :: * -> *) a. Monad m => a -> m a
return TopicConf
conf

checkConfSetValue :: RdKafkaConfResT -> CCharBufPointer -> IO ()
checkConfSetValue :: RdKafkaConfResT -> CCharBufPointer -> IO ()
checkConfSetValue err :: RdKafkaConfResT
err charPtr :: CCharBufPointer
charPtr = case RdKafkaConfResT
err of
    RdKafkaConfOk -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
    RdKafkaConfInvalid -> do
      String
str <- CCharBufPointer -> IO String
peekCString CCharBufPointer
charPtr
      KafkaError -> IO ()
forall a e. Exception e => e -> a
throw (KafkaError -> IO ()) -> KafkaError -> IO ()
forall a b. (a -> b) -> a -> b
$ Text -> KafkaError
KafkaInvalidConfigurationValue (String -> Text
Text.pack String
str)
    RdKafkaConfUnknown -> do
      String
str <- CCharBufPointer -> IO String
peekCString CCharBufPointer
charPtr
      KafkaError -> IO ()
forall a e. Exception e => e -> a
throw (KafkaError -> IO ()) -> KafkaError -> IO ()
forall a b. (a -> b) -> a -> b
$ Text -> KafkaError
KafkaUnknownConfigurationKey (String -> Text
Text.pack String
str)

setKafkaConfValue :: KafkaConf -> Text -> Text -> IO ()
setKafkaConfValue :: KafkaConf -> Text -> Text -> IO ()
setKafkaConfValue (KafkaConf confPtr :: RdKafkaConfTPtr
confPtr _ _) key :: Text
key value :: Text
value =
  Int -> (CCharBufPointer -> IO ()) -> IO ()
forall a b. Int -> (Ptr a -> IO b) -> IO b
allocaBytes Int
nErrorBytes ((CCharBufPointer -> IO ()) -> IO ())
-> (CCharBufPointer -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \charPtr :: CCharBufPointer
charPtr -> do
    RdKafkaConfResT
err <- RdKafkaConfTPtr
-> String
-> String
-> CCharBufPointer
-> CSize
-> IO RdKafkaConfResT
rdKafkaConfSet RdKafkaConfTPtr
confPtr (Text -> String
Text.unpack Text
key) (Text -> String
Text.unpack Text
value) CCharBufPointer
charPtr (Int -> CSize
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
nErrorBytes)
    RdKafkaConfResT -> CCharBufPointer -> IO ()
checkConfSetValue RdKafkaConfResT
err CCharBufPointer
charPtr

setAllKafkaConfValues :: KafkaConf -> KafkaProps -> IO ()
setAllKafkaConfValues :: KafkaConf -> KafkaProps -> IO ()
setAllKafkaConfValues conf :: KafkaConf
conf (KafkaProps props :: Map Text Text
props) = (Text -> Text -> IO ()) -> Map Text Text -> IO ()
forall m k a. Monoid m => (k -> a -> m) -> Map k a -> m
Map.foldMapWithKey (KafkaConf -> Text -> Text -> IO ()
setKafkaConfValue KafkaConf
conf) Map Text Text
props --forM_ props $ uncurry (setKafkaConfValue conf)

setTopicConfValue :: TopicConf -> Text -> Text -> IO ()
setTopicConfValue :: TopicConf -> Text -> Text -> IO ()
setTopicConfValue (TopicConf confPtr :: RdKafkaTopicConfTPtr
confPtr) key :: Text
key value :: Text
value =
  Int -> (CCharBufPointer -> IO ()) -> IO ()
forall a b. Int -> (Ptr a -> IO b) -> IO b
allocaBytes Int
nErrorBytes ((CCharBufPointer -> IO ()) -> IO ())
-> (CCharBufPointer -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \charPtr :: CCharBufPointer
charPtr -> do
    RdKafkaConfResT
err <- RdKafkaTopicConfTPtr
-> String
-> String
-> CCharBufPointer
-> CSize
-> IO RdKafkaConfResT
rdKafkaTopicConfSet RdKafkaTopicConfTPtr
confPtr (Text -> String
Text.unpack Text
key) (Text -> String
Text.unpack Text
value) CCharBufPointer
charPtr (Int -> CSize
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
nErrorBytes)
    RdKafkaConfResT -> CCharBufPointer -> IO ()
checkConfSetValue RdKafkaConfResT
err CCharBufPointer
charPtr

setAllTopicConfValues :: TopicConf -> TopicProps -> IO ()
setAllTopicConfValues :: TopicConf -> TopicProps -> IO ()
setAllTopicConfValues conf :: TopicConf
conf (TopicProps props :: Map Text Text
props) = (Text -> Text -> IO ()) -> Map Text Text -> IO ()
forall m k a. Monoid m => (k -> a -> m) -> Map k a -> m
Map.foldMapWithKey (TopicConf -> Text -> Text -> IO ()
setTopicConfValue TopicConf
conf) Map Text Text
props --forM_ props $ uncurry (setTopicConfValue conf)