Agent Configuration Reference

Reference documentation for WarpStream Agent flags.

Required Command Line Flags and Environment Variables

All WarpStream Agent configurations can be set via command-line flags or environment variables. Command-line flags take precedence over environment variables.

FlagEnvironment VariableDescription

bucketURL

WARPSTREAM_BUCKET_URL

agentKey

WARPSTREAM_AGENT_KEY

WarpStream Agent Key obtained from the WarpStream admin console.

defaultVirtualClusterID

WARPSTREAM_DEFAULT_VIRTUAL_CLUSTER_ID

WarpStream Virtual Cluster ID obtained from the WarpStream admin console.

region

WARPSTREAM_REGION

WarpStream virtual cluster's control plane region. Can be obtained from the WarpStream admin console.

Optional Command Line Flags and Environment Variables

All WarpStream Agent configuration can be set either via command line flags, or environment variables. Command line flags take precedence over environment variables.

FlagEnvironment VariableDescription

apiKey

WARPSTREAM_API_KEY

Backward-compatible alias of agentKey

agentGroup

WARPSTREAM_AGENT_GROUP

Name of the 'group' that the Agent belongs to. This feature is used to isolate groups of Agents that belong to the same logical cluster, but should not communicate with each other because they're deployed in separate cloud accounts, vpcs, or regions. Leave blank to indicate the Agent belongs to the default group.

heartbeatEvery

WARPSTREAM_HEARTBEAT_EVERY

How often the agent should heartbeat the WarpStream backend. Recommended to not modify this.

enableKinesis

WARPSTREAM_ENABLE_KINESIS

Enable kinesis server (default true).

httpPort

WARPSTREAM_HTTP_PORT

The port the Agent will use for serving HTTP requests (Kinesis API requests, distributed file cache requests, exposing Prometheus metrics, etc) (default 8080).

enableKafka

WARPSTREAM_ENABLE_KAFKA

Enable kafka server (default true).

kafkaPort

WARPSTREAM_KAFKA_PORT

The port the Agent will listen on for Kafka client TCP connections (default 9092).

kafkaFetchCompression

WARPSTREAM_KAFKA_FETCH_COMPRESSION

Compression type to use for Fetch responses: none, gzip, snappy, lz4 (by default), zstd. This is only used if no compression is set explicitly, or if 'agent' type compress.

kafkaMetadataRefreshInterval

WARPSTREAM_KAFKA_METADATA_REFRESH_INTERVAL

Period of time at which topic metadata is refreshed. Unlike Kafka, this metadata cache refresh also affects the timestamp type associated with a stream (default 1m0s).

kafkaHandleConsumerGroupsInBackend

WARPSTREAM_KAFKA_HANDLE_CONSUMER_GROUPS_IN_BACKEND

Handle consumer group 'JoinGroup' and 'SyncGroup' requests in the backend instead of in the agent. When handled in the backend, the 'Rebalance Timeout' is always set to 10 seconds, whereas in the agent, it will be determined by client specifications. Enabling this option offers the advantage of reduced error potential and seamless integration of backend improvements and bug fixes. However, exercise caution when enabling it for large consumer groups, as a 10-second rebalance timeout may lead to extended rebalancing times and consequently, prolonged consumption pauses. Warning: Ensure uniformity within your agent pool regarding this setting. Having a mix of enabled and disabled settings may lead to rebalancing issues and potential disruptions.

kafkaHighCardinalityMetrics

WARPSTREAM_KAFKA_HIGH_CARDINALITY_METRICS

Whether to emit metrics with high cardinality tags. When set to true, it enables detailed tracking at a granular level, such as metrics for individual fetch and produce operations on a per-topic basis. Use with caution as high cardinality can significantly increase the amount of data collected, potentially impacting performance.

kafkaCloseIdeConnAfter

WARPSTREAM_KAFKA_CLOSE_IDLE_CONN_AFTER

Close idle connections after the number of duration specified by this config (default 10m0s).

kafkaMaxFetchRequestBytesUncompressedOverride

WARPSTREAM_KAFKA_MAX_FETCH_REQUEST_BYTES_UNCOMPRESSED_OVERRIDE

Maximum number of uncompressed bytes that can be fetched in a single fetch request (default 128MiB).

kafkaMaxFetchPartitionBytesUncompressedOverride

WARPSTREAM_KAFKA_MAX_FETCH_PARTITION_BYTES_UNCOMPRESSED_OVERRIDE

Mmaximum number of uncompressed bytes that can be fetched in a single fetch request for a single topic-partition (default 128MiB).

