Integrating Schema Registry with Kafka
Learn the manual configuration steps required to integrate Kafka and Schema Registry if you are running CDP without NiFi.
If you are running CDP without NiFi, integrate your Kafka producer and consumer manually. To do this, add a dependency on the Schema Registry SerDes and update the Kafka producer and Kafka consumer configuration files.
Adding a Schema Registry SerDes dependency
<dependency>
<groupId>com.hortonworks.registries</groupId>
<artifactId>schema-registry-serdes</artifactId>
</dependency>
Integrating the Kafka producer
- Add the following text to the Kafka producer
configuration:
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); config.putAll(Collections.singletonMap(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), props.get(SCHEMA_REGISTRY_URL))); config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
- Edit the above text with values for the following properties:
-
schema.registry.url
-
key.serializer
-
value.serializer
-
Integrating the Kafka consumer
- Add the following text to the Kafka consumer
configuration:
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); config.putAll(Collections.singletonMap(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), props.get(SCHEMA_REGISTRY_URL))); config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
- Edit the above text with values for the following properties:
-
schema.registry.url
-
key.deserializer
-
value.deserializer
-
Customizing client cache duration
By default, Schema Registry clients are only able to cache the schema for 5 minutes. To customize the cache duration, configure the cache properties.
-
Schema Metadata Cache
schema.registry.client.schema.metadata.cache.expiry.interval.secs
Expiry interval (in seconds) of an entry in schema metadata cache. Default value is
300
. -
Schema Version Cache
schema.registry.client.schema.version.cache.expiry.interval.secs
Expiry interval (in seconds) of an entry in schema version cache. Default value is
300
. -
Schema Text Cache
schema.registry.client.schema.text.cache.expiry.interval.secs
Expiry interval (in seconds) of an entry in schema text cache. Default value is
300
. -
Class Loader Cache
schema.registry.client.class.loader.cache.expiry.interval.secs
Expiry interval (in seconds) of an entry in the serializer/deserializer class loader cache. Default value is
300
.