RisingWave is a distributed SQL streaming database. In this "how-to," we'll show you how to connect it to WarpStream using the Apache Kafka API to create a stream processing application that performs streaming aggregations using RisingWave materialized views.
First, follow our instructions for running the WarpStream Agent locally. We recommend running the `warpstream demo` command, since it automatically creates a Kafka topic filled with fake click-stream data for us to consume.
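The demo command produces the click-stream data for you. As a rough illustration of what such data might look like, here is a hypothetical Python generator of JSON click events. The field names are taken from the column list of the RisingWave source created later in this guide. The page IDs, actions, user ID format, and timestamp format are assumptions for illustration, not the demo's actual output.

```python
import json
import random
from datetime import datetime, timedelta

# Hypothetical values; the real `warpstream demo` data may differ.
PAGES = ["home", "pricing", "docs", "blog"]
ACTIONS = ["view", "click", "scroll"]


def fake_click_events(n, seed=0, start=datetime(2024, 1, 1)):
    """Yield n JSON strings, one per click event, spaced one second apart."""
    rng = random.Random(seed)  # seeded so the output is reproducible
    for i in range(n):
        event = {
            # Assumed format: "YYYY-MM-DD HH:MM:SS"
            "timestamp": (start + timedelta(seconds=i)).isoformat(sep=" "),
            "user_id": f"user_{rng.randint(1, 50)}",
            "page_id": rng.choice(PAGES),
            "action": rng.choice(ACTIONS),
        }
        yield json.dumps(event)


if __name__ == "__main__":
    for message in fake_click_events(3):
        print(message)
```

To publish records like these yourself instead of using the demo, you would send each string as a message value to the topic with any Kafka client pointed at the local Agent.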
Next, follow the RisingWave Quick Start instructions to install RisingWave locally and connect to it with psql.
Once you have started a RisingWave instance with the `risingwave playground` command and connected to it by running `psql -h localhost -p 4566 -d dev -U root`, create a new source called `website_visits_stream` that reads from the WarpStream topic:
```sql
CREATE SOURCE IF NOT EXISTS website_visits_stream (
    timestamp timestamp,
    user_id varchar,
    page_id varchar,
    action varchar
)
WITH (
    connector = 'kafka',
    topic = 'demo-topic',                            -- topic populated by warpstream demo
    properties.bootstrap.server = 'localhost:9092',  -- the local WarpStream Agent
    scan.startup.mode = 'earliest'                   -- read the topic from the beginning
) FORMAT PLAIN ENCODE JSON;
```
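With `FORMAT PLAIN ENCODE JSON`, each Kafka message value is expected to be a JSON object whose keys match the source's column names. The hypothetical Python function below models that mapping for a single message, which can help when checking the shape of your data. It is a simplified sketch, not RisingWave's parser: it assumes ISO 8601 timestamp strings, treats missing keys as NULL (`None`), and drops keys that are not columns. Consult the RisingWave documentation for how it actually handles missing, extra, or malformed fields.

```python
import json
from datetime import datetime

# Column names and SQL types copied from the CREATE SOURCE statement.
COLUMNS = {
    "timestamp": "timestamp",
    "user_id": "varchar",
    "page_id": "varchar",
    "action": "varchar",
}


def decode_message(raw: bytes) -> dict:
    """Decode one Kafka message value into a row keyed by the source's columns.

    Missing keys become None (like SQL NULL); keys not in COLUMNS are dropped.
    """
    obj = json.loads(raw)
    row = {}
    for name, sql_type in COLUMNS.items():
        value = obj.get(name)
        if value is not None and sql_type == "timestamp":
            # Assumes an ISO 8601 string such as "2024-01-01 12:00:00".
            value = datetime.fromisoformat(value)
        row[name] = value
    return row


if __name__ == "__main__":
    sample = b'{"timestamp": "2024-01-01 12:00:00", "user_id": "u1", "page_id": "home", "action": "click"}'
    print(decode_message(sample))
```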
Next, create a RisingWave materialized view:
```sql
CREATE MATERIALIZED VIEW visits_stream_mv AS
SELECT
    page_id,
    count(*) AS total_visits,
    count(DISTINCT user_id) AS unique_visitors,
    max(timestamp) AS last_visit_time
FROM website_visits_stream
GROUP BY page_id;
```
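A materialized view over a stream is kept up to date as new events arrive rather than recomputed from scratch on each query. The following toy Python model shows what the view's aggregates mean when maintained one event at a time. It is a conceptual sketch only; the class and method names are hypothetical, and it says nothing about how RisingWave stores or distributes this state internally.

```python
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional


@dataclass
class PageStats:
    """Running aggregates for one page_id group."""
    total_visits: int = 0                       # count(*)
    users: set = field(default_factory=set)     # state behind count(DISTINCT user_id)
    last_visit_time: Optional[datetime] = None  # max(timestamp)


class VisitsView:
    """Toy in-memory model of visits_stream_mv, updated one event at a time."""

    def __init__(self) -> None:
        self._groups = defaultdict(PageStats)

    def apply(self, ts: Optional[datetime], user_id: Optional[str], page_id: Optional[str]) -> None:
        stats = self._groups[page_id]  # GROUP BY page_id (a NULL page_id forms its own group)
        stats.total_visits += 1        # count(*) counts every row
        if user_id is not None:        # count(DISTINCT ...) ignores NULLs
            stats.users.add(user_id)
        if ts is not None and (stats.last_visit_time is None or ts > stats.last_visit_time):
            stats.last_visit_time = ts  # max() ignores NULLs

    def rows(self):
        """Return the current (page_id, total_visits, unique_visitors, last_visit_time) rows."""
        return sorted(
            ((page, s.total_visits, len(s.users), s.last_visit_time) for page, s in self._groups.items()),
            key=lambda row: (row[0] is None, row[0] or ""),
        )


if __name__ == "__main__":
    view = VisitsView()
    view.apply(datetime(2024, 1, 1, 12, 0, 0), "u1", "home")
    view.apply(datetime(2024, 1, 1, 12, 0, 5), "u2", "home")
    print(view.rows())
    view.apply(datetime(2024, 1, 1, 12, 0, 9), "u1", "home")
    print(view.rows())  # total_visits grows to 3, unique_visitors stays at 2
```

In this model the per-page set of user IDs is the costly part: `count(DISTINCT user_id)` needs state proportional to the number of distinct users, whereas `count(*)` and `max(timestamp)` over an append-only stream need only one value per group.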
Finally, query the newly created materialized view:
```sql
SELECT * FROM visits_stream_mv;
```
You should see the view's results continue to update over time as you reissue the query: