Managed Data Pipelines
This page explains how to use the WarpStream Managed Data Pipelines functionality.
Running managed data pipelines can significantly increase the load on the WarpStream Agents. This can disrupt production workloads if the same set of Agents are also used to serve Produce and Fetch requests.
For large or high volume data pipelines, consider using WarpStream's Agent Roles feature to isolate the managed data pipelines onto dedicated Agent infrastructure. This can be accomplished by creating a separate deployment of the Agents with only the "pipelines" role enabled, and then ensuring that the Agent deployment handling your Produce / Fetch requests does not have the "pipelines" role enabled.
Overview
WarpStream managed data pipelines combine the power of WarpStream's control plane with Bento, a stream processing tool that makes "fancy stream processing operationally mundane". Bento is a lightweight stream processing framework that offers much of the functionality of Kafka Connect, as well as additional stream processing functionality like single message transforms, aggregations, multiplexing, enrichments, and more. It also has native support for WebAssembly (WASM) for more advanced processing.
Like WarpStream, Bento is written in Go, so we embedded it natively into the WarpStream Agents. WarpStream managed data pipelines take Bento's capabilities one step further: because Bento runs inside the WarpStream Agents, data pipelines can be written, edited, and fully controlled from WarpStream's managed control plane. This makes it possible to perform basic ETL and stream processing tasks with a single, stateless binary, and with no additional infrastructure beyond the WarpStream Agents.
This approach brings all the benefits of WarpStream's BYOC Kafka replacement to Bento: managed data pipelines run in your cloud account, on your VMs, and process data in your buckets. WarpStream has no access to any of the data processed by your managed pipelines. Instead, WarpStream's control plane enhances Bento by adding:
- A helpful UI for creating and editing pipelines.
- The ability to pause and resume pipelines dynamically.
- A full version control system, so pipeline configuration is versioned and can easily be rolled forwards / backwards.
- Much more!
Managed data pipelines are (currently) only supported in WarpStream's BYOC product.
Getting Started
Getting started with WarpStream managed data pipelines is easy. The WarpStream Agents embed Bento natively as a library, but managed data pipelines are disabled by default. To enable them, add the `-enableManagedPipelines` flag to your Agent binary, or set the `WARPSTREAM_ENABLE_MANAGED_PIPELINES=true` environment variable.
For example:
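```sh
# A sketch: <existing Agent flags...> stands in for your normal
# Agent configuration (bucket URL, API key, etc.).
warpstream agent -enableManagedPipelines <existing Agent flags...>

# Or, equivalently, via environment variable:
WARPSTREAM_ENABLE_MANAGED_PIPELINES=true warpstream agent <existing Agent flags...>
```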
However, before deploying your Bento configuration to production, let's get started with a small stream processing task using an ephemeral WarpStream Playground cluster. Start one now by running the command below (note: the playground will set `-enableManagedPipelines` by default):
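```sh
# Assumes the warpstream CLI binary is installed locally.
warpstream playground
```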
The command will print a URL to a temporary WarpStream Playground account. Open the URL in your browser, then navigate to the "Pipelines" view for your cluster:
Click "Create Pipeline" and give provide a name to create your first managed data pipeline:
After creating your pipeline, you'll be dropped into the WarpStream pipeline editor UI:
From here, you can edit the configuration of your pipeline, pause it, resume it, and roll your configuration forwards and backwards.
The first thing we need to do is save an initial configuration for the pipeline. The easiest way to learn how to write one is the official Bento documentation. For now, though, copy and paste the sample configuration below (a minimal sketch; the exact `generate` mapping is illustrative):
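```yaml
input:
  generate:
    # Emit one semi-random sample record per second. Any
    # bloblang mapping works here.
    interval: 1s
    mapping: |
      root.id = uuid_v4()
      root.created_at = now()

output:
  # Write to the current WarpStream cluster; see the
  # kafka_franz_warpstream section below for details.
  kafka_franz_warpstream:
    topic: bento_test
```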
This configuration instructs Bento to generate some semi-random sample data and write it to a WarpStream topic called "bento_test". Note that whenever we want to use the current WarpStream cluster as a Kafka source or sink, we use a `kafka_franz_warpstream` block instead of a generic `kafka_franz` block. This instructs the data pipeline to transparently take care of configuring the client for performance: setting an appropriate client ID to leverage WarpStream's zone-aware service discovery system, automatically configuring the client with appropriate SASL credentials if the cluster has authentication or ACLs enabled, and much more.
Once you've copied and pasted the configuration above, click "Save" in the pipeline editor.
The configuration is now saved and "deployed", but it's not running yet. Click the toggle button next to where it says "PAUSED" to start running the pipeline.
Once the pipeline is running, you can monitor its progress by clicking on the "Topics" tab.
Now let's try modifying a running pipeline! Navigate back to the pipelines view, click on the test pipeline, then click "Edit" and change the output topic to "bento_test_2":
Now click "Save".
Note that even though we edited the pipeline configuration, the old version is still running; that's why there is a green target symbol next to "Version 0". This indicates that even though "Version 1" is the latest version, the version that is currently deployed is "Version 0". This keeps the process of editing configuration separate from the process of deploying it.
To deploy the new version of the pipeline, select "Version 1" from the menu on the left and then click "Deploy".
Navigate back to the topics tab, and you should see the new "bento_test_2" topic now!
That's it! If you have any questions or requests, hop in our community Slack!
WarpStream-Specific Configuration
`kafka_franz_warpstream` blocks
Anywhere in a Bento configuration file that a `kafka_franz` block can exist, it can be replaced with a `kafka_franz_warpstream` block, which will be transparently transformed by WarpStream to refer to the WarpStream cluster that is running the managed pipeline.
For example, the following configuration (broker address, topic, and consumer group are illustrative):
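```yaml
input:
  kafka_franz:
    seed_brokers: ["localhost:9092"]
    topics: ["bento_test"]
    consumer_group: "bento_group"
```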
Could instead be expressed as:
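```yaml
input:
  # Note: no seed_brokers needed; WarpStream fills them in.
  kafka_franz_warpstream:
    topics: ["bento_test"]
    consumer_group: "bento_group"
```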
While that may seem like a minor difference, the `kafka_franz_warpstream` block does more than just insert an appropriate value for `seed_brokers`:
- It automatically enables appropriate batching to improve performance with WarpStream.
- It configures a zone-aware client ID that leverages WarpStream's zone-aware discovery system so that data pipelines which write to and read from WarpStream will not incur any inter-AZ networking costs.
- It detects which port the Agent is serving Kafka on and configures that port in `seed_brokers`, in case a port other than `9092` is being used.
- It automatically detects whether authentication / authorization is required and, if so, configures the client with appropriate SASL credentials so your pipelines work even when authentication is required and ACLs are enabled.
Remember, these blocks work anywhere that a `kafka_franz` block could be used. For example, you can write more advanced configurations like the sketch below (topic names, the mapping, and the routing logic are illustrative), and it will still work:
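```yaml
input:
  kafka_franz_warpstream:
    topics: ["bento_test"]
    consumer_group: "bento_enricher"

pipeline:
  processors:
    # Annotate each record with a processing timestamp.
    - mapping: |
        root = this
        root.processed_at = now()

output:
  switch:
    cases:
      # Route records that failed processing to a dead-letter topic.
      - check: errored()
        output:
          kafka_franz_warpstream:
            topic: bento_test_dlq
      # Everything else goes to the enriched topic.
      - output:
          kafka_franz_warpstream:
            topic: bento_test_enriched
```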
Whenever communicating with the WarpStream cluster via the Kafka API, we recommend using these blocks instead of standard `kafka_franz` blocks.
WarpStream Block
The `warpstream` block supports additional WarpStream-specific functionality not available in Bento.
Scheduling
The scheduling block supports running a pipeline at regular intervals. This is particularly useful for Bento pipelines where the input source has a natural termination point (as opposed to a Kafka input, which will poll for new records forever). For example, when using GCP BigQuery as an input, the pipeline will run until all the records returned by the query have been processed. The sketch below illustrates this (the exact nesting of `run_every` under the `warpstream` scheduling block is an assumption based on the fields described on this page):
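```yaml
# Hypothetical sketch: the nesting of run_every under
# warpstream -> scheduling is assumed, not confirmed.
warpstream:
  scheduling:
    run_every: 1s

input:
  generate:
    # Generate exactly one record, then terminate.
    count: 1
    mapping: 'root = {"message":"tick"}'

output:
  kafka_franz_warpstream:
    topic: bento_scheduled
```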
Without the `run_every` configuration, this pipeline would run once, generate a single event, and then terminate, but with the `run_every` block it will generate one record every second forever. Note that the pipeline will still run on every Agent, so in practice it will be one record/s * NUM_RUNNING_AGENTS. See the Concurrency Management section below for more details on how to control concurrency.
Concurrency Management
Increasing pipeline concurrency will increase the load on the WarpStream Agents. For large or high volume data pipelines, consider using WarpStream's Agent Roles or Agent Groups features to isolate the data pipelines onto dedicated Agent deployments.
In addition to the standard Bento configuration, WarpStream data pipelines also support a special top-level `warpstream` block. Today, this block enables two concurrency-related configurations. The first is `cluster_concurrency_target`.
`cluster_concurrency_target` controls the "target concurrency" for this pipeline. For example, if it's set to 1, that means you would ideally prefer that only 1 "instance" of this pipeline run in the cluster at any given moment. However, for balancing and simplicity reasons, this value is always rounded up to a multiple of the number of currently running Agents. A sketch of how this is expressed (only the `cluster_concurrency_target` field is described on this page; the rest of the pipeline is illustrative):
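```yaml
# Sketch: only cluster_concurrency_target is described by this
# page; the input/output below are illustrative.
warpstream:
  cluster_concurrency_target: 1

input:
  kafka_franz_warpstream:
    topics: ["bento_test"]
    consumer_group: "bento_group"

output:
  stdout: {}
```

The table below demonstrates how this behavior plays out in practice.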
| Cluster Concurrency Target | # of Agents | Actual Concurrency |
| --- | --- | --- |
| 1 | 3 | 3 |
| 2 | 3 | 3 |
| 3 | 3 | 3 |
| 4 | 3 | 6 |
Alternatively, you can express the concurrency more directly in terms of how many instances of the pipeline each Agent should run using the `agent_concurrency_target` config. For example (again, only the `agent_concurrency_target` field is described on this page):
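```yaml
# Sketch: only agent_concurrency_target is described by this page.
warpstream:
  agent_concurrency_target: 2
```

The actual concurrency then scales directly with the number of Agents: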
| Agent Concurrency Target | # of Agents | Actual Concurrency |
| --- | --- | --- |
| 1 | 3 | 3 |
| 2 | 3 | 6 |
| 3 | 3 | 9 |
Pipeline Groups
Pipeline groups are similar to Agent Groups, but for managed data pipelines. They can be used to isolate different managed pipelines onto different groups of Agents. For example, if you have a mixture of small and very large managed pipelines, you can isolate the larger pipelines onto a dedicated group of Agents so that they can't interfere with the smaller pipelines.
Agents with managed pipelines enabled (running the `pipelines` role) will run any pipeline that is assigned to the same group as them. If no pipeline group is specified, the Agents will run any pipeline that also doesn't have an assigned group. The table below illustrates this matching (group names are placeholders):
| Agent Pipeline Group Name | Pipeline Group Name | Will Run? |
| --- | --- | --- |
| (none) | (none) | Yes |
| (none) | group-a | No |
| (none) | group-b | No |
| group-a | (none) | No |
| group-a | group-a | Yes |
| group-a | group-b | No |
| group-b | (none) | No |
| group-b | group-a | No |
| group-b | group-b | Yes |
Configuring the Agents
The pipeline group name can be any valid string. Configuring the pipeline group name for an Agent deployment is done by setting the `-managedPipelinesGroupName` flag or the `WARPSTREAM_MANAGED_PIPELINES_GROUP_NAME` environment variable. For example:
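```sh
# A sketch: placeholder group name; other Agent flags omitted.
warpstream agent \
    -enableManagedPipelines \
    -managedPipelinesGroupName my-pipeline-group \
    <existing Agent flags...>
```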
Configuring the Pipeline
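The group a pipeline belongs to is part of the pipeline's own configuration. A hypothetical sketch, assuming the group is set via a field on the top-level `warpstream` block described above (the `pipeline_group_name` key is an assumed name, not a field confirmed by this page):

```yaml
# Hypothetical: pipeline_group_name is an assumed field name.
warpstream:
  pipeline_group_name: my-pipeline-group

input:
  kafka_franz_warpstream:
    topics: ["bento_test"]
    consumer_group: "bento_group"

output:
  stdout: {}
```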