Compacted topics
Information about compacted topics in WarpStream.
As described in our blogpost, the way WarpStream implements compacted topics is very different from how Apache Kafka does it, although at its core, the algorithm is very similar.
One key difference is that WarpStream compacts the data for many different compacted topics in a virtual cluster at the same time. This is due to the fact that WarpStream's storage engine stores data for many different topic-partitions in the same files to minimize object storage costs.
The rest of this document focuses on explaining the heuristic WarpStream uses to decide when to schedule compactions for compacted topics, to give readers intuition for how compacted topics will perform in WarpStream.
Note that in this document we will use two words that are very similar but mean two very different things:
We will use compaction when we want to talk about the system, specific to WarpStream, that takes small files and makes bigger files to replace them. The compaction system is a critical piece of how a WarpStream Virtual Cluster operates.
We will use compacted topics when we want to talk about a topic where the user has configured cleanup.policy = compact or cleanup.policy = compact,delete. Much like in Apache Kafka, this is a topic where WarpStream proactively removes records that share the same key as more recent records to reclaim space and reduce the amount of work performed by consumers.
L0 - L1 compaction
When processing Produce requests from Kafka producers, the agents create ingestion files that we'll henceforth refer to as “L0 files”.
When the compaction system detects that there are a large number (currently 32) of L0 files it compacts them into “L1 files” that are bigger.
During this first compaction, we do not deduplicate keys; we simply copy the data from the L0 files into L1 files without applying any special logic for compacted topics.
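For intuition, here is a minimal Go sketch of this trigger. The 32-file threshold comes from the text above; the types and function names are illustrative, not WarpStream's actual code:

```go
package compaction

// l0FanIn is the number of L0 files that triggers an L0 -> L1 compaction,
// per the threshold described above (currently 32).
const l0FanIn = 32

// File is an illustrative stand-in for a WarpStream ingestion file.
type File struct {
	Path string
	Size int64
}

// pickL0Compaction returns the batch of L0 files to merge into a single,
// larger L1 file, or false if there aren't enough files yet. This first
// compaction only copies data; it performs no key deduplication.
func pickL0Compaction(l0Files []File) ([]File, bool) {
	if len(l0Files) < l0FanIn {
		return nil, false
	}
	return l0Files[:l0FanIn], true
}
```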
Higher level compactions
Any compactions beyond L0 -> L1 will deduplicate records. The highest compaction level a cluster may reach is L4. The compaction scheduler kicks in whenever it detects a large number of files at a given level.
In general, the more continuous throughput a cluster has, the more compactions will run. For instance, the compaction scheduler will schedule an L1 to L2 compaction when there are 32 L1 files. A single agent receiving low but continuous throughput writes 4 L0 files per second (one file every 250ms), which means that a virtual cluster with only this agent will take about 256 seconds (32 * 32 / 4) to create enough files to trigger an L1 to L2 compaction, which is the first compaction that deduplicates records.
If your virtual cluster has more traffic, or more agents, it will accumulate enough files to trigger record deduplication faster. However, it's never instantaneous. For small clusters, a file needs to be compacted twice before its records are deduplicated: first from L0 to L1, then from L1 to L2. For big clusters, a file needs to be compacted even more times before reaching L4.
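The arithmetic above can be made concrete with a small sketch. Assuming the rates described in this section (4 L0 files per second per agent, a fan-in of 32 files per compaction level), this hypothetical calculation estimates how long a cluster takes to reach its first deduplicating (L1 -> L2) compaction:

```go
package main

import "fmt"

func main() {
	const (
		l0FilesPerSecondPerAgent = 4.0 // one L0 file every 250ms per agent
		fanIn                    = 32  // files needed to trigger a compaction at each level
	)
	agents := 1.0

	// Files needed: 32 L1 files, each built from 32 L0 files.
	l0FilesNeeded := float64(fanIn * fanIn)
	seconds := l0FilesNeeded / (l0FilesPerSecondPerAgent * agents)

	// With a single low-throughput agent: 32 * 32 / 4 = 256 seconds until the
	// first L1 -> L2 compaction, the first one that deduplicates records.
	fmt.Printf("~%.0f seconds until first deduplicating compaction\n", seconds)
}
```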
Exact guarantees
There are two main limitations to WarpStream's compaction system:
If there are more than 3 million distinct keys in a topic-partition within a file, not all records will be de-duplicated. This is covered in more detail in the next section.
If you have more than 128GiB of uncompressed data in a single partition, there could be duplicate records. That is because WarpStream never compacts files that contain more than 128GiB of uncompressed records.
Furthermore, unless max.compaction.lag.ms is set, WarpStream doesn't guarantee exactly when compactions will run. The heuristic that WarpStream uses to choose when to compact files together after the first L0 -> L1 and L1 -> L2 compaction is designed to minimize write amplification as well as the number of files.
As data is added to the Virtual Cluster and compacted into L2 (or L4 for big clusters), a continuous sequence of files is created. WarpStream scans these files, and when it finds 10 or more files of similar size, it compacts them together. WarpStream does not compact files of very different sizes together because rewriting a 30GB file alongside a 2MB file is very expensive and yields very little gain.
In addition, roughly once a day, WarpStream will run a compaction even on files which have very different sizes, to minimize the number of duplicate keys in the data set.
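The following Go sketch illustrates the flavor of this selection heuristic. The 10-file threshold comes from the text above; the precise definition of "similar size" used here (the largest file in a candidate group is at most a few times the smallest) is an assumption made for illustration, not WarpStream's exact rule:

```go
package compaction

import "sort"

const (
	minFilesPerCompaction = 10  // compact when 10+ similarly sized files are found
	maxSizeRatio          = 4.0 // assumed "similar size" rule: largest <= 4x smallest
)

// pickSimilarSizedFiles scans file sizes (in bytes) and returns one group of
// at least 10 files whose sizes are close enough to be worth rewriting
// together, or false if no such group exists.
func pickSimilarSizedFiles(sizes []int64) ([]int64, bool) {
	sorted := append([]int64(nil), sizes...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] })

	// Slide a window over the sorted sizes; a window is a valid candidate when
	// it holds enough files and the spread between its smallest and largest
	// file stays under the ratio. This avoids rewriting a huge file together
	// with a tiny one for very little gain.
	for start := 0; start+minFilesPerCompaction <= len(sorted); start++ {
		end := start + minFilesPerCompaction
		smallest, largest := sorted[start], sorted[end-1]
		if smallest > 0 && float64(largest)/float64(smallest) <= maxSizeRatio {
			return sorted[start:end], true
		}
	}
	return nil, false
}
```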
Distinct key limitation
In compacted topics, when you have two records with the same key, the first one can be deleted to reclaim space, because the second record with the same key represents a newer version of the same “resource”.
WarpStream does not guarantee that all of the records which could be deleted are indeed deleted. The reason for this is explained in detail in the blogpost.
The way WarpStream deduplicates records in compacted topics is as follows:
For every topic-partition participating in a compaction job, WarpStream allocates a 128MiB buffer where it stores the hashes of the record keys it encounters. When the compaction engine encounters a new record, it hashes the record's key into a 32-byte number (using SHA-512/256) and checks whether it has already seen this key. If the key has already been seen, the older record will be deleted.
Each key takes 41 bytes in the buffer (the 32-byte hash + two 4-byte integers + one byte), which means that there is room for 3,273,603 keys (128MiB / 41) in the buffer. If there are enough distinct keys to fill the buffer (i.e. when, during a single compaction, WarpStream has read more than ~3 million distinct keys for a single topic-partition), it will clear the buffer and start afresh.
This means that you can expect to have one record per key in each file if you have fewer than 3 million distinct keys in a topic-partition. If there are more than 3 million distinct keys, you can expect one record per key per file post-compaction for each set of 3 million distinct keys encountered within a single compaction job. This is analogous to configuring the buffer size of Apache Kafka's log cleaner.
WarpStream uses a variation on the streaming k-way merge algorithm to perform compactions for compacted topics. This means that the entire 3 million keys buffer is available to every single topic-partition participating in a compaction, but the buffer is only allocated once and re-used between topic-partitions which keeps memory usage low.
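To make the mechanism concrete, here is a simplified Go sketch of a per-partition deduplication buffer. The hash function (SHA-512/256), the 41-byte-per-key accounting, and the clear-when-full behavior follow the description above; the map-based storage, the newest-to-oldest iteration order, and all names are simplifications for illustration, not WarpStream's actual implementation (which uses a flat, reusable 128MiB buffer):

```go
package compaction

import "crypto/sha512"

const (
	bufferBytes = 128 << 20               // 128MiB deduplication buffer per compaction job
	bytesPerKey = 41                      // 32-byte hash + two 4-byte integers + one byte
	maxKeys     = bufferBytes / bytesPerKey // ~3,273,603 distinct keys
)

// dedupBuffer remembers which record keys have already been seen for a single
// topic-partition during one compaction job.
type dedupBuffer struct {
	seen map[[32]byte]struct{}
}

func newDedupBuffer() *dedupBuffer {
	return &dedupBuffer{seen: make(map[[32]byte]struct{})}
}

// shouldDrop reports whether a record can be dropped because a record with the
// same key was already kept. This sketch assumes records are visited from
// newest to oldest, so the first occurrence of a key is the one that survives.
func (b *dedupBuffer) shouldDrop(key []byte) bool {
	h := sha512.Sum512_256(key) // 32-byte hash of the record key

	if _, ok := b.seen[h]; ok {
		return true // a newer record with this key was already kept
	}

	// If the buffer is full (more than ~3 million distinct keys), clear it and
	// start afresh; this is why deduplication is best-effort beyond that point.
	if len(b.seen) >= maxKeys {
		b.seen = make(map[[32]byte]struct{})
	}
	b.seen[h] = struct{}{}
	return false
}
```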
max.compaction.lag.ms
If you want stronger guarantees about when files are compacted, you can set max.compaction.lag.ms. A topic's max.compaction.lag.ms is the target maximum delay between when a record is produced and when that record gets deduplicated with previous records. We call a record "dirty" if it exceeds that maximum delay, or more precisely if the record's timestamp is older than now - max.compaction.lag.ms.
Once a record becomes "dirty", the compaction system will compact it within 30 minutes. Note that to compact a record, the compaction system needs to rewrite the entire file containing that record. Therefore, the lower the max.compaction.lag.ms, the higher the write amplification and the more costly it is to keep records compacted. In general, if a topic's max.compaction.lag.ms is set to N hours, all files that contain data for that topic will be rewritten at least once every N hours.
For example, if you set max.compaction.lag.ms for a topic to 2 hours, then a record will be compacted at most 2.5 hours after its creation (it can also happen sooner). The reason for the additional delay is that compacting files isn't instantaneous: compacting larger files may take up to 10 minutes. In addition, WarpStream's compaction system does not compact files as soon as it detects that a record is eligible for compaction. Instead, it buffers for a short period of time (roughly 15 to 30 minutes) before actually compacting the files containing the dirty records. This reduces write amplification drastically and prevents the system from continuously recompacting files every time there is a new dirty record.
The smallest max.compaction.lag.ms supported in WarpStream is 2 hours. If you set the max lag to a value below 2 hours, the system will silently override it to 2 hours.
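As a back-of-the-envelope check of the example above, the sketch below computes the worst-case time between producing a record and that record being deduplicated. The 2-hour floor and the roughly 30-minute scheduling window come from the text; the names are illustrative:

```go
package compaction

import "time"

const (
	minMaxCompactionLag = 2 * time.Hour    // values below this are silently raised to 2 hours
	schedulingWindow    = 30 * time.Minute // dirty records are compacted within ~30 minutes
)

// worstCaseDedupDelay returns the approximate maximum time between a record
// being produced and that record being deduplicated, for a given
// max.compaction.lag.ms value.
func worstCaseDedupDelay(maxCompactionLag time.Duration) time.Duration {
	if maxCompactionLag < minMaxCompactionLag {
		maxCompactionLag = minMaxCompactionLag
	}
	return maxCompactionLag + schedulingWindow
}

// Example: worstCaseDedupDelay(2 * time.Hour) == 2h30m, matching the
// 2.5 hour bound described above.
```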
min.compaction.lag.ms
If min.compaction.lag.ms is set, WarpStream guarantees that a record will not be deduplicated with previous records (records with lower offsets) as long as the record's timestamp is more recent than now - min.compaction.lag.ms. This is useful for scenarios where you want to ensure that consumers are guaranteed to see all keys before they're deduplicated as long as they don't fall more than min.compaction.lag.ms behind.
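A minimal sketch of this guarantee, assuming record timestamps are compared against wall-clock time as described above (the function name is illustrative):

```go
package compaction

import "time"

// eligibleForDedup reports whether a record is old enough to be deduplicated
// against newer records with the same key. Records younger than
// min.compaction.lag.ms are always preserved so that consumers which are at
// most that far behind still see every key.
func eligibleForDedup(recordTimestamp, now time.Time, minCompactionLag time.Duration) bool {
	return recordTimestamp.Before(now.Add(-minCompactionLag))
}
```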
Supported Configuration
WarpStream supports the following topic-level configurations that are relevant to Kafka compacted topics:
retention.ms
delete.retention.ms
min.compaction.lag.ms
max.compaction.lag.ms
cleanup.policy
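As an illustration, the sketch below creates a compacted topic with several of these configurations using franz-go's admin client. The broker address, topic name, partition count, replication factor, and configuration values are placeholders, and the admin API may differ slightly between client versions; any Kafka-compatible client or the standard kafka-topics/kafka-configs tooling works just as well:

```go
package main

import (
	"context"
	"log"

	"github.com/twmb/franz-go/pkg/kadm"
	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// Placeholder bootstrap address; point this at your WarpStream Agents.
	cl, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092"))
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()
	adm := kadm.NewClient(cl)

	str := func(s string) *string { return &s }
	configs := map[string]*string{
		"cleanup.policy":        str("compact"),
		"min.compaction.lag.ms": str("60000"),    // 1 minute, example value
		"max.compaction.lag.ms": str("7200000"),  // 2 hours (the minimum WarpStream supports)
		"delete.retention.ms":   str("86400000"), // 1 day, example value
	}

	// 6 partitions and replication factor 3 are placeholder values.
	resps, err := adm.CreateTopics(context.Background(), 6, 3, configs, "my-compacted-topic")
	if err != nil {
		log.Fatal(err)
	}
	for _, r := range resps {
		if r.Err != nil {
			log.Fatalf("failed to create topic %s: %v", r.Topic, r.Err)
		}
	}
}
```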
Additional Caveats
Unlike in Apache Kafka, a topic created with cleanup.policy = delete cannot be converted to a topic with cleanup.policy = compact and vice versa. The reason for this is that data for compacted and non-compacted topics are maintained in completely separate files in WarpStream.