Schema Registry catalog

The Schema Registry catalog lets you access Kafka topics with registered schemas as Flink SQL tables. To add Schema Registry as a catalog in Flink SQL, add the Maven dependency to your project, register the catalog in Java, and enable it in the custom environment file.

Each Kafka topic is mapped to a table whose TableSchema matches the Avro schema registered for that topic.
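
To make the topic-to-table mapping concrete, the following is a minimal sketch of how the fields of an Avro record could line up with table columns. It assumes the catalog follows the usual Flink correspondence between Avro primitive types and SQL types, which this page does not spell out. The class AvroToTableSketch, its methods, and the topic name transactions are hypothetical and are not part of the catalog API.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical illustration only; the real conversion happens inside the catalog.
public class AvroToTableSketch {

    // Assumed mapping from Avro primitive type names to Flink SQL type names.
    static String sqlTypeFor(String avroType) {
        switch (avroType) {
            case "string":  return "STRING";
            case "boolean": return "BOOLEAN";
            case "bytes":   return "BYTES";
            case "int":     return "INT";
            case "long":    return "BIGINT";
            case "float":   return "FLOAT";
            case "double":  return "DOUBLE";
            default:
                throw new IllegalArgumentException("Not covered by this sketch: " + avroType);
        }
    }

    // Renders "topic(column TYPE, ...)" from an ordered map of Avro field name to Avro type.
    static String describeTable(String topic, Map<String, String> avroFields) {
        StringJoiner columns = new StringJoiner(", ", topic + "(", ")");
        for (Map.Entry<String, String> field : avroFields.entrySet()) {
            columns.add(field.getKey() + " " + sqlTypeFor(field.getValue()));
        }
        return columns.toString();
    }

    public static void main(String[] args) {
        // Fields of a made-up Avro record registered for a topic named "transactions".
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("id", "long");
        fields.put("name", "string");
        fields.put("amount", "double");
        // prints: transactions(id BIGINT, name STRING, amount DOUBLE)
        System.out.println(describeTable("transactions", fields));
    }
}
```

Under these assumptions, a topic whose schema has the fields id, name, and amount would appear in the catalog as a table with three columns of the corresponding SQL types.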

Maven Dependency
<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-avro-cloudera-registry</artifactId>
   <version>1.10.0-csa1.2.0.0</version>
</dependency>
The following example shows how to register and use the Schema Registry catalog from Java:
// Client for the Schema Registry REST API
SchemaRegistryClient client = new SchemaRegistryClient(
    ImmutableMap.of(
        SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(),
        "http://<your_hostname>:7788/api/v1"
    )
);

// Kafka connector properties passed to the catalog
Map<String, String> connectorProps = new Kafka()
    .property(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<your_hostname>:9092")
    .startFromEarliest()
    .toProperties();

// tableEnv is an existing TableEnvironment
tableEnv.registerCatalog(
    "registry", new ClouderaRegistryCatalog("registry", client, connectorProps)
);
tableEnv.useCatalog("registry");
To use the Schema Registry catalog from the SQL client, add it to the catalogs section of the YAML configuration file:
  - name: registry
    type: cloudera-registry
    # Registry Client standard properties
    registry.properties.schema.registry.url: <registry_url>:<port>/api/v1
    registry.properties.key: …
    # Registry Client SSL properties
    registry.client.ssl.keyStorePath: ... 
    registry.client.ssl.sslProp: ... 
    # Kafka Connector properties
    connector.properties.bootstrap.servers: <kafka_brokers>
    connector.startup-mode: earliest-offset
Required catalog configurations:
  • registry.properties.schema.registry.url
  • connector.properties.bootstrap.servers
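
As a rough illustration of the two required settings, the sketch below checks a catalog entry, represented as a flat map of keys to values, before it is written to the YAML file. The class CatalogConfigCheck and the method missingKeys are hypothetical helpers rather than part of Flink or the Schema Registry client, and the example URL is a placeholder.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper; not part of the Flink or Schema Registry APIs.
public class CatalogConfigCheck {

    // The two keys listed above as required for a cloudera-registry catalog entry.
    static final List<String> REQUIRED_KEYS = Arrays.asList(
        "registry.properties.schema.registry.url",
        "connector.properties.bootstrap.servers");

    // Returns the required keys that are absent or blank in the given entry.
    static List<String> missingKeys(Map<String, String> entry) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = entry.get(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Map<String, String> entry = new LinkedHashMap<>();
        entry.put("name", "registry");
        entry.put("type", "cloudera-registry");
        entry.put("registry.properties.schema.registry.url",
                  "http://registry.example.com:7788/api/v1"); // placeholder URL
        // The bootstrap servers key is left out on purpose.
        // prints: Missing: [connector.properties.bootstrap.servers]
        System.out.println("Missing: " + missingKeys(entry));
    }
}
```

A check like this only confirms that the keys are present. It does not verify that the registry URL or the Kafka brokers are reachable.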