Benthos (beta)

This page explains how to use the WarpStream Benthos integration.


The WarpStream Benthos integration is currently in beta. We welcome your feedback in our Slack channel.

Benthos is a stream processing tool that makes "fancy stream processing operationally mundane". It offers much of the functionality of Kafka connect, as well as additional stream processing functionality like single message transforms, aggregations, multiplexing, enrichments, and more. It also has native support for WASM (WebAssembly) for more advanced processing.

Like WarpStream, Benthos is written in Go, so we embedded it natively into the WarpStream Agents to make it possible to perform basic stream processing tasks with a single, stateless binary.

Benthos is (currently) only supported in WarpStream's BYOC product. Stay tuned for Benthos support in our serverless product.

Getting Started

Getting started with WarpStream + Benthos is easy. The WarpStream Agents embed Benthos natively as a library, and enabling the Benthos integration is as simple as providing the WarpStream Agents with a Benthos configuration file. For example:

warpstream agent -virtualClusterID XXXXXXXX -apiKey XXXXXXXX -bucketURL s3://my-warpstream-bucket -benthosConfigPath $PATH_TO_BENTHOS_CONFIG

However, before deploying your benthos configuration to production, let's get started with a small stream processing task using an ephemeral WarpStream playground cluster. First, we need to write the Benthos configuration. The easiest way to learn how to do that is with the official Benthos documentation. For now though, we'll use the sample configuration below:

# benthos.yaml
    count: 1000
    interval: 1s
    mapping: |
      root = if random_int() % 2 == 0 {
          "type": "foo",
          "foo": "xyz"
      } else {
          "type": "bar",
          "bar": "abc"
    pattern: fan_out
      - type: file
          path: /tmp/data.txt
          codec: lines
      - type: kafka_franz
            - localhost:9092
          topic: benthos_test

Once you've written the Benthos configuration above to a benthos.yaml file locally, you can run it with the WarpStream Agent in playground mode like so:

warpstream playground -benthosConfigPath benthos.yaml

The output should look something like this:

WARNING, RUNNING IN PLAYGROUND MODE. All data and state is ephemeral. Server will shutdown automatically after: 4h0m0s

Signing up for temporary account...
Done signing up for temporary account
Starting local agent...

started agent, HTTP server on port: 8080 and kafka server on port: 9092

open the developer console at: https:/

Run cat /tmp/data.txt and you should see all the messages Benthos generated were written to a local file, as expected:

In addition, if you copy and paste the playground URL into your browser, you should see that the messages were written to the benthos_test topic in the WarpStream cluster.

That's it! If you have any questions or requests, hop in our community Slack!


Benthos is already a mature and powerful stream processing tool. However, we plan to make some big quality of life improvements for the WarpStream <> Benthos integration in the near future, like:

  1. Automatic integration with WarpStream's zone aware routing system.

  2. Resource controls.

  3. Automatic handling of SASL credentials for authenticated clusters.

  4. Streams mode.

  5. Change benthos config without restarting Agents.

  6. Start / stop Benthos without needing to start/stop the Agents.

  7. Expose the Benthos metrics.

  8. Allow running Benthos as a dedicated role.

In addition, we're official sponsors of the Benthos project and expect to contribute a variety of new source destinations in the near future.

Last updated


Apache, Apache Kafka, Kafka, and associated open source project names are trademarks of the Apache Software Foundation. Kinesis is a trademark of Amazon Web Services.