SQL and Table API

Schema Registry catalog

The Schema Registry catalog allows you to access Kafka topics with registered schemas as Flink SQL tables. You can add Schema Registry as a catalog in Flink SQL by adding its dependency to your project, registering the catalog in Java, and enabling it in the custom environment file of the SQL client.

Each Kafka topic is mapped to a table whose TableSchema matches the Avro schema registered for that topic.
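
A sketch of this mapping, assuming the catalog has already been registered as shown in the Java example below; the topic name transactions and its fields are hypothetical:
// Hypothetical topic "transactions" with a registered Avro record schema:
//   {"name": "id", "type": "string"}, {"name": "amount", "type": "double"}
// The catalog exposes the topic as a table whose TableSchema mirrors those fields.
TableSchema schema = tableEnv.from("transactions").getSchema();
// schema now describes the columns: id STRING, amount DOUBLE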

Maven dependency
<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-avro-cloudera-registry</artifactId>
   <version>1.10.0-csa1.2.0.0</version>
</dependency>
The following example shows how to register and use the Schema Registry catalog from Java:
// Client for the Schema Registry service that stores the topic schemas
SchemaRegistryClient client = new SchemaRegistryClient(
        ImmutableMap.of(
                SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(),
                "http://<your_hostname>:7788/api/v1"));

// Kafka connector properties applied to every table in the catalog
Map<String, String> connectorProps = new Kafka()
        .property(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<your_hostname>:9092")
        .startFromEarliest()
        .toProperties();

// Register the catalog under the name "registry" and switch to it
tableEnv.registerCatalog(
        "registry", new ClouderaRegistryCatalog("registry", client, connectorProps));
tableEnv.useCatalog("registry");
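Once the catalog is active, registered topics can be queried directly by name. A minimal usage sketch, reusing the hypothetical transactions topic from above:
// Column names and types come from the Avro schema registered for the topic
Table highValue = tableEnv.sqlQuery(
        "SELECT id, amount FROM transactions WHERE amount > 100");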
To use the Schema Registry catalog from the SQL client, add it to the catalogs section of the YAML configuration file:
  - name: registry
    type: cloudera-registry
    # Registry Client standard properties
    registry.properties.schema.registry.url: <registry_url>:<port>/api/v1
    registry.properties.key: …
    # Registry Client SSL properties
    registry.client.ssl.keyStorePath: ... 
    registry.client.ssl.sslProp: ... 
    # Kafka Connector properties
    connector.properties.bootstrap.servers: <kafka_brokers>
    connector.startup-mode: earliest-offset
Required catalog configurations:
  • registry.properties.schema.registry.url
  • connector.properties.bootstrap.servers
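A minimal sketch of a catalog entry that keeps only the required settings, using the same placeholder values as above:
catalogs:
  - name: registry
    type: cloudera-registry
    registry.properties.schema.registry.url: <registry_url>:<port>/api/v1
    connector.properties.bootstrap.servers: <kafka_brokers>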