Safe Haskell | None |
---|---|
Language | Haskell2010 |
- type KafkaAddress = (Host, Port)
- data KafkaState = KafkaState {
- _stateName :: KafkaString
- _stateRequiredAcks :: RequiredAcks
- _stateRequestTimeout :: Timeout
- _stateWaitSize :: MinBytes
- _stateBufferSize :: MaxBytes
- _stateWaitTime :: MaxWaitTime
- _stateCorrelationId :: CorrelationId
- _stateBrokers :: Map Leader Broker
- _stateConnections :: Map KafkaAddress (Pool Handle)
- _stateTopicMetadata :: Map TopicName TopicMetadata
- _stateAddresses :: NonEmpty KafkaAddress
- stateWaitTime :: Lens' KafkaState MaxWaitTime
- stateWaitSize :: Lens' KafkaState MinBytes
- stateTopicMetadata :: Lens' KafkaState (Map TopicName TopicMetadata)
- stateRequiredAcks :: Lens' KafkaState RequiredAcks
- stateRequestTimeout :: Lens' KafkaState Timeout
- stateName :: Lens' KafkaState KafkaString
- stateCorrelationId :: Lens' KafkaState CorrelationId
- stateConnections :: Lens' KafkaState (Map KafkaAddress (Pool Handle))
- stateBufferSize :: Lens' KafkaState MaxBytes
- stateBrokers :: Lens' KafkaState (Map Leader Broker)
- stateAddresses :: Lens' KafkaState (NonEmpty KafkaAddress)
- type Kafka = StateT KafkaState (ExceptT KafkaClientError IO)
- type KafkaClientId = KafkaString
- data KafkaClientError
- data KafkaTime
- data PartitionAndLeader = PartitionAndLeader {}
- palTopic :: Lens' PartitionAndLeader TopicName
- palPartition :: Lens' PartitionAndLeader Partition
- palLeader :: Lens' PartitionAndLeader Leader
- data TopicAndPartition = TopicAndPartition {}
- data TopicAndMessage = TopicAndMessage {}
- tamTopic :: Lens' TopicAndMessage TopicName
- tamMessage :: Lens' TopicAndMessage Message
- tamPayload :: TopicAndMessage -> ByteString
- defaultCorrelationId :: CorrelationId
- defaultRequiredAcks :: RequiredAcks
- defaultRequestTimeout :: Timeout
- defaultMinBytes :: MinBytes
- defaultMaxBytes :: MaxBytes
- defaultMaxWaitTime :: MaxWaitTime
- mkKafkaState :: KafkaClientId -> KafkaAddress -> KafkaState
- addKafkaAddress :: KafkaAddress -> KafkaState -> KafkaState
- runKafka :: KafkaState -> Kafka a -> IO (Either KafkaClientError a)
- tryKafka :: Kafka a -> Kafka a
- makeRequest :: Handle -> ReqResp (Kafka a) -> Kafka a
- metadata :: MetadataRequest -> Kafka MetadataResponse
- metadata' :: Handle -> MetadataRequest -> Kafka MetadataResponse
- getTopicPartitionLeader :: TopicName -> Partition -> Kafka Broker
- expect :: KafkaClientError -> (a -> Maybe b) -> a -> Kafka b
- brokerPartitionInfo :: TopicName -> Kafka (Set PartitionAndLeader)
- findMetadataOrElse :: [TopicName] -> Getting (Maybe a) KafkaState (Maybe a) -> KafkaClientError -> Kafka a
- protocolTime :: KafkaTime -> Time
- updateMetadatas :: [TopicName] -> Kafka ()
- updateMetadata :: TopicName -> Kafka ()
- updateAllMetadata :: Kafka ()
- withBrokerHandle :: Broker -> (Handle -> Kafka a) -> Kafka a
- withAddressHandle :: KafkaAddress -> (Handle -> Kafka a) -> Kafka a
- broker2address :: Broker -> KafkaAddress
- withAnyHandle :: (Handle -> Kafka a) -> Kafka a
- data PartitionOffsetRequestInfo = PartitionOffsetRequestInfo {}
- getLastOffset :: KafkaTime -> Partition -> TopicName -> Kafka Offset
- getLastOffset' :: Handle -> KafkaTime -> Partition -> TopicName -> Kafka Offset
- offsetRequest :: [(TopicAndPartition, PartitionOffsetRequestInfo)] -> OffsetRequest
Documentation
type KafkaAddress = (Host, Port) Source #
data KafkaState Source #
KafkaState | |
|
type Kafka = StateT KafkaState (ExceptT KafkaClientError IO) Source #
The core Kafka monad.
type KafkaClientId = KafkaString Source #
data KafkaClientError Source #
Errors given from the Kafka monad.
KafkaNoOffset | A response did not contain an offset. |
KafkaDeserializationError String | A value could not be deserialized correctly. |
KafkaInvalidBroker Leader | Could not find a cached broker for the found leader. |
KafkaFailedToFetchMetadata | |
KafkaIOException IOException |
An abstract form of Kafka's time. Used for querying offsets.
LatestTime | The latest time on the broker. |
EarliestTime | The earliest time on the broker. |
OtherTime Time | A specific time. |
data PartitionAndLeader Source #
data TopicAndPartition Source #
data TopicAndMessage Source #
A topic with a serializable message.
tamPayload :: TopicAndMessage -> ByteString Source #
Get the bytes from the Kafka message, ignoring the topic.
Configuration
defaultCorrelationId :: CorrelationId Source #
Default: 0
defaultRequiredAcks :: RequiredAcks Source #
Default: 1
defaultRequestTimeout :: Timeout Source #
Default: 10000
defaultMinBytes :: MinBytes Source #
Default: 0
defaultMaxBytes :: MaxBytes Source #
Default: 1024 * 1024
defaultMaxWaitTime :: MaxWaitTime Source #
Default: 0
mkKafkaState :: KafkaClientId -> KafkaAddress -> KafkaState Source #
Create a consumer using default values.
addKafkaAddress :: KafkaAddress -> KafkaState -> KafkaState Source #
runKafka :: KafkaState -> Kafka a -> IO (Either KafkaClientError a) Source #
Run the underlying Kafka monad.
tryKafka :: Kafka a -> Kafka a Source #
Catch IOException
s and wrap them in KafkaIOException
s.
makeRequest :: Handle -> ReqResp (Kafka a) -> Kafka a Source #
Make a request, incrementing the _stateCorrelationId
.
metadata :: MetadataRequest -> Kafka MetadataResponse Source #
Send a metadata request to any broker.
metadata' :: Handle -> MetadataRequest -> Kafka MetadataResponse Source #
Send a metadata request.
brokerPartitionInfo :: TopicName -> Kafka (Set PartitionAndLeader) Source #
Find a leader and partition for the topic.
findMetadataOrElse :: [TopicName] -> Getting (Maybe a) KafkaState (Maybe a) -> KafkaClientError -> Kafka a Source #
protocolTime :: KafkaTime -> Time Source #
Convert an abstract time to a serializable protocol value.
updateMetadatas :: [TopicName] -> Kafka () Source #
updateMetadata :: TopicName -> Kafka () Source #
updateAllMetadata :: Kafka () Source #
withBrokerHandle :: Broker -> (Handle -> Kafka a) -> Kafka a Source #
Execute a Kafka action with a Handle
for the given Broker
, updating
the connections cache if needed.
When the action throws an IOException
, it is caught and returned as a
KafkaIOException
in the Kafka monad.
Note that when the given action throws an exception, any state changes will
be discarded. This includes both IOException
s and exceptions thrown by
throwError
from Except
.
withAddressHandle :: KafkaAddress -> (Handle -> Kafka a) -> Kafka a Source #
Execute a Kafka action with a Handle
for the given KafkaAddress
,
updating the connections cache if needed.
When the action throws an IOException
, it is caught and returned as a
KafkaIOException
in the Kafka monad.
Note that when the given action throws an exception, any state changes will
be discarded. This includes both IOException
s and exceptions thrown by
throwError
from Except
.
broker2address :: Broker -> KafkaAddress Source #
withAnyHandle :: (Handle -> Kafka a) -> Kafka a Source #
Like withAddressHandle
, but round-robins the addresses in the KafkaState
.
When the action throws an IOException
, it is caught and returned as a
KafkaIOException
in the Kafka monad.
Note that when the given action throws an exception, any state changes will
be discarded. This includes both IOException
s and exceptions thrown by
throwError
from Except
.
Offsets
data PartitionOffsetRequestInfo Source #
Fields to construct an offset request, per topic and partition.
PartitionOffsetRequestInfo | |
|
getLastOffset :: KafkaTime -> Partition -> TopicName -> Kafka Offset Source #
Get the first found offset.
getLastOffset' :: Handle -> KafkaTime -> Partition -> TopicName -> Kafka Offset Source #
Get the first found offset.
offsetRequest :: [(TopicAndPartition, PartitionOffsetRequestInfo)] -> OffsetRequest Source #
Create an offset request.