Quix Streams
This page describes how to use the Quix Streams Python library to read and aggregate data from WarpStream and ingest the aggregations into a local DuckDB database for offline querying.
Quix Streams is a cloud-native library for processing data in Kafka using pure Python. It’s designed to give you the power of a distributed system in a lightweight library by combining Kafka's low-level scalability and resiliency features with an easy-to-use Python interface.
Prerequisites
A WarpStream account - get access to WarpStream by registering on the sign-up page.
A WarpStream cluster up and running.
Installations of Git and Python with the required Python modules:
Quix Streams
DuckDB
python-dotenv
We recommend setting up a Python virtual environment before you install the required Python modules.
You can install the dependencies with pip.
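Assuming a virtual environment is active and that these are the current package names on PyPI, the command looks like this:

```bash
# Assumed package names; check the repo for an exact dependency list
pip install quixstreams duckdb python-dotenv
```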
Step 1: Clone the demo repo
The Quix team has prepared a demo repo to help you get started with Quix Streams and WarpStream.
Open a terminal window and clone the following repo.
The repo contains the following Python files representing some basic data pipeline steps.
1_produce_demo_data.py
2_process_demo_data.py
3_sink_demo_data.py
4_query_demo_data.py
Step 2: Create an environment file
Create a .env file to store the required environment variables. You can use the .env_example file in the repo as a reference or run the following command in a bash shell:
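The variable names below are illustrative (only raw_data_topic is referenced later in this tutorial), so copy the real keys from .env_example; a sketch of such a command:

```bash
# Hypothetical keys and placeholder values; use .env_example for the real ones
cat > .env << 'EOF'
KAFKA_BROKER_ADDRESS="your-warpstream-bootstrap-url:9092"
KAFKA_USERNAME="your-warpstream-sasl-username"
KAFKA_PASSWORD="your-warpstream-sasl-password"
raw_data_topic="raw_page_views"
agg_data_topic="page_view_counts"
EOF
```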
Step 3: Produce some records
Use a Quix Streams producer application to continuously produce some demo data to a topic. When you run the application for the first time, Quix Streams ensures that the required topics are automatically created if they don’t exist already.
Open a terminal window inside the repo directory and run the following command:
python 1_produce_demo_data.py
You should see some output that resembles the following example:
Here’s an important excerpt from the code you just ran:
The previous code initializes a connection to a serverless WarpStream cluster using the credentials defined in your .env file. Once a connection has been established, it continuously generates some synthetic page view data and produces it to a Kafka topic (the name of this topic is defined in the "raw_data_topic" environment variable).
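To give you a feel for the overall shape, here is a minimal sketch of a producer along these lines. It is not the repo's exact code: the broker variable name and the event fields are assumptions, and a real serverless WarpStream cluster would also need SASL credentials passed to the Application.

```python
# A minimal sketch, not the repo's exact code; variable names other than
# raw_data_topic are assumptions.
import json
import os
import random
import time

from dotenv import load_dotenv
from quixstreams import Application

load_dotenv()  # read the WarpStream connection details from .env

# Real code would also pass SASL credentials for a serverless cluster
app = Application(broker_address=os.environ["KAFKA_BROKER_ADDRESS"])
topic_name = os.environ["raw_data_topic"]

pages = ["home", "pricing", "docs", "blog"]
actions = ["view", "click", "scroll"]

with app.get_producer() as producer:
    while True:
        event = {
            "page_id": random.choice(pages),
            "action": random.choice(actions),
            "timestamp": int(time.time() * 1000),
        }
        # Key by page_id so related events land in the same partition
        producer.produce(
            topic=topic_name,
            key=event["page_id"],
            value=json.dumps(event),
        )
        time.sleep(0.5)
```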
Keep the producer terminal window open, and open a second terminal window.
Step 4: Process the records
As you can see in the producer output, the synthetic data represents a very basic user activity log for a website. Suppose that you want to see the most active pages. To do so, you could continuously count the number of user actions (regardless of type) and aggregate the counts by page.
You can achieve this with a simple Quix Streams processor application.
To see it in action, run the following command in the second terminal window: python 2_process_demo_data.py
You should see some output that resembles the following example:
Here are the important excerpts from the code:
The line sdf = sdf.group_by("page_id") rekeys the stream so that the messages are keyed by page_id. The count_messages function then keeps a running total of the action counts.
Under the hood, it maintains an entry for each message key (in this case, page_id) in the state and updates the relevant action count whenever a new message is processed. This means that if the process is interrupted or restarted, it can pick up from where it left off. In essence, the state store serves a similar role to a lightweight database.
This repo includes a sample file state_example.json to give you an idea of what the state would look like for this example.
The aggregated results are then written to a second downstream topic using the to_topic function.
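To make the flow concrete, here is a minimal sketch of a processor along these lines. It is not the repo's exact code; the broker and output-topic variable names are assumptions, and 2_process_demo_data.py remains the authoritative version.

```python
# A minimal sketch, not the repo's exact code; environment variable names
# other than raw_data_topic are assumptions.
import os

from dotenv import load_dotenv
from quixstreams import Application, State

load_dotenv()

app = Application(
    broker_address=os.environ["KAFKA_BROKER_ADDRESS"],  # assumed variable name
    consumer_group="page_counter",
    auto_offset_reset="earliest",
)

input_topic = app.topic(os.environ["raw_data_topic"], value_deserializer="json")
output_topic = app.topic(os.environ["agg_data_topic"], value_serializer="json")  # assumed

sdf = app.dataframe(input_topic)

# Rekey the stream so messages are grouped by page_id
sdf = sdf.group_by("page_id")


def count_messages(value: dict, state: State) -> dict:
    # Keep a running total per page_id in the local state store
    total = state.get("count", default=0) + 1
    state.set("count", total)
    return {"page_id": value["page_id"], "count": total}


# stateful=True gives the function access to a persistent, per-key state store
sdf = sdf.apply(count_messages, stateful=True)

# Publish the running counts to the downstream topic
sdf = sdf.to_topic(output_topic)

app.run(sdf)
```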
Keep the processor terminal window open, and open a third window.
Step 5: Sink the data to DuckDB
Although the state store is somewhat like a database, you can't query it yet (queryable state is on the Quix Streams roadmap). In the meantime, you can sink the data into a “proper” lightweight database such as DuckDB and query it there. This is a useful exercise because sinking data to an external destination is a common use case.
For example, suppose that you want to build a Streamlit dashboard that charts the most active pages. It would be unwise to connect Streamlit directly to Kafka (using consumer code) because the data updates too frequently. Instead, you can sink the streaming data into a database first and have Streamlit poll the database at regular intervals.
To see how data sinking works, run our example application with the following command: python 3_sink_demo_data.py
You should see some output that resembles the following example:
Again, here are the important excerpts from the code:
After initializing a consumer and a local DuckDB database, Quix Streams runs an INSERT query on every message received. It updates the counts for existing page_ids and adds rows for new page_ids.
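As a rough sketch of that pattern, with assumed file, table, and environment variable names (3_sink_demo_data.py is the authoritative version):

```python
# A minimal sketch, not the repo's exact code; file, table, and variable
# names are assumptions.
import json
import os

import duckdb
from dotenv import load_dotenv
from quixstreams import Application

load_dotenv()

db = duckdb.connect("page_counts.duckdb")
db.execute(
    "CREATE TABLE IF NOT EXISTS page_counts "
    "(page_id VARCHAR PRIMARY KEY, action_count INTEGER)"
)

app = Application(
    broker_address=os.environ["KAFKA_BROKER_ADDRESS"],  # assumed variable name
    consumer_group="duckdb_sink",
    auto_offset_reset="earliest",
)

with app.get_consumer() as consumer:
    consumer.subscribe([os.environ["agg_data_topic"]])  # assumed variable name
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None or msg.error():
            continue
        row = json.loads(msg.value())
        # Upsert: update the count for an existing page_id, insert a row otherwise
        db.execute(
            "INSERT INTO page_counts VALUES (?, ?) "
            "ON CONFLICT (page_id) DO UPDATE SET action_count = excluded.action_count",
            [row["page_id"], row["count"]],
        )
```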
After the process has run for about 10 seconds and enough records have been inserted, terminate it.
Step 6: Query the database
Now that you have the aggregates in a database, you can run a simple “SELECT ALL” query. You don’t have to worry about managing the aggregations as part of a query on the raw data.
Note that a local file-based DuckDB database cannot handle concurrent reads and writes, so make sure that the sink process has stopped before you query the database.
To see the aggregated page action counts, run the following command: python 4_query_demo_data.py
You should see some output that resembles the following example:
Here’s the full query code.
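As a rough sketch, assuming the database file and table names used in the sink example above, an equivalent query looks like this:

```python
# A minimal sketch; the database file and table names are assumptions that
# should match whatever the sink step created.
import duckdb

db = duckdb.connect("page_counts.duckdb", read_only=True)
rows = db.execute(
    "SELECT page_id, action_count FROM page_counts ORDER BY action_count DESC"
).fetchall()

for page_id, action_count in rows:
    print(f"{page_id}: {action_count}")
```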
Next Steps
Well done! You've set up the key components of a Python-based stream processing pipeline using WarpStream as a broker and Quix Streams as a Kafka producer, consumer, and stream processor.
You could scale your pipeline by running containerized applications on a container orchestration platform from one of the major cloud providers, but this can add a lot of complexity. The simplest way to scale a pipeline is to run your applications in Quix Cloud. The Quix Cloud platform integrates seamlessly with the Quix Streams Python library and includes many extra features that you would not get with a more generic product such as AWS Lambda or Google Cloud Run.
For example, you can get an interactive visualization of your pipeline, which makes it easy to inspect the data flowing through it:
To try Quix Cloud, visit the sign-up page and start a free trial.
Next, check out the WarpStream docs for configuring the WarpStream Agent, or review the Quix docs to learn more about what you can do with Quix Streams.