Quix Streams
This page describes how to use the Quix Streams Python library to read and aggregate data from WarpStream and ingest the aggregations into a local DuckDB database for offline querying.
Quix Streams is a cloud-native library for processing data in Kafka using pure Python. It’s designed to give you the power of a distributed system in a lightweight library by combining Kafka's low-level scalability and resiliency features with an easy-to-use Python interface.
A WarpStream account. You can get access to WarpStream by signing up on the WarpStream website.
A WarpStream cluster is up and running.
Installations of Git and Python with the required Python modules:
Quix Streams
DuckDB
python-dotenv
We recommend setting up a Python virtual environment before you install the required Python modules.
You can install the dependencies with the following command:
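The pip command below assumes the standard PyPI package names quixstreams, duckdb, and python-dotenv; the repo may instead ship a requirements.txt file that you can install in one step.

```bash
pip install quixstreams duckdb python-dotenv
```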
The Quix team has prepared a demo repo to help you get started with Quix Streams and WarpStream.
Open a terminal window and clone the following repo.
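The clone command follows the standard git pattern shown below; the repository URL here is only a placeholder, so substitute the actual URL of the Quix demo repo before running it.

```bash
# Placeholder URL: replace with the actual Quix Streams + WarpStream demo repository.
git clone https://github.com/<org>/<warpstream-quix-demo>.git
cd <warpstream-quix-demo>
```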
The repo contains the following Python files representing some basic data pipeline steps.
1_produce_demo_data.py
2_process_demo_data.py
3_sink_demo_data.py
4_query_demo_data.py
Create a .env file to store the required environment variables. You can use the .env_example file in the repo as a reference, or run the following command in a bash shell:
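The exact variable names come from the repo’s .env_example file; the sketch below only illustrates the kind of values you need (bootstrap address, SASL credentials, and topic names), so treat every name and value as a placeholder.

```bash
cat > .env <<'EOF'
# Illustrative placeholders -- copy the real variable names from .env_example.
bootstrap_server="<warpstream-bootstrap-host>:9092"
sasl_username="<warpstream-sasl-username>"
sasl_password="<warpstream-sasl-password>"
raw_data_topic="raw_page_views"
agg_data_topic="page_view_counts"
EOF
```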
Use a Quix Streams producer application to continuously produce some demo data to a topic. When you run the application for the first time, Quix Streams ensures that the required topics are automatically created if they don’t exist already.
Open a terminal window inside the repo directory and run the following command:
python 1_produce_demo_data.py
You should see some output that resembles the following example:
Here’s an important excerpt from the code you just ran:
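The snippet below is a minimal sketch of that producer logic rather than the repo’s exact code: the environment variable names (bootstrap_server, raw_data_topic), the event fields (page_id, action, user_id), and the omission of the SASL credential settings are all simplifying assumptions.

```python
# Minimal sketch of the producer step (assumed names, not the repo's exact code).
import os
import random
import time
import uuid

from dotenv import load_dotenv
from quixstreams import Application

load_dotenv()  # read the WarpStream connection details from the .env file

# In the real repo the Application is also configured with the SASL credentials
# from the .env file; only the broker address is shown here for brevity.
app = Application(broker_address=os.environ["bootstrap_server"])
topic = app.topic(os.environ["raw_data_topic"], value_serializer="json")

pages = ["home", "pricing", "docs", "blog"]
actions = ["view", "click", "scroll"]

with app.get_producer() as producer:
    while True:
        # Generate one synthetic page view event.
        event = {
            "user_id": str(uuid.uuid4()),
            "page_id": random.choice(pages),
            "action": random.choice(actions),
        }
        message = topic.serialize(key=event["page_id"], value=event)
        producer.produce(topic=topic.name, key=message.key, value=message.value)
        time.sleep(0.5)
```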
The previous code initializes a connection to a serverless WarpStream cluster using the credentials defined in your .env file. Once a connection has been established, it continuously generates synthetic page view data and produces it to a Kafka topic (the name of this topic is defined in the "raw_data_topic" environment variable).
Keep the producer terminal window open, and open a second terminal window.
As you can see in the producer output, the synthetic data represents a very basic user activity log for a website. Suppose that you want to see the most active pages. To do so, you could continuously count the number of user actions (regardless of type) and aggregate the counts by page.
You can achieve this with a simple Quix Streams processor application.
To see it in action, run the following command in the second terminal window: python 2_process_demo_data.py
You should see some output that resembles the following example:
Here are the important excerpts from the code:
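The sketch below shows roughly what those excerpts do, with assumed environment variable, consumer group, and field names; the repo’s actual code may differ in the details. The stream is rekeyed by page_id, a stateful count_messages function keeps running action counts, and the result is written downstream with to_topic.

```python
# Minimal sketch of the processing step (assumed names, not the repo's exact code).
import os

from dotenv import load_dotenv
from quixstreams import Application, State

load_dotenv()

app = Application(
    broker_address=os.environ["bootstrap_server"],
    consumer_group="page_view_counter",  # assumed group name
    auto_offset_reset="earliest",
)
input_topic = app.topic(os.environ["raw_data_topic"], value_deserializer="json")
output_topic = app.topic(os.environ["agg_data_topic"], value_serializer="json")


def count_messages(value: dict, state: State) -> dict:
    # State is scoped to the current message key, i.e. to one page_id after the
    # group_by below, so each page keeps its own running action counts.
    counts = state.get("action_counts", default={})
    counts[value["action"]] = counts.get(value["action"], 0) + 1
    state.set("action_counts", counts)
    return {"page_id": value["page_id"], "counts": counts}


sdf = app.dataframe(input_topic)
sdf = sdf.group_by("page_id")                   # rekey the stream by page_id
sdf = sdf.apply(count_messages, stateful=True)  # keep running totals in the state store
sdf = sdf.to_topic(output_topic)                # write the aggregates downstream

if __name__ == "__main__":
    app.run(sdf)  # newer Quix Streams releases also accept a bare app.run()
```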
The line sdf = sdf.group_by("page_id") rekeys the stream so that the messages are keyed by page_id. The count_messages function then keeps a running total of the action counts.
Under the hood, it maintains an entry for each message key (in this case, page_id) in the state and updates the relevant action count whenever a new message is processed. This means that if the process is interrupted or restarted, it can pick up from where it left off. In essence, the state store serves a similar role to a lightweight database.
This repo includes a sample file, state_example.json, to give you an idea of what the state would look like for this example.
The aggregated results are then written to a second downstream topic using the to_topic function.
Keep the processor terminal window open, and open a third window.
Although the state store is somewhat like a database, you can’t query it yet (queryable state is on the Quix Streams roadmap). In the meantime, you can sink the data into a “proper” lightweight database such as DuckDB and query it there. This is a useful exercise because sinking data to an external destination is a common use case.
For example, suppose that you want to build a Streamlit dashboard that charts the most active pages. It would be unwise to connect Streamlit directly to Kafka (using consumer code) because the data updates too frequently. Instead, you can sink the streaming data into a database first and have Streamlit poll the database at regular intervals.
To see how data sinking works, run our example application with the following command: python 3_sink_demo_data.py
You should see some output that resembles the following example:
Again, here are the important excerpts from the code:
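The sketch below shows one way to write such a sink step with Quix Streams and the duckdb module; the database file name, table name, and column names are illustrative assumptions rather than the repo’s actual choices.

```python
# Minimal sketch of the sink step (assumed names, not the repo's exact code).
import os

import duckdb
from dotenv import load_dotenv
from quixstreams import Application

load_dotenv()

app = Application(
    broker_address=os.environ["bootstrap_server"],
    consumer_group="duckdb_sink",  # assumed group name
    auto_offset_reset="earliest",
)
agg_topic = app.topic(os.environ["agg_data_topic"], value_deserializer="json")

# A single local DuckDB file; only one process can write to it at a time.
db = duckdb.connect("page_counts.duckdb")
db.execute(
    "CREATE TABLE IF NOT EXISTS page_counts "
    "(page_id VARCHAR PRIMARY KEY, action_count INTEGER)"
)


def upsert_count(row: dict) -> None:
    # Insert a new row for unseen page_ids, or overwrite the count for known ones.
    total = sum(row["counts"].values())
    db.execute(
        "INSERT INTO page_counts VALUES (?, ?) "
        "ON CONFLICT (page_id) DO UPDATE SET action_count = excluded.action_count",
        [row["page_id"], total],
    )


sdf = app.dataframe(agg_topic)
sdf = sdf.update(upsert_count)  # run the INSERT for every message received

if __name__ == "__main__":
    app.run(sdf)
```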
After initializing a consumer and a local DuckDB database, Quix Streams runs an INSERT query on every message received. It updates the counts for existing page_ids and adds rows for new page_ids.
After the process runs for about 10 seconds and enough records have been inserted, terminate the process.
Now that you have the aggregates in a database, you can run a simple “SELECT *” query. You don’t have to compute the aggregations as part of a query on the raw data.
Note that a local file-based DuckDB database cannot handle concurrent reads and writes, so make sure that the sink process has stopped before you query the database.
To see the aggregated page action counts, run the following command: python 4_query_demo_data.py
You should see some output that resembles the following example:
Here’s the full query code.
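The sketch below stands in for that script, reusing the same assumed database file, table, and column names as the sink sketch above.

```python
# Minimal sketch of the query step (assumed names, not the repo's exact code).
import duckdb

# Open the same local database file the sink step wrote to.
db = duckdb.connect("page_counts.duckdb", read_only=True)

# Select every aggregated row, most active pages first.
results = db.execute(
    "SELECT page_id, action_count FROM page_counts ORDER BY action_count DESC"
).fetchall()

for page_id, action_count in results:
    print(f"{page_id}: {action_count}")
```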
Well done! You've set up the key components of a Python-based stream processing pipeline using WarpStream as a broker and Quix Streams as a Kafka producer, consumer, and stream processor.
You can then scale your pipeline by running containerized applications in any container orchestration platform from one of the big cloud providers, but this can add a lot of complexity. The simplest way to scale a pipeline is to run your applications in Quix Cloud. The Quix Cloud platform integrates seamlessly with the Quix Streams Python library and includes many extra features that you would not get with a more generic product such as AWS Lambda or Google Cloud Run. For example, you can get an interactive visualization of your pipeline, which makes it easy to inspect the data flowing through it.
To try Quix Cloud, visit the Quix website and start a free trial.
Next, check out the rest of the WarpStream docs, or review the Quix Streams documentation to learn more about what you can do with Quix Streams.