Schema Registry with Flink
When Kafka is chosen as source and sink for your application, you can use Cloudera Schema Registry to register and retrieve schema information of the different Kafka topics. You must add Schema Registry dependency to your project and add the appropriate schema object to your Kafka topics.
There are several reasons why you should prefer the Schema Registry instead of custom
serializer implementations on both consumer and producer side:
- Offers automatic and efficient serialization/deserialization for avro and basic types (+ JSON in the future)
- Guarantees that only compatible data can be written to a given topic (assuming that every producer uses the registry)
- Supports safe schema evolution on both producer and consumer side
- Offers visibility to developers on the data types and they can track schema evolution for the different Kafka topics
Add the following Maven dependency or equivalent to use the schema registry integration in your
project:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro-cloudera-registry</artifactId>
<version>${flink.version}</version>
</dependency>
The schema registry can be plugged directly into the
FlinkKafkaConsumer
and
FlinkKafkaProducer
using the appropriate schema:org.apache.flink.formats.avro.registry.cloudera.ClouderaRegistryKafkaDeserializationSchema
org.apache.flink.formats.avro.registry.cloudera.ClouderaRegistryKafkaSerializationSchema
See the Apache Flink documentation for Kafka consumer and producer basics.
Supported types
Currently, the following data types are supported for producers and consumers:
- Avro Specific Record types
- Avro Generic Records
- Basic Java Data types: byte[], Byte, Integer, Short, Double, Float, Long, String, Boolean
To get started with Avro schemas and generated Java objects, see the Apache Avro documentation.
Security
You need to include every SSL configuration into a
Map
that is passed to
the Schema Registry
configuration.Map<String, String> sslClientConfig = new HashMap<>();
sslClientConfig.put(K_TRUSTSTORE_PATH, params.get(K_SCHEMA_REG_SSL_CLIENT_KEY + "." + K_TRUSTSTORE_PATH));
sslClientConfig.put(K_TRUSTSTORE_PASSWORD, params.get(K_SCHEMA_REG_SSL_CLIENT_KEY + "." + K_TRUSTSTORE_PASSWORD));
Map<String, Object> schemaRegistryConf = new HashMap<>();
schemaRegistryConf.put(K_SCHEMA_REG_URL, params.get(K_SCHEMA_REG_URL));
schemaRegistryConf.put(K_SCHEMA_REG_SSL_CLIENT_KEY, sslClientConfig);
For Kerberos authentication, Flink can maintain the authentication and ticket renewal
automatically. You can define an additional
RegistryClient
property to the
security.kerberos.login.contexts
parameter in Cloudera
Manager.security.kerberos.login.contexts=Client,KafkaClient,RegistryClient