Cookbooks
This page contains a collection of WarpStream-specific Bento recipes for common tasks.
You can always read the docs for more information about how to use Bento, but the snippets below are ready to copy and paste.
Stream to Parquet Files
input:
  kafka_franz_warpstream:
    topics:
      - logs
output:
  aws_s3:
    batching:
      byte_size: 32000000
      count: 0
      period: 5s
      processors:
        - mutation: |
            root.value = content().string()
            root.key = @kafka_key
            root.kafka_topic = @kafka_topic
            root.kafka_partition = @kafka_partition
            root.kafka_offset = @kafka_offset
        - parquet_encode:
            default_compression: zstd
            default_encoding: PLAIN
            schema:
              - name: kafka_topic
                type: BYTE_ARRAY
              - name: kafka_partition
                type: INT64
              - name: kafka_offset
                type: INT64
              - name: key
                type: BYTE_ARRAY
              - name: value
                type: BYTE_ARRAY
    bucket: $YOUR_S3_BUCKET
    path: parquet_logs/${! timestamp_unix() }-${! uuid_v4() }.parquet
    region: $YOUR_S3_REGION
warpstream:
  cluster_concurrency_target: 6
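If you would rather lay the Parquet files out by ingestion date (for example, Hive-style dt= prefixes), you can extend the path interpolation. The sketch below is just one possible layout and assumes day-level partitioning is what you want; now() and ts_format are standard Bloblang functions, and only the path changes compared to the example above.

output:
  aws_s3:
    bucket: $YOUR_S3_BUCKET
    # One dt=YYYY-MM-DD prefix per day of ingestion (illustrative layout only).
    path: parquet_logs/dt=${! now().ts_format("2006-01-02") }/${! timestamp_unix() }-${! uuid_v4() }.parquet
    region: $YOUR_S3_REGION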
Stream to GCP BigQuery
input:
  kafka_franz_warpstream:
    topics:
      - logs
output:
  gcp_bigquery_write_api:
    dataset: $YOUR_BQ_DATASET
    project: $YOUR_GCP_PROJECT
    table: $YOUR_BQ_TABLE
pipeline:
  processors:
    - mapping: |
        root.kafka_topic = @kafka_topic
        root.kafka_partition = @kafka_partition
        root.kafka_offset = @kafka_offset
        root.record = content().string()
warpstream:
  cluster_concurrency_target: 6
Note that GCP BigQuery has strict quotas on how many table modifications can be performed per day, so we recommend inserting data in large batches, similar to the batching block shown in the Parquet example above.
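For example, if gcp_bigquery_write_api accepts Bento's standard batching block (an assumption here; check the component reference to confirm), the output could be tuned to flush large batches explicitly:

output:
  gcp_bigquery_write_api:
    dataset: $YOUR_BQ_DATASET
    project: $YOUR_GCP_PROJECT
    table: $YOUR_BQ_TABLE
    # Assumption: this output exposes the same batching block used by aws_s3 above.
    batching:
      byte_size: 32000000 # flush once a batch reaches ~32 MB
      count: 0            # no message-count limit
      period: 5s          # or after 5 seconds, whichever comes first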
Stream to GCP BigQuery Iceberg Tables
Follow the same instructions as for regular GCP BigQuery, except first create an Iceberg table in BigQuery. The Iceberg table must already exist before the pipeline runs: this Bento component uses BigQuery's load jobs to import data into the Iceberg table, and that API does not yet support creating Iceberg tables on the fly.
Stream to Redshift
output:
  sql_insert:
    driver: postgres
    dsn: postgresql://username_from_secret:password_from_secret@"$REDSHIFT_ENDPOINT"/dev
    table: test
    columns: [age, name]
    args_mapping: |
      root = [
        this.age,
        this.name,
      ]
    init_statement: |
      CREATE TABLE test (name varchar(255), age int);
    secret_name: "$REDSHIFT_SECRET_NAME"
    region: eu-west-1
    credentials:
      id: "$AWS_ACCESS_KEY_ID"
      secret: "$AWS_SECRET_ACCESS_KEY"
Read more details about connecting to Redshift in the Bento documentation.
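The snippet above only shows the output half of a pipeline, and its args_mapping expects each message to be a structured document with age and name fields. Here is a minimal sketch of the remaining pieces, assuming the source topic (called logs here, as in the earlier examples) carries JSON records with those fields:

input:
  kafka_franz_warpstream:
    topics:
      - logs
pipeline:
  processors:
    # Parse the raw Kafka record into a structured document so that
    # args_mapping can reference this.age and this.name.
    - mapping: |
        root = content().parse_json()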