Tableflow
Introduction
Tableflow automates the tedious process of transforming a topic in an Apache Kafka-compatible data streaming system into an Apache Iceberg table. Instead of writing custom code and manually configuring a data pipeline for each table you want to build, Tableflow allows you to declaratively specify which topics to build tables from and what schema and data format to expect. When schemas inevitably need to change, you can update the schema in Tableflow's editor and WarpStream will handle the schema migration automatically.
Compaction and table maintenance are included out of the box with no tuning required. Tableflow continuously compacts the table in the background with intelligent heuristics to ensure readers get the best performance.
Tableflow is available as Bring-Your-Own-Cloud (BYOC), where the compute and storage live inside your cloud account, within your VPC. The raw data for your table is only ever stored inside your object storage bucket and never leaves your VPC during table ingestion and maintenance. Tableflow maintains a metadata store inside WarpStream Cloud as the Iceberg metadata layer, which is periodically synced into your object storage bucket.
Getting Started
To get started with Tableflow, you first need to create a Tableflow cluster from the WarpStream Console or by using one of the infrastructure-as-code deployment options. The WarpStream Agents that join this cluster will only perform Tableflow operations and do not expose the Apache Kafka protocol. Please refer to our other documentation for how to install and configure the WarpStream Agents in your environment, as the process does not differ for Tableflow.
Once the Agents are running, you can open the Configuration tab and start defining your source clusters, topics, tables, and schemas. During the Early Access period, the only method for defining schemas is inline mode, which doesn't require using an external Schema Registry. Schema Registry support will be available in a future release.
Managed Tables
Tableflow tables are fully managed by WarpStream, which is why we call them "managed tables". You cannot use another system to perform writes, compactions, or other table maintenance operations. This is in contrast to a connector-based approach, where you would be forced to combine multiple distinct systems or operations to implement all of these functions.
Specify Source Clusters
Source clusters are the Apache Kafka-compatible systems, like WarpStream, that store the topics you'd like to convert to tables. You define clusters by giving them a name, a bootstrap URL, and credentials if needed. Source clusters are defined at the root of the configuration YAML. Cluster credentials and connection information are configured the same way as for Orbit.
source_clusters:
  - name: "benchmark"
    credentials:
      sasl_username_env: "YOUR_SASL_USERNAME"
      sasl_password_env: "YOUR_SASL_PASSWORD"
    bootstrap_brokers:
      - hostname: "your-kafka-brokers.example.com"
        port: 9092
You can define multiple source clusters so a single Tableflow cluster can centralize data from multiple clusters into one unified place.
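For example, adding a second source cluster is just another entry in the list. In the sketch below, the "analytics" cluster name, credential environment variables, and broker hostname are hypothetical placeholders:

source_clusters:
  - name: "benchmark"
    # ... as defined above ...
  - name: "analytics"
    credentials:
      sasl_username_env: "ANALYTICS_SASL_USERNAME"
      sasl_password_env: "ANALYTICS_SASL_PASSWORD"
    bootstrap_brokers:
      - hostname: "analytics-kafka.example.com"
        port: 9092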
Specify the Destination Bucket URL
You must define the bucket URL where your table data will be stored and ensure that your Agent deployment has the appropriate IAM permissions to perform read, write, list, and delete operations against this bucket. You define the destination_bucket_url at the root of the configuration YAML. Tables will be created under the warpstream/_tableflow/ prefix within the bucket.
Example:
destination_bucket_url: "s3://bucket-name-here?region=us-east-1"
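As a rough sketch of the IAM permissions the Agents need against this bucket (assuming AWS S3; the bucket name below is a placeholder and your organization may scope the policy differently):

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject"],
      "Resource": "arn:aws:s3:::bucket-name-here/*"
    },
    {
      "Effect": "Allow",
      "Action": ["s3:ListBucket"],
      "Resource": "arn:aws:s3:::bucket-name-here"
    }
  ]
}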
Specify Topics and Tables
The next step is defining your tables and topics. Each table you define has exactly one source topic, and the table is named the same as its source topic. During the Early Access period, only the inline schema mode is available, which requires defining the schema inline inside the configuration YAML.
Your topics' data can be formatted as either JSON or Avro. In the case of Avro, we remove the Schema Registry magic byte prefix and do not rely on the schema ID when running in inline mode.
tables:
  - source_cluster_name: "benchmark"
    source_topic: "example_json_logs_topic"
    source_format: "json"
    schema_mode: "inline"
    schema:
      fields:
        - { name: environment, type: string, id: 1 }
        - { name: service, type: string, id: 2 }
        - { name: status, type: string, id: 3 }
        - { name: message, type: string, id: 4 }
  - source_cluster_name: "benchmark"
    source_topic: "example_avro_events_topic"
    source_format: "avro"
    schema_mode: "inline"
    schema:
      fields:
        - { name: event_id, id: 1, type: string }
        - { name: user_id, id: 2, type: long }
        - { name: session_id, id: 3, type: string }
        - name: profile
          id: 4
          type: struct
          fields:
            - { name: country, id: 5, type: string }
            - { name: language, id: 6, type: string }
        - name: device
          id: 7
          type: struct
          fields:
            - { name: type, id: 8, type: string }
            - name: os
              id: 9
              type: struct
              fields:
                - { name: name, id: 10, type: string }
                - { name: version, id: 11, type: string }
                - { name: vendor, id: 25, type: string, optional: true }
            - name: browser
              id: 12
              type: struct
              fields:
                - { name: name, id: 13, type: string }
                - { name: version, id: 14, type: string }
            - name: screen
              id: 15
              type: struct
              fields:
                - { name: width, id: 16, type: int }
                - { name: height, id: 17, type: int }
                - { name: pixel_ratio, id: 18, type: double }
            - { name: model, id: 19, type: string, optional: true }
        - name: cookies
          id: 20
          type: list
          fields:
            - { name: element, id: 21, type: string }
        - name: event_attributes
          id: 22
          type: map
          fields:
            - { name: key, id: 23, type: string }
            - { name: value, id: 24, type: string }
Schema Definitions
As shown in the above example, schemas specified with the inline mode mirror the Iceberg schema definition encoded as YAML. The id field is mapped to the field ID of your Iceberg table, and field IDs must be unique.
Partitioning
Tableflow supports unpartitioned and hour-partitioned tables during the Early Access period. Tableflow will support arbitrary custom partitioning in an upcoming release.
The partitioning scheme is specified on a per-table basis. The default is an unpartitioned table. Migrating between partitioning schemes works the same way as migrating schemas: just update the YAML with the desired new scheme, and it will be reflected in a future sync of the metadata into your bucket.
tables:
  - source_topic: "example_json_logs_topic"
    ...
    partitioning_scheme: "hour"
    schema:
      ...
Schema Migrations
Schema migrations in Tableflow are supported for adding columns, changing fields from required to optional, and widening integer and floating-point types. To perform any of these operations, update the schema in the Configuration editor to reflect the change and deploy it. The schema change will be reflected in the next few syncs of the table metadata into your bucket.
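For example, adding a new optional column to the JSON logs table above could look like the following sketch (the request_id field name and its ID are hypothetical; field IDs must remain unique):

schema:
  fields:
    - { name: environment, type: string, id: 1 }
    - { name: service, type: string, id: 2 }
    - { name: status, type: string, id: 3 }
    - { name: message, type: string, id: 4 }
    # New column added via schema migration; marked optional so existing rows stay valid.
    - { name: request_id, type: string, id: 5, optional: true }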
Ensure that you deploy a schema change before attempting to send data with the new schema. If you fail to do this, you can potentially lose data written with the newer schema.
Dropping and re-ordering columns are not supported during the Early Access period. Destructive schema migrations, such as changing types in a non-widening way, are also not supported.
Tableflow UI
The Tableflow UI, available in the WarpStream Console, allows you to edit the Configuration.

Programmatic Access: HTTP API and Terraform
The HTTP API and the Terraform provider do not yet support Tableflow. Support for both will be added during the Early Access period.
Data Retention
During the Early Access period, data is retained in the tables indefinitely. Specifying a retention period will be supported in an upcoming release.
Table Deletion
To prevent accidental data deletion, Tableflow does not delete any tables from the object storage bucket when they are removed from the Configuration. To delete a table, first remove it from the Configuration, and then use your cloud provider's UI or CLI to delete the directory containing your table from within the warpstream/_tableflow directory.
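For example, with AWS S3 the cleanup might look like the following (the bucket name and the tablename+tableUUID directory are placeholders; check your bucket for the actual directory name):

aws s3 rm --recursive "s3://bucket-name-here/warpstream/_tableflow/tablename+tableUUID/"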
Connect Catalogs and Query Engines
Tableflow does not have any direct integration with external catalogs. Most catalogs have some concept of an "external table", where the catalog either periodically scans the object storage location where the table lives or fetches the latest metadata on demand when running a query. For example, Snowflake calls this the "Iceberg files" integration method.
Similar to catalogs, most query engines have a way to specify the location of a table in object storage and query it directly without going through a catalog. This method can be applied to ClickHouse and DuckDB, for example.
DuckDB
To query Tableflow tables with DuckDB, you'll first need to install and load the Iceberg extension.
INSTALL iceberg;
LOAD iceberg;
Once you've installed the Iceberg extension, you'll need to connect to the object store system where you've chosen to store your Tableflow tables.
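For example, for an S3 bucket you might create a secret with the httpfs extension's secrets manager. This is a minimal sketch; the secret name, access key, secret key, and region are placeholders you should replace with your own values:

INSTALL httpfs;
LOAD httpfs;
CREATE SECRET tableflow_s3 (
    TYPE S3,
    KEY_ID 'YOUR_ACCESS_KEY_ID',
    SECRET 'YOUR_SECRET_ACCESS_KEY',
    REGION 'us-east-1'
);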
Once you've successfully connected to the object store, you can query your tables using the standard syntax for the Iceberg extension:
SELECT * FROM iceberg_scan('s3://example-bucket/_tableflow/tablename+tableUUID');
ClickHouse
To query Tableflow tables with ClickHouse, you'll first need to configure credentials for the object store where you've chosen to store your tables and use the Iceberg table engine.
AWS S3
For testing purposes, you can define credentials inline in the CREATE TABLE statement. For production, we recommend using the configuration file to define a "named collection" for your credentials.
<clickhouse>
    <named_collections>
        <iceberg_conf>
            <url>http://test.s3.amazonaws.com/clickhouse-bucket/</url>
            <access_key_id>test</access_key_id>
            <secret_access_key>test</secret_access_key>
        </iceberg_conf>
    </named_collections>
</clickhouse>
Then you can run your CREATE TABLE statement against the named collection instead of inline credentials:
CREATE TABLE iceberg_table ENGINE=IcebergS3(iceberg_conf, filename = '_tableflow/tablename+tableUUID')
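Once the table is created, it can be queried like any regular ClickHouse table, for example:

SELECT count() FROM iceberg_table;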
GCS
For connecting to GCS, you'll need to create GCS HMAC credentials and use them in place of the access_key_id and secret_access_key.
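For example, a named collection for GCS might look like the following sketch; the collection name, bucket name, and HMAC keys are placeholders, and GCS is accessed here through its S3-compatible endpoint:

<clickhouse>
    <named_collections>
        <iceberg_conf_gcs>
            <url>https://storage.googleapis.com/your-gcs-bucket/</url>
            <access_key_id>YOUR_GCS_HMAC_ACCESS_KEY</access_key_id>
            <secret_access_key>YOUR_GCS_HMAC_SECRET</secret_access_key>
        </iceberg_conf_gcs>
    </named_collections>
</clickhouse>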
Partition Pruning
To take advantage of partition pruning in ClickHouse, you must set a session variable.
SET use_iceberg_partition_pruning = 1;
AWS Glue
To work with Tableflow tables in Glue, you first need to use the AWS Glue Crawler to register the table; then you can use Glue ETL Jobs or Athena to query it.
Setup AWS Glue Crawlers
In AWS Glue, go to Crawlers and click Create Crawler.
Name the crawler, then add a data source, select "Iceberg" from the drop-down, and provide the S3 folder path.
Configure the required IAM role, select a database, and set an optional table prefix and schedule.
Review the settings, create the crawler, and then run it.
Once the crawler run is successful and the table has been registered by the Glue Crawler, you can query it with Glue ETL Jobs as explained here.
Amazon Athena
To query Tableflow tables using Athena, you can either use the table created by the Glue Crawler as explained in the section above, or use a CREATE TABLE statement in Athena.
Running CREATE TABLE directly in Athena will create a new Iceberg table and register it with the AWS Glue Data Catalog.
CREATE TABLE [db_name.]table_name
  (col_name data_type [COMMENT col_comment] [, ...] )  -- schema of the table here
LOCATION 's3://demo-bucket/_tableflow/path/to/table'
TBLPROPERTIES (
  'table_type' = 'ICEBERG',
  'format' = 'parquet'
);
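Once the table is registered, you can query it like any other Athena table, for example (db_name and table_name are the names used above):

SELECT * FROM db_name.table_name LIMIT 10;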
Observability
Ingestion lag for tables is emitted as a metric from the WarpStream Tableflow Agents. This metric is also available visually within the WarpStream Console. The metric is called warpstream_tableflow_partition_time_lag_seconds. Offset lag will be available at a later date during the Early Access period.
This metric is tagged by table.id, table.name, topic, and partition. Partition-level tagging can be disabled with the WARPSTREAM_DISABLE_CONSUMER_GROUP_METRICS_TAGS=partition environment variable.
