mu-kafka-0.3.0.0: Utilities for interoperation between Mu and Kafka
Safe HaskellNone
LanguageHaskell2010

Mu.Kafka.Producer

Description

This module allows you to open a "sink" to Kafka. Every value you send to the sink will be sent over to the corresponding Kafka instance.

This module is a wrapper over Sink from the (awesome) package hw-kafka-client.

Synopsis

Documentation

data ProducerRecord' k v Source #

Instances

Instances details
(Eq k, Eq v) => Eq (ProducerRecord' k v) Source # 
Instance details

Defined in Mu.Kafka.Producer

(Show k, Show v) => Show (ProducerRecord' k v) Source # 
Instance details

Defined in Mu.Kafka.Producer

Generic (ProducerRecord' k v) Source # 
Instance details

Defined in Mu.Kafka.Producer

Associated Types

type Rep (ProducerRecord' k v) :: Type -> Type #

Methods

from :: ProducerRecord' k v -> Rep (ProducerRecord' k v) x #

to :: Rep (ProducerRecord' k v) x -> ProducerRecord' k v #

type Rep (ProducerRecord' k v) Source # 
Instance details

Defined in Mu.Kafka.Producer

type Rep (ProducerRecord' k v) = D1 ('MetaData "ProducerRecord'" "Mu.Kafka.Producer" "mu-kafka-0.3.0.0-inplace" 'False) (C1 ('MetaCons "ProducerRecord'" 'PrefixI 'True) ((S1 ('MetaSel ('Just "prTopic") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 TopicName) :*: S1 ('MetaSel ('Just "prPartition") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 ProducePartition)) :*: (S1 ('MetaSel ('Just "prKey") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Maybe k)) :*: S1 ('MetaSel ('Just "prValue") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Maybe v)))))

