# Known Issues

## Sarama Client

### Invalid Array Length when consuming

#### Symptom

Client logs errors that look like the following:

{% code overflow="wrap" %}

```
kafka: error while consuming my-topic/3: kafka: error decoding packet: invalid array length
```

{% endcode %}

#### Context

When trying to consume records from a topic where the message size is very small (e.g., 300 bytes or less), the Kafka Fetch response from WarpStream can contain far more records than would normally be returned by Apache Kafka.

This happens because WarpStream re-writes record batches to more efficiently store and retrieve them from object storage.

#### Problem

Before Sarama v1.4.5, the library had a hard-coded limit of `2*math.MaxUint16` (131070) on the number of records that could be processed in a single Fetch response.

This caused any Fetch response that contained more than 131070 records to be rejected by Sarama with the above error message.
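The old cap behaves roughly like this sketch (hypothetical function name; `2 * 0xFFFF` is Go's `2*math.MaxUint16`):

```python
MAX_RECORDS = 2 * 0xFFFF  # == 131070, i.e. 2 * math.MaxUint16 in Go

def fetch_response_accepted(num_records: int) -> bool:
    # Sketch of the pre-fix Sarama guard: a Fetch response carrying more
    # records than the cap was rejected with the "invalid array length"
    # decoding error.
    return 0 <= num_records <= MAX_RECORDS
```

A response with 131070 records passes the guard, while one with 131071 records fails it, which is easy to hit given how WarpStream re-packs small records.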

#### Solution

Update to Sarama v1.4.5, which contains this fix <https://github.com/IBM/sarama/pull/3120> to drastically increase the number of records that Sarama can process in a single Fetch response to 104857600.

If needed, you can increase this limit even further by setting the `MaxResponseSize` variable found [here](https://github.com/IBM/sarama/blob/d4acbec36f8e76dc731d12b108a84ed0cd3b1e3b/sarama.go#L113) before initializing your Sarama client. In our experience, this limit should not need to be raised above its current value.

### Occasional Read/Write TCP or EOF errors

#### Symptom

Client logs occasional errors that look like the following:

```
read tcp 192.168.26.193:51587->192.168.26.193:9094: read: connection reset by peer
write tcp 192.168.26.193:53585->192.168.26.193:9093: write: broken pipe
got error from broker 1359066768 while fetching metadata: EOF
```

#### Context

Like Apache Kafka, WarpStream closes idle connections to free up resources so they can be used for other connections or processes.

When Sarama sets up its Kafka client, it keeps track of its connection load on each agent, and when sending metadata requests it chooses the least loaded agent.

When WarpStream responds to metadata requests it will return all the available agents but only make a single agent the leader, see [#partition-assignment](https://docs.warpstream.com/warpstream/overview/architecture/service-discovery#partition-assignment "mention") for details.

From Sarama's perspective, this causes all but one agent to appear equally (and minimally) loaded, which means every connection except one is idle the majority of the time.
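A rough sketch of why this happens (illustrative only, not Sarama's actual implementation): a least-loaded selection over per-agent connection load will almost always land on one of the mostly idle agents.

```python
def least_loaded_agent(load_by_agent: dict) -> str:
    # Illustrative sketch: pick the agent with the fewest in-flight
    # requests. With WarpStream, one agent carries nearly all the load,
    # so the rest tie at ~zero and their connections sit idle until the
    # agent eventually closes them.
    return min(load_by_agent, key=load_by_agent.get)
```

For example, `least_loaded_agent({"agent-a": 120, "agent-b": 0, "agent-c": 0})` picks one of the idle agents, whose connection may already have been closed server-side.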

#### Problem

The configuration to close idle connections on the WarpStream agents defaults to 1 hour.

After 1 hour, Sarama may try to use a closed idle connection, which logs these errors. However, the presence of these errors should not impact the functionality of your application.

These errors are usually log spam due to how Sarama chooses the least loaded agent.

#### Solution

Increase the idle connection timeout on the WarpStream agents.

For example, setting the environment variable `WARPSTREAM_KAFKA_CLOSE_IDLE_CONN_AFTER=24h` will increase the idle connection timeout to 24 hours.

## **librdkafka Client**

### **Tries to connect to old agents**

#### Symptom

During and after a rolling restart of the WarpStream Agents, you will see error logs in your application that look like the following:

```
10.42.0.144:9092/356356332: Failed to connect to broker at 10.42.0.144:9092: Operation timed out
10.244.1.36:9092/393450102: Connect to ipv4#10.244.1.36:9092 failed: No route to host (after 14302ms in state CONNECT)
10.0.232.176:9092/249600149: Connection setup timed out in state CONNECT (after 30022ms in state CONNECT, 1 identical error(s) suppressed)
```

#### Context

When running on a container platform like Kubernetes or ECS, performing a rolling restart of a deployment will change the IP addresses that the WarpStream Agents are using.

#### Problem

The client does not remove old agent IP addresses from its internal configuration, which leads to error logs about being unable to connect to the IP addresses of old agents.

The client should continue to function as normal, but will emit errors about not being able to connect to old WarpStream Agents.

#### Solution

Upgrade to version [2.10.0](https://github.com/confluentinc/librdkafka/releases/tag/v2.10.0) or newer. This issue has been fixed in the following pull request <https://github.com/confluentinc/librdkafka/pull/4557>.

### **Bad leader epoch handling**

#### Symptom

The client does not try to open a connection to a new agent, especially during rolling restarts.

#### Context

The leader epoch is a monotonically increasing number representing a continuous period of leadership for a single partition in Kafka. A change in leader epoch signals a leader transition. In WarpStream, the concept of a partition leader does not exist, since any WarpStream Agent can handle produce and consume requests for any topic and partition. As such, WarpStream returns a leader epoch of 0 in all responses that require one.

#### Problem

`librdkafka` 2.4 introduced a stricter check such that a metadata update is only considered if the leader epoch is monotonically increasing ([PR](https://github.com/confluentinc/librdkafka/pull/4680)).

```c
if (rktp->rktp_leader_epoch == -1 || leader_epoch > rktp->rktp_leader_epoch)
```

This means that if there is an agent with IP `ip1` and we would like to replace it with an agent with IP `ip2`, `librdkafka` will not open a connection to `ip2`.
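In Python terms, the check behaves roughly like this sketch (the condition mirrors librdkafka's C check):

```python
def accept_metadata_update(current_epoch: int, new_epoch: int) -> bool:
    # Mirrors librdkafka 2.4's stricter check: a metadata update is
    # accepted only if no leader epoch has been seen yet (-1), or the new
    # epoch is strictly greater than the current one.
    return current_epoch == -1 or new_epoch > current_epoch
```

Because WarpStream always returns a leader epoch of 0, the first metadata response is accepted (`accept_metadata_update(-1, 0)` is `True`), but every later one is dropped (`accept_metadata_update(0, 0)` is `False`), so the client never adopts the new agent's address.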

#### Solution

There are a couple options:

* Upgrade to `librdkafka` 2.8
* Downgrade to `librdkafka` 2.3
* Set `warpstream_strict_leader_epoch=true` or `ws_sle=true` in your Kafka client ID.
  * Note that WarpStream Kafka client ID features are expected to be comma-delimited, so if your existing client ID is `some_client_id` or `some_client_id,ws_az=us-east-1a` then the client ID should be changed to `some_client_id,ws_sle=true` or `some_client_id,ws_az=us-east-1a,ws_sle=true` respectively.
* Contact us to enable a patch in the WarpStream control plane
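If you choose the client ID option, a sketch of how you might append the feature to an existing client ID (hypothetical helper name):

```python
def with_strict_leader_epoch(client_id: str) -> str:
    # WarpStream client-ID features are comma-delimited, so the feature is
    # appended after any existing ID and features.
    return f"{client_id},ws_sle=true" if client_id else "ws_sle=true"
```

For example, `with_strict_leader_epoch("some_client_id,ws_az=us-east-1a")` yields `some_client_id,ws_az=us-east-1a,ws_sle=true`, matching the formats described above.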

#### Relevant material

Related `librdkafka` issues

* <https://github.com/confluentinc/librdkafka/issues/4796>
* <https://github.com/confluentinc/librdkafka/issues/4804>

`librdkafka` fix

* <https://github.com/confluentinc/librdkafka/pull/4901>

Leader epoch KIPs

* <https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-+Alter+Replication+Protocol+to+use+Leader+Epoch+rather+than+High+Watermark+for+Truncation>
* <https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A+Allow+fetchers+to+detect+and+handle+log+truncation#KIP320:Allowfetcherstodetectandhandlelogtruncation-APIChanges>

### Idempotence Performance

Enabling the idempotent producer functionality in the librdkafka client library can result in extremely poor producer throughput and very high latency even when using our [recommended settings](https://docs.warpstream.com/warpstream/kafka/tuning-for-performance#librdkafka). This is the result of four conspiring factors:

* WarpStream has higher produce latency than traditional Apache Kafka
* [WarpStream's service discovery mechanism](https://www.warpstream.com/blog/hacking-the-kafka-protocol) is implemented such that each client believes a single Agent is the leader for all topic-partitions at any given moment
* When the idempotent producer functionality is enabled, librdkafka only allows 5 concurrent Produce requests per connection, rather than per partition
* Librdkafka never combines batches from multiple partitions owned by the same broker into a single Produce request: <https://github.com/confluentinc/librdkafka/issues/1700>
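The resulting throughput ceiling can be estimated with a back-of-envelope sketch (the latency and batch-size numbers below are purely illustrative):

```python
def max_throughput_mib_s(produce_latency_ms: float, batch_size_kib: float,
                         in_flight: int = 5) -> float:
    # With at most `in_flight` concurrent Produce requests to the single
    # "leader" agent, throughput is capped at roughly
    # in_flight * batch_size / produce_latency.
    requests_per_sec = in_flight * (1000.0 / produce_latency_ms)
    return requests_per_sec * batch_size_kib / 1024.0
```

For example, at 400ms produce latency with 512 KiB batches, the ceiling is `max_throughput_mib_s(400, 512)`, i.e. about 6.25 MiB/s per client, regardless of how many Agents or partitions exist.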

The easiest ways to mitigate this problem are to:

1. [Disable idempotence](#disable-idempotence) in the librdkafka client or
2. [Use NULL record keys](#use-null-record-keys) when producing

However if neither of those are valid options, then we'd recommend:

1. [Tuning WarpStream for Lower Latency](https://docs.warpstream.com/warpstream/kafka/advanced-agent-deployment-options/low-latency-clusters) and
2. [Changing the WarpStream Partition Assignment Strategy](https://docs.warpstream.com/warpstream/kafka/configuring-kafka-client-id-features#warpstream_partition_assignment_strategy)

to start, as those two changes are the easiest to implement and also have the most minor trade-offs (no impact on application correctness or behavior).

{% hint style="info" %}
Many of these mitigation strategies are multiplicative with each other.

For example, if you tune WarpStream for lower latency (reducing the average Produce latency by 3x), then change the WarpStream partition assignment strategy to equal\_spread and run with 6 Agents available to each client, then you should see an approximately `3 * 6 = 18x` increase in the maximum potential throughput.
{% endhint %}

#### Disable Idempotence

This is the easiest solution, but may not be viable if your application depends on idempotence.

#### Use NULL record keys

This approach is not always viable if you need records with the same key to always be routed to the same topic-partition. However, if your application doesn't depend on that, then the easiest way to achieve high throughput with idempotent Produce requests in librdkafka is to avoid specifying keys for your records at all. If you do this while also using our [recommended settings](https://docs.warpstream.com/warpstream/kafka/tuning-for-performance#librdkafka) for librdkafka, then you should be able to achieve high throughput even with idempotence enabled.

Note that you may want to consider increasing the value of `sticky.partitioning.linger.ms` to a higher value like `100ms` if you take this approach.

#### Tune WarpStream for Lower Latency

One of the reasons the idempotent producer functionality in librdkafka is slow is that WarpStream has higher Produce latency than traditional Apache Kafka. Since librdkafka only allows 5 concurrent idempotent Produce requests at a time, ensuring that each of those requests completes faster enables higher total throughput.

Follow [these docs for tuning WarpStream for lower latency](https://docs.warpstream.com/warpstream/kafka/advanced-agent-deployment-options/low-latency-clusters).

#### Use Multiple Clients

{% hint style="warning" %}
This is the most difficult approach and should only be considered once all other options have been exhausted.
{% endhint %}

The librdkafka limitations are all specific to running a single librdkafka instance. However, you can run multiple instances of the librdkafka client in your application and load-balance records between them. For example, you could create 4 instances of the librdkafka library in your application, spread records amongst them, and then the total number of concurrent Produce requests your application could make would increase by 4.

However, librdkafka Produce requests can only contain data for a single topic-partition, so to make this approach effective you would have to distribute records to the different clients such that each client only produces to a subset of the overall partitions. For example, something like this:

```python
num_clients = 4
for record in records:
    expected_partition = determine_expected_partition(record, topic)
    clients[expected_partition % num_clients].produceAsync(record)
```

The key function is `determine_expected_partition` which would need to be constructed to match exactly the behavior of whichever librdkafka partitioning strategy you're currently using. This would involve making a Kafka Metadata request to determine the number of partitions in a topic (and caching that result for some period of time), and then hashing / partitioning the record keys using the same algorithm that librdkafka is configured to use.
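As a sketch, assuming librdkafka's `consistent` (CRC32-based) partitioner for non-NULL keys, `determine_expected_partition` might look like this; the hashing must mirror whichever partitioner your client is actually configured with:

```python
import zlib

def determine_expected_partition(key: bytes, num_partitions: int) -> int:
    # Sketch assuming the "consistent" (CRC32) partitioner for non-NULL
    # keys; if your client uses a different partitioner (e.g. murmur2),
    # replicate that algorithm instead. num_partitions should come from a
    # (cached) Metadata request for the topic.
    return zlib.crc32(key) % num_partitions
```

Because the result is deterministic for a given key and partition count, every record with the same key is routed to the same client instance, preserving per-key ordering.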

## Java Client

### Got error produce response with correlation id XXXX on topic-partition YYYY, retrying. Error: Kafka Storage Error.

In some instances you may observe your producer clients log error messages like the following:

{% code overflow="wrap" %}

```log
[Producer clientId=warpstream_az=us-east-1a] Got error produce response with correlation id 46572 on topic-partition test-topic-0000000-K6WhR0c-0, retrying (2147483646 attempts left). Error: KAFKA_STORAGE_ERROR
```

{% endcode %}

The producer error logs may not correlate with any error logs in the Agents.

This is normal and is caused by the interaction between the idempotent producer feature and WarpStream's load balancing system. The error logs are harmless and can be safely ignored as WarpStream returns a retriable error in this case and the Java client will automatically produce the records again with no data loss.

That said, if this is happening a lot, it may lead to increased producer latency. There are two ways to resolve the issue besides just ignoring the error logs:

1. Disable idempotent producer by setting `enable.idempotence=false` on your producer client.
2. Change the partitioning strategy on the WarpStream Agents from the default of `single_agent` to `consistent_random_jump` by setting the environment variable `WARPSTREAM_DEFAULT_PARTITION_ASSIGNMENT_STRATEGY=consistent_random_jump`.
   1. This approach will not eliminate the error logs entirely, but it should make them significantly more rare as WarpStream will try to keep topic-partitions assigned to the same Agents as much as possible, and will only shift them from one Agent to another when absolutely necessary for load-balancing purposes.

#### Why it happens

The idempotent producer functionality in Kafka enables a Kafka client to have up to 5 outstanding concurrent produce requests per broker while still maintaining strict total ordering.

Unlike Apache Kafka, any WarpStream Agent can process produce requests for any topic-partition. As a result, WarpStream's partition assignment strategies shift topic-partition "ownership" (from the client's perspective) around the cluster quite often to keep the WarpStream Agents evenly balanced. This means that from the client's perspective, the leader of any individual topic-partition changes much more frequently in WarpStream than it does in Apache Kafka, sometimes as often as once a minute.

When the topic-partition leader changes, the client will stop producing to one Agent and start producing to another Agent. When this happens, there is a small chance that the idempotent producer functionality in WarpStream will detect that the Agents tried to commit some of the client's batches in the wrong order because batch 2 was sent to Agent A and batch 3 was sent to Agent B, but Agent B committed a file before Agent A did. When this happens, WarpStream returns a KAFKA\_STORAGE\_ERROR (retriable error code) back to the client so that the client knows to resend batch 3 again now that batch 2 has been committed.
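The ordering check can be sketched like this (hypothetical names; the real implementation tracks sequence numbers per producer and partition):

```python
def check_batch_sequence(last_committed_seq: int, batch_seq: int) -> str:
    # Sketch: a batch is only accepted if it directly follows the last
    # committed sequence number for this producer/partition pair.
    if batch_seq == last_committed_seq + 1:
        return "ACCEPT"
    # Out of order (e.g. batch 3 committed by Agent B before Agent A
    # committed batch 2): return a retriable error so the client resends.
    return "KAFKA_STORAGE_ERROR"
```

For example, `check_batch_sequence(1, 3)` returns `KAFKA_STORAGE_ERROR`, and once batch 2 has been committed the retried batch 3 is accepted, so no data is lost or reordered.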

That's why disabling the idempotent producer functionality **or** changing WarpStream's partition assignment strategy to a more "sticky" algorithm like `consistent_random_jump` will resolve the issue.

