Schema Registry (beta)
This page describes how to connect an agent with an external schema registry and configure a topic to enable schema validation.
Last updated
Historically, schemas stored in schema registries have been used only by clients to serialize, deserialize, and validate messages. With WarpStream, you can configure the agents to validate not only that the record contains a valid schema ID, but also that the record actually conforms to the corresponding schema. The agent can then reject invalid records or emit metrics when it receives them.
Note that enabling schema validation will increase the CPU usage of the agent.
Currently, WarpStream supports two types of schema registries:
Kafka-compatible Schema Registry
AWS Glue Schema Registry
Here is a brief overview of how schema validation works in WarpStream:
The producer serializes data with the schema retrieved from the schema registry.
The producer sends the data to a WarpStream agent.
On receiving the message, the WarpStream agent decodes the message to obtain some form of schema identifier that points to a remote schema.
The agent uses the schema identifier to fetch the remote schema.
Finally, the agent verifies that the data actually conforms to the schema and rejects (or emits metrics for) any invalid records.
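The agent-side steps above (decode the identifier, fetch the schema, validate) can be sketched as follows. This is a minimal illustration, not WarpStream's implementation. It assumes the record uses the framing commonly produced by Kafka-compatible schema registry serializers: one zero "magic" byte followed by a 4-byte big-endian schema ID. The function names, the in-memory `registry` dictionary, and the `conforms` callback are all hypothetical stand-ins.

```python
import struct

MAGIC_BYTE = 0   # assumption: Confluent-style framing
HEADER_SIZE = 5  # 1 magic byte + 4-byte big-endian schema ID


def split_frame(payload: bytes) -> tuple[int, bytes]:
    """Return (schema_id, body) for a framed record, or raise ValueError."""
    if len(payload) < HEADER_SIZE:
        raise ValueError("payload too short to contain a schema ID")
    magic, schema_id = struct.unpack(">BI", payload[:HEADER_SIZE])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, payload[HEADER_SIZE:]


def check_record(payload: bytes, registry: dict, conforms) -> bool:
    """Hypothetical flow: decode the ID, fetch the schema, validate the body.

    `registry` stands in for the remote schema registry and `conforms` for a
    format-specific validator (Avro or JSON Schema); both are placeholders.
    """
    try:
        schema_id, body = split_frame(payload)
    except ValueError:
        return False  # no usable schema identifier
    schema = registry.get(schema_id)
    if schema is None:
        return False  # identifier does not point to a known schema
    return conforms(schema, body)
```

A record is treated as invalid if any of the three steps fails, which mirrors the idea that a valid schema ID alone is not enough.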
This process is illustrated in the diagram below:
Currently, WarpStream supports the following schema formats: Avro and JSON Schema (with Protobuf coming soon).
Schema validation is configurable per topic. The following configurations can be provided when a topic is created or altered. Note that there are additional configurations depending on the schema registry type, which will be discussed in the next section.
Configuration | Description |
---|---|
| Boolean config that indicates whether to validate the record key. |
| Boolean config that indicates whether to validate the record value. |
| When an invalid record is detected, the Agent allows the record to be written, but emits a metric indicating that the record is invalid instead of rejecting the record.
The metric (counter) emitted is: Defaults to true. |
| The type of schema registry that the schemas live in. Supported values include:
Defaults to |
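To make the interaction between these settings concrete, here is a small sketch of the decision they describe. The field names, the `False` defaults for key and value validation, and the metric name are hypothetical placeholders, not the real topic configuration keys or the real counter. The sketch also assumes the counter is only incremented in metrics-only mode.

```python
from collections import Counter
from dataclasses import dataclass


@dataclass
class TopicValidation:
    # Hypothetical field names; not the real topic configuration keys.
    validate_key: bool = False      # assumed default
    validate_value: bool = False    # assumed default
    only_emit_metrics: bool = True  # the table above states this defaults to true


metrics = Counter()


def admit(cfg: TopicValidation, key_ok: bool, value_ok: bool) -> bool:
    """Return True if the record should be written to the topic."""
    invalid = (cfg.validate_key and not key_ok) or (
        cfg.validate_value and not value_ok
    )
    if not invalid:
        return True
    if cfg.only_emit_metrics:
        metrics["invalid_records"] += 1  # placeholder metric name
        return True  # record is still written
    return False  # record is rejected
```

With the default metrics-only mode, producers are never blocked, so it can be a low-risk way to measure how many records would be rejected before switching to rejection.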
Below are topic-level configurations for Kafka-compatible schema registries.
Configuration | Description |
---|---|
| Config that determines which schemas are allowed for the record key. Allowed values: |
| Config that determines which schemas are allowed for the record value. Allowed values: |
Each schema in the Schema Registry is registered under a subject. During schema validation, the agent looks up the subject for the schema ID and verifies that the subject conforms to the subject name strategy.
There are three subject name strategies:
Strategy | Definition |
---|---|
TopicNameStrategy | The subject is derived from the topic name with the following format:
|
RecordNameStrategy | The subject is the schema’s fully-qualified record name. |
TopicRecordNameStrategy | The subject is a combination of the topic name and the record name with the following format: <topic name>-<fully-qualified record name> |
The fully-qualified record name for Avro is the record’s namespace + record name. For JSON Schema, the record name is the title.
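The three strategies can be expressed as a short function that computes the subject a schema is expected to be registered under. This is an illustrative sketch only. The `<topic>-key` / `<topic>-value` format used for TopicNameStrategy is the convention used by Confluent-style serializers and is an assumption here, and all function names are hypothetical.

```python
def avro_full_name(schema: dict) -> str:
    """Fully-qualified Avro record name: namespace and name joined by a dot."""
    name = schema["name"]
    namespace = schema.get("namespace", "")
    # Per the Avro spec, a name that already contains dots is a full name.
    if "." in name or not namespace:
        return name
    return f"{namespace}.{name}"


def json_schema_record_name(schema: dict) -> str:
    """For JSON Schema, the record name is the schema's title."""
    return schema["title"]


def expected_subject(strategy: str, topic: str, record_name: str, is_key: bool) -> str:
    """Subject the schema should be registered under for a given strategy."""
    if strategy == "TopicNameStrategy":
        # Assumption: Confluent convention of "<topic>-key" or "<topic>-value".
        return f"{topic}-{'key' if is_key else 'value'}"
    if strategy == "RecordNameStrategy":
        return record_name
    if strategy == "TopicRecordNameStrategy":
        return f"{topic}-{record_name}"
    raise ValueError(f"unknown subject name strategy: {strategy}")
```

For example, an Avro record named `Order` in namespace `com.example` written to topic `orders` would be expected under `orders-com.example.Order` with TopicRecordNameStrategy.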
To allow the agent to connect to a Kafka-compatible schema registry, set the -schemaRegistryURL flag to the URL of the schema registry. Alternatively, you can set the WARPSTREAM_SCHEMA_REGISTRY_URL environment variable.
Most schema registry implementations support some form of authentication. WarpStream supports connecting to external schema registries with mTLS, TLS, or basic authentication.
For basic authentication, supply the username and password as follows:
Set the -externalSchemaRegistryBasicAuthUsername flag to the schema registry username. Alternatively, set the WARPSTREAM_EXTERNAL_SCHEMA_REGISTRY_BASIC_AUTH_USERNAME environment variable.
Set the -externalSchemaRegistryBasicAuthPassword flag to the schema registry password. Alternatively, set the WARPSTREAM_EXTERNAL_SCHEMA_REGISTRY_BASIC_AUTH_PASSWORD environment variable.
For mTLS, the agent needs both a certificate and a private key so that the schema registry server can authenticate the agent.
You can use the -externalSchemaRegistryTlsClientCertFile and -externalSchemaRegistryTlsClientPrivateKeyFile flags to pass in the file paths to the agent certificate and private key, respectively. Alternatively, you can use the WARPSTREAM_EXTERNAL_SCHEMA_REGISTRY_TLS_CLIENT_CERT_FILE and WARPSTREAM_EXTERNAL_SCHEMA_REGISTRY_TLS_CLIENT_PRIVATE_KEY_FILE environment variables.
For TLS and mTLS, you can optionally provide the file path to the root certificate authority (CA) certificate, which the Agent uses to verify the schema registry server's certificate. Use the -externalSchemaRegistryTlsServerCACertFile flag, or the WARPSTREAM_EXTERNAL_SCHEMA_REGISTRY_TLS_SERVER_CA_CERT_FILE environment variable.
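Before handing credentials to the agent, it can be useful to confirm that they work against the registry. The sketch below is a standalone connectivity probe, not part of WarpStream. It assumes the registry exposes the `GET /subjects` endpoint found on Confluent-compatible schema registries, and it reuses the agent's environment variable names purely for convenience. The function names are hypothetical.

```python
import base64
import os
import ssl
import urllib.request

PREFIX = "WARPSTREAM_EXTERNAL_SCHEMA_REGISTRY_"


def build_ssl_context(ca_file=None, cert_file=None, key_file=None) -> ssl.SSLContext:
    """TLS context; supplying cert_file and key_file enables mTLS."""
    context = ssl.create_default_context(cafile=ca_file)
    if cert_file and key_file:
        context.load_cert_chain(certfile=cert_file, keyfile=key_file)
    return context


def build_request(base_url: str, username=None, password=None) -> urllib.request.Request:
    """Build a GET /subjects request, with basic auth if credentials are given."""
    request = urllib.request.Request(base_url.rstrip("/") + "/subjects")
    if username is not None and password is not None:
        token = base64.b64encode(f"{username}:{password}".encode()).decode()
        request.add_header("Authorization", f"Basic {token}")
    return request


def check_registry() -> int:
    """Probe the registry using the same env vars the agent reads."""
    env = os.environ.get
    request = build_request(
        os.environ["WARPSTREAM_SCHEMA_REGISTRY_URL"],
        env(PREFIX + "BASIC_AUTH_USERNAME"),
        env(PREFIX + "BASIC_AUTH_PASSWORD"),
    )
    context = build_ssl_context(
        env(PREFIX + "TLS_SERVER_CA_CERT_FILE"),
        env(PREFIX + "TLS_CLIENT_CERT_FILE"),
        env(PREFIX + "TLS_CLIENT_PRIVATE_KEY_FILE"),
    )
    with urllib.request.urlopen(request, context=context, timeout=10) as response:
        return response.status
```

Calling `check_registry()` with the environment variables set should return an HTTP status if the URL, credentials, and certificates are accepted; an authentication or TLS failure surfaces as an exception from `urlopen`.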
The agent must be deployed in AWS to connect to an AWS Glue schema registry. You also need to make sure the Agent containers have the appropriate permissions to read from the schema registry.
Below is an example Terraform configuration for an AWS IAM policy document that provides WarpStream with the appropriate permissions to access an AWS Glue schema registry.
Note that there is currently a Terraform bug that prevents providing the specific registry ARN in the IAM policy for AWS Glue. You have to use "*" instead, as shown in the example.