hw-kafka-conduit-1.1.4: Conduit bindings for hw-kafka-client

Safe HaskellNone
LanguageHaskell2010

Kafka.Conduit.Source

Contents

Synopsis

Source

kafkaSource Source #

Creates a kafka producer for given properties and returns a Source.

This method of creating a Source represents a simple case and does not provide access to KafkaProducer. For more complex scenarious kafkaSinkNoClose or kafkaSinkAutoClose can be used.

kafkaSourceNoClose :: MonadIO m => KafkaConsumer -> Timeout -> Source m (Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))) Source #

Create a Source for a given KafkaConsumer. The consumer will NOT be closed automatically when the Source is closed.

kafkaSourceAutoClose :: MonadResource m => KafkaConsumer -> Timeout -> Source m (Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))) Source #

Create a Source for a given KafkaConsumer. The consumer will be closed automatically when the Source is closed.

Error handling

isFatal :: KafkaError -> Bool Source #

Checks if the error is fatal in a way that it doesn't make sense to retry after or is unsafe to ignore.

isPollTimeout :: KafkaError -> Bool Source #

Checks if the provided error is a timeout error (KafkaResponseError RdKafkaRespErrTimedOut).

Timeout errors are not fatal and occure, for example, in cases when pollMessage cannot return return a message after the specified poll timeout (no more messages in a topic).

Often this error can be ignored, however sometimes it can be useful to know that there was no more messages in a topic and "KafkaResponseError RdKafkaRespErrTimedOut" can be a good indicator of that.

isPartitionEOF :: KafkaError -> Bool Source #

Checks if the provided error is an indicator of reaching the end of a partition (KafkaResponseError RdKafkaRespErrPartitionEof).

PartitionEOF errors are not fatal and occure every time a consumer reaches the end of a partition.

Often this error can be ignored, however sometimes it can be useful to know that a partition has exhausted and "KafkaResponseError RdKafkaRespErrPartitionEof" can be a good indicator of that.

skipNonFatal :: Monad m => Conduit (Either KafkaError b) m (Either KafkaError b) Source #

Filters out non-fatal errors (see isFatal) and only allows fatal errors to be propagated downstream.

skipNonFatalExcept :: Monad m => [KafkaError -> Bool] -> Conduit (Either KafkaError b) m (Either KafkaError b) Source #

Filters out non-fatal errors and provides the ability to control which non-fatal errors will still be propagated downstream.

Example: > skipNonFatalExcept [isPollTimeout, isPartitionEOF] The instruction above skips all the non-fatal errors except for "KafkaResponseError RdKafkaRespErrTimedOut" and "KafkaResponseError RdKafkaRespErrPartitionEof".

This function does not allow filtering out fatal errors.

Utility combinators

mapFirst :: (Bifunctor t, Monad m) => (k -> k') -> Conduit (t k v) m (t k' v) Source #

Maps over the first element of a value

mapFirst f = L.map (first f)

mapValue :: (Functor t, Monad m) => (v -> v') -> Conduit (t v) m (t v') Source #

Maps over a value

mapValue f = L.map (fmap f)

bimapValue :: (Bifunctor t, Monad m) => (k -> k') -> (v -> v') -> Conduit (t k v) m (t k' v') Source #

Bimaps (maps over both the first and the second element) over a value

bimapValue f g = L.map (bimap f g)

sequenceValueFirst :: (Bitraversable t, Applicative f, Monad m) => Conduit (t (f k) v) m (f (t k v)) Source #

Sequences the first element of a value

sequenceValueFirst = L.map sequenceFirst

sequenceValue :: (Traversable t, Applicative f, Monad m) => Conduit (t (f v)) m (f (t v)) Source #

Sequences the value

sequenceValue = L.map sequenceA

bisequenceValue :: (Bitraversable t, Applicative f, Monad m) => Conduit (t (f k) (f v)) m (f (t k v)) Source #

Sequences both the first and the second element of a value (bisequences the value)

bisequenceValue = L.map bisequenceA

traverseValueFirst :: (Bitraversable t, Applicative f, Monad m) => (k -> f k') -> Conduit (t k v) m (f (t k' v)) Source #

Traverses over the first element of a value

traverseValueFirst f = L.map (traverseFirst f)

traverseValue :: (Traversable t, Applicative f, Monad m) => (v -> f v') -> Conduit (t v) m (f (t v')) Source #

Traverses over the value

L.map (traverse f)

bitraverseValue :: (Bitraversable t, Applicative f, Monad m) => (k -> f k') -> (v -> f v') -> Conduit (t k v) m (f (t k' v')) Source #

Traverses over both the first and the second elements of a value (bitraverses over a value)

bitraverseValue f g = L.map (bitraverse f g)

traverseValueFirstM :: (Bitraversable t, Applicative f, Monad m) => (k -> m (f k')) -> Conduit (t k v) m (f (t k' v)) Source #

Monadically traverses over the first element of a value

traverseValueFirstM f = L.mapM (traverseFirstM f)

traverseValueM :: (Traversable t, Applicative f, Monad m) => (v -> m (f v')) -> Conduit (t v) m (f (t v')) Source #

Monadically traverses over a value

traverseValueM f = L.mapM (traverseM f)

bitraverseValueM :: (Bitraversable t, Applicative f, Monad m) => (k -> m (f k')) -> (v -> m (f v')) -> Conduit (t k v) m (f (t k' v')) Source #

Monadically traverses over both the first and the second elements of a value (monadically bitraverses over a value)

bitraverseValueM f g = L.mapM (bitraverseM f g)