Tuning for Performance

Instructions on how to tune various Kafka clients for performance with WarpStream.

Write / Produce Throughput

WarpStream Agents have no local disks and write data directly to object storage. Creating files in object storage is a relatively high latency operation compared to writing to a local disk. For example, in our experience, the P99 latency for writing a 4 MiB file to S3 is ~ 400ms.

As a result of this property, WarpStream Agents will only reach their maximum performance with highly concurrent workloads or workloads that write large batches. This is a practical consideration that derives mathematically from the ratio of system throughput to system latency (known in queuing theory as Little’s Law).

An individual WarpStream Agent cluster sustain > 1GiB/s of write throughput, but to accomplish that with write latencies of 400ms, there must be a high degree of concurrency. For example, a write rate of 1GiB/s can be achieved with 1MiB batch sizes if there are 400 concurrent produce requests at any given moment.

Not having enough outstanding requests is the single biggest reason for low write thoughput when using WarpStream.

If the WarpStream Agent's aren't running at close at 80-90% CPU utilization, then you can almost certainly derive more throughput from them.

We currently have documentation and recommended settings on how to achieve high write throughput with the following clients:

The documentation on this page is all focused on how to achieve as much write throughput as possible via a single "instance" of each Kafka client.

However, if you're still struggling to drive the amount of write throughput you want from your application and the WarpStream Agents CPU utilization isn't very high, then you can always create more "instances" of the Kafka client in your application as well.


Librdkafka

Consumer SettingsRecommended Value

topic.metadata.refresh.interval.ms

60000

fetch.max.bytes

104857600

max.partition.fetch.bytes

52428800

Producer SettingsRecommended Value

topic.metadata.refresh.interval.ms

60000

queue.buffering.max.kbytes

1048576

queue.buffering.max.messages

1000000

message.max.bytes

64000000

batch.size

16000000

batch.num.messages

100000

linger.ms

100

sticky.partitioning.linger.ms

25

enable.idempotence

false

max.in.flight.requests.per.connection

1000000

partitioner

consistent_random

For both consumers and producers, the metadata refresh interval in librdkafka is configured to a more frequent 1 minute, compared to its default setting of 5 minutes. This adjustment enhances the client's responsiveness to changes in the cluster, ensuring efficient load balancing, especially when utilizing the default warpstream_partition_assignment_strategy in client ID features Configuring Kafka Client ID Features.

The producer settings above should result in good write throughput performance, regardless of your key distribution or partitioning scheme. However, performance will generally be improved when using NULL keys and not specifying which partition individual records should be written to. This enables the consistent_random partitioner to use the "sticky partitioning linger" functionality to produce optimally sized batches which improves compression ratios over the wire and reduces overhead both in the Kafka client and the agent.

msg := &kafka.Message{
	TopicPartition: kafka.TopicPartition{Topic: &topicName},
	Key:            nil,
	Value:          record.Value,
}
err := producer.Produce(msg, events)
if err != nil {
	return nil, fmt.Errorf("error producing record: %w", err)
}

However, as long as the idempotent producer functionality is disabled, this is not strictly required for achieving good throughput.

Another thing to keep in mind when using Librdkafka is that it is one of the few Kafka libraries that never combines batches for multiple partitions owned by the same broker intto a single Produce request: https://github.com/confluentinc/librdkafka/issues/1700

That's why we recommend a high value for max.in.flight.requests.per.connection, especially when writing to a high number of partitions.

A note on idempotence

Enabling the idempotent producer functionality in the librdkafka client library in conjunction with WarpStream can result in extremely poor producer throughput and very high latency. This is the result of three conspiring factors:

  1. WarpStream has higher produce latency than traditional Apache Kafka

  2. WarpStream's service discovery mechanism is implemented such that each client believes a single Agent is the leader for all topic-partitions at any given moment

  3. Librdkafka only allows 5 concurrent produce requests per connection when the idempotent producer functionality is enabled instead of per partition

  4. Librdkafka never combines batches from multiple partitions owned by the same broker into a single Produce request: https://github.com/confluentinc/librdkafka/issues/1700

As a result of all this the only way to achieve high write throughput with the Librdkafka library in conjunction with WarpStream is to avoid specifying any keys or specific partitions for each record as demonstrated above, and allowing the consistent_random partitioner and "sticky partitioning linger" functionality to generate produce requests containing very large batches for a single partition at a time. Also, keep in mind that if you enable idempotence with librdkafka you'll also need to reduce the value of batch.size above since the WarpStream Agents will reject individual batches of data that contain more than 16 MiB of uncompressed data when idempotence is enabled.


Java Client

Producer Settings

The Java client does not suffer from the same limitations that Librdkafka does and is able to create Produce requests that combine batches for many different partitions.

