Cookbooks

This page contains a collection of WarpStream-specific Bento recipes to help you accomplish common tasks.

You can always read the docs for more information about how to use Bento, but this page contains snippets that you can copy and paste directly.

Stream to Parquet Files

input:
    kafka_franz_warpstream:
        topics:
            - logs
output:
    aws_s3:
        batching:
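            # Flush a batch once it reaches ~32 MB or after 5 seconds, whichever
            # comes first; count: 0 disables the record-count limit.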
            byte_size: 32000000
            count: 0
            period: 5s
            processors:
                - mutation: |
                    root.value = content().string()
                    root.key = @kafka_key
                    root.kafka_topic = @kafka_topic
                    root.kafka_partition = @kafka_partition
                    root.kafka_offset = @kafka_offset
                - parquet_encode:
                    default_compression: zstd
                    default_encoding: PLAIN
                    schema:
                        - name: kafka_topic
                          type: BYTE_ARRAY
                        - name: kafka_partition
                          type: INT64
                        - name: kafka_offset
                          type: INT64
                        - name: key
                          type: BYTE_ARRAY
                        - name: value
                          type: BYTE_ARRAY
        bucket: $YOUR_S3_BUCKET
        path: parquet_logs/${! timestamp_unix() }-${! uuid_v4() }.parquet
        region: $YOUR_S3_REGION

warpstream:
    cluster_concurrency_target: 6

Stream to GCP BigQuery

input:
    kafka_franz_warpstream:
        topics:
            - logs
output:
    gcp_bigquery_write_api:
        dataset: $YOUR_BQ_DATASET
        project: $YOUR_GCP_PROJECT
        table: $YOUR_BQ_TABLE
pipeline:
    processors:
        - mapping: |
            root.kafka_topic = @kafka_topic
            root.kafka_partition = @kafka_partition
            root.kafka_offset = @kafka_offset
            root.record = content().string()
warpstream:
    cluster_concurrency_target: 6

Note that GCP BigQuery has strict quotas on how many table modifications can be performed per day, so we recommend inserting data using large batches as shown above.
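
If the gcp_bigquery_write_api output in your Bento build supports the same standard batching fields as the aws_s3 output above (an assumption worth verifying against the Bento docs for this output), an explicit batching policy could look like this sketch:

output:
    gcp_bigquery_write_api:
        dataset: $YOUR_BQ_DATASET
        project: $YOUR_GCP_PROJECT
        table: $YOUR_BQ_TABLE
        batching:
            # Assumed standard Bento batching policy: flush at ~32 MB or every
            # 30 seconds, whichever comes first; count: 0 disables count-based flushes.
            byte_size: 32000000
            count: 0
            period: 30s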

Stream to GCP BigQuery Iceberg tables

Follow the same instructions as regular GCP BigQuery above, except that you must first create an Iceberg table in BigQuery. The Iceberg table needs to exist in BigQuery before the pipeline runs, because this Bento component leverages BigQuery's load jobs to import data into the Iceberg table, and that API does not yet support creating Iceberg tables on the fly.
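
As a minimal sketch (assuming the same gcp_bigquery_write_api output as above, with $YOUR_BQ_ICEBERG_TABLE as a hypothetical placeholder), only the output's target table changes; the rest of the configuration matches the regular GCP BigQuery recipe:

output:
    gcp_bigquery_write_api:
        dataset: $YOUR_BQ_DATASET
        project: $YOUR_GCP_PROJECT
        # Hypothetical placeholder: this table must already exist in BigQuery as
        # an Iceberg table before the pipeline starts.
        table: $YOUR_BQ_ICEBERG_TABLE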

Stream to Redshift

output:
  sql_insert:
    driver: postgres 
    dsn: postgresql://username_from_secret:password_from_secret@"$REDSHIFT_ENDPOINT"/dev
    table: test
    columns: [age, name]
    args_mapping: |
      root = [
        this.age,
        this.name,
      ]
    init_statement: |
      CREATE TABLE test (name varchar(255), age int);
    secret_name: "$REDSHIFT_SECRET_NAME"
    region: eu-west-1
    credentials:
      id: "$AWS_ACCESS_KEY_ID"
      secret: "$AWS_SECRET_ACCESS_KEY"

Read more details about connecting to Redshift in the Bento documentation.