This page describes how to integrate WarpStream with Decodable, ingest data into Decodable from WarpStream, and query the data in Decodable.


Prerequisites

  1. WarpStream account - register with WarpStream to get access.

  2. Decodable account - register with Decodable to get access.

  3. A WarpStream cluster that is up and running.

Step 1: Create a topic in your WarpStream cluster

Obtain the Bootstrap Broker from the WarpStream console by navigating to your cluster, then clicking the Connect tab. If you don't have SASL credentials yet, you can also create a set of credentials from the console.

Store these values as environment variables for easy reference:
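The variable names below are the ones the commands on this page reference. The values are placeholders (an assumption for illustration) to be replaced with the Bootstrap Broker and SASL credentials shown in the Connect tab. A minimal sketch:

```shell
# Placeholder values: replace each one with the value from the WarpStream console.
export WARPSTREAM_BOOTSTRAP_HOST="YOUR_BOOTSTRAP_BROKER"
export WARPSTREAM_SASL_USERNAME="YOUR_SASL_USERNAME"
export WARPSTREAM_SASL_PASSWORD="YOUR_SASL_PASSWORD"
```

Exporting the variables makes them visible to the `warpstream` commands run later from the same shell session.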


Then, create a topic using the WarpStream CLI:

warpstream kcmd -bootstrap-host $WARPSTREAM_BOOTSTRAP_HOST -tls -username $WARPSTREAM_SASL_USERNAME -password $WARPSTREAM_SASL_PASSWORD -type create-topic -topic decodable_demo

You should see the following output in your Terminal:

Created topic decodable_demo.
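Every kcmd invocation on this page repeats the same connection flags. As an optional convenience, they could be wrapped in a small shell function. The name ws_kcmd is hypothetical and not part of the WarpStream CLI; the flags are the ones used in the command above.

```shell
# ws_kcmd is a hypothetical helper, not something the WarpStream CLI provides.
# It forwards its arguments to `warpstream kcmd` with the connection flags
# from this page already filled in.
ws_kcmd() {
  # Stop with a clear message if a required variable is unset or empty.
  : "${WARPSTREAM_BOOTSTRAP_HOST:?set WARPSTREAM_BOOTSTRAP_HOST first}"
  : "${WARPSTREAM_SASL_USERNAME:?set WARPSTREAM_SASL_USERNAME first}"
  : "${WARPSTREAM_SASL_PASSWORD:?set WARPSTREAM_SASL_PASSWORD first}"
  warpstream kcmd \
    -bootstrap-host "$WARPSTREAM_BOOTSTRAP_HOST" \
    -tls \
    -username "$WARPSTREAM_SASL_USERNAME" \
    -password "$WARPSTREAM_SASL_PASSWORD" \
    "$@"
}
```

With the function defined, the topic creation above would shorten to `ws_kcmd -type create-topic -topic decodable_demo`.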

Step 2: Produce some records

Using the WarpStream CLI, produce several messages to your topic:

warpstream kcmd -bootstrap-host $WARPSTREAM_BOOTSTRAP_HOST -tls -username $WARPSTREAM_SASL_USERNAME -password $WARPSTREAM_SASL_PASSWORD -type produce -topic decodable_demo --records '{"action": "click", "user_id": "user_0", "page_id": "home"},,{"action": "hover", "user_id": "user_0", "page_id": "home"},,{"action": "scroll", "user_id": "user_0", "page_id": "home"},,{"action": "click", "user_id": "user_1", "page_id": "home"},,{"action": "click", "user_id": "user_1", "page_id": "home"},,{"action": "click", "user_id": "user_2", "page_id": "home"}'

Note that the WarpStream CLI uses double commas (,,) as a delimiter between JSON records.
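If the records live in a file with one JSON object per line, the double-comma form can be built rather than typed by hand. The sketch below assumes a hypothetical file named records.jsonl and assumes that no record contains the sequence ,, itself.

```shell
# records.jsonl is a hypothetical input file: one JSON record per line.
cat > records.jsonl <<'EOF'
{"action": "click", "user_id": "user_0", "page_id": "home"}
{"action": "hover", "user_id": "user_0", "page_id": "home"}
{"action": "scroll", "user_id": "user_0", "page_id": "home"}
EOF

# Join the lines with ",," so the result can be passed to --records.
RECORDS=$(awk 'NR > 1 { printf ",," } { printf "%s", $0 }' records.jsonl)

# Show the joined string; pass it as: --records "$RECORDS"
printf '%s\n' "$RECORDS"
```

Quoting "$RECORDS" when passing it to the CLI keeps the shell from splitting the JSON on spaces.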

Step 3: Connect Decodable to WarpStream

In the Decodable Web Console, under the Start Building heading, click "Connect to a source". Select "Apache Kafka", and configure Decodable to read from your WarpStream topic.
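The CLI flags used in Steps 1 and 2 (TLS plus a username and password) suggest the values the Kafka source form will ask for. The sketch below writes them to a hypothetical file, connection_settings.txt, for reference while filling in the form. The labels are descriptive rather than the exact Decodable field names, and the SASL mechanism shown is an assumption; confirm both against the Connect tab of your cluster and the form itself.

```shell
# Fall back to placeholders so the summary can be produced before the variables are set.
BOOTSTRAP="${WARPSTREAM_BOOTSTRAP_HOST:-YOUR_BOOTSTRAP_BROKER}"
SASL_USER="${WARPSTREAM_SASL_USERNAME:-YOUR_SASL_USERNAME}"

# Labels on the left are descriptive only (assumption), not exact form field names.
cat > connection_settings.txt <<EOF
Bootstrap server:  $BOOTSTRAP
Topic:             decodable_demo
Security protocol: SASL_SSL
SASL mechanism:    PLAIN (assumption; confirm in the WarpStream console)
SASL username:     $SASL_USER
SASL password:     value of WARPSTREAM_SASL_PASSWORD (not written to this file)
Value format:      JSON
EOF

cat connection_settings.txt
```

If the form expects host:port, append the port shown in the Connect tab to the bootstrap value.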

Click Next.

Create a new stream named decodable_demo.

Specify the schema of the data. Each record produced in Step 2 has three string fields: action, user_id, and page_id.

Finally, finish creating the Connection.

Step 4: Start the Connection and verify that Decodable is ingesting data from WarpStream

In the upper right corner of the Connection overview screen, click "Start", then start the connection with the default settings.

After a few moments, the Decodable Connection should transition from Starting to Running, and the Output Metrics will show that Decodable is consuming from WarpStream.

Navigate to Streams > decodable_demo to view a sample of your data in Decodable.

Step 5: Create a Pipeline

In the Decodable Web Console, navigate to Pipelines > Create your first Pipeline and select decodable_demo from the list of Streams.

Enter a query that creates a new Stream and inserts only the records whose user_id is user_0.
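A filter of this kind can be sketched as a single INSERT INTO ... SELECT statement. The snippet below saves one to a file so it can be pasted into the Pipeline editor. The output Stream name decodable_demo_user_0 and the file name pipeline.sql are hypothetical, and the column list assumes the schema holds the three fields seen in the produced records.

```shell
# decodable_demo_user_0 is a hypothetical name for the output Stream.
cat > pipeline.sql <<'SQL'
INSERT INTO decodable_demo_user_0
SELECT action, user_id, page_id
FROM decodable_demo
WHERE user_id = 'user_0'
SQL

# Print the query so it can be copied into the Pipeline editor.
cat pipeline.sql
```

Records with any other user_id, including an empty one, do not match the WHERE clause and are left out of the new Stream.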

Click Next.

Click Create Stream, then click Next.

Name your Pipeline and click Create Pipeline.

Produce some more records to WarpStream from your Terminal (you can add more records if you want; just remember to use the ,, delimiter in the WarpStream CLI):

warpstream kcmd -bootstrap-host $WARPSTREAM_BOOTSTRAP_HOST -tls -username $WARPSTREAM_SASL_USERNAME -password $WARPSTREAM_SASL_PASSWORD -type produce -topic decodable_demo --records '{"action": "click", "user_id": "user_0", "page_id": "home"},,{"action": "hover", "user_id": "user_1", "page_id": "home"},,{"action": "scroll", "user_id": "user_1", "page_id": "home"},,{"action": "click", "user_id": "", "page_id": "home"},,{"action": "click", "user_id": "", "page_id": "home"},,{"action": "click", "user_id": "user_0", "page_id": "home"}'

Then start your Pipeline, configuring it to read from the earliest offset.

After a few moments, you should be able to observe the Input and Output Metrics, confirming that the Pipeline is working as expected!
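To know what "as expected" means in numbers, the two batches produced on this page can be counted locally. This sketch assumes the topic contains only those two batches, that the Pipeline reads from the earliest offset, and that it keeps only records with a user_id of user_0. The file name all_records.jsonl is hypothetical.

```shell
# The two --records arguments used in Step 2 and Step 5, copied verbatim.
BATCH_1='{"action": "click", "user_id": "user_0", "page_id": "home"},,{"action": "hover", "user_id": "user_0", "page_id": "home"},,{"action": "scroll", "user_id": "user_0", "page_id": "home"},,{"action": "click", "user_id": "user_1", "page_id": "home"},,{"action": "click", "user_id": "user_1", "page_id": "home"},,{"action": "click", "user_id": "user_2", "page_id": "home"}'
BATCH_2='{"action": "click", "user_id": "user_0", "page_id": "home"},,{"action": "hover", "user_id": "user_1", "page_id": "home"},,{"action": "scroll", "user_id": "user_1", "page_id": "home"},,{"action": "click", "user_id": "", "page_id": "home"},,{"action": "click", "user_id": "", "page_id": "home"},,{"action": "click", "user_id": "user_0", "page_id": "home"}'

# Split each batch on the ",," delimiter so every record sits on its own line.
printf '%s\n%s\n' "$BATCH_1" "$BATCH_2" | awk '{ gsub(/,,/, "\n"); print }' > all_records.jsonl

# Count all records (Pipeline input) and those for user_0 (Pipeline output).
TOTAL=$(wc -l < all_records.jsonl | tr -d ' ')
MATCHED=$(grep -c '"user_id": "user_0"' all_records.jsonl)

echo "input records: $TOTAL"
echo "expected output records: $MATCHED"
```

Under those assumptions the Input Metrics should approach 12 records and the Output Metrics 5. Any extra records you produced will change both numbers.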

Next Steps

Congratulations! You've set up a stream processing pipeline between WarpStream and Decodable, and created a simple Pipeline that filters data out of a firehose of events.

Next, check out the WarpStream docs for configuring the WarpStream Agent, or review the Decodable docs to learn more about what is possible with WarpStream and Decodable!


Apache, Apache Kafka, Kafka, and associated open source project names are trademarks of the Apache Software Foundation. Kinesis is a trademark of Amazon Web Services.