# Cookbooks

You can always [read the docs](https://warpstreamlabs.github.io/bento/docs/about) for more information about how to use Bento, but this page contains snippets that you can copy-and-paste to accomplish common tasks.

## Stream to Parquet Files

```yaml
input:
    kafka_franz_warpstream:
        topics:
            - logs
output:
    aws_s3:
        batching:
            byte_size: 32000000
            count: 0
            period: 5s
            processors:
                - mutation: |
                    root.value = content().string()
                    root.key = @kafka_key
                    root.kafka_topic = @kafka_topic
                    root.kafka_partition = @kafka_partition
                    root.kafka_offset = @kafka_offset
                - parquet_encode:
                    default_compression: zstd
                    default_encoding: PLAIN
                    schema:
                        - name: kafka_topic
                          type: BYTE_ARRAY
                        - name: kafka_partition
                          type: INT64
                        - name: kafka_offset
                          type: INT64
                        - name: key
                          type: BYTE_ARRAY
                        - name: value
                          type: BYTE_ARRAY
        bucket: $YOUR_S3_BUCKET
        path: parquet_logs/${! timestamp_unix() }-${! uuid_v4() }.parquet
        region: $YOUR_S3_REGION

warpstream:
    cluster_concurrency_target: 6
```

## Stream to GCP BigQuery

```yaml
input:
    kafka_franz_warpstream:
        topics:
            - logs
output:
    gcp_bigquery_write_api:
        dataset: $YOUR_BQ_DATASET
        project: $YOUR_GCP_PROJECT
        table: $YOUR_BQ_TABLE
pipeline:
    processors:
        - mapping: |
            root.kafka_topic = @kafka_topic
            root.kafka_partition = @kafka_partition
            root.kafka_offset = @kafka_offset
            root.record = content().string()
warpstream:
    cluster_concurrency_target: 6
```

GCP BigQuery enforces strict daily quotas on the number of table modifications, so we recommend inserting data in large batches.
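
A minimal sketch of explicit batching for the BigQuery output, assuming `gcp_bigquery_write_api` accepts the same `batching` block that the `aws_s3` output uses in the Parquet recipe. The thresholds are illustrative values, not tuned recommendations, so check the Bento documentation for this output before relying on them.

```yaml
output:
    gcp_bigquery_write_api:
        dataset: $YOUR_BQ_DATASET
        project: $YOUR_GCP_PROJECT
        table: $YOUR_BQ_TABLE
        # Assumption: this output supports the standard Bento batching policy.
        batching:
            # Flush once 10,000 messages have accumulated (illustrative value)...
            count: 10000
            # ...or after 10 seconds, whichever comes first (illustrative value).
            period: 10s
```

Larger `count` and `period` values mean fewer, bigger writes per table, at the cost of higher end-to-end latency.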

## Stream to GCP BigQuery Iceberg tables

Follow the same instructions as for [regular GCP BigQuery](#stream-to-gcp-bigquery), but [create the Iceberg table](https://cloud.google.com/bigquery/docs/iceberg-tables#create-iceberg-tables) in BigQuery first. This Bento component uses BigQuery's [load jobs](https://cloud.google.com/bigquery/docs/iceberg-tables#sql) to import data into the Iceberg table, and that API does not yet support creating Iceberg tables on the fly.

## Stream to Redshift

```yaml
output:
  sql_insert:
    driver: postgres
    dsn: postgresql://username_from_secret:password_from_secret@$REDSHIFT_ENDPOINT/dev
    table: test
    columns: [age, name]
    args_mapping: |
      root = [
        this.age,
        this.name,
      ]
    init_statement: |
      CREATE TABLE IF NOT EXISTS test (name varchar(255), age int);
    secret_name: "$REDSHIFT_SECRET_NAME"
    region: eu-west-1
    credentials:
      id: "$AWS_ACCESS_KEY_ID"
      secret: "$AWS_SECRET_ACCESS_KEY"
```

[Read more details about connecting to Redshift in the Bento documentation.](/warpstream/overview/architecture.md)


