{-# language DeriveGeneric    #-}
{-# language FlexibleContexts #-}
{-# language TypeFamilies     #-}
{-# language TypeOperators    #-}
{-|
Description : streams of Mu terms as Kafka producers

This module allows you to open a "sink" to Kafka.
Every value you sent to the sink will be sent over
to the corresponding Kafka instance.

This module is a wrapper over 'Kafka.Conduit.Sink'
from the (awesome) package @hw-kafka-client@.
-}
module Mu.Kafka.Producer (
  ProducerRecord'(..)
, kafkaSink
, kafkaSinkAutoClose
, kafkaSinkNoClose
, kafkaBatchSinkNoClose
, module X
) where

import           Conduit                      (mapC)
import           Control.Monad.IO.Class
import           Control.Monad.Trans.Resource
import qualified Data.Avro                    as A
import           Data.ByteString
import           Data.Conduit
import           Data.Typeable
import           GHC.Generics
import           Mu.Schema

import qualified Kafka.Conduit.Sink           as S
import           Kafka.Producer               (ProducerRecord (..))

import           Kafka.Conduit.Combinators    as X
import           Kafka.Consumer               as X (KafkaConsumer)
import           Kafka.Producer               as X (KafkaError, KafkaProducer, ProducePartition,
                                                    ProducerProperties, TopicName)

import           Mu.Kafka.Internal

data ProducerRecord' k v = ProducerRecord'
  { ProducerRecord' k v -> TopicName
prTopic     :: !TopicName
  , ProducerRecord' k v -> ProducePartition
prPartition :: !ProducePartition
  , ProducerRecord' k v -> Maybe k
prKey       :: Maybe k
  , ProducerRecord' k v -> Maybe v
prValue     :: Maybe v
  } deriving (ProducerRecord' k v -> ProducerRecord' k v -> Bool
(ProducerRecord' k v -> ProducerRecord' k v -> Bool)
-> (ProducerRecord' k v -> ProducerRecord' k v -> Bool)
-> Eq (ProducerRecord' k v)
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
forall k v.
(Eq k, Eq v) =>
ProducerRecord' k v -> ProducerRecord' k v -> Bool
/= :: ProducerRecord' k v -> ProducerRecord' k v -> Bool
$c/= :: forall k v.
(Eq k, Eq v) =>
ProducerRecord' k v -> ProducerRecord' k v -> Bool
== :: ProducerRecord' k v -> ProducerRecord' k v -> Bool
$c== :: forall k v.
(Eq k, Eq v) =>
ProducerRecord' k v -> ProducerRecord' k v -> Bool
Eq, Int -> ProducerRecord' k v -> ShowS
[ProducerRecord' k v] -> ShowS
ProducerRecord' k v -> String
(Int -> ProducerRecord' k v -> ShowS)
-> (ProducerRecord' k v -> String)
-> ([ProducerRecord' k v] -> ShowS)
-> Show (ProducerRecord' k v)
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
forall k v. (Show k, Show v) => Int -> ProducerRecord' k v -> ShowS
forall k v. (Show k, Show v) => [ProducerRecord' k v] -> ShowS
forall k v. (Show k, Show v) => ProducerRecord' k v -> String
showList :: [ProducerRecord' k v] -> ShowS
$cshowList :: forall k v. (Show k, Show v) => [ProducerRecord' k v] -> ShowS
show :: ProducerRecord' k v -> String
$cshow :: forall k v. (Show k, Show v) => ProducerRecord' k v -> String
showsPrec :: Int -> ProducerRecord' k v -> ShowS
$cshowsPrec :: forall k v. (Show k, Show v) => Int -> ProducerRecord' k v -> ShowS
Show, Typeable, (forall x. ProducerRecord' k v -> Rep (ProducerRecord' k v) x)
-> (forall x. Rep (ProducerRecord' k v) x -> ProducerRecord' k v)
-> Generic (ProducerRecord' k v)
forall x. Rep (ProducerRecord' k v) x -> ProducerRecord' k v
forall x. ProducerRecord' k v -> Rep (ProducerRecord' k v) x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
forall k v x. Rep (ProducerRecord' k v) x -> ProducerRecord' k v
forall k v x. ProducerRecord' k v -> Rep (ProducerRecord' k v) x
$cto :: forall k v x. Rep (ProducerRecord' k v) x -> ProducerRecord' k v
$cfrom :: forall k v x. ProducerRecord' k v -> Rep (ProducerRecord' k v) x
Generic)

toPR
  :: ( ToSchema sch sty t
     , A.ToAvro (WithSchema sch sty t)
     , A.HasAvroSchema (WithSchema sch sty t) )
  => Proxy sch -> ProducerRecord' ByteString t -> ProducerRecord
toPR :: Proxy sch -> ProducerRecord' ByteString t -> ProducerRecord
toPR proxy :: Proxy sch
proxy (ProducerRecord' t :: TopicName
t p :: ProducePartition
p k :: Maybe ByteString
k v :: Maybe t
v)
  = TopicName
-> ProducePartition
-> Maybe ByteString
-> Maybe ByteString
-> ProducerRecord
ProducerRecord TopicName
t ProducePartition
p Maybe ByteString
k (Proxy sch -> t -> ByteString
forall (sch :: Schema * *) sty t.
(ToSchema sch sty t, HasAvroSchema (WithSchema sch sty t),
 ToAvro (WithSchema sch sty t)) =>
Proxy sch -> t -> ByteString
toBS Proxy sch
proxy (t -> ByteString) -> Maybe t -> Maybe ByteString
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe t
v)

-- | Creates a kafka producer for given properties and returns a Sink.
--
-- This method of creating a Sink represents a simple case
-- and does not provide access to `KafkaProducer`. For more complex scenarious
-- 'kafkaSinkAutoClose' or 'kafkaSinkNoClose' can be used.
kafkaSink
  :: ( MonadResource m
     , ToSchema sch sty t
     , A.ToAvro (WithSchema sch sty t)
     , A.HasAvroSchema (WithSchema sch sty t) )
  => Proxy sch -> X.ProducerProperties
  -> ConduitT (ProducerRecord' ByteString t) Void m (Maybe KafkaError)
kafkaSink :: Proxy sch
-> ProducerProperties
-> ConduitT
     (ProducerRecord' ByteString t) Void m (Maybe KafkaError)
kafkaSink proxy :: Proxy sch
proxy prod :: ProducerProperties
prod
  = (ProducerRecord' ByteString t -> ProducerRecord)
-> ConduitT (ProducerRecord' ByteString t) ProducerRecord m ()
forall (m :: * -> *) a b. Monad m => (a -> b) -> ConduitT a b m ()
mapC (Proxy sch -> ProducerRecord' ByteString t -> ProducerRecord
forall (sch :: Schema * *) sty t.
(ToSchema sch sty t, ToAvro (WithSchema sch sty t),
 HasAvroSchema (WithSchema sch sty t)) =>
Proxy sch -> ProducerRecord' ByteString t -> ProducerRecord
toPR Proxy sch
proxy) ConduitT (ProducerRecord' ByteString t) ProducerRecord m ()
-> ConduitM ProducerRecord Void m (Maybe KafkaError)
-> ConduitT
     (ProducerRecord' ByteString t) Void m (Maybe KafkaError)
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| ProducerProperties
-> ConduitM ProducerRecord Void m (Maybe KafkaError)
forall (m :: * -> *).
MonadResource m =>
ProducerProperties
-> ConduitT ProducerRecord Void m (Maybe KafkaError)
S.kafkaSink ProducerProperties
prod

-- | Creates a Sink for a given `KafkaProducer`.
-- The producer will be closed when the Sink is closed.
kafkaSinkAutoClose
  :: ( MonadResource m
     , ToSchema sch sty t
     , A.ToAvro (WithSchema sch sty t)
     , A.HasAvroSchema (WithSchema sch sty t) )
  => Proxy sch -> KafkaProducer
  -> ConduitT (ProducerRecord' ByteString t) Void m (Maybe X.KafkaError)
kafkaSinkAutoClose :: Proxy sch
-> KafkaProducer
-> ConduitT
     (ProducerRecord' ByteString t) Void m (Maybe KafkaError)
kafkaSinkAutoClose proxy :: Proxy sch
proxy prod :: KafkaProducer
prod
  = (ProducerRecord' ByteString t -> ProducerRecord)
-> ConduitT (ProducerRecord' ByteString t) ProducerRecord m ()
forall (m :: * -> *) a b. Monad m => (a -> b) -> ConduitT a b m ()
mapC (Proxy sch -> ProducerRecord' ByteString t -> ProducerRecord
forall (sch :: Schema * *) sty t.
(ToSchema sch sty t, ToAvro (WithSchema sch sty t),
 HasAvroSchema (WithSchema sch sty t)) =>
Proxy sch -> ProducerRecord' ByteString t -> ProducerRecord
toPR Proxy sch
proxy) ConduitT (ProducerRecord' ByteString t) ProducerRecord m ()
-> ConduitM ProducerRecord Void m (Maybe KafkaError)
-> ConduitT
     (ProducerRecord' ByteString t) Void m (Maybe KafkaError)
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| KafkaProducer -> ConduitM ProducerRecord Void m (Maybe KafkaError)
forall (m :: * -> *).
MonadResource m =>
KafkaProducer -> ConduitT ProducerRecord Void m (Maybe KafkaError)
S.kafkaSinkAutoClose KafkaProducer
prod

-- | Creates a Sink for a given `KafkaProducer`.
-- The producer will NOT be closed automatically.
kafkaSinkNoClose
  :: ( MonadIO m
     , ToSchema sch sty t
     , A.ToAvro (WithSchema sch sty t)
     , A.HasAvroSchema (WithSchema sch sty t) )
  => Proxy sch -> KafkaProducer
  -> ConduitT (ProducerRecord' ByteString t) Void m (Maybe X.KafkaError)
kafkaSinkNoClose :: Proxy sch
-> KafkaProducer
-> ConduitT
     (ProducerRecord' ByteString t) Void m (Maybe KafkaError)
kafkaSinkNoClose proxy :: Proxy sch
proxy prod :: KafkaProducer
prod
  = (ProducerRecord' ByteString t -> ProducerRecord)
-> ConduitT (ProducerRecord' ByteString t) ProducerRecord m ()
forall (m :: * -> *) a b. Monad m => (a -> b) -> ConduitT a b m ()
mapC (Proxy sch -> ProducerRecord' ByteString t -> ProducerRecord
forall (sch :: Schema * *) sty t.
(ToSchema sch sty t, ToAvro (WithSchema sch sty t),
 HasAvroSchema (WithSchema sch sty t)) =>
Proxy sch -> ProducerRecord' ByteString t -> ProducerRecord
toPR Proxy sch
proxy) ConduitT (ProducerRecord' ByteString t) ProducerRecord m ()
-> ConduitM ProducerRecord Void m (Maybe KafkaError)
-> ConduitT
     (ProducerRecord' ByteString t) Void m (Maybe KafkaError)
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| KafkaProducer -> ConduitM ProducerRecord Void m (Maybe KafkaError)
forall (m :: * -> *).
MonadIO m =>
KafkaProducer -> ConduitT ProducerRecord Void m (Maybe KafkaError)
S.kafkaSinkNoClose KafkaProducer
prod

-- | Creates a batching Sink for a given `KafkaProducer`.
-- The producer will NOT be closed automatically.
kafkaBatchSinkNoClose
  :: ( MonadIO m
     , ToSchema sch sty t
     , A.ToAvro (WithSchema sch sty t)
     , A.HasAvroSchema (WithSchema sch sty t) )
  => Proxy sch -> KafkaProducer
  -> ConduitT [ProducerRecord' ByteString t] Void m [(ProducerRecord, KafkaError)]
kafkaBatchSinkNoClose :: Proxy sch
-> KafkaProducer
-> ConduitT
     [ProducerRecord' ByteString t]
     Void
     m
     [(ProducerRecord, KafkaError)]
kafkaBatchSinkNoClose proxy :: Proxy sch
proxy prod :: KafkaProducer
prod
  = ([ProducerRecord' ByteString t] -> [ProducerRecord])
-> ConduitT [ProducerRecord' ByteString t] [ProducerRecord] m ()
forall (m :: * -> *) a b. Monad m => (a -> b) -> ConduitT a b m ()
mapC ((ProducerRecord' ByteString t -> ProducerRecord)
-> [ProducerRecord' ByteString t] -> [ProducerRecord]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Proxy sch -> ProducerRecord' ByteString t -> ProducerRecord
forall (sch :: Schema * *) sty t.
(ToSchema sch sty t, ToAvro (WithSchema sch sty t),
 HasAvroSchema (WithSchema sch sty t)) =>
Proxy sch -> ProducerRecord' ByteString t -> ProducerRecord
toPR Proxy sch
proxy)) ConduitT [ProducerRecord' ByteString t] [ProducerRecord] m ()
-> ConduitM [ProducerRecord] Void m [(ProducerRecord, KafkaError)]
-> ConduitT
     [ProducerRecord' ByteString t]
     Void
     m
     [(ProducerRecord, KafkaError)]
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| KafkaProducer
-> ConduitM [ProducerRecord] Void m [(ProducerRecord, KafkaError)]
forall (m :: * -> *).
MonadIO m =>
KafkaProducer
-> ConduitT [ProducerRecord] Void m [(ProducerRecord, KafkaError)]
S.kafkaBatchSinkNoClose KafkaProducer
prod