Schema Registry formats

When the schema is stored in Cloudera Schema Registry, you do not need to define the Avro and JSON schema for Kafka table sources and sinks.

Such topics are accessible through automatically generated tables from the read-only registry catalog.

Maven dependency
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-cloudera-registry</artifactId>
  <version>1.0-csa1.12.0.0</version>
</dependency>
If you need to define a table outside the registry catalog, set the format type to registry and point it at the Schema Registry URL, as in the following example:
CREATE TABLE source_table (
   id BIGINT,
   name STRING,
   description STRING
) WITH (
   'connector.type' = 'kafka',
   'connector.version' = 'universal',
   'connector.topic' = 'message',
   'connector.startup-mode' = 'latest-offset',
   'connector.properties.bootstrap.servers' = '<hostname>:<port>',
   'connector.properties.group.id' = 'test',
   'format.type' = 'registry',
   'format.registry.properties.schema.registry.url' = 'http(s)://<hostname>:<port>/api/v1'
)

By default, the Cloudera Schema Registry connector for Flink stores the schema version information in the Kafka message itself rather than in the message headers. This is why the format.registry.properties.store.schema.version.id.in.header property is set to false by default.
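
As a minimal sketch, the following table definition sets the property to true so that the schema version ID is carried in the Kafka message headers instead. The table name header_table is a placeholder, and the columns, topic, and connection values are reused from the example above. It is assumed here that the value is passed as a quoted string like the other properties.

```sql
-- Sketch only: header_table is a hypothetical table name.
CREATE TABLE header_table (
   id BIGINT,
   name STRING,
   description STRING
) WITH (
   'connector.type' = 'kafka',
   'connector.version' = 'universal',
   'connector.topic' = 'message',
   'connector.startup-mode' = 'latest-offset',
   'connector.properties.bootstrap.servers' = '<hostname>:<port>',
   'connector.properties.group.id' = 'test',
   'format.type' = 'registry',
   'format.registry.properties.schema.registry.url' = 'http(s)://<hostname>:<port>/api/v1',
   -- Assumption: 'true' moves the schema version ID into the message headers.
   'format.registry.properties.store.schema.version.id.in.header' = 'true'
)
```

Producers and consumers of the same topic would presumably need to agree on this setting, so check it on both sides before changing it.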

The schema name in the registry is usually the same as the Kafka topic name, but can be overridden by the format.registry.schema-name property.
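
The override could look like the following sketch, where the topic is still message but the schema is registered under a different name. The schema name message_schema and the table name renamed_schema_table are hypothetical and chosen only for illustration. The remaining values are reused from the example above.

```sql
-- Sketch only: renamed_schema_table and message_schema are hypothetical names.
CREATE TABLE renamed_schema_table (
   id BIGINT,
   name STRING,
   description STRING
) WITH (
   'connector.type' = 'kafka',
   'connector.version' = 'universal',
   'connector.topic' = 'message',
   'connector.startup-mode' = 'latest-offset',
   'connector.properties.bootstrap.servers' = '<hostname>:<port>',
   'connector.properties.group.id' = 'test',
   'format.type' = 'registry',
   'format.registry.properties.schema.registry.url' = 'http(s)://<hostname>:<port>/api/v1',
   -- Look up the schema under this name instead of the topic name.
   'format.registry.schema-name' = 'message_schema'
)
```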