Tuning for Performance

Instructions on how to tune various Kafka clients for performance with WarpStream.

If the WarpStream Agents aren't running at close to 80-90% CPU utilization, then you can almost certainly derive more throughput from them by tuning your clients / workload.

Write / Produce Throughput

WarpStream Agents have no local disks and write data directly to object storage. Creating files in object storage is a relatively high latency operation compared to writing to a local disk. For example, in our experience, the P99 latency for writing a 4 MiB file to S3 is ~ 400ms.

As a result of this property, WarpStream Agents will only reach their maximum write throughput with highly concurrent workloads or workloads that write large batches. This is a practical consideration that derives mathematically from the ratio of system throughput to system latency (known in queuing theory as Little’s Law).

For example, an individual WarpStream cluster can easily sustain > 1GiB/s of write throughput, but to accomplish that with write latencies of 400ms, there must be a high degree of concurrency. For example, a write rate of 1GiB/s can be achieved with 1MiB batch sizes if there are 400 concurrent produce requests at any given moment.

Not having enough outstanding requests is the single biggest reason for low write throughput when using WarpStream.

Read / Fetch Throughput

When Kafka clients issue Fetch requests, they specify a limit on the amount of data to be returned (in aggregate, and per-partition) by the Broker. In Apache Kafka, the brokers interpret the limit in terms of compressed bytes, but in WarpStream the Agents interpret the limit in terms of uncompressed bytes. For more details on why this decision was made, check out our more detailed documentation about compression in WarpStream. That said, the practical implication of this decision is that for some workloads, you will need to tune your Kafka consumer clients to request more data per individual Fetch request to achieve the same throughput with WarpStream.

Client settings

We currently have documentation and recommended settings on how to achieve high write/read throughput with the following clients:

The documentation on this page focuses on achieving as much write throughput as possible using a single "instance" of each Kafka client.

However, if you're still struggling to drive the amount of write throughput you want from your application and the WarpStream Agents CPU utilization isn't very high, then you can always create more "instances" of the Kafka client in your application as well.


Librdkafka

Consumer SettingsRecommended Value

topic.metadata.refresh.interval.ms

60000

fetch.max.bytes

104857600

max.partition.fetch.bytes

52428800

Producer SettingsRecommended Value

topic.metadata.refresh.interval.ms

60000

queue.buffering.max.kbytes

1048576

queue.buffering.max.messages

1000000

message.max.bytes

64000000

batch.size

16000000

batch.num.messages

100000

linger.ms

100

sticky.partitioning.linger.ms

25

enable.idempotence

false

max.in.flight.requests.per.connection

1000000

partitioner

consistent_random

For both consumers and producers, the metadata refresh interval in librdkafka is configured to a more frequent 1 minute, compared to its default setting of 5 minutes. This adjustment enhances the client's responsiveness to changes in the cluster, ensuring efficient load balancing, especially when utilizing the default warpstream_partition_assignment_strategy in client ID features Configuring Kafka Client ID Features.

The producer settings above should result in good write throughput performance, regardless of your key distribution or partitioning scheme. However, performance will generally be improved when using NULL keys and not specifying which partition individual records should be written to. This enables the consistent_random partitioner to use the "sticky partitioning linger" functionality to produce optimally sized batches which improves compression ratios over the wire and reduces overhead both in the Kafka client and the agent.

msg := &kafka.Message{
	TopicPartition: kafka.TopicPartition{Topic: &topicName},
	Key:            nil,
	Value:          record.Value,
}
err := producer.Produce(msg, events)
if err != nil {
	return nil, fmt.Errorf("error producing record: %w", err)
}

However, as long as the idempotent producer functionality is disabled, this is not strictly required for achieving good throughput.

Another thing to keep in mind when using Librdkafka is that it is one of the few Kafka libraries that never combines batches for multiple partitions owned by the same broker into a single Produce request: https://github.com/confluentinc/librdkafka/issues/1700

That's why we recommend a high value for max.in.flight.requests.per.connection, especially when writing to a high number of partitions.

A note on idempotence

