ClouderaRegistryKafkaSerializationSchema
You can construct the serialization schema for FlinkKafkaProducer with the ClouderaRegistryKafkaSerializationSchema.builder(..) object. You must set the topic configuration and the RegistryAddress parameter on the builder.
The serialization schema can be constructed using the ClouderaRegistryKafkaSerializationSchema.builder(..) object.
Required settings:
- Topic configuration when creating the builder, which can be static or dynamic (extracted from the data)
- RegistryAddress parameter on the builder to establish the connection
Optional settings:
- Arbitrary SchemaRegistry client configuration using the setConfig method
- Key configuration for the produced Kafka messages
  - Specifying a KeySelector function that extracts the key from each record
  - Using a Tuple2 stream for (key, value) pairs directly
- Security configuration
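The difference between a static topic, a dynamic topic, and a key selector can be illustrated with plain Java functions. The following is only a conceptual sketch and does not use the Cloudera or Flink APIs: the class TopicAndKeySketch, the quantity field, and the "transactions-returns" / "transactions-sales" topic names are hypothetical and were chosen for illustration.

```java
import java.util.function.Function;

// Conceptual sketch only: plain Java functions, not the Cloudera builder API.
final class TopicAndKeySketch {

    // Hypothetical stand-in for the ItemTransaction type used in this document.
    static final class ItemTransaction {
        final String itemId;
        final int quantity; // hypothetical field, used only to pick a topic

        ItemTransaction(String itemId, int quantity) {
            this.itemId = itemId;
            this.quantity = quantity;
        }

        String getItemId() {
            return itemId;
        }
    }

    // Static topic: every record is sent to the same topic.
    static <T> Function<T, String> staticTopic(String topic) {
        return record -> topic;
    }

    // Dynamic topic: the topic name is extracted from the record itself.
    static String dynamicTopic(ItemTransaction tx) {
        return tx.quantity < 0 ? "transactions-returns" : "transactions-sales";
    }

    public static void main(String[] args) {
        ItemTransaction tx = new ItemTransaction("item_1", -2);

        Function<ItemTransaction, String> fixed = staticTopic("transactions");
        Function<ItemTransaction, String> routed = TopicAndKeySketch::dynamicTopic;
        // Key selector: extracts the message key from each record.
        Function<ItemTransaction, String> key = ItemTransaction::getItemId;

        System.out.println("static topic:  " + fixed.apply(tx));
        System.out.println("dynamic topic: " + routed.apply(tx));
        System.out.println("message key:   " + key.apply(tx));
    }
}
```

A static topic suits a stream with a single destination. A dynamic topic is useful when the destination depends on the content of each record.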
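// Build the schema: topic, registry address, and the item ID as the message key
KafkaSerializationSchema<ItemTransaction> schema = ClouderaRegistryKafkaSerializationSchema
    .<ItemTransaction>builder(topic)
    .setRegistryAddress(registryAddress)
    .setKey(ItemTransaction::getItemId)
    .build();

// "dummy" fills the default-topic argument; the topic configured on the schema is used for the records
FlinkKafkaProducer<ItemTransaction> kafkaSink = new FlinkKafkaProducer<>(
    "dummy", schema, kafkaProps, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);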
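The example above refers to a kafkaProps object without showing how it is created. A minimal sketch of one way to build it follows, assuming that only the standard Kafka bootstrap.servers property is needed. The class name KafkaProducerProps and the broker address are hypothetical, and a real deployment may require more properties (for example, security settings).

```java
import java.util.Properties;

// Minimal sketch: builds the Properties object passed to FlinkKafkaProducer as kafkaProps.
final class KafkaProducerProps {

    // bootstrapServers is a comma-separated host:port list of Kafka brokers.
    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical broker address; replace it with the brokers of your cluster.
        Properties kafkaProps = build("broker-1.example.com:9092");
        System.out.println(kafkaProps.getProperty("bootstrap.servers"));
    }
}
```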