SQLite

SQLite is an open-source, embedded, serverless RDBMS that is popular for its small size and ease of use.

Introduction

WarpStream is Apache Kafka compatible, and SQLite is a small RDBMS that can be embedded almost anywhere. While there is no direct connector between Kafka and SQLite, the task can be accomplished with any of several third-party tools, some more complex and heavyweight than others. For this illustration, we will use the open-source pipeline tool Bento. Bento is written in Go and is simple to install and use as a single binary. Its pipelines are defined in YAML scripts, which we will cover below.

Prerequisites

  1. Have Bento installed (covered below).

  2. Have SQLite installed.

  3. WarpStream account - get access to WarpStream by registering here.

  4. WarpStream credentials.

  5. A WarpStream cluster up and running with a populated topic (one way to seed a test topic is sketched below).
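
If you need to populate the topic for testing, one option is a Kafka-compatible CLI such as kcat, which is not otherwise used in this guide. Below is a minimal sketch, assuming the credentials exported in Step 2 and a record shape matching the Step 2 mapping; the UUID and values are made up:

# Produce one hypothetical JSON record to the products topic with kcat.
# Assumes BOOTSTRAP_HOST, SASL_USERNAME, and SASL_PASSWORD are exported (see Step 2).
echo '{"productId":"3f2c1a9e-0c1d-4b7e-9a6a-1c2d3e4f5a6b","name":"widget","price":19.99,"inventoryCount":42}' | \
  kcat -P -b "$BOOTSTRAP_HOST" -t products \
    -X security.protocol=SASL_SSL \
    -X sasl.mechanisms=PLAIN \
    -X "sasl.username=${SASL_USERNAME}" \
    -X "sasl.password=${SASL_PASSWORD}"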

Step 1: Software Installation

Bento is the open-source pipeline tool we will use to read from WarpStream and write to SQLite. It can be installed from source, as a prebuilt binary, or as a Docker container. Visit the project's GitHub page for the instructions that best fit your situation.
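For example, the Bento README documents an install script and a Docker image; these commands are shown as of this writing, so verify the current ones on GitHub:

# Install the bento binary via the project's install script.
curl -Lsf https://warpstreamlabs.github.io/bento/sh/install | bash

# Alternatively, pull the Docker image.
docker pull ghcr.io/warpstreamlabs/bento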

SQLite can be installed from here. It can then be run to access or create a database as follows:

sqlite3 <YOUR_DATABASE>
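
To confirm the installation, you can also run a quick one-off query (this reuses the path placeholder from the Step 2 script and will create the database file if it doesn't exist):

# Create (or open) the database file and print the SQLite version.
sqlite3 /<YOUR_PATH>/<YOUR_DATABASE>.db "SELECT sqlite_version();"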

Step 2: The Bento Script

This script expects that you have exported your WarpStream credentials as environment variables. WarpStream will provide this command when you create a credential:

export BOOTSTRAP_HOST=<YOUR_BOOTSTRAP_BROKER> \
SASL_USERNAME=<YOUR_SASL_USERNAME> \
SASL_PASSWORD=<YOUR_SASL_PASSWORD>;

The Bento script below performs the following actions:

  • Configure the connection information to WarpStream.

  • Set the consumer group to bento_sqlite.

  • Read from the topic products.

  • The driver field tells Bento that the destination is SQLite.

  • The dsn configures the connection information to SQLite. Modify the path as needed for your environment.

  • The script defines the table layout that matches the JSON messages in the topic (a sample record is shown after the script), creates the table if it doesn't exist, and then writes the data. This could be further abstracted to derive the field names dynamically from the JSON and create them in the table.

  • The batching segment gives you granular control over how quickly you write to SQLite to avoid overwhelming it. This example waits for 5000 records or 5 seconds, whichever comes first. Learn more here.

  • Once run, the script will continue until stopped with ctrl+c.

input:
  kafka_franz:
    consumer_group: bento_sqlite
    commit_period: 1s
    start_from_oldest: true
    tls:
      enabled: true
    sasl:
      - mechanism: PLAIN
        username: "${SASL_USERNAME}"
        password: "${SASL_PASSWORD}"
    seed_brokers:
      - <YOUR_WARPSTREAM_BOOTSTRAP_URL>:9092
    topics:
      - products
    batching:
      count: 5000
      period: 5s

output:
  sql_insert:
    driver: sqlite
    dsn: file:/<YOUR_PATH>/<YOUR_DATABASE>.db
    columns: [ product_id, name, price, inventory_count ]
    args_mapping: root = [ this.productId, this.name, this.price, this.inventoryCount ]
    table: products
    init_statement: | # Create table for product data
      CREATE TABLE IF NOT EXISTS products (
          product_id UUID,
          name TEXT,
          price NUMERIC(10,2),
          inventory_count INTEGER
      );
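
For reference, the args_mapping above assumes JSON records shaped roughly like this hypothetical example (the field names come from the mapping; the values are illustrative):

{
  "productId": "3f2c1a9e-0c1d-4b7e-9a6a-1c2d3e4f5a6b",
  "name": "widget",
  "price": 19.99,
  "inventoryCount": 42
}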

Step 3: Run the App

From the command line, Bento can be run as follows:

bento -c myscript.yaml

Once you have data in SQLite, you can run any SQLite operations you wish on it. Remember that data will continue to be written from WarpStream until you stop the Bento script.
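
For example, here is a quick sanity check from the command line, reusing the path from the script's dsn:

# Count the rows written so far and show a few of them.
sqlite3 /<YOUR_PATH>/<YOUR_DATABASE>.db \
  "SELECT COUNT(*) FROM products; SELECT product_id, name, price, inventory_count FROM products LIMIT 5;"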

Next Steps

Congratulations! You now know how to use Bento to create powerful pipelines from a WarpStream cluster to SQLite.
