You can construct the schema deserialization with the ClouderaRegistryKafkaDeserializationSchema.builder(..) object for FlinkKafkaProducer to read the messages in the same schema from the FlinkKafkaProducer. You must set the class or schema of the input messages and the RegistryAddress parameter in the object.

The deserialization schema can be constructed using the ClouderaRegistryKafkaDeserializationSchema.builder(..) object.

When reading messages (and keys), you always have to specify the expected Class<T> or record Schema of the input records. This way Flink can do any necessary conversion between the raw data received from Kafka and the expected output of the deserialization.

Required settings:
  • Class or Schema of the input messages depending on the data type
  • RegistryAddress parameter on the builder to establish the connection
Optinal settings:
  • Arbitrary SchemaRegistry client configuration using the setConfig method
  • Key configuration for the consumed Kafka messages (only to be specified if reading the keys into a key or value stream is necessary)
  • Security configuration
KafkaDeserializationSchema<ItemTransaction> schema = ClouderaRegistryKafkaDeserializationSchema
FlinkKafkaConsumer<ItemTransaction> transactionSource = new FlinkKafkaConsumer<>(inputTopic, schema, kafkaProps, groupdId);