Tableflow Setup

This page describes how to set up WarpStream Tableflow.

Introduction

Tableflow automates the tedious process of transforming a topic in an Apache Kafka-compatible data streaming system into an Apache Iceberg table. Instead of writing custom code and manually configuring a data pipeline for each table you want to build, Tableflow allows you to declaratively specify which topics to build tables from and what schema and data format to expect. When schemas inevitably need to change, you can update the schema in Tableflow's editor and WarpStream will handle the schema migration automatically.

Compaction and table maintenance are included out-of-the-box with no tuning required. Tableflow continuously compacts the table in the background using intelligent heuristics to ensure readers get the best performance.

Tableflow is available as Bring-Your-Own-Cloud (BYOC) where the compute and storage live inside your cloud account inside your VPC. The raw data for your table is only ever stored inside your object storage bucket and never leaves your VPC during the table ingestion and maintenance process. Tableflow maintains a metadata store inside WarpStream Cloud as the Iceberg metadata layer that is periodically synced into your object storage bucket.

Getting Started

To get started with Tableflow, you first need to create a Tableflow cluster from the WarpStream Console, or using one of our infrastructure-as-code deployment options. The WarpStream Agents that join this cluster will only perform Tableflow operations and do not expose the Apache Kafka protocol. Please refer to our other documentation for how to install and configure the WarpStream Agents in your environment as the process does not differ for Tableflow.

As part of deploying the Agents, you'll also need to set up and configure an object storage bucket and/or provide the Agents with access to one of your existing buckets. See our object storage configuration documentation for more details on that.

Once the Agents are running, you can open the Configuration editor and start defining your source clusters, topics, tables, and schemas. Currently the only method for defining schemas is inline mode, which doesn't require using an external Schema Registry. Schema Registry support will be available in a future release.

Managed Tables

Tableflow tables are fully managed by WarpStream, or what we call "managed tables". You cannot use another system for performing writes, compactions, or other table maintenance operations. This is in contrast to a connector-based approach where you would be forced to combine multiple distinct systems or operations together to implement all of these functions.

Configuration

Tableflow is configured with, and fully controllable from, a single YAML file which can be edited through the WarpStream Console or the Pipelines API.

Overview

The YAML specifies

  • The source clusters Tableflow should connect to.

  • For each cluster, the topics that Tableflow should create Iceberg tables from.

  • For each topic, the schema to deserialize the Kafka records with. The same schema is used to create the Iceberg table.

  • The destination bucket to store the table.
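Putting that together, a configuration skeleton looks roughly like the sketch below. Only destination_bucket_url and the credential, schema, and table fields documented later on this page come from the documentation; the surrounding key names (source_clusters, tables, and so on) are illustrative and may not match your Agent version exactly.

```yaml
# Illustrative skeleton only -- exact key names may differ.
destination_bucket_url: "s3://my-tableflow-bucket"

source_clusters:
  - name: "prod_kafka"
    bootstrap_brokers: ["kafka-1.example.com:9092"]

tables:
  - source_cluster_name: "prod_kafka"
    source_topic: "clicks"
    schema:
      # Field definitions are described under "Schema Definitions" below.
      fields: []
```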

Configure Source Clusters

Source clusters are the Apache Kafka-compatible systems, like WarpStream, that store the topics you'd like to convert to tables. You define source clusters at the root of the configuration YAML by giving each one a name, a list of brokers, and credentials if they are needed.

You can define multiple source clusters so a single Tableflow cluster can centralize data from multiple clusters into one unified place.

Configure Connection and Credentials

If credentials are needed to connect to the Kafka cluster, the connection information can be provided under the credentials block for each cluster.

Note that if the source Kafka cluster is a WarpStream cluster, credentials still need to be provided if authentication is required. This is different from the Managed Data Pipelines setup where credentials are injected automatically.

TLS

  • use_tls specifies whether the Agents should use TLS when connecting to your source clusters.

  • tls_insecure_skip_verify specifies whether a client verifies the server's certificate chain and host name.

SASL

  • Both the sasl_username_env and the sasl_password_env fields refer to environment variable names. The Agents will prepend a TABLEFLOW_ prefix to the values of these fields before reading the environment variables, so the environment variables in the Agent should be configured as TABLEFLOW_SASL_USERNAME_ENV_VAR and TABLEFLOW_SASL_PASSWORD_ENV_VAR respectively.

  • The default value of sasl_mechanism is plain. Supported mechanisms include: plain, scram-256, and scram-512.
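For example, a source cluster entry that connects over TLS with SCRAM authentication might look like the sketch below. The credential field names are the ones documented above; the rest of the cluster block layout is illustrative.

```yaml
source_clusters:
  - name: "prod_kafka"                              # layout assumed
    bootstrap_brokers: ["kafka-1.example.com:9092"]
    credentials:
      use_tls: true
      tls_insecure_skip_verify: false
      sasl_mechanism: "scram-512"
      sasl_username_env: "SASL_USERNAME_ENV_VAR"    # Agent reads TABLEFLOW_SASL_USERNAME_ENV_VAR
      sasl_password_env: "SASL_PASSWORD_ENV_VAR"    # Agent reads TABLEFLOW_SASL_PASSWORD_ENV_VAR
```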

mTLS PEM encoded certs

  • The mtls_client_cert_env, mtls_client_key_env, and mtls_server_ca_cert_env fields refer to environment variable names. The Agents will prepend a TABLEFLOW_ prefix to the values of these fields before reading the environment variables, so the environment variables in the Agent should be configured as TABLEFLOW_MTLS_CERT_PATH_ENV_VAR, TABLEFLOW_MTLS_KEY_PATH_ENV_VAR, and TABLEFLOW_MTLS_SERVER_CA_CERT_PATH_ENV_VAR respectively.

  • mtls_client_cert_env specifies the environment variable that contains the path to the X.509 certificate file in PEM format.

  • mtls_client_key_env specifies the environment variable that contains the path to the X.509 private key file in PEM format.

  • mtls_server_ca_cert_env is optional and specifies the environment variable that contains the path to the X.509 certificate file in PEM format for the client certificate authority. If not specified, the host's root certificate pool will be used for client certificate verification.
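Similarly, a cluster entry that authenticates with mTLS might look like this sketch; again, only the credential field names come from the documentation above, the rest is illustrative.

```yaml
source_clusters:
  - name: "prod_kafka"                              # layout assumed
    bootstrap_brokers: ["kafka-1.example.com:9093"]
    credentials:
      use_tls: true
      mtls_client_cert_env: "MTLS_CERT_PATH_ENV_VAR"               # Agent reads TABLEFLOW_MTLS_CERT_PATH_ENV_VAR
      mtls_client_key_env: "MTLS_KEY_PATH_ENV_VAR"                 # Agent reads TABLEFLOW_MTLS_KEY_PATH_ENV_VAR
      mtls_server_ca_cert_env: "MTLS_SERVER_CA_CERT_PATH_ENV_VAR"  # optional
```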

Configure the Destination Bucket URL

To specify where your table data should be stored, use the destination_bucket_url field at the root of the configuration YAML. This configures the default destination bucket URL for all tables.

Alternatively, you can specify per-table bucket URL overrides within each table's configuration, for example:
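```yaml
# Sketch: the root-level destination_bucket_url is documented above; the
# per-table override key is assumed to use the same name, and the table
# layout is illustrative.
destination_bucket_url: "s3://analytics-tables"     # default for all tables

tables:
  - source_cluster_name: "prod_kafka"
    source_topic: "clicks"
    destination_bucket_url: "s3://clicks-tables"    # per-table override
```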

Check our object storage configuration documentation for more details on how to configure this URL for various different cloud providers, as well as for a complete list of permissions that the Agents will require.

Note that Tables will be created under the <bucket-name>/warpstream/_tableflow path. Optionally, a prefix can be specified in the bucket URL, which will result in Tables being created under the <bucket-name>/prefix/warpstream/_tableflow path.

Configure Topics and Tables

The next step is defining your tables and topics. Each table you define has exactly one source topic, and the table will be named <cluster-name>+<topic-name>.

Your topics' data can be formatted as JSON, Avro, or Protobuf. In the case of Avro, when running in inline mode we will strip the Schema Registry magic byte prefix and not rely on the schema ID. Additionally, for Avro the provided schema needs to exactly match the schema the records were written with for decoding to work.
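For instance, a JSON-formatted topic could be configured roughly like the sketch below. The schema syntax is covered in detail under Schema Definitions; key names other than the documented ones (id, type, and so on) are illustrative.

```yaml
tables:
  - source_cluster_name: "prod_kafka"   # layout assumed
    source_topic: "clicks"
    data_format: "json"                 # key name assumed; JSON, Avro, or Protobuf
    schema:
      fields:
        - { name: user_id, id: 1, type: long }
        - { name: url,     id: 2, type: string }
```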

Tableflow currently supports append-only tables. If you ingest data from a compacted topic in the source cluster, rows will not be deduplicated and any tombstones may not comply with the schema. Support for compacted topics is coming soon.

Protobuf-specific configurations (Agent v745+)

Protobuf schemas require additional configuration compared to JSON and Avro. Namely, you need to define the wire_format, which can be:

  • either raw if encoded records comply with the raw protobuf binary format (i.e., with no prefix).

  • or confluent if encoded records comply with the Confluent Schema Registry format (i.e., with a magic byte and a schema ID prefix).

In addition, field numbers are required to decode Protobuf messages. So each field in your schema must include a proto_field_number that matches the field number in your .proto definition.

For example, given a .proto definition along these lines (the message and field names are purely illustrative):
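```protobuf
syntax = "proto3";

// Illustrative message; what matters is that the schema's
// proto_field_number values match these field numbers.
message ClickEvent {
  int64  user_id = 1;
  string url     = 2;
}
```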

A matching inline schema would then look roughly like this:
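```yaml
# Sketch: id is the Iceberg column ID, proto_field_number must match the
# field numbers in the .proto definition above. Surrounding key names
# (schema, fields) are assumed.
schema:
  fields:
    - name: user_id
      id: 1
      type: int64
      proto_field_number: 1
    - name: url
      id: 2
      type: string
      proto_field_number: 2
```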

The schema needs to contain both the id and the proto_field_number:

  • the id corresponds to the column ID in Iceberg

  • the proto_field_number is used for decoding the records.

If you use transforms and therefore provide both an input_schema (to decode the records) and a schema (to define the Iceberg table), then you need:

  • the id in the schema only

  • the proto_field_number in the input_schema only.

So the equivalent of the schema above, in the case of a transform, would look roughly like this:
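```yaml
# Sketch: proto_field_number lives only in input_schema, id lives only in schema.
input_schema:
  fields:
    - name: user_id
      type: int64
      proto_field_number: 1
    - name: url
      type: string
      proto_field_number: 2
schema:
  fields:
    - name: user_id
      id: 1
      type: int64
    - name: url
      id: 2
      type: string
```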

A typical full example for Protobuf is sketched below; key names other than the ones documented on this page (wire_format, proto_field_number, id, type) are illustrative:
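```yaml
source_clusters:
  - name: "prod_kafka"
    bootstrap_brokers: ["kafka-1.example.com:9092"]

destination_bucket_url: "s3://analytics-tables"

tables:
  - source_cluster_name: "prod_kafka"
    source_topic: "click-events"
    data_format: "protobuf"          # key name assumed
    wire_format: "confluent"         # or "raw"
    schema:
      fields:
        - { name: user_id, id: 1, type: int64,  proto_field_number: 1 }
        - { name: url,     id: 2, type: string, proto_field_number: 2 }
```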

Schema Definitions

As shown in the above example, schemas specified with the inline mode contain a list of fields. Each field is named and has a unique integer id that will be mapped to the field ID of your Iceberg table, as well as a type that will be used as the Iceberg data type for the corresponding column.

Primitive Types

The syntax for defining a primitive field looks roughly like the following sketch:
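```yaml
# One entry in the schema's list of fields; field-name is a placeholder.
- name: field-name
  id: 42              # integer ID, unique within the table schema
  type: field-type
```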

where field-type is one of:

  • boolean, int, long, float, double, decimal, date, time, timestamp, timestamptz, string, uuid, fixed, and binary for Avro and JSON;

  • boolean, int32, sint32, uint32, fixed32, sfixed32, int64, sint64, uint64, fixed64, sfixed64, float, double, string, and bytes for Protobuf (agent v745+).

The Protobuf types are converted as follows to the Iceberg types:

| Protobuf Type | Iceberg Type | Notes |
| --- | --- | --- |
| boolean | boolean | |
| int32 | integer | |
| sint32 | integer | |
| sfixed32 | integer | |
| uint32 | long | Stored as 64-bit to prevent overflow (max uint32 > max int32) |
| fixed32 | long | Stored as 64-bit to prevent overflow (max uint32 > max int32) |
| int64 | long | |
| sint64 | long | |
| sfixed64 | long | |
| uint64 | string | Stored as base-10 string to prevent overflow (max uint64 > max int64) |
| fixed64 | string | Stored as base-10 string to prevent overflow (max uint64 > max int64) |
| float | float | |
| double | double | |
| enum | string | Stored as the enum value name |
| string | string | |
| bytes | binary | |
| message | struct | |
| map | map | |
| repeated | list | |
| oneof | struct | Converted to a struct where each option is an optional field |

Note: Iceberg does not have unsigned integer types. To prevent overflow when storing large unsigned values:

  • uint32 and fixed32 are widened to long (64-bit signed)

  • uint64 and fixed64 are stored as string (base-10 representation)

Nested Types

For Avro and JSON only

A struct is specified as a tuple of typed values. Each field in the tuple is named and has an integer id that is unique in the table schema. Fields can be of any type.
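For example, a struct field might be sketched like this (the nesting key, fields, is an assumption):

```yaml
- name: address
  id: 10
  type: struct
  fields:
    - name: city
      id: 11
      type: string
    - name: zip_code
      id: 12
      type: string
```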

For Avro, JSON and Protobuf

A map is specified as a key/value pair. Both the key field and value field have an integer id that is unique in the table schema. While map values can be either optional or required, map keys are required. For both Avro and JSON schemas, map keys can only be of type string, but values can be of any type. For Protobuf, map keys can be of any map key type allowed by the Protobuf specs, namely: string, int32, int64, uint32, uint64, sint32, sint64, sfixed32, sfixed64, fixed32, fixed64, boolean.

Note that for Protobuf, the proto_field_number must be added to the map field but should not be set for either the key or the value. So a full example for a Protobuf map looks roughly like this:
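```yaml
# Sketch; the key/value nesting shown here is assumed.
- name: attributes
  id: 20
  type: map
  proto_field_number: 5
  key:
    id: 21
    type: string
  value:
    id: 22
    type: string
```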

A list (in JSON and Avro) or repeated (in Protobuf) is specified with a single element field. The element field is named element and has an integer id that is unique in the table schema. Elements can be either optional* or required and can be of any type.

Note that for Protobuf, the proto_field_number must be added to the repeated field but should not be set for the element. So a full example for a Protobuf repeated field looks roughly like this:
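```yaml
# Sketch; the element nesting shown here is assumed.
- name: tags
  id: 30
  type: repeated        # "list" for Avro and JSON
  proto_field_number: 6
  element:
    id: 31
    type: string
```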

*Support for optional list elements will be coming soon.

For Protobuf only

A message is defined using type: message. A proto_field_number must be provided for every nested field.

An enum is stored as a string in the Iceberg table using the enum value name. You must define all enum values with their corresponding numbers.

A oneof field is defined with type: oneof. Each option must be explicitly marked as optional and given a proto_field_number, but the oneof field itself must be defined as required and without any proto_field_number.
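A sketch of a oneof field (the exact spelling of the optional property and the nesting key are assumptions):

```yaml
- name: payment_method
  id: 40
  type: oneof           # the oneof itself is required and has no proto_field_number
  fields:
    - name: card_number
      id: 41
      type: string
      optional: true
      proto_field_number: 7
    - name: iban
      id: 42
      type: string
      optional: true
      proto_field_number: 8
```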

Partitioning

Tableflow supports unpartitioned tables, tables partitioned by the record timestamp, and custom partitioning. Tables are unpartitioned by default, and migrating between partitioning schemes is currently not supported.

The partitioning scheme is specified on a per-table basis. For convenience the partitioning_scheme option can be used to define unpartitioned tables and timestamp partitioned tables using the record timestamp. Supported values for partitioning_scheme include unpartitioned, hour, day, month, and year.

If a custom partitioning scheme is needed, the custom_partitioning option can be used, roughly as sketched below:
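```yaml
# Sketch; custom_partitioning is shown here as a list of partition definitions.
custom_partitioning:
  - source_field_id: 2        # ID of a field from the schema or a default field
    name: url_prefix
    transform:
      name: truncate
      w: 10                   # width, only used by the truncate transform
```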

  • source_field_id is the ID of the field to partition on. This can be either a field from the specified schema or one of the default fields.

  • name is the name of the partition. Note that name must start with a letter and contain only alphanumerics or underscores.

  • transform is a struct containing a name field that corresponds to the name of the transform to be applied, as well as an n field (number of buckets) if the bucket transform is used and a w field (width) if the truncate transform is used.

Note that to use custom partitioning the Agent version needs to be at least v748.

For example, to create hourly partitions on the Kafka timestamp and bucket the Kafka partitions into four bins, the configuration would look roughly like this:
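```yaml
custom_partitioning:
  - source_field_id: 10000005   # warpstream.timestamp (see Table Schema below)
    name: ts_hour
    transform:
      name: hour
  - source_field_id: 10000001   # warpstream.partition (see Table Schema below)
    name: partition_bucket
    transform:
      name: bucket
      n: 4
```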

Transforms (Agent v730+)

Tableflow supports applying stateless transformations to ingested records. This can be helpful to massage the data into the desired shape before inserting it into the table without having to reprocess the data into a completely different topic first.

For example, imagine the topic contains a CDC change stream from Debezium that looks something like the following:
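```json
{
  "payload": {
    "before": null,
    "after": {
      "field1": "some value",
      "field2": 42
    },
    "op": "c",
    "ts_ms": 1700000000000
  }
}
```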

Without a transform, the table schema would have to mirror the nested structure, roughly as follows:
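```yaml
schema:
  fields:
    - name: payload
      id: 1
      type: struct
      fields:
        - name: after
          id: 2
          type: struct
          fields:
            - name: field1
              id: 3
              type: string
            - name: field2
              id: 4
              type: long
```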

This is unfortunate because users querying the data would always have to write their queries in the form: SELECT payload.after.field1 instead of simply SELECT field1.

Transforms solve this problem by rewriting the structure of the record before applying the table schema. For example, we can rewrite the Tableflow configuration above along the lines of the following sketch:
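```yaml
# The transforms entry layout (a list of Bloblang mappings) is assumed.
transforms:
  - bloblang: |
      root = this.payload.after
schema:
  fields:
    - name: field1
      id: 1
      type: string
    - name: field2
      id: 2
      type: long
```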

Note that while the example above works for a JSON workload, it would not work for an Avro or Protobuf workload. The reason for this is that JSON is self-describing, while Avro and Protobuf records can only be deserialized if the schema is known ahead of time.

However, when transformations are used, records may only match the schema defined in the schema field after the transformation is applied. As a result, Avro and Protobuf workloads may require a separate input_schema which defines how to deserialize the input records pre-transformation.

For example, if the previous use case were Avro-encoded, the Tableflow configuration would look roughly like this:
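```yaml
# input_schema describes the Avro records before the transform is applied
# (no field IDs needed); schema describes the resulting Iceberg table.
input_schema:
  fields:
    - name: payload
      type: struct
      fields:
        - name: after
          type: struct
          fields:
            - name: field1
              type: string
            - name: field2
              type: long
transforms:                      # layout assumed
  - bloblang: |
      root = this.payload.after
schema:
  fields:
    - name: field1
      id: 1
      type: string
    - name: field2
      id: 2
      type: long
```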

Note that field IDs are not required in input_schema. Also, the optional property is not enforced by input_schema; all fields are treated as optional there, so whether a field is optional or required is still ultimately determined by the value of schema, even when input_schema and transforms are being used.

In summary, pre-transformation records must match input_schema, and post-transformation records must match schema. If input_schema is not defined, it defaults to the same value as schema. input_schema is never required when processing JSON payloads, but may be required when processing Avro and Protobuf payloads.

Separately, keep in mind that transforms can be chained, as in this sketch:
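```yaml
transforms:                      # layout assumed; mappings run in order
  - bloblang: |
      root = this.payload.after
  - bloblang: |
      root = this
      root.ingested_at = now()
```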

Tableflow transformations are executed by running arbitrary Bento Bloblang programs, but are limited to "pure" Bloblang functions that have no external side-effects.

Bloblang is a rich, Turing-complete programming language with many features, functions, methods, conditionals, and even error handling. You can read more about Bloblang and its capabilities in the Bento Bloblang documentation, but the basics are quite straightforward and can be grasped with a few examples.

The key thing to understand about Bloblang transformations is that they're mapping functions that mutate the input record into the desired shape. Within the context of a Tableflow Bloblang mapping, the this keyword refers to the input record and the root keyword refers to the output record. See the examples below to learn how to perform the most common transformations.

Rename a field
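A mapping along these lines renames old_name to new_name (the field names are placeholders):

```
# Start from the input record, copy the field, then delete the original.
root = this
root.new_name = this.old_name
root.old_name = deleted()
```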

Delete a field
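To drop a single field from every record (the field name is a placeholder):

```
root = this
root.internal_debug_info = deleted()
```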

Add a field
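To add a field, assign it on the output record; for example, a constant and an ingestion timestamp (the field names are placeholders):

```
root = this
root.source = "tableflow"
root.ingested_at = now()
```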

Drop / filter out an entire record
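Assigning deleted() to root drops the record entirely; for example, to filter out heartbeat events (the condition is a placeholder):

```
root = if this.event_type == "heartbeat" {
  deleted()
} else {
  this
}
```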

Type Conversions
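Bloblang methods such as string() and number() can coerce values so they match the table schema (the field names are placeholders):

```
root = this
root.user_id = this.user_id.string()   # number -> string
root.amount = this.amount.number()     # string -> number
```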

Data Retention and TTL

By default, data is retained in the table indefinitely. Optionally, a retention period can be specified using the retention_ttl field. Retention must be expressed in hours (h).
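For example, to keep roughly 30 days of data (the field's placement within a table's configuration is assumed):

```yaml
retention_ttl: 720h
```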

Dead Letter Queue (DLQ) Mode

Requires Agent v737 or higher.

By default, Tableflow stops ingestion when it encounters records that are incompatible with the provided schema. To avoid head-of-line blocking, this behavior can be overridden using the dlq_mode field. Supported values include:

  • stop, which blocks ingestion upon encountering an invalid record (this is the default)

  • skip, which skips invalid records during ingestion

Compression codecs

Requires Agent v748 or higher.

Tableflow supports a few compression codecs for the stored data files. The default one is snappy.

This codec can be overridden using the compression field. Supported values include:

  • snappy

  • gzip

  • lz4

  • zstd

  • brotli

  • none, which disables compression

Skipping raw record values

Requires Agent v749 or higher.

If you don't need to access the raw record values (the ones coming from the Kafka topics), you can set skip_raw_record_values to true in your config. This will result in smaller data files.

Handling topic re-creation

To define the table ingestion behavior when the source topic is recreated, use the topic_recreation_policy setting. Currently, the only supported policy is recreate_table. This ensures data integrity by creating a new table (with a different identifier) whenever the system detects that the source topic has been re-created.

Table Schema

Tableflow creates an Iceberg table with a struct schema, containing all the fields from the configured schema as well as the following default fields:

| Name | Field ID | Type |
| --- | --- | --- |
| warpstream | 10000000 | struct |
| warpstream.partition | 10000001 | int |
| warpstream.offset | 10000002 | long |
| warpstream.key | 10000003 | binary |
| warpstream.value | 10000004 | binary |
| warpstream.timestamp | 10000005 | timestamp |

The warpstream.value field can be omitted with the skip_raw_record_values option.

Schema Migrations

Schema migrations in Tableflow are supported for adding columns, changing fields from required to optional, and widening integer and floating-point types. To perform any of these operations, update the schema to reflect the change in the Configuration editor and deploy it. The schema change will be reflected within the next few syncs of the table metadata into your bucket.

Dropping and re-ordering columns is not yet supported. Destructive schema migrations such as changing types in a non-widening way are not supported.

Table Deletion

Tableflow does not delete any tables from the object storage bucket when they are removed from the Configuration in order to prevent accidental data deletion. To delete a table, first delete it from the Configuration and then use your cloud provider's UI or CLI to delete the directory containing your table from within the warpstream/_tableflow directory.

Terraform / Infrastructure as Code

Tableflow Agents are deployed using the standard WarpStream Agent chart, and there is full support for Tableflow clusters in the WarpStream Terraform provider.

Click here for a complete example of creating a Tableflow cluster and configuring it to ingest a single topic into an Iceberg table.

Observability

Ingestion lag for tables is emitted by the WarpStream Tableflow Agents as the warpstream_tableflow_partition_time_lag_seconds metric, and is also available visually within the WarpStream Console. Offset lag will be available at a later date.

This metric is tagged by table.id, table.name, topic, and partition. Partition-level tagging can be disabled with the WARPSTREAM_DISABLE_CONSUMER_GROUP_METRICS_TAGS=partition environment variable.

Tableflow UI

The Tableflow UI available in the WarpStream console allows editing the Configuration.
