The deserialization schema is constructed with the builder returned by ClouderaRegistryKafkaDeserializationSchema.builder(..).

When reading messages (and keys), you always have to specify the expected Class<T> or record Schema of the input records. This way Flink can do any necessary conversion between the raw data received from Kafka and the expected output of the deserialization.

Required settings:
  • Class or Schema of the input messages depending on the data type
  • RegistryAddress parameter on the builder to establish the connection
Optional settings:
  • Arbitrary SchemaRegistry client configuration using the setConfig method
  • Key configuration for the consumed Kafka messages (needed only when the message keys must be read along with the values, for example into a key-value stream)
  • Security configuration
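
For example, the following builds a schema for ItemTransaction records and passes it to a Kafka consumer:

KafkaDeserializationSchema<ItemTransaction> schema = ClouderaRegistryKafkaDeserializationSchema
    .builder(ItemTransaction.class).setRegistryAddress(registryAddress).build();
// The consumer group is taken from the "group.id" entry in kafkaProps.
FlinkKafkaConsumer<ItemTransaction> transactionSource = new FlinkKafkaConsumer<>(inputTopic, schema, kafkaProps);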
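
The kafkaProps object passed to the consumer is typically a java.util.Properties instance holding the Kafka client settings. The following is a minimal sketch of how it might be populated. The class name, helper name, broker address and group name are hypothetical and used only for illustration; bootstrap.servers and group.id are standard Kafka consumer configuration keys.

```java
import java.util.Properties;

public class KafkaPropsSketch {

    // Hypothetical helper: collects the Kafka client settings for the consumer.
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        // Comma-separated list of brokers used for the initial connection.
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Consumer group under which offsets are tracked.
        props.setProperty("group.id", groupId);
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values; replace with the addresses of your own cluster.
        Properties kafkaProps = consumerProps("broker-1:9092", "item-transactions-reader");
        System.out.println(kafkaProps);
    }
}
```

Keeping the group ID in the properties object means the same kafkaProps can be reused for several sources that should share a consumer group.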