{-# language DeriveGeneric #-}
{-# language FlexibleContexts #-}
{-# language TypeFamilies #-}
{-# language TypeOperators #-}
module Mu.Kafka.Producer (
ProducerRecord'(..)
, kafkaSink
, kafkaSinkAutoClose
, kafkaSinkNoClose
, kafkaBatchSinkNoClose
, module X
) where
import Conduit (mapC)
import Control.Monad.IO.Class
import Control.Monad.Trans.Resource
import qualified Data.Avro as A
import Data.ByteString
import Data.Conduit
import Data.Typeable
import GHC.Generics
import Mu.Schema
import qualified Kafka.Conduit.Sink as S
import Kafka.Producer (ProducerRecord (..))
import Kafka.Conduit.Combinators as X
import Kafka.Consumer as X (KafkaConsumer)
import Kafka.Producer as X (KafkaError, KafkaProducer, ProducePartition,
ProducerProperties, TopicName)
import Mu.Kafka.Internal
-- | A record to be published, parameterised over the key type @k@ and the
-- value type @v@.
--
-- It has the same four components that are passed to the
-- 'ProducerRecord' constructor, but the key and value are kept as typed
-- Haskell values instead of already-serialised bytes.
data ProducerRecord' k v = ProducerRecord'
  { prTopic     :: !TopicName
    -- ^ Topic the record is published to (strict field).
  , prPartition :: !ProducePartition
    -- ^ Partition selection for the record (strict field).
  , prKey       :: Maybe k
    -- ^ Optional record key.
  , prValue     :: Maybe v
    -- ^ Optional record payload.
  } deriving (Eq, Show, Typeable, Generic)
-- | Turn a typed 'ProducerRecord'' into the 'ProducerRecord' expected by the
-- underlying Kafka sinks.
--
-- Topic, partition and key are copied over unchanged. The value, when
-- present, is serialised to a 'ByteString' with 'toBS' using the schema
-- selected by the proxy (presumably Avro, given the 'A.ToAvro' constraint;
-- see "Mu.Kafka.Internal" to confirm). A 'Nothing' value stays 'Nothing'.
toPR
  :: ( ToSchema sch sty t
     , A.ToAvro (WithSchema sch sty t)
     , A.HasAvroSchema (WithSchema sch sty t) )
  => Proxy sch -> ProducerRecord' ByteString t -> ProducerRecord
toPR proxy (ProducerRecord' topic partition key value)
  = ProducerRecord topic partition key (toBS proxy <$> value)
-- | Sink of typed records built from 'ProducerProperties'.
--
-- Every incoming 'ProducerRecord'' is converted with 'toPR' (which
-- serialises its value according to the schema picked by the proxy) and
-- then forwarded to 'S.kafkaSink', which is handed the given properties.
--
-- The result is whatever 'S.kafkaSink' returns: @'Just' err@ or 'Nothing'.
-- Producer creation and clean-up are entirely delegated to 'S.kafkaSink';
-- consult "Kafka.Conduit.Sink" for the exact lifecycle guarantees.
kafkaSink
  :: ( MonadResource m
     , ToSchema sch sty t
     , A.ToAvro (WithSchema sch sty t)
     , A.HasAvroSchema (WithSchema sch sty t) )
  => Proxy sch -> X.ProducerProperties
  -> ConduitT (ProducerRecord' ByteString t) Void m (Maybe KafkaError)
kafkaSink proxy prod
  = mapC (toPR proxy) .| S.kafkaSink prod
-- | Sink of typed records on top of an already-created 'KafkaProducer'.
--
-- Every incoming 'ProducerRecord'' is converted with 'toPR' and forwarded
-- to 'S.kafkaSinkAutoClose' together with the supplied producer.
--
-- The result is whatever 'S.kafkaSinkAutoClose' returns: @'Just' err@ or
-- 'Nothing'.
--
-- NOTE(review): going by its name, the wrapped sink is expected to close
-- the producer once the stream ends (hence the 'MonadResource'
-- constraint) - confirm against "Kafka.Conduit.Sink".
kafkaSinkAutoClose
  :: ( MonadResource m
     , ToSchema sch sty t
     , A.ToAvro (WithSchema sch sty t)
     , A.HasAvroSchema (WithSchema sch sty t) )
  => Proxy sch -> KafkaProducer
  -> ConduitT (ProducerRecord' ByteString t) Void m (Maybe X.KafkaError)
kafkaSinkAutoClose proxy prod
  = mapC (toPR proxy) .| S.kafkaSinkAutoClose prod
-- | Sink of typed records on top of an already-created 'KafkaProducer',
-- requiring only 'MonadIO'.
--
-- Every incoming 'ProducerRecord'' is converted with 'toPR' and forwarded
-- to 'S.kafkaSinkNoClose' together with the supplied producer.
--
-- The result is whatever 'S.kafkaSinkNoClose' returns: @'Just' err@ or
-- 'Nothing'.
--
-- NOTE(review): going by its name, the wrapped sink leaves the producer
-- open, so the caller stays responsible for closing it - confirm against
-- "Kafka.Conduit.Sink".
kafkaSinkNoClose
  :: ( MonadIO m
     , ToSchema sch sty t
     , A.ToAvro (WithSchema sch sty t)
     , A.HasAvroSchema (WithSchema sch sty t) )
  => Proxy sch -> KafkaProducer
  -> ConduitT (ProducerRecord' ByteString t) Void m (Maybe X.KafkaError)
kafkaSinkNoClose proxy prod
  = mapC (toPR proxy) .| S.kafkaSinkNoClose prod
-- | Batch variant of the typed sink on top of an already-created
-- 'KafkaProducer'.
--
-- The stream carries /lists/ of 'ProducerRecord''. Each list is converted
-- element by element with 'toPR' and the resulting batch is forwarded to
-- 'S.kafkaBatchSinkNoClose' together with the supplied producer.
--
-- The result is the list of @('ProducerRecord', 'KafkaError')@ pairs
-- returned by 'S.kafkaBatchSinkNoClose'. Note that these are the
-- already-serialised records, not the typed input records.
--
-- NOTE(review): the pairs presumably identify the records that could not
-- be delivered, and the producer is presumably left open - confirm both
-- against "Kafka.Conduit.Sink".
kafkaBatchSinkNoClose
  :: ( MonadIO m
     , ToSchema sch sty t
     , A.ToAvro (WithSchema sch sty t)
     , A.HasAvroSchema (WithSchema sch sty t) )
  => Proxy sch -> KafkaProducer
  -> ConduitT [ProducerRecord' ByteString t] Void m [(ProducerRecord, KafkaError)]
kafkaBatchSinkNoClose proxy prod
  = mapC (fmap (toPR proxy)) .| S.kafkaBatchSinkNoClose prod