| Safe Haskell | None | 
|---|---|
| Language | Haskell2010 | 
Mu.Kafka.Producer
Description
This module allows you to open a "sink" to Kafka. Every value you send to the sink is forwarded to the corresponding Kafka instance.
This module is a wrapper over Sink from the (awesome) package hw-kafka-client.
Synopsis
- data ProducerRecord' k v = ProducerRecord' {
 - prTopic :: !TopicName
 - prPartition :: !ProducePartition
 - prKey :: Maybe k
 - prValue :: Maybe v
 }
 - kafkaSink :: (MonadResource m, ToSchema sch sty t, ToAvro (WithSchema sch sty t), HasAvroSchema (WithSchema sch sty t)) => Proxy sch -> ProducerProperties -> ConduitT (ProducerRecord' ByteString t) Void m (Maybe KafkaError)
 - kafkaSinkAutoClose :: (MonadResource m, ToSchema sch sty t, ToAvro (WithSchema sch sty t), HasAvroSchema (WithSchema sch sty t)) => Proxy sch -> KafkaProducer -> ConduitT (ProducerRecord' ByteString t) Void m (Maybe KafkaError)
 - kafkaSinkNoClose :: (MonadIO m, ToSchema sch sty t, ToAvro (WithSchema sch sty t), HasAvroSchema (WithSchema sch sty t)) => Proxy sch -> KafkaProducer -> ConduitT (ProducerRecord' ByteString t) Void m (Maybe KafkaError)
 - kafkaBatchSinkNoClose :: (MonadIO m, ToSchema sch sty t, ToAvro (WithSchema sch sty t), HasAvroSchema (WithSchema sch sty t)) => Proxy sch -> KafkaProducer -> ConduitT [ProducerRecord' ByteString t] Void m [(ProducerRecord, KafkaError)]
 - data ProducerProperties
 - data KafkaProducer
 - data ProducePartition
 - data KafkaConsumer
 - newtype BatchSize = BatchSize {
 - unBatchSize :: Int
 }
 - data TopicName
 - data KafkaError
 - batchByOrFlushEither :: forall (m :: Type -> Type) e a. Monad m => BatchSize -> ConduitT (Either e a) [a] m ()
 - batchByOrFlush :: forall (m :: Type -> Type) a. Monad m => BatchSize -> ConduitT (Maybe a) [a] m ()
 - foldYield :: forall (m :: Type -> Type) i s o. Monad m => (i -> s -> (s, [o])) -> (s -> [o]) -> s -> ConduitT i o m ()
 - throwLeftSatisfy :: forall (m :: Type -> Type) e i. (MonadThrow m, Exception e) => (e -> Bool) -> ConduitT (Either e i) (Either e i) m ()
 - throwLeft :: forall (m :: Type -> Type) e i. (MonadThrow m, Exception e) => ConduitT (Either e i) i m ()
 
Documentation
data ProducerRecord' k v Source #
Constructors
| ProducerRecord' | |
Fields 
  | |
Instances
kafkaSink :: (MonadResource m, ToSchema sch sty t, ToAvro (WithSchema sch sty t), HasAvroSchema (WithSchema sch sty t)) => Proxy sch -> ProducerProperties -> ConduitT (ProducerRecord' ByteString t) Void m (Maybe KafkaError) Source #
Creates a Kafka producer for the given properties and returns a Sink.
This method of creating a Sink represents a simple case
 and does not provide access to KafkaProducer. For more complex scenarios
 kafkaSinkAutoClose or kafkaSinkNoClose can be used.
kafkaSinkAutoClose :: (MonadResource m, ToSchema sch sty t, ToAvro (WithSchema sch sty t), HasAvroSchema (WithSchema sch sty t)) => Proxy sch -> KafkaProducer -> ConduitT (ProducerRecord' ByteString t) Void m (Maybe KafkaError) Source #
Creates a Sink for a given KafkaProducer.
 The producer will be closed when the Sink is closed.
kafkaSinkNoClose :: (MonadIO m, ToSchema sch sty t, ToAvro (WithSchema sch sty t), HasAvroSchema (WithSchema sch sty t)) => Proxy sch -> KafkaProducer -> ConduitT (ProducerRecord' ByteString t) Void m (Maybe KafkaError) Source #
Creates a Sink for a given KafkaProducer.
 The producer will NOT be closed automatically.
kafkaBatchSinkNoClose :: (MonadIO m, ToSchema sch sty t, ToAvro (WithSchema sch sty t), HasAvroSchema (WithSchema sch sty t)) => Proxy sch -> KafkaProducer -> ConduitT [ProducerRecord' ByteString t] Void m [(ProducerRecord, KafkaError)] Source #
Creates a batching Sink for a given KafkaProducer.
 The producer will NOT be closed automatically.
data ProducerProperties #
Properties to create KafkaProducer.
Instances
| Semigroup ProducerProperties | |
Defined in Kafka.Producer.ProducerProperties Methods (<>) :: ProducerProperties -> ProducerProperties -> ProducerProperties # sconcat :: NonEmpty ProducerProperties -> ProducerProperties # stimes :: Integral b => b -> ProducerProperties -> ProducerProperties #  | |
| Monoid ProducerProperties | Right biased so we prefer newer properties over older ones.  | 
Defined in Kafka.Producer.ProducerProperties Methods mempty :: ProducerProperties # mappend :: ProducerProperties -> ProducerProperties -> ProducerProperties # mconcat :: [ProducerProperties] -> ProducerProperties #  | |
data KafkaProducer #
Main pointer to Kafka object, which contains our brokers
Instances
| HasKafka KafkaProducer | |
Defined in Kafka.Producer.Types Methods getKafka :: KafkaProducer -> Kafka  | |
| HasKafkaConf KafkaProducer | |
Defined in Kafka.Producer.Types Methods getKafkaConf :: KafkaProducer -> KafkaConf  | |
| HasTopicConf KafkaProducer | |
Defined in Kafka.Producer.Types Methods getTopicConf :: KafkaProducer -> TopicConf  | |
data ProducePartition #
Instances
data KafkaConsumer #
Instances
| HasKafka KafkaConsumer | |
Defined in Kafka.Consumer.Types Methods getKafka :: KafkaConsumer -> Kafka  | |
| HasKafkaConf KafkaConsumer | |
Defined in Kafka.Consumer.Types Methods getKafkaConf :: KafkaConsumer -> KafkaConf  | |
newtype BatchSize #
Constructors
| BatchSize | |
Fields 
  | |
Instances
| Eq BatchSize | |
| Num BatchSize | |
| Ord BatchSize | |
| Read BatchSize | |
| Show BatchSize | |
| Generic BatchSize | |
| type Rep BatchSize | |
Defined in Kafka.Types  | |
data TopicName #
Topic name to be consumed
Wildcard (regex) topics are supported by the librdkafka assignor:
 any topic name in the topics list that is prefixed with ^ will
 be regex-matched to the full list of topics in the cluster and matching
 topics will be added to the subscription list.
Instances
| Eq TopicName | |
| Ord TopicName | |
| Read TopicName | |
| Show TopicName | |
| Generic TopicName | |
| type Rep TopicName | |
Defined in Kafka.Types  | |
data KafkaError #
Any Kafka errors
Instances
batchByOrFlushEither :: forall (m :: Type -> Type) e a. Monad m => BatchSize -> ConduitT (Either e a) [a] m () #
batchByOrFlush :: forall (m :: Type -> Type) a. Monad m => BatchSize -> ConduitT (Maybe a) [a] m () #
foldYield :: forall (m :: Type -> Type) i s o. Monad m => (i -> s -> (s, [o])) -> (s -> [o]) -> s -> ConduitT i o m () #
Create a conduit that folds with the function f over its input i with its internal state s and emits outputs [o], then finally emits outputs [o] from the function g applied to the final state s.
throwLeftSatisfy :: forall (m :: Type -> Type) e i. (MonadThrow m, Exception e) => (e -> Bool) -> ConduitT (Either e i) (Either e i) m () #
Throws the left part of a value in a MonadThrow context if the value
 satisfies the predicate
throwLeft :: forall (m :: Type -> Type) e i. (MonadThrow m, Exception e) => ConduitT (Either e i) i m () #
Throws the left part of a value in a MonadThrow context