Kestra

This page describes how to integrate WarpStream with Kestra. Kestra is an event-driven data orchestration platform with a UI and command-line interface.


A video walkthrough of this integration is also available.

Prerequisites

  1. A WarpStream account and a running Kestra instance (registration and installation links are at the end of this page).
  2. A Serverless WarpStream cluster up and running - explanation is below.

Step 1: Create a cluster and topic in WarpStream

You will need a running cluster that contains a topic with data. For this example, we will use the WarpStream tutorial cluster and topic that you get the first time you create a WarpStream cluster.

Store these values (Bootstrap Broker, SASL username, and SASL password) for easy reference; they will be needed in Kestra. If you are going to produce records to your topic from the command line, export them as environment variables in a terminal window:

export BOOTSTRAP_HOST=<YOUR_BOOTSTRAP_BROKER> \
SASL_USERNAME=<YOUR_SASL_USERNAME> \
SASL_PASSWORD=<YOUR_SASL_PASSWORD>;
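Before running any CLI commands, you can sanity-check that all three variables are actually set. A minimal bash sketch (not part of the WarpStream CLI, just a local convenience):

```shell
# Warn about any unset connection variable before invoking the WarpStream CLI.
# Uses bash indirect expansion (${!v}) to look up each variable by name.
for v in BOOTSTRAP_HOST SASL_USERNAME SASL_PASSWORD; do
  if [ -z "${!v}" ]; then
    echo "missing $v" >&2
  fi
done
```

If any variable is reported missing, re-run the export command above with the values from the WarpStream console.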

Then, create a topic in the WarpStream console if you don't already have one. Ensure data is being produced to a topic for Kestra to act on.

Step 2: Produce some records

If your topic doesn't contain any data yet, you can use the WarpStream CLI to produce some messages:

warpstream kcmd -bootstrap-host $BOOTSTRAP_HOST -tls -username $SASL_USERNAME -password $SASL_PASSWORD -type produce -topic kestra_demo --records '{"action": "click", "user_id": "user_0", "page_id": "home"},,{"action": "hover", "user_id": "user_0", "page_id": "home"},,{"action": "scroll", "user_id": "user_0", "page_id": "home"},,{"action": "click", "user_id": "user_1", "page_id": "home"},,{"action": "click", "user_id": "user_1", "page_id": "home"},,{"action": "click", "user_id": "user_2", "page_id": "home"}'

Note that the WarpStream CLI uses double commas (,,) as a delimiter between JSON records.
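If you keep sample records one per line in a file, you can join them with that double-comma delimiter before passing them to --records. A small sketch, using a hypothetical records.jsonl file:

```shell
# Build a ",,"-delimited records string from a file with one JSON record
# per line (records.jsonl is a hypothetical example file).
printf '%s\n' '{"action": "click"}' '{"action": "hover"}' > records.jsonl

# awk prints each line without its newline, prefixing ",," from line 2 on.
RECORDS=$(awk 'NR > 1 {printf ",,"} {printf "%s", $0}' records.jsonl)
echo "$RECORDS"
# → {"action": "click"},,{"action": "hover"}
```

The resulting `$RECORDS` string can then be passed directly to the `--records` flag of the produce command above.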

Step 3: Configure Kestra

In the Kestra interface, you will start under Flows and select Create.

This will present you with the Kestra editor, which contains our example YAML.

This example uses the Kestra "Realtime Trigger" feature to execute an action whenever a record arrives in the stream. The id: and type: fields under triggers: are where the trigger is named and specified. Fill in all the fields denoted with <YOUR_...>, and the resulting flow will read from your WarpStream topic; for every record that arrives, it will run the action under tasks:, which in this case writes the message value (trigger.value) to the log.

id: myflow
namespace: company.myteam

tasks:
- id: LOG
  type: io.kestra.plugin.core.log.Log
  message: "{{ trigger.value }}"

triggers:
- id: realtime_trigger
  type: io.kestra.plugin.kafka.RealtimeTrigger
  topic: <YOUR_TOPIC>
  properties:
    bootstrap.servers: <YOUR_BOOTSTRAP_BROKER>
    security.protocol: SASL_SSL
    sasl.mechanism: PLAIN
    sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="<YOUR_SASL_USERNAME>" password="<YOUR_SASL_PASSWORD>";
  serdeProperties:
    schema.registry.url: http://localhost:8085
    keyDeserializer: STRING
    valueDeserializer: STRING
  groupId: <YOUR_TOPIC>
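Hard-coding SASL credentials in the flow YAML is fine for testing, but Kestra also supports a secret() function for looking up stored secrets at runtime. A hedged sketch of the same properties: block using it, assuming you have created Kestra secrets named SASL_USERNAME and SASL_PASSWORD:

```yaml
# Sketch: reference Kestra secrets instead of inlining credentials.
# Assumes secrets named SASL_USERNAME and SASL_PASSWORD exist in your
# Kestra installation; adjust the names to match your setup.
properties:
  bootstrap.servers: <YOUR_BOOTSTRAP_BROKER>
  security.protocol: SASL_SSL
  sasl.mechanism: PLAIN
  sasl.jaas.config: |
    org.apache.kafka.common.security.plain.PlainLoginModule required username="{{ secret('SASL_USERNAME') }}" password="{{ secret('SASL_PASSWORD') }}";
```

This keeps credentials out of the flow definition, which matters if flows are stored in version control.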

Step 4: Conditional logic

The Kestra trigger can be enhanced with conditional logic. For example, if your WarpStream topic carries a series of IoT status messages, you could look for records whose value is "error" and log them appropriately. This is illustrated in the following example, which would replace the tasks: section from the previous example:

tasks:
- id: if_condition
  type: io.kestra.plugin.core.flow.If
  condition: "{{ trigger.value == 'error' }}"
  then:
    - id: log
      type: io.kestra.plugin.core.log.Log
      message: "Error message: {{ execution.id }}"
  else:
    - id: log_else
      type: io.kestra.plugin.core.log.Log
      message: "Nothing to worry about."
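Putting the two pieces together, the complete flow with the conditional tasks: section swapped in would look like this (same <YOUR_...> placeholders as before; the schema registry setting is omitted since both deserializers are STRING):

```yaml
id: myflow
namespace: company.myteam

tasks:
- id: if_condition
  type: io.kestra.plugin.core.flow.If
  condition: "{{ trigger.value == 'error' }}"
  then:
    - id: log
      type: io.kestra.plugin.core.log.Log
      message: "Error message: {{ execution.id }}"
  else:
    - id: log_else
      type: io.kestra.plugin.core.log.Log
      message: "Nothing to worry about."

triggers:
- id: realtime_trigger
  type: io.kestra.plugin.kafka.RealtimeTrigger
  topic: <YOUR_TOPIC>
  properties:
    bootstrap.servers: <YOUR_BOOTSTRAP_BROKER>
    security.protocol: SASL_SSL
    sasl.mechanism: PLAIN
    sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="<YOUR_SASL_USERNAME>" password="<YOUR_SASL_PASSWORD>";
  serdeProperties:
    keyDeserializer: STRING
    valueDeserializer: STRING
  groupId: <YOUR_TOPIC>
```

You can test the branch by producing a record whose value is exactly "error" with the same warpstream kcmd produce command shown in Step 2.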

Next Steps

Congratulations! You've set up a stream processing pipeline between WarpStream and Kestra and triggered an action based on both activity and conditional logic processing.

WarpStream account - get access to WarpStream by registering here.

Install and run Kestra - instructions are here.

Obtain the Bootstrap Broker from the WarpStream console by navigating to your cluster and clicking the Connect tab. If you don't have SASL credentials yet, you can also create a set of credentials from the console.

Next, check out the WarpStream docs for configuring the WarpStream Agent, or review the Kestra docs to learn more about what is possible with WarpStream and Kestra!