Setup

This page describes how to set up WarpStream Tableflow.

Introduction

Tableflow is currently available as an Early Access (EA) feature. Please contact us if you'd like to register for the Early Access program.

Tableflow automates the tedious process of transforming a topic in an Apache Kafka-compatible data streaming system into an Apache Iceberg table. Instead of writing custom code and manually configuring a data pipeline for each table you want to build, Tableflow allows you to declaratively specify which topics to build tables from and what schema and data format to expect. When schemas inevitably need to change, you can update the schema in Tableflow's editor and WarpStream will handle the schema migration automatically.

Compaction and table maintenance are included out-of-the-box with no tuning required. Tableflow continuously compacts the table in the background, using intelligent heuristics to ensure readers get the best performance.

Tableflow is available as Bring-Your-Own-Cloud (BYOC), where the compute and storage live inside your cloud account and VPC. The raw data for your table is only ever stored inside your object storage bucket and never leaves your VPC during the table ingestion and maintenance process. Tableflow maintains a metadata store inside WarpStream Cloud as the Iceberg metadata layer, which is periodically synced into your object storage bucket.

Getting Started

To get started with Tableflow, you first need to create a Tableflow cluster from the WarpStream Console or by using one of the infrastructure-as-code deployment options. The WarpStream Agents that join this cluster will only perform Tableflow operations and do not expose the Apache Kafka protocol. Please refer to our other documentation for how to install and configure the WarpStream Agents in your environment, as the process does not differ for Tableflow.

Once the Agents are running, you can open the Configuration tab and start defining your source clusters, topics, tables, and schemas. During the Early Access period, the only method for defining schemas is inline mode, which doesn't require using an external Schema Registry. Schema Registry support will be available in a future release.

Managed Tables

Tableflow tables are fully managed by WarpStream, or what we call "managed tables". You cannot use another system for performing writes, compactions, or other table maintenance operations. This is in contrast to a connector-based approach, where you would be forced to combine multiple distinct systems and operations to implement all of these functions.

Configuration

Tableflow is configured with, and fully controllable from, a single YAML file which can be edited through the WarpStream Console or the Pipelines API.

Overview

source_clusters:
  - name: tableflow_cluster_1
    bootstrap_brokers:
      - hostname: localhost
        port: 9092
  - name: tableflow_cluster_2
    bootstrap_brokers:
      - hostname: broker1.kafkaserver.com
        port: 9092
      - hostname: broker2.kafkaserver.com
        port: 9092
    credentials:
      sasl_username_env: SASL_USERNAME_ENV_VAR
      sasl_password_env: SASL_PASSWORD_ENV_VAR
      use_tls: true
      sasl_mechanism: plain
tables:
    - source_cluster_name: tableflow_cluster_1
      source_topic: example_json_logs_topic
      source_format: json
      schema_mode: inline
      schema:
        fields:
          - { name: environment, type: string, id: 1}
          - { name: service, type: string, id: 2}
          - { name: status, type: string, id: 3}
          - { name: message, type: string, id: 4}
    - source_cluster_name: tableflow_cluster_2          
      source_topic: example_avro_events_topic
      source_format: avro
      schema_mode: inline
      schema:
        fields:
            - { name: event_id, id: 1, type: string }
            - { name: user_id, id: 2, type: long }
            - { name: session_id, id: 3, type: string }
destination_bucket_url: s3://my-bucket-name

The YAML specifies:

  • The source clusters Tableflow should connect to.

  • For each cluster, the topics that Tableflow should create Iceberg tables from.

  • For each topic, the schema to deserialize the Kafka records with. The same schema is used to create the Iceberg table.

  • The destination bucket where the tables are stored.

Configure Source Clusters

Source clusters are the Apache Kafka-compatible systems, like WarpStream, that store the topics you'd like to convert to tables. You define source clusters at the root of the configuration YAML by giving each a name, a list of bootstrap brokers, and credentials if they are needed.

source_clusters:
  - name: tableflow_cluster
    bootstrap_brokers:
      - hostname: broker.kafkaserver.com
        port: 9092

You can define multiple source clusters so a single Tableflow cluster can centralize data from multiple clusters into one unified place.

Configure Connection and Credentials

If credentials are needed to connect to the Kafka cluster, the connection information can be provided under the credentials block for each cluster.

TLS

source_clusters:
  - name: tableflow_cluster 
    ...
    credentials:
      use_tls: true
      tls_insecure_skip_verify: false

  • use_tls specifies whether the Agents should use TLS when connecting to your source clusters.

  • tls_insecure_skip_verify specifies whether the Agents should skip verification of the server's certificate chain and host name.

SASL

source_clusters:
  - name: tableflow_cluster 
    ...
    credentials:
      sasl_username_env: SASL_USERNAME_ENV_VAR
      sasl_password_env: SASL_PASSWORD_ENV_VAR
      sasl_mechanism: plain
      use_tls: true

  • Both the sasl_username_env and the sasl_password_env fields refer to environment variable names. The Agents will prepend a TABLEFLOW_ prefix to the values of these fields before reading the environment variables, so the environment variables in the Agent should be configured as TABLEFLOW_SASL_USERNAME_ENV_VAR and TABLEFLOW_SASL_PASSWORD_ENV_VAR respectively (see the example below).

  • The default value of sasl_mechanism is plain. Supported mechanisms include: plain, scram-256, and scram-512.
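
For illustration, here is a minimal sketch of how the credentials block above maps to the Agent environment, shown as a hypothetical Kubernetes-style container env fragment (the values are placeholders):

env:
  # Note the TABLEFLOW_ prefix added to the names referenced in the configuration YAML.
  - name: TABLEFLOW_SASL_USERNAME_ENV_VAR
    value: "my-kafka-username"
  - name: TABLEFLOW_SASL_PASSWORD_ENV_VAR
    value: "my-kafka-password"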

mTLS PEM-encoded certs

source_clusters:
  - name: tableflow_cluster 
    ...
    credentials:
      mtls_client_cert_env: MTLS_CERT_PATH_ENV_VAR
      mtls_client_key_env: MTLS_KEY_PATH_ENV_VAR
      mtls_server_ca_cert_env: MTLS_SERVER_CA_CERT_PATH_ENV_VAR
      use_tls: true

  • The mtls_client_cert_env, mtls_client_key_env, and mtls_server_ca_cert_env fields refer to environment variable names. The Agents will prepend a TABLEFLOW_ prefix to the values of these fields before reading the environment variables, so the environment variables in the Agent should be configured as TABLEFLOW_MTLS_CERT_PATH_ENV_VAR, TABLEFLOW_MTLS_KEY_PATH_ENV_VAR, and TABLEFLOW_MTLS_SERVER_CA_CERT_PATH_ENV_VAR respectively (see the example below).

  • mtls_client_cert_env specifies the environment variable that contains the path to the X.509 certificate file in PEM format.

  • mtls_client_key_env specifies the environment variable that contains the path to the X.509 private key file in PEM format.

  • mtls_server_ca_cert_env is optional and specifies the environment variable that contains the path to the X.509 certificate file in PEM format for the certificate authority that issued the server's certificate. If not specified, the host's root certificate pool will be used for server certificate verification.
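
As a sketch, assuming the PEM files are mounted into the Agent container under a hypothetical /etc/tableflow/certs path, the matching Agent environment could look like this:

env:
  # Hypothetical mount paths; only the TABLEFLOW_-prefixed variable names matter.
  - name: TABLEFLOW_MTLS_CERT_PATH_ENV_VAR
    value: "/etc/tableflow/certs/client.crt"
  - name: TABLEFLOW_MTLS_KEY_PATH_ENV_VAR
    value: "/etc/tableflow/certs/client.key"
  - name: TABLEFLOW_MTLS_SERVER_CA_CERT_PATH_ENV_VAR
    value: "/etc/tableflow/certs/ca.crt"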

Configure the Destination Bucket URL

To specify where your table data should be stored, use the destination_bucket_url field at the root of the configuration YAML. The Agent deployment needs to have the appropriate IAM permissions to perform read, write, list, and delete operations against this bucket.

destination_bucket_url: s3://bucket-name?region=us-east-1

The format should look like bucket-url?region=<bucket-region>&prefix=<custom-prefix>. Both the region and the prefix query parameters are optional. The region query parameter is needed for Agent deployments in AWS where the AWS_REGION environment variable is not set. prefix can be used to customize the prefix under which the table will be stored. Tables will be created under the <bucket-name>/<prefix>/warpstream/_tableflow/ path.
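
For example, a destination URL that sets both optional query parameters (the prefix value is hypothetical) would look like the following, and the resulting tables would be created under my-bucket-name/analytics/warpstream/_tableflow/:

destination_bucket_url: s3://my-bucket-name?region=us-east-1&prefix=analytics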

Configure Topics and Tables

The next step is defining your tables and topics. Each table you define has exactly one source topic, and the table will be named the same as <cluster-name>+<topic-name>. During the Early Access period, only the inline schema mode is available, which requires defining the schema inline inside the configuration YAML.

Your topics' data can be formatted as either JSON or Avro. In the case of Avro, we will strip the Schema Registry magic byte prefix and will not rely on the schema ID when running in inline mode. Additionally, for Avro the provided schema needs to exactly match the schema the records were written with for decoding to work.

Tableflow only supports append-only tables in the Early Access period. If you ingest data from a compacted topic in the source cluster, rows will not be deduplicated and any tombstones may not comply with the schema.

tables:
    - source_cluster_name: tableflow_cluster_1
      source_topic: example_json_logs_topic
      source_format: json
      schema_mode: inline
      schema:
        fields:
          - { name: environment, type: string, id: 1}
          - { name: service, type: string, id: 2}
          - { name: status, type: string, id: 3}
          - { name: message, type: string, id: 4}
    - source_cluster_name: tableflow_cluster_2       
      source_topic: example_avro_events_topic
      source_format: avro
      schema_mode: inline
      schema:
        fields:
            - { name: event_id, id: 1, type: string }
            - { name: user_id, id: 2, type: long }
            - { name: session_id, id: 3, type: string }
            - name: profile
              id: 4
              type: struct
              fields:
                - { name: country, id: 5, type: string }
                - { name: language, id: 6, type: string }
            - name: device
              id: 7
              type: struct
              fields:
                - { name: type, id: 8, type: string }
                - name: os
                  id: 9
                  type: struct
                  fields:
                    - { name: name, id: 10, type: string }
                    - { name: version, id: 11, type: string }
                    - { name: vendor, id: 25, type: string, optional: true }
                - name: browser
                  id: 12
                  type: struct
                  fields:
                    - { name: name, id: 13, type: string }
                    - { name: version, id: 14, type: string }
                - name: screen
                  id: 15
                  type: struct
                  fields:
                    - { name: width, id: 16, type: int }
                    - { name: height, id: 17, type: int }
                    - { name: pixel_ratio, id: 18, type: double }
                - { name: model, id: 19, type: string, optional: true }
            - name: cookies
              id: 20
              type: list
              fields:
                - { name: element, id: 21, type: string }
            - name: event_attributes
              id: 22
              type: map
              fields:
                - { name: key, id: 23, type: string }
                - { name: value, id: 24, type: string }

Schema Definitions

As shown in the example above, schemas specified in inline mode mirror the Iceberg schema definition, encoded as YAML. The id field maps to the field ID of your Iceberg table, and field IDs must be unique.
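
As a rough sketch of that mapping (assuming fields are required unless optional: true is set), the first two fields of the JSON logs table above would translate into something like the following Iceberg JSON schema fragment:

{
  "type": "struct",
  "fields": [
    { "id": 1, "name": "environment", "required": true, "type": "string" },
    { "id": 2, "name": "service", "required": true, "type": "string" }
  ]
}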

Partitioning

During the Early Access period, Tableflow supports unpartitioned tables and tables hour-partitioned on the record timestamp. In the future, arbitrary custom partitioning will be supported. Additionally, migrating between partitioning schemes is currently not supported.

The partitioning scheme is specified on a per-table basis. The default is an unpartitioned table. Supported values for partitioning_scheme include unpartitioned and hour.

tables:
    - source_topic: example_json_logs_topic
      ...
      partitioning_scheme: hour
      schema:
      ...

Schema Migrations

Schema migrations in Tableflow are supported for adding columns, changing fields from required to optional, and widening integer and floating-point types. To perform any of these operations, update the schema to reflect the change in the Configuration editor and deploy it. The schema change will be reflected within the next few syncs of the table metadata into your bucket.
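
For example, adding a column to the JSON logs table from the Overview is a supported migration. A sketch of the updated schema (the new region field is hypothetical; note that it is marked optional and reuses no existing field ID):

schema:
  fields:
    - { name: environment, type: string, id: 1 }
    - { name: service, type: string, id: 2 }
    - { name: status, type: string, id: 3 }
    - { name: message, type: string, id: 4 }
    # Newly added column with a previously unused field ID.
    - { name: region, type: string, id: 5, optional: true }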

Dropping and re-ordering columns are not supported during the EA period. Destructive schema migrations, such as changing types in a non-widening way, are not supported.

Tableflow UI

The Tableflow UI, available in the WarpStream Console, allows you to edit the Configuration.

Programmatic Access: HTTP API and Terraform

The HTTP API and Terraform providers do not yet support Tableflow. Support will be added for these at some point during the Early Access period.

Data Retention

During the Early Access period, data is retained in the tables indefinitely. Specifying a retention period will be allowed in an upcoming release.

Table Deletion

To prevent accidental data deletion, Tableflow does not delete any tables from the object storage bucket when they are removed from the Configuration. To delete a table, first remove it from the Configuration and then use your cloud provider's UI or CLI to delete the directory containing your table from within the warpstream/_tableflow directory.

Observability

Ingestion lag for tables is emitted as the warpstream_tableflow_partition_time_lag_seconds metric from the WarpStream Tableflow Agents, and is also available visually within the WarpStream Console. Offset lag will be available at a later date during the EA period.

This metric is tagged by table.id, table.name, topic, and partition. Partition-level tagging can be disabled with the WARPSTREAM_DISABLE_CONSUMER_GROUP_METRICS_TAGS=partition environment variable.
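
For example, to disable the partition tag, the environment variable from the paragraph above could be set in a hypothetical Kubernetes-style env fragment for the Agent:

env:
  - name: WARPSTREAM_DISABLE_CONSUMER_GROUP_METRICS_TAGS
    value: "partition"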
