| Safe Haskell | None |
|---|---|
| Language | Haskell2010 |
Network.Kafka
Contents
Synopsis
- type KafkaAddress = (Host, Port)
- data KafkaState = KafkaState {
- _stateName :: KafkaString
- _stateRequiredAcks :: RequiredAcks
- _stateRequestTimeout :: Timeout
- _stateWaitSize :: MinBytes
- _stateBufferSize :: MaxBytes
- _stateWaitTime :: MaxWaitTime
- _stateCorrelationId :: CorrelationId
- _stateBrokers :: Map Leader Broker
- _stateConnections :: Map KafkaAddress (Pool Handle)
- _stateTopicMetadata :: Map TopicName TopicMetadata
- _stateAddresses :: NonEmpty KafkaAddress
- stateWaitTime :: Lens' KafkaState MaxWaitTime
- stateWaitSize :: Lens' KafkaState MinBytes
- stateTopicMetadata :: Lens' KafkaState (Map TopicName TopicMetadata)
- stateRequiredAcks :: Lens' KafkaState RequiredAcks
- stateRequestTimeout :: Lens' KafkaState Timeout
- stateName :: Lens' KafkaState KafkaString
- stateCorrelationId :: Lens' KafkaState CorrelationId
- stateConnections :: Lens' KafkaState (Map KafkaAddress (Pool Handle))
- stateBufferSize :: Lens' KafkaState MaxBytes
- stateBrokers :: Lens' KafkaState (Map Leader Broker)
- stateAddresses :: Lens' KafkaState (NonEmpty KafkaAddress)
- type Kafka m = (MonadState KafkaState m, MonadError KafkaClientError m, MonadIO m, MonadBaseControl IO m)
- type KafkaClientId = KafkaString
- data KafkaClientError
- data KafkaTime
- data PartitionAndLeader = PartitionAndLeader {}
- palTopic :: Lens' PartitionAndLeader TopicName
- palPartition :: Lens' PartitionAndLeader Partition
- palLeader :: Lens' PartitionAndLeader Leader
- data TopicAndPartition = TopicAndPartition {}
- data TopicAndMessage = TopicAndMessage {}
- tamTopic :: Lens' TopicAndMessage TopicName
- tamMessage :: Lens' TopicAndMessage Message
- tamPayload :: TopicAndMessage -> ByteString
- defaultCorrelationId :: CorrelationId
- defaultRequiredAcks :: RequiredAcks
- defaultRequestTimeout :: Timeout
- defaultMinBytes :: MinBytes
- defaultMaxBytes :: MaxBytes
- defaultMaxWaitTime :: MaxWaitTime
- mkKafkaState :: KafkaClientId -> KafkaAddress -> KafkaState
- addKafkaAddress :: KafkaAddress -> KafkaState -> KafkaState
- runKafka :: KafkaState -> StateT KafkaState (ExceptT KafkaClientError IO) a -> IO (Either KafkaClientError a)
- tryKafka :: Kafka m => m a -> m a
- makeRequest :: Kafka m => Handle -> ReqResp (m a) -> m a
- metadata :: Kafka m => MetadataRequest -> m MetadataResponse
- metadata' :: Kafka m => Handle -> MetadataRequest -> m MetadataResponse
- createTopic :: Kafka m => CreateTopicsRequest -> m CreateTopicsResponse
- createTopic' :: Kafka m => Handle -> CreateTopicsRequest -> m CreateTopicsResponse
- createTopicsRequest :: TopicName -> Partition -> ReplicationFactor -> [(Partition, Replicas)] -> [(KafkaString, Metadata)] -> CreateTopicsRequest
- deleteTopic :: Kafka m => DeleteTopicsRequest -> m DeleteTopicsResponse
- deleteTopic' :: Kafka m => Handle -> DeleteTopicsRequest -> m DeleteTopicsResponse
- deleteTopicsRequest :: TopicName -> DeleteTopicsRequest
- fetchOffset :: Kafka m => OffsetFetchRequest -> m OffsetFetchResponse
- fetchOffset' :: Kafka m => Handle -> OffsetFetchRequest -> m OffsetFetchResponse
- fetchOffsetRequest :: ConsumerGroup -> TopicName -> Partition -> OffsetFetchRequest
- commitOffset :: Kafka m => OffsetCommitRequest -> m OffsetCommitResponse
- commitOffset' :: Kafka m => Handle -> OffsetCommitRequest -> m OffsetCommitResponse
- commitOffsetRequest :: ConsumerGroup -> TopicName -> Partition -> Offset -> OffsetCommitRequest
- getTopicPartitionLeader :: Kafka m => TopicName -> Partition -> m Broker
- expect :: Kafka m => KafkaClientError -> (a -> Maybe b) -> a -> m b
- brokerPartitionInfo :: Kafka m => TopicName -> m (Set PartitionAndLeader)
- findMetadataOrElse :: Kafka m => [TopicName] -> Getting (Maybe a) KafkaState (Maybe a) -> KafkaClientError -> m a
- protocolTime :: KafkaTime -> Time
- updateMetadatas :: Kafka m => [TopicName] -> m ()
- updateMetadata :: Kafka m => TopicName -> m ()
- updateAllMetadata :: Kafka m => m ()
- withBrokerHandle :: Kafka m => Broker -> (Handle -> m a) -> m a
- withAddressHandle :: Kafka m => KafkaAddress -> (Handle -> m a) -> m a
- broker2address :: Broker -> KafkaAddress
- withAnyHandle :: Kafka m => (Handle -> m a) -> m a
- data PartitionOffsetRequestInfo = PartitionOffsetRequestInfo {}
- getLastOffset :: Kafka m => KafkaTime -> Partition -> TopicName -> m Offset
- getLastOffset' :: Kafka m => Handle -> KafkaTime -> Partition -> TopicName -> m Offset
- offsetRequest :: [(TopicAndPartition, PartitionOffsetRequestInfo)] -> OffsetRequest
Documentation
type KafkaAddress = (Host, Port) Source #
data KafkaState Source #
Constructors
| KafkaState | |
Fields
| |
Instances
type Kafka m = (MonadState KafkaState m, MonadError KafkaClientError m, MonadIO m, MonadBaseControl IO m) Source #
The core Kafka monad.
type KafkaClientId = KafkaString Source #
data KafkaClientError Source #
Errors given from the Kafka monad.
Constructors
| KafkaNoOffset | A response did not contain an offset. |
| KafkaDeserializationError String | A value could not be deserialized correctly. |
| KafkaInvalidBroker Leader | Could not find a cached broker for the found leader. |
| KafkaFailedToFetchMetadata | |
| KafkaIOException IOException | |
Instances
data KafkaTime Source #
An abstract form of Kafka's time. Used for querying offsets.
Constructors
| LatestTime | The latest time on the broker. |
| EarliestTime | The earliest time on the broker. |
| OtherTime Time | A specific time. |
Instances
| Eq KafkaTime Source # | |
| Generic KafkaTime Source # | |
| type Rep KafkaTime Source # | |
Defined in Network.Kafka type Rep KafkaTime = D1 (MetaData "KafkaTime" "Network.Kafka" "milena-0.5.4.0-27nMlnYTid12v1ZBZP3eqt" False) (C1 (MetaCons "LatestTime" PrefixI False) (U1 :: Type -> Type) :+: (C1 (MetaCons "EarliestTime" PrefixI False) (U1 :: Type -> Type) :+: C1 (MetaCons "OtherTime" PrefixI False) (S1 (MetaSel (Nothing :: Maybe Symbol) NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 Time)))) | |
data PartitionAndLeader Source #
Constructors
| PartitionAndLeader | |
Fields
| |
Instances
data TopicAndPartition Source #
Constructors
| TopicAndPartition | |
Fields
| |
Instances
data TopicAndMessage Source #
A topic with a serializable message.
Constructors
| TopicAndMessage | |
Fields
| |
Instances
tamPayload :: TopicAndMessage -> ByteString Source #
Get the bytes from the Kafka message, ignoring the topic.
Configuration
defaultCorrelationId :: CorrelationId Source #
Default: 0
defaultRequiredAcks :: RequiredAcks Source #
Default: 1
defaultRequestTimeout :: Timeout Source #
Default: 10000
defaultMinBytes :: MinBytes Source #
Default: 0
defaultMaxBytes :: MaxBytes Source #
Default: 1024 * 1024
defaultMaxWaitTime :: MaxWaitTime Source #
Default: 0
mkKafkaState :: KafkaClientId -> KafkaAddress -> KafkaState Source #
Create a KafkaState for the given client id and initial broker address, using default values for all other settings.
addKafkaAddress :: KafkaAddress -> KafkaState -> KafkaState Source #
runKafka :: KafkaState -> StateT KafkaState (ExceptT KafkaClientError IO) a -> IO (Either KafkaClientError a) Source #
Run the underlying Kafka monad.
tryKafka :: Kafka m => m a -> m a Source #
Catch IOExceptions and wrap them in KafkaIOExceptions.
makeRequest :: Kafka m => Handle -> ReqResp (m a) -> m a Source #
Make a request, incrementing the _stateCorrelationId.
metadata :: Kafka m => MetadataRequest -> m MetadataResponse Source #
Send a metadata request to any broker.
metadata' :: Kafka m => Handle -> MetadataRequest -> m MetadataResponse Source #
Send a metadata request.
createTopic :: Kafka m => CreateTopicsRequest -> m CreateTopicsResponse Source #
createTopic' :: Kafka m => Handle -> CreateTopicsRequest -> m CreateTopicsResponse Source #
createTopicsRequest :: TopicName -> Partition -> ReplicationFactor -> [(Partition, Replicas)] -> [(KafkaString, Metadata)] -> CreateTopicsRequest Source #
deleteTopic :: Kafka m => DeleteTopicsRequest -> m DeleteTopicsResponse Source #
deleteTopic' :: Kafka m => Handle -> DeleteTopicsRequest -> m DeleteTopicsResponse Source #
fetchOffset :: Kafka m => OffsetFetchRequest -> m OffsetFetchResponse Source #
fetchOffset' :: Kafka m => Handle -> OffsetFetchRequest -> m OffsetFetchResponse Source #
commitOffset :: Kafka m => OffsetCommitRequest -> m OffsetCommitResponse Source #
commitOffset' :: Kafka m => Handle -> OffsetCommitRequest -> m OffsetCommitResponse Source #
commitOffsetRequest :: ConsumerGroup -> TopicName -> Partition -> Offset -> OffsetCommitRequest Source #
brokerPartitionInfo :: Kafka m => TopicName -> m (Set PartitionAndLeader) Source #
Find a leader and partition for the topic.
findMetadataOrElse :: Kafka m => [TopicName] -> Getting (Maybe a) KafkaState (Maybe a) -> KafkaClientError -> m a Source #
protocolTime :: KafkaTime -> Time Source #
Convert an abstract time to a serializable protocol value.
updateMetadatas :: Kafka m => [TopicName] -> m () Source #
updateMetadata :: Kafka m => TopicName -> m () Source #
updateAllMetadata :: Kafka m => m () Source #
withBrokerHandle :: Kafka m => Broker -> (Handle -> m a) -> m a Source #
Execute a Kafka action with a Handle for the given Broker, updating
the connections cache if needed.
When the action throws an IOException, it is caught and returned as a
KafkaIOException in the Kafka monad.
Note that when the given action throws an exception, any state changes will
be discarded. This includes both IOExceptions and exceptions thrown by
throwError from Except.
withAddressHandle :: Kafka m => KafkaAddress -> (Handle -> m a) -> m a Source #
Execute a Kafka action with a Handle for the given KafkaAddress,
updating the connections cache if needed.
When the action throws an IOException, it is caught and returned as a
KafkaIOException in the Kafka monad.
Note that when the given action throws an exception, any state changes will
be discarded. This includes both IOExceptions and exceptions thrown by
throwError from Except.
broker2address :: Broker -> KafkaAddress Source #
withAnyHandle :: Kafka m => (Handle -> m a) -> m a Source #
Like withAddressHandle, but round-robins the addresses in the KafkaState.
When the action throws an IOException, it is caught and returned as a
KafkaIOException in the Kafka monad.
Note that when the given action throws an exception, any state changes will
be discarded. This includes both IOExceptions and exceptions thrown by
throwError from Except.
Offsets
data PartitionOffsetRequestInfo Source #
Fields to construct an offset request, per topic and partition.
Constructors
| PartitionOffsetRequestInfo | |
Fields
| |
getLastOffset :: Kafka m => KafkaTime -> Partition -> TopicName -> m Offset Source #
Get the first found offset.
getLastOffset' :: Kafka m => Handle -> KafkaTime -> Partition -> TopicName -> m Offset Source #
Get the first found offset, using the given Handle.
offsetRequest :: [(TopicAndPartition, PartitionOffsetRequestInfo)] -> OffsetRequest Source #
Create an offset request.