Orbit (Cluster Linking)
Replicate and migrate Kafka clusters.
Overview
Orbit is a feature of the WarpStream BYOC product that can automatically replicate data and metadata from any source Kafka-compatible cluster to a destination WarpStream cluster. Orbit is built directly into the WarpStream Agents and doesn't require any additional deployments or binaries; it runs directly in the destination WarpStream cluster.
Orbit can perfectly mirror Kafka records (including headers and preserving offsets), topic configurations, cluster configurations, topic deletions, consumer group offsets, and more.
Records copied from source to destination are identical down to the offsets of the records. This, combined with the fact that Orbit can also copy consumer group offsets, means that Orbit can be used to reliably migrate Kafka clients to the destination cluster without any duplicate consumption of data.
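Why offset preservation matters for migrations can be shown with a toy model (illustrative Python only, not WarpStream code): because each record keeps its source offset, a consumer group's committed offset from the source remains valid in the destination, so a migrated consumer resumes exactly where it left off.

```python
# Toy model of offset-preserving replication (illustrative only,
# not WarpStream code).

# Source partition: records keyed by their Kafka offsets.
source = {100: "a", 101: "b", 102: "c", 103: "d"}

# Orbit-style mirror: records land in the destination at the SAME offsets.
destination = dict(source)

# A consumer group committed offset 102 in the source
# (i.e., the next record to read is at offset 102).
committed_offset = 102

# After migration, the consumer resumes in the destination at the same
# committed offset: no records are re-consumed and none are skipped.
resumed = [destination[o] for o in sorted(destination) if o >= committed_offset]
print(resumed)  # ['c', 'd']
```

If the destination had re-assigned offsets (as a plain produce-based copy would), the committed offset `102` could point at the wrong record, causing duplicates or gaps.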
There are four primary use cases for Orbit:
Replication from a source Kafka cluster into WarpStream for migrations.
Creating replicated copies of source Kafka clusters for disaster recovery.
Creating replicated copies of source Kafka clusters for scalable tiered storage.
Creating replicated copies of source Kafka clusters to provide additional read replicas, and isolate workloads. For example, analytical or batch consumers could read from the WarpStream replica cluster to avoid overloading the source Kafka cluster.
We highly recommend watching the overview video below, which provides a comprehensive tour of Orbit's features.
Orbit Deployment
The WarpStream Agent binary ships with Orbit built in. Agents launched with the `jobs` role will automatically pick up Orbit configurations and perform replication.
Orbit Configuration
Orbit is fully controllable from a single YAML file which can be edited through the WarpStream console or created programmatically through Terraform.
Overview
```yaml
# WarpStream Orbit YAML configuration file.
source_bootstrap_brokers:
  # These nodes should belong to the same source Kafka cluster.
  # Orbit does not (currently) support replicating from multiple
  # source Kafka clusters.
  - hostname: localhost # Kafka server hostname.
    port: 9092
  - hostname: example.kafkaserver.com
    port: 9092
source_cluster_credentials:
  # Username, password, and mechanism are optional and should be
  # omitted if SASL is not required to connect to the source
  # cluster.
  #
  # Set username/password as environment variables in the Agents
  # with an ORBIT_ prefix.
  sasl_username_env: SASL_USERNAME_ENV_VAR
  sasl_password_env: SASL_PASSWORD_ENV_VAR
  sasl_mechanism: plain
  # Independent of whether SASL is enabled, set this to true or
  # false based on whether the source cluster has TLS enabled.
  use_tls: false
  # Whether TLS verification should be skipped.
  tls_insecure_skip_verify: false
topic_mappings:
  - source_regex: topic.* # Exact match, not substring.
    # Optional prefix added to topic names when they're replicated.
    # Leave this empty if you want topics replicated to WarpStream
    # to have the exact same name as they did in the source cluster.
    destination_prefix: ""
    begin_fetch_at_latest_offset: false
cluster_config:
  # Don't copy cluster configs from source (auto topic creation
  # policy, default partition count, etc).
  copy_source_cluster_configuration: false
consumer_groups:
  # Optional prefix added to consumer group names when they're
  # replicated. Leave this empty if you want consumer groups replicated
  # to WarpStream to have the exact same name as they did in the source
  # cluster.
  destination_group_prefix: ""
  # Whether consumer groups and their corresponding offsets
  # should be copied or not.
  copy_offsets_enabled: true
warpstream:
  # Used to control the rate at which Orbit will consume from
  # the source Kafka topic. Higher values will increase
  # throughput, but put more load on the source cluster.
  cluster_fetch_concurrency: 2
```

Here's a quick summary of the YAML file above:
Orbit is set up to connect to a Kafka cluster with 2 source brokers.
It will connect to the source brokers using SASL PLAIN and will retrieve the username and password by reading the `ORBIT_SASL_USERNAME_ENV_VAR` and `ORBIT_SASL_PASSWORD_ENV_VAR` environment variables respectively in the Agent.
Topics which match the `topic.*` regex in the source will be mirrored to the destination with their name unchanged (no prefix will be added to their name).
Cluster configurations will not be copied over.
Consumer group names and offsets will be replicated into the destination cluster. The consumer group names will be mirrored to the destination with their name unchanged (no prefix will be added to their name).
At most 2 concurrent fetches (globally, regardless of the number of Agents) will be executed in the WarpStream Agents at any given time.
Specify Source Brokers
Host address and port of the source cluster Kafka brokers or Agents.
```yaml
source_bootstrap_brokers:
  - hostname: localhost
    port: 9092
  - hostname: example.kafkaserver.com
    port: 9092
```

Source cluster brokers are listed under `source_bootstrap_brokers` in the YAML. Each broker is defined as a `hostname` and a `port`. The brokers must be reachable from your WarpStream Agents.
Specify Credentials
Credentials used by the destination WarpStream Agents to connect to the source cluster.

Credentials for the brokers are defined under the `source_cluster_credentials` section in the YAML. Since the Agents run in your VPC, WarpStream does not have access to your credentials.

To force the Agents to use TLS when connecting to your source clusters, use the `use_tls` flag in the `source_cluster_credentials` section. If you have ACLs enabled in the source cluster, then you need to make sure the principal associated with the Orbit credentials used to connect with the source has the following permissions:

`Describe`, `DescribeConfigs`, and `Read` operations for the Topic resources which you want mirrored by Orbit.
`Describe` and `DescribeConfigs` operations for the Cluster resource.
`Describe` and `Read` operations for the Group resource (consumer groups).
SASL
```yaml
source_cluster_credentials:
  sasl_username_env: SASL_USERNAME_ENV_VAR
  sasl_password_env: SASL_PASSWORD_ENV_VAR
  use_tls: false
  sasl_mechanism: plain
```

Both the `SASL_USERNAME_ENV_VAR` and the `SASL_PASSWORD_ENV_VAR` fields refer to environment variables. The Agents will prepend an `ORBIT_` prefix to the values of the fields before reading the environment variables, so the environment variables in the Agent should be configured as `ORBIT_SASL_USERNAME_ENV_VAR` and `ORBIT_SASL_PASSWORD_ENV_VAR` respectively.

By default Orbit uses the `plain` SASL mechanism. To specify the SASL mechanism, add the `sasl_mechanism` field in the `source_cluster_credentials` section. Supported mechanisms include:

`plain`
`scram-256`
`scram-512`
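The env-var indirection can be mimicked in a few lines (a hypothetical sketch, not the Agent's actual code): the YAML field stores the *name* of an environment variable, and the Agent reads that name with `ORBIT_` prepended.

```python
import os

# Hypothetical sketch of how the Agent resolves Orbit credential fields;
# the real Agent logic may differ.

def resolve_orbit_env(field_value: str) -> str:
    """The YAML field stores an env var NAME; the Agent reads ORBIT_<name>."""
    return os.environ["ORBIT_" + field_value]

# Simulate the Agent's environment.
os.environ["ORBIT_SASL_USERNAME_ENV_VAR"] = "orbit-user"

# sasl_username_env: SASL_USERNAME_ENV_VAR -> reads ORBIT_SASL_USERNAME_ENV_VAR
print(resolve_orbit_env("SASL_USERNAME_ENV_VAR"))  # orbit-user
```

A common misconfiguration is exporting the variable without the `ORBIT_` prefix, in which case the lookup above fails.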
mTLS PEM encoded certs
```yaml
source_cluster_credentials:
  mtls_client_cert_env: MTLS_CERT_PATH_ENV
  mtls_client_key_env: MTLS_KEY_PATH_ENV
  mtls_server_ca_cert_env: MTLS_SERVER_CA_CERT_PATH_ENV
  use_tls: true
```

The `MTLS_CERT_PATH_ENV`, `MTLS_KEY_PATH_ENV`, and `MTLS_SERVER_CA_CERT_PATH_ENV` fields refer to environment variables. The Agents will prepend an `ORBIT_` prefix to the values of the fields before reading the environment variables, so the environment variables in the Agent should be configured as `ORBIT_MTLS_CERT_PATH_ENV`, `ORBIT_MTLS_KEY_PATH_ENV`, and `ORBIT_MTLS_SERVER_CA_CERT_PATH_ENV` respectively.

Note that these environment variables should point to file paths for the respective PEM encoded certificate files and must not encode the certificates directly.

`mtls_server_ca_cert_env` is an optional field. However, it is highly recommended to set the environment variable to the public keys of the certificate authorities that sign your server certificates. If this environment variable is not set, WarpStream defaults to trusting all server certificates from your Operating System's root certificate store.
mTLS JKS encoded certs
```yaml
source_cluster_credentials:
  jks_key_store_file_path_env: JKS_KEYSTORE_PATH_ENV
  jks_key_store_password_env: JKS_KEYSTORE_PASSWORD_ENV
  jks_key_store_key_password_env: JKS_KEY_STORE_KEY_PASSWORD_ENV
  jks_trust_store_file_path_env: JKS_TRUST_STORE_FILE_PATH_ENV
  jks_trust_store_password_env: JKS_TRUST_STORE_PASSWORD_ENV
  use_tls: true
```

The `JKS_KEYSTORE_PATH_ENV`, `JKS_KEYSTORE_PASSWORD_ENV`, `JKS_KEY_STORE_KEY_PASSWORD_ENV`, `JKS_TRUST_STORE_FILE_PATH_ENV`, and `JKS_TRUST_STORE_PASSWORD_ENV` fields refer to environment variables. The Agents will prepend an `ORBIT_` prefix to the values of the fields before reading the environment variables, so the environment variables in the Agent should be configured as `ORBIT_JKS_KEYSTORE_PATH_ENV`, `ORBIT_JKS_KEYSTORE_PASSWORD_ENV`, etc.

The environment variable for `jks_key_store_file_path_env` should be assigned the file path to the JKS keystore file.
The environment variable for `jks_key_store_password_env` should be assigned the key store password.
The environment variable for `jks_key_store_key_password_env` should be assigned the key password.
The environment variable for `jks_trust_store_file_path_env` should be assigned the file path to the trust store.
The environment variable for `jks_trust_store_password_env` should be assigned the trust store password.
Topic Mappings
Replicate Topics
Topics can be replicated by adding mappings under the topic_mappings section of the YAML.
```yaml
topic_mappings:
  - source_regex: foo_topic.*
    # Topics whose name match the foo_topic.* regex will be
    # replicated with their name unmodified.
    destination_prefix: ""
  - source_regex: bar_topic.*
    # Topics whose name match the bar_topic.* regex will be
    # replicated, but their name will have baz_ added as a prefix.
    destination_prefix: baz_
    topic_config_overrides:
      - config_name: retention.ms
        config_value: "72000000"
      - config_name: cleanup.policy
        config_value: "delete,compact"
    begin_fetch_at_latest_offset: false
```

To replicate a topic from the source cluster to the destination, add a topic mapping to the `topic_mappings` section with a `source_regex` which will match the source topic which should be replicated. For example, with the mappings above, topics in the source cluster with prefixes `foo_topic` and `bar_topic` would be replicated.

Note that the mapping `*` will not match every sequence of characters. Since Orbit accepts regular expressions, use `.*` to match every sequence of characters.

Note that topics being actively replicated by Orbit cannot receive writes from external Kafka clients until the `irreversible_disable_orbit_management` field in the topic mapping is set to true. See the producer migration section for details on how to use this field.

Leave `destination_prefix` empty if you want topic names to be perfectly preserved when topics are replicated, or set it to a non-empty string if you want Orbit to add a prefix to Orbit-replicated topics. For example, if the source cluster has a topic `bar_topicA`, it will be replicated in WarpStream as `baz_bar_topicA`, while a topic named `foo_topicB` will be replicated with its name unmodified.

`topic_config_overrides` can be used to override the value of the specified topic configurations in the source cluster which will be copied to the destination. Currently, only the `retention.ms` and `cleanup.policy` topic configurations are supported for override. See the topic mapping rules section for further details.

By default, Orbit will copy all records starting from the earliest topic partition offsets in the source cluster for a given topic. If `begin_fetch_at_latest_offset` is set to `true`, then Orbit will copy all records beginning at the latest topic partition offsets in the source cluster. See the topic mapping rules section for further details.
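The "exact match, not substring" behavior of `source_regex` corresponds to anchored regex matching, i.e. something like Python's `re.fullmatch` rather than `re.search` (an analogy only; Orbit is not implemented in Python):

```python
import re

# Orbit matches the WHOLE topic name against source_regex (fullmatch-style),
# not a substring (search-style). Analogy only, not Orbit's actual code.

pattern = "foo_topic.*"

print(bool(re.fullmatch(pattern, "foo_topicA")))     # True: whole name matches
print(bool(re.fullmatch(pattern, "my_foo_topicA")))  # False: leading "my_" breaks it
print(bool(re.search(pattern, "my_foo_topicA")))     # True: substring semantics
                                                     # would (wrongly) match

# Likewise, a bare "*" is not a valid pattern for "match everything";
# use ".*" instead.
print(bool(re.fullmatch(".*", "anything")))          # True
```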
Pause Topic Replication
```yaml
topic_mappings:
  - source_regex: foo_topic.*
    destination_prefix: ""
  - source_regex: bar_topic.*
    destination_prefix: baz_
```

To pause replication for a topic, remove the mapping which matches the topic from the YAML.

In some cases, it's possible that a topic can match multiple topic mappings, and you do not want to remove all the topic mappings. In such a case, you can explicitly mark a topic mapping as paused. In the following example, replication for the topic `test` will be paused since the first topic mapping which matches `test` has `pause` set to `true`.
```yaml
topic_mappings:
  - source_regex: test
    pause: true
  - source_regex: .*
```

Topic Mapping Rules
Detailed rules about which topic mappings match which topics.
```yaml
topic_mappings:
  - source_regex: foo_topic.*
    destination_prefix: ""
  - source_regex: bar_topic.*
    destination_prefix: baz_
    topic_config_overrides:
      - config_name: retention.ms
        config_value: "72000000"
    begin_fetch_at_latest_offset: false
```

Each mapping consists of `source_regex`, `destination_prefix`, and `irreversible_disable_orbit_management` fields.

Orbit will create a new topic in the destination only if there exists a topic mapping with a `source_regex` which matches the source cluster topic. To determine if a topic in the source cluster matches a topic mapping, it must match the `source_regex` in at least one topic mapping. For example, the topic `foo_topicA` matches the first topic mapping.

The value of the `destination_prefix` field is prepended to the name of the topic which will be created in the destination cluster. An empty string is a valid value for `destination_prefix` if you want the topics to have the same name in the destination WarpStream cluster as they did in the source cluster.

To determine if a topic which has been created by Orbit in the destination cluster matches a topic mapping, it must match `destination_prefix` + `source_regex` in at least one of the topic mappings. For example, `bar_topicB` is created as `baz_bar_topicB` in the destination, and `baz_bar_topicB` matches the second topic mapping.

A topic in either the source or the destination cluster is matched against the mappings in the `topic_mappings` section in the order in which they are listed, and only a single topic mapping will apply (the first one that matches).

Orbit will keep the topic configurations in sync for a source/destination topic pair only if the destination topic matches a topic mapping.
Orbit will keep deleted topics in sync for a source/destination topic pair only if the destination topic matches a topic mapping.
Orbit will replicate records for a source/destination topic pair only if the destination topic matches a topic mapping.

The `irreversible_disable_orbit_management` field will only be respected if a destination topic matches the topic mapping with the flag set to `true`. See the producer migration section, where the meaning of this configuration option is explained in detail.

`topic_config_overrides` overrides the values of specified topic configurations for a given topic mapping. For example, if topics which match the `bar_topic.*` regex in the source cluster have a `retention.ms` value of `60000000`, the corresponding topics created in the destination will have a `retention.ms` value of `72000000`.

`begin_fetch_at_latest_offset` can be set to `true` for a topic mapping to begin fetches at the latest offset of a topic partition rather than the earliest. By default this field is set to `false` and records are copied starting from the earliest topic partition offsets in the source cluster. When set to `true`, any topics created in the destination cluster which match that topic mapping will begin fetching records starting from the latest offsets in the source cluster. As an example, if partition `0` of some topic `topic_A` has records from offsets `100` to `1000` in the source cluster, then Orbit will start mirroring `topic_A` beginning at offset `1000` rather than `100`. Every record after offset `1000` will be mirrored identically.
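The matching rules above can be summarized in a short sketch (illustrative Python with a hypothetical helper, not Orbit's implementation): mappings are tried in order, the first full-regex match wins, and the destination name is `destination_prefix` plus the source name.

```python
import re

# Illustrative sketch of first-match-wins topic mapping resolution;
# not Orbit's actual implementation.

MAPPINGS = [
    {"source_regex": "foo_topic.*", "destination_prefix": ""},
    {"source_regex": "bar_topic.*", "destination_prefix": "baz_"},
]

def destination_name(source_topic):
    """Return the destination topic name, or None if no mapping matches."""
    for m in MAPPINGS:  # mappings are evaluated in order; first match wins
        if re.fullmatch(m["source_regex"], source_topic):
            return m["destination_prefix"] + source_topic
    return None  # unmatched topics are not replicated

print(destination_name("foo_topicA"))   # foo_topicA  (empty prefix)
print(destination_name("bar_topicB"))   # baz_bar_topicB
print(destination_name("other_topic"))  # None
```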
Topic Configurations
Orbit will copy topic configurations from the source cluster for all of the topics which Orbit is managing in the destination cluster. Note that only the following configuration values are considered relevant to WarpStream and will be copied by Orbit:
`cleanup.policy`
`message.timestamp.type`
`retention.ms`
`delete.retention.ms`
`min.compaction.lag.ms`
Sync Consumer Groups
Orbit can sync offsets of consumer groups which exist in the source cluster to the destination cluster.
```yaml
consumer_groups:
  copy_offsets_enabled: true
  destination_group_prefix: ""
```

Set `copy_offsets_enabled` to `true` under the `consumer_groups` section and Orbit will start mirroring consumer groups from the source cluster to the destination.

Similar to topic names, consumer group replication can optionally be configured to add a destination group prefix to the replicated consumer group names. Leave `destination_group_prefix` empty if you want consumer group names to be replicated as-is.
Cluster configurations
Sync cluster configurations
```yaml
cluster_config:
  copy_source_cluster_configuration: true
```

Orbit will copy cluster configurations from the source cluster to the destination if the `copy_source_cluster_configuration` flag under the `cluster_config` section is set to `true`.

Note that Orbit only copies cluster configurations which are relevant to WarpStream. These include:

`num.partitions`, which determines the default number of partitions a new topic is created with. This refers to new topics created by a Kafka client, not by Orbit.
`auto.create.topics.enable`, which indicates if new topics should be created automatically when a client performs a metadata request for a topic which does not exist.
`log.retention.ms` / `log.retention.minutes` / `log.retention.hours`, which indicate the default retention used by topics.
`offsets.retention.minutes`, which indicates the expiration duration of the offsets committed by a consumer group.
Disable copying of records
```yaml
cluster_config:
  disable_copy_records: true
```

You can prevent Orbit from copying any records from the source to the destination cluster for every single topic mapping by setting the above cluster config.
Orbit UI
You can edit the Orbit YAML, view the offset lag for topic partitions being mirrored, and view which topics Orbit is mirroring all from the WarpStream UI.
Create Orbit YAML
Select your virtual cluster from the WarpStream console and navigate to the Orbit tab. You'll see a text editor which can be used to create and edit the Orbit YAML file.

Edit the YAML, and hit Save. Note that the pipeline is paused by default. You can start it by hitting the toggle next to the word PAUSED.

Once the pipeline is running, you'll see the topics which are mirrored by Orbit show up under the Orbit tab. In the image below, Orbit created a topic called `test_topic` with a prefix `orbit_`, as expected.

To modify the YAML, hit the edit button, edit the YAML, and hit Save again. Orbit will create a new configuration for the pipeline. In the image below, I've set `copy_offsets_enabled` to `true` so that consumer group offsets will be copied from the source cluster to the destination. Note: You will have to hit the Deploy button on the new version before it will take effect. You can also deploy older versions to "rollback".

You can view the lag between the source and destination cluster topic partitions from the Consumers tab. The `orbit_source_cluster_offsets` consumer group will include every topic and partition that is being mirrored by Orbit. The offset lag indicates how far behind Orbit is compared to the source cluster. The time lag indicates `time.Since(X)`, where X is the timestamp of the most recently copied source record.

Programmatic Access: HTTP API and Terraform
For programmatic access, Orbit can be configured via its HTTP API or declaratively using our Terraform provider.
Orbit HTTP API: create, update, and deploy configurations
This guide shows how to manage the Orbit pipeline via HTTP APIs using an API key. It covers creating the pipeline, creating/updating configurations (YAML), deploying a configuration, and pausing/resuming the pipeline.
Prerequisites:
You have a Virtual Cluster ID (for example `vci_xxx...`).
You have a WarpStream API key with admin permissions for that virtual cluster.
You know your Console/API base URL (for example `https://console.yourcompany.com`).
Authentication:
Include your API key in the `warpstream-api-key` header on every request.
Notes:
All endpoints are `POST` and accept/return JSON.
When updating an Orbit configuration, you create a new configuration version and then deploy it. You do not mutate an existing configuration in place.
1) Create the Orbit pipeline (once)
Endpoint: /api/v1/create_pipeline
Example request:

```shell
curl -sS -X POST \
  -H "Content-Type: application/json" \
  -H "warpstream-api-key: $WARPSTREAM_API_KEY" \
  "$CONSOLE_BASE_URL/api/v1/create_pipeline" \
  -d '{
    "virtual_cluster_id": "'$VIRTUAL_CLUSTER_ID'",
    "pipeline_name": "orbit_default",
    "pipeline_type": "orbit"
  }'
```

The response will include a `pipeline_id` you can save. If the pipeline already exists, you can obtain its ID using "Describe pipeline" below.
2) Create a new Orbit configuration (YAML)
Endpoint: /api/v1/create_pipeline_configuration
Send the YAML as a string in `configuration_yaml`:

```shell
ORBIT_YAML=$(cat <<'YAML'
source_bootstrap_brokers:
  - hostname: "kafka-source-1"
    port: 9092
topic_mappings:
  - source_regex: "^orders\\.v[0-9]+$"
    destination_prefix: "src_"
cluster_config:
  copy_source_cluster_configuration: true
  disable_copy_records: false
consumer_groups:
  destination_group_prefix: "src_"
  copy_offsets_enabled: true
warpstream:
  cluster_fetch_concurrency: 8
YAML
)

# jq safely encodes the multiline YAML (including its newlines)
# as a JSON string.
curl -sS -X POST \
  -H "Content-Type: application/json" \
  -H "warpstream-api-key: $WARPSTREAM_API_KEY" \
  "$CONSOLE_BASE_URL/api/v1/create_pipeline_configuration" \
  -d "$(jq -n \
        --arg vc "$VIRTUAL_CLUSTER_ID" \
        --arg pid "$PIPELINE_ID" \
        --arg yaml "$ORBIT_YAML" \
        '{virtual_cluster_id: $vc, pipeline_id: $pid, configuration_yaml: $yaml}')"
```

The response includes a `configuration_id` for the created version.
3) Deploy a configuration and control state (run/pause)
Endpoint: /api/v1/change_pipeline_state
Deploy a specific configuration and set the desired state to running:

```shell
curl -sS -X POST \
  -H "Content-Type: application/json" \
  -H "warpstream-api-key: $WARPSTREAM_API_KEY" \
  "$CONSOLE_BASE_URL/api/v1/change_pipeline_state" \
  -d '{
    "virtual_cluster_id": "'$VIRTUAL_CLUSTER_ID'",
    "pipeline_id": "'$PIPELINE_ID'",
    "deployed_configuration_id": "'$CONFIGURATION_ID'",
    "desired_state": "running"
  }'
```

Pause the pipeline without changing the deployed configuration:
```shell
curl -sS -X POST \
  -H "Content-Type: application/json" \
  -H "warpstream-api-key: $WARPSTREAM_API_KEY" \
  "$CONSOLE_BASE_URL/api/v1/change_pipeline_state" \
  -d '{
    "virtual_cluster_id": "'$VIRTUAL_CLUSTER_ID'",
    "pipeline_id": "'$PIPELINE_ID'",
    "desired_state": "paused"
  }'
```

To "update" Orbit's configuration, create a new configuration version (step 2) and deploy it (step 3).
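The create-then-deploy update flow can also be scripted. The sketch below (hedged: endpoint paths and field names are taken from the curl examples above, and `build_update_flow` is a hypothetical helper) only builds the request payloads so the sequencing is explicit; sending them, and reading the `configuration_id` out of the first response, is left to your HTTP client of choice.

```python
# Sketch of the two-step "update" flow: create a new configuration
# version, then deploy it. This builds the JSON payloads only; wire
# them up with any HTTP client, sending the warpstream-api-key header
# on each POST.

def build_update_flow(virtual_cluster_id, pipeline_id,
                      configuration_yaml, configuration_id):
    """Return (endpoint, payload) pairs for: new config version, then deploy."""
    create_config = ("/api/v1/create_pipeline_configuration", {
        "virtual_cluster_id": virtual_cluster_id,
        "pipeline_id": pipeline_id,
        "configuration_yaml": configuration_yaml,
    })
    # In practice, configuration_id comes from the response to the
    # create_pipeline_configuration call above.
    deploy = ("/api/v1/change_pipeline_state", {
        "virtual_cluster_id": virtual_cluster_id,
        "pipeline_id": pipeline_id,
        "deployed_configuration_id": configuration_id,
        "desired_state": "running",
    })
    return [create_config, deploy]

steps = build_update_flow("vci_123", "pipe_123", "warpstream: {}", "cfg_456")
print([endpoint for endpoint, _ in steps])
```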
4) Discover the Orbit pipeline and configurations
If you don’t have the pipeline ID, describe by type to fetch the Orbit pipeline and its configuration versions.
Endpoint: /api/v1/describe_pipeline
```shell
curl -sS -X POST \
  -H "Content-Type: application/json" \
  -H "warpstream-api-key: $WARPSTREAM_API_KEY" \
  "$CONSOLE_BASE_URL/api/v1/describe_pipeline" \
  -d '{
    "virtual_cluster_id": "'$VIRTUAL_CLUSTER_ID'",
    "pipeline_type": "orbit"
  }'
```

The response includes the pipeline overview and an array of `pipeline_configurations` with their `id` and `version`.
You can also list pipelines:
Endpoint: /api/v1/list_pipelines
```shell
curl -sS -X POST \
  -H "Content-Type: application/json" \
  -H "warpstream-api-key: $WARPSTREAM_API_KEY" \
  "$CONSOLE_BASE_URL/api/v1/list_pipelines" \
  -d '{
    "virtual_cluster_id": "'$VIRTUAL_CLUSTER_ID'"
  }'
```

5) Delete the Orbit pipeline (optional)
Endpoint: /api/v1/delete_pipeline
```shell
curl -sS -X POST \
  -H "Content-Type: application/json" \
  -H "warpstream-api-key: $WARPSTREAM_API_KEY" \
  "$CONSOLE_BASE_URL/api/v1/delete_pipeline" \
  -d '{
    "virtual_cluster_id": "'$VIRTUAL_CLUSTER_ID'",
    "pipeline_id": "'$PIPELINE_ID'"
  }'
```

Migration
Orbit can be used to migrate existing Kafka-compatible clusters to WarpStream.
Replication
First, follow the instructions above to configure WarpStream to replicate all of the topics and consumer groups from the source cluster. Monitor the lag of the orbit_source_cluster_offsets consumer group to wait for WarpStream to "catch up" on all the source Kafka topics.
Consumers
The following migration flow will ensure that your Kafka consumer clients can be migrated to WarpStream without duplicate processing of records. The Orbit YAML must have the following config:

```yaml
consumer_groups:
  copy_offsets_enabled: true
```

Orbit will periodically sync committed offsets for consumer groups to WarpStream.

Let's say you want to migrate some consumer group `X`:

First, shut down the consumers belonging to this group in the source cluster.

Wait 1 minute for Orbit to copy the consumer group offsets for group `X` to the destination cluster. You can use the `warpstream_consumer_group_max_offset` metric emitted from the Agents to view the offsets copied by Orbit to the destination. See the monitoring page for more details on this metric. You can also see the currently committed offsets in the Consumers tab in the WarpStream UI.

Restart the consumers, but point them to the destination WarpStream cluster instead by changing their bootstrap URL. The consumers will start reading from the offsets which were last committed in the source cluster, picking up where they left off.
Producers
The following migration flow will ensure that your Kafka producer clients can be cut over to WarpStream seamlessly. Using the `irreversible_disable_orbit_management` field in a topic mapping will disable Orbit management of the topic. The following example shows how and when to use the field.

Let's say you have the following topic mapping in the Orbit YAML, and you have decided that you want to move the topic `topic0` matched by this mapping from the source to the destination. To make the example more complicated, let's assume that a `topic1` also exists in the source cluster which is also being mirrored by Orbit.

```yaml
topic_mappings:
  - source_regex: topic.*
    destination_prefix: test0_
```

First, shut down the producer clients producing to `topic0` in the source.

Wait for the offset lag between the source topic `topic0` and the destination topic `test0_topic0` to become 0. You can view the lag for the topic from the `orbit_source_cluster_offsets` group in the Consumers view in the WarpStream console. The lag for all of the partitions of the topic is also emitted as the `warpstream_consumer_group_lag` metric from the Agents. See the monitoring page for more details on this metric. You can also see the currently committed offsets in the Consumers tab in the WarpStream UI.

Create a new topic mapping to migrate the `test0_topic0` topic. Note the `irreversible_disable_orbit_management` field. This field indicates that Orbit should stop managing the topic, which will allow Kafka producer clients to write to the topic. This decision is irreversible.

```yaml
topic_mappings:
  - source_regex: topic0
    destination_prefix: test0_
    irreversible_disable_orbit_management: true
  - source_regex: topic.*
    destination_prefix: test0_
```

Since topic mappings match topics in order, `test0_topic0` will now accept writes from regular Kafka clients, and Orbit will no longer copy records from the source for this topic. Note that `test0_topic1` is unaffected because it does not match the first topic mapping.

Restart your producer clients by pointing them to the destination WarpStream cluster. Since `topic0` is known as `test0_topic0` in the destination due to the prefix, you'll have to write to `test0_topic0` instead.
Observability
You can monitor Orbit using the metrics built into the WarpStream Agents. See our monitoring documents for more details.
Orbit will also emit some additional metrics:
You can use the `warpstream_agent_kafka_produce_with_offset_uncompressed_bytes_counter` metric to view the data successfully written by Orbit to the destination WarpStream cluster.

You can use the `warpstream_consumer_group_lag` metric and filter for the group `orbit_source_cluster_offsets` and the topics you care about to determine the lag between your topics in the source cluster and WarpStream. Specifically, the lag is defined as the difference (in offsets) between the max offset in the source cluster and the most recently copied offset. View the monitoring page for more details on this metric.

You can use the `warpstream_consumer_group_estimated_lag_very_coarse_do_not_use_to_measure_e2e_seconds` metric and filter for the consumer group `orbit_source_cluster_offsets` and topic to get the time since the latest record copied by Orbit was assigned a timestamp in the source. View the monitoring page for more details on this metric. This metric is a rough measure of E2E lag from the data being timestamped by your producer Kafka client to it being queryable in WarpStream. However, the timestamps are processed asynchronously and it can take 10-15 seconds before they are used to compute the time lag. This means the time lag is 10-15 seconds higher than the actual E2E lag from your Kafka producer client to the consumers reading from WarpStream, and also higher than just the Orbit replication lag because it includes the time it takes for the data to be processed through the source Kafka cluster as well.

You can use the `warpstream_consumer_group_max_offset` metric emitted from the Agents to view the offsets copied by Orbit to the consumer group in the destination. See the monitoring page for more details on this metric.

The `warpstream_agent_kafka_source_cluster_connections_counter` metric is a counter which is incremented for short-lived Orbit connections made against the source cluster. It can be used to estimate the rate at which Orbit is creating connections against the source cluster.
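Concretely, the two lag measures described above reduce to simple arithmetic (an illustrative sketch of the definitions, not Agent code):

```python
import time

# Illustrative definitions of Orbit's lag measures; not Agent code.

def offset_lag(source_max_offset, last_copied_offset):
    """Offset lag: how many offsets Orbit is behind the source partition."""
    return source_max_offset - last_copied_offset

def time_lag_seconds(last_copied_record_timestamp, now=None):
    """Time lag: time.Since(X), where X is the timestamp of the most
    recently copied source record."""
    if now is None:
        now = time.time()
    return now - last_copied_record_timestamp

print(offset_lag(source_max_offset=1000, last_copied_offset=940))        # 60
print(time_lag_seconds(last_copied_record_timestamp=100.0, now=112.5))   # 12.5
```

Note that, per the caveat above, the reported time lag also includes the asynchronous timestamp-processing delay, so it runs 10-15 seconds higher than the true replication lag.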
Tuning Orbit
Concurrency
The primary knob for controlling Orbit throughput is `cluster_fetch_concurrency`. This is a global knob that controls how many concurrent Orbit fetch jobs will run against the source Kafka cluster, regardless of how many Agents are deployed.
```yaml
warpstream:
  cluster_fetch_concurrency: 10
```

Increasing this value will increase throughput, but put more load on the source Kafka cluster / WarpStream Agents, and vice versa.
Memory Usage
The default Orbit configuration is optimized for high throughput, but may lead to excessive memory usage or even OOMs depending on the nature of the workload / topics being replicated. If you experience memory issues while running Orbit, consider modifying your Orbit pipeline as follows:
```yaml
warpstream:
  fetch_config:
    # Reduced from default of 104857600.
    fetch_max_bytes: 20857600
    # Reduced from default of 52428800.
    fetch_max_partition_bytes: 10428800
```

Unclean Leader Election
Note that topics replicated by Orbit are expected to have `unclean.leader.election.enable` set to `false`. If this configuration is set to `true` and an unclean leader election causes data loss in the source cluster, Orbit will not retroactively delete the already-replicated data from the destination WarpStream cluster, even though that data has been lost in the source cluster.