| Safe Haskell | None |
|---|---|
| Language | Haskell2010 |
Network.Kafka
Contents
- type KafkaAddress = (Host, Port)
- data KafkaState = KafkaState {
- _stateName :: KafkaString
- _stateRequiredAcks :: RequiredAcks
- _stateRequestTimeout :: Timeout
- _stateWaitSize :: MinBytes
- _stateBufferSize :: MaxBytes
- _stateWaitTime :: MaxWaitTime
- _stateCorrelationId :: CorrelationId
- _stateBrokers :: Map Leader Broker
- _stateConnections :: Map KafkaAddress (Pool Handle)
- _stateTopicMetadata :: Map TopicName TopicMetadata
- _stateAddresses :: NonEmpty KafkaAddress
- stateWaitTime :: Lens' KafkaState MaxWaitTime
- stateWaitSize :: Lens' KafkaState MinBytes
- stateTopicMetadata :: Lens' KafkaState (Map TopicName TopicMetadata)
- stateRequiredAcks :: Lens' KafkaState RequiredAcks
- stateRequestTimeout :: Lens' KafkaState Timeout
- stateName :: Lens' KafkaState KafkaString
- stateCorrelationId :: Lens' KafkaState CorrelationId
- stateConnections :: Lens' KafkaState (Map KafkaAddress (Pool Handle))
- stateBufferSize :: Lens' KafkaState MaxBytes
- stateBrokers :: Lens' KafkaState (Map Leader Broker)
- stateAddresses :: Lens' KafkaState (NonEmpty KafkaAddress)
- type Kafka = StateT KafkaState (ExceptT KafkaClientError IO)
- type KafkaClientId = KafkaString
- data KafkaClientError
- data KafkaTime
- data PartitionAndLeader = PartitionAndLeader {}
- palTopic :: Lens' PartitionAndLeader TopicName
- palPartition :: Lens' PartitionAndLeader Partition
- palLeader :: Lens' PartitionAndLeader Leader
- data TopicAndPartition = TopicAndPartition {}
- data TopicAndMessage = TopicAndMessage {}
- tamTopic :: Lens' TopicAndMessage TopicName
- tamMessage :: Lens' TopicAndMessage Message
- tamPayload :: TopicAndMessage -> ByteString
- defaultCorrelationId :: CorrelationId
- defaultRequiredAcks :: RequiredAcks
- defaultRequestTimeout :: Timeout
- defaultMinBytes :: MinBytes
- defaultMaxBytes :: MaxBytes
- defaultMaxWaitTime :: MaxWaitTime
- mkKafkaState :: KafkaClientId -> KafkaAddress -> KafkaState
- addKafkaAddress :: KafkaAddress -> KafkaState -> KafkaState
- runKafka :: KafkaState -> Kafka a -> IO (Either KafkaClientError a)
- tryKafka :: Kafka a -> Kafka a
- makeRequest :: Handle -> ReqResp (Kafka a) -> Kafka a
- metadata :: MetadataRequest -> Kafka MetadataResponse
- metadata' :: Handle -> MetadataRequest -> Kafka MetadataResponse
- getTopicPartitionLeader :: TopicName -> Partition -> Kafka Broker
- expect :: KafkaClientError -> (a -> Maybe b) -> a -> Kafka b
- brokerPartitionInfo :: TopicName -> Kafka (Set PartitionAndLeader)
- findMetadataOrElse :: [TopicName] -> Getting (Maybe a) KafkaState (Maybe a) -> KafkaClientError -> Kafka a
- protocolTime :: KafkaTime -> Time
- updateMetadatas :: [TopicName] -> Kafka ()
- updateMetadata :: TopicName -> Kafka ()
- updateAllMetadata :: Kafka ()
- withBrokerHandle :: Broker -> (Handle -> Kafka a) -> Kafka a
- withAddressHandle :: KafkaAddress -> (Handle -> Kafka a) -> Kafka a
- broker2address :: Broker -> KafkaAddress
- withAnyHandle :: (Handle -> Kafka a) -> Kafka a
- data PartitionOffsetRequestInfo = PartitionOffsetRequestInfo {}
- getLastOffset :: KafkaTime -> Partition -> TopicName -> Kafka Offset
- getLastOffset' :: Handle -> KafkaTime -> Partition -> TopicName -> Kafka Offset
- offsetRequest :: [(TopicAndPartition, PartitionOffsetRequestInfo)] -> OffsetRequest
Documentation
type KafkaAddress = (Host, Port) Source
data KafkaState Source
Constructors
| KafkaState | |
Fields
| |
Instances
type Kafka = StateT KafkaState (ExceptT KafkaClientError IO) Source
The core Kafka monad.
type KafkaClientId = KafkaString Source
data KafkaClientError Source
Errors given from the Kafka monad.
Constructors
| KafkaNoOffset | A response did not contain an offset. |
| KafkaDeserializationError String | A value could not be deserialized correctly. |
| KafkaInvalidBroker Leader | Could not find a cached broker for the found leader. |
| KafkaFailedToFetchMetadata | |
| KafkaIOException IOException |
Instances
An abstract form of Kafka's time. Used for querying offsets.
Constructors
| LatestTime | The latest time on the broker. |
| EarliestTime | The earliest time on the broker. |
| OtherTime Time | A specific time. |
data PartitionAndLeader Source
Constructors
| PartitionAndLeader | |
Fields
| |
data TopicAndPartition Source
Constructors
| TopicAndPartition | |
Fields | |
data TopicAndMessage Source
A topic with a serializable message.
Constructors
| TopicAndMessage | |
Fields
| |
Instances
tamPayload :: TopicAndMessage -> ByteString Source
Get the bytes from the Kafka message, ignoring the topic.
Configuration
defaultCorrelationId :: CorrelationId Source
Default: 0
defaultRequiredAcks :: RequiredAcks Source
Default: 1
defaultRequestTimeout :: Timeout Source
Default: 10000
defaultMinBytes :: MinBytes Source
Default: 0
defaultMaxBytes :: MaxBytes Source
Default: 1024 * 1024
defaultMaxWaitTime :: MaxWaitTime Source
Default: 0
mkKafkaState :: KafkaClientId -> KafkaAddress -> KafkaState Source
Create a consumer using default values.
runKafka :: KafkaState -> Kafka a -> IO (Either KafkaClientError a) Source
Run the underlying Kafka monad.
tryKafka :: Kafka a -> Kafka a Source
Catch IOExceptions and wrap them in KafkaIOExceptions.
makeRequest :: Handle -> ReqResp (Kafka a) -> Kafka a Source
Make a request, incrementing the _stateCorrelationId.
metadata :: MetadataRequest -> Kafka MetadataResponse Source
Send a metadata request to any broker.
metadata' :: Handle -> MetadataRequest -> Kafka MetadataResponse Source
Send a metadata request.
expect :: KafkaClientError -> (a -> Maybe b) -> a -> Kafka b Source
brokerPartitionInfo :: TopicName -> Kafka (Set PartitionAndLeader) Source
Find a leader and partition for the topic.
findMetadataOrElse :: [TopicName] -> Getting (Maybe a) KafkaState (Maybe a) -> KafkaClientError -> Kafka a Source
protocolTime :: KafkaTime -> Time Source
Convert an abstract time to a serializable protocol value.
updateMetadatas :: [TopicName] -> Kafka () Source
updateMetadata :: TopicName -> Kafka () Source
updateAllMetadata :: Kafka () Source
withBrokerHandle :: Broker -> (Handle -> Kafka a) -> Kafka a Source
Execute a Kafka action with a Handle for the given Broker, updating
the connections cache if needed.
When the action throws an IOException, it is caught and returned as a
KafkaIOException in the Kafka monad.
Note that when the given action throws an exception, any state changes will
be discarded. This includes both IOExceptions and exceptions thrown by
throwError from Except.
withAddressHandle :: KafkaAddress -> (Handle -> Kafka a) -> Kafka a Source
Execute a Kafka action with a Handle for the given KafkaAddress,
updating the connections cache if needed.
When the action throws an IOException, it is caught and returned as a
KafkaIOException in the Kafka monad.
Note that when the given action throws an exception, any state changes will
be discarded. This includes both IOExceptions and exceptions thrown by
throwError from Except.
withAnyHandle :: (Handle -> Kafka a) -> Kafka a Source
Like withAddressHandle, but round-robins the addresses in the KafkaState.
When the action throws an IOException, it is caught and returned as a
KafkaIOException in the Kafka monad.
Note that when the given action throws an exception, any state changes will
be discarded. This includes both IOExceptions and exceptions thrown by
throwError from Except.
Offsets
data PartitionOffsetRequestInfo Source
Fields to construct an offset request, per topic and partition.
Constructors
| PartitionOffsetRequestInfo | |
Fields
| |
getLastOffset :: KafkaTime -> Partition -> TopicName -> Kafka Offset Source
Get the first found offset.
getLastOffset' :: Handle -> KafkaTime -> Partition -> TopicName -> Kafka Offset Source
Get the first found offset.
offsetRequest :: [(TopicAndPartition, PartitionOffsetRequestInfo)] -> OffsetRequest Source
Create an offset request.