| Safe Haskell | None |
|---|---|
| Language | Haskell2010 |
Mu.Kafka.Producer
Description
This module allows you to open a "sink" to Kafka. Every value you sent to the sink will be sent over to the corresponding Kafka instance.
This module is a wrapper over Sink
from the (awesome) package hw-kafka-client.
Synopsis
- data ProducerRecord' k v = ProducerRecord' {
- prTopic :: !TopicName
- prPartition :: !ProducePartition
- prKey :: Maybe k
- prValue :: Maybe v
- kafkaSink :: (MonadResource m, ToSchema sch sty t, ToAvro (WithSchema sch sty t), HasAvroSchema (WithSchema sch sty t)) => Proxy sch -> ProducerProperties -> ConduitT (ProducerRecord' ByteString t) Void m (Maybe KafkaError)
- kafkaSinkAutoClose :: (MonadResource m, ToSchema sch sty t, ToAvro (WithSchema sch sty t), HasAvroSchema (WithSchema sch sty t)) => Proxy sch -> KafkaProducer -> ConduitT (ProducerRecord' ByteString t) Void m (Maybe KafkaError)
- kafkaSinkNoClose :: (MonadIO m, ToSchema sch sty t, ToAvro (WithSchema sch sty t), HasAvroSchema (WithSchema sch sty t)) => Proxy sch -> KafkaProducer -> ConduitT (ProducerRecord' ByteString t) Void m (Maybe KafkaError)
- kafkaBatchSinkNoClose :: (MonadIO m, ToSchema sch sty t, ToAvro (WithSchema sch sty t), HasAvroSchema (WithSchema sch sty t)) => Proxy sch -> KafkaProducer -> ConduitT [ProducerRecord' ByteString t] Void m [(ProducerRecord, KafkaError)]
- data ProducerProperties
- data KafkaProducer
- data ProducePartition
- data KafkaConsumer
- newtype BatchSize = BatchSize {
- unBatchSize :: Int
- data TopicName
- data KafkaError
- batchByOrFlushEither :: forall (m :: Type -> Type) e a. Monad m => BatchSize -> ConduitT (Either e a) [a] m ()
- batchByOrFlush :: forall (m :: Type -> Type) a. Monad m => BatchSize -> ConduitT (Maybe a) [a] m ()
- foldYield :: forall (m :: Type -> Type) i s o. Monad m => (i -> s -> (s, [o])) -> (s -> [o]) -> s -> ConduitT i o m ()
- throwLeftSatisfy :: forall (m :: Type -> Type) e i. (MonadThrow m, Exception e) => (e -> Bool) -> ConduitT (Either e i) (Either e i) m ()
- throwLeft :: forall (m :: Type -> Type) e i. (MonadThrow m, Exception e) => ConduitT (Either e i) i m ()
Documentation
data ProducerRecord' k v Source #
Constructors
| ProducerRecord' | |
Fields
| |
Instances
kafkaSink :: (MonadResource m, ToSchema sch sty t, ToAvro (WithSchema sch sty t), HasAvroSchema (WithSchema sch sty t)) => Proxy sch -> ProducerProperties -> ConduitT (ProducerRecord' ByteString t) Void m (Maybe KafkaError) Source #
Creates a kafka producer for given properties and returns a Sink.
This method of creating a Sink represents a simple case
and does not provide access to KafkaProducer. For more complex scenarious
kafkaSinkAutoClose or kafkaSinkNoClose can be used.
kafkaSinkAutoClose :: (MonadResource m, ToSchema sch sty t, ToAvro (WithSchema sch sty t), HasAvroSchema (WithSchema sch sty t)) => Proxy sch -> KafkaProducer -> ConduitT (ProducerRecord' ByteString t) Void m (Maybe KafkaError) Source #
Creates a Sink for a given KafkaProducer.
The producer will be closed when the Sink is closed.
kafkaSinkNoClose :: (MonadIO m, ToSchema sch sty t, ToAvro (WithSchema sch sty t), HasAvroSchema (WithSchema sch sty t)) => Proxy sch -> KafkaProducer -> ConduitT (ProducerRecord' ByteString t) Void m (Maybe KafkaError) Source #
Creates a Sink for a given KafkaProducer.
The producer will NOT be closed automatically.
kafkaBatchSinkNoClose :: (MonadIO m, ToSchema sch sty t, ToAvro (WithSchema sch sty t), HasAvroSchema (WithSchema sch sty t)) => Proxy sch -> KafkaProducer -> ConduitT [ProducerRecord' ByteString t] Void m [(ProducerRecord, KafkaError)] Source #
Creates a batching Sink for a given KafkaProducer.
The producer will NOT be closed automatically.
data ProducerProperties #
Properties to create KafkaProducer.
Instances
| Semigroup ProducerProperties | |
Defined in Kafka.Producer.ProducerProperties Methods (<>) :: ProducerProperties -> ProducerProperties -> ProducerProperties # sconcat :: NonEmpty ProducerProperties -> ProducerProperties # stimes :: Integral b => b -> ProducerProperties -> ProducerProperties # | |
| Monoid ProducerProperties | Right biased so we prefer newer properties over older ones. |
Defined in Kafka.Producer.ProducerProperties Methods mempty :: ProducerProperties # mappend :: ProducerProperties -> ProducerProperties -> ProducerProperties # mconcat :: [ProducerProperties] -> ProducerProperties # | |
data KafkaProducer #
Main pointer to Kafka object, which contains our brokers
Instances
| HasKafka KafkaProducer | |
Defined in Kafka.Producer.Types Methods getKafka :: KafkaProducer -> Kafka | |
| HasKafkaConf KafkaProducer | |
Defined in Kafka.Producer.Types Methods getKafkaConf :: KafkaProducer -> KafkaConf | |
| HasTopicConf KafkaProducer | |
Defined in Kafka.Producer.Types Methods getTopicConf :: KafkaProducer -> TopicConf | |
data ProducePartition #
Instances
data KafkaConsumer #
Instances
| HasKafka KafkaConsumer | |
Defined in Kafka.Consumer.Types Methods getKafka :: KafkaConsumer -> Kafka | |
| HasKafkaConf KafkaConsumer | |
Defined in Kafka.Consumer.Types Methods getKafkaConf :: KafkaConsumer -> KafkaConf | |
Constructors
| BatchSize | |
Fields
| |
Instances
| Eq BatchSize | |
| Num BatchSize | |
| Ord BatchSize | |
| Read BatchSize | |
| Show BatchSize | |
| Generic BatchSize | |
| type Rep BatchSize | |
Defined in Kafka.Types | |
Topic name to be consumed
Wildcard (regex) topics are supported by the librdkafka assignor:
any topic name in the topics list that is prefixed with ^ will
be regex-matched to the full list of topics in the cluster and matching
topics will be added to the subscription list.
Instances
| Eq TopicName | |
| Ord TopicName | |
| Read TopicName | |
| Show TopicName | |
| Generic TopicName | |
| type Rep TopicName | |
Defined in Kafka.Types | |
data KafkaError #
Any Kafka errors
Instances
batchByOrFlushEither :: forall (m :: Type -> Type) e a. Monad m => BatchSize -> ConduitT (Either e a) [a] m () #
batchByOrFlush :: forall (m :: Type -> Type) a. Monad m => BatchSize -> ConduitT (Maybe a) [a] m () #
foldYield :: forall (m :: Type -> Type) i s o. Monad m => (i -> s -> (s, [o])) -> (s -> [o]) -> s -> ConduitT i o m () #
Create a conduit that folds with the function f over its input i with its internal state s and emits outputs [o], then finally emits outputs [o] from the function g applied to the final state s.
throwLeftSatisfy :: forall (m :: Type -> Type) e i. (MonadThrow m, Exception e) => (e -> Bool) -> ConduitT (Either e i) (Either e i) m () #
Throws the left part of a value in a MonadThrow context if the value
satisfies the predicate
throwLeft :: forall (m :: Type -> Type) e i. (MonadThrow m, Exception e) => ConduitT (Either e i) i m () #
Throws the left part of a value in a MonadThrow context