| Safe Haskell | None |
|---|---|
| Language | Haskell2010 |
Network.Kafka
Contents
- type KafkaAddress = (Host, Port)
- data KafkaState = KafkaState {
- _stateName :: KafkaString
- _stateRequiredAcks :: RequiredAcks
- _stateRequestTimeout :: Timeout
- _stateWaitSize :: MinBytes
- _stateBufferSize :: MaxBytes
- _stateWaitTime :: MaxWaitTime
- _stateCorrelationId :: CorrelationId
- _stateBrokers :: Map Leader Broker
- _stateConnections :: Map KafkaAddress (Pool Handle)
- _stateTopicMetadata :: Map TopicName TopicMetadata
- _stateAddresses :: NonEmpty KafkaAddress
- stateWaitTime :: Lens' KafkaState MaxWaitTime
- stateWaitSize :: Lens' KafkaState MinBytes
- stateTopicMetadata :: Lens' KafkaState (Map TopicName TopicMetadata)
- stateRequiredAcks :: Lens' KafkaState RequiredAcks
- stateRequestTimeout :: Lens' KafkaState Timeout
- stateName :: Lens' KafkaState KafkaString
- stateCorrelationId :: Lens' KafkaState CorrelationId
- stateConnections :: Lens' KafkaState (Map KafkaAddress (Pool Handle))
- stateBufferSize :: Lens' KafkaState MaxBytes
- stateBrokers :: Lens' KafkaState (Map Leader Broker)
- stateAddresses :: Lens' KafkaState (NonEmpty KafkaAddress)
- type Kafka = StateT KafkaState (ExceptT KafkaClientError IO)
- type KafkaClientId = KafkaString
- data KafkaClientError
- data KafkaExpectedResponse
- data KafkaTime
- data PartitionAndLeader = PartitionAndLeader {}
- palTopic :: Lens' PartitionAndLeader TopicName
- palPartition :: Lens' PartitionAndLeader Partition
- palLeader :: Lens' PartitionAndLeader Leader
- data TopicAndPartition = TopicAndPartition {}
- data TopicAndMessage = TopicAndMessage {}
- tamTopic :: Lens' TopicAndMessage TopicName
- tamMessage :: Lens' TopicAndMessage Message
- tamPayload :: TopicAndMessage -> ByteString
- defaultCorrelationId :: CorrelationId
- defaultRequiredAcks :: RequiredAcks
- defaultRequestTimeout :: Timeout
- defaultMinBytes :: MinBytes
- defaultMaxBytes :: MaxBytes
- defaultMaxWaitTime :: MaxWaitTime
- mkKafkaState :: KafkaClientId -> KafkaAddress -> KafkaState
- addKafkaAddress :: KafkaAddress -> KafkaState -> KafkaState
- runKafka :: KafkaState -> Kafka a -> IO (Either KafkaClientError a)
- makeRequest :: RequestMessage -> Kafka Request
- tryKafkaIO :: IO a -> Kafka a
- tryKafka :: Kafka a -> Kafka a
- doRequest :: Handle -> Request -> Kafka Response
- runGetKafka :: Get a -> ByteString -> Kafka a
- metadata :: MetadataRequest -> Kafka MetadataResponse
- metadata' :: Handle -> MetadataRequest -> Kafka MetadataResponse
- getTopicPartitionLeader :: TopicName -> Partition -> Kafka Broker
- expect :: KafkaClientError -> (a -> Maybe b) -> a -> Kafka b
- brokerPartitionInfo :: TopicName -> Kafka [PartitionAndLeader]
- findMetadataOrElse :: [TopicName] -> Getting (Maybe a) KafkaState (Maybe a) -> KafkaClientError -> Kafka a
- expectResponse :: KafkaExpectedResponse -> Getting (Leftmost b) ResponseMessage b -> Response -> Kafka b
- protocolTime :: KafkaTime -> Time
- ordinaryConsumerId :: ReplicaId
- fetchRequest :: Offset -> Partition -> TopicName -> Kafka FetchRequest
- fetch' :: Handle -> FetchRequest -> Kafka FetchResponse
- fetch :: FetchRequest -> Kafka FetchResponse
- fetchMessages :: FetchResponse -> [TopicAndMessage]
- updateMetadatas :: [TopicName] -> Kafka ()
- updateMetadata :: TopicName -> Kafka ()
- updateAllMetadata :: Kafka ()
- withBrokerHandle :: Broker -> (Handle -> Kafka a) -> Kafka a
- withAddressHandle :: KafkaAddress -> (Handle -> Kafka a) -> Kafka a
- broker2address :: Broker -> KafkaAddress
- withAnyHandle :: (Handle -> Kafka a) -> Kafka a
- data PartitionOffsetRequestInfo = PartitionOffsetRequestInfo {}
- getLastOffset :: KafkaTime -> Partition -> TopicName -> Kafka Offset
- getLastOffset' :: Handle -> KafkaTime -> Partition -> TopicName -> Kafka Offset
- offsetRequest :: [(TopicAndPartition, PartitionOffsetRequestInfo)] -> RequestMessage
Documentation
type KafkaAddress = (Host, Port) Source
data KafkaState Source
Constructors
| KafkaState | |
Fields
| |
Instances
type Kafka = StateT KafkaState (ExceptT KafkaClientError IO) Source
The core Kafka monad.
type KafkaClientId = KafkaString Source
data KafkaClientError Source
Errors given from the Kafka monad.
Constructors
| KafkaNoOffset | A response did not contain an offset. |
| KafkaExpected KafkaExpectedResponse | Got a different form of a response than was requested. |
| KafkaDeserializationError String | A value could not be deserialized correctly. |
| KafkaInvalidBroker Leader | Could not find a cached broker for the found leader. |
| KafkaFailedToFetchMetadata | |
| KafkaIOException IOException | |
Instances
data KafkaExpectedResponse Source
Type of response to expect, used for KafkaExpected error.
Constructors
| ExpectedMetadata | |
| ExpectedFetch | |
| ExpectedProduce | |
data KafkaTime Source
An abstract form of Kafka's time. Used for querying offsets.
Constructors
| LatestTime | The latest time on the broker. |
| EarliestTime | The earliest time on the broker. |
| OtherTime Time | A specific time. |
data PartitionAndLeader Source
Constructors
| PartitionAndLeader | |
Fields
| |
Instances
data TopicAndPartition Source
Constructors
| TopicAndPartition | |
Fields
| |
data TopicAndMessage Source
A topic with a serializable message.
Constructors
| TopicAndMessage | |
Fields
| |
Instances
tamPayload :: TopicAndMessage -> ByteString Source
Get the bytes from the Kafka message, ignoring the topic.
Configuration
defaultCorrelationId :: CorrelationId Source
Default: 0
defaultRequiredAcks :: RequiredAcks Source
Default: 1
defaultRequestTimeout :: Timeout Source
Default: 10000
defaultMinBytes :: MinBytes Source
Default: 0
defaultMaxBytes :: MaxBytes Source
Default: 1024 * 1024
defaultMaxWaitTime :: MaxWaitTime Source
Default: 0
mkKafkaState :: KafkaClientId -> KafkaAddress -> KafkaState Source
Create an initial KafkaState for the given client id and broker address, using default values for all other settings.
runKafka :: KafkaState -> Kafka a -> IO (Either KafkaClientError a) Source
Run the underlying Kafka monad.
makeRequest :: RequestMessage -> Kafka Request Source
Make a request, incrementing the _stateCorrelationId.
tryKafkaIO :: IO a -> Kafka a Source
Run an IO action in the Kafka monad, catching any IOException and wrapping it in a KafkaIOException.
tryKafka :: Kafka a -> Kafka a Source
Run a Kafka action, catching any IOException and wrapping it in a KafkaIOException.
runGetKafka :: Get a -> ByteString -> Kafka a Source
metadata :: MetadataRequest -> Kafka MetadataResponse Source
Send a metadata request.
metadata' :: Handle -> MetadataRequest -> Kafka MetadataResponse Source
Send a metadata request using the given Handle.
expect :: KafkaClientError -> (a -> Maybe b) -> a -> Kafka b Source
brokerPartitionInfo :: TopicName -> Kafka [PartitionAndLeader] Source
Find the partitions, and the leader of each, for the topic.
findMetadataOrElse :: [TopicName] -> Getting (Maybe a) KafkaState (Maybe a) -> KafkaClientError -> Kafka a Source
expectResponse :: KafkaExpectedResponse -> Getting (Leftmost b) ResponseMessage b -> Response -> Kafka b Source
Function to give an error when the response seems wrong.
protocolTime :: KafkaTime -> Time Source
Convert an abstract time to a serializable protocol value.
Fetching
ordinaryConsumerId :: ReplicaId Source
Default: -1
fetchRequest :: Offset -> Partition -> TopicName -> Kafka FetchRequest Source
Construct a fetch request from the values in the state.
fetch' :: Handle -> FetchRequest -> Kafka FetchResponse Source
Execute a fetch request and get the raw fetch response.
fetch :: FetchRequest -> Kafka FetchResponse Source
Execute a fetch request and get the raw fetch response. Round-robins the
requests to addresses in the KafkaState.
fetchMessages :: FetchResponse -> [TopicAndMessage] Source
Extract out messages with their topics from a fetch response.
updateMetadatas :: [TopicName] -> Kafka () Source
updateMetadata :: TopicName -> Kafka () Source
updateAllMetadata :: Kafka () Source
withBrokerHandle :: Broker -> (Handle -> Kafka a) -> Kafka a Source
Execute a Kafka action with a Handle for the given Broker, updating
the connections cache if needed.
When the action throws an IOException, it is caught and returned as a
KafkaIOException in the Kafka monad.
Note that when the given action throws an exception, any state changes will
be discarded. This includes both IOExceptions and exceptions thrown by
throwError from Except.
withAddressHandle :: KafkaAddress -> (Handle -> Kafka a) -> Kafka a Source
Execute a Kafka action with a Handle for the given KafkaAddress,
updating the connections cache if needed.
When the action throws an IOException, it is caught and returned as a
KafkaIOException in the Kafka monad.
Note that when the given action throws an exception, any state changes will
be discarded. This includes both IOExceptions and exceptions thrown by
throwError from Except.
withAnyHandle :: (Handle -> Kafka a) -> Kafka a Source
Like withAddressHandle, but round-robins the addresses in the KafkaState.
When the action throws an IOException, it is caught and returned as a
KafkaIOException in the Kafka monad.
Note that when the given action throws an exception, any state changes will
be discarded. This includes both IOExceptions and exceptions thrown by
throwError from Except.
Offsets
data PartitionOffsetRequestInfo Source
Fields to construct an offset request, per topic and partition.
Constructors
| PartitionOffsetRequestInfo | |
Fields
| |
getLastOffset :: KafkaTime -> Partition -> TopicName -> Kafka Offset Source
Get the first found offset.
getLastOffset' :: Handle -> KafkaTime -> Partition -> TopicName -> Kafka Offset Source
Get the first found offset, using the given Handle.
offsetRequest :: [(TopicAndPartition, PartitionOffsetRequestInfo)] -> RequestMessage Source
Create an offset request.