fileCacheSizeBytes

WARPSTREAM_FILE_CACHE_SIZE_BYTES

Size of the Agent file cache size in bytes. This cache is used to reduce the number of object storage GET requests that required to serve consumers. Defaults to 0.5GiB/vCPU if omitted.

fileCacheExtraReplicas

WARPSTREAM_FILE_CACHE_EXTRA_REPLICAS

Number of extra replicas for the distributed file cache. Helps improve availability and reduce errors when Agents shutdown ungracefully. You can override this to 0, but do not increase this value above 1 unless you know what you're doing.

gracefulShutdownDuration

WARPSTREAM_GRACEFUL_SHUTDOWN_DURATION

Amount of time to wait after receiving SIGTERM before exiting to allow graceful removal from service discovery (default 1m0s).

maxConcurrentRequestPerCPU

WARPSTREAM_MAX_CONCURRENT_REQUEST_PER_CPU

Maximum number of concurrent requests (per CPU) allowed by the Kafka server.

enableClusterWideEnvironment

WARPSTREAM_ENABLE_CLUSTER_WIDE_ENVIRONMENT

Whether the cluster wide environment should be enabled.

clusterWideEnvironmentPort

WARPSTREAM_CLUSTER_WIDE_ENVIRONMENT_PORT

The default port to use for the cluster wide environment (default 9999).

ingestionBucketURL

WARPSTREAM_INGESTION_BUCKET_URL

Object storage URL to use for data ingestion (produce requests).

compactionBucketURL

WARPSTREAM_COMPACTION_BUCKET_URL

Object storage URL to use for files created by compactions.

batchTimeout

WARPSTREAM_BATCH_TIMEOUT

Controls the maximum amount of time the WarpStream Agents will allow a produced record to remain buffered in batch before flushing it to object storage. Increasing this value reduces object storage API costs, but increases latency, and vice versa. Note the WarpStream agents never acknowledge data until it has been flushed to object storage so this value has no impact on correctness or durability guarantees, only latency. Defaults to 250ms, minimum is 50ms.

batchMaxSizeBytes

WARPSTREAM_BATCH_MAX_SIZE_BYTES

Controls the maximum number of bytes that will be buffered by the WarpStream Agents before flushing it to object storage. Increasing this value reduces object storage API costs for workloads that write more than uncompressed 16MiB/s/Agent, but increases latency, and vice versa. Note the WarpStream agents never acknowledge data until it has been flushed to object storage so this value has no impact on correctness or durability guarantees, only latency. Defaults to 4MiB, minimum is 1MiB, maximum is 8MiB.

batcherMaxInflightBytesPerCPU

WARPSTREAM_BATCHER_MAX_INFLIGHT_BYTES_PER_CPU

Maximum number of inflight bytes per CPU from Produce requests that have not yet been flushed to object storage that can be in memory at once before the Agent will begin backpressuring.

batcherMaxInflightFilesPerCPU

WARPSTREAM_BATCHER_MAX_INFLIGHT_FILES_PER_CPU

Maximum number of inflight files per CPU from Produce requests that have not yet been flushed to object storage that can be in memory at once before the Agent will begin backpressuring.

metadataURL

WARPSTREAM_METADATA_URL

Address for WarpStream metadata backend (default "https://api.prod.us-east-1.warpstream.com").

schemaRegistryURL

WARPSTREAM_SCHEMA_REGISTRY_URL

Address for WarpStream schema registry backend.

enableSchemaRegistry

WARPSTREAM_ENABLE_SCHEMA_REGISTRY

Enable schema registry server.

schemaRegistryPort

WARPSTREAM_SCHEMA_REGISTRY_PORT

Port to run the schema registry server on (default 9094).

region

WARPSTREAM_REGION

Region that the WarpStream control plane is running in. Value for your cluster can be obtained from the WarpStream console. Optional if your control plane is in us-east-1, otherwise must be provided.

kafkaLoadBalancingInterval

WARPSTREAM_KAFKA_LOAD_BALANCING_INTERVAL

Time after which the Kafka connection will be closed. This mechanism helps load balance the clients by forcing them to query the magic URL again. By resetting the connection periodically, clients are evenly distributed across available Kafka connections. (default 8760h0m0s).

kafkaInterzoneLoadBalancingInterval

WARPSTREAM_KAFKA_INTERZONE_LOAD_BALANCING_INTERVAL

