Tableflow Setup
This page describes how to set up WarpStream Tableflow.
Introduction
Tableflow automates the tedious process of transforming a topic in an Apache Kafka-compatible data streaming system into an Apache Iceberg table. Instead of writing custom code and manually configuring a data pipeline for each table you want to build, Tableflow allows you to declaratively specify which topics to build tables from and what schema and data format to expect. When schemas inevitably need to change, you can update the schema in Tableflow's editor and WarpStream will handle the schema migration automatically.
Compaction and table maintenance are included out-of-the-box with no tuning required. Tableflow continuously compacts the table in the background using intelligent heuristics to ensure readers get the best performance.
Tableflow is available as Bring-Your-Own-Cloud (BYOC), where the compute and storage live inside your cloud account, within your VPC. The raw data for your table is only ever stored inside your object storage bucket and never leaves your VPC during table ingestion and maintenance. Tableflow maintains a metadata store inside WarpStream Cloud as the Iceberg metadata layer, which is periodically synced into your object storage bucket.
Getting Started
To get started with Tableflow, you first need to create a Tableflow cluster from the WarpStream Console or using one of the infrastructure-as-code deployment options. The WarpStream Agents that join this cluster will only perform Tableflow operations and do not expose the Apache Kafka protocol. Please refer to our other documentation for how to install and configure the WarpStream Agents in your environment; the process does not differ for Tableflow.
As part of deploying the Agents, you'll also need to set up and configure an object storage bucket and/or provide the Agents with access to one of your existing buckets. See our object storage configuration documentation for more details.
Once the Agents are running, you can open the Configuration editor and start defining your source clusters, topics, tables, and schemas. During the Early Access period, the only method for defining schemas is inline mode, which doesn't require an external Schema Registry. Schema Registry support will be available in a future release.
Managed Tables
Tableflow tables are fully managed by WarpStream, or what we call "managed tables". You cannot use another system for performing writes, compactions, or other table maintenance operations. This is in contrast to a connector-based approach where you would be forced to combine multiple distinct systems or operations together to implement all of these functions.
Configuration
Tableflow is configured with, and fully controllable from, a single YAML file which can be edited through the WarpStream Console or the Pipelines API.
Overview
source_clusters:
- name: tableflow_cluster_1
bootstrap_brokers:
- hostname: localhost
port: 9092
- name: tableflow_cluster_2
bootstrap_brokers:
- hostname: broker1.kafkaserver.com
port: 9092
- hostname: broker2.kafkaserver.com
port: 9092
credentials:
sasl_username_env: SASL_USERNAME_ENV_VAR
sasl_password_env: SASL_PASSWORD_ENV_VAR
use_tls: true
sasl_mechanism: plain
tables:
- source_cluster_name: tableflow_cluster_1
source_topic: example_json_logs_topic
source_format: json
schema_mode: inline
schema:
fields:
- { name: environment, type: string, id: 1}
- { name: service, type: string, id: 2}
- { name: status, type: string, id: 3}
- { name: message, type: string, id: 4}
- source_cluster_name: tableflow_cluster_2
source_topic: example_avro_events_topic
source_format: avro
schema_mode: inline
schema:
fields:
- { name: event_id, id: 1, type: string }
- { name: user_id, id: 2, type: long }
- { name: session_id, id: 3, type: string }
destination_bucket_url: s3://my-bucket-name
The YAML specifies:
The source clusters Tableflow should connect to.
For each cluster, the topic that Tableflow should create Iceberg tables from.
For each topic, the schema to deserialize the Kafka records with. The same schema is used to create the Iceberg table.
The destination bucket to store the table.
Configure Source Clusters
Source clusters are the Apache Kafka-compatible systems like WarpStream that store the topics you'd like to convert to tables. You define clusters by giving them a name, a list of brokers, and credentials if they are needed. You define source clusters at the root of the configuration YAML.
source_clusters:
- name: tableflow_cluster
bootstrap_brokers:
- hostname: broker.kafkaserver.com
port: 9092
You can define multiple source clusters so a single Tableflow cluster can centralize data from multiple clusters into one unified place.
Configure Connection and Credentials
If credentials are needed to connect to the Kafka cluster, the connection information can be provided under the credentials block for each cluster.
TLS
source_clusters:
- name: tableflow_cluster
...
credentials:
use_tls: true
tls_insecure_skip_verify: false
use_tls specifies whether the Agents should use TLS when connecting to your source clusters.
tls_insecure_skip_verify specifies whether the client should skip verification of the server's certificate chain and host name.
SASL
source_clusters:
- name: tableflow_cluster
...
credentials:
sasl_username_env: SASL_USERNAME_ENV_VAR
sasl_password_env: SASL_PASSWORD_ENV_VAR
sasl_mechanism: plain
use_tls: true
Both the sasl_username_env and the sasl_password_env fields refer to environment variable names. The Agents will prepend a TABLEFLOW_ prefix to the values of these fields before reading the environment variables, so the environment variables in the Agent should be configured as TABLEFLOW_SASL_USERNAME_ENV_VAR and TABLEFLOW_SASL_PASSWORD_ENV_VAR, respectively.
The default value of sasl_mechanism is plain. Supported mechanisms include: plain, scram-256, and scram-512.
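For example, if the Agents are deployed on Kubernetes, the referenced environment variables might be wired up on the Agent container as follows. This is only a sketch: the Secret name kafka-credentials is hypothetical, and only the TABLEFLOW_-prefixed variable names follow from the configuration above.
env:
  - name: TABLEFLOW_SASL_USERNAME_ENV_VAR
    valueFrom:
      secretKeyRef:
        name: kafka-credentials # hypothetical Secret holding the SASL username
        key: username
  - name: TABLEFLOW_SASL_PASSWORD_ENV_VAR
    valueFrom:
      secretKeyRef:
        name: kafka-credentials # hypothetical Secret holding the SASL password
        key: password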
mTLS PEM encoded certs
source_clusters:
- name: tableflow_cluster
...
credentials:
mtls_client_cert_env: MTLS_CERT_PATH_ENV_VAR
mtls_client_key_env: MTLS_KEY_PATH_ENV_VAR
mtls_server_ca_cert_env: MTLS_SERVER_CA_CERT_PATH_ENV_VAR
use_tls: true
The mtls_client_cert_env, mtls_client_key_env, and mtls_server_ca_cert_env fields refer to environment variable names. The Agents will prepend a TABLEFLOW_ prefix to the values of these fields before reading the environment variables, so the environment variables in the Agent should be configured as TABLEFLOW_MTLS_CERT_PATH_ENV_VAR, TABLEFLOW_MTLS_KEY_PATH_ENV_VAR, and TABLEFLOW_MTLS_SERVER_CA_CERT_PATH_ENV_VAR, respectively.
mtls_client_cert_env specifies the environment variable that contains the path to the X.509 certificate file in PEM format.
mtls_client_key_env specifies the environment variable that contains the path to the X.509 private key file in PEM format.
mtls_server_ca_cert_env is optional and specifies the environment variable that contains the path to the X.509 certificate file in PEM format for the server certificate authority. If not specified, the host's root certificate pool will be used for server certificate verification.
Configure the Destination Bucket URL
To specify where your table data should be stored, use the destination_bucket_url field at the root of the configuration YAML.
destination_bucket_url: s3://bucket-name?region=us-east-1
Check our object storage configuration documentation for more details on how to configure this URL for different cloud providers, as well as for a complete list of permissions that the Agents will require.
Tables will be created under the <bucket-name>/warpstream/_tableflow path. Optionally, a prefix can be specified in the bucket URL, which will result in Tables being created under the <bucket-name>/prefix/warpstream/_tableflow path.
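For example, a sketch with a hypothetical bucket name and an analytics prefix, assuming the prefix is added as a path component of the URL:
destination_bucket_url: s3://my-bucket-name/analytics?region=us-east-1
# Tables would then be created under my-bucket-name/analytics/warpstream/_tableflow.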
Configure Topics and Tables
The next step is defining your tables and topics. Each table you define has exactly one source topic, and the table will be named <cluster-name>+<topic-name>. During the Early Access period, only the inline schema mode is available, which requires defining the schema inline inside the configuration YAML.
Your topics' data can be formatted as either JSON or Avro. In the case of Avro, we will remove the Schema Registry magic byte prefix and not rely on the schema ID when running in inline mode. Additionally, for Avro the provided schema must exactly match the schema the records were written with for decoding to work.
tables:
- source_cluster_name: tableflow_cluster_1
source_topic: example_json_logs_topic
source_format: json
schema_mode: inline
schema:
fields:
- { name: environment, type: string, id: 1}
- { name: service, type: string, id: 2}
- { name: status, type: string, id: 3}
- { name: message, type: string, id: 4}
- source_cluster_name: tableflow_cluster_2
source_topic: example_avro_events_topic
source_format: avro
schema_mode: inline
schema:
fields:
- { name: event_id, id: 1, type: string }
- { name: user_id, id: 2, type: long }
- { name: session_id, id: 3, type: string }
- name: profile
id: 4
type: struct
fields:
- { name: country, id: 5, type: string }
- { name: language, id: 6, type: string }
- name: device
id: 7
type: struct
fields:
- { name: type, id: 8, type: string }
- name: os
id: 9
type: struct
fields:
- { name: name, id: 10, type: string }
- { name: version, id: 11, type: string }
- { name: vendor, id: 25, type: string, optional: true }
- name: browser
id: 12
type: struct
fields:
- { name: name, id: 13, type: string }
- { name: version, id: 14, type: string }
- name: screen
id: 15
type: struct
fields:
- { name: width, id: 16, type: int }
- { name: height, id: 17, type: int }
- { name: pixel_ratio, id: 18, type: double }
- { name: model, id: 19, type: string, optional: true }
- name: cookies
id: 20
type: list
fields:
- { name: element, id: 21, type: string }
- name: event_attributes
id: 22
type: map
fields:
- { name: key, id: 23, type: string }
- { name: value, id: 24, type: string }
Schema Definitions
As shown in the example above, schemas specified with the inline mode contain a list of fields. Each field is named and has a unique integer id that will be mapped to the field ID for your Iceberg table, as well as a type that will be used as the Iceberg data type for the corresponding column.
Primitive Types
The syntax for defining a primitive field looks like the following:
- { name: <field-name>, id: <field-id>, type: <field-type>, optional: { true | false } }
where field-type is one of boolean, int, long, float, double, decimal, date, time, timestamp, timestamptz, string, uuid, fixed, and binary.
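For example, a hypothetical schema that mixes several primitive types and marks one field as optional might look like this:
schema:
  fields:
    - { name: order_id, id: 1, type: string }
    - { name: quantity, id: 2, type: int }
    - { name: total_price, id: 3, type: double }
    - { name: created_at, id: 4, type: timestamp }
    - { name: notes, id: 5, type: string, optional: true }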
Nested Types
A struct is specified as a tuple of typed values. Each field in the tuple is named and has an integer id that is unique in the table schema. Fields can be of any type.
- name: <struct-field-name>
id: <struct-field-id>
type: struct
fields:
- { name: <struct-field-1>, id: <struct-field-id-1>, type: <struct-field-type-1>, optional: { true | false} }
- { name: <struct-field-2>, id: <struct-field-id-2>, type: <struct-field-type-2>, optional: { true | false} }
- { name: <struct-field-3>, id: <struct-field-id-3>, type: <struct-field-type-3>, optional: { true | false } }
A map is specified as a key/value pair. Both the key field and value field have an integer id that is unique in the table schema. While map values can be either optional or required, map keys are required. Both map keys and values can be of any type.
- name: <map-field-name>
id: <map-field-id>
type: map
fields:
- { name: <key-name>, id: <key-field-id>, type: <key-field-type> }
- { name: <value-name>, id: <value-field-id>, type: <value-field-type>, optional: { true | false } }
A list is specified with a single element field. The element field is named element and has an integer id that is unique in the table schema. Elements can be either optional* or required and can be of any type.
- name: <list-field-name>
id: <list-field-id>
type: list
fields:
- { name: element, id: <element-field-id>, type: <element-field-type> }
*Support for optional list elements will be coming soon.
Schema Migrations
Schema migrations in Tableflow are supported for adding columns, changing fields from required to optional, and widening integer and floating-point types. To perform any of these operations, update the schema to reflect the change in the Configuration editor and deploy it. The schema change will be reflected within the next few syncs of the table metadata into your bucket.
Ensure that you execute a schema change before attempting to send data with the new schema. If you fail to do this, you may lose the data written with the newer schema.
Dropping and re-ordering columns is not supported during the EA period. Destructive schema migrations such as changing types in a non-widening way are not supported.
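For example, starting from the JSON logs table defined earlier, a supported migration such as adding an optional column is performed by appending a field with a previously unused ID and deploying the updated configuration (the trace_id field below is hypothetical):
schema:
  fields:
    - { name: environment, type: string, id: 1 }
    - { name: service, type: string, id: 2 }
    - { name: status, type: string, id: 3 }
    - { name: message, type: string, id: 4 }
    # Newly added column with a previously unused field ID.
    - { name: trace_id, type: string, id: 5, optional: true }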
Partitioning
During the Early Access period, Tableflow supports unpartitioned tables and tables partitioned by hour using the record timestamp. In the future, arbitrary custom partitioning will be supported. Additionally, migrating between partitioning schemes is currently not supported.
The partitioning scheme is specified on a per-table basis. The default is an unpartitioned table. Supported values for partitioning_scheme include unpartitioned and hour.
tables:
- source_topic: example_json_logs_topic
...
partitioning_scheme: hour
schema:
...
Transforms (Agent v730+)
Tableflow supports applying stateless transformations to ingested records. This can be helpful to massage the data into the desired shape before inserting it into the table without having to reprocess the data into a completely different topic first.
For example, imagine the topic contains a CDC change stream from Debezium that looks like the following:
{
"schema": {...},
"payload": {
"op": "u",
"source": {
...
},
"ts_ms" : "...",
"ts_us" : "...",
"ts_ns" : "...",
"before" : {
"field1" : "oldvalue1",
"field2" : "oldvalue2"
},
"after" : {
"field1" : "newvalue1",
"field2" : "newvalue2"
}
}
}
Without a transform, the table schema would have to be defined as follows:
source_format: json
schema:
fields:
- name: payload
id: 1
type: struct
fields:
- name: after
id: 2
type: struct
fields:
- { name: field1, id: 3, type: string }
- { name: field2, id: 4, type: string }
This is unfortunate because users querying the data would always have to write their queries in the form: SELECT payload.after.field1 instead of simply SELECT field1.
Transforms solve this problem by rewriting the structure of the record before applying the table schema. For example, we can rewrite the Tableflow configuration above as follows:
source_format: json
transforms:
- transform_type: bento
transform: |
root.field1 = this.payload.after.field1
root.field2 = this.payload.after.field2
schema:
fields:
- name: field1
id: 1
type: string
- name: field2
id: 2
type: string
Note that while the example above works for a JSON workload, it would not work for an Avro or Protobuf workload. The reason for this is that JSON is self-describing, while Avro and Protobuf records can only be deserialized if the schema is known ahead of time.
However, when transformations are used, records may only match the schema defined in the schema field after the transformation is applied. As a result, Avro and Protobuf workloads may require a separate input_schema which defines how to deserialize the input records pre-transformation.
For example, if the previous use-case were Avro-encoded, our Tableflow configuration would look like this:
source_format: avro
input_schema:
fields:
- name: payload
type: struct
fields:
- name: after
type: struct
fields:
- { name: field1, type: string }
- { name: field2, type: string }
transforms:
- transform_type: bento
transform: |
root.field1 = this.payload.after.field1
root.field2 = this.payload.after.field2
schema:
fields:
- name: field1
id: 1
type: string
- name: field2
id: 2
type: string
Note that field IDs are not required for input_schema when the source format is avro, but are required when the source format is protobuf. Also, the optional property is not enforced by input_schema; all fields are treated as optional, so whether a field is optional or required is still ultimately determined by the value of schema, even when input_schema and transforms are being used.
In summary, pre-transformation records must match input_schema, and post-transformation records must match schema. If input_schema is not defined, it defaults to the same value as schema. input_schema is never required when processing JSON payloads, but may be required when processing Avro and Protobuf payloads.
Separately, keep in mind that transforms can be chained:
transforms:
- transform_type: bento
transform: |
root.field1 = this.payload.after.field1
- transform_type: bento
transform: |
root.field2 = this.payload.after.field2
Tableflow transformations are executed by running arbitrary Bento Bloblang programs, but are limited to "pure" Bloblang functions that have no external side-effects.
Bloblang is a rich, Turing-complete programming language with many features, functions, methods, conditionals, and even error handling. You can read more about Bloblang and its capabilities in the Bento Bloblang documentation, but the basics are quite straightforward and can be grasped with a few examples.
The key thing to understand about Bloblang transformations is that they're mapping functions that mutate the input record into the desired shape. Within the context of a Tableflow Bloblang mapping, the this keyword refers to the input record and the root keyword refers to the output record. See the examples below to learn how to perform the most common transformations.
The Bento website has a powerful and interactive Bloblang playground that can be used to experiment with Bloblang mapping programs.
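The examples below each show a single operation; they can also be combined into one mapping inside a table's transforms block, for instance (a sketch with hypothetical field names):
transforms:
  - transform_type: bento
    transform: |
      # Drop internal health-check records, otherwise flatten and rename fields.
      if this.event_type == "health_check" {
        root = deleted()
      } else {
        root.user_name = this.user.name.uppercase()
        root.country = this.user.profile.country
      }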
Rename a field
root.new_field = this.old_field
Delete a field
root.unwanted_field = deleted()
Add a field
root.uppercase_name = this.name.uppercase()
Drop / filter out an entire record
if this.name == "foo" {
root = deleted()
} else {
root.name = this.name.uppercase()
}
Type Conversions
root.user_age = this.user_age.string()
Data Retention and TTL
By default, data is retained in the table indefinitely. Optionally, a retention period can be specified using the retention_ttl field. Retention must be expressed in units of hours (h).
tables:
- source_topic: example_json_logs_topic
retention_ttl: 720h ## 30d
...
Table Deletion
In order to prevent accidental data deletion, Tableflow does not delete any tables from the object storage bucket when they are removed from the Configuration. To delete a table, first delete it from the Configuration, then use your cloud provider's UI or CLI to delete the directory containing your table from within the warpstream/_tableflow directory.
Terraform / Infrastructure as Code
Tableflow Agents are deployed using the standard WarpStream Agent chart, and there is full support for Tableflow clusters in the WarpStream Terraform provider.
Click here for a complete example of creating a Tableflow cluster and configuring it to ingest a single topic into an Iceberg table.
Observability
Ingestion lag for tables is emitted as a metric, warpstream_tableflow_partition_time_lag_seconds, from the WarpStream Tableflow Agents. This metric is also available visually within the WarpStream Console. Offset lag will be available at a later date during the EA period.
This metric is tagged by table.id, table.name, topic, and partition. Partition-level tagging can be disabled with the WARPSTREAM_DISABLE_CONSUMER_GROUP_METRICS_TAGS=partition environment variable.
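For example, if the Agents run on Kubernetes, the tag could be disabled by setting the environment variable on the Agent container (a sketch; the surrounding Deployment is omitted):
env:
  - name: WARPSTREAM_DISABLE_CONSUMER_GROUP_METRICS_TAGS
    value: "partition"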

Tableflow UI
The Tableflow UI, available in the WarpStream Console, allows editing the Configuration.
