Managed Data Pipelines

This page explains how to use the WarpStream Managed Data Pipelines functionality.

Running managed data pipelines can significantly increase the load on the WarpStream Agents. This can disrupt production workloads if the same set of Agents are also used to serve Produce and Fetch requests.

For large or high-volume data pipelines, consider using WarpStream's Agent Roles feature to isolate the managed data pipelines onto dedicated Agent infrastructure. This can be accomplished by creating a separate deployment of the Agents with only the "pipelines" role enabled, and then ensuring that the Agent deployment handling your Produce / Fetch requests does not have the "pipelines" role enabled.
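
For example, a dedicated pipelines-only Agent deployment might be started roughly like this (a sketch only; the -roles flag value shown here is an assumption, so confirm the exact flag and role names against the Agent Roles documentation):

warpstream agent -virtualClusterID XXXXXXXX -apiKey XXXXXXXX -bucketURL s3://my-warpstream-bucket -roles pipelines -enableManagedPipelines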

Overview

WarpStream managed data pipelines combine the power of WarpStream's control plane with Bento, a stream processing tool that makes "fancy stream processing operationally mundane". Bento is a lightweight stream processing framework that offers much of the functionality of Kafka Connect, as well as additional stream processing functionality like single message transforms, aggregations, multiplexing, enrichments, and more. It also has native support for WebAssembly (WASM) for more advanced processing.

Like WarpStream, Bento is written in Go, which made it possible to embed Bento natively into the WarpStream Agents.

WarpStream managed data pipelines take Bento's capabilities one step further: by embedding Bento inside the WarpStream Agents, data pipelines can be written, edited, and fully controlled from WarpStream's managed control plane. This makes it possible to perform basic ETL and stream processing tasks with a single, stateless binary, and with no additional infrastructure beyond the WarpStream Agents.

This approach brings all the benefits of WarpStream's BYOC Kafka replacement to Bento: managed data pipelines run in your cloud account, on your VMs, and process data in your buckets. WarpStream has no access to any of the data processed by your managed pipelines. Instead, WarpStream's control plane enhances Bento by adding:

  1. A helpful UI for creating and editing the pipelines.

  2. The ability to pause and resume pipelines dynamically.

  3. A full version control system so pipeline configuration is versioned and can easily be rolled forward / backwards.

  4. Much more!

Managed data pipelines are (currently) only supported in WarpStream's BYOC product.

Getting Started

Getting started with WarpStream managed data pipelines is easy. The WarpStream Agents embed Bento natively as a library, but by default managed data pipelines are disabled. To enable them, add the -enableManagedPipelines flag to your Agent binary, or set the WARPSTREAM_ENABLE_MANAGED_PIPELINES=true environment variable.

For example:

warpstream agent -virtualClusterID XXXXXXXX -apiKey XXXXXXXX -bucketURL s3://my-warpstream-bucket -enableManagedPipelines
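
Or, equivalently, using the WARPSTREAM_ENABLE_MANAGED_PIPELINES environment variable:

WARPSTREAM_ENABLE_MANAGED_PIPELINES=true warpstream agent -virtualClusterID XXXXXXXX -apiKey XXXXXXXX -bucketURL s3://my-warpstream-bucket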

However, before deploying your Bento configuration to production, let's get started with a small stream processing task using an ephemeral WarpStream Playground cluster. Start one now by running the command below (Note: the playground will set -enableManagedPipelines by default):

warpstream playground

The command will print a URL to a temporary WarpStream Playground account. Open the URL in your browser, then navigate to the "Pipelines" view for your cluster:

Click "Create Pipeline" and give provide a name to create your first managed data pipeline:

After creating your pipeline, you'll be dropped into the WarpStream pipeline editor UI:

From here, you can edit the configuration of your pipeline, pause it, resume it, and roll your configuration forwards and backwards.

The first thing we need to do is save an initial configuration for the pipeline. The best way to learn how to write Bento configurations is the official Bento documentation. For now, though, copy and paste the sample configuration below:

input:
  generate:
    count: 1000
    interval: 1s
    mapping: |
      root = if random_int() % 2 == 0 {
        {
          "type": "foo",
          "foo": "xyz"
        }
      } else {
        {
          "type": "bar",
          "bar": "abc"
        }
      }
output:
  kafka_franz_warpstream:
    topic: bento_test

This configuration instructs Bento to generate some semi-random sample data, and then write that data to a WarpStream topic called "bento_test". Note that whenever we want to use the current WarpStream cluster as a Kafka source or sink, we use a kafka_franz_warpstream block instead of a generic kafka_franz block. This instructs the data pipeline to transparently configure the client for performance: it sets an appropriate client ID to leverage WarpStream's zone-aware service discovery system, automatically configures the client with appropriate SASL credentials if the cluster has authentication or ACLs enabled, and much more.

Once you've copied and pasted the configuration above, click "Save" in the pipeline editor.

The configuration is now saved and "deployed", but it's not running yet. Click the toggle button next to where it says "PAUSED" to start running the pipeline.

Once the pipeline is running, you can monitor its progress by clicking on the "Topics" tab.

Now let's try modifying a running pipeline! Navigate back to the pipelines view, click on the test pipeline, then click "Edit" and change the output topic to "bento_test_2":
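
For reference, after this edit the output section of the configuration should read:

output:
  kafka_franz_warpstream:
    topic: bento_test_2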

Now click "Save".

Note that even though we edited the pipeline configuration, the old version is still running; that's why there is a green target symbol next to "Version 0". This indicates that even though "Version 1" is the latest version, the version that is currently deployed is "Version 0". This keeps the process of editing configuration separate from the process of deploying it.

To deploy the new version of the pipeline, select "Version 1" from the menu on the left and then click "Deploy".

Navigate back to the topics tab, and you should see the new "bento_test_2" topic now!

That's it! If you have any questions or requests, hop in our community Slack!

WarpStream Specific Configuration

kafka_franz_warpstream blocks

Anywhere in a Bento configuration file that a kafka_franz block can exist, it can be replaced with a kafka_franz_warpstream block which will be transparently transformed by WarpStream to refer to the WarpStream cluster that is running the managed pipeline.

For example, the following configuration:

input:
    kafka_franz:
        seed_brokers: ["localhost:9092"]
        topics: ["test_topic"]

    processors:
        - mapping: "root = content().capitalize()"

output:
    kafka_franz:
        seed_brokers: ["localhost:9092"]
        topic: "test_topic_capitalized"

Could instead be expressed as:

input:
    kafka_franz_warpstream:
        topics: ["test_topic"]

    processors:
        - mapping: "root = content().capitalize()"

output:
    kafka_franz_warpstream:
        topic: "test_topic_capitalized"

While that may seem like a minor difference, the kafka_franz_warpstream block does more than just insert an appropriate value for seed_brokers:

  1. It automatically enables appropriate batching to improve performance with WarpStream.

  2. It configures a zone-aware client ID that leverages WarpStream's zone-aware discovery system so that data pipelines which write to and read from WarpStream do not incur any inter-AZ networking costs.

  3. It detects which port the Agent is serving Kafka on, and configures that port in seed_brokers in case a port other than 9092 is being used.

  4. It automatically detects if authentication / authorization is required, and if so, configures the client with appropriate SASL credentials so your pipelines will work even when authentication is required and ACLs are enabled.

Remember, these blocks work anywhere that a kafka_franz block could be used. For example, you can write more advanced configurations like the one below, and they will still work:

input:
    generate:
        count: 1000
        interval: 1s
        mapping: |
            root = if random_int() % 2 == 0 {
              {
                "type": "foo",
                "foo": "xyz"
              }
            } else {
              {
                "type": "bar",
                "bar": "abc"
              }
            }
output:
    broker:
        outputs:
            - file:
                codec: lines
                path: /tmp/data.txt
              type: file
            - kafka_franz_warpstream:
                topic: bento_test
              type: kafka_franz
        pattern: fan_out

Whenever communicating with the WarpStream cluster via the Kafka API, we recommend using these blocks instead of standard kafka_franz blocks.

WarpStream Block

The warpstream block supports additional WarpStream-specific functionality not available in Bento.

Scheduling

The scheduling block supports running a pipeline at regular intervals. This is particularly useful for Bento pipelines where the input source has a natural termination point (as opposed to a Kafka input, which will poll for new records forever). For example, when using GCP BigQuery as an input, the pipeline will run until all the records returned by the query have been processed.

input:
  generate:
    count: 1
    interval: 1s
    mapping: |
      root = if random_int() % 2 == 0 {
        {
          "type": "foo",
          "foo": "xyz"
        }
      } else {
        {
          "type": "bar",
          "bar": "abc"
        }
      }
output:
  kafka_franz_warpstream:
    topic: bento_test
warpstream:
  scheduling:
    run_every: 1s

Without the run_every configuration, this pipeline would run once and generate a single event and then terminate, but with the run_every block it will generate one record every second forever. Note that the pipeline will still run on every Agent, so in practice it will be one record/s * NUM_RUNNING_AGENTS. See the Concurrency Management section below for more details on how to control concurrency.
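
As a rough illustration of the BigQuery use case mentioned above (a sketch only: the gcp_bigquery_select fields, project, dataset, table, and interval below are placeholders to adapt to your environment), a periodically re-run query pipeline might look something like this:

input:
  gcp_bigquery_select:
    project: my-gcp-project
    table: my_dataset.my_table
    columns:
      - "*"
output:
  kafka_franz_warpstream:
    topic: bigquery_records
warpstream:
  scheduling:
    run_every: 60s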

Concurrency Management

Increasing pipeline concurrency will increase the load on the WarpStream Agents. For large or high volume data pipelines, consider using WarpStream's Agent Roles or Agent Groups features to isolate the data pipelines onto dedicated Agent deployments.

In addition to scheduling, the top-level warpstream block supports two configurations for managing concurrency. The first is cluster_concurrency_target:

input:
  generate:
    count: 1000
    interval: 1s
    mapping: |
      root = if random_int() % 2 == 0 {
        {
          "type": "foo",
          "foo": "xyz"
        }
      } else {
        {
          "type": "bar",
          "bar": "abc"
        }
      }
output:
  kafka_franz_warpstream:
    topic: bento_test
warpstream:
  cluster_concurrency_target: 1

cluster_concurrency_target controls the "target concurrency" for this pipeline. For example, if it's set to 1, that means that ideally only 1 "instance" of this pipeline should run in the cluster at any given moment. However, for balancing and simplicity reasons, this value will always be rounded up to a multiple of the number of currently running Agents. The table below demonstrates how this behavior plays out in practice.

| Cluster Concurrency Target | # of Agents | Actual Concurrency |
| -------------------------- | ----------- | ------------------ |
| 1                          | 3           | 3                  |
| 2                          | 3           | 3                  |
| 3                          | 3           | 3                  |
| 4                          | 3           | 6                  |

Alternatively, you can express the concurrency more directly in terms of how many instances of the pipeline each Agent should run using the agent_concurrency_target config. For example:

input:
  generate:
    count: 1000
    interval: 1s
    mapping: |
      root = if random_int() % 2 == 0 {
        {
          "type": "foo",
          "foo": "xyz"
        }
      } else {
        {
          "type": "bar",
          "bar": "abc"
        }
      }
output:
  kafka_franz_warpstream:
    topic: bento_test
warpstream:
  agent_concurrency_target: 1

| Agent Concurrency Target | # of Agents | Actual Concurrency |
| ------------------------ | ----------- | ------------------ |
| 1                        | 3           | 3                  |
| 2                        | 3           | 6                  |
| 3                        | 3           | 9                  |

Pipeline Groups

Pipeline groups are similar to Agent Groups, but for managed data pipelines. They can be used to isolate different managed pipelines onto different groups of Agents. For example, if you have a mixture of small and very large managed pipelines, you can isolate the larger pipelines onto a dedicated group of Agents so that they can't interfere with the smaller pipelines.

Agents with managed pipelines enabled (running the pipelines role) will run any pipeline that is assigned to the same group as them. If no pipeline group is specified, the Agents will run any pipeline that also doesn't have an assigned group.

| Agent Pipeline Group Name | Pipeline Group Name | Will Run? |
| ------------------------- | ------------------- | --------- |
| N/A                       | N/A                 | Yes       |
| small                     | N/A                 | No        |
| large                     | N/A                 | No        |
| N/A                       | small               | No        |
| small                     | small               | Yes       |
| large                     | small               | No        |
| N/A                       | large               | No        |
| small                     | large               | No        |
| large                     | large               | Yes       |

Configuring the Agents

The pipeline group name can be any valid string. Configuring the pipeline group name for an Agent deployment is done by setting the -managedPipelinesGroupName flag or the WARPSTREAM_MANAGED_PIPELINES_GROUP_NAME environment variable.
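
For example, to dedicate an Agent deployment to a "large" pipeline group (following the flag naming conventions used elsewhere on this page):

warpstream agent -virtualClusterID XXXXXXXX -apiKey XXXXXXXX -bucketURL s3://my-warpstream-bucket -enableManagedPipelines -managedPipelinesGroupName large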

Configuring the Pipeline
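
To assign a pipeline to a group, set the pipeline_group field in the top-level warpstream block, replacing $PIPELINE_GROUP_NAME with the name of the group (for example, "small" or "large") whose Agents should run the pipeline: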

input:
  generate:
    count: 1000
    interval: 1s
    mapping: |
      root = if random_int() % 2 == 0 {
        {
          "type": "foo",
          "foo": "xyz"
        }
      } else {
        {
          "type": "bar",
          "bar": "abc"
        }
      }
output:
  kafka_franz_warpstream:
    topic: bento_test
warpstream:
  agent_concurrency_target: 1
  pipeline_group: $PIPELINE_GROUP_NAME
