Safe Haskell: None
Language: Haskell2010
- data KafkaState = KafkaState {
- _stateName :: KafkaString
- _stateRequiredAcks :: RequiredAcks
- _stateRequestTimeout :: Timeout
- _stateWaitSize :: MinBytes
- _stateBufferSize :: MaxBytes
- _stateWaitTime :: MaxWaitTime
- _stateCorrelationId :: CorrelationId
- _stateBrokers :: Map Leader Broker
- _stateConnections :: Map Broker (Pool Handle)
- _stateTopicMetadata :: Map TopicName TopicMetadata
- stateWaitTime :: Lens' KafkaState MaxWaitTime
- stateWaitSize :: Lens' KafkaState MinBytes
- stateTopicMetadata :: Lens' KafkaState (Map TopicName TopicMetadata)
- stateRequiredAcks :: Lens' KafkaState RequiredAcks
- stateRequestTimeout :: Lens' KafkaState Timeout
- stateName :: Lens' KafkaState KafkaString
- stateCorrelationId :: Lens' KafkaState CorrelationId
- stateConnections :: Lens' KafkaState (Map Broker (Pool Handle))
- stateBufferSize :: Lens' KafkaState MaxBytes
- stateBrokers :: Lens' KafkaState (Map Leader Broker)
- data KafkaClient = KafkaClient {}
- kafkaClientState :: Lens' KafkaClient KafkaState
- kafkaClientHandle :: Lens' KafkaClient Handle
- type Kafka = StateT KafkaClient (EitherT KafkaClientError IO)
- type KafkaAddress = (Host, Port)
- type KafkaClientId = KafkaString
- data KafkaClientError
- data KafkaExpectedResponse
- data KafkaTime
- data PartitionAndLeader = PartitionAndLeader {}
- palTopic :: Lens' PartitionAndLeader TopicName
- palPartition :: Lens' PartitionAndLeader Partition
- palLeader :: Lens' PartitionAndLeader Leader
- data TopicAndPartition = TopicAndPartition {}
- data TopicAndMessage = TopicAndMessage {}
- tamTopic :: Lens' TopicAndMessage TopicName
- tamMessage :: Lens' TopicAndMessage Message
- tamPayload :: TopicAndMessage -> ByteString
- defaultCorrelationId :: CorrelationId
- defaultRequiredAcks :: RequiredAcks
- defaultRequestTimeout :: Timeout
- defaultMinBytes :: MinBytes
- defaultMaxBytes :: MaxBytes
- defaultMaxWaitTime :: MaxWaitTime
- defaultState :: KafkaClientId -> KafkaState
- runKafka :: KafkaAddress -> KafkaState -> Kafka a -> IO (Either KafkaClientError a)
- makeRequest :: RequestMessage -> Kafka Request
- doRequest :: Request -> Kafka Response
- doRequest' :: Handle -> Request -> Kafka Response
- metadata :: MetadataRequest -> Kafka MetadataResponse
- metadata' :: Handle -> MetadataRequest -> Kafka MetadataResponse
- expectResponse :: KafkaExpectedResponse -> Getting (Leftmost b) ResponseMessage b -> Response -> Kafka b
- protocolTime :: KafkaTime -> Time
- ordinaryConsumerId :: ReplicaId
- fetchRequest :: Offset -> Partition -> TopicName -> Kafka FetchRequest
- fetch :: FetchRequest -> Kafka FetchResponse
- fetchMessages :: FetchResponse -> [TopicAndMessage]
- updateMetadatas :: [TopicName] -> Kafka ()
- updateMetadata :: TopicName -> Kafka ()
- updateAllMetadata :: Kafka ()
- withBrokerHandle :: Broker -> (Handle -> Kafka a) -> Kafka a
- data PartitionOffsetRequestInfo = PartitionOffsetRequestInfo {}
- getLastOffset :: KafkaTime -> Partition -> TopicName -> Kafka Offset
- offsetRequest :: [(TopicAndPartition, PartitionOffsetRequestInfo)] -> RequestMessage
Documentation
data KafkaState Source
Constructors:

KafkaState
stateConnections :: Lens' KafkaState (Map Broker (Pool Handle)) Source
type Kafka = StateT KafkaClient (EitherT KafkaClientError IO) Source
The core Kafka monad.
type KafkaAddress = (Host, Port) Source
type KafkaClientId = KafkaString Source
data KafkaClientError Source
Errors given from the Kafka monad.
KafkaNoOffset | A response did not contain an offset. |
KafkaExpected KafkaExpectedResponse | Got a different form of a response than was requested. |
KafkaDeserializationError String | A value could not be deserialized correctly. |
KafkaInvalidBroker Leader | Could not find a cached broker for the found leader. |
KafkaFailedToFetchMetadata | Failed to fetch metadata. |
data KafkaExpectedResponse Source
Type of response to expect, used for the KafkaExpected error.
data KafkaTime Source

An abstract form of Kafka's time. Used for querying offsets.
LatestTime | The latest time on the broker. |
EarliestTime | The earliest time on the broker. |
OtherTime Time | A specific time. |
data PartitionAndLeader Source
data TopicAndPartition Source
data TopicAndMessage Source
A topic with a serializable message.
tamPayload :: TopicAndMessage -> ByteString Source
Get the bytes from the Kafka message, ignoring the topic.
Configuration
defaultCorrelationId :: CorrelationId Source
Default: 0
defaultRequiredAcks :: RequiredAcks Source
Default: 1
defaultRequestTimeout :: Timeout Source
Default: 10000
defaultMinBytes :: MinBytes Source
Default: 0
defaultMaxBytes :: MaxBytes Source
Default: 1024 * 1024
defaultMaxWaitTime :: MaxWaitTime Source
Default: 0
defaultState :: KafkaClientId -> KafkaState Source
Create a consumer using default values.
runKafka :: KafkaAddress -> KafkaState -> Kafka a -> IO (Either KafkaClientError a) Source
Run the underlying Kafka monad at the given leader address and initial state.
makeRequest :: RequestMessage -> Kafka Request Source
Make a request, incrementing the _stateCorrelationId.
metadata :: MetadataRequest -> Kafka MetadataResponse Source
Send a metadata request
metadata' :: Handle -> MetadataRequest -> Kafka MetadataResponse Source

Send a metadata request over the given Handle.
expectResponse :: KafkaExpectedResponse -> Getting (Leftmost b) ResponseMessage b -> Response -> Kafka b Source
Function to give an error when the response seems wrong.
protocolTime :: KafkaTime -> Time Source
Convert an abstract time to a serializable protocol value.
Fetching
ordinaryConsumerId :: ReplicaId Source
Default: -1
fetchRequest :: Offset -> Partition -> TopicName -> Kafka FetchRequest Source
Construct a fetch request from the values in the state.
fetch :: FetchRequest -> Kafka FetchResponse Source
Execute a fetch request and get the raw fetch response.
fetchMessages :: FetchResponse -> [TopicAndMessage] Source
Extract out messages with their topics from a fetch response.
updateMetadatas :: [TopicName] -> Kafka () Source
updateMetadata :: TopicName -> Kafka () Source
updateAllMetadata :: Kafka () Source
withBrokerHandle :: Broker -> (Handle -> Kafka a) -> Kafka a Source
Execute a handler action, creating a new Pool and updating the connections Map if needed.
Offsets
data PartitionOffsetRequestInfo Source
Fields to construct an offset request, per topic and partition.
Constructors:

PartitionOffsetRequestInfo
getLastOffset :: KafkaTime -> Partition -> TopicName -> Kafka Offset Source
Get the first found offset.
offsetRequest :: [(TopicAndPartition, PartitionOffsetRequestInfo)] -> RequestMessage Source
Create an offset request.