Schema Registry formats
You can avoid defining the Avro and JSON schema for Kafka table sources and sinks, when the schema is stored in Cloudera Schema Registry.
Such topics are accessible through automatically generated tables from the read-only registry catalog.
Maven
dependency
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-cloudera-registry</artifactId>
<version>1.0-csa1.12.0.0</version>
</dependency>
If you need to define a table outside the registry catalog, the following example can be
used:
CREATE TABLE source_table (
id BIGINT,
name STRING,
description STRING
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'message',
'connector.startup-mode' = 'latest-offset',
'connector.properties.bootstrap.servers' = '<hostname>:<port>',
'connector.properties.group.id' = 'test',
'format.type' = 'registry',
'format.registry.properties.schema.registry.url' = 'http(s)://<hostname>:<port>/api/v1'
)
Cloudera Schema Registry connector for Flink stores the schema version info in the Kafka
messages by default. This means that the
format.registry.properties.store.schema.version.id.in.header
property is
set to false
by default.
The schema name in the registry is usually the same as the Kafka topic name, but can be
overridden by the format.registry.schema-name
property.