RisingWave

Instructions on how to use WarpStream with RisingWave.


RisingWave is a distributed SQL streaming database. In this "how-to," we'll show you how to connect it to WarpStream using the Apache Kafka API to create a stream processing application that performs streaming aggregations using RisingWave materialized views.

First, follow our instructions for running the WarpStream Agent locally. We recommend running the warpstream demo command, since it automatically creates a Kafka topic filled with fake clickstream data for us to consume.

Next, follow the RisingWave Quick Start instructions to install RisingWave locally and connect to it with psql.

Once you have started a RisingWave instance with the risingwave playground command and connected to it by running psql -h localhost -p 4566 -d dev -U root, create a new table called website_visits:

CREATE TABLE website_visits (
  timestamp timestamp,
  user_id varchar,
  page_id varchar,
  action varchar
);
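The psql flags used to connect above (-h localhost -p 4566 -d dev -U root) correspond to a standard libpq connection URI, which can be handy if you want to connect from another PostgreSQL-compatible client. The sketch below builds that URI from the same values. The helper name pg_uri is hypothetical, and it assumes no password, as in the psql command above.

```python
from urllib.parse import quote

def pg_uri(host: str, port: int, dbname: str, user: str) -> str:
    """Build a libpq-style connection URI (no password) from psql-style flags.

    Hypothetical helper for illustration only.
    """
    # quote(..., safe='') percent-encodes characters that are not URI-safe.
    return f"postgresql://{quote(user, safe='')}@{host}:{port}/{quote(dbname, safe='')}"

# Same values as: psql -h localhost -p 4566 -d dev -U root
uri = pg_uri(host="localhost", port=4566, dbname="dev", user="root")
print(uri)  # postgresql://root@localhost:4566/dev
```

psql accepts such a URI in place of the individual flags, for example psql postgresql://root@localhost:4566/dev.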

Next, create a RisingWave Kafka source that reads from the WarpStream Agent's Kafka endpoint on localhost:9092:

CREATE SOURCE IF NOT EXISTS website_visits_stream (
  timestamp timestamp,
  user_id varchar,
  page_id varchar,
  action varchar
)
WITH (
  connector='kafka',
  topic='demo-stream',
  properties.bootstrap.server='localhost:9092',
  scan.startup.mode='earliest'
) FORMAT PLAIN ENCODE JSON;
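With FORMAT PLAIN ENCODE JSON, each Kafka message value is parsed as a JSON object whose top-level fields are matched to the source's columns by name. The Python sketch below imitates that mapping for a single message, which can help you sanity-check a payload before pointing RisingWave at a topic. It is an approximation under stated assumptions: the sample message is made up (it is not the exact payload that warpstream demo produces), and treating missing fields as NULL (None) and ignoring unknown fields is modelled on typical JSON decoder behaviour rather than taken from RisingWave's code.

```python
import json
from datetime import datetime
from typing import Any, Dict, Optional

# Column name -> parser, mirroring the CREATE SOURCE schema above.
COLUMNS = {
    "timestamp": datetime.fromisoformat,  # timestamp
    "user_id": str,                       # varchar
    "page_id": str,                       # varchar
    "action": str,                        # varchar
}

def decode_row(message: bytes) -> Dict[str, Optional[Any]]:
    """Map one JSON message value onto the source columns by field name.

    Assumption: missing fields become None (SQL NULL); extra fields are ignored.
    """
    obj = json.loads(message)
    row: Dict[str, Optional[Any]] = {}
    for name, parse in COLUMNS.items():
        value = obj.get(name)
        row[name] = None if value is None else parse(value)
    return row

# Hypothetical message; the real demo payload may differ.
msg = (b'{"timestamp": "2023-07-26 19:03:08", "user_id": "user_42", '
       b'"page_id": "page_0", "action": "click", "extra": 1}')
print(decode_row(msg))
# {'timestamp': datetime.datetime(2023, 7, 26, 19, 3, 8), 'user_id': 'user_42', 'page_id': 'page_0', 'action': 'click'}
```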

Next, create a RisingWave materialized view that aggregates the stream per page:

CREATE MATERIALIZED VIEW visits_stream_mv AS
SELECT
  page_id,
  count(*) AS total_visits,
  count(DISTINCT user_id) AS unique_visitors,
  max(timestamp) AS last_visit_time
FROM website_visits_stream
GROUP BY page_id;
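The materialized view above is kept up to date incrementally: as new records arrive, the aggregates for the affected page_id are updated instead of the whole topic being rescanned. The Python sketch below illustrates that idea for this particular query only; it is a toy, in-memory analogue and not how RisingWave implements it internally. The class names VisitsView and PageStats are hypothetical. Per page it keeps a running count for count(*), a set of seen users backing count(DISTINCT user_id), and the latest timestamp for max(timestamp), with NULLs ignored by the last two as in SQL.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List, Optional, Set, Tuple

@dataclass
class PageStats:
    total_visits: int = 0                                # count(*)
    visitors: Set[str] = field(default_factory=set)      # backs count(DISTINCT user_id)
    last_visit_time: Optional[datetime] = None           # max(timestamp)

    @property
    def unique_visitors(self) -> int:
        return len(self.visitors)

class VisitsView:
    """Toy analogue of visits_stream_mv (GROUP BY page_id). Hypothetical."""

    def __init__(self) -> None:
        self.rows: Dict[str, PageStats] = {}

    def apply(self, ts: Optional[datetime], user_id: Optional[str], page_id: str) -> None:
        # Only the group touched by this record is updated; nothing is recomputed.
        stats = self.rows.setdefault(page_id, PageStats())
        stats.total_visits += 1
        if user_id is not None:  # count(DISTINCT ...) ignores NULLs
            stats.visitors.add(user_id)
        if ts is not None and (stats.last_visit_time is None or ts > stats.last_visit_time):
            stats.last_visit_time = ts  # max() ignores NULLs

    def snapshot(self) -> List[Tuple[str, int, int, Optional[datetime]]]:
        """Return (page_id, total_visits, unique_visitors, last_visit_time) rows."""
        return sorted(
            (page, s.total_visits, s.unique_visitors, s.last_visit_time)
            for page, s in self.rows.items()
        )

view = VisitsView()
view.apply(datetime(2023, 7, 26, 19, 2, 50), "user_1", "page_1")
view.apply(datetime(2023, 7, 26, 19, 2, 55), "user_2", "page_1")
view.apply(datetime(2023, 7, 26, 19, 2, 53), "user_1", "page_1")
print(view.snapshot())
# [('page_1', 3, 2, datetime.datetime(2023, 7, 26, 19, 2, 55))]
```

This is also why unique_visitors can be lower than total_visits for a page: the same user visiting it twice adds to the count but not to the distinct-user set.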

Finally, query the newly created materialized view:

SELECT * FROM visits_stream_mv;

You should see the view continue to update over time as you reissue the query:

dev=> select * FROM visits_stream_mv;
 page_id | total_visits | unique_visitors |   last_visit_time   
---------+--------------+-----------------+---------------------
 page_0  |            2 |               2 | 2023-07-26 19:03:08
 page_4  |            9 |               9 | 2023-07-26 19:03:00
 page_8  |            9 |               9 | 2023-07-26 19:02:57
 page_3  |           14 |              14 | 2023-07-26 19:03:09
 page_7  |            4 |               4 | 2023-07-26 19:02:52
 page_1  |            7 |               6 | 2023-07-26 19:02:55
 page_5  |            9 |               9 | 2023-07-26 19:03:01
 page_9  |           12 |              12 | 2023-07-26 19:02:48
 page_2  |            4 |               4 | 2023-07-26 19:02:58
 page_6  |            7 |               6 | 2023-07-26 19:03:03
(10 rows)

Congratulations, you just built a streaming materialized view directly over data flowing through WarpStream!

If you want to quickly recreate the RisingWave environment, you can copy and paste all of the following statements into psql:

CREATE TABLE website_visits (
  timestamp timestamp,
  user_id varchar,
  page_id varchar,
  action varchar
);

CREATE SOURCE IF NOT EXISTS website_visits_stream (
  timestamp timestamp,
  user_id varchar,
  page_id varchar,
  action varchar
)
WITH (
  connector='kafka',
  topic='demo-stream',
  properties.bootstrap.server='localhost:9092',
  scan.startup.mode='earliest'
) FORMAT PLAIN ENCODE JSON;

CREATE MATERIALIZED VIEW visits_stream_mv AS
SELECT
  page_id,
  count(*) AS total_visits,
  count(DISTINCT user_id) AS unique_visitors,
  max(timestamp) AS last_visit_time
FROM website_visits_stream
GROUP BY page_id;
