Integrating Kafka and Schema Registry

If you are running CDP without NiFi, you must integrate your Kafka producer and consumer with Schema Registry manually. To do this, add a dependency on the Schema Registry Serdes and update the Kafka producer and Kafka consumer configuration.

Steps to Add a Schema Registry Serdes Dependency

  1. Add the following dependency to the pom.xml file of your Maven project:
    <dependency>
        <groupId>com.hortonworks.registries</groupId>
        <artifactId>schema-registry-serdes</artifactId>
    </dependency>

Steps to Integrate the Kafka Producer

  1. Add the following text to the Kafka Producer configuration:
    // Kafka brokers the producer connects to
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    // Schema Registry endpoint used by the Avro serializer
    config.put(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), props.get(SCHEMA_REGISTRY_URL));
    // Keys are plain strings; values are serialized as Avro through Schema Registry
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
  2. Edit the code above, setting values for the following properties:
    • schema.registry.url

    • key.serializer

    • value.serializer
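
As a rough illustration of step 2, the sketch below builds the same producer settings using the plain property names instead of the ProducerConfig constants, so it compiles with only the JDK. The class and method names (ProducerConfigSketch, producerConfig), the host names, and the registry port and path are placeholders. The fully qualified name used for KafkaAvroSerializer is an assumption; check it against your schema-registry-serdes jar.

```java
import java.util.HashMap;
import java.util.Map;

class ProducerConfigSketch {

    // Builds producer settings keyed by the property names listed in step 2.
    static Map<String, Object> producerConfig(String bootstrapServers, String schemaRegistryUrl) {
        Map<String, Object> config = new HashMap<>();
        config.put("bootstrap.servers", bootstrapServers);
        config.put("schema.registry.url", schemaRegistryUrl);
        config.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Assumed package name for KafkaAvroSerializer; verify before use.
        config.put("value.serializer",
                "com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer");
        return config;
    }

    public static void main(String[] args) {
        // Placeholder endpoints; replace with the values for your cluster.
        Map<String, Object> config = producerConfig(
                "broker-1.example.com:9092",
                "http://registry.example.com:7788/api/v1");
        config.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

A map built this way can be passed to the KafkaProducer constructor, which accepts a Map<String, Object> of settings.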

Steps to Integrate the Kafka Consumer

  1. Add the following text to the Kafka Consumer configuration:
    // Kafka brokers the consumer connects to
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    // Schema Registry endpoint used by the Avro deserializer
    config.put(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), props.get(SCHEMA_REGISTRY_URL));
    // Keys are plain strings; values are deserialized from Avro through Schema Registry
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
  2. Edit the code above, setting values for the following properties:
    • schema.registry.url

    • key.deserializer

    • value.deserializer
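
The consumer side can be sketched in the same way. The example below is a minimal, JDK-only illustration that uses the plain property names from step 2. ConsumerConfigSketch, consumerConfig, and the endpoints are hypothetical, and the fully qualified name used for KafkaAvroDeserializer is an assumption to verify against your schema-registry-serdes jar.

```java
import java.util.HashMap;
import java.util.Map;

class ConsumerConfigSketch {

    // Builds consumer settings keyed by the property names listed in step 2.
    static Map<String, Object> consumerConfig(String bootstrapServers, String schemaRegistryUrl) {
        Map<String, Object> config = new HashMap<>();
        config.put("bootstrap.servers", bootstrapServers);
        config.put("schema.registry.url", schemaRegistryUrl);
        config.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Assumed package name for KafkaAvroDeserializer; verify before use.
        config.put("value.deserializer",
                "com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroDeserializer");
        return config;
    }

    public static void main(String[] args) {
        // Placeholder endpoints; replace with the values for your cluster.
        Map<String, Object> config = consumerConfig(
                "broker-1.example.com:9092",
                "http://registry.example.com:7788/api/v1");
        config.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

The map can be passed to the KafkaConsumer constructor. A consumer that relies on consumer group management also needs a group.id setting, which these steps do not cover.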

Steps to Customize Client Cache Duration

By default, Schema Registry clients cache schemas for 5 minutes (300 seconds). To change the cache duration, set the cache expiry properties described below.

  1. Customize the values for the following properties:
    • Schema Metadata Cache:
      schema.registry.client.schema.metadata.cache.expiry.interval.secs

      Expiry interval (in seconds) of an entry in the schema metadata cache. Default is 300 seconds.

    • Schema Version Cache:
      schema.registry.client.schema.version.cache.expiry.interval.secs

      Expiry interval (in seconds) of an entry in the schema version cache. Default is 300 seconds.

    • Schema Text Cache:
      schema.registry.client.schema.text.cache.expiry.interval.secs

      Expiry interval (in seconds) of an entry in the schema text cache. Default is 300 seconds.

    • Class Loader Cache:
      schema.registry.client.class.loader.cache.expiry.interval.secs

      Expiry interval (in seconds) of an entry in the serializer/deserializer class loader cache. Default is 300 seconds.
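
One way to apply these settings is to add them to the same configuration map that carries schema.registry.url. The sketch below is an illustration only: CacheExpirySketch and withCacheExpiry are hypothetical names, the endpoint is a placeholder, and passing the interval as a Long is an assumption about the value type the client expects.

```java
import java.util.HashMap;
import java.util.Map;

class CacheExpirySketch {

    // The four cache expiry properties listed above.
    static final String[] CACHE_EXPIRY_KEYS = {
        "schema.registry.client.schema.metadata.cache.expiry.interval.secs",
        "schema.registry.client.schema.version.cache.expiry.interval.secs",
        "schema.registry.client.schema.text.cache.expiry.interval.secs",
        "schema.registry.client.class.loader.cache.expiry.interval.secs"
    };

    // Returns a copy of the given config with every cache expiry set to the same interval.
    static Map<String, Object> withCacheExpiry(Map<String, Object> config, long seconds) {
        if (seconds <= 0) {
            throw new IllegalArgumentException("Expiry interval must be positive: " + seconds);
        }
        Map<String, Object> result = new HashMap<>(config);
        for (String key : CACHE_EXPIRY_KEYS) {
            result.put(key, seconds); // stored as a Long (assumed value type)
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> base = new HashMap<>();
        // Placeholder endpoint; replace with the value for your cluster.
        base.put("schema.registry.url", "http://registry.example.com:7788/api/v1");

        // Raise all four caches from the 300-second default to 10 minutes.
        Map<String, Object> config = withCacheExpiry(base, 600L);
        config.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

The four caches do not have to share one interval; each property can be set to its own value in the same way.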