Orbit

Replicate and migrate Kafka clusters. Currently in beta.

Overview

Orbit is a feature of the WarpStream BYOC product that can automatically replicate data and metadata from any source Kafka-compatible cluster to a destination WarpStream cluster. Orbit is built directly into the WarpStream Agents and runs inside the destination WarpStream cluster, so it doesn't require any additional deployments or binaries.

Orbit can perfectly mirror Kafka records (including headers and preserving offsets), topic configurations, cluster configurations, topic deletions, consumer group offsets, and more.

Records copied from source to destination are identical down to the offsets of the records. This, combined with the fact that Orbit can also copy consumer group offsets, means that Orbit can be used to reliably migrate Kafka clients to the destination cluster without any duplicate consumption of data.

There are four primary use-cases for Orbit:

  1. Replication from a source Kafka cluster into WarpStream for migrations.

  2. Creating replicated copies of source Kafka clusters for disaster recovery.

  3. Creating replicated copies of source Kafka clusters for scalable tiered storage.

  4. Creating replicated copies of source Kafka clusters to provide additional read replicas and isolate workloads. For example, analytical or batch consumers could read from the WarpStream replica cluster to avoid overloading the source Kafka cluster.

Orbit Configuration

Orbit is controlled entirely from a single YAML file, which can be edited through the WarpStream console or created programmatically through Terraform.

Overview

# WarpStream Orbit YAML configuration file.

source_bootstrap_brokers:
    # These nodes should belong to the same source Kafka cluster.
    # Orbit does not (currently) support replicating from multiple
    # source Kafka clusters.
    - hostname: localhost # Kafka server hostname.
      port: 9092
    - hostname: example.kafkaserver.com
      port: 9092

# Optional, omit if SASL is not required to connect to the source
# cluster.
source_cluster_credentials:
    # Set in the Agents with an ORBIT_ prefix.
    sasl_username_env: SASL_USERNAME_ENV_VAR
    sasl_password_env: SASL_PASSWORD_ENV_VAR

topic_mappings:
    - source_regex: topic.* # Exact match, not substring.
      destination_prefix: test_ # Prefix added to destination topics.

cluster_config:
    # Don't copy cluster configs from source (automatic topic creation
    # policy, default partition count, etc).
    copy_source_cluster_configuration: false

consumer_groups:
    # Add orbit_group_ prefix to copied consumer group names.
    destination_group_prefix: orbit_group_
    # Whether consumer groups and their corresponding offsets
    # should be copied or not.
    copy_offsets_enabled: true

warpstream:
    # Used to control the rate at which Orbit will consume from
    # the source Kafka topic. Higher values will increase
    # throughput, but put more load on the source cluster.
    cluster_fetch_concurrency: 2

Here's a quick summary of the YAML file above:

  • Orbit is set up to connect to a Kafka cluster with 2 source brokers.

  • It will connect to the source brokers using SASL and will retrieve the username and password by reading the ORBIT_SASL_USERNAME_ENV_VAR and ORBIT_SASL_PASSWORD_ENV_VAR environment variables respectively in the Agent.

  • Topics which match the topic.* regex in the source will be mirrored to the destination. In the destination a test_ prefix will be added to the topic names.

  • Cluster configurations will not be copied over.

  • Consumer group names and offsets will be replicated into the destination cluster. The consumer group names will have an orbit_group_ prefix in the destination.

  • At most 2 concurrent fetches (globally, regardless of the number of Agents) will be executed in the WarpStream Agents at any given time.

Specify Source Brokers

Host address and port of the source cluster's Kafka brokers or Agents.

source_bootstrap_brokers:
    - hostname: localhost
      port: 9092
    - hostname: example.kafkaserver.com
      port: 9092
  • Source cluster brokers are listed under the source_bootstrap_brokers in the YAML.

  • Each broker is defined as a hostname and a port.

  • The brokers must be reachable from your WarpStream Agents.
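
Because the brokers must be reachable from the Agents, it can help to check basic TCP connectivity from the Agents' network before saving the configuration. The following is a minimal sketch using only the Python standard library. The function name broker_reachable and the default timeout are illustrative assumptions, not part of Orbit. A successful TCP connection only shows that the port is open, not that SASL or TLS negotiation will succeed.

```python
import socket


def broker_reachable(hostname: str, port: int, timeout_s: float = 3.0) -> bool:
    """Return True if a TCP connection to hostname:port can be opened.

    Hypothetical helper for a pre-flight check; Orbit itself does not
    expose this function.
    """
    try:
        # create_connection resolves the hostname and opens the socket.
        with socket.create_connection((hostname, port), timeout=timeout_s):
            return True
    except OSError:
        # Covers DNS failures, refused connections, and timeouts.
        return False
```

Run it from a host in the same network as your Agents, for example broker_reachable("example.kafkaserver.com", 9092) for the second broker in the list above.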

Specify Credentials

Credentials used by the destination WarpStream Agents to connect to the source cluster.

source_cluster_credentials:
    sasl_username_env: SASL_USERNAME_ENV_VAR
    sasl_password_env: SASL_PASSWORD_ENV_VAR
    use_tls: false
  • SASL credentials for the brokers are defined under the source_cluster_credentials section in the YAML.

  • Orbit currently only supports the SASL PLAIN mechanism. Contact us if you need support for additional SASL mechanisms.

  • Both the sasl_username_env and sasl_password_env fields refer to environment variables. The Agents prepend an ORBIT_ prefix to the values of these fields before reading the environment variables, so for the example above the environment variables in the Agent should be configured as ORBIT_SASL_USERNAME_ENV_VAR and ORBIT_SASL_PASSWORD_ENV_VAR respectively.

  • Since the Agents run in your VPC, WarpStream does not have access to your credentials.

  • To force the Agents to use TLS when connecting to your source clusters, use the use_tls flag in the source_cluster_credentials section.
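
The environment variable naming convention described above is easy to get wrong, so here is a small sketch of it. The helper names agent_env_var_name and missing_credential_vars are hypothetical, and the code only models the ORBIT_ prefix rule stated in this section. It is not how the Agents are implemented.

```python
import os

ORBIT_ENV_PREFIX = "ORBIT_"  # Prefix described in this section.


def agent_env_var_name(configured_value: str) -> str:
    """Name of the environment variable the Agent is expected to read."""
    return ORBIT_ENV_PREFIX + configured_value


def missing_credential_vars(credentials: dict, environ=os.environ) -> list:
    """Return the prefixed variable names that are unset or empty.

    `credentials` mirrors the source_cluster_credentials YAML section.
    """
    missing = []
    for field in ("sasl_username_env", "sasl_password_env"):
        name = agent_env_var_name(credentials[field])
        if not environ.get(name):
            missing.append(name)
    return missing
```

Running missing_credential_vars in the Agent's environment before deploying gives a quick signal that both variables are set under their prefixed names.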

Topic Mappings

Replicate Topics

Topics can be replicated by adding mappings under the topic_mappings section of the YAML.

topic_mappings:
    - source_regex: topic.*
      destination_prefix: test0_
    - source_regex: 1topic.*
      destination_prefix: test1_
  • To replicate a topic from the source cluster to the destination, add a topic mapping to the topic_mappings section with a source_regex that matches the source topic to be replicated. For example, with the mappings above, source topics whose names start with topic or 1topic would be replicated.

  • Note that topics created by Orbit can only receive writes from Orbit until the irreversible_disable_orbit_management field in the topic mapping is set to true. See the producer migration section for details on how to use this field.

Pause Topic Replication

topic_mappings:
    - source_regex: topic.*
      destination_prefix: test0_
    - source_regex: 1topic.*
      destination_prefix: test1_
  • To pause replication for a topic, remove the mapping which matches the topic from the YAML.

  • For example, if the source cluster has a topic topic2, then it will be replicated in the destination as test0_topic2 due to the destination_prefix test0_.

  • To pause writes to the test0_topic2 topic in the destination, you can remove the mapping with destination_prefix test0_ and source_regex topic.*.

Topic Mapping Rules

Detailed rules that determine which topic mappings match which topics.

topic_mappings:
    - source_regex: topic.*
      destination_prefix: test0_
    - source_regex: 1topic.*
      destination_prefix: test1_
  • Each mapping consists of source_regex, destination_prefix, and irreversible_disable_orbit_management fields.

  • Orbit will create a new topic in the destination only if there exists a topic mapping with a source_regex which matches the source cluster topic.

  • To determine if a topic in the source cluster matches a topic mapping, it must match a source_regex in at least one topic mapping. For example, the topic topicA matches the first topic mapping.

  • The value of the destination_prefix field is prepended to the source topic name to form the name of the topic created in the destination cluster. An empty string is a valid value for destination_prefix if you want the topics to have the same name in the destination WarpStream cluster as they did in the source cluster.

  • To determine whether a topic created by Orbit in the destination cluster matches a topic mapping, it must match destination_prefix + source_regex in at least one of the topic mappings. For example, topicA is created as test0_topicA in the destination, and test0_topicA matches the first topic mapping.

  • A topic in either the source or the destination cluster will match the mappings in the topic_mappings section in the order in which the topic mappings are listed, and only a single topic mapping will apply (the first one).

  • Orbit will keep the topic configurations in sync for a source/destination topic pair only if the destination topic matches a topic mapping.

  • Orbit will keep deleted topics in sync for a source/destination topic pair only if the destination topic matches a topic mapping.

  • Orbit will replicate records for a source/destination topic pair only if the destination topic matches a topic mapping.

  • The irreversible_disable_orbit_management field will only be respected if a destination topic matches the topic mapping with the field set to true. See the producer migration section for more details on topic migration and the meaning of this configuration option.
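
The matching rules above can be modeled in a few lines. This is a sketch under stated assumptions, not Orbit's implementation: it uses Python's re module (Orbit's regex dialect may differ), treats source_regex as a whole-name match as noted in the configuration overview, and treats destination_prefix as a literal string. The function names are hypothetical.

```python
import re
from typing import Optional

# Mappings from the example above, in the order they appear in the YAML.
TOPIC_MAPPINGS = [
    {"source_regex": "topic.*", "destination_prefix": "test0_"},
    {"source_regex": "1topic.*", "destination_prefix": "test1_"},
]


def match_source_topic(topic: str, mappings=TOPIC_MAPPINGS) -> Optional[dict]:
    """Return the first mapping whose source_regex matches the whole name."""
    for mapping in mappings:
        if re.fullmatch(mapping["source_regex"], topic):
            return mapping
    return None


def destination_name(topic: str, mappings=TOPIC_MAPPINGS) -> Optional[str]:
    """Destination topic name, or None if no mapping matches."""
    mapping = match_source_topic(topic, mappings)
    return None if mapping is None else mapping["destination_prefix"] + topic


def match_destination_topic(topic: str, mappings=TOPIC_MAPPINGS) -> Optional[dict]:
    """Return the first mapping matching destination_prefix + source_regex."""
    for mapping in mappings:
        prefix = mapping["destination_prefix"]
        if not topic.startswith(prefix):
            continue
        if re.fullmatch(mapping["source_regex"], topic[len(prefix):]):
            return mapping
    return None
```

With these mappings, destination_name("topicA") yields test0_topicA, destination_name("1topicB") yields test1_1topicB, and a source topic such as mytopic matches nothing and is not replicated.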

Topic Configurations

Orbit will copy topic configurations from the source cluster for all of the topics which Orbit is managing in the destination cluster. Note that only the following configuration values are considered relevant to WarpStream and will be copied by Orbit:

  • cleanup.policy

  • message.timestamp.type

  • retention.ms

  • delete.retention.ms

Sync Consumer Groups

Orbit can sync offsets of consumer groups which exist in the source cluster to the destination cluster.

consumer_groups:
    copy_offsets_enabled: true
  • Set copy_offsets_enabled to true under the consumer_groups section and Orbit will start mirroring consumer groups from the source cluster to the destination.

Sync Cluster Configurations

cluster_config:
    copy_source_cluster_configuration: true
  • Orbit will copy cluster configurations from the source cluster to the destination if the copy_source_cluster_configuration flag under the cluster_config section is set to true.

  • Note that Orbit only copies cluster configurations which are relevant to WarpStream. These include:

    • num.partitions which is used to determine the default number of partitions a new topic should be created with. This is referring to new topics created by a Kafka client, and not Orbit.

    • auto.create.topics.enable which indicates if new topics should be created automatically when the client performs a metadata request for a topic which does not exist.

    • log.retention.ms, log.retention.minutes, and log.retention.hours, which indicate the default retention used by the topics.

    • offsets.retention.minutes which indicates the expiration duration of the offsets committed by a consumer group.
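
Since three different retention settings can be present on the source, it may be useful to know which one wins. In Apache Kafka, log.retention.ms takes precedence over log.retention.minutes, which takes precedence over log.retention.hours. The sketch below applies that precedence. Whether Orbit resolves the three settings in exactly the same way is an assumption, and the function name is hypothetical.

```python
from typing import Mapping, Optional

MS_PER_MINUTE = 60 * 1000
MS_PER_HOUR = 60 * MS_PER_MINUTE


def effective_retention_ms(broker_config: Mapping[str, str]) -> Optional[int]:
    """Resolve the default retention using Apache Kafka's precedence:
    log.retention.ms, then log.retention.minutes, then log.retention.hours.
    """
    if "log.retention.ms" in broker_config:
        return int(broker_config["log.retention.ms"])
    if "log.retention.minutes" in broker_config:
        return int(broker_config["log.retention.minutes"]) * MS_PER_MINUTE
    if "log.retention.hours" in broker_config:
        return int(broker_config["log.retention.hours"]) * MS_PER_HOUR
    return None  # None of the three settings is present.
```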

Orbit UI

You can edit the Orbit YAML, view the offset lag for topic partitions being mirrored, and view which topics Orbit is mirroring all from the WarpStream UI.

Create Orbit YAML

  1. Select your virtual cluster from the WarpStream console and navigate to the Orbit tab. You'll see a text editor which can be used to create and edit the Orbit YAML file.

  2. Edit the YAML, and hit Save. Note that the pipeline is paused by default. You can start it by hitting the toggle next to the word PAUSED.

  3. Once the pipeline is running, you'll see the topics which are mirrored by Orbit show up under the Orbit tab. In the image below, Orbit created a topic called test_topic with a prefix orbit_ as expected.

  4. To modify the YAML, hit the edit button, edit the YAML, and hit Save again. Orbit will create a new configuration for the pipeline. In the image below, I've set copy_offsets_enabled to true so that consumer group offsets will be copied from the source cluster to the destination. Note: You will have to hit the Deploy button on the new version before it will take effect. You can also deploy older versions to "rollback".

  5. You can view the lag between the source and destination cluster topic partitions from the Consumers tab. The orbit_source_cluster_offsets consumer group will include every topic and partition that is being mirrored by Orbit. The offset lag indicates how far behind Orbit is compared to the source cluster. Note: This group will always have a time lag of 0.

Migration

Orbit can be used to migrate existing Kafka-compatible clusters to WarpStream.

Replication

First, follow the instructions above to configure WarpStream to replicate all of the topics and consumer groups from the source cluster. Monitor the lag of the orbit_source_cluster_offsets consumer group to wait for WarpStream to "catch up" on all the source Kafka topics.

Consumers

The following migration flow will ensure that your Kafka consumer clients can be migrated to WarpStream without duplicate processing of records. The Orbit YAML must have the following config.

consumer_groups:
    copy_offsets_enabled: true
  1. Orbit will periodically sync committed offsets for consumer groups to WarpStream.

  2. Let's say you want to migrate some consumer group X.

  3. First, shut down the consumers belonging to this group in the source cluster.

  4. Wait 1 minute for Orbit to copy the consumer group offsets for group X to the destination cluster. You can use the warpstream_consumer_group_max_offset metric emitted from the Agents to view the offsets copied by Orbit to the destination. See the monitoring page for more details on this metric. You can also see the currently committed offsets in the Consumers tab in the WarpStream UI.

  5. Restart consumers but point them to the destination WarpStream cluster instead by changing their bootstrap URL. The consumers will start reading from the offsets which were last committed in the source cluster, picking up where they left off.
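
When checking the copied offsets in step 4, remember that group and topic names may differ in the destination if prefixes are configured, while the offset values themselves stay the same because Orbit preserves record offsets. The sketch below shows what to look for. It assumes that destination_group_prefix and the topic destination_prefix are applied by simple concatenation, and the type and function names are hypothetical.

```python
from typing import NamedTuple


class CommittedOffset(NamedTuple):
    group: str
    topic: str
    partition: int
    offset: int


def expected_destination_commit(
    source: CommittedOffset,
    destination_group_prefix: str = "",
    destination_topic_prefix: str = "",
) -> CommittedOffset:
    """Where a commit from the source is expected to appear in the destination."""
    return CommittedOffset(
        group=destination_group_prefix + source.group,
        topic=destination_topic_prefix + source.topic,
        partition=source.partition,
        # Record offsets are preserved, so the committed offset is unchanged.
        offset=source.offset,
    )
```

If a group prefix is configured, the restarted consumers in step 5 would presumably need to use the prefixed group name to pick up the copied offsets.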

Producers

The following migration flow will ensure that your Kafka producer clients can be cut over to WarpStream seamlessly. Using the irreversible_disable_orbit_management field in a topic mapping will disable Orbit management of the topic. The following example shows how and when to use the field.

  1. Let's say you have the following topic mapping in the Orbit YAML, and you want to move the topic topic0, which is matched by this mapping, from the source to the destination. To make the example more interesting, assume that a topic1 also exists in the source cluster and is also being mirrored by Orbit.

topic_mappings:
    - source_regex: topic.*
      destination_prefix: test0_
  2. First, shut down the producer clients producing to topic0 in the source.

  3. Wait for the offset lag between the source topic topic0 and the destination topic test0_topic0 to become 0. You can view the lag for the topic from the orbit_source_cluster_offsets group in the Consumers view in the WarpStream console. The lag for each partition of the topic is also emitted as the warpstream_consumer_group_lag metric from the Agents. See the monitoring page for more details on this metric. You can also see the currently committed offsets in the Consumers tab in the WarpStream UI.

  4. Create a new topic mapping to migrate the test0_topic0 topic. Note the irreversible_disable_orbit_management field. This field indicates that Orbit should stop managing the topic, which allows Kafka producer clients to write to it. This decision is irreversible.

topic_mappings:
    - source_regex: topic0
      destination_prefix: test0_
      irreversible_disable_orbit_management: true
    - source_regex: topic.*
      destination_prefix: test0_
  5. Since topic mappings match topics in order, test0_topic0 will now accept writes from regular Kafka clients, and Orbit will no longer copy records from the source for this topic. Note that test0_topic1 is unaffected because it does not match the new topic mapping.

  6. Restart your producer clients, pointing them to the destination WarpStream cluster. Since topic0 is known as test0_topic0 in the destination due to the prefix, you'll have to write to test0_topic0 instead.
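
The order of the two mappings in the cutover configuration matters, because only the first matching mapping applies. The sketch below illustrates this with the example topics. It is a simplified model, not Orbit's implementation: it assumes Python regex semantics with whole-name matching and a literal prefix, and the function name orbit_manages is hypothetical.

```python
import re

# Mappings from the cutover example above, in YAML order.
MAPPINGS_AFTER_CUTOVER = [
    {
        "source_regex": "topic0",
        "destination_prefix": "test0_",
        "irreversible_disable_orbit_management": True,
    },
    {"source_regex": "topic.*", "destination_prefix": "test0_"},
]


def orbit_manages(destination_topic: str, mappings) -> bool:
    """True if the first matching mapping leaves the topic under Orbit management."""
    for mapping in mappings:
        prefix = mapping["destination_prefix"]
        if not destination_topic.startswith(prefix):
            continue
        if re.fullmatch(mapping["source_regex"], destination_topic[len(prefix):]):
            # Only the first matching mapping is consulted.
            return not mapping.get("irreversible_disable_orbit_management", False)
    return False  # No mapping matches, so Orbit does not manage the topic.
```

In this model test0_topic0 is released from Orbit management while test0_topic1 is still managed. If the two mappings were listed in the opposite order, the broader topic.* mapping would match test0_topic0 first and the flag would never be reached.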

Observability

You can monitor Orbit using the metrics built into the WarpStream Agents. See our monitoring documents for more details.

Orbit will also emit some additional metrics:

  1. You can use the warpstream_agent_kafka_produce_with_offset_uncompressed_bytes_counter metric to view the data successfully written by Orbit to the destination WarpStream cluster.

  2. You can use the warpstream_consumer_group_lag metric, filtered by the group orbit_source_cluster_offsets and the topics you care about, to determine the lag between your topics in the source cluster and WarpStream. See the monitoring page for more details on this metric.

  3. You can use the warpstream_consumer_group_max_offset metric emitted from the Agents to view the offsets copied by Orbit to the consumer group in the destination. See the monitoring page for more details on this metric.

Tuning Orbit

Orbit is tuneable using the following section which can be added to the Orbit YAML.

warpstream:
    cluster_fetch_concurrency: 10
    fetch_config:
        fetch_max_wait_ms: 10000
        fetch_min_bytes: 1
        fetch_max_bytes: 104857600
        fetch_max_partition_bytes: 52428800
  • The above YAML controls how Orbit executes fetches which grab data from the source cluster.

  • The key field is the cluster_fetch_concurrency under the warpstream section. It can be used to control the approximate concurrency of fetches which will be executed against the source cluster and is the field you'd want to experiment with to find an appropriate setting for your workload.

  • While it is recommended to use the default settings, you can also modify the fields under the fetch_config section to further tune Orbit. The fetch_max_wait_ms, fetch_min_bytes, fetch_max_bytes, fetch_max_partition_bytes are equivalent to the fetch.max.wait.ms, fetch.min.bytes, fetch.max.bytes, max.partition.fetch.bytes Kafka settings respectively.
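
The byte and millisecond values in the tuning example are easier to reason about in larger units. The sketch below pairs each fetch_config field with the equivalent Kafka setting named above and prints the example values in readable form. The helper name describe_fetch_config is hypothetical.

```python
# Orbit fetch_config field -> equivalent Kafka setting (from the text above).
FETCH_CONFIG_TO_KAFKA = {
    "fetch_max_wait_ms": "fetch.max.wait.ms",
    "fetch_min_bytes": "fetch.min.bytes",
    "fetch_max_bytes": "fetch.max.bytes",
    "fetch_max_partition_bytes": "max.partition.fetch.bytes",
}

MIB = 1024 * 1024


def describe_fetch_config(fetch_config: dict) -> list:
    """Render each field with its Kafka equivalent and a readable value."""
    lines = []
    for field, value in fetch_config.items():
        kafka_name = FETCH_CONFIG_TO_KAFKA[field]
        if field.endswith("_ms"):
            readable = f"{value / 1000:g} s"
        elif value % MIB == 0:
            readable = f"{value // MIB} MiB"
        else:
            readable = f"{value} B"
        lines.append(f"{field} ({kafka_name}) = {readable}")
    return lines


example = {
    "fetch_max_wait_ms": 10000,
    "fetch_min_bytes": 1,
    "fetch_max_bytes": 104857600,
    "fetch_max_partition_bytes": 52428800,
}
for line in describe_fetch_config(example):
    print(line)
```

For the example values this shows a 10 second maximum wait, a 100 MiB cap per fetch, and a 50 MiB cap per partition.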

Unclean Leader Election

Orbit expects the topics it replicates to have unclean.leader.election.enable set to false. If this configuration is set to true, the source cluster can lose data, and Orbit will not retroactively delete records from the destination WarpStream cluster that were already replicated but have since been lost in the source cluster.
