Safe Haskell | None |
---|---|
Language | Haskell2010 |
- kafkaSource :: MonadResource m => ConsumerProperties -> Subscription -> Timeout -> Source m (Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
- kafkaSourceNoClose :: MonadIO m => KafkaConsumer -> Timeout -> Source m (Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
- kafkaSourceAutoClose :: MonadResource m => KafkaConsumer -> Timeout -> Source m (Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
- isFatal :: KafkaError -> Bool
- isPollTimeout :: KafkaError -> Bool
- isPartitionEOF :: KafkaError -> Bool
- skipNonFatal :: Monad m => Conduit (Either KafkaError b) m (Either KafkaError b)
- skipNonFatalExcept :: Monad m => [KafkaError -> Bool] -> Conduit (Either KafkaError b) m (Either KafkaError b)
- mapFirst :: (Bifunctor t, Monad m) => (k -> k') -> Conduit (t k v) m (t k' v)
- mapValue :: (Functor t, Monad m) => (v -> v') -> Conduit (t v) m (t v')
- bimapValue :: (Bifunctor t, Monad m) => (k -> k') -> (v -> v') -> Conduit (t k v) m (t k' v')
- sequenceValueFirst :: (Bitraversable t, Applicative f, Monad m) => Conduit (t (f k) v) m (f (t k v))
- sequenceValue :: (Traversable t, Applicative f, Monad m) => Conduit (t (f v)) m (f (t v))
- bisequenceValue :: (Bitraversable t, Applicative f, Monad m) => Conduit (t (f k) (f v)) m (f (t k v))
- traverseValueFirst :: (Bitraversable t, Applicative f, Monad m) => (k -> f k') -> Conduit (t k v) m (f (t k' v))
- traverseValue :: (Traversable t, Applicative f, Monad m) => (v -> f v') -> Conduit (t v) m (f (t v'))
- bitraverseValue :: (Bitraversable t, Applicative f, Monad m) => (k -> f k') -> (v -> f v') -> Conduit (t k v) m (f (t k' v'))
- traverseValueFirstM :: (Bitraversable t, Applicative f, Monad m) => (k -> m (f k')) -> Conduit (t k v) m (f (t k' v))
- traverseValueM :: (Traversable t, Applicative f, Monad m) => (v -> m (f v')) -> Conduit (t v) m (f (t v'))
- bitraverseValueM :: (Bitraversable t, Applicative f, Monad m) => (k -> m (f k')) -> (v -> m (f v')) -> Conduit (t k v) m (f (t k' v'))
Source
:: MonadResource m | |
=> ConsumerProperties | |
-> Subscription | |
-> Timeout | Poll timeout |
-> Source m (Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))) |
kafkaSourceNoClose :: MonadIO m => KafkaConsumer -> Timeout -> Source m (Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))) Source #
Create a Source
for a given KafkaConsumer
.
The consumer will NOT be closed automatically when the Source
is closed.
kafkaSourceAutoClose :: MonadResource m => KafkaConsumer -> Timeout -> Source m (Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))) Source #
Create a Source
for a given KafkaConsumer
.
The consumer will be closed automatically when the Source
is closed.
Error handling
isFatal :: KafkaError -> Bool Source #
Checks if the error is fatal in a way that it doesn't make sense to retry after or is unsafe to ignore.
isPollTimeout :: KafkaError -> Bool Source #
Checks if the provided error is a timeout error (KafkaResponseError
RdKafkaRespErrTimedOut
).
Timeout errors are not fatal and occure, for example, in cases when pollMessage
cannot return return a message after the specified poll timeout (no more messages in a topic).
Often this error can be ignored, however sometimes it can be useful to know that
there was no more messages in a topic and "KafkaResponseError
RdKafkaRespErrTimedOut
"
can be a good indicator of that.
isPartitionEOF :: KafkaError -> Bool Source #
Checks if the provided error is an indicator of reaching the end of a partition
(KafkaResponseError
RdKafkaRespErrPartitionEof
).
PartitionEOF
errors are not fatal and occure every time a consumer reaches the end
of a partition.
Often this error can be ignored, however sometimes it can be useful to know that
a partition has exhausted and "KafkaResponseError
RdKafkaRespErrPartitionEof
"
can be a good indicator of that.
skipNonFatal :: Monad m => Conduit (Either KafkaError b) m (Either KafkaError b) Source #
Filters out non-fatal errors (see isFatal
) and only allows fatal errors
to be propagated downstream.
skipNonFatalExcept :: Monad m => [KafkaError -> Bool] -> Conduit (Either KafkaError b) m (Either KafkaError b) Source #
Filters out non-fatal errors and provides the ability to control which non-fatal errors will still be propagated downstream.
Example:
> skipNonFatalExcept [isPollTimeout, isPartitionEOF]
The instruction above skips all the non-fatal errors except for
"KafkaResponseError
RdKafkaRespErrTimedOut
" and "KafkaResponseError
RdKafkaRespErrPartitionEof
".
This function does not allow filtering out fatal errors.
Utility combinators
mapFirst :: (Bifunctor t, Monad m) => (k -> k') -> Conduit (t k v) m (t k' v) Source #
Maps over the first element of a value
mapFirst f = L.map (first f)
mapValue :: (Functor t, Monad m) => (v -> v') -> Conduit (t v) m (t v') Source #
Maps over a value
mapValue f = L.map (fmap f)
bimapValue :: (Bifunctor t, Monad m) => (k -> k') -> (v -> v') -> Conduit (t k v) m (t k' v') Source #
Bimaps (maps over both the first and the second element) over a value
bimapValue f g = L.map (bimap f g)
sequenceValueFirst :: (Bitraversable t, Applicative f, Monad m) => Conduit (t (f k) v) m (f (t k v)) Source #
Sequences the first element of a value
sequenceValueFirst = L.map sequenceFirst
sequenceValue :: (Traversable t, Applicative f, Monad m) => Conduit (t (f v)) m (f (t v)) Source #
Sequences the value
sequenceValue = L.map sequenceA
bisequenceValue :: (Bitraversable t, Applicative f, Monad m) => Conduit (t (f k) (f v)) m (f (t k v)) Source #
Sequences both the first and the second element of a value (bisequences the value)
bisequenceValue = L.map bisequenceA
traverseValueFirst :: (Bitraversable t, Applicative f, Monad m) => (k -> f k') -> Conduit (t k v) m (f (t k' v)) Source #
Traverses over the first element of a value
traverseValueFirst f = L.map (traverseFirst f)
traverseValue :: (Traversable t, Applicative f, Monad m) => (v -> f v') -> Conduit (t v) m (f (t v')) Source #
Traverses over the value
L.map (traverse f)
bitraverseValue :: (Bitraversable t, Applicative f, Monad m) => (k -> f k') -> (v -> f v') -> Conduit (t k v) m (f (t k' v')) Source #
Traverses over both the first and the second elements of a value (bitraverses over a value)
bitraverseValue f g = L.map (bitraverse f g)
traverseValueFirstM :: (Bitraversable t, Applicative f, Monad m) => (k -> m (f k')) -> Conduit (t k v) m (f (t k' v)) Source #
Monadically traverses over the first element of a value
traverseValueFirstM f = L.mapM (traverseFirstM f)
traverseValueM :: (Traversable t, Applicative f, Monad m) => (v -> m (f v')) -> Conduit (t v) m (f (t v')) Source #
Monadically traverses over a value
traverseValueM f = L.mapM (traverseM f)
bitraverseValueM :: (Bitraversable t, Applicative f, Monad m) => (k -> m (f k')) -> (v -> m (f v')) -> Conduit (t k v) m (f (t k' v')) Source #
Monadically traverses over both the first and the second elements of a value (monadically bitraverses over a value)
bitraverseValueM f g = L.mapM (bitraverseM f g)