milena- A Kafka client for Haskell.

Safe HaskellNone





data KafkaState Source




_stateName :: KafkaString

Name to use as a client ID.

_stateRequiredAcks :: RequiredAcks

How many acknowledgements are required for producing.

_stateRequestTimeout :: Timeout

Time in milliseconds to wait for messages to be produced by broker.

_stateWaitSize :: MinBytes

Minimum size of response bytes to block for.

_stateBufferSize :: MaxBytes

Maximum size of response bytes to retrieve.

_stateWaitTime :: MaxWaitTime

Maximum time in milliseconds to wait for response.

_stateCorrelationId :: CorrelationId

An incrementing counter of requests.

_stateBrokers :: Map Leader Broker

Broker cache

_stateConnections :: Map KafkaAddress (Pool Handle)

Connection cache

_stateTopicMetadata :: Map TopicName TopicMetadata

Topic metadata cache

_stateAddresses :: NonEmpty KafkaAddress

Address cache

type Kafka = StateT KafkaState (ExceptT KafkaClientError IO) Source

The core Kafka monad.

data KafkaClientError Source

Errors given from the Kafka monad.



A response did not contain an offset.

KafkaExpected KafkaExpectedResponse

Got a different form of a response than was requested.

KafkaDeserializationError String

A value could not be deserialized correctly.

KafkaInvalidBroker Leader

Could not find a cached broker for the found leader.

KafkaIOException IOException 

data KafkaTime Source

An abstract form of Kafka's time. Used for querying offsets.



The latest time on the broker.


The earliest time on the broker.

OtherTime Time

A specific time.

data TopicAndMessage Source

A topic with a serializable message.

tamPayload :: TopicAndMessage -> ByteString Source

Get the bytes from the Kafka message, ignoring the topic.


defaultMaxBytes :: MaxBytes Source

Default: 1024 * 1024

mkKafkaState :: KafkaClientId -> KafkaAddress -> KafkaState Source

Create a consumer using default values.

runKafka :: KafkaState -> Kafka a -> IO (Either KafkaClientError a) Source

Run the underlying Kafka monad.

makeRequest :: RequestMessage -> Kafka Request Source

Make a request, incrementing the _stateCorrelationId.

tryKafkaIO :: IO a -> Kafka a Source

Catch IOExceptions and wrap them in KafkaIOExceptions.

tryKafka :: Kafka a -> Kafka a Source

Catch IOExceptions and wrap them in KafkaIOExceptions.

expect :: KafkaClientError -> (a -> Maybe b) -> a -> Kafka b Source

brokerPartitionInfo :: TopicName -> Kafka [PartitionAndLeader] Source

Find a leader and partition for the topic.

expectResponse :: KafkaExpectedResponse -> Getting (Leftmost b) ResponseMessage b -> Response -> Kafka b Source

Function to give an error when the response seems wrong.

protocolTime :: KafkaTime -> Time Source

Convert an abstract time to a serializable protocol value.


fetchRequest :: Offset -> Partition -> TopicName -> Kafka FetchRequest Source

Construct a fetch request from the values in the state.

fetch' :: Handle -> FetchRequest -> Kafka FetchResponse Source

Execute a fetch request and get the raw fetch response.

fetch :: FetchRequest -> Kafka FetchResponse Source

Execute a fetch request and get the raw fetch response. Round-robins the requests to addresses in the KafkaState.

fetchMessages :: FetchResponse -> [TopicAndMessage] Source

Extract out messages with their topics from a fetch response.

withBrokerHandle :: Broker -> (Handle -> Kafka a) -> Kafka a Source

Execute a Kafka action with a Handle for the given Broker, updating the connections cache if needed.

When the action throws an IOException, it is caught and returned as a KafkaIOException in the Kafka monad.

Note that when the given action throws an exception, any state changes will be discarded. This includes both IOExceptions and exceptions thrown by throwError from Except.

withAddressHandle :: KafkaAddress -> (Handle -> Kafka a) -> Kafka a Source

Execute a Kafka action with a Handle for the given KafkaAddress, updating the connections cache if needed.

When the action throws an IOException, it is caught and returned as a KafkaIOException in the Kafka monad.

Note that when the given action throws an exception, any state changes will be discarded. This includes both IOExceptions and exceptions thrown by throwError from Except.

withAnyHandle :: (Handle -> Kafka a) -> Kafka a Source

Like withAddressHandle, but round-robins the addresses in the KafkaState.

When the action throws an IOException, it is caught and returned as a KafkaIOException in the Kafka monad.

Note that when the given action throws an exception, any state changes will be discarded. This includes both IOExceptions and exceptions thrown by throwError from Except.


data PartitionOffsetRequestInfo Source

Fields to construct an offset request, per topic and partition.




_kafkaTime :: KafkaTime

Time to find an offset for.

_maxNumOffsets :: MaxNumberOfOffsets

Number of offsets to retrieve.

getLastOffset :: KafkaTime -> Partition -> TopicName -> Kafka Offset Source

Get the first found offset.

getLastOffset' :: Handle -> KafkaTime -> Partition -> TopicName -> Kafka Offset Source

Get the first found offset.