Cookbooks

This page contains a collection of WarpStream-specific Bento recipes to help you accomplish common tasks.

You can always read the docs for more information about how to use Bento, but this page contains snippets that you can copy-and-paste to accomplish common tasks.

Stream to Parquet Files

input:
    kafka_franz_warpstream:
        topics:
            - logs
output:
    aws_s3:
        batching:
            byte_size: 32000000
            count: 0
            period: 5s
            processors:
                - mutation: |
                    root.value = content().string()
                    root.key = @kafka_key
                    root.kafka_topic = @kafka_topic
                    root.kafka_partition = @kafka_partition
                    root.kafka_offset = @kafka_offset
                - parquet_encode:
                    default_compression: zstd
                    default_encoding: PLAIN
                    schema:
                        - name: kafka_topic
                          type: BYTE_ARRAY
                        - name: kafka_partition
                          type: INT64
                        - name: kafka_offset
                          type: INT64
                        - name: key
                          type: BYTE_ARRAY
                        - name: value
                          type: BYTE_ARRAY
        bucket: $YOUR_S3_BUCKET
        path: parquet_logs/${! timestamp_unix() }-${! uuid_v4() }.parquet
        region: $YOUR_S3_REGION

warpstream:
    cluster_concurrency_target: 6

Stream to GCP BigQuery

input:
    kafka_franz_warpstream:
        topics:
            - logs
output:
    gcp_bigquery_write_api:
        dataset: $YOUR_BQ_DATASET
        project: $YOUR_GCP_PROJECT
        table: $YOUR_BQ_TABLE
pipeline:
    processors:
        - mapping: |
            root.kafka_topic = @kafka_topic
            root.kafka_partition = @kafka_partition
            root.kafka_offset = @kafka_offset
            root.record = content().string()
warpstream:
    cluster_concurrency_target: 6

Note that GCP BigQuery has strict quotas on how many table modifications can be performed per day, so we recommend inserting data using large batches as shown above.

Stream to GCP BigQuery Iceberg tables

Follow the same instructions as regular GCP BigQuery, except first create an Iceberg table in BigQuery. Note that an Iceberg table needs to be created in BigQuery first for this work. This bento component leverages BigQuery's load jobs to import data into the IceBerg table and that API does not support creating Iceberg tables on the fly yet.

Stream to Redshift

output:
  sql_insert:
    driver: postgres 
    dsn: postgresql://username_from_secret:password_from_secret@"$REDSHIFT_ENDPOINT"/dev
    table: test
    columns: [age, name]
    args_mapping: |
      root = [
        this.age,
        this.name,
      ]
    init_statement: |
      CREATE TABLE test (name varchar(255), age int);
    secret_name: "$REDSHIFT_SECRET_NAME"
    region: eu-west-1
    credentials:
      id: "$AWS_ACCESS_KEY_ID"
      secret: "$AWS_SECRET_ACCESS_KEY"

Read more details about connecting to Redshift in the Bento documentation.

Last updated