Schema Registry catalog
The Schema Registry catalog allows you to access Kafka topics with registered schemas as Flink SQL tables. You can add Schema Registry as a catalog in Flink SQL by adding the dependency to your project, registering it in Java, and enabling it in the custom environment file.
Each Kafka topic will be mapped to a table with TableSchema that matches the Avro schema.
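To make the topic-to-table mapping more concrete, the following plain Java sketch turns the fields of an Avro record into table columns. It is only an illustration of the idea and not the catalog's actual implementation: the class and method names (`AvroToTableSketch`, `sqlType`, `toColumns`) are hypothetical, and the primitive type mapping is an assumption based on the usual Avro-to-SQL conventions in Flink.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: hypothetical helper, not part of the Schema Registry catalog API.
final class AvroToTableSketch {

    // Assumed mapping from Avro primitive type names to SQL column types.
    static String sqlType(String avroType) {
        switch (avroType) {
            case "string":  return "STRING";
            case "int":     return "INT";
            case "long":    return "BIGINT";
            case "float":   return "FLOAT";
            case "double":  return "DOUBLE";
            case "boolean": return "BOOLEAN";
            case "bytes":   return "BYTES";
            default:
                throw new IllegalArgumentException(
                    "Avro type not handled in this sketch: " + avroType);
        }
    }

    // Converts ordered (field name -> Avro type) pairs into
    // ordered (column name -> SQL type) pairs.
    static Map<String, String> toColumns(Map<String, String> avroFields) {
        Map<String, String> columns = new LinkedHashMap<>();
        for (Map.Entry<String, String> field : avroFields.entrySet()) {
            columns.put(field.getKey(), sqlType(field.getValue()));
        }
        return columns;
    }

    public static void main(String[] args) {
        // Fields of a made-up Avro record registered for some topic.
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("id", "long");
        fields.put("name", "string");
        System.out.println(toColumns(fields));
    }
}
```

Complex Avro types such as records, arrays, maps, and unions are deliberately left out of this sketch.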
Maven Dependency
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-avro-cloudera-registry</artifactId>
  <version>1.10.0-csa1.2.0.0</version>
</dependency>
The following example shows how to register and use the Schema Registry catalog from Java:
SchemaRegistryClient client = new SchemaRegistryClient(
    ImmutableMap.of(
        SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(),
        "http://<your_hostname>:7788/api/v1"
    )
);

Map<String, String> connectorProps = new Kafka()
    .property(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<your_hostname>:9092")
    .startFromEarliest()
    .toProperties();

tableEnv.registerCatalog(
    "registry",
    new ClouderaRegistryCatalog("registry", client, connectorProps)
);
tableEnv.useCatalog("registry");
To use the Schema Registry catalog from the SQL client, add it to the catalogs section of the YAML configuration file:
- name: registry
  type: cloudera-registry
  # Registry Client standard properties
  registry.properties.schema.registry.url: <registry_url>:<port>/api/v1
  registry.properties.key: …
  # Registry Client SSL properties
  registry.client.ssl.keyStorePath: ...
  registry.client.ssl.sslProp: ...
  # Kafka Connector properties
  connector.properties.bootstrap.servers: <kafka_brokers>
  connector.startup-mode: earliest-offset
Required catalog configurations:
registry.properties.schema.registry.url
connector.properties.bootstrap.servers
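Before registering the catalog, it can be useful to confirm that both required keys are present. The following plain Java sketch shows one way to do that. The class and method names (`CatalogConfigCheck`, `missingKeys`) are hypothetical helpers introduced for illustration and are not part of the Schema Registry catalog or Flink APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the Cloudera or Flink API.
final class CatalogConfigCheck {

    // The two keys this section lists as required.
    static final List<String> REQUIRED_KEYS = Arrays.asList(
        "registry.properties.schema.registry.url",
        "connector.properties.bootstrap.servers");

    // Returns the required keys that are absent or blank in the given properties.
    static List<String> missingKeys(Map<String, String> props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = props.get(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("registry.properties.schema.registry.url",
                  "http://<your_hostname>:7788/api/v1");
        // The bootstrap servers key is left out on purpose to show the check.
        System.out.println("Missing required keys: " + missingKeys(props));
    }
}
```

An empty result means that both required settings are supplied. Optional settings, such as the SSL properties, are not checked here.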