Object Storage Configuration

This page describes how to properly configure object storage for BYOC Agent deployments.

We highly recommend running the WarpStream Agent with a dedicated bucket for isolation; however, the WarpStream Agent will only write/read data under the warpstream prefix.

You should use a VPC Endpoint (or the equivalent in your cloud service provider) to ensure that network traffic between the WarpStream Agent and your object storage bucket does not incur any data transfer costs, such as the cost incurred by routing through a NAT Gateway.
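
For example, on AWS you can attach an S3 Gateway endpoint to the route tables used by the subnets the Agents run in. Below is a minimal Terraform sketch; the VPC ID, region, and route table ID are placeholders you would replace with your own:

# Gateway-type VPC endpoint so Agent <-> S3 traffic stays on the AWS network
# instead of traversing a NAT Gateway. All IDs below are placeholders.
resource "aws_vpc_endpoint" "s3" {
  vpc_id            = "vpc-0123456789abcdef0"
  service_name      = "com.amazonaws.us-east-1.s3"
  vpc_endpoint_type = "Gateway"

  # Route tables for the subnets where the WarpStream Agents run.
  route_table_ids = ["rtb-0123456789abcdef0"]
}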

The WarpStream Agent manages all data in the object storage warpstream directory. It is extremely important that you let the Agent manage this directory exclusively and never delete files from the warpstream directory manually. Manually deleting files in the warpstream directory will effectively "brick" a virtual cluster and require that it be recreated from scratch.

Bucket URL Construction

The bucketURL flag is the URL of the object storage bucket that the WarpStream Agent should write to. See below for how to configure it for each supported object store implementation.

Note that the WarpStream Agents will automatically write all of their data to a top-level warpstream prefix in the bucket. In addition, each cluster will write its data to a cluster-specific prefix (derived from the cluster ID) within the warpstream prefix, so multiple WarpStream clusters can co-exist within the same object storage bucket without issue.

[Image: An S3 bucket with 16 different cluster prefixes under the top-level warpstream prefix.]

AWS S3

Format: s3://$BUCKET_NAME?region=$BUCKET_REGION

Example: s3://my_warpstream_bucket_123?region=us-east-1

The WarpStream Agent embeds the official AWS Golang SDK V2, so authentication/authorization with the specified S3 bucket can be handled in any of the expected ways, like using a shared credentials file, environment variables, or simply running the Agents in an environment with an appropriate IAM role that has Write/Read/Delete/List permissions on the S3 bucket.

Assume Role

If you want to use an AssumeRole provider to authenticate, you can add the WARPSTREAM_BUCKET_ASSUME_ROLE_ARN_DEFAULT environment variable to your Agent. For example:

WARPSTREAM_BUCKET_ASSUME_ROLE_ARN_DEFAULT=arn:aws:iam::103069001423:role/YourRoleName

Manually Providing Credentials

In general, we recommend using IAM roles whenever possible. However, if you want to provide object storage credentials manually then you'll need to set the following environment variables:

AWS_ACCESS_KEY_ID=YOUR_AWS_ACCESS_KEY_ID
AWS_SECRET_ACCESS_KEY=YOUR_AWS_SECRET_ACCESS_KEY

GCP GCS

Format: gs://$BUCKET_NAME

Example: gs://my_warpstream_bucket_123

The WarpStream Agent embeds the official GCP Golang SDK, so authentication/authorization with the storage bucket can be handled in any of the expected ways.
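
For example, one common approach is to point the SDK at a service account key file via the standard GOOGLE_APPLICATION_CREDENTIALS environment variable; the key path and bucket name below are illustrative:

GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account-key.json \
warpstream agent -bucketURL "gs://my_warpstream_bucket_123"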

Azure Blob Storage

Format: azblob://$CONTAINER_NAME

Example: azblob://my_warpstream_container_123

The WarpStream Agent embeds the official Azure Golang SDK, which expects the AZURE_STORAGE_ACCOUNT environment variable to be set, along with one of the two following environment variables: AZURE_STORAGE_KEY or AZURE_STORAGE_SAS_TOKEN. Environment variables can be set in our Kubernetes chart using the extraEnvs and extraEnvsFrom fields in the chart's values.yaml.
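
For example, a minimal values.yaml sketch for an Azure deployment might look like the following. The container name, storage account, and key below are placeholders, and this assumes the chart forwards extraEnvs entries as standard Kubernetes environment variables:

config:
    bucketURL: azblob://my_warpstream_container_123

extraEnvs:
    # Placeholder values; prefer sourcing the key from a Kubernetes Secret,
    # e.g. via extraEnvsFrom, rather than inlining it here.
    - name: AZURE_STORAGE_ACCOUNT
      value: "mystorageaccount"
    - name: AZURE_STORAGE_KEY
      value: "<storage-account-key>"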

In-Memory (testing only)

For testing and local development only. All data will be lost once the Agent shuts down.

Example: mem://my_memory_bucket

Local Filesystem (testing only)

For testing and local development only. The file store implementation is not robust.

Format: file://$PATH_TO_DIRECTORY

Example: file:///tmp/warpstream_tmp_123

S3-compatible Object Stores (MinIO, R2, Oracle Cloud, Tigris, etc)

If you're using an "S3-compatible" object store that is not actually S3, like MinIO, R2, or Oracle Cloud Object Storage, then you'll need to provide credentials manually as environment variables and force the S3 client to construct URLs using "path style" addressing:

MinIO

If you have a MinIO Docker container running locally on your machine on port 9000, you can run the Agent like this after creating an Access Key in the MinIO UI:

AWS_ACCESS_KEY_ID="wKghTMkQrFqszshHJcop" \
AWS_SECRET_ACCESS_KEY="MpMO9GFMaoIFFYd8cZi5gyk5SAjwleEbkZOSxIXv" \
warpstream demo \
-bucketURL "s3://<your-bucket>?region=us-east-1&s3ForcePathStyle=true&endpoint=http://127.0.0.1:9000"

Note that the region query argument is a no-op for MinIO, but it is required to pass validation in the S3 SDK. The MinIO team also has a more detailed integration guide on their website.

Cloudflare R2

  1. Create a Cloudflare account.

  2. Create an R2 bucket.

  3. Create an R2 access token.

AWS_ACCESS_KEY_ID="XXX" \
AWS_SECRET_ACCESS_KEY="XXX" \
warpstream demo -bucketURL "s3://warpstream-demo-for-fun?s3ForcePathStyle=true&region=auto&endpoint=https://XXX.r2.cloudflarestorage.com" 

Note that if you run multiple WarpStream Agents this way in non-demo mode, then by default they need to be running on the same internal network. The reason for this is that if the Agents believe they're all running in the same "availability zone", they will attempt to form a distributed cache with each other to reduce R2 API GET requests.

However, if you wish to run multiple Agents in separate networks / regions, but still allow them to function as a single "Kafka Cluster", assign each one a dedicated availability zone.

For example, Agent 1:

WARPSTREAM_AVAILABILITY_ZONE="personal_laptop_chicago" \
AWS_ACCESS_KEY_ID="XXX" \
AWS_SECRET_ACCESS_KEY="XXX" \
warpstream agent -bucketURL "s3://warpstream-demo-for-fun?s3ForcePathStyle=true&region=auto&endpoint=https://XXX.r2.cloudflarestorage.com"

Agent 2:

WARPSTREAM_AVAILABILITY_ZONE="personal_laptop_nashville" \
AWS_ACCESS_KEY_ID="XXX" \
AWS_SECRET_ACCESS_KEY="XXX" \
warpstream agent -bucketURL "s3://warpstream-demo-for-fun?s3ForcePathStyle=true&region=auto&endpoint=https://XXX.r2.cloudflarestorage.com"

This signals to each Agent that it should not attempt to communicate with the other Agents directly over the local network, and that it should behave as if it were running in a different availability zone. However, data can still be streamed from Chicago to Nashville (or vice versa) because the Agents use R2 as "the network".

The net result is a "multi-region" cluster that can read and write all topics/partitions from multiple regions at the same time.

Using a Bucket Prefix

If you want the WarpStream Agents to store data in a specific prefix in the bucket, you can add the prefix as a query argument to the bucket URL. The prefix must terminate with a "/". For example:

s3://my_warpstream_bucket_123?region=us-east-1&prefix=my_prefix/

Bucket Configuration

The WarpStream bucket should not have a configured object retention policy. WarpStream will manage the lifecycle of the objects, including deleting objects that have been compacted or have expired due to retention. If you must configure a retention policy on your bucket, make sure it is significantly longer than the longest retention of any topic/stream in any of your Virtual Clusters to avoid data loss.

We recommend configuring a lifecycle policy that cleans up aborted multi-part uploads. This prevents failed file uploads from the WarpStream Agent from accumulating in the bucket forever and increasing your storage costs. Below are sample Terraform configurations for different cloud providers:

resource "aws_s3_bucket" "warpstream_bucket" {
  bucket = "my-warpstream-bucket-123"

  tags = {
    Name        = "my-warpstream-bucket-123"
    Environment = "staging"
  }
}

resource "aws_s3_bucket_metric" "warpstream_bucket_metrics" {
 bucket = aws_s3_bucket.warpstream_bucket.id
 name   = "EntireBucket"
}

resource "aws_s3_bucket_lifecycle_configuration" "warpstream_bucket_lifecycle" {
  bucket = aws_s3_bucket.warpstream_bucket.id

  # Automatically cancel all multi-part uploads after 7d so we don't accumulate an infinite
  # number of partial uploads.
  rule {
    id     = "7d multi-part"
    status = "Enabled"
    abort_incomplete_multipart_upload {
      days_after_initiation = 7
    }
  }
  
  # No other lifecycle policy. The WarpStream Agent will automatically clean up and
  # delete expired files.
}

GCP GCS:

resource "google_storage_bucket" "warpstream_bucket" {
  name     = "my-warpstream-bucket-123"
  location = "$REGION"

  labels = {
    Name        = "my-warpstream-bucket-123"
    Environment = "staging"
  }
  
  lifecycle_rule {
    condition {
      age = 7
    }
    action {
      type = "AbortIncompleteMultipartUpload"
    }
  }
  
  soft_delete_policy {
    retention_duration_seconds = 0
  }
}

Bucket Permissions

In addition to configuring the WarpStream buckets, you'll also need to make sure the Agent containers have the appropriate permissions to interact with the bucket.

Specifically, the Agents need permission to perform the following operations:

  • PutObject

    • To create new files.

  • GetObject

    • To read existing files.

  • DeleteObject

    • So the Agents can enforce retention and cleanup of pre-compaction files.

  • ListBucket

    • So the Agents can enforce retention and cleanup of pre-compaction files.

Below is an example Terraform configuration for an AWS IAM policy document that provides WarpStream with the appropriate permissions to access a dedicated S3 bucket:

data "aws_iam_policy_document" "warpstream_s3_policy_document" {
  statement {
    sid     = "AllowS3"
    effect  = "Allow"
    actions = [
      "s3:PutObject",
      "s3:GetObject",
      "s3:DeleteObject",
      "s3:ListBucket"
    ]
    resources = [
      "arn:aws:s3:::my-warpstream-bucket-123",
      "arn:aws:s3:::my-warpstream-bucket-123/*"
    ]
  }
}

The easiest way to configure bucket access in GCP is with the roles/storage.objectUser role like so:

resource "google_storage_bucket_iam_policy" "warpstream_bucket_iam" {
  bucket     = "my-warpstream-bucket-123"

  policy_data = jsonencode({
    bindings = [
      {
        role    = "roles/storage.objectUser"
        members = [
          "user:$USER"
        ]
      }
    ]
  })
}

However, if you need a more granular permission set, then WarpStream requires at least the following permissions (a sample Terraform sketch of an equivalent custom role follows the list):

  • storage.objects.create

  • storage.objects.delete

  • storage.objects.get

  • storage.objects.list

  • storage.multipartUploads.*
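
As a minimal sketch, assuming you manage IAM with Terraform, a custom role carrying these permissions could be created and bound to the Agent's service account on the bucket. The role ID, bucket name, and service account email below are illustrative:

# Custom roles cannot use wildcards, so storage.multipartUploads.* is expanded
# into the individual multipart upload permissions.
resource "google_project_iam_custom_role" "warpstream_agent_minimal" {
  role_id = "warpstreamAgentMinimal"
  title   = "WarpStream Agent Minimal"
  permissions = [
    "storage.objects.create",
    "storage.objects.delete",
    "storage.objects.get",
    "storage.objects.list",
    "storage.multipartUploads.create",
    "storage.multipartUploads.abort",
    "storage.multipartUploads.list",
    "storage.multipartUploads.listParts",
  ]
}

# Grant the custom role on the WarpStream bucket to the Agent's service account.
resource "google_storage_bucket_iam_member" "warpstream_agent_minimal" {
  bucket = "my-warpstream-bucket-123"
  role   = google_project_iam_custom_role.warpstream_agent_minimal.id
  member = "serviceAccount:warpstream-agent@my-project.iam.gserviceaccount.com"
}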

Migrating Between Object Storage Buckets

If you need to migrate a WarpStream cluster from one object storage bucket to another, follow these steps:

  1. Make sure that the Agents have permission to perform operations on both the old bucket and the new bucket.

  2. Deploy the Agents with the bucketURL flag set to the new bucket instead of the old one. This will cause the Agents to write all new files (both for ingestion and compaction) to the new bucket while still allowing them to read historical data from the old bucket. You'll also need to set the -additionalDeadscannerBucketURLs flag or WARPSTREAM_ADDITIONAL_DEADSCANNER_BUCKET_URLS environment variable in the Agents to point to the old bucket so that the Agents continue to scan the old bucket for dead files and delete them.

  3. Wait until there are no more data files under the warpstream prefix in the old bucket.

For example, if you were migrating from AWS S3 bucket foo to AWS S3 bucket bar, then you would redeploy the Agents from this configuration:

WARPSTREAM_BUCKET_URL=s3://foo?region=us-east-1

To this configuration:

WARPSTREAM_ADDITIONAL_DEADSCANNER_BUCKET_URLS=s3://foo?region=us-east-1
WARPSTREAM_BUCKET_URL=s3://bar?region=us-east-1

You would then wait until all of the files under the warpstream prefix in the foo bucket have been deleted, and finally deploy the Agents one last time with this configuration:

WARPSTREAM_BUCKET_URL=s3://bar?region=us-east-1
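
If you deploy the Agents with our Helm chart, the intermediate migration step can be expressed through the chart's values. Below is a minimal sketch, assuming the chart maps config.bucketURL to the Agent's bucket URL and forwards extraEnvs entries as standard environment variables:

config:
    # New bucket that all new files are written to.
    bucketURL: s3://bar?region=us-east-1

extraEnvs:
    # Keep scanning the old bucket for dead files until it is empty.
    - name: WARPSTREAM_ADDITIONAL_DEADSCANNER_BUCKET_URLS
      value: "s3://foo?region=us-east-1"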

Kubernetes Workload Identity for Bucket Access

When running in Kubernetes on AWS, Azure, or GCP, we recommend using Workload Identity to grant the WarpStream Agent pods access to the object storage bucket. This simplifies management of object storage credentials and minimizes the risk of credential leaks.

data "aws_iam_policy_document" "eks_service_account" {
  statement {
    effect = "Allow"

    principals {
      type        = "Federated"
      identifiers = [var.eks_oidc_provider_arn]
    }

    actions = ["sts:AssumeRoleWithWebIdentity"]

    condition {
      test     = "StringEquals"
      variable = "${replace(var.eks_oidc_issuer_url, "https://", "")}:sub"

      values = ["system:serviceaccount:${var.kubernetes_namespace}:warpstream-agent"]
    }

    condition {
      test     = "StringEquals"
      variable = "${replace(var.eks_oidc_issuer_url, "https://", "")}:aud"

      values = ["sts.amazonaws.com"]
    }
  }
}

resource "aws_iam_role" "eks_service_account" {
  name               = "warpstream-agent"
  assume_role_policy = data.aws_iam_policy_document.eks_service_account.json
}


data "aws_iam_policy_document" "eks_service_account_s3_bucket" {
  statement {
    effect = "Allow"

    actions = [
      "s3:ListBucket",
      "s3:GetObject",
      "s3:PutObject",
      "s3:DeleteObject",
    ]

    resources = [
      "arn:aws:s3:::${var.bucketName}",
      "arn:aws:s3:::${var.bucketName}/*",
    ]
  }
}

resource "aws_iam_role_policy" "eks_service_account_s3_bucket" {
  name = "warpstream-agent-s3"
  role = aws_iam_role.eks_service_account.id

  policy = data.aws_iam_policy_document.eks_service_account_s3_bucket.json
}

Example configuration on our Helm chart:

config:
    bucketURL: s3://my-bucket-name
    
serviceAccount:
    annotations:
        "eks.amazonaws.com/role-arn": "arn:aws:iam::XXXXXXXXXXXX:role/warpstream-agent"
resource "azurerm_user_assigned_identity" "warpstream_agent" {
  name                = "warpstream-agent"
  resource_group_name = var.resource_group_name
  location            = var.location
}

resource "azurerm_federated_identity_credential" "identity_credential" {
  name                = "warpstream-agent"
  resource_group_name = var.resource_group_name
  audience            = ["api://AzureADTokenExchange"]
  issuer              = var.aks_oidc_issuer_url
  parent_id           = azurerm_user_assigned_identity.warpstream_agent.id
  subject             = "system:serviceaccount:${var.kubernetes_namespace}:warpstream-agent"
}

resource "azurerm_role_assignment" "reader_and_data_assigned_identity" {
  scope                = var.azure_container_resource_manager_id
  role_definition_name = "Storage Blob Data Contributor"
  principal_id         = azurerm_user_assigned_identity.warpstream_agent.principal_id
}

Example configuration on our Helm chart:

config:
    bucketURL: azblob://my-bucket-name
    
serviceAccount:
    annotations:
        "azure.workload.identity/client-id": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
resource "google_service_account" "warpstream_agent" {
  account_id   = "warpstream-agent"
  display_name = "Service Account Kubernetes"
}

resource "google_storage_bucket_iam_member" "gcs_access" {
  bucket = var.bucket_name
  role   = "roles/storage.objectAdmin"
  member = "serviceAccount:${google_service_account.warpstream_agent.email}"
}

resource "google_project_iam_member" "gcs_access_token_creator" {
  project  = var.project
  role     = "roles/iam.serviceAccountTokenCreator"
  member   = "serviceAccount:${google_service_account.warpstream_agent.email}"
}

resource "google_service_account_iam_binding" "workload_identity_binding" {
  service_account_id = google_service_account.warpstream_agent.name
  role               = "roles/iam.workloadIdentityUser"

  members = [
    "serviceAccount:${var.project_id}.svc.id.goog[${var.kubernetes_namespace}/warpstream-agent]",
  ]
}

Example configuration on our Helm chart:

config:
    bucketURL: gcs://my-bucket-name
    
serviceAccount:
    annotations:
        "iam.gke.io/gcp-service-account": "warpstream-agent@xxxxxxxxxx.iam.gserviceaccount.com"
