# Managed Data Pipelines

{% hint style="info" %}
Running managed data pipelines can significantly increase the load on the WarpStream Agents. This can disrupt production workloads if the same set of Agents are also used to serve Produce and Fetch requests.

For large or high-volume data pipelines, consider using WarpStream's [Agent Roles](https://docs.warpstream.com/warpstream/kafka/advanced-agent-deployment-options/splitting-agent-roles) feature to isolate the managed data pipelines onto dedicated Agent infrastructure. This can be accomplished by creating a separate deployment of the Agents with only the "pipelines" role enabled, and then ensuring that the Agent deployment handling your Produce / Fetch requests does not have the "pipelines" role enabled.
{% endhint %}
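For instance, a dedicated pipelines-only deployment might be started like this (a sketch with placeholder cluster ID, API key, and bucket values; see the Agent Roles documentation for the authoritative `-roles` flag syntax):

```bash
warpstream agent \
  -virtualClusterID XXXXXXXX \
  -apiKey XXXXXXXX \
  -bucketURL s3://my-warpstream-bucket \
  -roles pipelines \
  -enableManagedPipelines
```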

## Overview

WarpStream managed data pipelines combine the power of WarpStream's control plane with [Bento](https://warpstreamlabs.github.io/bento/), a stream processing tool that makes "fancy stream processing operationally mundane". Bento is a lightweight stream processing framework that offers much of the functionality of Kafka Connect, as well as additional capabilities like single-message transforms, aggregations, multiplexing, enrichments, and more. It also has native support for WebAssembly (WASM) for more advanced processing.

Like WarpStream, Bento is written in Go, which made it possible to embed it natively into the WarpStream Agents.

WarpStream managed data pipelines take Bento's capabilities one step further: by embedding Bento *inside* the WarpStream Agents, data pipelines can be written, edited, and fully controlled from WarpStream's managed control plane. This makes it possible to perform basic ETL and stream processing tasks with a single, stateless binary, and with no additional infrastructure beyond the WarpStream Agents.

This approach brings all the benefits of WarpStream's BYOC Kafka replacement to Bento: managed data pipelines run in *your* cloud account, on *your* VMs, and process data in *your* buckets. WarpStream has no access to any of the data processed by your managed pipelines. Instead, WarpStream's control plane enhances Bento by adding:

1. A helpful UI for creating and editing the pipelines.
2. The ability to pause and resume pipelines dynamically.
3. A full version control system so pipeline configuration is versioned and can easily be rolled forward / backwards.
4. Much more!

Managed data pipelines are (currently) only supported in WarpStream's BYOC product.

We highly recommend watching the video below, which provides a comprehensive overview of managed data pipeline features.

{% embed url="https://player.vimeo.com/video/1058321091" %}

## Getting Started

Getting started with WarpStream managed data pipelines is easy. The WarpStream Agents embed Bento natively as a library, but by default managed data pipelines are disabled. To enable them, add the `-enableManagedPipelines` flag to your Agent binary, or set the `WARPSTREAM_ENABLE_MANAGED_PIPELINES=true` environment variable.

For example:

{% code overflow="wrap" %}

```bash
warpstream agent -virtualClusterID XXXXXXXX -apiKey XXXXXXXX -bucketURL s3://my-warpstream-bucket -enableManagedPipelines
```

{% endcode %}

However, before deploying a Bento configuration to production, let's start with a small stream processing task using an ephemeral WarpStream Playground cluster. Start one now by running the command below (note: the playground sets `-enableManagedPipelines` by default):

```bash
warpstream playground
```

The command will print a URL to a temporary WarpStream Playground account. Open the URL in your browser, then navigate to the "Pipelines" view for your cluster:

<figure><img src="https://77315434-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FjB7FxO8ty4EXO4HsQP4E%2Fuploads%2Fgit-blob-d49614966ea70e3c1c39176e31f8d194e08eb93e%2FScreenshot%202024-05-12%20at%204.17.41%20PM.png?alt=media" alt=""><figcaption></figcaption></figure>

Click "Create Pipeline" and provide a name to create your first managed data pipeline:

<figure><img src="https://77315434-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FjB7FxO8ty4EXO4HsQP4E%2Fuploads%2Fgit-blob-b26bfb4604b7c0b77ce087fafce5ecddc221e54d%2FScreenshot%202024-05-12%20at%204.18.40%20PM.png?alt=media" alt=""><figcaption></figcaption></figure>

After creating your pipeline, you'll be dropped into the WarpStream pipeline editor UI:

<figure><img src="https://77315434-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FjB7FxO8ty4EXO4HsQP4E%2Fuploads%2Fgit-blob-fbf32be66fad6c0930282e178a3068d683955e91%2FScreenshot%202024-05-12%20at%204.19.32%20PM.png?alt=media" alt=""><figcaption></figcaption></figure>

From here, you can edit the configuration of your pipeline, pause it, resume it, and roll your configuration forwards and backwards.

The first thing we need to do is save an initial configuration for the pipeline. The easiest way to learn how to do that is with the [official Bento documentation](https://warpstreamlabs.github.io/bento/). For now though, copy and paste the sample configuration below:

```yaml
input:
  generate:
    count: 1000
    interval: 1s
    mapping: |
      root = if random_int() % 2 == 0 {
        {
          "type": "foo",
          "foo": "xyz"
        }
      } else {
        {
          "type": "bar",
          "bar": "abc"
        }
      }
output:
  kafka_franz_warpstream:
    topic: bento_test
```

This configuration instructs Bento to generate some semi-random sample data and then write it to a WarpStream topic called "bento\_test". Note that whenever we want to use the current WarpStream cluster as a Kafka source or sink, we use a `kafka_franz_warpstream` block instead of a generic `kafka_franz` block. This instructs the data pipeline to transparently configure the client for performance: setting an appropriate client ID to leverage WarpStream's zone-aware service discovery system, automatically configuring the client with appropriate SASL credentials if the cluster has authentication or ACLs enabled, and much more.

Once you've copied and pasted the configuration above, click "Save" in the pipeline editor.

<figure><img src="https://77315434-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FjB7FxO8ty4EXO4HsQP4E%2Fuploads%2Fgit-blob-e17479d6dd0b76efe5b4c8b348a912f862f5ab9f%2FScreenshot%202024-05-12%20at%204.27.08%20PM.png?alt=media" alt=""><figcaption></figcaption></figure>

The configuration is now saved and "deployed", but it's not running yet. Click the toggle button next to where it says "PAUSED" to start running the pipeline.

<figure><img src="https://77315434-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FjB7FxO8ty4EXO4HsQP4E%2Fuploads%2Fgit-blob-2db8f03d03401591c5d9c3c8d0093aa42706173d%2FScreenshot%202024-05-12%20at%204.29.48%20PM.png?alt=media" alt=""><figcaption></figcaption></figure>

Once the pipeline is running, you can monitor its progress by clicking on the "Topics" tab.

<figure><img src="https://77315434-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FjB7FxO8ty4EXO4HsQP4E%2Fuploads%2Fgit-blob-ea407071921578d76bff1e2004cbf0702c3a0772%2FScreenshot%202024-05-12%20at%204.39.04%20PM.png?alt=media" alt=""><figcaption></figcaption></figure>

Now let's try modifying a running pipeline! Navigate back to the pipelines view, click on the test pipeline, then click "Edit" and change the output topic to "bento\_test\_2":

<figure><img src="https://77315434-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FjB7FxO8ty4EXO4HsQP4E%2Fuploads%2Fgit-blob-b4902857aa2a0f64e6c91be8d6cb8815c3d567b8%2FScreenshot%202024-05-12%20at%204.40.22%20PM.png?alt=media" alt=""><figcaption></figcaption></figure>

Now click "Save".

<figure><img src="https://77315434-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FjB7FxO8ty4EXO4HsQP4E%2Fuploads%2Fgit-blob-66293f627aad03ab3cdf10bd7ea1019ed48dc5e6%2FScreenshot%202024-05-12%20at%204.41.03%20PM.png?alt=media" alt=""><figcaption></figcaption></figure>

Note that even though we edited the pipeline configuration, the old version is still running; that's why there is a green target symbol next to "Version 0". Even though "Version 1" is the latest version, the version that is *currently deployed* is "Version 0". This keeps the process of editing configuration separate from the process of deploying it.

To deploy the new version of the pipeline, select "Version 1" from the menu on the left and then click "Deploy".

<figure><img src="https://77315434-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FjB7FxO8ty4EXO4HsQP4E%2Fuploads%2Fgit-blob-9a06b34bd15cbe691a3e17e87d526db721820192%2FScreenshot%202024-05-12%20at%204.48.22%20PM.png?alt=media" alt=""><figcaption></figcaption></figure>

Navigate back to the topics tab, and you should see the new "bento\_test\_2" topic now!

<figure><img src="https://77315434-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FjB7FxO8ty4EXO4HsQP4E%2Fuploads%2Fgit-blob-da332dc74d373f34fcc3fc9dcd08c78c10808794%2FScreenshot%202024-05-12%20at%204.49.50%20PM.png?alt=media" alt=""><figcaption></figcaption></figure>

That's it! If you have any questions or requests, hop in our [community Slack](https://console.warpstream.com/socials/slack)!

## WarpStream Specific YAML Configuration

### kafka\_franz\_warpstream blocks

Anywhere in a Bento configuration file that a `kafka_franz` block can exist, it can be replaced with a `kafka_franz_warpstream` block which will be transparently transformed by WarpStream to refer to the WarpStream cluster that is running the managed pipeline.

For example, the following configuration:

```yaml
input:
    kafka_franz:
        seed_brokers: ["localhost:9092"]
        topics: ["test_topic"]

    processors:
        - mapping: "root = content().capitalize()"

output:
    kafka_franz:
        seed_brokers: ["localhost:9092"]
        topic: "test_topic_capitalized"
```

Could instead be expressed as:

```yaml
input:
    kafka_franz_warpstream:
        topics: ["test_topic"]

    processors:
        - mapping: "root = content().capitalize()"

output:
    kafka_franz_warpstream:
        topic: "test_topic_capitalized"
```

While that may seem like a minor difference, the `kafka_franz_warpstream` block does more than just insert an appropriate value for `seed_brokers`:

1. It automatically enables appropriate batching to improve performance with WarpStream.
2. It configures a zone-aware client ID that leverages WarpStream's zone-aware discovery system, so that data pipelines which write to and read from WarpStream will not incur any inter-AZ networking costs.
3. It detects which port the Agent is serving Kafka on and configures that as the `seed_brokers` value, in case a port other than `9092` is being used.
4. It automatically detects if authentication / authorization is required, and if so, configures the client with appropriate SASL credentials so your pipelines will work even when authentication is required and ACLs are enabled.

Remember, these blocks work *anywhere* that a `kafka_franz` block could be used. For example, you can write more advanced configurations like this, and they will still work:

```yaml
input:
    generate:
        count: 1000
        interval: 1s
        mapping: |
            root = if random_int() % 2 == 0 {
              {
                "type": "foo",
                "foo": "xyz"
              }
            } else {
              {
                "type": "bar",
                "bar": "abc"
              }
            }
output:
    broker:
        outputs:
            - file:
                codec: lines
                path: /tmp/data.txt
            - kafka_franz_warpstream:
                topic: bento_test
        pattern: fan_out

Whenever communicating with the WarpStream cluster via the Kafka API, we recommend using these blocks instead of standard `kafka_franz` blocks.

{% hint style="info" %}
The `kafka_franz_warpstream` blocks are wrappers around standard Bento `kafka_franz` blocks, so any additional fields specified in the [Bento documentation](https://warpstreamlabs.github.io/bento/docs/components/inputs/kafka_franz) (like `tls.enabled`) can be used in `kafka_franz_warpstream` blocks as well.
{% endhint %}
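For example, standard `kafka_franz` fields can be layered onto the WarpStream-specific block (the topic and consumer group names below are hypothetical):

```yaml
input:
    kafka_franz_warpstream:
        topics: ["test_topic"]
        # Standard kafka_franz fields pass through unchanged.
        consumer_group: "my_consumer_group"
        start_from_oldest: true
```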

## WarpStream Block

The `warpstream` block supports additional WarpStream-specific functionality not available in Bento.

### Scheduling

The `scheduling` block supports running a pipeline at regular intervals. This is particularly useful for Bento pipelines where the input source has a natural termination point (as opposed to a Kafka input, which will poll for new records forever). For example, when using GCP BigQuery as an input, the pipeline will run until all the records returned by the query have been processed.

```yaml
input:
  generate:
    count: 1
    interval: 1s
    mapping: |
      root = if random_int() % 2 == 0 {
        {
          "type": "foo",
          "foo": "xyz"
        }
      } else {
        {
          "type": "bar",
          "bar": "abc"
        }
      }
output:
  kafka_franz_warpstream:
    topic: bento_test
warpstream:
  scheduling:
    run_every: 1s
```

Without the `run_every` configuration, this pipeline would run once and generate a single event and then terminate, but with the `run_every` block it will generate one record every second forever. Note that the pipeline will still run on every Agent, so in practice it will be one record/s \* NUM\_RUNNING\_AGENTS. See the [Concurrency Management](#concurrency-management) section below for more details on how to control concurrency.
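The same mechanism can drive periodic batch-style jobs. For example, a terminating pipeline can be re-run on a longer schedule (a sketch; the hourly interval and "heartbeats" topic are illustrative, not from the WarpStream docs):

```yaml
input:
  generate:
    count: 1
    interval: 1s
    mapping: root = {"ping": now()}
output:
  kafka_franz_warpstream:
    topic: heartbeats
warpstream:
  scheduling:
    run_every: 1h
```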

### Concurrency Management

{% hint style="info" %}
Increasing pipeline concurrency will increase the load on the WarpStream Agents. For large or high volume data pipelines, consider using WarpStream's [Agent Roles](https://docs.warpstream.com/warpstream/kafka/advanced-agent-deployment-options/splitting-agent-roles) or [Agent Groups](https://docs.warpstream.com/warpstream/kafka/advanced-agent-deployment-options/agent-groups) features to isolate the data pipelines onto dedicated Agent deployments.
{% endhint %}

In addition to the standard Bento configuration, WarpStream data pipelines also support a special top-level `warpstream` block. Today, this enables two additional configurations. The first is `cluster_concurrency_target`:

```yaml
input:
  generate:
    count: 1000
    interval: 1s
    mapping: |
      root = if random_int() % 2 == 0 {
        {
          "type": "foo",
          "foo": "xyz"
        }
      } else {
        {
          "type": "bar",
          "bar": "abc"
        }
      }
output:
  kafka_franz_warpstream:
    topic: bento_test
warpstream:
  cluster_concurrency_target: 1
```

`cluster_concurrency_target` controls the "target concurrency" for this pipeline. For example, if it's set to 1, then ideally only one "instance" of this pipeline should run in the cluster at any given moment. However, for balancing and simplicity reasons, this value always rounds up to a multiple of the number of currently running Agents. The table below demonstrates how this behavior plays out in practice.

<table><thead><tr><th width="273">Cluster Concurrency Target</th><th># of Agents</th><th>Actual Concurrency</th></tr></thead><tbody><tr><td>1</td><td>3</td><td>3</td></tr><tr><td>2</td><td>3</td><td>3</td></tr><tr><td>3</td><td>3</td><td>3</td></tr><tr><td>4</td><td>3</td><td>6</td></tr></tbody></table>
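The rounding rule behind the table can be sketched as follows: the effective concurrency is the target rounded up to the nearest multiple of the Agent count (a minimal illustration, not WarpStream's actual scheduler code):

```shell
# Round the concurrency target up to a multiple of the number of Agents.
actual_concurrency() {
  local target=$1 agents=$2
  echo $(( ((target + agents - 1) / agents) * agents ))
}

actual_concurrency 1 3  # 3
actual_concurrency 4 3  # 6
```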

Alternatively, you can express the concurrency more directly, in terms of how many instances of the pipeline each Agent should run, using the `agent_concurrency_target` config. For example:

```yaml
input:
  generate:
    count: 1000
    interval: 1s
    mapping: |
      root = if random_int() % 2 == 0 {
        {
          "type": "foo",
          "foo": "xyz"
        }
      } else {
        {
          "type": "bar",
          "bar": "abc"
        }
      }
output:
  kafka_franz_warpstream:
    topic: bento_test
warpstream:
  agent_concurrency_target: 1
```

<table><thead><tr><th width="278">Agent Concurrency Target</th><th># of Agents</th><th>Actual Concurrency</th></tr></thead><tbody><tr><td>1</td><td>3</td><td>3</td></tr><tr><td>2</td><td>3</td><td>6</td></tr><tr><td>3</td><td>3</td><td>9</td></tr></tbody></table>

### Error handling

WarpStream runs Bento pipelines in the strict mode documented [here](https://warpstreamlabs.github.io/bento/docs/components/processors/about#error-handling-1) in the Bento docs. In practice, we add an `error_handling.strategy: reject` block to every running pipeline. This means that Bento will reject all batches containing messages with errors, propagating a `nack` to the input layer (instead of attempting to send message batches that contain errored messages to the configured sink).
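As an illustration, consider the hypothetical pipeline below (topic names invented): if the `parse_json` mapping fails for any message in a batch, the whole batch is rejected and nacked back to the Kafka input rather than partially written to the output.

```yaml
input:
  kafka_franz_warpstream:
    topics: ["raw_events"]
pipeline:
  processors:
    # Errors on messages whose content is not valid JSON.
    - mapping: root = content().parse_json()
output:
  kafka_franz_warpstream:
    topic: parsed_events
```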

### Pipeline Groups

Pipeline groups are similar to [Agent Groups](https://docs.warpstream.com/warpstream/kafka/advanced-agent-deployment-options/agent-groups), but for managed data pipelines. They can be used to isolate different managed pipelines onto different groups of Agents. For example, if you have a mixture of small and very large managed pipelines, you can isolate the larger pipelines onto a dedicated group of Agents so that they can't interfere with the smaller pipelines.

Agents with managed pipelines enabled (running the `pipelines` role) will run any pipeline that is assigned to the same group as them. If no pipeline group is specified, the Agents will run any pipeline that also doesn't have an assigned group.

| Agent Pipeline Group Name | Pipeline Group Name | Will Run? |
| ------------------------- | ------------------- | --------- |
| `N/A`                     | `N/A`               | Yes       |
| `small`                   | `N/A`               | No        |
| `large`                   | `N/A`               | No        |
| `N/A`                     | `small`             | No        |
| `small`                   | `small`             | Yes       |
| `large`                   | `small`             | No        |
| `N/A`                     | `large`             | No        |
| `small`                   | `large`             | No        |
| `large`                   | `large`             | Yes       |

#### Configuring the Agents

The pipeline group name can be any valid string. Configuring the pipeline group name for an Agent deployment is done by setting the `-managedPipelinesGroupName` flag or the `WARPSTREAM_MANAGED_PIPELINES_GROUP_NAME` environment variable.
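For example, to pin an Agent deployment to a hypothetical group named `large` (placeholder cluster ID, API key, and bucket values):

```bash
warpstream agent \
  -virtualClusterID XXXXXXXX \
  -apiKey XXXXXXXX \
  -bucketURL s3://my-warpstream-bucket \
  -enableManagedPipelines \
  -managedPipelinesGroupName large
```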

#### Configuring the Pipeline

The pipeline itself is assigned to a group by setting the `pipeline_group` field in its top-level `warpstream` block:

```yaml
input:
  generate:
    count: 1000
    interval: 1s
    mapping: |
      root = if random_int() % 2 == 0 {
        {
          "type": "foo",
          "foo": "xyz"
        }
      } else {
        {
          "type": "bar",
          "bar": "abc"
        }
      }
output:
  kafka_franz_warpstream:
    topic: bento_test
warpstream:
  agent_concurrency_target: 1
  pipeline_group: $PIPELINE_GROUP_NAME
```

## Monitoring / Observability

Managed data pipelines expose [all native Bento metrics](https://warpstreamlabs.github.io/bento/docs/components/metrics/about/) with the `warpstream_bento_` prefix.

In addition, we provide a `warpstream_active_pipeline_instances` metric which tracks the number of instances currently running on each agent for a given managed pipeline.

Finally, most WarpStream managed data pipelines run as simple Kafka consumers (if the primary input is a `kafka_franz_warpstream` block), so monitoring the health and progress of the pipeline is best handled the same way as for any other Kafka consumer: by monitoring [consumer group lag](https://docs.warpstream.com/warpstream/agent-setup/monitor-the-warpstream-agents/monitoring-consumer-groups).

## Secrets

Secrets are managed just as they are in Bento: using environment variables. This ensures that secrets never leave your environment and that the WarpStream control plane has no access to them.

Provide the secret as an environment variable to your WarpStream Agents, and they'll be available for use in the managed data pipelines configuration files. For example:

```yaml
input:
    kafka_franz_warpstream:
        consumer_group: "${SOME_SECRET}"
        topics:
            - test
    processors:
        - mapping: root = content().capitalize()
output:
    stdout:
        codec: lines
warpstream:
    cluster_concurrency_target: 1
```

The configuration above will look for an environment variable called `SOME_SECRET` and use that as the consumer group name for the `kafka_franz_warpstream` input block.
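For example, the Agents could be launched with the secret already in their environment (a sketch with placeholder credential values):

```bash
export SOME_SECRET="my-consumer-group"
warpstream agent -virtualClusterID XXXXXXXX -apiKey XXXXXXXX \
  -bucketURL s3://my-warpstream-bucket -enableManagedPipelines
```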

{% hint style="info" %}
For more about propagating secrets as environment variables to Bento, check out the [Bento secrets documentation](https://warpstreamlabs.github.io/bento/docs/configuration/secrets).
{% endhint %}

## Rate limiting

To prevent pipelines from overloading the Agents, the total throughput processed by an Agent across all of its pipelines is rate-limited.

By default, the rate limit is:

* 5MB/vCPU/s if the Agent has both the `pipelines` role and any other role.
* 50MB/vCPU/s if the Agent has only the `pipelines` role.

See the [Agent Roles](https://docs.warpstream.com/warpstream/kafka/advanced-agent-deployment-options/splitting-agent-roles) documentation for more details.

We highly recommend having dedicated Agents with only the `pipelines` role.

You can override the default rate limit using the flag `-overridePipelinesSharedRateLimitPerVCPU` or the environment variable `WARPSTREAM_OVERRIDE_PIPELINES_SHARED_RATE_LIMIT_PER_VCPU`. This sets the rate limit in bytes per vCPU per second.

**Note:** Each Agent enforces its own rate limit independently. The total cluster throughput scales with the number of Agents and their vCPU counts.

**Example:** If you have 3 Agents, each with 4 vCPUs and only the `pipelines` role:

* Each Agent has a rate limit of 50MB/s × 4 vCPUs = 200MB/s.
* Total cluster throughput capacity = 3 Agents × 200MB/s = 600MB/s.
* All pipelines running on the same Agent share that Agent's 200MB/s budget.
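For example, to raise the default limit to 100MB/vCPU/s (the flag value is in bytes per vCPU per second, so 100 × 1024 × 1024 = 104857600; cluster ID, API key, and bucket below are placeholders):

```bash
warpstream agent \
  -virtualClusterID XXXXXXXX \
  -apiKey XXXXXXXX \
  -bucketURL s3://my-warpstream-bucket \
  -enableManagedPipelines \
  -overridePipelinesSharedRateLimitPerVCPU 104857600
```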

