Streamlit

Streamlit is a Python library that enables the simple creation of web apps.


A video walkthrough of this integration is also available.

Introduction

Prerequisites

  1. Have the Streamlit Python library installed: pip install streamlit

  2. Have the Confluent Kafka Python library installed: pip install confluent_kafka

  3. Have a Serverless WarpStream cluster up and running with a populated topic. (If you need to populate one, see the producer sketch after this list.)
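If you do not have a populated topic yet, below is a minimal producer sketch using the same confluent_kafka library. The bootstrap URL, credentials, and the topic name "demo-topic" are placeholders rather than values from this guide; substitute your own.

from confluent_kafka import Producer

# Placeholder connection details; replace with your own cluster's values.
conf = {
    'bootstrap.servers': 'YOUR_BOOTSTRAP_URL:9092',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': 'YOUR_SASL_USERNAME',
    'sasl.password': 'YOUR_SASL_PASSWORD',
}

producer = Producer(conf)

# Write a few test records to a placeholder topic named "demo-topic".
for i in range(10):
    producer.produce('demo-topic', value=f'test message {i}'.encode('utf-8'))

# Wait for all outstanding messages to be delivered.
producer.flush()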

Step 1: Python Code

The following Python program can be used as-is. Because Streamlit reruns the script on every user interaction, the fetched topic list and selected topic are kept in st.session_state rather than in local variables. The program presents a web page that will:

  • Prompt for your cluster credentials

  • Ask for a number of records to display

  • Present a list of available topics

  • Print out the requested number of records from the topic

import streamlit as st
from confluent_kafka import KafkaException, KafkaError, Consumer
from confluent_kafka.admin import AdminClient

# Streamlit UI for user inputs
st.title("WarpStream Topic Consumer")
username = st.text_input("SASL Username", value="")
password = st.text_input("SASL Password", value="", type="password")
bootstrap_servers = st.text_input("Bootstrap Servers:Port", value="")

# Prompt the user for the number of records to read
num_records = st.number_input("Number of records to read", min_value=1, step=1, value=10)

# Submit button to connect to Kafka and fetch topics
connect_button = st.button("Connect to WarpStream")

# Initialize session state for topics and selected topic
if 'available_topics' not in st.session_state:
    st.session_state.available_topics = []
if 'selected_topic' not in st.session_state:
    st.session_state.selected_topic = None

# Function to create Kafka AdminClient to fetch topics
def get_kafka_topics(username, password, bootstrap_servers):
    conf = {
        'sasl.mechanism': 'PLAIN',
        'bootstrap.servers': bootstrap_servers,
        'security.protocol': 'SASL_SSL',
        'sasl.username': username,
        'sasl.password': password,
        'socket.timeout.ms': 10000,   # Increase socket timeout
        'metadata.request.timeout.ms': 15000,  # Increase metadata request timeout
    }
    
    try:
        admin_client = AdminClient(conf)
        topics_metadata = admin_client.list_topics(timeout=15)  # Increase timeout here too
        topics = list(topics_metadata.topics.keys())
        st.success("Successfully fetched topics!")
        return topics
    except KafkaException as e:
        st.error(f"Failed to fetch Kafka topics: {e}")
        return []

# Function to consume a specified number of messages from the selected topic
def consume_messages(username, password, bootstrap_servers, topic, num_records):
    conf = {
        'sasl.mechanism': 'PLAIN',
        'bootstrap.servers': bootstrap_servers,
        'security.protocol': 'SASL_SSL',
        'sasl.username': username,
        'sasl.password': password,
        'group.id': 'streamlit-consumer',
        'auto.offset.reset': 'earliest'
    }
    
    consumer = Consumer(conf)
    consumer.subscribe([topic])
    
    messages = []
    try:
        for _ in range(num_records):
            msg = consumer.poll(timeout=10.0)  # Increase poll timeout to 10 seconds
            if msg is None:
                st.warning("No more messages found after waiting.")
                break
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    st.info("Reached end of partition.")
                    break
                else:
                    st.error(f"Consumer error: {msg.error()}")
                    break
            messages.append(msg.value().decode('utf-8'))
    except Exception as e:
        st.error(f"Error while consuming messages: {e}")
    finally:
        consumer.close()
        
    return messages

# Handle the connect button click
if connect_button:
    if username and password and bootstrap_servers:
        st.session_state.available_topics = get_kafka_topics(username, password, bootstrap_servers)
    else:
        st.error("Please provide all required details.")

# Display the topics as radio buttons if topics are available
if st.session_state.available_topics:
    st.session_state.selected_topic = st.radio("Select a topic", st.session_state.available_topics)
    submit_button = st.button("Submit", disabled=not bool(st.session_state.selected_topic))
    
    # Handle the topic selection submit button
    if submit_button and st.session_state.selected_topic:
        st.write(f"Selected Topic: {st.session_state.selected_topic}")
        
        # Consume the specified number of messages from the selected topic
        messages = consume_messages(username, password, bootstrap_servers, st.session_state.selected_topic, num_records)
        
        if messages:
            st.write(f"First {num_records} messages from the topic:")
            for i, message in enumerate(messages, start=1):
                st.write(f"{i}: {message}")
        else:
            st.write("No messages found or unable to consume messages.")

Step 2: Run the app

From the command line, the program can be run as follows:

streamlit run myapp.py

Next, open a web browser and paste in the following URL:

http://localhost:8501

You will then be presented with the app's input form. Enter the applicable information. If topics are available, they will be displayed, and you can then preview records from them. This app is generic, so any valid set of credentials will connect and allow you to browse the topics and data in your cluster.

Next Steps

Congratulations! You now have a general-purpose Streamlit app that will allow you to quickly peruse the structure and data in your WarpStream cluster.
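As a possible extension, you could render structured records as a table instead of plain text. The snippet below is only a sketch: it assumes your records are JSON-encoded and that pandas is installed (pip install pandas), and it would replace the final display loop in the app above.

import json
import pandas as pd

# Hypothetical extension: parse the JSON-encoded record values returned by
# consume_messages() and display them as a table in the Streamlit app.
parsed = []
for message in messages:
    try:
        parsed.append(json.loads(message))
    except json.JSONDecodeError:
        parsed.append({"raw": message})  # fall back to the raw string

st.dataframe(pd.DataFrame(parsed))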
