Materialize

This page describes how to set up a connection between WarpStream and Materialize, ingest data into Materialize, and create a materialized view of this data.

Prerequisites

  1. WarpStream account - get access to WarpStream by registering here.
  2. Materialize account - get access to Materialize by registering here.
  3. WarpStream cluster that is up and running.
  4. Bootstrap Broker - obtain it from the WarpStream console by navigating to your cluster and then clicking the Connect tab. If you don't have SASL credentials yet, you can also create a set of credentials from the console.

Step 1: Create a topic in your WarpStream cluster

Store these values as environment variables for easy reference:

export BOOTSTRAP_HOST=<YOUR_BOOTSTRAP_BROKER> \
SASL_USERNAME=<YOUR_SASL_USERNAME> \
SASL_PASSWORD=<YOUR_SASL_PASSWORD>;

Then, create a topic using the WarpStream CLI:

warpstream kcmd -bootstrap-host $BOOTSTRAP_HOST -tls -username $SASL_USERNAME -password $SASL_PASSWORD -type create-topic -topic materialize_click_streams

You should see the following output in your terminal:

Created topic materialize_click_streams.

Step 2: Produce some records

Using the WarpStream CLI, produce several messages to your topic:

warpstream kcmd -bootstrap-host $BOOTSTRAP_HOST -tls -username $SASL_USERNAME -password $SASL_PASSWORD -type produce -topic materialize_click_streams --records '{"action": "click", "user_id": "user_0", "page_id": "home"},,{"action": "hover", "user_id": "user_0", "page_id": "home"},,{"action": "scroll", "user_id": "user_0", "page_id": "home"}'

Note that the WarpStream CLI uses double commas (,,) as a delimiter between JSON records.

Step 3: Connect Materialize with WarpStream

First, provide your WarpStream credentials as SECRET objects in the Materialize console:

CREATE SECRET warpstream_username AS 'ccun_XXXXXXXXXX';
CREATE SECRET warpstream_password AS 'ccp_XXXXXXXXXX';

Then, establish a CONNECTION object (note: replace $BOOTSTRAP_HOST with its value from Step 1):

CREATE CONNECTION warpstream_kafka TO KAFKA (
    BROKER '$BOOTSTRAP_HOST',
    SASL MECHANISMS = 'PLAIN',
    SASL USERNAME = SECRET warpstream_username,
    SASL PASSWORD = SECRET warpstream_password
);
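
Optionally, before creating a source, you can check that the connection works. A minimal sketch using Materialize's VALIDATE CONNECTION command, which attempts to reach the broker and returns an error if authentication or networking is misconfigured:

VALIDATE CONNECTION warpstream_kafka;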

Finally, create a SOURCE object to begin consuming messages from your WarpStream topic:

CREATE SOURCE warpstream_click_stream_source
  FROM KAFKA CONNECTION warpstream_kafka (TOPIC 'materialize_click_streams')
  FORMAT JSON;
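
To confirm that the source is healthy, one option is to query Materialize's introspection catalog. This sketch assumes the mz_internal.mz_source_statuses relation, which reports the current status and last error (if any) for each source:

SELECT name, status, error
FROM mz_internal.mz_source_statuses
WHERE name = 'warpstream_click_stream_source';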

Step 4: Verify that Materialize is ingesting data from WarpStream

In the Materialize Console, run:

SELECT * FROM warpstream_click_stream_source;

You should see results containing the records that you produced in Step 2.
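
Because a FORMAT JSON source stores each record in a single jsonb column named data, you can also project individual fields with jsonb operators. For example, a sketch using the ->> operator, which extracts a field as text:

SELECT
    data->>'action'  AS action,
    data->>'user_id' AS user_id,
    data->>'page_id' AS page_id
FROM warpstream_click_stream_source;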

Step 5: Create a materialized view

First, create the materialized view:

CREATE MATERIALIZED VIEW
    warpstream_click_stream_mv
AS
    SELECT (data::jsonb)['user_id'], COUNT(*)
    FROM warpstream_click_stream_source
    GROUP BY (data::jsonb)['user_id'];

Then, query the result:

SELECT * FROM warpstream_click_stream_mv;

You should see a result showing user_0 with a count of 3 records.
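
Instead of re-running the query to pick up changes, you can also stream updates to the view as they happen. A minimal sketch using Materialize's SUBSCRIBE command, run from an interactive session such as psql:

SUBSCRIBE TO warpstream_click_stream_mv;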

Step 6: Produce another record to your WarpStream topic

In your terminal, produce another record to your topic:

warpstream kcmd -bootstrap-host $BOOTSTRAP_HOST -tls -username $SASL_USERNAME -password $SASL_PASSWORD -type produce -topic materialize_click_streams --records '{"action": "click", "user_id": "user_1", "page_id": "home"}'

In the Materialize Console, query the materialized view again:

SELECT * FROM warpstream_click_stream_mv;

You should now see two rows: the row for user_0 with a count of 3, and a new row for user_1 with a count of 1.

Next Steps

After validating that your WarpStream cluster is connected to Materialize and that Materialize can consume and process data from WarpStream, you are ready to set up an actual data pipeline between WarpStream and Materialize.
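
For example, if you plan to serve lookups or dashboards from the materialized view, a common next step is to index it so that its results are maintained in memory for fast reads. A minimal sketch using CREATE DEFAULT INDEX:

CREATE DEFAULT INDEX ON warpstream_click_stream_mv;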

For more information on how to set up and configure Materialize, head over to the Materialize Docs page.