Integrating with Schema Registry

Integrating Kafka and Schema Registry

About This Task

If you are running HDF without NiFi, you must integrate your Kafka Producer and Consumer with Schema Registry manually. To do this, add a dependency on the Schema Registry Serdes and update the Kafka Producer and Kafka Consumer configurations.

Steps to Add a Schema Registry Serdes Dependency

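  1. Add the following schema-registry-serdes dependency to your Maven project file (pom.xml). Include a <version> element that matches your Schema Registry release if your build does not manage dependency versions elsewhere: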
    <dependency>
          <groupId>com.hortonworks.registries</groupId>
          <artifactId>schema-registry-serdes</artifactId>
    </dependency>

Steps to Integrate the Kafka Producer

  1. Add the following settings to the code that builds your Kafka Producer configuration:
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    config.putAll(Collections.singletonMap(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), props.get(SCHEMA_REGISTRY_URL)));
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
    
  2. Edit the settings above to supply values for the following properties:
    • schema.registry.url

    • key.serializer

    • value.serializer
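
The producer settings above can also be written with plain property names, which makes the three properties in step 2 easier to see. The following is a minimal sketch, not the official client code: the class name ProducerConfigSketch, the host names, and the registry URL are placeholders, and the fully qualified name of KafkaAvroSerializer is an assumption that you should verify against your schema-registry-serdes JAR.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper class, for illustration only.
class ProducerConfigSketch {

    // Builds the producer settings using plain string keys instead of the
    // ProducerConfig constants shown in step 1.
    static Map<String, Object> producerConfig(String bootstrapServers, String registryUrl) {
        Map<String, Object> config = new HashMap<>();
        config.put("bootstrap.servers", bootstrapServers);
        config.put("schema.registry.url", registryUrl);
        config.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Assumed package name for the Schema Registry Avro serializer.
        config.put("value.serializer",
                "com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer");
        return config;
    }

    public static void main(String[] args) {
        // Placeholder broker and registry addresses; replace with your own.
        Map<String, Object> config =
                producerConfig("localhost:9092", "http://localhost:9090/api/v1");
        config.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

The resulting map is what you would pass to the KafkaProducer constructor once the Kafka client and Schema Registry Serdes libraries are on the classpath.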

Steps to Integrate the Kafka Consumer

  1. Add the following settings to the code that builds your Kafka Consumer configuration:
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    config.putAll(Collections.singletonMap(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), props.get(SCHEMA_REGISTRY_URL)));
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
  2. Edit the settings above to supply values for the following properties:
    • schema.registry.url

    • key.deserializer

    • value.deserializer
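
The consumer side mirrors the producer. The sketch below builds the same settings with plain property names. The class name ConsumerConfigSketch and all addresses are placeholders, and the fully qualified name of KafkaAvroDeserializer is an assumption to check against your schema-registry-serdes JAR. The group.id entry is not part of the steps above; it is included only because a consumer that subscribes to topics needs a consumer group, and the value shown is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper class, for illustration only.
class ConsumerConfigSketch {

    // Builds the consumer settings using plain string keys instead of the
    // ConsumerConfig constants shown in step 1.
    static Map<String, Object> consumerConfig(String bootstrapServers,
                                              String registryUrl,
                                              String groupId) {
        Map<String, Object> config = new HashMap<>();
        config.put("bootstrap.servers", bootstrapServers);
        config.put("schema.registry.url", registryUrl);
        // Not listed in the steps above; required when subscribing to topics.
        config.put("group.id", groupId);
        config.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Assumed package name for the Schema Registry Avro deserializer.
        config.put("value.deserializer",
                "com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroDeserializer");
        return config;
    }

    public static void main(String[] args) {
        // Placeholder addresses and group name; replace with your own.
        Map<String, Object> config = consumerConfig(
                "localhost:9092", "http://localhost:9090/api/v1", "example-group");
        config.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```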

Steps to Customize Client Cache Duration

By default, Schema Registry clients cache schema information for 5 minutes (300 seconds). To change how long cached entries are kept, set the cache expiry properties.

  1. Customize the values for the following properties:
    • Schema Metadata Cache:
      schema.registry.client.schema.metadata.cache.expiry.interval.secs

      Expiry interval (in seconds) of an entry in schema metadata cache. Default is 300 seconds.

    • Schema Version Cache:
      schema.registry.client.schema.version.cache.expiry.interval.secs

      Expiry interval (in seconds) of an entry in schema version cache. Default is 300 seconds.

    • Schema Text Cache:
      schema.registry.client.schema.text.cache.expiry.interval.secs

      Expiry interval (in seconds) of an entry in schema text cache. Default is 300 seconds.

    • Class Loader Cache:
      schema.registry.client.class.loader.cache.expiry.interval.secs

      Expiry interval (in seconds) of an entry in the serializer/deserializer class loader cache. Default is 300 seconds.
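
As an illustration, the four cache properties can be collected into one map and merged into the producer or consumer configuration. This is a sketch under stated assumptions: the class name CacheConfigSketch and the 600-second value are hypothetical, and it assumes the Schema Registry client reads these properties from the same configuration map that carries schema.registry.url and accepts numeric values for them.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper class, for illustration only.
class CacheConfigSketch {

    // Returns all four cache expiry properties set to the same interval, in seconds.
    static Map<String, Object> cacheExpiry(long seconds) {
        Map<String, Object> config = new HashMap<>();
        config.put("schema.registry.client.schema.metadata.cache.expiry.interval.secs", seconds);
        config.put("schema.registry.client.schema.version.cache.expiry.interval.secs", seconds);
        config.put("schema.registry.client.schema.text.cache.expiry.interval.secs", seconds);
        config.put("schema.registry.client.class.loader.cache.expiry.interval.secs", seconds);
        return config;
    }

    public static void main(String[] args) {
        // Example: keep cached entries for 10 minutes instead of the default 5.
        Map<String, Object> clientConfig = new HashMap<>();
        clientConfig.putAll(cacheExpiry(600L));
        clientConfig.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Each property can also be set to a different value if, for example, schema text changes less often than schema metadata in your environment.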