is a distributed SQL streaming database. In this "how-to," we'll show you how to connect it to WarpStream using the Apache Kafka API to create a stream processing application that performs streaming aggregations using RisingWave materialized views.
First, follow our instructions for . We recommend running the: warpstream demo command since it will automatically generate a Kafka topic with fake click stream data for us to consume.
Once you have started a RisingWave instance with the risingwave playground command and then connected to it by running psql -h localhost -p 4566 -d dev -U root , create a new table called website_visits:
CREATE SOURCE IF NOT EXISTS website_visits_stream (
timestamp timestamp,
user_id varchar,
page_id varchar,
action varchar
)
WITH (
connector='kafka',
topic='demo-topic',
properties.bootstrap.server='localhost:9092',
scan.startup.mode='earliest'
) FORMAT PLAIN ENCODE JSON;
Next, create a RisingWave materialized view:
CREATE MATERIALIZED VIEW visits_stream_mv AS
SELECT page_id,
count(*) AS total_visits,
count(DISTINCT user_id) AS unique_visitors,
max(timestamp) AS last_visit_time
FROM website_visits_stream
GROUP BY page_id;
Finally, query the newly created materialized view:
select * FROM visits_stream_mv;
You should see the view to continue updating over time as you reissue the query: