# Orbit (Cluster Linking)

## Overview

Orbit is a feature of the WarpStream BYOC product that can automatically replicate data *and metadata* from any *source* Kafka-compatible cluster to a *destination* WarpStream cluster. Orbit is built directly into the WarpStream Agents and doesn't require any additional deployments or binaries; it runs directly in the destination WarpStream cluster.

Orbit can perfectly mirror Kafka records (including headers and preserving offsets), topic configurations, cluster configurations, topic deletions, consumer group offsets, and more.

Records copied from source to destination are *identical* down to the offsets of the records. This, combined with the fact that Orbit can also copy consumer group offsets, means that Orbit can be used to reliably migrate Kafka clients to the destination cluster without any duplicate consumption of data.

There are four primary use-cases for Orbit:

1. Replication from a source Kafka cluster into WarpStream for migrations.
2. Creating replicated copies of source Kafka clusters for disaster recovery.
3. Creating replicated copies of source Kafka clusters for scalable tiered storage.
4. Creating replicated copies of source Kafka clusters to provide additional read replicas, and isolate workloads. For example, analytical or batch consumers could read from the WarpStream replica cluster to avoid overloading the source Kafka cluster.

We highly recommend watching the video below, which provides a comprehensive overview of Orbit's features.

{% embed url="https://player.vimeo.com/video/1058321014" %}

## Orbit Deployment

The WarpStream agent binary ships with Orbit built in. Agents launched with the [`jobs` role](https://docs.warpstream.com/warpstream/kafka/advanced-agent-deployment-options/splitting-agent-roles) will automatically pick up Orbit configurations and perform replication.

## Orbit Configuration

Orbit is fully controllable from a single YAML file which can be edited through the WarpStream console or created programmatically through [Terraform](https://github.com/warpstreamlabs/terraform-provider-warpstream/blob/dcb0c7b9f7ae3ef69bea0c513807407808d2a0d7/examples/resources/warpstream_pipeline/resource.tf#L39).

### Overview

```yaml
# WarpStream Orbit YAML configuration file.

source_bootstrap_brokers:
    # These nodes should belong to the same source Kafka cluster.
    # Orbit does not (currently) support replicating from multiple
    # source Kafka clusters.
    - hostname: localhost # Kafka server hostname.
      port: 9092
    - hostname: example.kafkaserver.com
      port: 9092

source_cluster_credentials:
    # Username, password, and mechanism are optional and should be
    # omitted if SASL is not required to connect to the source
    # cluster.
    #
    # Set username/password as environment variables in the Agents
    # with an ORBIT_ prefix.
    sasl_username_env: SASL_USERNAME_ENV_VAR
    sasl_password_env: SASL_PASSWORD_ENV_VAR
    sasl_mechanism: plain
    # Independent of whether SASL is enabled, set this to true or
    # false based on whether the source cluster has TLS enabled.
    use_tls: false
    # Whether TLS verification should be skipped.
    tls_insecure_skip_verify: false

topic_mappings:
    - source_regex: topic.* # Exact match, not substring.
      # Optional prefix added to topic names when they're replicated.
      # Leave this empty if you want topics replicated to WarpStream
      # to have the exact same name as they did in the source cluster.
      destination_prefix: ""
      begin_fetch_at_latest_offset: false

cluster_config:
    # Don't copy cluster configs from the source (auto topic creation
    # policy, default partition count, etc).
    copy_source_cluster_configuration: false

consumer_groups:
    # Optional prefix added to consumer group names when they're
    # replicated. Leave this empty if you want consumer groups replicated
    # to WarpStream to have the exact same name as they did in the source
    # cluster.
    destination_group_prefix: ""
    # Whether consumer groups and their corresponding offsets
    # should be copied or not.
    copy_offsets_enabled: true             

warpstream:
    # Used to control the rate at which Orbit will consume from
    # the source Kafka topic. Higher values will increase
    # throughput, but put more load on the source cluster.
    cluster_fetch_concurrency: 2
```

Here's a quick summary of the YAML file above:

* Orbit is set up to connect to a Kafka cluster with 2 source brokers.
* It will connect to the source brokers using SASL PLAIN and will retrieve the username and password by reading the `ORBIT_SASL_USERNAME_ENV_VAR` and `ORBIT_SASL_PASSWORD_ENV_VAR` environment variables respectively in the Agent.
* Topics which match the `topic.*` regex in the source will be mirrored to the destination with their name unchanged (no prefix will be added to their name).
* Cluster configurations will not be copied over.
* Consumer group names and offsets will be replicated into the destination cluster. The consumer group names will be mirrored to the destination with their name unchanged (no prefix will be added to their name).
* At most 2 concurrent fetches (globally, regardless of the number of Agents) will be executed in the WarpStream Agents at any given time.

### Specify Source Brokers

Host address and port of the source cluster Kafka brokers or Agents.

```yaml
source_bootstrap_brokers:
    - hostname: localhost
      port: 9092
    - hostname: example.kafkaserver.com
      port: 9092
```

* Source cluster brokers are listed under the `source_bootstrap_brokers` section in the YAML.
* Each broker is defined as a `hostname` and a `port`.
* The brokers must be reachable from your WarpStream Agents.

### Specify Credentials

Credentials used by the destination WarpStream Agents to connect to the source cluster.

* Credentials for the brokers are defined under the `source_cluster_credentials` section in the YAML.
* Since the Agents run in your VPC, WarpStream does not have access to your credentials.
* To force the Agents to use TLS when connecting to your source clusters, use the `use_tls` flag in the `source_cluster_credentials` section.
* If you have ACLs enabled in the source cluster, then you need to make sure the principal associated with Orbit credentials used to connect with the source has the following permissions:
  * `Describe`, `DescribeConfigs`, and `Read` operations for the Topic resources which you want mirrored by Orbit.
  * `Describe` and `DescribeConfigs` for the Cluster resource.
  * `Describe` and `Read` operations for the Group resource (consumer groups). Read is required to fetch offsets.

#### SASL

```yaml
source_cluster_credentials:
    sasl_username_env: SASL_USERNAME_ENV_VAR
    sasl_password_env: SASL_PASSWORD_ENV_VAR
    use_tls: false
    sasl_mechanism: plain
```

* Both the `SASL_USERNAME_ENV_VAR` and `SASL_PASSWORD_ENV_VAR` fields refer to environment variables. The Agents prepend an `ORBIT_` prefix to the values of these fields before reading the environment variables, so the environment variables in the Agent should be configured as `ORBIT_SASL_USERNAME_ENV_VAR` and `ORBIT_SASL_PASSWORD_ENV_VAR` respectively.
* By default, Orbit uses the `plain` SASL mechanism. To specify a different SASL mechanism, add the `sasl_mechanism` field in the `source_cluster_credentials` section. Supported mechanisms include:
  * `plain`
  * `scram-256`
  * `scram-512`
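
For example, with the YAML above, the SASL credentials could be provided to the Agent process like this (the values shown are hypothetical placeholders):

```shell
# The YAML references SASL_USERNAME_ENV_VAR / SASL_PASSWORD_ENV_VAR,
# so the Agent's environment must define them with the ORBIT_ prefix.
# The values below are hypothetical placeholders.
export ORBIT_SASL_USERNAME_ENV_VAR="orbit-replicator"
export ORBIT_SASL_PASSWORD_ENV_VAR="s3cr3t"
```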

#### mTLS PEM encoded certs

```yaml
source_cluster_credentials:
    mtls_client_cert_env: MTLS_CERT_PATH_ENV
    mtls_client_key_env: MTLS_KEY_PATH_ENV
    mtls_server_ca_cert_env: MTLS_SERVER_CA_CERT_PATH_ENV
    use_tls: true
```

* The `MTLS_CERT_PATH_ENV`, `MTLS_KEY_PATH_ENV`, and `MTLS_SERVER_CA_CERT_PATH_ENV` fields refer to environment variables. The Agents prepend an `ORBIT_` prefix to the values of these fields before reading the environment variables, so the environment variables in the Agent should be configured as `ORBIT_MTLS_CERT_PATH_ENV`, `ORBIT_MTLS_KEY_PATH_ENV`, and `ORBIT_MTLS_SERVER_CA_CERT_PATH_ENV` respectively.
* Note that these environment variables must contain file paths to the respective PEM-encoded certificate files; they must not contain the certificates themselves.
* `mtls_server_ca_cert_env` is an optional field. However, we highly recommend pointing the environment variable at a PEM file containing the certificates of the certificate authorities that sign your server certificates. If this environment variable is not set, WarpStream defaults to trusting server certificates via your operating system's root certificate store.
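
For instance, the Agent environment for the YAML above could look like this (the file paths below are hypothetical placeholders):

```shell
# The YAML references the MTLS_*_PATH_ENV variables; the Agent's
# environment must define them with the ORBIT_ prefix, each holding
# the path to a PEM file. The paths below are hypothetical.
export ORBIT_MTLS_CERT_PATH_ENV="/etc/warpstream/tls/client.crt"
export ORBIT_MTLS_KEY_PATH_ENV="/etc/warpstream/tls/client.key"
export ORBIT_MTLS_SERVER_CA_CERT_PATH_ENV="/etc/warpstream/tls/ca.crt"
```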

#### mTLS JKS encoded certs

```yaml
source_cluster_credentials:
    jks_key_store_file_path_env: JKS_KEYSTORE_PATH_ENV
    jks_key_store_password_env: JKS_KEYSTORE_PASSWORD_ENV
    jks_key_store_key_password_env: JKS_KEY_STORE_KEY_PASSWORD_ENV
    jks_trust_store_file_path_env: JKS_TRUST_STORE_FILE_PATH_ENV
    jks_trust_store_password_env: JKS_TRUST_STORE_PASSWORD_ENV
    use_tls: true
```

* The `JKS_KEYSTORE_PATH_ENV`, `JKS_KEYSTORE_PASSWORD_ENV`, `JKS_KEY_STORE_KEY_PASSWORD_ENV`, `JKS_TRUST_STORE_FILE_PATH_ENV`, and `JKS_TRUST_STORE_PASSWORD_ENV` fields refer to environment variables. The Agents prepend an `ORBIT_` prefix to the values of these fields before reading the environment variables, so the environment variables in the Agent should be configured as `ORBIT_JKS_KEYSTORE_PATH_ENV`, `ORBIT_JKS_KEYSTORE_PASSWORD_ENV`, etc.
  * The environment variable for `jks_key_store_file_path_env` should be assigned the file path to the JKS keystore file.
  * The environment variable for `jks_key_store_password_env` should be assigned the key store password.
  * The environment variable for `jks_key_store_key_password_env` should be assigned the key password.
  * The environment variable for `jks_trust_store_file_path_env` should be assigned the file path to the trust store.
  * The environment variable for `jks_trust_store_password_env` should be assigned the trust store password.

### Topic Mappings

#### Replicate Topics

Topics can be replicated by adding mappings under the `topic_mappings` section of the YAML.

```yaml
topic_mappings:
    - source_regex: foo_topic.*
      # Topics whose names match the foo_topic.* regex will be
      # replicated with their names unmodified.
      destination_prefix: ""
    - source_regex: bar_topic.*
      # Topics whose names match the bar_topic.* regex will be
      # replicated, but their names will have baz_ added as a prefix.
      destination_prefix: baz_
      topic_config_overrides:
        - config_name: retention.ms
          config_value: "72000000"
        - config_name: cleanup.policy
          config_value: "delete,compact"
      begin_fetch_at_latest_offset: false
```

* To replicate a topic from the source cluster to the destination, add a topic mapping to the `topic_mappings` section with a `source_regex` which will match the source topic which should be replicated.
  * For example, with the above mappings, topics in the source cluster whose names start with `foo_topic` or `bar_topic` would be replicated.
  * Note that the pattern `*` on its own will not match every topic name. Since Orbit accepts regular expressions, use `.*` to match every sequence of characters.
* Note that topics being actively replicated by Orbit **cannot receive writes from external Kafka clients** until the `irreversible_disable_orbit_management` field in the topic mapping is set to true.
  * See the [producer migration](#producers) section for details on how to use this field.
* Leave `destination_prefix` empty if you want topic names to be perfectly preserved when topics are replicated, or set it to a non-empty string if you want Orbit to add a prefix to Orbit-replicated topics.
  * For example, if the source cluster has a topic `bar_topicA`, it will be replicated in WarpStream as `baz_bar_topicA`, while a topic named `foo_topicB` will be replicated with its name unmodified.
* `topic_config_overrides` can be used to override the value of the specified topic configurations in the source cluster which will be copied to the destination. Currently, only the `retention.ms` and `cleanup.policy` topic configurations are supported for override. See the [topic mapping rules](#topic-mapping-rules) section for further details.
* By default, Orbit will copy all records starting from the earliest topic partition offsets in the source cluster for a given topic. If `begin_fetch_at_latest_offset` is set to `true`, then Orbit will copy all records beginning at the latest topic partition offsets in the source cluster. See the [topic mapping rules](#topic-mapping-rules) section for further details.

#### Pause Topic Replication

```yaml
topic_mappings:
    - source_regex: foo_topic.*
      destination_prefix: ""
    - source_regex: bar_topic.*
      destination_prefix: baz_
```

* To pause replication for a topic, remove the mapping which matches the topic from the YAML.
* In some cases, it's possible that a topic can match multiple topic mappings, and you do not want to remove all the topic mappings. In such a case, you can explicitly mark a topic mapping as paused. In the following example, replication for the topic `test` will be paused since the first topic mapping which matches `test` has `pause` set to `true`.

```yaml
topic_mappings:
    - source_regex: test
      pause: true
    - source_regex: .*
```

#### Topic Mapping Rules

Detailed rules that determine which topic mappings match which topics.

```yaml
topic_mappings:
    - source_regex: foo_topic.*
      destination_prefix: ""
    - source_regex: bar_topic.*
      destination_prefix: baz_
      topic_config_overrides:
        - config_name: retention.ms
          config_value: "72000000"
      begin_fetch_at_latest_offset: false
```

* Each mapping consists of a `source_regex` field plus optional fields such as `destination_prefix`, `topic_config_overrides`, `begin_fetch_at_latest_offset`, `pause`, and `irreversible_disable_orbit_management`.
* Orbit will create a new topic in the destination only if there exists a topic mapping with a `source_regex` which matches the source cluster topic.
* If a topic already exists in the destination cluster but is not managed by Orbit, Orbit will automatically take ownership of it if **both** of the following conditions are met:

  * The topic is empty (has never been written to).
  * The topic's partition count and `cleanup.policy` match those of the corresponding source topic. Partition count must match because it cannot decrease once set. `cleanup.policy` must match because there are [restrictions on how it can be changed](https://docs.warpstream.com/warpstream/reference/protocol-and-feature-support/topic-configuration-reference#cleanup.policy) (e.g. a non-compacted topic cannot be made compacted). Other configurations like `retention.ms` are not compared and will be synced from the source after Orbit takes ownership.

  This is useful when pre-creating topics in WarpStream before setting up Orbit replication. Once Orbit takes ownership, it will manage the topic's configurations and replication like any other Orbit-managed topic.
* A topic in the source cluster matches a topic mapping if its name matches the mapping's `source_regex`. For example, the topic `foo_topicA` matches the first topic mapping.
* The value of the `destination_prefix` field is prepended to the source topic name to form the name of the topic created in the destination cluster. An empty string is a valid value for `destination_prefix` if you want topics to have the same name in the destination WarpStream cluster as they did in the source cluster.
* A topic which has been created by Orbit in the destination cluster matches a topic mapping if its name matches `destination_prefix` + `source_regex`. For example, `bar_topicB` is created as `baz_bar_topicB` in the destination, and `baz_bar_topicB` matches the second topic mapping.
* Topic mappings are evaluated against a topic (in either the source or the destination cluster) in the order in which they are listed in the `topic_mappings` section, and only a single topic mapping applies: the first one that matches.
* Orbit will keep the topic configurations in sync for a source/destination topic pair only if the destination topic matches a topic mapping.
* Orbit will keep deleted topics in sync for a source/destination topic pair only if the destination topic matches a topic mapping.
* Orbit will replicate records for a source/destination topic pair only if the destination topic matches a topic mapping.
* The `irreversible_disable_orbit_management` flag will only be respected if a destination topic matches a topic mapping with the flag set to `true`. See the [producer migration section](#producers) for more details on topic migration and the meaning of this configuration option.
* `topic_config_overrides` overrides values of specified topic configurations for a given topic mapping. For example, if topics which match the `bar_topic.*` regex in the source cluster have a `retention.ms` value of `60000000`, the corresponding topics which will be created in the destination will have a `retention.ms` value of `72000000`.
* `begin_fetch_at_latest_offset` can be set to `true` for a topic mapping to begin fetches at the latest offset of a topic partition rather than the earliest.
  * By default this field is set to `false` and records are copied starting from the earliest topic partition offsets in the source cluster.
  * When set to `true`, any topics created in the destination Orbit cluster which match that topic mapping will begin fetching records starting from the latest offsets in the source cluster.
    * For example, if partition `0` of some topic `topic_A` has records from offsets `100` to `1000` in the source cluster, Orbit will start mirroring `topic_A` beginning at offset `1000` rather than `100`. Every record after offset `1000` will be mirrored identically.
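
As a rough sketch (not WarpStream's implementation), the matching rules above amount to an anchored, first-match-wins resolution:

```shell
# Sketch of the topic mapping resolution described above: mappings are
# tried in order, source_regex is matched against the full topic name
# (anchored, not a substring search), and destination_prefix is
# prepended to form the destination topic name.
destination_topic() {
    local topic="$1"
    if printf '%s' "$topic" | grep -Eqx 'foo_topic.*'; then
        printf '%s\n' "$topic"          # first mapping: empty prefix
    elif printf '%s' "$topic" | grep -Eqx 'bar_topic.*'; then
        printf 'baz_%s\n' "$topic"      # second mapping: baz_ prefix
    else
        printf 'not replicated\n'       # no mapping matches
    fi
}

destination_topic "foo_topicA"    # foo_topicA
destination_topic "bar_topicB"    # baz_bar_topicB
destination_topic "other_topic"   # not replicated
```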

#### Topic Configurations

Orbit will copy topic configurations from the source cluster for all of the topics which Orbit is managing in the destination cluster. Note that only the following configuration values are considered relevant to WarpStream and will be copied by Orbit:

* `cleanup.policy`
* `message.timestamp.type`
* `retention.ms`
* `delete.retention.ms`
* `min.compaction.lag.ms`

### Sync Consumer Groups

Orbit can sync offsets of consumer groups which exist in the source cluster to the destination cluster.

```yaml
consumer_groups:
    copy_offsets_enabled: true
    destination_group_prefix: ""
```

* Set `copy_offsets_enabled` to `true` under the `consumer_groups` section and Orbit will start mirroring consumer groups from the source cluster to the destination.
* Similar to topic names, consumer group replication can optionally be configured to add a destination group prefix to the replicated consumer group names. Leave `destination_group_prefix` empty if you want consumer group names to be replicated as-is.

### Cluster configurations

#### Sync cluster configurations

```yaml
cluster_config:
    copy_source_cluster_configuration: true
```

* Orbit will copy cluster configurations from the source cluster to the destination if the `copy_source_cluster_configuration` flag under the `cluster_config` section is set to `true`.
* Note that Orbit only copies cluster configurations which are relevant to WarpStream. These include:
  * `num.partitions`, which determines the default number of partitions a new topic is created with. This refers to new topics created by a Kafka client, not by Orbit.
  * `auto.create.topics.enable`, which indicates whether new topics should be created automatically when a client performs a metadata request for a topic which does not exist.
  * `log.retention.ms`, `log.retention.minutes`, and `log.retention.hours`, which indicate the default retention used by topics.
  * `offsets.retention.minutes`, which indicates the expiration duration of the offsets committed by a consumer group.

#### Disable copying of records

```yaml
cluster_config:
    disable_copy_records: true
```

* Setting this cluster config prevents Orbit from copying any records from the source to the destination cluster, across every topic mapping.

## Orbit UI

You can edit the Orbit YAML, view the offset lag for mirrored topic partitions, and see which topics Orbit is mirroring, all from the WarpStream UI.

### Create Orbit YAML

1. Select your virtual cluster from the WarpStream console and navigate to the Orbit tab. You'll see a text editor which can be used to create and edit the Orbit YAML file.

<figure><img src="https://77315434-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FjB7FxO8ty4EXO4HsQP4E%2Fuploads%2Fgit-blob-b5f3442396dc99512e4fa5cf810447c5ef8472c3%2FScreenshot%202024-10-28%20at%204.53.12%E2%80%AFPM.png?alt=media" alt=""><figcaption><p>Orbit YAML editor</p></figcaption></figure>

2. Edit the YAML, and hit Save. Note that the pipeline is paused by default. You can start it by hitting the toggle next to the word PAUSED.

<figure><img src="https://77315434-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FjB7FxO8ty4EXO4HsQP4E%2Fuploads%2Fgit-blob-517e5de4b9da7d648f86ea511abda97da47b73cd%2FScreenshot%202024-10-28%20at%204.55.29%E2%80%AFPM.png?alt=media" alt=""><figcaption><p>Version 0 of Orbit YAML is deployed, but pipeline is paused</p></figcaption></figure>

3. Once the pipeline is running, you'll see the topics which are mirrored by Orbit show up under the Orbit tab. In the image below, Orbit created a topic called `test_topic` with a prefix `orbit_` as expected.

<figure><img src="https://77315434-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FjB7FxO8ty4EXO4HsQP4E%2Fuploads%2Fgit-blob-18ce9a27dd70a9e1aff93a97de8bf21b1bcd1b92%2FScreenshot%202024-10-28%20at%205.02.31%E2%80%AFPM.png?alt=media" alt=""><figcaption><p>orbit_test_topic created by Orbit shows up under the Orbit tab</p></figcaption></figure>

4. To modify the YAML, hit the edit button, edit the YAML, and hit Save again. Orbit will create a new configuration version for the pipeline. In the image below, `copy_offsets_enabled` has been set to `true` so that consumer group offsets will be copied from the source cluster to the destination. **Note:** You will have to hit the Deploy button on the new version before it will take effect. You can also deploy older versions to "roll back".

<figure><img src="https://77315434-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FjB7FxO8ty4EXO4HsQP4E%2Fuploads%2Fgit-blob-c209bb7a088aaad8b45c0331f9bd90dc6f79a0ad%2FScreenshot%202024-10-28%20at%205.09.47%E2%80%AFPM.png?alt=media" alt=""><figcaption><p>copy_offsets_enabled is true so orbit will start syncing source/destination consumer group offsets</p></figcaption></figure>

5. You can view the lag between the source and destination cluster topic partitions from the Consumers tab. The `orbit_source_cluster_offsets` consumer group will include every topic and partition that is being mirrored by Orbit. The offset lag indicates how far behind Orbit is compared to the source cluster. The time lag indicates how much time has elapsed since the timestamp of the most recently copied source record.

<figure><img src="https://77315434-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FjB7FxO8ty4EXO4HsQP4E%2Fuploads%2Fgit-blob-a7e482f0a753c00f55f353d51b4365d58cc66c65%2FScreenshot%202024-10-28%20at%205.20.11%E2%80%AFPM.png?alt=media" alt=""><figcaption><p>Consumer group view showing offset lag for mirrored topic partitions, offset lag is 0 in this case.</p></figcaption></figure>

## Programmatic Access: HTTP API and Terraform

For programmatic access, Orbit can be configured via its [HTTP API](https://docs.warpstream.com/warpstream/reference/api-reference/pipelines/create-pipeline#create-orbit-pipeline) or declaratively using our [Terraform provider](https://github.com/warpstreamlabs/terraform-provider-warpstream/blob/dcb0c7b9f7ae3ef69bea0c513807407808d2a0d7/examples/resources/warpstream_pipeline/resource.tf#L39).

### Orbit HTTP API: create, update, and deploy configurations

This guide shows how to manage the Orbit pipeline via HTTP APIs using an API key. It covers creating the pipeline, creating/updating configurations (YAML), deploying a configuration, and pausing/resuming the pipeline.

Prerequisites:

* You have a Virtual Cluster ID (for example `vci_xxx...`).
* You have a WarpStream API key with admin permissions for that virtual cluster.
* You know your Console/API base URL (for example `https://console.yourcompany.com`).

Authentication:

* Include your API key in the `warpstream-api-key` header on every request.

Notes:

* All endpoints are `POST` and accept/return JSON.
* When updating an Orbit configuration, you create a new configuration version and then deploy it. You do not mutate an existing configuration in place.

***

#### 1) Create the Orbit pipeline (once)

Endpoint: `/api/v1/create_pipeline`

Example request:

```bash
curl -sS -X POST \
  -H "Content-Type: application/json" \
  -H "warpstream-api-key: $WARPSTREAM_API_KEY" \
  "$CONSOLE_BASE_URL/api/v1/create_pipeline" \
  -d '{
    "virtual_cluster_id": "'$VIRTUAL_CLUSTER_ID'",
    "pipeline_name": "orbit_default",
    "pipeline_type": "orbit"
  }'
```

The response will include a `pipeline_id` you should save. If the pipeline already exists, you can obtain its ID via "Discover the Orbit pipeline and configurations" below.

***

#### 2) Create a new Orbit configuration (YAML)

Endpoint: `/api/v1/create_pipeline_configuration`

Send the YAML as a string in `configuration_yaml`:

```bash
ORBIT_YAML=$(cat <<'YAML'
source_bootstrap_brokers:
  - hostname: "kafka-source-1"
    port: 9092
topic_mappings:
  - source_regex: "^orders\\.v[0-9]+$"
    destination_prefix: "src_"
cluster_config:
  copy_source_cluster_configuration: true
  disable_copy_records: false
consumer_groups:
  destination_group_prefix: "src_"
  copy_offsets_enabled: true
warpstream:
  cluster_fetch_concurrency: 8
YAML
)

# Build the JSON body with jq so that newlines and quotes in the YAML
# are escaped correctly; naive string interpolation would produce
# invalid JSON.
curl -sS -X POST \
  -H "Content-Type: application/json" \
  -H "warpstream-api-key: $WARPSTREAM_API_KEY" \
  "$CONSOLE_BASE_URL/api/v1/create_pipeline_configuration" \
  -d "$(jq -n \
    --arg vc "$VIRTUAL_CLUSTER_ID" \
    --arg pid "$PIPELINE_ID" \
    --arg yaml "$ORBIT_YAML" \
    '{virtual_cluster_id: $vc, pipeline_id: $pid, configuration_yaml: $yaml}')"
```

The response includes a `configuration_id` for the created version.

***

#### 3) Deploy a configuration and control state (run/pause)

Endpoint: `/api/v1/change_pipeline_state`

Deploy a specific configuration and set desired state to running:

```bash
curl -sS -X POST \
  -H "Content-Type: application/json" \
  -H "warpstream-api-key: $WARPSTREAM_API_KEY" \
  "$CONSOLE_BASE_URL/api/v1/change_pipeline_state" \
  -d '{
    "virtual_cluster_id": "'$VIRTUAL_CLUSTER_ID'",
    "pipeline_id": "'$PIPELINE_ID'",
    "deployed_configuration_id": "'$CONFIGURATION_ID'",
    "desired_state": "running"
  }'
```

Pause the pipeline without changing the deployed configuration:

```bash
curl -sS -X POST \
  -H "Content-Type: application/json" \
  -H "warpstream-api-key: $WARPSTREAM_API_KEY" \
  "$CONSOLE_BASE_URL/api/v1/change_pipeline_state" \
  -d '{
    "virtual_cluster_id": "'$VIRTUAL_CLUSTER_ID'",
    "pipeline_id": "'$PIPELINE_ID'",
    "desired_state": "paused"
  }'
```

To “update” Orbit’s configuration, create a new configuration version (step 2) and deploy it (step 3).

***

#### 4) Discover the Orbit pipeline and configurations

If you don’t have the pipeline ID, describe by type to fetch the Orbit pipeline and its configuration versions.

Endpoint: `/api/v1/describe_pipeline`

```bash
curl -sS -X POST \
  -H "Content-Type: application/json" \
  -H "warpstream-api-key: $WARPSTREAM_API_KEY" \
  "$CONSOLE_BASE_URL/api/v1/describe_pipeline" \
  -d '{
    "virtual_cluster_id": "'$VIRTUAL_CLUSTER_ID'",
    "pipeline_type": "orbit"
  }'
```

The response includes the pipeline overview and an array of `pipeline_configurations`, each with an `id` and `version`. Up to 100 of your latest configurations are returned, sorted from oldest to newest. You can identify the active configuration via the `deployed_configuration_id` field.
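
For scripting, the deployed configuration can be picked out of the response with `jq`. The response below is a simplified, hypothetical illustration containing only the fields described above:

```shell
# Hypothetical, simplified describe_pipeline response for illustration;
# only the fields described above are shown.
RESPONSE='{
  "deployed_configuration_id": "pco_2",
  "pipeline_configurations": [
    {"id": "pco_1", "version": 0},
    {"id": "pco_2", "version": 1}
  ]
}'

# Extract the ID of the currently deployed configuration...
DEPLOYED_ID=$(printf '%s' "$RESPONSE" | jq -r '.deployed_configuration_id')

# ...and look up its version in the configurations array.
printf '%s' "$RESPONSE" | jq -r --arg id "$DEPLOYED_ID" \
    '.pipeline_configurations[] | select(.id == $id) | .version'
```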

You can also list pipelines:

Endpoint: `/api/v1/list_pipelines`

```bash
curl -sS -X POST \
  -H "Content-Type: application/json" \
  -H "warpstream-api-key: $WARPSTREAM_API_KEY" \
  "$CONSOLE_BASE_URL/api/v1/list_pipelines" \
  -d '{
    "virtual_cluster_id": "'$VIRTUAL_CLUSTER_ID'"
  }'
```

***

#### 5) Delete the Orbit pipeline (optional)

Endpoint: `/api/v1/delete_pipeline`

```bash
curl -sS -X POST \
  -H "Content-Type: application/json" \
  -H "warpstream-api-key: $WARPSTREAM_API_KEY" \
  "$CONSOLE_BASE_URL/api/v1/delete_pipeline" \
  -d '{
    "virtual_cluster_id": "'$VIRTUAL_CLUSTER_ID'",
    "pipeline_id": "'$PIPELINE_ID'"
  }'
```

***

## Migration

Orbit can be used to migrate existing Kafka-compatible clusters to WarpStream.

### Replication

First, follow the instructions above to configure WarpStream to replicate all of the topics and consumer groups from the source cluster. Monitor the lag of the `orbit_source_cluster_offsets` consumer group to wait for WarpStream to "catch up" on all the source Kafka topics.

### Consumers

The following migration flow ensures that your Kafka consumer clients can be migrated to WarpStream without duplicate processing of records. The Orbit YAML must include the following config:

```yaml
consumer_groups:
    copy_offsets_enabled: true         
```

1. Orbit will periodically sync committed offsets for consumer groups to WarpStream.
2. Let's say you want to migrate some consumer group `X`.
3. First, shut down the consumers belonging to this group in the source cluster.
4. Wait 1 minute for Orbit to copy the consumer group offsets for group `X` to the destination cluster. You can use the `warpstream_consumer_group_max_offset` metric emitted from the agents to view the offsets copied by Orbit to the destination. See the [monitoring](https://docs.warpstream.com/warpstream/agent-setup/monitor-the-warpstream-agents#observability) page for more details on this metric. You can also see the currently committed offsets in the consumers tab in the WarpStream UI.
5. Restart consumers but point them to the destination WarpStream cluster instead by changing their bootstrap URL. The consumers will start reading from the offsets which were last committed in the source cluster, picking up where they left off.
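
Concretely, the only client-side change in step 5 is the bootstrap URL; the group ID stays the same, so committed offsets carry over (the hostnames below are hypothetical, and this assumes an empty `destination_group_prefix`):

```properties
# Before: consumer configuration pointing at the source Kafka cluster.
# bootstrap.servers=source-kafka-1:9092,source-kafka-2:9092

# After: same group.id, new bootstrap URL for the WarpStream cluster.
bootstrap.servers=warpstream-agents.internal.example.com:9092
group.id=X
```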

### Producers

The following migration flow will ensure that your Kafka producer clients can be cut over to WarpStream seamlessly. Using the `irreversible_disable_orbit_management` field in a topic mapping will disable Orbit management of the topic. The following example shows how and when to use the field.

1. Let's say you have the following topic mapping in the Orbit YAML, and you have decided to move the topic `topic0` matched by this mapping from the source to the destination. To make the example more interesting, let's assume a topic `topic1` also exists in the source cluster and is also being mirrored by Orbit.

```yaml
topic_mappings:
    - source_regex: topic.*
      destination_prefix: test0_
```

2. First shut down the producer clients producing to `topic0` in the source.
3. Wait for the offset lag between the source topic `topic0` and destination topic `test0_topic0` to become 0. You can view the lag for the topic from the `orbit_source_cluster_offsets` group in the Consumer view in the WarpStream console. The lag for all of the partitions of the topic is also emitted as the `warpstream_consumer_group_lag` metric from the Agents. See the [monitoring](https://docs.warpstream.com/warpstream/agent-setup/monitor-the-warpstream-agents#observability) page for more details on this metric. You can also see the currently committed offsets in the consumers tab in the WarpStream UI.
4. Add a new topic mapping for `topic0` ahead of the existing one. Note the `irreversible_disable_orbit_management` field. This field indicates that Orbit should stop managing the destination topic `test0_topic0`, which allows Kafka producer clients to write to it. This decision is irreversible.

```yaml
topic_mappings:
    - source_regex: topic0
      destination_prefix: test0_
      irreversible_disable_orbit_management: true
    - source_regex: topic.*
      destination_prefix: test0_
```

5. Since topic mappings are matched in order, `test0_topic0` will now accept writes from regular Kafka clients, and Orbit will no longer copy records from the source for this topic. Note that `test0_topic1` is unaffected: `topic1` does not match the new `topic0` mapping and continues to be mirrored under the original `topic.*` mapping.
6. Restart your producer clients, pointing them at the destination WarpStream cluster. Since `topic0` is named `test0_topic0` in the destination due to the prefix, they must produce to `test0_topic0` instead.
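The first-match semantics in step 5 can be sketched as a small resolver over the mappings in the YAML above. This is an illustrative model, not Orbit's implementation; in particular, whether the regex must match the full topic name is an assumption here:

```python
import re

# Topic mappings from the YAML above, in order. The first match wins.
TOPIC_MAPPINGS = [
    {"source_regex": "topic0", "destination_prefix": "test0_",
     "irreversible_disable_orbit_management": True},
    {"source_regex": "topic.*", "destination_prefix": "test0_"},
]


def resolve_mapping(source_topic: str):
    """Return (destination_topic, managed_by_orbit) for the first mapping
    whose regex matches the source topic name, or None if nothing matches.
    Assumes full-name regex matching for illustration."""
    for mapping in TOPIC_MAPPINGS:
        if re.fullmatch(mapping["source_regex"], source_topic):
            dest = mapping["destination_prefix"] + source_topic
            managed = not mapping.get(
                "irreversible_disable_orbit_management", False)
            return dest, managed
    return None
```

Under this model, `topic0` resolves to `test0_topic0` with Orbit management disabled, while `topic1` still resolves to `test0_topic1` under the catch-all mapping and remains managed.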

## Observability

You can monitor Orbit using the metrics built into the WarpStream Agents. See our [monitoring](https://docs.warpstream.com/warpstream/agent-setup/monitor-the-warpstream-agents) documents for more details.

Orbit will also emit some additional metrics:

1. You can use the `warpstream_agent_kafka_produce_with_offset_uncompressed_bytes_counter` metric to view the data successfully written by Orbit to the destination WarpStream cluster.
2. You can use the `warpstream_consumer_group_lag` and filter for the group `orbit_source_cluster_offsets` and the topics you care about to determine the lag between your topics in the source cluster and WarpStream. Specifically, the lag is defined as the difference (in offsets) between the max offset in the source cluster and the most recently copied offset. View the [monitoring](https://docs.warpstream.com/warpstream/agent-setup/monitor-the-warpstream-agents#observability) page for more details on this metric.
3. You can use the `warpstream_consumer_group_estimated_lag_very_coarse_do_not_use_to_measure_e2e_seconds` metric and filter for the consumer group `orbit_source_cluster_offsets` and `topic` to get the time since the latest record copied by Orbit was assigned a timestamp in the source. View the [monitoring](https://docs.warpstream.com/warpstream/agent-setup/monitor-the-warpstream-agents/monitoring-consumer-groups#metrics) page for more details on this metric.

   This metric is a rough measure of E2E lag from the moment a record is timestamped by your producer Kafka client to the moment it is queryable in WarpStream. However, timestamps are processed asynchronously, and it can take 10-15 seconds before they are used to compute the time lag. As a result, the reported time lag is 10-15 seconds higher than the actual E2E lag from your Kafka producer client to the consumers reading from WarpStream. It is also higher than the Orbit replication lag alone, because it includes the time it takes for data to flow through the source Kafka cluster as well.
4. You can use the `warpstream_consumer_group_max_offset` metric emitted from the agents to view the offsets copied by Orbit to the consumer group in the destination. See the [monitoring](https://docs.warpstream.com/warpstream/agent-setup/monitor-the-warpstream-agents/monitoring-consumer-groups#metrics) page for more details on this metric.
5. The `warpstream_agent_kafka_source_cluster_connections_counter` metric is a counter incremented for each short-lived Orbit connection made against the source cluster. It can be used to estimate the rate at which Orbit creates connections against the source cluster.
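The offset lag definition in item 2 above is simple arithmetic, sketched here for illustration (the offsets are hypothetical):

```python
def replication_lag(source_max_offset: int, last_copied_offset: int) -> int:
    """Offset lag as reported for the orbit_source_cluster_offsets group:
    the difference between the max offset in the source partition and the
    offset most recently copied by Orbit. Zero means fully caught up."""
    return max(source_max_offset - last_copied_offset, 0)


# Hypothetical partition: the source is at offset 1_500 and Orbit has
# copied records up to offset 1_200, so Orbit is 300 offsets behind.
lag = replication_lag(1_500, 1_200)
caught_up = replication_lag(2_000, 2_000)
```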

## Tuning Orbit

### Concurrency

The primary knob for controlling Orbit throughput is `cluster_fetch_concurrency`. This is a global setting that controls how many concurrent Orbit fetch jobs run against the source Kafka cluster, regardless of how many Agents are deployed.

```yaml
warpstream:
    cluster_fetch_concurrency: 10
```

Increasing this value increases throughput, but puts more load on the source Kafka cluster and the WarpStream Agents; decreasing it does the opposite.

### Memory Usage

The default Orbit configuration is optimized for high throughput, but may lead to excessive memory usage or even OOMs depending on the nature of the workload / topics being replicated. If you experience memory issues while running Orbit, consider modifying your Orbit pipeline as follows:

```yaml
warpstream:
  fetch_config:
    # Reduced from default of 104857600.
    fetch_max_bytes: 20857600
    # Reduced from default of 52428800.
    fetch_max_partition_bytes: 10428800
```
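As a rough intuition for why these settings matter, the fetch limits bound how much data concurrent fetch jobs can hold in flight. The sketch below assumes, for illustration only, that each concurrent fetch can buffer up to `fetch_max_bytes`; actual Agent memory usage also depends on compression, partition count, and internals not modeled here:

```python
def approx_fetch_buffer_bytes(cluster_fetch_concurrency: int,
                              fetch_max_bytes: int) -> int:
    """Very rough upper bound on bytes Orbit fetch jobs may buffer at once,
    assuming each concurrent fetch job can hold up to fetch_max_bytes.
    This is a back-of-the-envelope model, not an exact accounting."""
    return cluster_fetch_concurrency * fetch_max_bytes


# With 10 concurrent fetches (as in the earlier tuning example) and the
# default fetch_max_bytes of 104857600 (~100 MiB), up to ~1 GiB can be
# buffered; the reduced value from the YAML above shrinks that ~5x.
default_bound = approx_fetch_buffer_bytes(10, 104_857_600)
reduced_bound = approx_fetch_buffer_bytes(10, 20_857_600)
```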

## Unclean Leader Election

Note that Orbit expects the topics it replicates to have `unclean.leader.election.enable` set to `false`. If this configuration is set to `true` and data loss occurs in the source cluster, Orbit will not retroactively delete data from the destination WarpStream cluster that was already replicated but has since been lost in the source cluster.

## FAQ

### What is the difference between "managed by Orbit" and matching a `topic_mappings` entry?

A topic is "managed by Orbit" if and only if it was created by Orbit or Orbit took ownership of it, and it hasn't been migrated using `irreversible_disable_orbit_management: true`. This is a property of the destination topic itself, and is distinct from whether a source topic currently matches a `topic_mappings` regex. For example, removing a topic's `topic_mappings` entry does not stop it from being managed by Orbit.
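The definition above can be read as a predicate over the destination topic's own state; the field names in this sketch are hypothetical and only illustrate that `topic_mappings` plays no part in the check:

```python
from dataclasses import dataclass


@dataclass
class DestinationTopic:
    # Hypothetical per-topic state in the destination cluster.
    created_or_adopted_by_orbit: bool
    management_irreversibly_disabled: bool


def managed_by_orbit(topic: DestinationTopic) -> bool:
    """A topic is managed by Orbit iff Orbit created it (or took ownership
    of it) AND management has not been irreversibly disabled. Note that
    topic_mappings is not consulted: removing a topic's mapping does not
    end Orbit management."""
    return (topic.created_or_adopted_by_orbit
            and not topic.management_irreversibly_disabled)
```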

### How does retention work for historical data replicated with `begin_fetch_at_latest_offset: false`?

Retention in WarpStream is based on when the data was **written to WarpStream**, not the original record timestamps from the source cluster. The retention clock resets when Orbit replicates data into the destination cluster.

This means that if a topic in the source cluster has a 7-day retention and you start replicating it with `begin_fetch_at_latest_offset: false`, all replicated historical data will be retained for the full 7 days from the time it was replicated into WarpStream — even if the records were originally produced days or weeks ago in the source cluster.
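The retention arithmetic can be sketched as follows (the dates are hypothetical):

```python
from datetime import datetime, timedelta


def expiry_time(replicated_at: datetime, retention: timedelta) -> datetime:
    """Records replicated by Orbit expire one retention period after they
    were written to WarpStream, regardless of their original source-cluster
    timestamps."""
    return replicated_at + retention


# A record originally produced days or weeks ago in the source cluster is
# replicated into WarpStream on Jan 10; with 7-day retention it is kept
# until Jan 17, a full 7 days from replication.
replicated = datetime(2024, 1, 10)
expires = expiry_time(replicated, timedelta(days=7))  # 2024-01-17
```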

### When syncing consumer group offsets, which topics are included?

Orbit only copies consumer group offsets for topics that are **managed by Orbit** in the destination cluster. This means the topic must have been created (or taken over) by Orbit and still be under Orbit's management. If a consumer group in the source cluster has committed offsets for topics that Orbit is not managing in the destination, those offsets will not be copied.

Additionally, if a consumer group in the destination cluster has active member clients, Orbit will stop copying offsets for that group entirely. Orbit also never allows a committed offset for a group to revert to a smaller value — offsets will only move forward.
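The forward-only rule amounts to a monotonic merge of the incoming offset with the one already committed in the destination. A minimal sketch of that rule (the offsets are hypothetical):

```python
def next_committed_offset(current: int, incoming: int) -> int:
    """Orbit never moves a group's committed offset backwards: an offset
    copied from the source is only applied if it is ahead of the offset
    already committed in the destination."""
    return max(current, incoming)


# Source reports an older commit (e.g. after a source-side rewind):
# the destination offset stays put.
unchanged = next_committed_offset(500, 480)
# Source reports a newer commit: the destination offset advances.
advanced = next_committed_offset(500, 620)
```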

See [What is the difference between "managed by Orbit" and matching a `topic_mappings` entry?](#what-is-the-difference-between-managed-by-orbit-and-matching-a-topic_mappings-entry) for more details on this distinction.

There is currently no way to filter which specific managed topics have their consumer group offsets synced — it is all-or-nothing via the `copy_offsets_enabled` flag.
