Integrating Kafka and Schema Registry
About This Task
If you are running CDP without NiFi, you must integrate your Kafka Producer and Consumer with Schema Registry manually. To do this, add a dependency on the Schema Registry Serdes and update the Kafka Producer and Kafka Consumer configuration.
Steps to Add a Schema Registry Serdes Dependency
- Add the following schema-registry-serdes dependency to your project's Maven pom.xml:
    <dependency>
      <groupId>com.hortonworks.registries</groupId>
      <artifactId>schema-registry-serdes</artifactId>
    </dependency>
Steps to Integrate the Kafka Producer
- Add the following text to the Kafka Producer configuration:
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    config.putAll(Collections.singletonMap(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), props.get(SCHEMA_REGISTRY_URL)));
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
- Edit the above text with values for the following properties:
  - schema.registry.url
  - key.serializer
  - value.serializer
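As a rough illustration of the producer settings above, the following sketch builds the same configuration with plain string keys, so it runs without the Kafka or Schema Registry JARs on the classpath. The broker address and registry URL are placeholders, the helper name buildProducerConfig is hypothetical, and the fully qualified serializer class names are assumptions that should be checked against the serdes artifact you depend on.

```java
import java.util.HashMap;
import java.util.Map;

public class ProducerConfigSketch {

    // Assumed fully qualified class names; verify them against the JARs you use.
    static final String STRING_SERIALIZER =
            "org.apache.kafka.common.serialization.StringSerializer";
    static final String KAFKA_AVRO_SERIALIZER =
            "com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer";

    /** Hypothetical helper: collects the four producer settings into one map. */
    public static Map<String, Object> buildProducerConfig(String bootstrapServers,
                                                          String schemaRegistryUrl) {
        Map<String, Object> config = new HashMap<>();
        // Same key as ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
        config.put("bootstrap.servers", bootstrapServers);
        // Where the serializer looks up and registers schemas
        config.put("schema.registry.url", schemaRegistryUrl);
        // Keys are sent as plain strings, values as Avro
        config.put("key.serializer", STRING_SERIALIZER);
        config.put("value.serializer", KAFKA_AVRO_SERIALIZER);
        return config;
    }

    public static void main(String[] args) {
        // Placeholder endpoints; substitute the values for your cluster.
        Map<String, Object> config = buildProducerConfig(
                "broker1.example.com:9092",
                "http://registry.example.com:7788/api/v1");
        config.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

In a real application, a map like this would typically be passed to the KafkaProducer constructor.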
Steps to Integrate the Kafka Consumer
- Add the following text to the Kafka Consumer configuration:
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    config.putAll(Collections.singletonMap(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), props.get(SCHEMA_REGISTRY_URL)));
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
- Edit the above text with values for the following properties:
  - schema.registry.url
  - key.deserializer
  - value.deserializer
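The consumer settings can also be kept in a properties file rather than set in code. The sketch below, which uses only the Java standard library, loads such a file from a string and checks that the properties listed above are present. The file contents, endpoints, and the helper name loadConsumerConfig are hypothetical, and the fully qualified deserializer class names are assumptions to verify against your dependencies.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class ConsumerConfigSketch {

    // Hypothetical properties-file contents with placeholder endpoints.
    static final String EXAMPLE_PROPERTIES = String.join("\n",
            "bootstrap.servers=broker1.example.com:9092",
            "schema.registry.url=http://registry.example.com:7788/api/v1",
            "key.deserializer=org.apache.kafka.common.serialization.StringDeserializer",
            "value.deserializer=com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroDeserializer");

    // The properties this page asks you to supply, plus the broker list.
    static final String[] REQUIRED_KEYS = {
            "bootstrap.servers", "schema.registry.url", "key.deserializer", "value.deserializer"
    };

    /** Hypothetical helper: parses the text and fails fast on a missing or empty property. */
    public static Properties loadConsumerConfig(String propertiesText) {
        Properties config = new Properties();
        try {
            config.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        for (String key : REQUIRED_KEYS) {
            String value = config.getProperty(key);
            if (value == null || value.isEmpty()) {
                throw new IllegalArgumentException("Missing consumer property: " + key);
            }
        }
        return config;
    }

    public static void main(String[] args) {
        Properties config = loadConsumerConfig(EXAMPLE_PROPERTIES);
        for (String key : REQUIRED_KEYS) {
            System.out.println(key + " = " + config.getProperty(key));
        }
    }
}
```

Validating the keys up front is a design choice of this sketch; it surfaces a missing schema.registry.url at startup instead of at the first deserialization attempt.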
Steps to Customize Client Cache Duration
By default, Schema Registry clients cache schema information for 5 minutes (300 seconds). To change the cache duration, configure the cache expiry properties.
- Customize the values for the following properties:
  - Schema Metadata Cache: schema.registry.client.schema.metadata.cache.expiry.interval.secs
    Expiry interval (in seconds) of an entry in the schema metadata cache. Default is 300 seconds.
  - Schema Version Cache: schema.registry.client.schema.version.cache.expiry.interval.secs
    Expiry interval (in seconds) of an entry in the schema version cache. Default is 300 seconds.
  - Schema Text Cache: schema.registry.client.schema.text.cache.expiry.interval.secs
    Expiry interval (in seconds) of an entry in the schema text cache. Default is 300 seconds.
  - Class Loader Cache: schema.registry.client.class.loader.cache.expiry.interval.secs
    Expiry interval (in seconds) of an entry in the serializer/deserializer class loader cache. Default is 300 seconds.
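One way to apply the cache settings above is to add all four expiry properties to the client configuration in a single step. The sketch below does this with standard-library types only. The helper name withCacheExpiry is hypothetical, storing the values as Long is an assumption (check whether your client version expects numeric or string values), and rejecting non-positive durations is a choice of this sketch rather than documented client behavior.

```java
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;

public class CacheExpirySketch {

    // The four cache expiry properties listed above.
    static final String[] CACHE_EXPIRY_KEYS = {
            "schema.registry.client.schema.metadata.cache.expiry.interval.secs",
            "schema.registry.client.schema.version.cache.expiry.interval.secs",
            "schema.registry.client.schema.text.cache.expiry.interval.secs",
            "schema.registry.client.class.loader.cache.expiry.interval.secs"
    };

    /** Hypothetical helper: returns a copy of the config with every cache expiry set. */
    public static Map<String, Object> withCacheExpiry(Map<String, Object> clientConfig,
                                                      Duration expiry) {
        if (expiry.isZero() || expiry.isNegative()) {
            throw new IllegalArgumentException("Cache expiry must be positive: " + expiry);
        }
        // Copy so that the caller's map is left unchanged.
        Map<String, Object> updated = new LinkedHashMap<>(clientConfig);
        for (String key : CACHE_EXPIRY_KEYS) {
            // The properties are expressed in seconds.
            updated.put(key, expiry.getSeconds());
        }
        return updated;
    }

    public static void main(String[] args) {
        Map<String, Object> base = new LinkedHashMap<>();
        base.put("schema.registry.url", "http://registry.example.com:7788/api/v1"); // placeholder
        // Raise every cache from the 300-second default to 15 minutes.
        Map<String, Object> config = withCacheExpiry(base, Duration.ofMinutes(15));
        config.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

The four caches can also be tuned independently by setting each property on its own.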