# RisingWave

A video walkthrough can be found below:

{% embed url="<https://youtu.be/g-u7jLQUUZU>" %}

[RisingWave](https://github.com/risingwavelabs/risingwave) is a distributed SQL streaming database. In this "how-to," we'll show you how to connect it to WarpStream using the Apache Kafka API to create a stream processing application that performs streaming aggregations using RisingWave materialized views.

First, follow our instructions for [running the WarpStream Agent locally](https://docs.warpstream.com/warpstream/getting-started/run-the-agent-locally). We recommend running the: `warpstream demo` command since it will automatically generate a Kafka topic with fake click stream data for us to consume.

Next, follow the RisingWave [Quick Start](https://www.risingwave.dev/docs/current/get-started) instructions to install RisingWave locally and connect to it with `psql`.

Once you have started a RisingWave instance with the `risingwave playground` command and then connected to it by running `psql -h localhost -p 4566 -d dev -U root` , create a new table called `website_visits`:

```sql
CREATE TABLE website_visits (
  timestamp timestamp,
  user_id varchar,
  page_id varchar,
  action varchar
);
```

Next, create a RisingWave Kafka source:

```sql
CREATE SOURCE IF NOT EXISTS website_visits_stream (
 timestamp timestamp,
 user_id varchar,
 page_id varchar,
 action varchar
 )
WITH (
 connector='kafka',
 topic='demo-topic',
 properties.bootstrap.server='localhost:9092',
 scan.startup.mode='earliest'
 ) FORMAT PLAIN ENCODE JSON;
```

Next, create a RisingWave materialized view:

```sql
CREATE MATERIALIZED VIEW visits_stream_mv AS 
SELECT page_id, 
count(*) AS total_visits, 
count(DISTINCT user_id) AS unique_visitors, 
max(timestamp) AS last_visit_time 
FROM website_visits_stream 
GROUP BY page_id;
```

Finally, query the newly created materialized view:

```sql
select * FROM visits_stream_mv;
```

You should see the view to continue updating over time as you reissue the query:

```sql
dev=> select * FROM visits_stream_mv;
 page_id | total_visits | unique_visitors |   last_visit_time   
---------+--------------+-----------------+---------------------
 page_0  |            2 |               2 | 2023-07-26 19:03:08
 page_4  |            9 |               9 | 2023-07-26 19:03:00
 page_8  |            9 |               9 | 2023-07-26 19:02:57
 page_3  |           14 |              14 | 2023-07-26 19:03:09
 page_7  |            4 |               4 | 2023-07-26 19:02:52
 page_1  |            7 |               6 | 2023-07-26 19:02:55
 page_5  |            9 |               9 | 2023-07-26 19:03:01
 page_9  |           12 |              12 | 2023-07-26 19:02:48
 page_2  |            4 |               4 | 2023-07-26 19:02:58
 page_6  |            7 |               6 | 2023-07-26 19:03:03
(10 rows)

```

Congratulations, you just built a streaming materialized view directly over data flowing through WarpStream!

If you want to quickly recreate the RisingWave environment, you can copy and paste this entire command into `psql`:

```sql
CREATE TABLE website_visits (
  timestamp timestamp,
  user_id varchar,
  page_id varchar,
  action varchar
);

CREATE SOURCE IF NOT EXISTS website_visits_stream (
 timestamp timestamp,
 user_id varchar,
 page_id varchar,
 action varchar
 )
WITH (
 connector='kafka',
 topic='demo-stream',
 properties.bootstrap.server='localhost:9092',
 scan.startup.mode='earliest'
 ) ROW FORMAT JSON;
 
CREATE MATERIALIZED VIEW visits_stream_mv AS 
SELECT page_id, 
count(*) AS total_visits, 
count(DISTINCT user_id) AS unique_visitors, 
max(timestamp) AS last_visit_time 
FROM website_visits_stream 
GROUP BY page_id;
```