Enabling the idempotent producer functionality in the librdkafka client library in conjunction with WarpStream can result in extremely poor producer throughput and very high latency. This is the result of three conspiring factors:

  1. WarpStream has higher produce latency than traditional Apache Kafka

  2. WarpStream's service discovery mechanism is implemented such that each client believes a single Agent is the leader for all topic-partitions at any given moment

  3. Librdkafka only allows 5 concurrent produce requests per connection when the idempotent producer functionality is enabled instead of per partition

  4. Librdkafka never combines batches from multiple partitions owned by the same broker into a single Produce request: https://github.com/confluentinc/librdkafka/issues/1700

As a result of all this, the only way to achieve high write throughput with the Librdkafka library in conjunction with WarpStream is to avoid specifying any keys or specific partitions for each record as demonstrated above and allow the consistent_random partitioner and "sticky partitioning linger" functionality to generate produce requests containing very large batches for a single partition at a time. Also, keep in mind that if you enable idempotence with librdkafka you'll also need to reduce the value of batch.size above since the WarpStream Agents will reject individual batches of data that contain more than 16 MiB of uncompressed data when idempotence is enabled.


Java Client

Producer Settings

The Java client does not suffer from the same limitations that Librdkafka does and is able to create Produce requests that combine batches for many different partitions.

That said, there are still a few settings that should be tweaked to enable producing data at a high rate:

Producer SettingsRecommended Value

linger.ms

100

metadata.max.age.ms

20000

batch.size

