We highly recommend running the WarpStream Agent with a dedicated bucket for isolation; however, the WarpStream Agent will only write/read data under the warpstream prefix.
You should use a (or the equivalent in your Cloud Service Provider) to ensure the network traffic between the WarpStream Agent and your Object Storage bucket does not incur any data transfer cost, such as the cost incurred by using a NAT Gateway.
The WarpStream Agent manages all data in the object storage warpstream directory. It is extremely important that you allow it to do so alone and never delete files from the warpstream directory manually. Manually deleting files in the warpstream directory will effectively "brick" a virtual cluster and require that it be recreated from scratch.
Bucket URL Construction
The bucketURL flag is the URL of the object storage bucket that the WarpStream Agent should write to. See the table below for how to configure it for different object store implementations.
Note that the WarpStream Agents will automatically write all of their data to a top-level warpstream prefix in the bucket. In addition, each cluster will write its data to a cluster-specific prefix (derived from the cluster ID) within the warpstream prefix so multiple WarpStream clusters can co-exist within the same object storage bucket without issue.
If you want to use an AssumeRole provider to authenticate, you can add the WARPSTREAM_BUCKET_ASSUME_ROLE_ARN_DEFAULT environment variable to your Agent. For example:
In general, we recommend using IAM roles whenever possible. However, if you want to provide object storage credentials manually then you'll need to set the following environment variables:
The WarpStream Agent embeds the official Azure Golang SDK which expects the AZURE_STORAGE_ACCOUNT environment variable to be set, along with one of the two following environment variables: AZURE_STORAGE_KEY or AZURE_STORAGE_SAS_TOKEN.
For testing and local development only. All data will be lost once the Agent shuts down.
Example: mem://my_memory_bucket
For testing and local development only. The file store implementation is not robust.
If you're using an "S3 compatible" object store that is not actually S3, like MinIO, R2 or Oracle Cloud Object Store then you'll need to provide credentials manually as environment variables and force the S3 client to construct the URL using the "path style":
If you have a MinIO docker container running locally on your machine on port 9000, you can run the Agent like this after creating an Access Key in the MinIO UI:
Note that if you run multiple WarpStream Agents this way in non-demo mode, then by default they need to be running on the same internal network. The reason for this is that if the Agents believe they're all running in the same "availability zone", they will attempt to form a distributed cache with each other to reduce R2 API GET requests.
However, if you wish to run multiple Agents in separate networks / regions, but still allow them to function as a single "Kafka Cluster", assign each one a dedicated availability zone.
This signals to each Agent that they should not attempt to communicate with each other directly over the local network, and that each one should behave as if it were running in a different availability zone. However, data will still be able to be streamed from Chicago to Nashville (or vice versa) because the Agents will use R2 as "the network".
The net result of this is a "multi-region" Cluster that can read and write all topics/partitions from multiple regions at the same time.
Using a Bucket Prefix
If you want the WarpStream Agents to store data in a specific prefix in the bucket, you can add the prefix as a query argument to the bucket URL. The prefix must terminate with a "/". For example:
The WarpStream bucket should not have a configured object retention policy. WarpStream will manage the lifecycle of the objects, including deleting objects that have been compacted or have expired due to retention. If you must configure a retention policy on your bucket, make sure it is significantly longer than the longest retention of any topic/stream in any of your Virtual Clusters to avoid data loss.
We recommend configuring a lifecycle policy for cleaning up aborted multi-part uploads. This will prevent failed file uploads from the WarpStream Agent from accumulating in the bucket forever and increasing your storage costs. Below is a sample Terraform configuration for various different cloud providers:
resource "aws_s3_bucket" "warpstream_bucket" {
bucket = "my-warpstream-bucket-123"
tags = {
Name = "my-warpstream-bucket-123"
Environment = "staging"
}
}
resource "aws_s3_bucket_metric" "warpstream_bucket_metrics" {
bucket = aws_s3_bucket.warpstream_bucket.id
name = "EntireBucket"
}
resource "aws_s3_bucket_lifecycle_configuration" "warpstream_bucket_lifecycle" {
bucket = aws_s3_bucket.warpstream_bucket.id
# Automatically cancel all multi-part uploads after 7d so we don't accumulate an infinite
# number of partial uploads.
rule {
id = "7d multi-part"
status = "Enabled"
abort_incomplete_multipart_upload {
days_after_initiation = 7
}
}
# No other lifecycle policy. The WarpStream Agent will automatically clean up and
# deleted expired files.
}
In addition to configuring the WarpStream buckets, you'll also need to make sure the Agent containers have the appropriate permissions to interact with the bucket.
Specifically, the Agents need permission to perform the following operations:
PutObject
To create new files.
GetObject
To read existing files.
DeleteObject
So the Agents can enforce retention and cleanup of pre-compaction files.
ListBucket
So the Agents can enforce retention and cleanup of pre-compaction files.
Below is an example Terraform configuration for an AWS IAM policy document that provides WarpStream with the appropriate permissions to access a dedicated S3 bucket:
Then wait until all the files in the foo bucket under the warpstream prefix had been deleted. Once all the files had been deleted, you would then deploy the Agents one final time with this configuration:
WARPSTREAM_BUCKET_URL=s3://bar?region=us-east-1
Kubernetes Workload Identity for Bucket Access
When running in Kubernetes in AWS, Azure, or GCP it is recommended to use Workload Identity to delegate access from the WarpStream Agent pods to the Object Storage bucket. This simplifies management of the object storage credentials and minimizes the risk of credential leaks.
The WarpStream Agent embeds the official AWS Golang SDK V2 so authentication/authorization with the specified S3 bucket can be handled in
Environment variables can be set in our K8s chart using the extraEnvs and extraEnvsFrom fields in the .
The WarpStream Agent embeds the official GCP Golang SDK so authentication/authorization with the storage bucket can be handled .
The MinIO team has a on their website as well. Note that the region query argument is a no-op, but required to pass validation in the S3 SDK.
Create an account with .
Make sure that the Agents to perform operations on both the old bucket and the new bucket.
Deploy the Agents with set to the new bucket instead of the old one. This will cause the Agents to write all new files (both for ingestion and compaction) to the new bucket while still allowing them to read historical data from the old bucket. You'll also need to set the -additionalDeadscannerBucketURLs flag or WARPSTREAM_ADDITIONAL_DEADSCANNER_BUCKET_URLS environment variable in the Agents to point to the old bucket so that the Agents continue to scan the old bucket for dead files and delete them.