Object Storage Configuration

This page describes how to properly configure object storage for BYOC Agent deployments.

We strongly recommend giving the WarpStream Agent a dedicated bucket for isolation. Even in a shared bucket, however, the WarpStream Agent will only read and write data under the warpstream prefix.

Bucket URL Construction

The bucketURL flag is the URL of the object storage bucket that the WarpStream Agent should write to. See the table below for how to configure it for different object store implementations.

Note that the WarpStream Agents will automatically write all of their data to a top-level warpstream prefix in the bucket. In addition, each cluster will write its data to a cluster-specific prefix (derived from the cluster ID) within the warpstream prefix so multiple WarpStream clusters can co-exist within the same object storage bucket without issue.

(Figure: an S3 bucket with 16 different cluster prefixes under the top-level warpstream prefix.)

Format: s3://$BUCKET_NAME?region=$BUCKET_REGION

Example: s3://my-warpstream-bucket-123?region=us-east-1

The WarpStream Agent embeds the official AWS SDK for Go v2, so authentication and authorization against the specified S3 bucket can be handled in any of the usual ways: a shared credentials file, environment variables, or simply running the Agents in an environment with an IAM role that has Write/Read/Delete/List permissions on the S3 bucket.

Assume Role

If you want to use an AssumeRole provider to authenticate, you can add the WARPSTREAM_BUCKET_ASSUME_ROLE_ARN_DEFAULT environment variable to your Agent. For example:

WARPSTREAM_BUCKET_ASSUME_ROLE_ARN_DEFAULT=arn:aws:iam::103069001423:role/YourRoleName

Manually Providing Credentials

In general, we recommend using IAM roles whenever possible. However, if you want to provide object storage credentials manually, you'll need to set the following environment variables:

AWS_ACCESS_KEY_ID=YOUR_AWS_ACCESS_KEY_ID
AWS_SECRET_ACCESS_KEY=YOUR_AWS_SECRET_ACCESS_KEY

Environment variables can be set in our Kubernetes Helm chart using the extraEnvs and extraEnvsFrom fields in the chart's values.yaml.

S3-compatible Object Stores (MinIO, R2, Oracle Cloud, Tigris, etc.)

If you're using an "S3 compatible" object store that is not actually S3, like MinIO, R2, or Oracle Cloud Object Store, then you'll need to provide credentials manually as environment variables and force the S3 client to construct the URL using the "path style".

If you have a MinIO docker container running locally on your machine on port 9000, you can run the Agent like this after creating an Access Key in the MinIO UI:

AWS_ACCESS_KEY_ID="wKghTMkQrFqszshHJcop" \
AWS_SECRET_ACCESS_KEY="MpMO9GFMaoIFFYd8cZi5gyk5SAjwleEbkZOSxIXv" \
warpstream demo \
-bucketURL "s3://<your-bucket>?region=us-east-1&s3ForcePathStyle=true&endpoint=http://127.0.0.1:9000"

The MinIO team has a more detailed integration guide on their website as well. Note that the region query argument is a no-op, but required to pass validation in the S3 SDK.
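As a rough illustration of the requirements above, the following Python sketch checks that a bucket URL for an S3-compatible store carries the region, endpoint, and s3ForcePathStyle query arguments. The function name check_s3_compatible_url is hypothetical, and this is only a local sanity check written for this page; it is not the validation the WarpStream Agent or the S3 SDK performs.

```python
from urllib.parse import urlsplit, parse_qs


def check_s3_compatible_url(bucket_url: str) -> list[str]:
    """Return a list of problems found in a bucket URL for an S3-compatible store.

    Hypothetical local lint; not WarpStream's own validation logic.
    """
    parts = urlsplit(bucket_url)
    query = parse_qs(parts.query)
    problems = []

    if parts.scheme != "s3":
        problems.append("scheme must be s3")
    if not parts.netloc:
        problems.append("bucket name is missing")
    # The region is a no-op for these stores, but the S3 SDK still requires it.
    if "region" not in query:
        problems.append("region query argument is missing")
    # Non-AWS stores need an explicit endpoint and path-style addressing.
    if "endpoint" not in query:
        problems.append("endpoint query argument is missing")
    if query.get("s3ForcePathStyle") != ["true"]:
        problems.append("s3ForcePathStyle=true is missing")
    return problems


good = "s3://my-bucket?region=us-east-1&s3ForcePathStyle=true&endpoint=http://127.0.0.1:9000"
print(check_s3_compatible_url(good))
print(check_s3_compatible_url("s3://my-bucket?region=us-east-1"))
```

An empty list means none of the checks above failed; it does not guarantee that the object store itself will accept the URL.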

Using a Bucket Prefix

If you want the WarpStream Agents to store data in a specific prefix in the bucket, you can add the prefix as a query argument to the bucket URL. The prefix must terminate with a "/". For example:

s3://my-warpstream-bucket-123?region=us-east-1&prefix=my_prefix/
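If you generate bucket URLs from a script, a small helper can enforce the trailing "/" rule before the URL ever reaches the Agent. The Python sketch below is one way to do that; the helper name with_prefix and the bucket name my-bucket are assumptions made for illustration, not part of WarpStream.

```python
from urllib.parse import urlsplit, urlunsplit, parse_qsl, urlencode


def with_prefix(bucket_url: str, prefix: str) -> str:
    """Return bucket_url with a prefix query argument added (hypothetical helper)."""
    if not prefix.endswith("/"):
        raise ValueError('prefix must terminate with "/"')

    parts = urlsplit(bucket_url)
    # Drop any existing prefix argument so the result carries exactly one.
    params = [(k, v) for k, v in parse_qsl(parts.query) if k != "prefix"]
    params.append(("prefix", prefix))
    # Keep ":" and "/" unescaped so the result matches the format shown above.
    query = urlencode(params, safe=":/")
    return urlunsplit((parts.scheme, parts.netloc, parts.path, query, parts.fragment))


print(with_prefix("s3://my-bucket?region=us-east-1", "my_prefix/"))
```

Passing a prefix without the trailing "/" (for example "my_prefix") raises a ValueError instead of producing a URL that violates the rule.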

Bucket Configuration

The WarpStream bucket should not have a configured object retention policy. WarpStream will manage the lifecycle of the objects, including deleting objects that have been compacted or have expired due to retention. If you must configure a retention policy on your bucket, make sure it is significantly longer than the longest retention of any topic/stream in any of your Virtual Clusters to avoid data loss.

We recommend configuring a lifecycle policy for cleaning up aborted multi-part uploads. This will prevent failed file uploads from the WarpStream Agent from accumulating in the bucket forever and increasing your storage costs. Below is a sample Terraform configuration for AWS S3:

resource "aws_s3_bucket" "warpstream_bucket" {
  bucket = "my-warpstream-bucket-123"

  tags = {
    Name        = "my-warpstream-bucket-123"
    Environment = "staging"
  }
}

resource "aws_s3_bucket_metric" "warpstream_bucket_metrics" {
  bucket = aws_s3_bucket.warpstream_bucket.id
  name   = "EntireBucket"
}

resource "aws_s3_bucket_lifecycle_configuration" "warpstream_bucket_lifecycle" {
  bucket = aws_s3_bucket.warpstream_bucket.id

  # Automatically cancel all multi-part uploads after 7d so we don't accumulate an infinite
  # number of partial uploads.
  rule {
    id     = "7d multi-part"
    status = "Enabled"
    abort_incomplete_multipart_upload {
      days_after_initiation = 7
    }
  }
  
  # No other lifecycle policy. The WarpStream Agent will automatically clean up and
  # delete expired files.
}

Bucket Permissions

In addition to configuring the WarpStream buckets, you'll also need to make sure the Agent containers have the appropriate permissions to interact with the bucket.

Specifically, the Agents need permission to perform the following operations:

  • PutObject
    • To create new files.
  • GetObject
    • To read existing files.
  • DeleteObject
    • So the Agents can enforce retention and cleanup of pre-compaction files.
  • ListBucket
    • So the Agents can enforce retention and cleanup of pre-compaction files.

Below is an example Terraform configuration for an AWS IAM policy document that provides WarpStream with the appropriate permissions to access a dedicated S3 bucket:

data "aws_iam_policy_document" "warpstream_s3_policy_document" {
  statement {
    sid     = "AllowS3"
    effect  = "Allow"
    actions = [
      "s3:PutObject",
      "s3:GetObject",
      "s3:DeleteObject",
      "s3:ListBucket"
    ]
    resources = [
      "arn:aws:s3:::my-warpstream-bucket-123",
      "arn:aws:s3:::my-warpstream-bucket-123/*"
    ]
  }
}

Migrating Between Object Storage Buckets

If you need to migrate a WarpStream cluster from one object storage bucket to another, follow these steps:

  1. Make sure that the Agents have permission to perform operations on both the old bucket and the new bucket.

  2. Deploy the Agents with the bucketURL flag set to the new bucket instead of the old one. This will cause the Agents to write all new files (both for ingestion and compaction) to the new bucket while still allowing them to read historical data from the old bucket. You'll also need to set the -additionalDeadscannerBucketURLs flag or WARPSTREAM_ADDITIONAL_DEADSCANNER_BUCKET_URLS environment variable in the Agents to point to the old bucket so that the Agents continue to scan the old bucket for dead files and delete them.

  3. Wait until there are no more data files under the warpstream prefix in the old bucket.

For example, if you were migrating from AWS S3 bucket foo to AWS S3 bucket bar, you would redeploy the Agents from this configuration:

WARPSTREAM_BUCKET_URL=s3://foo?region=us-east-1

To this configuration:

WARPSTREAM_ADDITIONAL_DEADSCANNER_BUCKET_URLS=s3://foo?region=us-east-1
WARPSTREAM_BUCKET_URL=s3://bar?region=us-east-1

Then wait until all the files in the foo bucket under the warpstream prefix have been deleted. Once they are gone, deploy the Agents one final time with this configuration:

WARPSTREAM_BUCKET_URL=s3://bar?region=us-east-1
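The three deployments in this example can be summarized as a small Python sketch that returns the environment variables for each phase. The function name migration_phases is hypothetical; only the two environment variable names come from this page.

```python
def migration_phases(old_url: str, new_url: str) -> list[dict[str, str]]:
    """Return the Agent environment for each phase of a bucket migration."""
    return [
        # Phase 1: the starting point, everything lives in the old bucket.
        {"WARPSTREAM_BUCKET_URL": old_url},
        # Phase 2: write to the new bucket, keep scanning the old one for dead files.
        {
            "WARPSTREAM_ADDITIONAL_DEADSCANNER_BUCKET_URLS": old_url,
            "WARPSTREAM_BUCKET_URL": new_url,
        },
        # Phase 3: only after the old bucket's warpstream prefix is empty.
        {"WARPSTREAM_BUCKET_URL": new_url},
    ]


phases = migration_phases("s3://foo?region=us-east-1", "s3://bar?region=us-east-1")
for number, env in enumerate(phases, start=1):
    print(f"# phase {number}")
    for name, value in env.items():
        print(f"{name}={value}")
```

The sketch does not check whether the old bucket is actually empty; moving from phase 2 to phase 3 still requires confirming that yourself.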

Kubernetes Workload Identity for Bucket Access

When running in Kubernetes on AWS, Azure, or GCP, we recommend using Workload Identity to delegate access from the WarpStream Agent pods to the object storage bucket. This simplifies management of the object storage credentials and minimizes the risk of credential leaks.

Documentation: https://docs.aws.amazon.com/eks/latest/userguide/pod-identities.html

Example Terraform

data "aws_iam_policy_document" "eks_service_account" {
  statement {
    effect = "Allow"

    principals {
      type        = "Federated"
      identifiers = [var.eks_oidc_provider_arn]
    }

    actions = ["sts:AssumeRoleWithWebIdentity"]

    condition {
      test     = "StringEquals"
      variable = "${replace(var.eks_oidc_issuer_url, "https://", "")}:sub"

      values = ["system:serviceaccount:${var.kubernetes_namespace}:warpstream-agent"]
    }

    condition {
      test     = "StringEquals"
      variable = "${replace(var.eks_oidc_issuer_url, "https://", "")}:aud"

      values = ["sts.amazonaws.com"]
    }
  }
}

resource "aws_iam_role" "eks_service_account" {
  name               = "warpstream-agent"
  assume_role_policy = data.aws_iam_policy_document.eks_service_account.json
}


data "aws_iam_policy_document" "eks_service_account_s3_bucket" {
  statement {
    effect = "Allow"

    actions = [
      "s3:ListBucket",
      "s3:GetObject",
      "s3:PutObject",
      "s3:DeleteObject",
    ]

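    # ListBucket applies to the bucket ARN; Get/Put/DeleteObject apply to object ARNs.
    resources = [
      "arn:aws:s3:::${var.bucketName}",
      "arn:aws:s3:::${var.bucketName}/*",
    ]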
  }
}

resource "aws_iam_role_policy" "eks_service_account_s3_bucket" {
  name = "warpstream-agent-s3"
  role = aws_iam_role.eks_service_account.id

  policy = data.aws_iam_policy_document.eks_service_account_s3_bucket.json
}

Example Configuration on our Helm Chart

config:
    bucketURL: s3://my-bucket-name
    
serviceAccount:
    annotations:
        "eks.amazonaws.com/role-arn": "arn:aws:iam::XXXXXXXXXXXX:role/warpstream-agent"
