# Cookbooks

You can always [read the docs](https://warpstreamlabs.github.io/bento/docs/about) for more information about how to use Bento, but this page contains snippets that you can copy-and-paste to accomplish common tasks.

## Stream to Parquet Files

```yaml
input:
    kafka_franz_warpstream:
        topics:
            - logs
output:
    aws_s3:
        batching:
            byte_size: 32000000
            count: 0
            period: 5s
            processors:
                - mutation: |
                    root.value = content().string()
                    root.key = @kafka_key
                    root.kafka_topic = @kafka_topic
                    root.kafka_partition = @kafka_partition
                    root.kafka_offset = @kafka_offset
                - parquet_encode:
                    default_compression: zstd
                    default_encoding: PLAIN
                    schema:
                        - name: kafka_topic
                          type: BYTE_ARRAY
                        - name: kafka_partition
                          type: INT64
                        - name: kafka_offset
                          type: INT64
                        - name: key
                          type: BYTE_ARRAY
                        - name: value
                          type: BYTE_ARRAY
        bucket: $YOUR_S3_BUCKET
        path: parquet_logs/${! timestamp_unix() }-${! uuid_v4() }.parquet
        region: $YOUR_S3_REGION

warpstream:
    cluster_concurrency_target: 6
```

## Stream to GCP BigQuery

```yaml
input:
    kafka_franz_warpstream:
        topics:
            - logs
output:
    gcp_bigquery_write_api:
        dataset: $YOUR_BQ_DATASET
        project: $YOUR_GCP_PROJECT
        table: $YOUR_BQ_TABLE
pipeline:
    processors:
        - mapping: |
            root.kafka_topic = @kafka_topic
            root.kafka_partition = @kafka_partition
            root.kafka_offset = @kafka_offset
            root.record = content().string()
warpstream:
    cluster_concurrency_target: 6
```

Note that GCP BigQuery enforces strict quotas on how many table modifications can be performed per day, so we recommend inserting data in large batches rather than row by row.
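For reference, here is a minimal sketch of the same output with an explicit `batching` block added. This assumes the `gcp_bigquery_write_api` output supports Bento's standard `batching` configuration (as most Bento outputs do); the thresholds below are illustrative, so check the component docs for your Bento version and tune them to your throughput.

```yaml
output:
    gcp_bigquery_write_api:
        dataset: $YOUR_BQ_DATASET
        project: $YOUR_GCP_PROJECT
        table: $YOUR_BQ_TABLE
        # Assumption: this output exposes Bento's common batching policy fields.
        # Flush whichever limit is reached first: 50,000 messages or 30 seconds.
        batching:
            count: 50000
            period: 30s
```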

## Stream to GCP BigQuery Iceberg tables

Follow the same instructions as for [regular GCP BigQuery](#stream-to-gcp-bigquery), except [create an Iceberg table](https://cloud.google.com/bigquery/docs/iceberg-tables#create-iceberg-tables) in BigQuery first. This step is required because the Bento component relies on BigQuery's [load jobs](https://cloud.google.com/bigquery/docs/iceberg-tables#sql) to import data into the Iceberg table, and that API does not yet support creating Iceberg tables on the fly.

## Stream to Redshift

```yaml
output:
  sql_insert:
    driver: postgres
    dsn: postgresql://username_from_secret:password_from_secret@$REDSHIFT_ENDPOINT/dev
    table: test
    columns: [age, name]
    args_mapping: |
      root = [
        this.age,
        this.name,
      ]
    init_statement: |
      CREATE TABLE IF NOT EXISTS test (name varchar(255), age int);
    secret_name: "$REDSHIFT_SECRET_NAME"
    region: eu-west-1
    credentials:
      id: "$AWS_ACCESS_KEY_ID"
      secret: "$AWS_SECRET_ACCESS_KEY"
```
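
The snippet above only shows the output half of the pipeline. For completeness, here is a minimal sketch of a matching input and mapping, assuming the source topic carries JSON records with `name` and `age` fields (the topic name `logs` and the field names are illustrative assumptions, not part of the original example):

```yaml
input:
  kafka_franz_warpstream:
    topics:
      - logs
pipeline:
  processors:
    - mapping: |
        # Assumes each record is a JSON object such as {"name": "jane", "age": 42};
        # adjust the field names to match your actual message schema.
        root.name = this.name
        root.age = this.age
```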

[Read more details about the `sql_insert` output used to connect to Redshift in the Bento documentation.](https://warpstreamlabs.github.io/bento/docs/components/outputs/sql_insert)
