Integrating Schema Registry with Kafka

Learn the manual configuration steps required to integrate Kafka and Schema Registry if you are running CDP without NiFi.

If you are running CDP without NiFi, integrate your Kafka producer and consumer manually. To do this, add a dependency on the Schema Registry SerDes and update the Kafka producer and Kafka consumer configuration files.

Adding a Schema Registry SerDes dependency

Add the following text to schema-registry-serdes:
<dependency>
      <groupId>com.hortonworks.registries</groupId>
      <artifactId>schema-registry-serdes</artifactId>
</dependency>

Integrating the Kafka producer

  1. Add the following text to the Kafka producer configuration:
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    config.putAll(Collections.singletonMap(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), props.get(SCHEMA_REGISTRY_URL)));
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
    
  2. Edit the above text with values for the following properties:
    • schema.registry.url
    • key.serializer
    • value.serializer

Integrating the Kafka consumer

  1. Add the following text to the Kafka consumer configuration:
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    config.putAll(Collections.singletonMap(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), props.get(SCHEMA_REGISTRY_URL)));
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
  2. Edit the above text with values for the following properties:
    • schema.registry.url
    • key.deserializer
    • value.deserializer

Customizing client cache duration

By default, Schema Registry clients are only able to cache the schema for 5 minutes. To customize the cache duration, configure the cache properties.

Customize the values for the following properties:
  • Schema Metadata Cache
    schema.registry.client.schema.metadata.cache.expiry.interval.secs

    Expiry interval (in seconds) of an entry in schema metadata cache. Default value is 300.

  • Schema Version Cache

    schema.registry.client.schema.version.cache.expiry.interval.secs

    Expiry interval (in seconds) of an entry in schema version cache. Default value is 300.

  • Schema Text Cache

    schema.registry.client.schema.text.cache.expiry.interval.secs

    Expiry interval (in seconds) of an entry in schema text cache. Default value is 300.

  • Class Loader Cache

    schema.registry.client.class.loader.cache.expiry.interval.secs

    Expiry interval (in seconds) of an entry in the serializer/deserializer class loader cache. Default value is 300.