# RisingWave

A video walkthrough can be found below:

{% embed url="<https://youtu.be/g-u7jLQUUZU>" %}

[RisingWave](https://github.com/risingwavelabs/risingwave) is a distributed SQL streaming database. In this "how-to," we'll show you how to connect it to WarpStream using the Apache Kafka API to create a stream processing application that performs streaming aggregations using RisingWave materialized views.

First, follow our instructions for [running the WarpStream Agent locally](/warpstream/getting-started/run-the-agent-locally.md). We recommend running the: `warpstream demo` command since it will automatically generate a Kafka topic with fake click stream data for us to consume.

Next, follow the RisingWave [Quick Start](https://www.risingwave.dev/docs/current/get-started) instructions to install RisingWave locally and connect to it with `psql`.

Once you have started a RisingWave instance with the `risingwave playground` command and then connected to it by running `psql -h localhost -p 4566 -d dev -U root` , create a new table called `website_visits`:

```sql
CREATE TABLE website_visits (
  timestamp timestamp,
  user_id varchar,
  page_id varchar,
  action varchar
);
```

Next, create a RisingWave Kafka source:

```sql
CREATE SOURCE IF NOT EXISTS website_visits_stream (
 timestamp timestamp,
 user_id varchar,
 page_id varchar,
 action varchar
 )
WITH (
 connector='kafka',
 topic='demo-topic',
 properties.bootstrap.server='localhost:9092',
 scan.startup.mode='earliest'
 ) FORMAT PLAIN ENCODE JSON;
```

Next, create a RisingWave materialized view:

```sql
CREATE MATERIALIZED VIEW visits_stream_mv AS 
SELECT page_id, 
count(*) AS total_visits, 
count(DISTINCT user_id) AS unique_visitors, 
max(timestamp) AS last_visit_time 
FROM website_visits_stream 
GROUP BY page_id;
```

Finally, query the newly created materialized view:

```sql
select * FROM visits_stream_mv;
```

You should see the view to continue updating over time as you reissue the query:

```sql
dev=> select * FROM visits_stream_mv;
 page_id | total_visits | unique_visitors |   last_visit_time   
---------+--------------+-----------------+---------------------
 page_0  |            2 |               2 | 2023-07-26 19:03:08
 page_4  |            9 |               9 | 2023-07-26 19:03:00
 page_8  |            9 |               9 | 2023-07-26 19:02:57
 page_3  |           14 |              14 | 2023-07-26 19:03:09
 page_7  |            4 |               4 | 2023-07-26 19:02:52
 page_1  |            7 |               6 | 2023-07-26 19:02:55
 page_5  |            9 |               9 | 2023-07-26 19:03:01
 page_9  |           12 |              12 | 2023-07-26 19:02:48
 page_2  |            4 |               4 | 2023-07-26 19:02:58
 page_6  |            7 |               6 | 2023-07-26 19:03:03
(10 rows)

```

Congratulations, you just built a streaming materialized view directly over data flowing through WarpStream!

If you want to quickly recreate the RisingWave environment, you can copy and paste this entire command into `psql`:

```sql
CREATE TABLE website_visits (
  timestamp timestamp,
  user_id varchar,
  page_id varchar,
  action varchar
);

CREATE SOURCE IF NOT EXISTS website_visits_stream (
 timestamp timestamp,
 user_id varchar,
 page_id varchar,
 action varchar
 )
WITH (
 connector='kafka',
 topic='demo-stream',
 properties.bootstrap.server='localhost:9092',
 scan.startup.mode='earliest'
 ) ROW FORMAT JSON;
 
CREATE MATERIALIZED VIEW visits_stream_mv AS 
SELECT page_id, 
count(*) AS total_visits, 
count(DISTINCT user_id) AS unique_visitors, 
max(timestamp) AS last_visit_time 
FROM website_visits_stream 
GROUP BY page_id;
```


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://docs.warpstream.com/warpstream/reference/integrations/use-warpstream-with-risingwave.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
