Known Issues

Occasional Read/Write TCP or EOF errors in Sarama

Symptom

The client occasionally logs errors that look like the following:

read tcp 192.168.26.193:51587->192.168.26.193:9094: read: connection reset by peer
write tcp 192.168.26.193:53585->192.168.26.193:9093: write: broken pipe
got error from broker 1359066768 while fetching metadata: EOF

Context

Like Apache Kafka, WarpStream closes idle connections to free up resources so they can be used for other connections or processes.

When Sarama sets up its Kafka client, it keeps track of its connection load on each agent; when sending metadata requests, Sarama chooses the least loaded agent.

When WarpStream responds to metadata requests, it returns all of the available agents but marks only a single agent as the leader (see the Service Discovery documentation for details).

From Sarama's point of view, this causes all but one agent to appear least loaded, which means all connections except one will be idle the majority of the time.

Problem

The setting that closes idle connections on the WarpStream Agents defaults to 1 hour.

After 1 hour, Sarama may try to use a closed idle connection, which logs these errors. However, the presence of these errors should not impact the functionality of your application.

These errors are usually log spam caused by how Sarama chooses the least loaded agent.

Solution

Increase the idle connection timeout on the WarpStream agents.

For example, setting the environment variable WARPSTREAM_KAFKA_CLOSE_IDLE_CONN_AFTER=24h will increase the idle connection timeout to 24 hours.

librdkafka tries to connect to old agents

Symptom

During and after a rolling restart of the WarpStream Agents, you will see error logs in your application that look like the following:

10.42.0.144:9092/356356332: Failed to connect to broker at 10.42.0.144:9092: Operation timed out
10.244.1.36:9092/393450102: Connect to ipv4#10.244.1.36:9092 failed: No route to host (after 14302ms in state CONNECT)
10.0.232.176:9092/249600149: Connection setup timed out in state CONNECT (after 30022ms in state CONNECT, 1 identical error(s) suppressed)

Context

When running in a container platform like Kubernetes or ECS, performing a rolling restart of a deployment will change the IP addresses that the WarpStream Agents are using.

Problem

The client does not remove old agent IP addresses from its internal configuration, which leads to error logs about not being able to connect to the IP addresses of old agents.

The client should continue to function as normal, but it will emit errors about not being able to connect to old WarpStream Agents.

Solution

The application can be restarted to stop the logs; however, the next time the WarpStream Agents are restarted, these logs will come back.

At this time there is no permanent solution. As of Dec 12, 2023, there is an open pull request to solve this issue: https://github.com/confluentinc/librdkafka/pull/4557. Once this pull request is merged, we will update this solution with the recommended librdkafka version to upgrade to.

Leader epoch handling in librdkafka

Symptom

The client does not try to open a connection with a new agent, especially during rolling restarts.

Context

The leader epoch is a monotonically increasing number representing a continuous period of leadership for a single partition in Kafka. A change in leader epoch signals a leader transition. In WarpStream, the concept of a partition leader does not exist, since any WarpStream Agent can handle produce and consume requests for any topic and partition. As such, WarpStream returns a leader epoch of 0 in all responses that require a leader epoch.

Problem

librdkafka 2.4 introduced a stricter check such that a metadata update is only considered if the leader epoch is monotonically increasing:

if (rktp->rktp_leader_epoch == -1 || leader_epoch > rktp->rktp_leader_epoch)

This means that if there is an agent with IP ip1 and we would like to replace it with an agent with ip2, librdkafka will not open a connection against ip2.

Solution

There are a couple of options:

  • Upgrade to librdkafka 2.8

  • Downgrade to librdkafka 2.3

  • Set warpstream_strict_leader_epoch=true or ws_sle=true in your Kafka client ID

  • Contact us to enable a patch in the WarpStream control plane
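For illustration, here is a minimal sketch of the client ID option, assuming the confluent-kafka-go binding around librdkafka. The bootstrap address and group ID are placeholders, and how to combine the feature with an existing client ID is covered in the Configuring Kafka Client ID Features documentation.

package main

import "github.com/confluentinc/confluent-kafka-go/v2/kafka"

func main() {
	// Embed the ws_sle=true feature in the Kafka client ID so this client opts
	// into strict leader epoch handling, per the Solution list above.
	consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092", // placeholder: your WarpStream Agent endpoint
		"group.id":          "example-group",  // placeholder
		"client.id":         "ws_sle=true",
	})
	if err != nil {
		panic(err)
	}
	defer consumer.Close()
}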

Relevant material

Related librdkafka issues:

  • https://github.com/confluentinc/librdkafka/issues/4796

  • https://github.com/confluentinc/librdkafka/issues/4804

librdkafka fix: https://github.com/confluentinc/librdkafka/pull/4901

Leader epoch KIPs:

  • https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-+Alter+Replication+Protocol+to+use+Leader+Epoch+rather+than+High+Watermark+for+Truncation

  • https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A+Allow+fetchers+to+detect+and+handle+log+truncation#KIP320:Allowfetcherstodetectandhandlelogtruncation-APIChanges

Idempotence Performance in librdkafka

Enabling the idempotent producer functionality in the librdkafka client library can result in extremely poor producer throughput and very high latency, even when using our recommended settings. This is the result of four conspiring factors:

  • WarpStream has higher produce latency than traditional Apache Kafka

  • Librdkafka only allows 5 concurrent produce requests per connection (rather than per partition) when the idempotent producer functionality is enabled

  • WarpStream's service discovery mechanism is implemented such that each client believes a single Agent is the leader for all topic-partitions at any given moment

  • Librdkafka never combines batches from multiple partitions owned by the same broker into a single Produce request: https://github.com/confluentinc/librdkafka/issues/1700

The easiest ways to mitigate this problem are to:

  • Disable idempotence in the librdkafka client, or

  • Use NULL record keys when producing

However, if neither of those are valid options, then we'd recommend:

  • Tuning WarpStream for Lower Latency, and

  • Changing the WarpStream Partition Assignment Strategy

to start, as those two changes are the easiest to implement and also have the most minor trade-offs (no impact on application correctness or behavior).

Many of these mitigation strategies are multiplicative with each other.

For example, if you tune WarpStream for lower latency (reducing the average Produce latency by 3x), then change the WarpStream partition assignment strategy to equal_spread and run with 6 Agents available to each client, then you should see an approximately 3 * 6 = 18x increase in the maximum potential throughput.

Disable Idempotence

This is the easiest solution, but may not be viable if your application depends on idempotence.

Use NULL record keys

This approach is not always viable if you need records with the same key to always be routed to the same topic-partition. However, if your application doesn't depend on that, then the easiest way to achieve high throughput with idempotent Produce requests in librdkafka is to avoid specifying keys for your records at all. If you do this while also using our recommended settings for librdkafka, then you should be able to achieve high throughput even with idempotence enabled.

Note that you may want to consider increasing the value of sticky.partitioning.linger.ms to a higher value like 100ms if you take this approach.
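For illustration, a minimal sketch of an idempotent producer that relies on NULL record keys and a higher sticky partitioning linger, assuming the confluent-kafka-go binding around librdkafka (the bootstrap address and topic name are placeholders):

package main

import "github.com/confluentinc/confluent-kafka-go/v2/kafka"

func main() {
	producer, err := kafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers":             "localhost:9092", // placeholder: your WarpStream Agent endpoint
		"enable.idempotence":            true,
		"sticky.partitioning.linger.ms": 100, // larger linger -> bigger batches per sticky partition
	})
	if err != nil {
		panic(err)
	}
	defer producer.Close()

	topic := "example-topic" // placeholder
	// Key is intentionally left nil (NULL) so records are not pinned to a specific partition.
	err = producer.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          []byte("hello"),
	}, nil)
	if err != nil {
		panic(err)
	}
	producer.Flush(15000)
}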

Tune WarpStream for Lower Latency

One of the reasons that the idempotent producer functionality in librdkafka is slow with WarpStream is that WarpStream has higher Produce latency than traditional Apache Kafka. Since librdkafka only allows 5 concurrent idempotent producer requests at a time, ensuring that each of those Produce requests completes faster enables higher total throughput.

Follow the documentation on tuning WarpStream for lower latency.

Change the WarpStream Partition Assignment Strategy

TLDR: try adding ws_pas=equal_spread to your idempotent librdkafka producer's client ID (as described in the Configuring Kafka Client ID Features documentation), and then increase the number of Agents that are available to each of your clients based on whether you have zone-aware and/or role-aware routing enabled.

WarpStream has no real concept of partition assignments or rebalancing because WarpStream Agents aren’t leaders for individual topic-partitions and don’t store any state. It’s actually the WarpStream control plane that is the “leader” for the entire cluster and responsible for sequencing batches of data within all topic-partitions.

However, WarpStream still has to inform clients which Agents are the “leaders” for specific topic-partitions so that the clients know which Agents to route their Produce and Fetch requests to. That's just how the Kafka protocol works. Therefore in WarpStream, partition assignment strategies mostly control how RPCs are routed between clients and Agents, and have nothing to do with actual data placement.

The default WarpStream partition assignment strategy tells each client that a single Agent is the leader for all topic-partitions in the system. Which Agent each client observes as the leader for all topic-partitions is determined based on the observed load of all the Agents when that client issues a Metadata request. This means that different clients will view different Agents as the leader, which enables WarpStream to balance the load evenly across all the Agents.

This is the default assignment strategy because it allows the most flexibility for load-balancing, and because it results in the lowest overall Produce latency when clients Produce to multiple topic-partitions and wait for acknowledgement from all of them.

However, this partition assignment strategy interacts poorly with the implementation of the idempotent producer in librdkafka because librdkafka will only allow 5 concurrent Produce requests per Agent connection instead of per partition, as it's supposed to. Combine this with the fact that librdkafka only allows a single batch from a single topic-partition per Produce request, and this means that when idempotency is enabled with WarpStream's default settings, only 5 topic-partitions can be written to concurrently.

As a result, when idempotency is enabled in librdkafka Producers, it may be worth switching the WarpStream partition assignment strategy for those clients to a different one. Since partition assignment strategies in WarpStream are almost purely a client routing concern, and have nothing to do with actual data placement, they can be configured at the individual client level using a client ID feature.

The strategy we recommend for use with the idempotent librdkafka producer is ws_pas=equal_spread. This will make WarpStream look more “Kafka-like” by telling clients that the partitions for each topic they’re producing to are evenly distributed amongst the WarpStream Agents. This way, instead of connecting to only a single Agent, librdkafka will connect to all the Agents.

This increases the maximum number of concurrent idempotent Produce requests that librdkafka can make with WarpStream from 5 to 5 * num_agents. This means that once this feature is enabled, you should be able to increase the throughput of your producers by increasing the number of Agents they can connect to as well. Keep in mind that this feature still takes Agent availability zones and roles into account as well.

For example, if you have 3 Agents deployed across 3 availability zones, but don't have zone-aware routing configured, then librdkafka will connect to all 3 Agents. However, if zone-aware routing is enabled, then librdkafka will only connect to the single Agent available in the same zone as it. As a result, in a zone-aware setup, the relevant number of Agents for throughput is the number of Agents running in the same zone as a particular client.

In summary, try adding ws_pas=equal_spread to your idempotent librdkafka producer's client ID, and then increase the number of Agents that are available to each of your clients based on whether you have zone-aware and/or role-aware routing enabled.
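For illustration, a minimal sketch of an idempotent producer that opts into the equal_spread partition assignment strategy via its client ID, again assuming the confluent-kafka-go binding around librdkafka (the bootstrap address is a placeholder; see the Configuring Kafka Client ID Features documentation for how to combine the feature with an existing client ID):

package main

import "github.com/confluentinc/confluent-kafka-go/v2/kafka"

func main() {
	// ws_pas=equal_spread in the client ID tells WarpStream to spread the
	// partitions it reports as "leaders" across all eligible Agents, so
	// librdkafka opens connections to more than one Agent.
	producer, err := kafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers":  "localhost:9092", // placeholder: your WarpStream Agent endpoint
		"enable.idempotence": true,
		"client.id":          "ws_pas=equal_spread",
	})
	if err != nil {
		panic(err)
	}
	defer producer.Close()
}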

Use Multiple Clients

This is the most difficult approach and should only be considered once all other options have been exhausted.

The librdkafka limitations are all specific to running a single librdkafka instance. However, you can run multiple instances of the librdkafka client in your application and load-balance records between them. For example, you could create 4 instances of the librdkafka library in your application, spread records amongst them, and the total number of concurrent Produce requests your application could make would increase by 4x.

However, librdkafka Produce requests can only contain data for a single topic-partition, so to make this approach effective you would have to distribute records to the different clients such that each client is only producing to a subset of the overall partitions. For example, something like this:

num_clients = 4
for record in records:
	expected_partition = determine_expected_partition(record, topic)
	clients[expected_partition % num_clients].produceAsync(record)

The key function is determine_expected_partition which would need to be constructed to match exactly the behavior of whichever librdkafka partitioning strategy you're currently using. This would involve making a Kafka Metadata request to determine the number of partitions in a topic (and caching that result for some period of time), and then hashing / partitioning the record keys using the same algorithm that librdkafka is configured to use.
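Here is a slightly fuller sketch of that idea in Go. The determineExpectedPartition function below is a hypothetical stand-in (it uses FNV hashing from the standard library); a real implementation must reproduce exactly the partitioner librdkafka is configured with (murmur2_random by default) and use a cached partition count obtained via a Metadata request. The producer interface is likewise an abstraction over a single librdkafka instance.

package multiproducer

import "hash/fnv"

// producer abstracts a single librdkafka instance (for example, a
// confluent-kafka-go Producer wrapped by your application).
type producer interface {
	ProduceAsync(key, value []byte)
}

// determineExpectedPartition is a hypothetical placeholder: swap in the exact
// hashing/partitioning algorithm your librdkafka clients are configured with.
func determineExpectedPartition(key []byte, numPartitions int) int {
	h := fnv.New32a()
	h.Write(key)
	return int(h.Sum32()&0x7fffffff) % numPartitions
}

// spreadRecords routes each record to one of several librdkafka clients based
// on its expected partition, so each client produces to a stable subset of the
// topic's partitions.
func spreadRecords(clients []producer, numPartitions int, keys, values [][]byte) {
	for i := range keys {
		expectedPartition := determineExpectedPartition(keys[i], numPartitions)
		clients[expectedPartition%len(clients)].ProduceAsync(keys[i], values[i])
	}
}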
