Managed Data Pipelines (beta)

This page explains how to use the WarpStream Managed Data Pipelines functionality.

Running data pipelines will increase the load on the WarpStream Agents. For large or high volume data pipelines, consider using WarpStream's Agent Roles or Agent Groups features to isolate the pipelines onto dedicated Agent deployments.

Overview

The WarpStream Benthos integration is currently in beta. We welcome your feedback in our Slack channel.

WarpStream managed data pipelines combine the power of WarpStream's control plane with Benthos, a stream processing tool that makes "fancy stream processing operationally mundane". Benthos is a lightweight stream processing framework that offers much of the functionality of Kafka Connect, as well as additional stream processing features like single message transforms, aggregations, multiplexing, enrichments, and more. It also has native support for WebAssembly (WASM) for more advanced processing.

Like WarpStream, Benthos is written in Go, so we embedded it natively into the WarpStream Agents.

WarpStream managed data pipelines take Benthos' capabilities one step further: because Benthos runs inside the WarpStream Agents, data pipelines can be written, edited, and fully controlled from WarpStream's managed control plane. This makes it possible to perform basic ETL and stream processing tasks with a single, stateless binary, and with no additional infrastructure beyond the WarpStream Agents.

This approach brings all the benefits of WarpStream's BYOC Kafka replacement to Benthos: managed data pipelines run in your cloud account, on your VMs, and process data in your buckets. WarpStream has no access to any of the data processed by your managed pipelines. Instead, WarpStream's control plane enhances Benthos by adding:

  1. A helpful UI for creating and editing the pipelines.

  2. The ability to pause and resume pipelines dynamically.

  3. A full version control system so pipeline configuration is versioned and can easily be rolled forward / backwards.

  4. Much more!

Managed data pipelines are (currently) only supported in WarpStream's BYOC product.

Getting Started

Getting started with WarpStream managed data pipelines is easy. The WarpStream Agents embed Benthos natively as a library, but by default managed data pipelines are disabled. To enable them, pass the -enableManagedPipelines flag to the Agent binary, or set the WARPSTREAM_ENABLE_MANAGED_PIPELINES=true environment variable.

For example:

warpstream agent -virtualClusterID XXXXXXXX -apiKey XXXXXXXX -bucketURL s3://my-warpstream-bucket -enableManagedPipelines
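
Or, equivalently, using the environment variable instead of the flag:

WARPSTREAM_ENABLE_MANAGED_PIPELINES=true warpstream agent -virtualClusterID XXXXXXXX -apiKey XXXXXXXX -bucketURL s3://my-warpstream-bucket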

However, before deploying your Benthos configuration to production, let's get started with a small stream processing task using an ephemeral WarpStream Playground cluster. Start one now by running the command below (Note: the playground will set -enableManagedPipelines by default):

warpstream playground

The command will print a URL to a temporary WarpStream Playground account. Open the URL in your browser, then navigate to the "Pipelines" view for your cluster.

Click "Create Pipeline" and give provide a name to create your first managed data pipeline:

After creating your pipeline, you'll be dropped into the WarpStream pipeline editor UI.

From here, you can edit the configuration of your pipeline, pause it, resume it, and roll your configuration forwards and backwards.

The first thing we need to do is save an initial configuration for the pipeline. The best way to learn how to write Benthos configurations is the official Benthos documentation. For now though, copy and paste the sample configuration below:

input:
  generate:         # generate synthetic input data
    count: 1000     # emit 1,000 messages in total...
    interval: 1s    # ...at a rate of one per second
    mapping: |
      root = if random_int() % 2 == 0 {
        {
          "type": "foo",
          "foo": "xyz"
        }
      } else {
        {
          "type": "bar",
          "bar": "abc"
        }
      }
output:
  kafka_franz_warpstream:
    topic: benthos_test  # write to this topic in the current WarpStream cluster

This configuration instructs Benthos to generate some semi-random sample data and write it to a WarpStream topic called "benthos_test". Note that whenever we want to use the current WarpStream cluster as a Kafka source or sink, we use a kafka_franz_warpstream block instead of a generic kafka_franz block. This instructs the data pipeline to transparently configure the client for performance: it sets an appropriate client ID to leverage WarpStream's zone-aware service discovery system, automatically configures the client with appropriate SASL credentials if the cluster has authentication or ACLs enabled, and much more.

Once you've copied and pasted the configuration above, click "Save" in the pipeline editor.

The configuration is now saved and "deployed", but it's not running yet. Click the toggle button next to where it says "PAUSED" to start running the pipeline.

Once the pipeline is running, you can monitor its progress by clicking on the "Topics" tab.
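
If you'd rather inspect the records themselves rather than just topic-level metrics, a second throwaway pipeline along the lines of the sketch below can tail the topic and print each message to the Agent's standard output. This is a minimal sketch: the consumer_group name is an arbitrary choice for this example, and stdout is a standard Benthos output.

input:
  kafka_franz_warpstream:
    topics: ["benthos_test"]
    consumer_group: "benthos_test_tail" # hypothetical consumer group for this example
output:
  stdout:
    codec: lines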

Now let's try modifying a running pipeline! Navigate back to the pipelines view, click on the test pipeline, then click "Edit" and change the output topic to "benthos_test_2".

Now click "Save".

Note that even though we edited the pipeline configuration, the old version is still running; that's why there is a green target symbol next to "Version 0". This indicates that even though "Version 1" is the latest version, the version that is currently deployed is "Version 0". This keeps the process of editing configuration separate from the process of deploying configuration.

To deploy the new version of the pipeline, select "Version 1" from the menu on the left and then click "Deploy".

Navigate back to the topics tab, and you should see the new "benthos_test_2" topic now!

That's it! If you have any questions or requests, hop in our community Slack!

WarpStream Specific Configuration

kafka_franz_warpstream blocks

Anywhere a kafka_franz block can appear in a Benthos configuration file, it can be replaced with a kafka_franz_warpstream block, which WarpStream transparently transforms to refer to the WarpStream cluster that is running the managed pipeline.

For example, the following configuration:

input:
    kafka_franz:
        seed_brokers: ["localhost:9092"]
        topics: ["test_topic"]

    processors:
        - mapping: "root = content().capitalize()"

output:
    kafka_franz:
        seed_brokers: ["localhost:9092"]
        topic: "test_topic_capitalized"

Could instead be expressed as:

input:
    kafka_franz_warpstream:
        topics: ["test_topic"]

    processors:
        - mapping: "root = content().capitalize()"

output:
    kafka_franz_warpstream:
        topic: "test_topic_capitalized"

While that may seem like a minor difference, the kafka_franz_warpstream block does more than just insert an appropriate value for seed_brokers (a rough hand-written equivalent is sketched after this list):

  1. It automatically enables appropriate batching to improve performance with WarpStream.

  2. It configures a zone-aware client ID that leverages WarpStream's zone-aware service discovery system so that data pipelines that write to and read from WarpStream do not incur any inter-AZ networking costs.

  3. It detects which port the Agent is serving Kafka on and configures that as the seed_brokers value, in case a port other than 9092 is being used.

  4. It automatically detects if authentication / authorization is required, and if so, configures the client with appropriate SASL credentials so your pipelines will work even when authentication is required and ACLs are enabled.
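
For comparison, a hand-written kafka_franz output approximating what the kafka_franz_warpstream block configures automatically might look roughly like the sketch below. Every concrete value here is an illustrative assumption, not the exact value WarpStream injects:

output:
    kafka_franz:
        seed_brokers: ["localhost:9092"]      # must match the port the Agent actually serves Kafka on
        topic: "test_topic_capitalized"
        client_id: "warpstream_az=us-east-1a" # zone-aware client ID (illustrative format)
        sasl:                                 # only needed when authentication / ACLs are enabled
            - mechanism: PLAIN
              username: "XXXXXXXX"
              password: "XXXXXXXX"
        batching:                             # batching tuned for WarpStream (illustrative values)
            count: 1000
            period: 5ms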

Remember, these blocks work anywhere that a kafka_franz block could be used. For example, you can write more advanced configurations like the one below, and they will still work:

input:
    generate:
        count: 1000
        interval: 1s
        mapping: |
            root = if random_int() % 2 == 0 {
              {
                "type": "foo",
                "foo": "xyz"
              }
            } else {
              {
                "type": "bar",
                "bar": "abc"
              }
            }
output:
    broker:
        outputs:
            - file:
                codec: lines
                path: /tmp/data.txt
            - kafka_franz_warpstream:
                topic: benthos_test
        pattern: fan_out

Whenever communicating with the WarpStream cluster via the Kafka API, we recommend using these blocks instead of standard kafka_franz blocks.

Concurrency Management

Increasing pipeline concurrency will increase the load on the WarpStream Agents. For large or high volume data pipelines, consider using WarpStream's Agent Roles or Agent Groups features to isolate the data pipelines onto dedicated Agent deployments.

In addition to the standard Benthos configuration, WarpStream data pipelines also support a special top-level warpstream block. Today, this enables two additional configuration options. The first is cluster_concurrency_target:

input:
  generate:
    count: 1000
    interval: 1s
    mapping: |
      root = if random_int() % 2 == 0 {
        {
          "type": "foo",
          "foo": "xyz"
        }
      } else {
        {
          "type": "bar",
          "bar": "abc"
        }
      }
output:
  kafka_franz_warpstream:
    topic: benthos_test
warpstream:
  cluster_concurrency_target: 1

cluster_concurrency_target controls the "target concurrency" for this pipeline. For example, if it's set to 1, that means you would ideally prefer that only 1 "instance" of this pipeline run in the cluster at any given moment. However, for balancing and simplicity reasons, this value is always rounded up to the nearest multiple of the number of currently running Agents. The table below demonstrates how this behavior plays out in practice.

Cluster Concurrency Target | # of Agents | Actual Concurrency
-------------------------- | ----------- | -------------------
1                          | 3           | 3
2                          | 3           | 3
3                          | 3           | 3
4                          | 3           | 6

Alternatively, you can express the concurrency more directly, in terms of how many instances of the pipeline each Agent should run, using the agent_concurrency_target config. For example:

input:
  generate:
    count: 1000
    interval: 1s
    mapping: |
      root = if random_int() % 2 == 0 {
        {
          "type": "foo",
          "foo": "xyz"
        }
      } else {
        {
          "type": "bar",
          "bar": "abc"
        }
      }
output:
  kafka_franz_warpstream:
    topic: benthos_test
warpstream:
  agent_concurrency_target: 1

Agent Concurrency Target | # of Agents | Actual Concurrency
------------------------ | ----------- | -------------------
1                        | 3           | 3
2                        | 3           | 6
3                        | 3           | 9

In other words, the actual concurrency is simply agent_concurrency_target multiplied by the number of currently running Agents.

Apache, Apache Kafka, Kafka, and associated open source project names are trademarks of the Apache Software Foundation. Kinesis is a trademark of Amazon Web Services.