Schema Registry catalog
The Schema Registry catalog allows you to access Kafka topics with registered schemas as Flink SQL tables. You can add Schema Registry as a catalog in Flink SQL by adding the dependency to your project, registering it in Java, and enabling it in the custom environment file.
Each Kafka topic will be mapped to a table with TableSchema
that matches the
Avro schema.
Maven
Dependency
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cloudera-registry</artifactId>
<version>1.16.1-csa1.10.0.0</version>
</dependency>
The following example shows how to register and use the Schema Registry catalog from
Java:
SchemaRegistryClient client = new SchemaRegistryClient(
ImmutableMap.of(
SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(),
"http://<your_hostname>:7788/api/v1"
)
);
Map<String, String> connectorProps = new Kafka()
.property(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"<your_hostname>:9092")
.startFromEarliest()
.toProperties();
tableEnv.registerCatalog(
"registry", new ClouderaRegistryCatalog("registry", client, connectorProps)
);
tableEnv.useCatalog("registry");