100000 (Depends on # of partitions, see below)

buffer.memory

128000000

max.request.size

64000000

compression.type

lz4

The main variable that has to be tuned based on your workload is the value of batch.size.

Unfortunately, the Java library allocates statically sized byte buffers for each partition actively being written to, so the only way to tune this value appropriately without causing an OOM is to know how many partitions are being written to. That said, we recommend allocating at least 100MiB total across all the partitions you expect to write to.

The "number of partitions you expect to write to" is a function of the number of partitions in the topic you're writing to and your producer's partitioning strategy. For example, even a topic with 4096 partitions can be written to at high speeds and with low memory usage using the default partitioning strategy because the DefaultPartitioner will write all records to one partition for a few ms, then move on to the next partitioner.

However, writing a topic with only 256 partitions but using the RoundRobin partitioner may require significantly more memory since the Java client has to maintain an open buffer of size batch.size for every partition since they're all being written concurrently.

For example, if your producer is producing to 1024 partitions concurrently using the RoundRobin partitioner then 10000 is a good value. However, if it's only writing to 1 partition at a time (because you're using the DefaultPartitioner then you can select a much, much larger value (as high as 8MiB even is fine).

In general, though, if you're struggling to write data quickly to WarpStream with the Java client, batch.size is the first value you should experiment with. Try increasing it from the default of 100000 to 1000000, and then potentially up to 8000000 if necessary.

Consumer Settings

Consumer SettingsRecommended Value

metadata.max.age.ms

20000

fetch.max.bytes

100485760

max.partition.fetch.bytes

50242880

The Java client will only perform one concurrent fetch request per "broker," and WarpStream always presents itself to client applications as a single broker. As a result, the consumer client needs to be tuned to allow fetching a large amount of data in aggregate and per partition.

We also recommend tuning the value of metadata.max.age.ms to make the clients react to changes in the number of Agents faster (which makes auto-scaling the Agents easier).


Franz-go

Producer Configuration

The Franz-go library is one of the most performant Kafka libraries. With the following configuration, it can achieve high write throughput for almost any workload.

Producer SettingsRecommended Value

MetadataMaxAge

10*time.Second

MaxBufferedRecords

1_000_000

ProducerBatchMaxBytes

16_000_000

RecordPartitioner

kgo.UniformBytesPartitioner(1_000_000, false, false, nil)

Consumer Configuration

Consumer SettingsRecommended Value

MetadataMaxAge

10*time.Second

FetchMaxBytes

100_000_000

FetchMaxPartitionBytes

50_000_000

FetchMaxWait

5 * time.Second

ConsumePreferringLagFn

kgo.PreferLagAt(50)

Balancers

kgo.RoundRobinBalancer()

The franz-go library will only perform one concurrent fetch request per "broker," and WarpStream always presents itself to client applications as a single broker. As a result, the consumer client needs to be tuned to allow fetching a large amount of data in aggregate and per partition.

In FranzGo, metadata doesn't auto-refresh on errors, unlike other libraries. Users need to manually call ForceMetadataRefresh() or shorten the refresh interval to maybe 10 seconds. This approach boosts performance by quickly identifying and recovering from failures and aids in efficient load balancing with the default warpstream_partition_assignment_strategyConfiguring Kafka Client ID Features.


Segment kafka-go

If you're writing a new application in Go, we highly recommend using the franz-go library instead, as it is significantly more feature-rich, performant, and less buggy.

That said, we do support the Segment kafka-go library since many existing applications already use it.

Similar to librdkafka, the Segment kafka-go library will never combine produce requests for multiple different partitions into a single request. Even worse, the Segment library will never issue concurrent Produce requests for a single partition, either!

Those two combinations of things can be problematic for achieving high write throughput with a wide variety of workloads. However, in many cases, reasonable throughput can be achieved with the following settings:

Producer SettingsRecommended Value

BatchTimeout

1 * time.Second

BatchSize

100_000 (or 10_000 if this uses too much memory)

BatchBytes

16_000_000

Note: Unlike other libraries segmentio does not require tuning the metadata refresh interval. Producers default to a 6-second interval, while consumers automatically reconnect using the bootstrap server upon detecting a connection failure. Caution: Unexpected behavior, such as abrupt halts in metadata refreshing, has been observed in Segmentio clients.

Idempotence

This library does not support idempotency.

Sarama

If you're writing a new application in Go, we highly recommend using the franz-go library.

That said, we do support the Sarama library since many existing applications already use it.

Producer

config := sarama.NewConfig()
config.Net.ReadTimeout = 60 * time.Second
config.Metadata.RefreshFrequency = 20 * time.Second
config.Producer.MaxMessageBytes = 16_000_000
config.Producer.Flush.Bytes = 16_000_000
config.Producer.Flush.MaxMessages = 1_000_000
config.Producer.Flush.Frequency = 25 * time.Millisecond
config.Producer.Compression = sarama.CompressionLZ4

A note on idempotence and data ordering

The latest version of the Sarama library has significant liveness and correctness issues. It fails to maintain strict ordering of produced records, even when Net.MaxOpenRequest is set to 1, and it does not implement the idempotent producer protocol correctly which can lead to messages failing to be delivered and ultimately, data loss. More details about these issues are described in this P.R.

If you have an existing application that uses Sarama and doesn't enable idempotency, it will work fine with WarpStream. However, if you care about idempotency and/or strict data ordering of produced records, we highly recommend using the franz-go library instead.

Consumer

config := sarama.NewConfig()
config.Net.ReadTimeout = 60 * time.Second
config.Metadata.RefreshFrequency = 20 * time.Second
config.Consumer.Group.Session.Timeout = 60 * time.Second
config.Consumer.Group.Heartbeat.Interval = 5 * time.Second
config.Consumer.MaxProcessingTime = 20 * time.Second
config.Consumer.Fetch.Default = 100_000_000
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange

KafkaJS

client = new Kafka({
    clientId: '$CLIENT_ID',
    brokers: ['$BOOTSTRAP_BROKER'],
    connectionTimeout: 10000,
});

Producer

Unlike most Kafka clients, KafkaJS expects the user of the application to handle batching on their own. To achieve high write throughput, issue produce requests with large batches (many records in each call to send() and high concurrency.

const producer = kafka.producer({
    metadataMaxAge: 20000,
    maxInflightRequests: null,
    compression: CompressionTypes.LZ4
});

Consumer

const consumer = kafka.consumer({
    sessionTimeout: 60000,
    rebalanceTimeout: 60000,
    heartbeatInterval: 3000,
    metadataMaxAge: 20000,
    maxBytesPerPartition: 50000000
    maxBytes: 100000000,
    maxInflightRequests: null
});

Last updated