Low Latency Clusters
Configure the WarpStream Agent with S3 Express, DynamoDB or Spanner to reduce Produce latency.
By default, WarpStream is tuned for maximum throughput and minimal costs at the expense of higher latency. However, WarpStream clusters can be tuned to provide much lower Produce latency in exchange for higher costs.
Check Client Configuration
Before tuning WarpStream itself, check your client configuration. The WarpStream documentation has recommendations on how to tune various Kafka clients for maximum performance with WarpStream. You should still follow all of those recommendations. However, if you want to minimize latency, consider reducing your Kafka client's linger setting from our default recommendation of 100ms to 25ms or 10ms.
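As an illustration, here is how the lower linger value might look with the confluent-kafka (librdkafka) Python client, which exposes it as `linger.ms`. This is a minimal sketch: the bootstrap address is hypothetical, and the other client settings recommended elsewhere in the WarpStream documentation are deliberately omitted.

```python
# Hypothetical settings for the confluent-kafka (librdkafka) Python client.
producer_config = {
    # Hypothetical address; point this at your own WarpStream Agents.
    "bootstrap.servers": "warpstream-agent.internal.example:9092",
    # Lowered from the 100ms recommendation: the client waits at most 10ms
    # to fill a batch before sending it.
    "linger.ms": 10,
}

# With confluent-kafka installed, the dict is passed straight to the producer:
#   from confluent_kafka import Producer
#   producer = Producer(producer_config)
```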
Batch Timeout
The WarpStream Agents accept a -batchTimeout flag (WARPSTREAM_BATCH_TIMEOUT environment variable) that controls how long the Agents buffer data in memory before flushing it to object storage. Produce requests are never acknowledged to the client before the data is durably persisted in object storage, so this option has no impact on durability or correctness, but it directly affects the latency of Produce requests.
The default batchTimeout in the Agents is 250ms, but it can be lowered to as little as 50ms to reduce Produce latency. Lowering this value increases cloud infrastructure costs, because the Agents have to create more files in object storage and therefore incur higher PUT request API fees.
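To see why a shorter batch timeout costs more, the sketch below estimates an upper bound on monthly S3 PUT fees per Agent. It assumes steady traffic so that each Agent writes exactly one file per batchTimeout, and it uses an assumed S3 Standard PUT price of $0.005 per 1,000 requests (us-east-1); check current AWS pricing before relying on the numbers. Compaction, GET requests, and early flushes caused by full buffers are ignored.

```python
# Assumed S3 Standard PUT price (USD per 1,000 requests, us-east-1); check current AWS pricing.
S3_PUT_PRICE_PER_1000 = 0.005
SECONDS_PER_MONTH = 86_400 * 30


def monthly_put_cost(batch_timeout_ms: int, agents: int = 1) -> float:
    """Upper bound on monthly PUT fees if every Agent flushes one file per batchTimeout.

    Assumes steady traffic so that each Agent flushes on every timeout; ignores
    compaction, GET requests, and flushes triggered early by full buffers.
    """
    flushes_per_second = 1000 / batch_timeout_ms
    puts_per_month = flushes_per_second * SECONDS_PER_MONTH * agents
    return puts_per_month / 1000 * S3_PUT_PRICE_PER_1000
```

Under these assumptions, the default 250ms works out to roughly $52 per Agent per month, while 50ms is roughly $259, five times as much, because the number of files created scales inversely with the timeout.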
S3 Express
S3 Express One Zone is a tier of AWS S3 that provides much lower latency for writes and reads, but stores data in only a single availability zone. The WarpStream Agents have native support for S3 Express and can use it to store newly written data. Combined with a reduced batch timeout, S3 Express can reduce the P99 latency of Produce requests to less than 150ms.
Tradeoffs and How We Mitigate Them
This latency improvement comes with tradeoffs that WarpStream helps you mitigate so that you get the best of both worlds. S3 Express offers faster reads and writes, but charges more for storage. It also provides less resilience than S3 "classic", since by default it doesn't duplicate data across multiple zones.
To mitigate S3 Express's increased storage costs, the WarpStream Agents can use different buckets for data ingestion and data compaction. This enables the Agents to ingest data into S3 Express to reduce Produce request latency, but then compact the data into a regular object storage bucket to keep storage costs low. Think of this as a form of tiered storage within the object store itself.
This is the recommended way to use S3 Express with WarpStream, because retaining data in S3 Express costs ~7x more than regular object storage, even before taking replication into account.
As the name implies, S3 Express One Zone stores data in only a single availability zone. Therefore, to prevent availability zone failures from interrupting your cluster, WarpStream automatically replicates your ingested data across a quorum of multiple single-zone S3 Express buckets.
WarpStream's multi-bucket replication makes S3 Express as resilient as S3 "classic", but drives up your storage costs even further. By restricting S3 Express to data ingestion only, you limit the cost increase to API calls and throughput while saving on storage. For more details on S3 Express One Zone pricing, see AWS's documentation.
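The effect of ingesting into S3 Express and then compacting into regular S3 can be sketched with a back-of-the-envelope model. It uses the ~7x storage price ratio and the three-bucket replication described above; the 60-second time in S3 Express before compaction and the one-week retention are hypothetical values chosen for illustration, and request and throughput fees are ignored.

```python
def storage_cost_vs_s3(express_seconds: float, retention_seconds: float,
                       express_multiplier: float = 7.0, replicas: int = 3) -> float:
    """Storage cost relative to keeping every byte in a single regular S3 bucket.

    Each byte spends `express_seconds` replicated across `replicas` S3 Express
    buckets (each ~`express_multiplier`x the S3 price), then the rest of its
    retention in regular S3 after compaction. Request and throughput fees are ignored.
    """
    express_cost = express_multiplier * replicas * express_seconds
    standard_cost = max(retention_seconds - express_seconds, 0.0)
    return (express_cost + standard_cost) / retention_seconds


WEEK = 7 * 24 * 3600  # hypothetical retention period, in seconds
```

Keeping a full week of data in S3 Express (`storage_cost_vs_s3(WEEK, WEEK)`) costs 21x as much as regular S3 in this model, whereas compacting after a hypothetical 60 seconds (`storage_cost_vs_s3(60, WEEK)`) adds only about 0.2% to the storage bill.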
Configuration
The first step to using S3 Express is to create the buckets. This can be done in the AWS console, or by using infrastructure as code like Terraform. Below is a sample Terraform block:
```hcl
locals {
  # S3 Express may not be available in every zone in a region. This is
  # fine, because you are not billed for inter-zone networking between EC2
  # and S3 Express buckets. You can see the list of available zone IDs here:
  # https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-express-Endpoints.html
  s3_express_zone_ids = ["use1-az4", "use1-az5", "use1-az6"]
}

resource "aws_s3_directory_bucket" "warpstream_s3_express_buckets" {
  count = length(local.s3_express_zone_ids)

  # Directory bucket names must follow the format base-name--zone-id--x-s3 and
  # may only contain lowercase letters, numbers, and hyphens. See:
  # https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/s3_directory_bucket
  bucket          = "warpstream-s3-express--${local.s3_express_zone_ids[count.index]}--x-s3"
  data_redundancy = "SingleAvailabilityZone"
  type            = "Directory"

  location {
    name = local.s3_express_zone_ids[count.index]
    type = "AvailabilityZone"
  }
}
```

Note that in addition to creating the S3 Express buckets, you'll also want to add an S3 Express gateway endpoint to your VPC. This is free, ensures that you don't pay internet egress fees for your S3 Express traffic, and also reduces latency.
```hcl
variable "vpc_id" {
  type = string
}

variable "route_table_ids" {
  type = list(string)
}

data "aws_region" "current" {}

resource "aws_vpc_endpoint" "s3_express" {
  vpc_id            = var.vpc_id
  service_name      = "com.amazonaws.${data.aws_region.current.name}.s3express"
  vpc_endpoint_type = "Gateway"
  route_table_ids   = var.route_table_ids
}
```

Note that we created three S3 Express directory buckets. This is because the WarpStream Agents flush ingestion files to all of the S3 Express directory buckets and then wait for a quorum of acknowledgements before considering the data durable. In the future we will allow more flexible configurations, but for now at least 3 buckets must be configured, and every write must succeed in at least 2 of them before it is considered successful.
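The quorum rule described above (write to every bucket, succeed once 2 have acknowledged) can be illustrated with a short sketch. This is not WarpStream's implementation: `quorum_write` and the caller-supplied `put` upload function are hypothetical names, and a real Agent would also handle timeouts, retries, and cleanup of partial writes.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Sequence


def quorum_write(buckets: Sequence[str], data: bytes,
                 put: Callable[[str, bytes], None], quorum: int = 2) -> bool:
    """Hypothetical sketch: upload `data` to every bucket, succeed on `quorum` acks.

    `put` is any function that uploads to one bucket and raises on failure.
    """
    if len(buckets) < 3:
        raise ValueError("at least 3 buckets are required")
    pool = ThreadPoolExecutor(max_workers=len(buckets))
    futures = [pool.submit(put, bucket, data) for bucket in buckets]
    acks = failures = 0
    try:
        for future in as_completed(futures):
            if future.exception() is None:
                acks += 1
                if acks >= quorum:
                    return True  # durable: a quorum of buckets has the data
            else:
                failures += 1
                if len(buckets) - failures < quorum:
                    return False  # quorum can no longer be reached
        return False
    finally:
        # Don't block on straggling uploads once the outcome is known.
        pool.shutdown(wait=False)
```

Writing to all buckets in parallel and returning at the second acknowledgement means a single slow or unavailable zone neither blocks nor fails the write.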
In addition to creating the buckets, you'll also need to grant your WarpStream Agents' IAM role one extra permission: s3express:CreateSession.
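One way to express that permission is an IAM policy statement scoped to the directory bucket ARNs, which take the form arn:aws:s3express:region:account-id:bucket/bucket-name. The sketch below builds such a policy as a Python dict; the account ID and bucket name are hypothetical, and you would attach the result to the Agents' existing role with whatever tooling you already use.

```python
import json


def create_session_statement(region: str, account_id: str, bucket_names: list) -> dict:
    """IAM policy statement allowing s3express:CreateSession on the given directory buckets."""
    return {
        "Effect": "Allow",
        "Action": "s3express:CreateSession",
        "Resource": [
            f"arn:aws:s3express:{region}:{account_id}:bucket/{name}"
            for name in bucket_names
        ],
    }


policy = {
    "Version": "2012-10-17",
    "Statement": [
        # Hypothetical account ID and bucket name, for illustration only.
        create_session_statement(
            "us-east-1", "123456789012", ["warpstream-s3-express--use1-az4--x-s3"]
        )
    ],
}
policy_json = json.dumps(policy, indent=2)
```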
Once you've created the buckets and updated the WarpStream Agent IAM role, the final step is to change the Agent configuration to write newly ingested data to a quorum of the S3 Express directory buckets instead of the regular object storage bucket. To do this, delete the -bucketURL flag (WARPSTREAM_BUCKET_URL environment variable) and replace it with two new flags:
- -ingestionBucketURL (WARPSTREAM_INGESTION_BUCKET_URL)
- -compactionBucketURL (WARPSTREAM_COMPACTION_BUCKET_URL)
The value of compactionBucketURL should point to a classic S3 bucket configured for WarpStream, i.e., the same value you would pass to bucketURL in the default object storage configuration.
The value of ingestionBucketURL should be a <>-delimited list of S3 Express directory bucket URLs with a warpstream_multi:// prefix. For example:
```
warpstream_multi://s3://warpstream-s3-express--use1-az4--x-s3?region=us-east-1<>s3://warpstream-s3-express--use1-az5--x-s3?region=us-east-1<>s3://warpstream-s3-express--use1-az6--x-s3?region=us-east-1
```

That's it! The WarpStream Agents will automatically write newly ingested data to a quorum of the S3 Express directory buckets, and then asynchronously compact those files into the regular object storage bucket. The Agents also automatically delete files whose data has completely expired from both the S3 Express directory buckets and the regular object storage bucket.
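Because the ingestion URL is long and easy to mistype, it can help to generate it from the list of zone IDs. This is a minimal sketch: `multi_bucket_url` is a hypothetical helper, and the base name warpstream-s3-express is an illustrative choice that must match the directory buckets you actually created (directory bucket names follow the pattern base-name--zone-id--x-s3).

```python
def multi_bucket_url(bucket_names: list, region: str) -> str:
    """Build a warpstream_multi:// ingestion URL from S3 Express directory bucket names."""
    return "warpstream_multi://" + "<>".join(
        f"s3://{name}?region={region}" for name in bucket_names
    )


zone_ids = ["use1-az4", "use1-az5", "use1-az6"]
# Hypothetical base name; use the names of the buckets you created.
buckets = [f"warpstream-s3-express--{zone}--x-s3" for zone in zone_ids]
ingestion_url = multi_bucket_url(buckets, "us-east-1")
```

The resulting string is what you would pass as -ingestionBucketURL (or WARPSTREAM_INGESTION_BUCKET_URL).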
Control Plane Latency
Similar to the Agents, the WarpStream control plane batches some virtual cluster operations, resulting in higher latency in exchange for reduced control plane costs. The low latency control plane can be enabled by using the Fundamentals or Pro Cluster Tiers.
AWS DynamoDB
In addition to S3 Express One Zone, AWS developers have the option to deploy their WarpStream agents using DynamoDB as the storage layer. Using DynamoDB yields latencies similar to S3 Express One Zone and generally costs less if the workload's throughput is low enough. Higher-volume workloads should always prefer S3 Express One Zone over DynamoDB for cost reasons. See the Cost Estimates section below for more details.
Flags
Pointing the agent to DynamoDB as the backing store is as simple as passing a bucket URL in the following format:

```
dynamodb://$aws_region/$files_table<>$chunks_table
```

$aws_region is the agent's current region. If $files_table and $chunks_table are existing DynamoDB tables accessible from that region, the agent uses them for storage; if the tables don't exist, the agent creates them. The need for two separate tables is an implementation detail that shouldn't otherwise affect developers.
As with S3 Express, we recommend replacing the -bucketURL flag with separate -ingestionBucketURL and -compactionBucketURL flags. The former should point to DynamoDB and the latter to S3. See the last two paragraphs of S3 Express above for details.
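Putting those two recommendations together, the sketch below assembles the agent's environment variables for a DynamoDB ingestion layer with an S3 compaction bucket. The table names and the S3 bucket name are hypothetical; the s3://...?region=... form follows the S3 URLs shown in the S3 Express section.

```python
def dynamodb_bucket_url(region: str, files_table: str, chunks_table: str) -> str:
    """Format documented above: dynamodb://$aws_region/$files_table<>$chunks_table."""
    return f"dynamodb://{region}/{files_table}<>{chunks_table}"


agent_env = {
    # Hypothetical table names; the agent creates them if they don't exist.
    "WARPSTREAM_INGESTION_BUCKET_URL": dynamodb_bucket_url(
        "us-east-1", "warpstream-files", "warpstream-chunks"
    ),
    # Hypothetical regular S3 bucket that receives compacted data.
    "WARPSTREAM_COMPACTION_BUCKET_URL": "s3://my-warpstream-bucket?region=us-east-1",
}
```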
We also recommend setting the -batchTimeout flag to a value as low as 50ms. When S3 is the backing store, lowering this value increases costs: S3 bills API usage per request regardless of payload size, so larger batches are cheaper. DynamoDB, by contrast, bills by the amount of data written and read (in capacity units) rather than per API call. Therefore, a lower batch timeout reduces Produce latency without materially affecting cost.
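To make the DynamoDB side of this argument concrete, the sketch below estimates write request units (WRUs) per second for a steady stream, using DynamoDB's rule that one standard write unit covers up to 1 KB of item size. The way data is split into items here (fixed 64 KB chunks per flush) is a hypothetical simplification, not WarpStream's actual table layout, and per-file metadata writes are ignored.

```python
import math

WRU_BYTES = 1024  # one DynamoDB write request unit covers up to 1 KB (standard tables)


def wrus_per_second(throughput_bps: int, batch_timeout_ms: int,
                    chunk_bytes: int = 64 * 1024) -> float:
    """Approximate DynamoDB write units per second for a steady stream.

    Assumes (hypothetically) that each flush writes its bytes as items of at
    most `chunk_bytes`, and ignores per-file metadata writes.
    """
    bytes_per_flush = throughput_bps * batch_timeout_ms // 1000
    full_chunks, remainder = divmod(bytes_per_flush, chunk_bytes)
    wrus_per_flush = (full_chunks * math.ceil(chunk_bytes / WRU_BYTES)
                      + math.ceil(remainder / WRU_BYTES))
    flushes_per_second = 1000 / batch_timeout_ms
    return wrus_per_flush * flushes_per_second
```

At 1 MiB/s, this model gives 1,024 WRU/s with a 250ms timeout and 1,040 WRU/s with 50ms: five times as many flushes change the write units by under 2%, only because of rounding up to the next kilobyte, whereas S3 PUT counts would grow fivefold.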
Finally, WarpStream's own control plane batching can be tuned for lower latency. See Control Plane Latency above.
AWS IAM Permissions
The process running the agent requires the following IAM permissions to use DynamoDB as the backing store.
```
"dynamodb:BatchWrite*",
"dynamodb:CreateTable",
"dynamodb:DeleteItem",
"dynamodb:Update*",
"dynamodb:PutItem",
"dynamodb:TagResource",
"dynamodb:BatchGet*",
"dynamodb:DescribeStream",
"dynamodb:DescribeTable",
"dynamodb:Get*",
"dynamodb:Query",
"dynamodb:Scan"
```

Google Spanner (beta)
Google Spanner support for the data plane is available only in agent version v709 and above.
On GCP deployments, developers can choose to use Spanner as the storage layer. This is the only low-latency ingestion alternative in GCP, and offers similar tradeoffs to the DynamoDB option described above. It's also only recommended for low-throughput clusters for cost reasons. See the Cost Estimates section below for more details.
Flags
Pointing the agent to Spanner as the backing store is as simple as passing a bucket URL in the following format:

```
spanner://projects/$PROJECT/instances/$INSTANCE/databases/$DATABASE
```

This is the standard way to address an individual Spanner database in GCP: replace $PROJECT, $INSTANCE, and $DATABASE with your GCP project ID, Spanner instance ID, and database name. You must provision the database yourself beforehand. We recommend not sharing this database with other applications, to prevent accidental deletions or other incidents. On startup, the WarpStream agents create the tables the data plane needs inside this database if they are not already present. These are two simple tables: warpstream_files and warpstream_chunks. Modifying these tables by any means other than the WarpStream agent itself results in undefined behavior and most likely a broken cluster.
As with S3 Express and DynamoDB, we recommend replacing the -bucketURL flag with separate -ingestionBucketURL and -compactionBucketURL flags. The former should point to Spanner and the latter to GCS. See the last two paragraphs of S3 Express above for details.
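As with DynamoDB, the configuration boils down to two URLs. The sketch below builds them as agent environment variables; the project, instance, database, and GCS bucket names are hypothetical, and the gs:// form of the compaction URL is an assumption to verify against the GCS object storage configuration you already use.

```python
def spanner_bucket_url(project: str, instance: str, database: str) -> str:
    """Format documented above: spanner://projects/$PROJECT/instances/$INSTANCE/databases/$DATABASE."""
    return f"spanner://projects/{project}/instances/{instance}/databases/{database}"


agent_env = {
    # Hypothetical, pre-provisioned Spanner database dedicated to WarpStream.
    "WARPSTREAM_INGESTION_BUCKET_URL": spanner_bucket_url(
        "my-gcp-project", "warpstream-instance", "warpstream-data"
    ),
    # Hypothetical GCS bucket for compacted data (URL scheme assumed).
    "WARPSTREAM_COMPACTION_BUCKET_URL": "gs://my-warpstream-bucket",
}
```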
We also recommend setting the -batchTimeout flag to a value as low as 50ms. When object storage such as S3 or GCS is the backing store, lowering this value increases costs, because API usage is billed per request regardless of payload size, so larger batches are cheaper. Spanner charges for compute and storage rather than per API call. Therefore, a lower batch timeout reduces Produce latency without materially affecting cost.
Finally, WarpStream's own control plane batching can be tuned for lower latency. See Control Plane Latency above.
Google IAM Permissions
We recommend running the agents with the IAM role roles/spanner.databaseUser assigned to them for the relevant database. This role gives agents all the permissions they need to run the data plane.
Cost Estimates
The table below presents the rough cost of each AWS service that can be used as the agent's storage layer, for hypothetical workloads of 100 KB/s, 1 MB/s, and 10 MB/s. These estimates rely on various assumptions, for example that one agent is deployed in each of three availability zones and that the compression ratio is 1:4. For DynamoDB with provisioned capacity, the budget is over-provisioned by a factor of 2 for headroom. Most importantly, these numbers only reflect the storage cost of keeping the last five seconds of data at any time. Since we recommend storing compacted data in S3 regardless of where it's first ingested, the table excludes any storage costs incurred after compaction. See the last two paragraphs of S3 Express above for details.
| Storage layer | 100 KB/s | 1 MB/s | 10 MB/s |
|---|---|---|---|
| S3 | $159 | $159 | $159 |
| S3 Express | $235 | $235 | $235 |
| DynamoDB on-demand | $81 | $810 | $8,100 |
| DynamoDB provisioned | $7.50 | $75 | $750 |
While these numbers are only estimates, they illustrate the advantage of using DynamoDB as the agent's storage layer for workloads with sufficiently low throughput.
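The estimates also let you approximate where DynamoDB stops being cheaper. In the table, DynamoDB costs scale linearly with throughput while S3 and S3 Express costs stay flat, so the sketch below assumes exactly that and solves for the break-even throughput. It only uses the table's own figures, and is meaningful only within the table's 100 KB/s to 10 MB/s range.

```python
# Figures from the cost table above (USD); DynamoDB figures are at 100 KB/s.
S3 = 159.0
S3_EXPRESS = 235.0
DYNAMODB_ON_DEMAND_PER_100KBPS = 81.0
DYNAMODB_PROVISIONED_PER_100KBPS = 7.5


def break_even_kbps(flat_cost: float, dynamodb_cost_per_100kbps: float) -> float:
    """Throughput (KB/s) at which linearly scaling DynamoDB cost equals a flat cost."""
    return flat_cost / dynamodb_cost_per_100kbps * 100
```

Under these assumptions, on-demand DynamoDB beats S3 only below roughly 196 KB/s, while provisioned DynamoDB stays cheaper than S3 up to roughly 2,120 KB/s and cheaper than S3 Express up to roughly 3,133 KB/s.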