kafkaSink :: (MonadResource m, ToSchema sch sty t, ToAvro (WithSchema sch sty t), HasAvroSchema (WithSchema sch sty t)) => Proxy sch -> ProducerProperties -> ConduitT (ProducerRecord' ByteString t) Void m (Maybe KafkaError) Source #

Creates a Kafka producer for the given properties and returns a Sink.

This method of creating a Sink represents a simple case and does not provide access to KafkaProducer. For more complex scenarios, kafkaSinkAutoClose or kafkaSinkNoClose can be used.

kafkaSinkAutoClose :: (MonadResource m, ToSchema sch sty t, ToAvro (WithSchema sch sty t), HasAvroSchema (WithSchema sch sty t)) => Proxy sch -> KafkaProducer -> ConduitT (ProducerRecord' ByteString t) Void m (Maybe KafkaError) Source #

Creates a Sink for a given KafkaProducer. The producer will be closed when the Sink is closed.

kafkaSinkNoClose :: (MonadIO m, ToSchema sch sty t, ToAvro (WithSchema sch sty t), HasAvroSchema (WithSchema sch sty t)) => Proxy sch -> KafkaProducer -> ConduitT (ProducerRecord' ByteString t) Void m (Maybe KafkaError) Source #

Creates a Sink for a given KafkaProducer. The producer will NOT be closed automatically.

kafkaBatchSinkNoClose :: (MonadIO m, ToSchema sch sty t, ToAvro (WithSchema sch sty t), HasAvroSchema (WithSchema sch sty t)) => Proxy sch -> KafkaProducer -> ConduitT [ProducerRecord' ByteString t] Void m [(ProducerRecord, KafkaError)] Source #

Creates a batching Sink for a given KafkaProducer. The producer will NOT be closed automatically.

data KafkaProducer #

Main pointer to Kafka object, which contains our brokers

Instances

Instances details
HasKafka KafkaProducer 
Instance details

Defined in Kafka.Producer.Types

Methods

getKafka :: KafkaProducer -> Kafka

HasKafkaConf KafkaProducer 
Instance details

Defined in Kafka.Producer.Types

Methods

getKafkaConf :: KafkaProducer -> KafkaConf

HasTopicConf KafkaProducer 
Instance details

Defined in Kafka.Producer.Types

Methods

getTopicConf :: KafkaProducer -> TopicConf

data ProducePartition #

Instances

Instances details
Eq ProducePartition 
Instance details

Defined in Kafka.Producer.Types

Ord ProducePartition 
Instance details

Defined in Kafka.Producer.Types

Show ProducePartition 
Instance details

Defined in Kafka.Producer.Types

Generic ProducePartition 
Instance details

Defined in Kafka.Producer.Types

Associated Types

type Rep ProducePartition :: Type -> Type #

type Rep ProducePartition 
Instance details

Defined in Kafka.Producer.Types

type Rep ProducePartition = D1 ('MetaData "ProducePartition" "Kafka.Producer.Types" "hw-kfk-clnt-3.1.0-ba432bdb" 'False) (C1 ('MetaCons "SpecifiedPartition" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'SourceUnpack 'SourceStrict 'DecidedUnpack) (Rec0 Int)) :+: C1 ('MetaCons "UnassignedPartition" 'PrefixI 'False) (U1 :: Type -> Type))

data KafkaConsumer #

Instances

Instances details
HasKafka KafkaConsumer 
Instance details

Defined in Kafka.Consumer.Types

Methods

getKafka :: KafkaConsumer -> Kafka

HasKafkaConf KafkaConsumer 
Instance details

Defined in Kafka.Consumer.Types

Methods

getKafkaConf :: KafkaConsumer -> KafkaConf

newtype BatchSize #

Constructors

BatchSize 

Fields

Instances

Instances details
Eq BatchSize 
Instance details

Defined in Kafka.Types

Num BatchSize 
Instance details

Defined in Kafka.Types

Ord BatchSize 
Instance details

Defined in Kafka.Types

Read BatchSize 
Instance details

Defined in Kafka.Types

Show BatchSize 
Instance details

Defined in Kafka.Types

Generic BatchSize 
Instance details

Defined in Kafka.Types

Associated Types

type Rep BatchSize :: Type -> Type #

type Rep BatchSize 
Instance details

Defined in Kafka.Types

type Rep BatchSize = D1 ('MetaData "BatchSize" "Kafka.Types" "hw-kfk-clnt-3.1.0-ba432bdb" 'True) (C1 ('MetaCons "BatchSize" 'PrefixI 'True) (S1 ('MetaSel ('Just "unBatchSize") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int)))

data TopicName #

Topic name to be consumed

Wildcard (regex) topics are supported by the librdkafka assignor: any topic name in the topics list that is prefixed with ^ will be regex-matched to the full list of topics in the cluster and matching topics will be added to the subscription list.

Instances

Instances details
Eq TopicName 
Instance details

Defined in Kafka.Types

Ord TopicName 
Instance details

Defined in Kafka.Types

Read TopicName 
Instance details

Defined in Kafka.Types

Show TopicName 
Instance details

Defined in Kafka.Types

Generic TopicName 
Instance details

Defined in Kafka.Types

Associated Types

type Rep TopicName :: Type -> Type #

type Rep TopicName 
Instance details

Defined in Kafka.Types

type Rep TopicName = D1 ('MetaData "TopicName" "Kafka.Types" "hw-kfk-clnt-3.1.0-ba432bdb" 'True) (C1 ('MetaCons "TopicName" 'PrefixI 'True) (S1 ('MetaSel ('Just "unTopicName") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Text)))

data KafkaError #

Any Kafka errors

Instances

Instances details
Eq KafkaError 
Instance details

Defined in Kafka.Types

Show KafkaError 
Instance details

Defined in Kafka.Types

Generic KafkaError 
Instance details

Defined in Kafka.Types

Associated Types

type Rep KafkaError :: Type -> Type #

Exception KafkaError 
Instance details

Defined in Kafka.Types

type Rep KafkaError 
Instance details

Defined in Kafka.Types

type Rep KafkaError = D1 ('MetaData "KafkaError" "Kafka.Types" "hw-kfk-clnt-3.1.0-ba432bdb" 'False) ((C1 ('MetaCons "KafkaError" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Text)) :+: (C1 ('MetaCons "KafkaInvalidReturnValue" 'PrefixI 'False) (U1 :: Type -> Type) :+: C1 ('MetaCons "KafkaBadSpecification" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Text)))) :+: ((C1 ('MetaCons "KafkaResponseError" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 RdKafkaRespErrT)) :+: C1 ('MetaCons "KafkaInvalidConfigurationValue" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Text))) :+: (C1 ('MetaCons "KafkaUnknownConfigurationKey" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Text)) :+: C1 ('MetaCons "KafkaBadConfiguration" 'PrefixI 'False) (U1 :: Type -> Type))))

batchByOrFlushEither :: forall (m :: Type -> Type) e a. Monad m => BatchSize -> ConduitT (Either e a) [a] m () #

batchByOrFlush :: forall (m :: Type -> Type) a. Monad m => BatchSize -> ConduitT (Maybe a) [a] m () #

foldYield :: forall (m :: Type -> Type) i s o. Monad m => (i -> s -> (s, [o])) -> (s -> [o]) -> s -> ConduitT i o m () #

Create a conduit that folds with the function f over its input i with its internal state s and emits outputs [o], then finally emits outputs [o] from the function g applied to the final state s.

throwLeftSatisfy :: forall (m :: Type -> Type) e i. (MonadThrow m, Exception e) => (e -> Bool) -> ConduitT (Either e i) (Either e i) m () #

Throws the left part of a value in a MonadThrow context if the value satisfies the predicate

throwLeft :: forall (m :: Type -> Type) e i. (MonadThrow m, Exception e) => ConduitT (Either e i) i m () #

Throws the left part of a value in a MonadThrow context