That said, there are still a few settings that should be tweaked to enable producing data at a high rate:

Producer SettingsRecommended Value

linger.ms

100

metadata.max.age.ms

60000

batch.size

100000 (Depends on # of partitions, see below)

buffer.memory

128000000

max.request.size

64000000

The main variable that has to be tuned based on your workload is the value of batch.size.

Unfortunately, the Java library allocates statically sized byte buffers for each partition that is actively being written to so the only way to tune this value appropriately without causing an OOM is to know how many partitions are being written to. That said, we recommend allocating at least 100MiB total across all the partitions you expect to write to.

The "number of partitions you expect to write to" is a function of the number of partitions in the topic you're writing to and your producer's partitioning strategy. For example, even a topic with 4096 partitions can be written to at high speeds and with low memory usage using the default partitioning strategy because the DefaultPartitioner will write all records to a one partition for a few ms, then move onto the next partitioner.

However, writing a topic with only 256 partition but using the RoundRobin partitioner may require significantly more memory since the Java client has to maintain an open buffer of size batch.size for every partition since they're all being written to concurrently.

For example, if your producer is is producing to 1024 partitions concurrently using the RoundRobin partitioner then 10000 is a good value. However, if it's only writing to 1 partition at a time (because you're using the DefaultPartitioner then you can select a much much larger value (as high as 8MiB even is fine).

In general though, if you're struggling to write data quickly to WarpStream with the Java client, batch.size is the first value you should experiment with. Try increasing it from the default of 100000 to 1000000, and then potentially up to 8000000 if necessary.

Consumer Settings

Consumer SettingsRecommended Value

metadata.max.age.ms

60000

fetch.max.bytes

100485760

max.partition.fetch.bytes

50242880

The Java client will only perform one concurrent fetch request per "broker", and WarpStream always presents itself to client applications as a single broker. As a result, the consumer client needs to be tuned to allow fetching a large amount of data in aggregate, as well as per partition.

We also recommend tuning the value of metadata.max.age.ms to make the clients react to changes in the number of Agents faster (which makes auto-scaling the Agents easier).


Franz-go

Producer Configuration

The Franz-go library is one of the most performant Kafka libraries and can achieve high write throughput for almost any workload with the following configuration.

Producer SettingsRecommended Value

MetadataMaxAge

10*time.Second

MaxBufferedRecords

1_000_000

ProducerBatchMaxBytes

16_000_000

ProducerWriteMaxBytes

64_000_000

RecordPartitioner

kgo.UniformBytesPartitioner(1_000_000, false, false, nil)

Consumer Configuration

Consumer SettingsRecommended Value

MetadataMaxAge

10*time.Second

FetchMaxBytes

100_000_000

FetchMaxPartitionBytes

50_000_000

FetchMaxWait

5 * time.Second

ConsumePreferringLagFn

kgo.PreferLagAt(50)

Balancers

kgo.RoundRobinBalancer()

The franz-go library will only perform one concurrent fetch request per "broker", and WarpStream always presents itself to client applications as a single broker. As a result, the consumer client needs to be tuned to allow fetching a large amount of data in aggregate, as well as per partition.

In FranzGo, metadata doesn't auto-refresh on errors, unlike other libraries. Users need to manually call ForceMetadataRefresh() or shorten the refresh interval to maybe 10 seconds. This approach boosts performance by quickly identifying and recovering from failures and aids in efficient load balancing with the default warpstream_partition_assignment_strategyConfiguring Kafka Client ID Features.


Segment kafka-go

If you're writing a new application in Go, we highly recommend using the franz-go library instead as it is significantly more feature rich, performant, and less buggy.

That said, we do support the Segment kafka-go library since many existing application already use it.

Similar to librdkafka, the Segment kafka-go library will never combine produce requests for multiple different partitions together into a single Produce request. However, even worse, the Segment library will also never issue concurrent Produce requests for a single partition either!

Those two combinations of things can be problematic for achieving high write throughput with a wide variety of workloads. However, in many cases reasonable throughput can be achieved with the following settings:

Producer SettingsRecommended Value

BatchTimeout

1 * time.Second

BatchSize

100_000 (or 10_000 if this uses too much memory)

BatchBytes

16_000_000

Note: Unlike other libraries segmentio does not require tuning the metadata refresh interval. Producers default to a 6-second interval, while consumers automatically reconnect using the bootstrap server upon detecting a connection failure. Caution: Unexpected behavior, such as abrupt halts in metadata refreshing, has been observed in Segmentio clients.

Idempotence

This library does not support idempotency.

Last updated

Logo

Apache, Apache Kafka, Kafka, and associated open source project names are trademarks of the Apache Software Foundation. Kinesis is a trademark of Amazon Web Services.