Compacted topics
Information about compacted topics in WarpStream.
As described in , WarpStream implements compacted topics very differently from Apache Kafka, although at its core the algorithm is very similar.
One key difference is that WarpStream compacts the data for many different compacted topics in a virtual cluster at the same time. This is because WarpStream stores data for many different topic-partitions in the same files to minimize object storage costs.
This means that the heuristic WarpStream uses to decide when to schedule a compaction is more conservative than Apache Kafka's, which compacts one partition at a time. As a result, WarpStream provides less strict guarantees about how and when records with duplicate keys will be removed from compacted topics.
The rest of this document explains the heuristic WarpStream uses to decide when to schedule a compaction for compacted topics, to give readers intuition on how compacted topics will perform in WarpStream.
When processing Produce requests from Kafka producers, the agents create ingestion files that we'll henceforth refer to as “L0 files”.
When the compaction system detects that there are a large number (currently 32) of L0 files, it compacts them into bigger “L1 files”.
During this first compaction, we do not deduplicate keys; we simply copy the data from L0 files to L1 files without applying any special logic for compacted topics.
The L1 to L2 compaction kicks in whenever the compaction system detects a large number (currently 32) of L1 files. This is the first opportunity that WarpStream has to deduplicate records in compacted topics.
A single agent receiving low but continuous throughput writes 4 L0 files per second (one file every 250ms), which means that a virtual cluster with only this agent will take about 256 seconds (32 * 32 / 4) to create enough files to trigger an L1 to L2 compaction, which in turn triggers record deduplication.
If your virtual cluster has more traffic, or more agents, it will accumulate enough files to trigger record deduplication faster. However, it is never instantaneous: files need to be compacted twice, first from L0 to L1 and then from L1 to L2, for this to happen.
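To make the arithmetic concrete, here is a small back-of-the-envelope calculation using the example numbers above (4 L0 files per second, 32 files per compaction level); these are illustrative figures, not tunable parameters.

```python
# Rough estimate of how long a single low-throughput agent takes to
# accumulate enough files to trigger the first L1 -> L2 compaction,
# which is the first deduplicating compaction.

L0_FILES_PER_L1_COMPACTION = 32  # L0 files consumed by each L0 -> L1 compaction
L1_FILES_PER_L2_COMPACTION = 32  # L1 files needed to trigger an L1 -> L2 compaction
L0_FILES_PER_SECOND = 4          # one ingestion file every 250ms

l0_files_needed = L0_FILES_PER_L1_COMPACTION * L1_FILES_PER_L2_COMPACTION  # 1024
seconds_until_dedup = l0_files_needed / L0_FILES_PER_SECOND                # 256.0

print(f"~{seconds_until_dedup:.0f} seconds until the first deduplicating compaction")
```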
In compacted topics, when you have two records with the same key, the older one can be deleted to reclaim space, because the newer record with the same key represents a newer version of the same “resource”.
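For example, if a producer writes two records with the same key, only the second value remains meaningful once compaction has deduplicated the key. The sketch below uses the kafka-python client; the bootstrap address and topic name are placeholders.

```python
from kafka import KafkaProducer

# Placeholder bootstrap address; point it at your WarpStream agents.
producer = KafkaProducer(bootstrap_servers="localhost:9092")

# Two records with the same key: after compaction deduplicates this key,
# only the newer value (b"v2") needs to be retained.
producer.send("my-compacted-topic", key=b"user-42", value=b"v1")
producer.send("my-compacted-topic", key=b"user-42", value=b"v2")
producer.flush()
```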
The way WarpStream deduplicates records in compacted topics is as follows:
For every topic-partition participating in a compaction job, WarpStream allocates a 128MB buffer where it stores the hashes of the keys of the records it encounters. When the compaction engine encounters a new record, it hashes its key into a 32-byte number (using SHA512/256) and checks whether it has already seen this key or not. If the key has already been seen, the old record will be deleted.
Each key takes 41 bytes in the buffer (the 32-byte hash + two 4-byte integers + one byte), and if there are enough distinct keys to fill the buffer (e.g. when, during a single compaction, WarpStream has read more than 3 million distinct keys for a single topic-partition), it will clear the buffer and start afresh.
This means that you can expect to have one record per key in each file if you have fewer than 3 million distinct keys in a topic-partition. If there are more than 3 million distinct keys, you can expect to have one record per key per file post-compaction for each set of 3 million distinct keys encountered within a single compaction job. This is similar to how the buffer size for Apache Kafka's log cleaner is configured.
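The sketch below illustrates this bounded-buffer deduplication scheme; it is not WarpStream's actual implementation. It keeps a fixed-capacity map from key hashes to the newest record seen, flushing and clearing the map whenever it fills up. SHA-256 is used as a stand-in for SHA512/256, and the capacity is derived from the 128MB and 41-byte figures above.

```python
import hashlib

BUFFER_BYTES = 128 * 1024 * 1024          # per-partition deduplication buffer
BYTES_PER_KEY = 41                        # 32-byte hash + two 4-byte ints + 1 byte
CAPACITY = BUFFER_BYTES // BYTES_PER_KEY  # ~3.27 million distinct keys

def deduplicate(records):
    """Keep only the newest record per key within each buffer's worth of keys.

    `records` is an iterable of (key, value) byte pairs in offset order.
    """
    newest = {}  # key hash -> newest (key, value) seen so far
    for key, value in records:
        key_hash = hashlib.sha256(key).digest()  # stand-in for SHA512/256
        if key_hash not in newest and len(newest) >= CAPACITY:
            # Buffer is full: emit what we have and start afresh. This is why
            # a partition with more than ~3 million distinct keys can end up
            # with one record per key per "buffer full", not one overall.
            yield from newest.values()
            newest.clear()
        newest[key_hash] = (key, value)  # an older record with this key is dropped
    yield from newest.values()
```

A real compaction also preserves record order and offsets; this sketch only illustrates how the bounded buffer limits deduplication.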
The previous paragraph suggests that if there is a reasonable number of distinct keys, WarpStream will ensure that there is a single record per key in each file.
But the heuristic that WarpStream uses to choose when to compact files together after the initial L0 -> L1 and L1 -> L2 compactions tries to minimize write amplification as well as the number of files.
There are three main heuristics that come into play when choosing when to trigger an L2 compaction.
WarpStream never compacts files that are bigger than 128GiB
As data is added to the Virtual Cluster, and compacted into L2, this creates a continuous sequence of files. WarpStream scans these files, and when it finds 10 or more files that have a similar size, it compacts them together. WarpStream does not compact files together when their sizes are very different, because rewriting a 30GB file together with a 2MB file is very expensive and yields very little gain.
In practice, this means that WarpStream estimates that it's worth incurring the write amplification when there are 10 records with the same key, in 10 different files that are similarly sized. It will then schedule a compaction, which will remove the 9 extraneous records for each key.
In addition to this, roughly once a day, WarpStream will run a compaction even on files which have very different sizes, to minimize the number of duplicate keys in the data set.
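To build intuition for the size-similarity heuristic, here is an illustrative sketch, not WarpStream's actual scheduler. It scans a sequence of L2 file sizes, skips files above the 128GiB ceiling, and collects runs of at least 10 neighbouring files whose sizes stay within a fixed ratio of each other. The run length and ceiling mirror the numbers above; the exact ratio threshold is an assumption made for illustration.

```python
GIB = 1024 ** 3
MAX_COMPACTABLE_BYTES = 128 * GIB  # files bigger than this are never compacted
MIN_RUN = 10                       # compact when 10+ similar-size files are found
SIZE_RATIO = 4.0                   # "similar size" threshold (illustrative value)

def pick_compaction_groups(file_sizes):
    """Return groups of adjacent L2 files worth compacting together.

    `file_sizes` is the list of file sizes (in bytes) in storage order.
    """
    groups, run = [], []
    for size in file_sizes:
        if size > MAX_COMPACTABLE_BYTES:
            run = []  # never rewrite very large files
            continue
        if run and max(max(run), size) > SIZE_RATIO * max(min(min(run), size), 1):
            run = []  # sizes diverged too much; start a new run at this file
        run.append(size)
        if len(run) >= MIN_RUN:
            groups.append(run)
            run = []
    return groups
```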
WarpStream supports the following topic-level configurations that are relevant to Kafka compacted topics:
retention.ms
delete.retention.ms
min.compaction.lag.ms
cleanup.policy
Unlike in Apache Kafka, a topic created with cleanup.policy = delete cannot be converted to a topic with cleanup.policy = compact and vice versa. The reason for this is that data for compacted and non-compacted topics is maintained in completely separate files in WarpStream.
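Because of this, a compacted topic has to be created with cleanup.policy = compact from the start. Below is a minimal sketch using the kafka-python admin client; the bootstrap address, topic name, and config values are placeholders rather than recommendations.

```python
from kafka.admin import KafkaAdminClient, NewTopic

# Placeholder bootstrap address; point it at your WarpStream agents.
admin = KafkaAdminClient(bootstrap_servers="localhost:9092")

# cleanup.policy must be chosen at creation time, since compacted and
# non-compacted topics are stored in completely separate files in WarpStream.
topic = NewTopic(
    name="my-compacted-topic",
    num_partitions=3,
    replication_factor=1,
    topic_configs={
        "cleanup.policy": "compact",
        "delete.retention.ms": "86400000",   # illustrative value (1 day)
        "min.compaction.lag.ms": "60000",    # illustrative value (1 minute)
    },
)
admin.create_topics([topic])
```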
WarpStream does not guarantee that all of the records which could be deleted are indeed deleted. The reason for this is explained in detail in .