Safe Haskell | None |
---|---|
Language | Haskell2010 |
- data KafkaState = KafkaState {}
- stateWaitTime :: Lens' KafkaState MaxWaitTime
- stateWaitSize :: Lens' KafkaState MinBytes
- stateRequiredAcks :: Lens' KafkaState RequiredAcks
- stateRequestTimeout :: Lens' KafkaState Timeout
- stateName :: Lens' KafkaState KafkaString
- stateCorrelationId :: Lens' KafkaState CorrelationId
- stateBufferSize :: Lens' KafkaState MaxBytes
- stateBrokers :: Lens' KafkaState (Map Leader Broker)
- data KafkaConsumer = KafkaConsumer {}
- consumerState :: Lens' KafkaConsumer KafkaState
- consumerHandle :: Lens' KafkaConsumer Handle
- type Kafka = StateT KafkaConsumer (EitherT KafkaClientError IO)
- type KafkaAddress = (Host, Port)
- type KafkaClientId = KafkaString
- data KafkaClientError
- data KafkaExpectedResponse
- data KafkaTime
- data PartitionAndLeader = PartitionAndLeader {}
- palTopic :: Lens' PartitionAndLeader TopicName
- palPartition :: Lens' PartitionAndLeader Partition
- palLeader :: Lens' PartitionAndLeader Leader
- data TopicAndPartition = TopicAndPartition {}
- data TopicAndMessage = TopicAndMessage {}
- tamTopic :: Lens' TopicAndMessage TopicName
- tamMessage :: Lens' TopicAndMessage Message
- tamPayload :: TopicAndMessage -> ByteString
- defaultCorrelationId :: CorrelationId
- defaultRequiredAcks :: RequiredAcks
- defaultRequestTimeout :: Timeout
- defaultMinBytes :: MinBytes
- defaultMaxBytes :: MaxBytes
- defaultMaxWaitTime :: MaxWaitTime
- defaultState :: KafkaClientId -> KafkaState
- runKafka :: KafkaAddress -> KafkaState -> Kafka a -> IO (Either KafkaClientError a)
- makeRequest :: RequestMessage -> Kafka Request
- doRequest :: Request -> Kafka Response
- metadata :: MetadataRequest -> Kafka MetadataResponse
- expectResponse :: KafkaExpectedResponse -> Getting (Leftmost b) ResponseMessage b -> Response -> Kafka b
- protocolTime :: KafkaTime -> Time
- partitionAndCollate :: [TopicAndMessage] -> Kafka (Map Leader (Map TopicAndPartition [TopicAndMessage]))
- getPartition :: [PartitionAndLeader] -> Kafka (Maybe PartitionAndLeader)
- groupMessagesToSet :: [TopicAndMessage] -> MessageSet
- brokerPartitionInfo :: TopicName -> Kafka [PartitionAndLeader]
- defaultMessageCrc :: Crc
- defaultMessageMagicByte :: MagicByte
- defaultMessageKey :: Key
- defaultMessageAttributes :: Attributes
- makeMessage :: ByteString -> Message
- ordinaryConsumerId :: ReplicaId
- fetchRequest :: Offset -> Partition -> TopicName -> Kafka FetchRequest
- fetch :: FetchRequest -> Kafka FetchResponse
- fetchMessages :: FetchResponse -> [TopicAndMessage]
- produce :: ProduceRequest -> Kafka ProduceResponse
- produceRequest :: RequiredAcks -> Timeout -> [(TopicAndPartition, MessageSet)] -> ProduceRequest
- produceMessages :: [TopicAndMessage] -> Kafka [ProduceResponse]
- send :: Leader -> [(TopicAndPartition, MessageSet)] -> Kafka ProduceResponse
- data PartitionOffsetRequestInfo = PartitionOffsetRequestInfo {}
- getLastOffset :: KafkaTime -> Partition -> TopicName -> Kafka Offset
- offsetRequest :: [(TopicAndPartition, PartitionOffsetRequestInfo)] -> RequestMessage
Documentation
data KafkaState Source
KafkaState | |
|
type Kafka = StateT KafkaConsumer (EitherT KafkaClientError IO) Source
The core Kafka monad.
type KafkaAddress = (Host, Port) Source
type KafkaClientId = KafkaString Source
data KafkaClientError Source
Errors given from the Kafka monad.
KafkaNoOffset | A response did not contain an offset. |
KafkaExpected KafkaExpectedResponse | Got a different form of a response than was requested. |
KafkaDeserializationError String | A value could not be deserialized correctly. |
KafkaInvalidBroker Leader | Could not find a cached broker for the found leader. |
data KafkaExpectedResponse Source
Type of response to expect, used for KafkaExpected
error.
An abstract form of Kafka's time. Used for querying offsets.
LatestTime | The latest time on the broker. |
EarliestTime | The earliest time on the broker. |
OtherTime Time | A specific time. |
data PartitionAndLeader Source
data TopicAndPartition Source
data TopicAndMessage Source
A topic with a serializable message.
tamPayload :: TopicAndMessage -> ByteString Source
Get the bytes from the Kafka message, ignoring the topic.
Configuration
defaultCorrelationId :: CorrelationId Source
Default: 0
defaultRequiredAcks :: RequiredAcks Source
Default: 1
defaultRequestTimeout :: Timeout Source
Default: 10000
defaultMinBytes :: MinBytes Source
Default: 0
defaultMaxBytes :: MaxBytes Source
Default: 1024 * 1024
defaultMaxWaitTime :: MaxWaitTime Source
Default: 0
defaultState :: KafkaClientId -> KafkaState Source
Create a consumer using default values.
runKafka :: KafkaAddress -> KafkaState -> Kafka a -> IO (Either KafkaClientError a) Source
Run the underlying Kafka monad at the given leader address and initial state.
makeRequest :: RequestMessage -> Kafka Request Source
Make a request, incrementing the _stateCorrelationId
.
metadata :: MetadataRequest -> Kafka MetadataResponse Source
Send a metadata request
expectResponse :: KafkaExpectedResponse -> Getting (Leftmost b) ResponseMessage b -> Response -> Kafka b Source
Function to give an error when the response seems wrong.
protocolTime :: KafkaTime -> Time Source
Convert an abstract time to a serializable protocol value.
Messages
partitionAndCollate :: [TopicAndMessage] -> Kafka (Map Leader (Map TopicAndPartition [TopicAndMessage])) Source
Group messages together with the leader they should be sent to.
groupMessagesToSet :: [TopicAndMessage] -> MessageSet Source
Create a protocol message set from a list of messages.
brokerPartitionInfo :: TopicName -> Kafka [PartitionAndLeader] Source
Find a leader and partition for the topic.
defaultMessageCrc :: Crc Source
Default: 1
defaultMessageMagicByte :: MagicByte Source
Default: 0
defaultMessageKey :: Key Source
Default: Nothing
defaultMessageAttributes :: Attributes Source
Default: 0
makeMessage :: ByteString -> Message Source
Construct a message from a string of bytes using default attributes.
Fetching
ordinaryConsumerId :: ReplicaId Source
Default: -1
fetchRequest :: Offset -> Partition -> TopicName -> Kafka FetchRequest Source
Construct a fetch request from the values in the state.
fetch :: FetchRequest -> Kafka FetchResponse Source
Execute a fetch request and get the raw fetch response.
fetchMessages :: FetchResponse -> [TopicAndMessage] Source
Extract out messages with their topics from a fetch response.
Producing
produce :: ProduceRequest -> Kafka ProduceResponse Source
Execute a produce request and get the raw preduce response.
produceRequest :: RequiredAcks -> Timeout -> [(TopicAndPartition, MessageSet)] -> ProduceRequest Source
Construct a produce request with explicit arguments.
produceMessages :: [TopicAndMessage] -> Kafka [ProduceResponse] Source
Send messages to partition calculated by partitionAndCollate
.
send :: Leader -> [(TopicAndPartition, MessageSet)] -> Kafka ProduceResponse Source
Execute a produce request using the values in the state.
Offsets
data PartitionOffsetRequestInfo Source
Fields to construct an offset request, per topic and partition.
PartitionOffsetRequestInfo | |
|
getLastOffset :: KafkaTime -> Partition -> TopicName -> Kafka Offset Source
Get the first found offset.
offsetRequest :: [(TopicAndPartition, PartitionOffsetRequestInfo)] -> RequestMessage Source
Create an offset request.