Schema Registry with Flink
When Kafka is chosen as the source and sink for your application, you can use Cloudera Schema Registry to register and retrieve schema information for the different Kafka topics. You must add the Schema Registry dependency to your project and attach the appropriate schema object to your Kafka topics.
- Offers automatic and efficient serialization/deserialization for Avro, JSON and basic types
 - Guarantees that only compatible data can be written to a given topic (assuming that every producer uses the registry)
 - Supports safe schema evolution on both producer and consumer side
 - Gives developers visibility into the data types and lets them track schema evolution for the different Kafka topics
 
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cloudera-registry</artifactId>
    <version>${flink.version}</version>
</dependency>

Integrate the Schema Registry with the KafkaSource and KafkaSink using the appropriate schema:
- org.apache.flink.formats.registry.cloudera.avro.ClouderaRegistryAvroKafkaRecordSerializationSchema
- org.apache.flink.formats.registry.cloudera.avro.ClouderaRegistryAvroKafkaDeserializationSchema
- org.apache.flink.formats.registry.cloudera.json.ClouderaRegistryJsonKafkaRecordSerializationSchema
- org.apache.flink.formats.registry.cloudera.json.ClouderaRegistryJsonKafkaDeserializationSchema
See the Apache Flink documentation for Kafka consumer and producer basics.
Supported types
- Avro Specific Record types
 - Avro Generic Records
 - JSON data types
 - Basic Java Data types: byte[], Byte, Integer, Short, Double, Float, Long, String, Boolean
 
To get started with Avro schemas and generated Java objects, see the Apache Avro documentation.
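For reference, a schema matching the ItemTransaction record used in the examples below might look like the following sketch (the namespace and field list are illustrative, not taken from this document):

{
  "type": "record",
  "name": "ItemTransaction",
  "namespace": "com.example.transactions",
  "fields": [
    {"name": "itemId", "type": "string"},
    {"name": "quantity", "type": "int"}
  ]
}

Running the Avro Maven plugin or avro-tools against such a schema produces the generated Java class that serves as the Avro Specific Record type.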
Security
The Schema Registry client can connect to a TLS-secured Schema Registry. Pass the truststore settings in a Map that is passed to the Schema Registry configuration:

Map<String, String> sslClientConfig = new HashMap<>();
sslClientConfig.put(K_TRUSTSTORE_PATH, params.get(K_SCHEMA_REG_SSL_CLIENT_KEY + "." + K_TRUSTSTORE_PATH));
sslClientConfig.put(K_TRUSTSTORE_PASSWORD, params.get(K_SCHEMA_REG_SSL_CLIENT_KEY + "." + K_TRUSTSTORE_PASSWORD));

Map<String, Object> schemaRegistryConf = new HashMap<>();
schemaRegistryConf.put(K_SCHEMA_REG_URL, params.get(K_SCHEMA_REG_URL));
schemaRegistryConf.put(K_SCHEMA_REG_SSL_CLIENT_KEY, sslClientConfig);
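The assembled schemaRegistryConf map can then be handed to the serializer or deserializer builders through the setConfig method described later in this section. A minimal sketch, assuming the topic and registryAddress variables used in the later examples:

KafkaRecordSerializationSchema<ItemTransaction> schema =
        ClouderaRegistryAvroKafkaRecordSerializationSchema.<ItemTransaction>builder(topic)
                .setRegistryAddress(registryAddress)
                // Hand the TLS client settings to the Schema Registry client
                .setConfig(schemaRegistryConf)
                .build();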
For Kerberos authentication, add the RegistryClient property to the security.kerberos.login.contexts parameter in Cloudera Manager:
security.kerberos.login.contexts=Client,KafkaClient,RegistryClient
Schema serialization
You can construct the schema serialization with the ClouderaRegistryAvroKafkaRecordSerializationSchema and ClouderaRegistryJsonKafkaRecordSerializationSchema objects for the KafkaSink, based on the required data format. You must set the topic configuration and the RegistryAddress parameter in the object. The following settings are available on the builder:
- Topic configuration when creating the builder, which can be static or dynamic (extracted from the data)
- RegistryAddress parameter on the builder to establish the connection
- Arbitrary SchemaRegistry client configuration using the setConfig method
- Key configuration for the produced Kafka messages
  - Specifying a KeySelector function that extracts the key from each record
  - Using a Tuple2 stream for (key, value) pairs directly
- Security configuration
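The examples in this section assume a kafkaProps object carrying the Kafka producer settings; a minimal sketch with a placeholder broker address:

Properties kafkaProps = new Properties();
// Replace the placeholder with your broker list
kafkaProps.setProperty("bootstrap.servers", "<your_broker_url>");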
 
The following example shows the Avro serialization schema:

        KafkaRecordSerializationSchema<ItemTransaction> schema =
                ClouderaRegistryAvroKafkaRecordSerializationSchema.<ItemTransaction>builder(topic)
                        .setRegistryAddress(registryAddress)
                        .setKey(ItemTransaction::getItemId)
                        .build();
        KafkaSink<ItemTransaction> kafkaSink =
                KafkaSink.<ItemTransaction>builder()
                        .setKafkaProducerConfig(kafkaProps)
                        .setRecordSerializer(schema)
                        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                        .build();

The JSON variant differs only in the builder class:

        KafkaRecordSerializationSchema<ItemTransaction> schema =
                ClouderaRegistryJsonKafkaRecordSerializationSchema.<ItemTransaction>builder(topic)
                        .setRegistryAddress(registryAddress)
                        .setKey(ItemTransaction::getItemId)
                        .build();
        KafkaSink<ItemTransaction> kafkaSink =
                KafkaSink.<ItemTransaction>builder()
                        .setKafkaProducerConfig(kafkaProps)
                        .setRecordSerializer(schema)
                        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                        .build();
Schema deserialization
You can construct the schema deserialization with the ClouderaRegistryAvroKafkaDeserializationSchema and ClouderaRegistryJsonKafkaDeserializationSchema objects for the KafkaSource to read messages that were produced with the matching serialization schema. You must set the class or schema of the input messages and the RegistryAddress parameter in the object.
When reading messages (and keys), you always have to specify the expected Class<T> or record Schema of the input records. This way Flink can do any necessary conversion between the raw data received from Kafka and the expected output of the deserialization.
- Class or Schema of the input messages depending on the data type
- RegistryAddress parameter on the builder to establish the connection
- Arbitrary SchemaRegistry client configuration using the setConfig method
- Key configuration for the consumed Kafka messages (only to be specified if reading the keys into a key or value stream is necessary)
- Security configuration
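For Avro Generic Records, the record Schema is supplied to the builder instead of a class. A minimal sketch, assuming a builder overload that accepts an org.apache.avro.Schema (the inline schema is illustrative):

// Hypothetical inline schema for illustration; normally loaded from an .avsc file
Schema itemSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"ItemTransaction\","
                + "\"fields\":[{\"name\":\"itemId\",\"type\":\"string\"}]}");
KafkaDeserializationSchema<GenericRecord> schema =
        ClouderaRegistryAvroKafkaDeserializationSchema.builder(itemSchema)
                .setRegistryAddress(registryAddress)
                .build();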
 
The following example shows the Avro deserialization schema:

        KafkaDeserializationSchema<ItemTransaction> schema =
                ClouderaRegistryAvroKafkaDeserializationSchema.builder(ItemTransaction.class)
                        .setRegistryAddress(registryAddress)
                        .build();
        KafkaSource<ItemTransaction> transactionSource = KafkaSource.<ItemTransaction>builder()
                .setBootstrapServers("<your_broker_url>")
                .setTopics(inputTopic)
                .setGroupId(groupId)
                .setStartingOffsets(OffsetsInitializer.earliest())
                // Wrap the KafkaDeserializationSchema for the KafkaSource API
                .setDeserializer(KafkaRecordDeserializationSchema.of(schema))
                .build();

The JSON variant differs only in the builder class:

        KafkaDeserializationSchema<ItemTransaction> schema =
                ClouderaRegistryJsonKafkaDeserializationSchema.builder(ItemTransaction.class)
                        .setRegistryAddress(registryAddress)
                        .build();
        KafkaSource<ItemTransaction> transactionSource = KafkaSource.<ItemTransaction>builder()
                .setBootstrapServers("<your_broker_url>")
                .setTopics(inputTopic)
                .setGroupId(groupId)
                .setStartingOffsets(OffsetsInitializer.earliest())
                // Wrap the KafkaDeserializationSchema for the KafkaSource API
                .setDeserializer(KafkaRecordDeserializationSchema.of(schema))
                .build();
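To wire the source into a job, attach it to the execution environment in the standard Flink way. A minimal sketch using the transactionSource from the previous snippet:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<ItemTransaction> transactions = env.fromSource(
        transactionSource,
        // Assign a real WatermarkStrategy if event-time processing is needed
        WatermarkStrategy.noWatermarks(),
        "Kafka Source");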