Safe Haskell | None |
---|---|
Language | Haskell2010 |
This module allows you to open a "sink" to Kafka. Every value you send to the sink is forwarded to the corresponding Kafka instance.
This module is a wrapper over Sink from the (awesome) package hw-kafka-client.
Synopsis
- data ProducerRecord' k v = ProducerRecord' {
  - prTopic :: !TopicName
  - prPartition :: !ProducePartition
  - prKey :: Maybe k
  - prValue :: Maybe v
  }
- kafkaSink :: (MonadResource m, ToSchema sch sty t, ToAvro (WithSchema sch sty t), HasAvroSchema (WithSchema sch sty t)) => Proxy sch -> ProducerProperties -> ConduitT (ProducerRecord' ByteString t) Void m (Maybe KafkaError)
- kafkaSinkAutoClose :: (MonadResource m, ToSchema sch sty t, ToAvro (WithSchema sch sty t), HasAvroSchema (WithSchema sch sty t)) => Proxy sch -> KafkaProducer -> ConduitT (ProducerRecord' ByteString t) Void m (Maybe KafkaError)
- kafkaSinkNoClose :: (MonadIO m, ToSchema sch sty t, ToAvro (WithSchema sch sty t), HasAvroSchema (WithSchema sch sty t)) => Proxy sch -> KafkaProducer -> ConduitT (ProducerRecord' ByteString t) Void m (Maybe KafkaError)
- kafkaBatchSinkNoClose :: (MonadIO m, ToSchema sch sty t, ToAvro (WithSchema sch sty t), HasAvroSchema (WithSchema sch sty t)) => Proxy sch -> KafkaProducer -> ConduitT [ProducerRecord' ByteString t] Void m [(ProducerRecord, KafkaError)]
- data ProducerProperties
- data KafkaProducer
- data ProducePartition
- data KafkaConsumer
- newtype BatchSize = BatchSize {
  - unBatchSize :: Int
  }
- data TopicName
- data KafkaError
- batchByOrFlushEither :: forall (m :: Type -> Type) e a. Monad m => BatchSize -> ConduitT (Either e a) [a] m ()
- batchByOrFlush :: forall (m :: Type -> Type) a. Monad m => BatchSize -> ConduitT (Maybe a) [a] m ()
- foldYield :: forall (m :: Type -> Type) i s o. Monad m => (i -> s -> (s, [o])) -> (s -> [o]) -> s -> ConduitT i o m ()
- throwLeftSatisfy :: forall (m :: Type -> Type) e i. (MonadThrow m, Exception e) => (e -> Bool) -> ConduitT (Either e i) (Either e i) m ()
- throwLeft :: forall (m :: Type -> Type) e i. (MonadThrow m, Exception e) => ConduitT (Either e i) i m ()
Documentation
data ProducerRecord' k v Source #
ProducerRecord' | |
kafkaSink :: (MonadResource m, ToSchema sch sty t, ToAvro (WithSchema sch sty t), HasAvroSchema (WithSchema sch sty t)) => Proxy sch -> ProducerProperties -> ConduitT (ProducerRecord' ByteString t) Void m (Maybe KafkaError) Source #
Creates a Kafka producer for the given properties and returns a Sink.
This way of creating a Sink covers the simple case and does not provide access to the KafkaProducer. For more complex scenarios, kafkaSinkAutoClose or kafkaSinkNoClose can be used.
kafkaSinkAutoClose :: (MonadResource m, ToSchema sch sty t, ToAvro (WithSchema sch sty t), HasAvroSchema (WithSchema sch sty t)) => Proxy sch -> KafkaProducer -> ConduitT (ProducerRecord' ByteString t) Void m (Maybe KafkaError) Source #
Creates a Sink for a given KafkaProducer.
The producer will be closed when the Sink is closed.
kafkaSinkNoClose :: (MonadIO m, ToSchema sch sty t, ToAvro (WithSchema sch sty t), HasAvroSchema (WithSchema sch sty t)) => Proxy sch -> KafkaProducer -> ConduitT (ProducerRecord' ByteString t) Void m (Maybe KafkaError) Source #
Creates a Sink for a given KafkaProducer.
The producer will NOT be closed automatically.
kafkaBatchSinkNoClose :: (MonadIO m, ToSchema sch sty t, ToAvro (WithSchema sch sty t), HasAvroSchema (WithSchema sch sty t)) => Proxy sch -> KafkaProducer -> ConduitT [ProducerRecord' ByteString t] Void m [(ProducerRecord, KafkaError)] Source #
Creates a batching Sink for a given KafkaProducer.
The producer will NOT be closed automatically.
data ProducerProperties #
Properties to create KafkaProducer.
Instances
Semigroup ProducerProperties | |
Defined in Kafka.Producer.ProducerProperties (<>) :: ProducerProperties -> ProducerProperties -> ProducerProperties # sconcat :: NonEmpty ProducerProperties -> ProducerProperties # stimes :: Integral b => b -> ProducerProperties -> ProducerProperties # | |
Monoid ProducerProperties | Right biased so we prefer newer properties over older ones. |
Defined in Kafka.Producer.ProducerProperties |
data KafkaProducer #
Main pointer to the Kafka object, which contains our brokers.
Instances
HasKafka KafkaProducer | |
Defined in Kafka.Producer.Types getKafka :: KafkaProducer -> Kafka | |
HasKafkaConf KafkaProducer | |
Defined in Kafka.Producer.Types getKafkaConf :: KafkaProducer -> KafkaConf | |
HasTopicConf KafkaProducer | |
Defined in Kafka.Producer.Types getTopicConf :: KafkaProducer -> TopicConf |
data ProducePartition #
data KafkaConsumer #
Instances
HasKafka KafkaConsumer | |
Defined in Kafka.Consumer.Types getKafka :: KafkaConsumer -> Kafka | |
HasKafkaConf KafkaConsumer | |
Defined in Kafka.Consumer.Types getKafkaConf :: KafkaConsumer -> KafkaConf |
data TopicName #
Topic name to be consumed.
Wildcard (regex) topics are supported by the librdkafka assignor: any topic name in the topics list that is prefixed with ^ will be regex-matched to the full list of topics in the cluster and matching topics will be added to the subscription list.
data KafkaError #
Any Kafka errors
batchByOrFlushEither :: forall (m :: Type -> Type) e a. Monad m => BatchSize -> ConduitT (Either e a) [a] m () #
batchByOrFlush :: forall (m :: Type -> Type) a. Monad m => BatchSize -> ConduitT (Maybe a) [a] m () #
foldYield :: forall (m :: Type -> Type) i s o. Monad m => (i -> s -> (s, [o])) -> (s -> [o]) -> s -> ConduitT i o m () #
Create a conduit that folds with the function f over its input i with its internal state s and emits outputs [o], then finally emits outputs [o] from the function g applied to the final state s.
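The fold-then-flush behaviour described for foldYield can be illustrated with a plain list model. This is only a sketch using base: foldYieldList, chunkStep, chunkFlush and chunksOf are hypothetical names introduced for the illustration and are not part of this module, and the real function produces a ConduitT rather than a list.

```haskell
-- List-based model of foldYield (an illustration, not the conduit itself).
-- f consumes one input and the current state, returning the new state and
-- any outputs to emit; g turns the final state into the last outputs.
foldYieldList :: (i -> s -> (s, [o])) -> (s -> [o]) -> s -> [i] -> [o]
foldYieldList f g = go
  where
    go s []       = g s
    go s (i : is) = let (s', os) = f i s in os ++ go s' is

-- Hypothetical step function: collect inputs (in reverse) until n are held,
-- then emit them as one chunk and reset the state.
chunkStep :: Int -> a -> [a] -> ([a], [[a]])
chunkStep n x acc
  | length acc + 1 >= n = ([], [reverse (x : acc)])
  | otherwise           = (x : acc, [])

-- Hypothetical final function: emit whatever is left, if anything.
chunkFlush :: [a] -> [[a]]
chunkFlush acc = [reverse acc | not (null acc)]

-- Chunking expressed through the model.
chunksOf :: Int -> [a] -> [[a]]
chunksOf n = foldYieldList (chunkStep n) chunkFlush []
```

In this model, chunksOf 2 [1 .. 5] evaluates to [[1,2],[3,4],[5]]: the first two chunks come from f, and the trailing [5] comes from g applied to the final state.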
throwLeftSatisfy :: forall (m :: Type -> Type) e i. (MonadThrow m, Exception e) => (e -> Bool) -> ConduitT (Either e i) (Either e i) m () #
Throws the left part of a value in a MonadThrow context if the value satisfies the predicate.
throwLeft :: forall (m :: Type -> Type) e i. (MonadThrow m, Exception e) => ConduitT (Either e i) i m () #
Throws the left part of a value in a MonadThrow context.
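As a rough illustration of the two throwing combinators, the sketch below models them over plain lists in IO, using only base. throwLeftList, throwLeftSatisfyList and the demo values are hypothetical names for this illustration; the real functions work in any MonadThrow and stream through a conduit, so elements before the failing one may already have reached downstream, which a list model does not capture.

```haskell
import Control.Exception (ErrorCall (..), Exception, throwIO, try)

-- Model of throwLeft: unwrap every Right, throw on the first Left.
throwLeftList :: Exception e => [Either e i] -> IO [i]
throwLeftList = traverse (either throwIO pure)

-- Model of throwLeftSatisfy: throw only when a Left matches the predicate;
-- every other value passes through unchanged.
throwLeftSatisfyList :: Exception e => (e -> Bool) -> [Either e i] -> IO [Either e i]
throwLeftSatisfyList p = traverse check
  where
    check (Left e) | p e = throwIO e
    check x              = pure x

-- Throws at the second element; try captures the exception as a Left.
demoThrowLeft :: IO (Either ErrorCall [Int])
demoThrowLeft = try (throwLeftList [Right 1, Left (ErrorCall "boom"), Right 3])

-- The predicate never matches, so the Left value is kept in the output.
demoKeepLeft :: IO [Either ErrorCall Int]
demoKeepLeft = throwLeftSatisfyList (const False) [Right 1, Left (ErrorCall "boom")]
```

The predicate variant is useful when only some errors should abort the pipeline, while the rest are left in the stream for downstream handling.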