Interval at which the Kafka connection assesses if the client-agent connection resides in the same Availability Zone (AZ). If they are not in the same AZ and there are agents available within the client's AZ, the connection is terminated. This approach encourages load balancing by prompting clients to re-query the magic URL and, consequently, connect to agents within their respective AZ. For this mechanism to function, clients should include 'waprstream_az=X' and 'warpstream_interzone_lb=true' in their clientID. (default 1m0s).

kafkaLoadBalancingDrainingTime

WARPSTREAM_KAFKA_LOAD_BALANCING_DRAINING_TIME

Time given to gracefully close the Kafka connection after the reconnect interval is reached.

advertiseHostnameStrategy

WARPSTREAM_ADVERTISE_HOSTNAME_STRATEGY

Which hostname strategy should be used the agent should advertise itself on. Accepted values: auto-ip4/auto-ip6/local/custom.

auto-ip4 means that it will try to automatically find an IP v4 that makes sense

auto-ip6 will do the same with an IPv6.

local will use localhost

If you select custom them you have to also define advertiseHostnameCustom.

advertiseHostnameCustom

WARPSTREAM_ADVERTISE_HOSTNAME_CUSTOM

Custom hostname value to advertise to service discovery for clustering purposes if the custom advertise strategy is selected.

requireSASLAuthentication

WARPSTREAM_REQUIRE_SASL_AUTHENTICATION

If set to true, the Agents will require that all Kafka clients authenticate themselves with proper SASL credentials.

logInterval

WARPSTREAM_LOG_INTERVAL

Interval for logging service status (default 15s).

enableDatadogProfiling

WARPSTREAM_ENABLE_DATADOG_PROFILING

Enable datadog profiling (default false).

enableDatadogTracing

WARPSTREAM_ENABLE_DATADOG_TRACING

Enable datadog tracing (default false).

enablePrometheusMetrics

WARPSTREAM_ENABLE_PROMETHEUS_METRICS

Enable prometheus metrics (default true).

enableDatadogMetrics

WARPSTREAM_ENABLE_DATADOG_METRICS

Enable datadog metrics (default false).

disableConsumerGroupMetrics

WARPSTREAM_DISABLE_CONSUMER_GROUP_METRICS

Disable the consumer group offset metrics automatically published by default (warpstream_consumer_group_lag and warpstream_consumer_group_max_offset).

disableConsumerGroupsMetricsTags

WARPSTREAM_DISABLE_CONSUMER_GROUP_METRICS_TAGS

Comma-separated list of the tags to not expose in the consumer group offset metrics (warpstream_consumer_group_lag and warpstream_consumer_group_max_offset).

disableLogsCollection

WARPSTREAM_DISABLE_LOGS_COLLECTION

Disable the logs collection sent to warpstream backend (default false).

roles

WARPSTREAM_AGENT_ROLES

Roles that the agent should start (comma-separated) (default "proxy, jobs").

bentoBucketURL

WARPSTREAM_BENTO_BUCKET_URL

Bucket URL to use when fetching the bento configuration.

bentoConfigPath

WARPSTREAM_BENTO_CONFIG_PATH

Path in the bucket to fetch the bento configuration.

enableManagedPipelines

WARPSTREAM_ENABLE_MANAGED_PIPELINES

Whether data pipelines can be managed by the control plane.

availabilityZoneRequired

WARPSTREAM_AVAILABILITY_ZONE_REQUIRED

When enabled, the agent will synchronously try to resolve its az during startup for 1min, and will not start serving its /v1/status health check until it succeeds. The process will exit early if it did not manage to resolve the availability zone. Only used in agent mode.

kafkaTLS

WARPSTREAM_TLS_ENABLED

Enable TLS for Kafka client/Agent connections. Must also specify tlsServerCertFile and tlsServerPrivateKeyFile.

tlsServerCertFile

WARPSTREAM_TLS_SERVER_CERT_FILE

Path to the X.509 certificate file in PEM format for the server.

tlsServerPrivateKeyFile

WARPSTREAM_TLS_SERVER_PRIVATE_KEY_FILE

Path to the X.509 private key file in PEM format for the server.

tlsClientCACertFile

WARPSTREAM_TLS_CLIENT_CA_CERT_FILE

Path to the X.509 certificate file in PEM format for the client certificate authority. If not specified, the host's root certificate pool will be used for client certificate verification.

requireMTLSAuthentication

WARPSTREAM_REQUIRE_MTLS_AUTHENTICATION

If set to true, the Agents will require that all Kafka clients authenticate themselves with mTLS. enableTLS must be set to true.

