WarpStream is Apache Kafka-compatible, so standard Kafka clients such as the Confluent Kafka Python library can connect to it directly. This guide walks through building a small Streamlit app that connects to your WarpStream cluster and lets you browse the records in your WarpStream-managed Topics.
Prerequisites
Have the Streamlit Python library installed: pip install streamlit
Have the Confluent Kafka Python library installed: pip install confluent_kafka
A WarpStream account - get access to WarpStream by registering here.
A Serverless WarpStream cluster is up and running with a populated topic.
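If your cluster does not yet have a populated topic, the short producer sketch below can seed one with test records. The topic name, bootstrap URL, and credentials are placeholders; substitute the values for your own cluster.

from confluent_kafka import Producer

# Placeholder connection details; replace with your cluster's values.
conf = {
    'bootstrap.servers': '<your-bootstrap-url>:9092',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': '<your-sasl-username>',
    'sasl.password': '<your-sasl-password>',
}

producer = Producer(conf)

# Write a handful of simple JSON test records to a hypothetical "demo" topic.
for i in range(10):
    producer.produce('demo', value=f'{{"record": {i}}}'.encode('utf-8'))

# Block until all buffered messages have been delivered.
producer.flush()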
Step 1: Python Code
The following Python program can be used as is. It will present a web page that will:
Prompt for your cluster credentials
Ask for a number of records to display
Present a list of available topics
Print out the requested number of records from the topic
import streamlit as st
from confluent_kafka import KafkaException, KafkaError, Consumer
from confluent_kafka.admin import AdminClient

# Streamlit UI for user inputs
st.title("WarpStream Topic Consumer")
username = st.text_input("SASL Username", value="")
password = st.text_input("SASL Password", value="", type="password")
bootstrap_servers = st.text_input("Bootstrap Servers:Port", value="")

# Prompt the user for the number of records to read
num_records = st.number_input("Number of records to read", min_value=1, step=1, value=10)

# Submit button to connect to Kafka and fetch topics
connect_button = st.button("Connect to WarpStream")

# Initialize session state for topics and selected topic
if 'available_topics' not in st.session_state:
    st.session_state.available_topics = []
if 'selected_topic' not in st.session_state:
    st.session_state.selected_topic = None

# Function to create a Kafka AdminClient and fetch the list of topics
def get_kafka_topics(username, password, bootstrap_servers):
    conf = {
        'sasl.mechanism': 'PLAIN',
        'bootstrap.servers': bootstrap_servers,
        'security.protocol': 'SASL_SSL',
        'sasl.username': username,
        'sasl.password': password,
        'socket.timeout.ms': 10000,            # Increase socket timeout
        'metadata.request.timeout.ms': 15000,  # Increase metadata request timeout
    }
    try:
        admin_client = AdminClient(conf)
        topics_metadata = admin_client.list_topics(timeout=15)  # Increase timeout here too
        topics = list(topics_metadata.topics.keys())
        st.success("Successfully fetched topics!")
        return topics
    except KafkaException as e:
        st.error(f"Failed to fetch Kafka topics: {e}")
        return []

# Function to consume a specified number of messages from the selected topic
def consume_messages(username, password, bootstrap_servers, topic, num_records):
    conf = {
        'sasl.mechanism': 'PLAIN',
        'bootstrap.servers': bootstrap_servers,
        'security.protocol': 'SASL_SSL',
        'sasl.username': username,
        'sasl.password': password,
        'group.id': 'streamlit-consumer',
        'auto.offset.reset': 'earliest'
    }
    consumer = Consumer(conf)
    consumer.subscribe([topic])
    messages = []
    try:
        for _ in range(num_records):
            msg = consumer.poll(timeout=10.0)  # Wait up to 10 seconds for each message
            if msg is None:
                st.warning("No more messages found after waiting.")
                break
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    st.info("Reached end of partition.")
                    break
                else:
                    st.error(f"Consumer error: {msg.error()}")
                    break
            messages.append(msg.value().decode('utf-8'))
    except Exception as e:
        st.error(f"Error while consuming messages: {e}")
    finally:
        consumer.close()
    return messages

# Handle the connect button click
if connect_button:
    if username and password and bootstrap_servers:
        st.session_state.available_topics = get_kafka_topics(username, password, bootstrap_servers)
    else:
        st.error("Please provide all required details.")

# Display the topics as radio buttons if topics are available
submit_button = False
if st.session_state.available_topics:
    st.session_state.selected_topic = st.radio("Select a topic", st.session_state.available_topics)
    submit_button = st.button("Submit", disabled=not bool(st.session_state.selected_topic))

# Handle the topic selection submit button
if submit_button and st.session_state.selected_topic:
    st.write(f"Selected Topic: {st.session_state.selected_topic}")
    # Consume the specified number of messages from the selected topic
    messages = consume_messages(username, password, bootstrap_servers, st.session_state.selected_topic, num_records)
    if messages:
        st.write(f"First {num_records} messages from the topic:")
        for i, message in enumerate(messages, start=1):
            st.write(f"{i}: {message}")
    else:
        st.write("No messages found or unable to consume messages.")
Step 2: Run the app
From the command line, the program can be run as follows:
streamlit run myapp.py
Next, open a web browser and paste in the following URL:
http://localhost:8501
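If port 8501 is already in use, or the app is running on a remote machine, Streamlit's standard CLI flags let you choose a different port and skip opening a local browser, for example:

streamlit run myapp.py --server.port 8502 --server.headless true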
This will present you with the app's input form.
Enter the applicable information. If Topics are available, they will be displayed, and you can then preview records from them. This app is generic, so any valid set of credentials will connect and allow you to browse the topics and data in your cluster.
Next Steps
Congratulations! You now have a general-purpose Streamlit app that will allow you to quickly peruse the structure and data in your WarpStream cluster.
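One possible next step is to adapt the display to your data format. For example, if your topic values are JSON, the per-record st.write loop at the end of the app could be replaced with a table view; a minimal sketch, assuming pandas is installed:

import json
import pandas as pd

# Parse each consumed message as JSON and render the results as a table.
rows = [json.loads(message) for message in messages]
st.dataframe(pd.DataFrame(rows))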