ParadeDB

ParadeDB is an Elasticsearch alternative built on Postgres for real-time search and analytics. It is supported on all officially supported Postgres versions and ships by default with Postgres 16.

A video walkthrough can be found below:

Introduction

WarpStream is Apache Kafka compatible, and ParadeDB is a Postgres extension. There is no direct connection between Kafka and Postgres, but it can be accomplished with a number of third-party tools, some more complex and bloated than others. For this illustration, we will be using the open-source pipeline tool Bento. Bento is written in Go and is simple to install and use as a single binary. The pipeline scripts are written in YAML, which we will cover.

Prerequisites

  1. Have Bento installed (covered below).

  2. Have ParadeDB installed (covered below).

  3. WarpStream account - get access to WarpStream by registering here.

  4. WarpStream credentials.

  5. A Serverless WarpStream cluster is up and running with a populated topic.

Step 1: Software Installation

Bento is the open-source pipeline tool we will use to read from WarpStream and write to Postgres. It can be installed from source, binary, or as a docker container. Visit GitHub for the best instructions for your situation.

ParadeDB can be installed and started with the following Docker command:

docker run --name paradedb -e POSTGRES_PASSWORD=password -p 5432:5432 paradedb/paradedb

Step 2: The Bento Script

This script expects that you have exported your WarpStream credentials as environment variables. WarpStream will provide this command when you create a credential:

export BOOTSTRAP_HOST=<YOUR_BOOTSTRAP_BROKER> \
SASL_USERNAME=<YOUR_SASL_USERNAME> \
SASL_PASSWORD=<YOUR_SASL_PASSWORD>;

The following Bento script will perform the following actions:

  • Configure the connection information to WarpStream.

  • Set the consumer to bento_kafka.

  • Read from the topic products.

  • The driverfield tells Bento that this will be Postgres.

  • The dsn configures the connection information to Postgres. Modify the names and passwords as needed for your environment.

  • The script defines the table layout that we will be getting from the JSON files in the topics, and creates the table if it doesn't exist, then writes the data. This could be further abstracted to dynamically get the field names from the JSON and create them in the table.

  • There is a commented block that will provide debugging information to the terminal.

  • Once run, the script will continue until stopped with ctrl+c.

Note: This script can easily be modified to handle multiple topics; just add them as separate lines after - products and - check for them under cases:

input:
 kafka_franz:
   consumer_group: bento_kafka
   commit_period: 1s
   start_from_oldest: true
   tls:
     enabled: true
   sasl:
     - mechanism: PLAIN
       username: "${SASL_USERNAME}"
       password: "${SASL_PASSWORD}"
   seed_brokers:
     - "${BOOTSTRAP_HOST}"
   topics:
     - products

# uncomment the following to get debug information to the terminal
#logger:
#  level: DEBUG

#pipeline:
#  processors:
#    - log:
#        level: DEBUG
#        message: "Sending message"
#        fields_mapping: root = this

output:
 switch:
   cases:
     - check: '@kafka_topic == "products"'
       output:
         sql_insert:
           driver: postgres
           dsn:  postgres://postgres:password@localhost:5432/postgres?sslmode=disable
           columns:  [ product_id, name, price, inventory_count ]
           args_mapping: root = [ this.productId, this.name, this.price, this.inventoryCount ]
           table: products
           init_statement: | # Create table for product data
             CREATE TABLE IF NOT EXISTS products (
                 product_id UUID,
                 name TEXT,
                 price NUMERIC(10,2),
                 inventory_count INTEGER
             );

Step 3: Run the app

From the command line, Bento can be run as follows:

bento -c myscript.yaml

Once you have data in ParadeDB, You can connect to ParadeDB with the following Docker command:

docker exec -it paradedb psql -U postgres -p 5432

Step 4: Use ParadeDB

We'll include some basic commands here based on the sample data described above. For thorough documentation on ParadeDB, refer to their website.

First, create a BM25 index. Bm25 significantly improves the ranking capabilities of native Postgres FTS, which uses TF-IDF and does not maintain statistics for word frequencies across the entire word corpus.

-- This will create the necessary index based on the Bento-described table schema
CALL paradedb.create_bm25(
    index_name => 'products',
    table_name => 'products',
    key_field => 'product_id',
    text_fields => paradedb.field('name'),
    numeric_fields => paradedb.field('price') || paradedb.field('inventory_count')
);

If this simple query works, then it means the index was created successfully.

-- This should return successfully, indicating that the index was created correctly
SELECT * FROM products.search('name:bottle', limit_rows => 10);

Next Steps

Congratulations! You now know how to use Bento to create powerful pipelines from a WarpStream cluster to ParadeDB and make use of their advanced indexing.

Last updated