Known Issues
Client logs occasional errors that look like the following
Like Apache Kafka, WarpStream closes idle connections to free up resources so they can be used for other connections or processes.
When Sarama sets up its Kafka client, it keeps track of its connection load on each agent, and when sending metadata requests Sarama chooses the least loaded agent.
When WarpStream responds to metadata requests, it will return all the available agents but only make a single agent the leader (see WarpStream's service discovery documentation for details).
From Sarama's perspective, this causes all but one agent to appear to have the least load, which means all connections except one will be idle the majority of the time.
The configuration to close idle connections on the WarpStream agents defaults to 1 hour.
After 1 hour, Sarama may try to use a closed idle connection, which will log these errors. However, the presence of these errors should not impact the functionality of your application.
These errors are usually log spam due to how Sarama chooses the least loaded agent.
Increase the idle connection timeout on the WarpStream agents.
For example, setting the environment variable WARPSTREAM_KAFKA_CLOSE_IDLE_CONN_AFTER=24h will increase the idle connection timeout to 24 hours.
During and after a rolling restart of the WarpStream agents, you will see error logs in your application that will look like the following:
When running in a container platform like Kubernetes or ECS, performing a rolling restart of a deployment will change the IP addresses that the WarpStream Agents are using.
The client does not remove old agent IP addresses from its internal configuration, which leads to error logs about not being able to connect to the IP addresses of old agents.
The client should continue to function as normal, but will emit errors about not being able to connect to old WarpStream agents.
At this time there is no permanent solution. As of Dec 12, 2023, there is an open pull request to solve this issue: https://github.com/confluentinc/librdkafka/pull/4557. Once this pull request is merged, we will update this solution with the recommended librdkafka version to upgrade to.
The application can be restarted to stop the logs, however the next time the WarpStream Agents are restarted these logs will come back.
The client does not try to open a connection to a new agent, especially during rolling restarts.
Leader epoch is a monotonically increasing number representing a continuous period of leadership for a single partition in Kafka. Changes in the leader epoch signal a leader transition. In WarpStream the concept of a partition leader does not exist since any WarpStream agent can handle produce and consume requests for any topic and partition. As such, WarpStream returns a leader epoch of 0 in all the responses that require a leader epoch.
librdkafka 2.4 introduced a stricter check such that a metadata update is only considered if the leader epoch is monotonically increasing (PR):
if (rktp->rktp_leader_epoch == -1 || leader_epoch > rktp->rktp_leader_epoch)
This means that if there is an agent with IP ip1 and we would like to replace it with an agent with IP ip2, librdkafka will not open a connection against ip2.
There are a few options:
Upgrade to librdkafka 2.8
Downgrade to librdkafka 2.3
Set warpstream_strict_leader_epoch=true or ws_sle=true in your Kafka client ID (see the sketch after this list)
Contact us to enable a patch in the WarpStream control plane
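For the client ID option, here is a minimal sketch assuming a Python producer built on confluent-kafka (which wraps librdkafka); the bootstrap address is a placeholder, and the exact client ID syntax should follow WarpStream's client ID feature documentation:

```python
from confluent_kafka import Producer

# Minimal sketch: embed ws_sle=true in the Kafka client ID so this client
# opts into WarpStream's strict leader epoch behavior described above.
# The bootstrap address is a placeholder; check WarpStream's client ID
# docs for the exact key=value syntax.
producer = Producer({
    "bootstrap.servers": "my-cluster.warpstream.example:9092",
    "client.id": "my-producer,ws_sle=true",
})
```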
Related librdkafka issues
librdkafka fix
Leader epoch KIPs
Enabling the idempotent producer functionality in the librdkafka client library can result in extremely poor producer throughput and very high latency even when using our recommended settings. This is the result of four conspiring factors:
WarpStream has higher produce latency than traditional Apache Kafka
WarpStream's service discovery mechanism is implemented such that each client believes a single Agent is the leader for all topic-partitions at any given moment
Librdkafka only allows 5 concurrent Produce requests per connection (rather than per partition) when the idempotent producer functionality is enabled
Librdkafka never combines batches from multiple partitions owned by the same broker into a single Produce request: https://github.com/confluentinc/librdkafka/issues/1700
The easiest ways to mitigate this problem are to:
Disable idempotence in the librdkafka client or
Use NULL record keys when producing
However, if neither of those are valid options, then we'd recommend tuning WarpStream for lower latency and changing the WarpStream partition assignment strategy to start, as those two changes are the easiest to implement and also have the most minor trade-offs (no impact on application correctness or behavior).
Many of these mitigation strategies are multiplicative with each other.
For example, if you tune WarpStream for lower latency (reducing the average Produce latency by 3x), then change the WarpStream partition assignment strategy to equal_spread and run with 6 Agents available to each client, then you should see an approximately 3 * 6 = 18x increase in the maximum potential throughput.
This is the easiest solution, but may not be viable if your application depends on idempotence.
This approach is not always viable if you need records with the same key to always be routed to the same topic-partition. However, if your application doesn't depend on that, then the easiest way to achieve high throughput with idempotent Produce requests in librdkafka is to avoid specifying keys for your records at all. If you do this while also using our recommended settings for librdkafka, then you should be able to achieve high throughput even with idempotence enabled.
Note that you may want to consider increasing the value of sticky.partitioning.linger.ms to a higher value like 100ms if you take this approach.
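For illustration, here is a minimal sketch of this approach assuming a Python producer built on confluent-kafka (which wraps librdkafka); the bootstrap address and topic are placeholders:

```python
from confluent_kafka import Producer

producer = Producer({
    "bootstrap.servers": "my-cluster.warpstream.example:9092",  # placeholder
    "enable.idempotence": True,
    # Give the sticky partitioner more time to fill batches for one
    # partition before moving on, as suggested above.
    "sticky.partitioning.linger.ms": 100,
})

# No key= argument: librdkafka's sticky partitioner picks the partition,
# so records are not pinned to a partition by a key hash.
producer.produce("my-topic", value=b"some payload")
producer.flush()
```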
One of the reasons that the idempotent producer functionality in librdkafka is slow is because WarpStream has higher Produce latency than traditional Apache Kafka. Since Librdkafka will only allow 5 concurrent idempotent producer requests at a time, ensuring that each of those Produce requests completes faster will enable higher total throughput.
Follow these docs for tuning WarpStream for lower latency.
TL;DR: try adding ws_pas=equal_spread to your idempotent librdkafka producer's client ID as described here, and then increase the number of Agents that are available to each of your clients based on whether you have zone-aware and/or role-aware routing enabled.
WarpStream has no real concept of partition assignments or rebalancing because WarpStream Agents aren’t leaders for individual topic-partitions and don’t store any state. It’s actually the WarpStream control plane that is the “leader” for the entire cluster and responsible for sequencing batches of data within all topic-partitions.
However, WarpStream still has to inform clients which Agents are the “leaders” for specific topic-partitions so that the clients know which Agents to route their Produce and Fetch requests to. That's just how the Kafka protocol works. Therefore in WarpStream, partition assignment strategies mostly control how RPCs are routed between clients and Agents, and have nothing to do with actual data placement.
The default WarpStream partition assignment strategy tells each client that a single Agent is the leader for all topic-partitions in the system. Which Agent each client observes as the leader for all topic-partitions is determined based on the observed load of all the Agents when that client issues a Metadata request. This means that different clients will view different Agents as the leader, which enables WarpStream to balance the load evenly across all the Agents.
This is the default assignment strategy because it allows the most flexibility for load-balancing, and because it results in the lowest overall Produce latency when clients Produce to multiple topic-partitions and wait for acknowledgement from all of them.
However, this partition assignment strategy interacts poorly with the implementation of the idempotent producer in librdkafka because librdkafka will only allow 5 concurrent Produce requests per Agent connection instead of per partition as it's supposed to. Combine this with the fact that librdkafka only allows a single batch from a single topic-partition per Produce request, and this means that when idempotency is enabled with WarpStream's default settings, only 5 topic-partitions can be written to concurrently.
As a result, when idempotency is enabled in librdkafka Producers, it may be worth switching the WarpStream partition assignment strategy for those clients to a different one. Since partition assignment strategies in WarpStream are almost purely a client routing concern, and have nothing to do with actual data placement, partition assignment strategies can be configured at the individual client level using a client ID feature.
The one we recommend for use with the idempotent librdkafka producer is ws_pas=equal_spread. This will make WarpStream look more “Kafka-like” by telling clients that the partitions for each topic they’re producing to are evenly distributed amongst the WarpStream Agents. This way, instead of connecting to only a single Agent, librdkafka will connect to all the Agents.
This increases the maximum number of concurrent idempotent Produce requests that librdkafka can make with WarpStream from 5 to 5 * num_agents. This means that once this feature is enabled, you should be able to increase the throughput of your producers by increasing the number of Agents they can connect to as well. Keep in mind that this feature still takes Agent availability zones and roles into account as well.
For example, if you have 3 Agents deployed across 3 availability zones, but don't have zone-aware routing configured, then librdkafka will connect to all 3 Agents. However, if zone-aware routing is enabled, then librdkafka will only connect to the single Agent available in the same zone as it. As a result, in a zone-aware setup, the relevant number of Agents for throughput is the number of Agents running in the same zone as a particular client.
In summary, try adding ws_pas=equal_spread to your idempotent librdkafka producer's client ID as described here, and then increase the number of Agents that are available to each of your clients based on whether you have zone-aware and/or role-aware routing enabled.
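As a rough illustration, here is a minimal sketch assuming a Python producer built on confluent-kafka (which wraps librdkafka); the bootstrap address is a placeholder, and the client ID format should follow WarpStream's client ID feature documentation:

```python
from confluent_kafka import Producer

producer = Producer({
    "bootstrap.servers": "my-cluster.warpstream.example:9092",  # placeholder
    "enable.idempotence": True,
    # Embedding ws_pas=equal_spread in the client ID asks WarpStream to
    # report partitions as evenly spread across the Agents, so librdkafka
    # connects to every eligible Agent (and gets 5 in-flight idempotent
    # Produce requests per Agent) instead of a single "leader" Agent.
    "client.id": "my-producer,ws_pas=equal_spread",
})
```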
This is the most difficult approach and should only be considered once all other options have been exhausted.
The librdkafka limitations are all specific to running a single librdkafka instance. However, you can run multiple instances of the librdkafka client in your application and load-balance records between them. For example, you could create 4 instances of the librdkafka library in your application, spread records amongst them, and then the total number of concurrent Produce requests your application could make would increase by 4x.
However, Librdkafka Produce requests can only contain data for a single topic-partition, so to make this approach effective you would have to distribute records to the different clients such that each client was only producing to a subset of the overall partitions. For example, something like this:
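(A minimal sketch, assuming Python with confluent-kafka; the crc32 hash, NUM_CLIENTS, and the bootstrap address are illustrative placeholders, not the exact partitioner or settings your application uses.)

```python
from zlib import crc32

from confluent_kafka import Producer

NUM_CLIENTS = 4  # illustrative
CONFIG = {
    "bootstrap.servers": "my-cluster.warpstream.example:9092",  # placeholder
    "enable.idempotence": True,
}

# Several independent librdkafka instances, each with its own window of
# 5 concurrent idempotent Produce requests.
producers = [Producer(CONFIG) for _ in range(NUM_CLIENTS)]


def determine_expected_partition(key: bytes, num_partitions: int) -> int:
    # Placeholder hash. This must be replaced so it matches exactly the
    # partitioner your librdkafka clients are configured with, otherwise
    # records with the same key could end up in different partitions.
    return crc32(key) % num_partitions


def produce(topic: str, key: bytes, value: bytes, num_partitions: int) -> None:
    # Route each record so that every librdkafka instance only ever
    # produces to a fixed subset of the topic's partitions.
    partition = determine_expected_partition(key, num_partitions)
    client = producers[partition % NUM_CLIENTS]
    client.produce(topic, key=key, value=value)
    client.poll(0)  # serve delivery callbacks
```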
The key function is determine_expected_partition, which would need to be constructed to match exactly the behavior of whichever librdkafka partitioning strategy you're currently using. This would involve making a Kafka Metadata request to determine the number of partitions in a topic (and caching that result for some period of time), and then hashing / partitioning the record keys using the same algorithm that librdkafka is configured to use.