ClouderaRegistryKafkaSerializationSchema

You can construct the serialization schema for FlinkKafkaProducer with the ClouderaRegistryKafkaSerializationSchema.builder(..) object. You must set the topic configuration and the RegistryAddress parameter on the builder.

Required settings:
  • Topic configuration when creating the builder, which can be static or dynamic (extracted from the data)
  • RegistryAddress parameter on the builder to establish the connection to Schema Registry
Optional settings:
  • Arbitrary SchemaRegistry client configuration using the setConfig method
  • Key configuration for the produced Kafka messages
    • Specifying a KeySelector function that extracts the key from each record
    • Using a Tuple2 stream for (key, value) pairs directly
  • Security configuration
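// Build the serialization schema with a static topic and a key extracted from each record.
KafkaSerializationSchema<ItemTransaction> schema = ClouderaRegistryKafkaSerializationSchema
  .<ItemTransaction>builder(topic)
  .setRegistryAddress(registryAddress)
  .setKey(ItemTransaction::getItemId)
  .build();

// The schema sets the target topic on each record, so "dummy" is only a placeholder default topic.
FlinkKafkaProducer<ItemTransaction> kafkaSink = new FlinkKafkaProducer<>(
  "dummy", schema, kafkaProps, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);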
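
The optional client and security configuration listed above is supplied through setConfig. As a minimal sketch, the following shows how such a configuration map might be assembled with plain Java collections. The class name RegistryConfigSketch and the method buildRegistryConfig are hypothetical. The configuration keys (schema.registry.client.ssl, protocol, trustStoreType, trustStorePath, trustStorePassword) are assumptions that should be checked against the Schema Registry client version in use.

```java
import java.util.HashMap;
import java.util.Map;

public class RegistryConfigSketch {

    // Builds a client configuration map of the kind that could be passed to setConfig(..).
    // All keys below are assumptions, not confirmed by this page; verify them for your client version.
    public static Map<String, Object> buildRegistryConfig(String trustStorePath, String trustStorePassword) {
        // Nested map holding the (assumed) SSL client settings.
        Map<String, String> sslConfig = new HashMap<>();
        sslConfig.put("protocol", "SSL");
        sslConfig.put("trustStoreType", "JKS");
        sslConfig.put("trustStorePath", trustStorePath);
        sslConfig.put("trustStorePassword", trustStorePassword);

        // Top-level map; the SSL settings are stored under a single (assumed) key.
        Map<String, Object> config = new HashMap<>();
        config.put("schema.registry.client.ssl", sslConfig);
        return config;
    }

    public static void main(String[] args) {
        // Placeholder values for illustration only.
        Map<String, Object> config = buildRegistryConfig("/path/to/truststore.jks", "changeit");
        System.out.println(config.keySet());
    }
}
```

Assuming setConfig accepts a map of this shape, the result could be handed to the builder with .setConfig(config) between setRegistryAddress(..) and build().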