Read Path - Fetch()

This page describes how WarpStream serves Fetch() requests.

For a detailed discussion of the WarpStream read path, review Minimizing S3 API Costs with Distributed mmap on the WarpStream blog.

Introduction

The goal of the WarpStream Agent fetch path is cost efficiency: the Agents load any given piece of data from object storage only once per Availability Zone, and they minimize GET requests to object storage by loading data in large chunks. To accomplish this, WarpStream Agents maintain a per-Availability Zone cache that behaves like a distributed mmap. This cache completely decouples the number of partitions and consumers from the number of object storage GET requests that the Agents must perform to serve consumer clients' Fetch() requests.

Read Path

To distribute load efficiently, WarpStream organizes the Agents into a consistent hashing ring, which makes each Agent responsible for a subset of the data in a given topic. When a WarpStream Agent receives a Fetch() request from a client, it first finds the Agent that is responsible for the file containing the requested data and routes the request to that Agent. The responsible Agent then pages the relevant file chunk into memory, and this in-memory cache is used to serve Fetch() requests to clients.
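The routing step can be illustrated with a toy consistent hashing ring. This is a minimal sketch, not WarpStream's implementation: the `HashRing` class, the use of SHA-256, the virtual node count, and the agent and file names are all assumptions made for illustration.

```python
import bisect
import hashlib


def _position(key: str) -> int:
    """Map a string to a stable 64-bit position on the ring."""
    return int.from_bytes(hashlib.sha256(key.encode()).digest()[:8], "big")


class HashRing:
    """Toy consistent hashing ring with virtual nodes (hypothetical helper)."""

    def __init__(self, agents, vnodes=64):
        # Each agent is placed at several points on the ring to even out load.
        self._points = sorted(
            (_position(f"{agent}#{i}"), agent)
            for agent in agents
            for i in range(vnodes)
        )
        self._keys = [pos for pos, _ in self._points]

    def owner(self, file_id: str) -> str:
        """Return the agent at the first ring point clockwise of the file's hash."""
        idx = bisect.bisect_right(self._keys, _position(file_id)) % len(self._keys)
        return self._points[idx][1]


agents = ["agent-1", "agent-2", "agent-3"]
owner = HashRing(agents).owner("file-3")

# Any agent that builds the ring from the same membership computes the same
# owner, so the agent that receives a request knows where to forward it.
same_owner = HashRing(list(reversed(agents))).owner("file-3")
```

Because ownership depends only on ring membership, removing an agent that does not own a file leaves that file's owner unchanged, which keeps cached data useful as the set of agents changes.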

A cache is maintained per Availability Zone, which ensures that Fetch() requests do not need to cross zonal boundaries in order to be fulfilled. While this results in slightly more GET requests to object storage, the avoidance of cross-AZ data transfer fees more than makes up for this increase. In addition, the Agents serve subsequent Fetch() requests from the cache, which further reduces the impact of the per-AZ GET requests.

In the illustration above, Agent 1 initially receives a Fetch() request from a client for a particular topic, partition, and offset. Agent 1 forwards the request to Agent 2 because Agent 2 is responsible for the file that contains the data requested by the client.

Agent 2 pages the relevant chunk of the file into memory and returns the requested data to the client that made the Fetch() request. Subsequently, two more requests are received, first by Agent 3 and then by Agent 2. Both requests fetch data located in File 3, and Agent 2 already has the relevant chunk of that file in memory, so Agent 2 serves both Fetch() requests from memory without reading the chunk from object storage again.
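The effect of the chunk cache on GET counts can be sketched as follows. This is an illustrative model under stated assumptions: `FakeObjectStore`, `ChunkCache`, and the 4-byte `CHUNK_SIZE` are hypothetical, and a real cache would use far larger chunks and would also need eviction.

```python
CHUNK_SIZE = 4  # bytes; deliberately tiny so the example is easy to follow


class FakeObjectStore:
    """In-memory stand-in for object storage that counts ranged GETs."""

    def __init__(self, files):
        self.files = files
        self.get_count = 0

    def get_range(self, file_id, start, end):
        self.get_count += 1
        return self.files[file_id][start:end]


class ChunkCache:
    """Caches fixed-size file chunks so each chunk is fetched at most once."""

    def __init__(self, store):
        self.store = store
        self.chunks = {}

    def _chunk(self, file_id, index):
        key = (file_id, index)
        if key not in self.chunks:  # cache miss: one GET for the whole chunk
            start = index * CHUNK_SIZE
            self.chunks[key] = self.store.get_range(file_id, start, start + CHUNK_SIZE)
        return self.chunks[key]

    def read(self, file_id, offset, length):
        """Return `length` bytes starting at `offset`, assembled from chunks."""
        end = offset + length
        out = bytearray()
        for index in range(offset // CHUNK_SIZE, (end - 1) // CHUNK_SIZE + 1):
            base = index * CHUNK_SIZE
            chunk = self._chunk(file_id, index)
            out += chunk[max(offset - base, 0):min(end - base, CHUNK_SIZE)]
        return bytes(out)


store = FakeObjectStore({"file-3": b"abcdefghij"})
cache = ChunkCache(store)

first = cache.read("file-3", 2, 4)   # spans chunks 0 and 1: two GETs
gets_after_first = store.get_count
second = cache.read("file-3", 3, 2)  # same chunks: served from memory
gets_after_second = store.get_count
```

The number of GETs depends on how many distinct chunks are touched, not on how many requests arrive for them.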

Refer to Minimizing S3 API Costs with Distributed mmap on our blog for a detailed discussion of WarpStream's cost and performance optimizations using object storage primitives.

This approach yields a workload cost profile that is more cost-efficient than what is possible with Kafka brokers backed by local disks, while also using cloud object storage cost-efficiently.

Ordering

Any WarpStream Agent can serve any Fetch() request from any client. WarpStream does not have a concept of leader partitions. However, WarpStream still maintains Kafka's ordering guarantees.

In the above example, the client makes a Fetch() request for messages beginning with offset 306. The Agent queries the Metadata Store for the file(s) and batches in which these offsets are contained. In order to maintain Kafka's ordering semantics, the Metadata Store returns an ordered list of files and batches that the Agent should read. Because writes to object storage are decoupled from metadata commits, the files may be written to object storage out of order, but the source of truth for ordering is held in the Metadata Store, so clients are served data in the correct order.
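A toy model of this lookup is sketched below. It assumes, purely for illustration, that offsets are assigned when a batch is committed to the metadata store; the `ToyMetadataStore` and `BatchRef` names, the file IDs, and the offset ranges are hypothetical.

```python
import bisect
from dataclasses import dataclass


@dataclass(frozen=True)
class BatchRef:
    base_offset: int
    last_offset: int
    file_id: str


class ToyMetadataStore:
    """Holds the ordered list of committed batches for one partition."""

    def __init__(self, start_offset=0):
        self._batches = []  # stays sorted because offsets only increase
        self._next_offset = start_offset

    def commit(self, file_id, record_count):
        """Record a batch; its position in the log is fixed here (assumption)."""
        ref = BatchRef(self._next_offset, self._next_offset + record_count - 1, file_id)
        self._batches.append(ref)
        self._next_offset += record_count
        return ref

    def batches_from(self, offset):
        """Return, in log order, every batch holding records at or after `offset`."""
        starts = [b.base_offset for b in self._batches]
        idx = max(bisect.bisect_right(starts, offset) - 1, 0)
        return [b for b in self._batches[idx:] if b.last_offset >= offset]


meta = ToyMetadataStore(start_offset=300)
# File IDs do not reflect log order: "file-7" was committed before "file-2".
meta.commit("file-7", 5)   # offsets 300-304
meta.commit("file-2", 5)   # offsets 305-309
meta.commit("file-9", 5)   # offsets 310-314

to_read = meta.batches_from(306)
files_in_order = [b.file_id for b in to_read]
```

An Agent that reads `to_read` front to back returns records in offset order regardless of when each file landed in object storage.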

Follower Fetching

Whereas Kafka requires careful consideration and complex configuration to reduce cross-AZ networking costs, WarpStream is designed from first principles to avoid cross-AZ traffic.

For a detailed discussion of WarpStream's zone-aware routing and AZ-specific load balancing, please review Hacking the Kafka PRoTocOL on the WarpStream blog.

By default, Kafka consumers read from the leader partition, regardless of where the broker hosting that partition is located. To help reduce cross-AZ traffic between consumers and brokers, Apache Kafka introduced Follower Fetching (KIP-392), a feature that lets consumer clients fetch from the closest replica, regardless of whether it is the leader partition. This feature enables consumers to avoid cross-AZ data transfer fees by directing Fetch() requests to a broker in the consumer's own availability zone. However, Follower Fetching is a complex feature to use in real-world situations.
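In Apache Kafka, this behavior is enabled by setting `broker.rack` and `replica.selector.class` (for example `org.apache.kafka.common.replica.RackAwareReplicaSelector`) on the brokers and `client.rack` on the consumer. The sketch below models only the selection decision, to show why alignment matters; it is not Kafka's code, and the `Replica` type and `select_read_replica` function are hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Replica:
    broker_id: int
    rack: str          # the availability zone the broker runs in
    is_leader: bool
    in_sync: bool = True


def select_read_replica(replicas, client_rack):
    """Prefer an in-sync replica in the client's rack, else fall back to the leader."""
    leader = next(r for r in replicas if r.is_leader)
    same_rack = [r for r in replicas if r.in_sync and r.rack == client_rack]
    # If the leader itself is in the client's rack, it is the natural choice.
    same_rack.sort(key=lambda r: not r.is_leader)
    return same_rack[0] if same_rack else leader


replicas = [
    Replica(broker_id=1, rack="az-a", is_leader=True),
    Replica(broker_id=2, rack="az-b", is_leader=False),
    Replica(broker_id=3, rack="az-c", is_leader=False),
]

aligned = select_read_replica(replicas, "az-b")     # local follower: no cross-AZ read
misaligned = select_read_replica(replicas, "az-d")  # no local replica: leader in az-a
```

A consumer in a zone with no replica still reads from the leader, so the cross-AZ transfer is only avoided when consumer and broker zones overlap.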

To use Follower Fetching, the consumer must be aligned with at least one zone where Kafka is deployed. To maintain high availability, the consumer must be distributed across multiple AZs. In many cases, this would require a significant amount of work, and in all cases it results in a more complex deployment.

For example, a consumer for a given application may be deployed in a single AZ, and if Follower Fetching is enabled, this consumer will read only from the broker in that AZ. Single-AZ consumer deployments like this are common in organizations with a shared Kafka cluster that many application teams use to source data. If the follower partition falls out of the ISR, consumers will be unable to consume records whose offsets exist locally but are not yet known to be committed, or whose offsets are known to be committed but do not yet exist locally. Both of these situations manifest as unavailability.

In the worst case, consumer applications may be deployed in an Availability Zone that has no overlap with the Kafka cluster, which results in 100% misalignment between the consumer and Kafka. Due to the cost of replicating data across AZs and the complexity of maintaining local replicas, Kafka clusters normally span a maximum of three AZs. Simply stretching the cluster to another AZ to ensure zone alignment with the consumer normally negates the cost savings achieved by zone alignment, and increases the complexity of managing the cluster.

These complications are due to Kafka's replication protocol, which replicates data between brokers' local disks. In addition, because brokers are stateful systems and changing where the brokers are deployed is nontrivial, once a decision is made regarding which AZs to deploy the brokers in, it is difficult to change.

Because WarpStream Agents are stateless, and WarpStream is designed specifically to avoid cross-AZ data transfer fees, achieving zone alignment between Agents and clients is trivial. In the case where the consumer has partial alignment with the cluster, WarpStream incurs zero cross-AZ data transfer charges because there is no concept of a "leader" partition. Consumers can fetch data from any partition, from any Agent, and reads occur within the same AZ due to the caching approach discussed above. In the case where there is zero alignment between the consumer and the cluster, users can trivially deploy Agents in the correct Availability Zone.


Apache, Apache Kafka, Kafka, and associated open source project names are trademarks of the Apache Software Foundation. Kinesis is a trademark of Amazon Web Services.