tlsPrincipalMappingRule

WARPSTREAM_TLS_PRINCIPAL_MAPPING_RULE

Regular expression to extract the ACL principal from the client certificate distinguished name. requireMTLSAuthentication must be set to true.

storageCompression

WARPSTREAM_STORAGE_COMPRESSION

Compression used in object store, either zstd or lz4.

externalSchemaRegistryBasicAuthUsername

WARPSTREAM_EXTERNAL_SCHEMA_REGISTRY_BASIC_AUTH_USERNAME

Username for the external schema registry.

externalSchemaRegistryBasicAuthPassword

WARPSTREAM_EXTERNAL_SCHEMA_REGISTRY_BASIC_AUTH_PASSWORD

Password for the external schema registry.

externalSchemaRegistryTlsServerCACertFile

WARPSTREAM_EXTERNAL_SCHEMA_REGISTRY_TLS_SERVER_CA_CERT_FILE

Path to the X.509 certificate file in PEM format for the schema registry server's certificate authority. If not specified, the host's root certificate pool will be used for client certificate verification.

externalSchemaRegistryTlsClientCertFile

WARPSTREAM_EXTERNAL_SCHEMA_REGISTRY_TLS_CLIENT_CERT_FILE

Path to the X.509 certificate file in PEM format for the schema registry client.

externalSchemaRegistryTlsClientPrivateKeyFile

WARPSTREAM_EXTERNAL_SCHEMA_REGISTRY_TLS_CLIENT_PRIVATE_KEY_FILE

Path to the X.509 private key file in PEM format for the schema registry client.

enableSetMutexProfileFraction

WARPSTREAM_ENABLE_SET_MUTEX_PROFILE_FRACTION

Enable this flag to call runtime.SetMutexProfileFraction with the value passed along -mutexProfileFraction.

mutexProfileFraction

WARPSTREAM_MUTEX_PROFILE_FRACTION

Tune the value passed to call runtime.SetMutexProfileFraction (default 10).

enableSetBlockProfileRate

WARPSTREAM_ENABLE_SET_BLOCK_PROFILE_RATE

Enable this flag to call runtime.SetBlockProfileRate with the value passed along -blockProfileRate.

blockProfileRate

WARPSTREAM_BLOCK_PROFILE_RATE

Tune the value passed to call runtime.SetBlockProfileRate (default 100000000).

maxProduceRecordSizeBytes

WARPSTREAM_MAX_PRODUCE_RECORD_SIZE_BYTES

Maximum size of a record that can be produced. Value needs to be between 4MiB and 64 MiB (default 32 MB).

disableProfileForwarding

WARPSTREAM_DISABLE_PROFILE_FORWARDING

Disable profile forwarding to warpstream backend. Note that if both Datadog profiling and profile forwarding are on, profile forwarding will automatically be turned off (default false).

maxProfileSize

WARPSTREAM_MAX_PROFILE_SIZE

Maximum number of bytes for buffering profiles in memory. Value needs to be smaller than 1 MiB (default 500 KiB). This is only used if -disableProfileForwarding is false.

zonedCIDRBlocks

WARPSTREAM_ZONED_CIDR_BLOCKS

A mapping of availability zones to Kafka client IPs. The mapping should be a <> delimited list of AZ to CIDR range pairs, where each pair starts with an AZ, a @, and a comma separated list of CIDR blocks for that given AZ. For example, us-east-1a@10.0.0.0/19,10.0.32.0/19<>us-east-1b@10.0.64.0/19<>us-east-1c@10.0.96.0/19.

autoTuneFetchLimits

WARPSTREAM_AUTO_TUNE_FETCH_LIMITS

Allow consumer fetch limits to be auto-adjusted (default false).

N/A

WARPSTREAM_AVAILABILITY_ZONE

Override the Availability Zone name which is discovered by the WarpStream Agent automatically using Cloud Instance Metadata (see section below).

We do not recommend overriding this in the general case.

N/A

WARPSTREAM_LOG_LEVEL

Override the log level of the WarpStream Agent from the default value of info. Acceptable values are debug, info, warn, and error.

Defaults to info.

N/A

WARPSTREAM_DISCOVERY_KAFKA_HOSTNAME_OVERRIDE

Overrides the hostname that the WarpStream Agents will report to the WarpStream discovery system (instead of the default of reporting their private IP4 address). This is useful when running the Agents behind a network load balancer which requires that the Agents report their hostname as the hostname of the network load balancer instead of their private IP.

Last updated