Integrating Kafka and Schema Registry

About This Task

If you are running HDF without NiFi, you must integrate your Kafka Producer and Consumer with Schema Registry manually. To do this, add a dependency on the Schema Registry Serdes, and then update the Kafka Producer and Kafka Consumer configurations.

Steps to Add a Schema Registry Serdes Dependency

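  1. Add the following schema-registry-serdes dependency to the <dependencies> section of your project's pom.xml file. If your build does not manage the version of this artifact elsewhere, also add a <version> element that matches your Schema Registry release:
    <dependency>
        <groupId>com.hortonworks.registries</groupId>
        <artifactId>schema-registry-serdes</artifactId>
    </dependency>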

Steps to Integrate the Kafka Producer

  1. Add the following text to the Kafka Producer configuration:
    // Brokers used for the initial connection to the Kafka cluster.
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    // Location of the Schema Registry service.
    config.put(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), props.get(SCHEMA_REGISTRY_URL));
    // Serialize keys as strings and values as Avro through Schema Registry.
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
    
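  2. Edit the above text so that the following properties have values that match your environment:
    • schema.registry.url: the URL of your Schema Registry instance
    • key.serializer: the class used to serialize record keys
    • value.serializer: the class used to serialize record values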
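
The producer steps above can be sketched as a small, dependency-free Java class that fills in the same four settings by their literal property names. This is only an illustration under stated assumptions: the class name ProducerConfigSketch, the broker address, and the registry URL are hypothetical placeholders, and the fully qualified serializer class names are assumed to match the Kafka client and schema-registry-serdes versions you use.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of HDF or Schema Registry.
class ProducerConfigSketch {

    // Builds the producer settings using the literal property names
    // that the configuration constants in the steps above resolve to.
    static Map<String, Object> build(String bootstrapServers, String schemaRegistryUrl) {
        Map<String, Object> config = new HashMap<>();
        config.put("bootstrap.servers", bootstrapServers);
        config.put("schema.registry.url", schemaRegistryUrl);
        // Record keys are plain strings.
        config.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Assumed fully qualified name of the Avro serializer in schema-registry-serdes.
        config.put("value.serializer",
                "com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer");
        return config;
    }

    public static void main(String[] args) {
        // Placeholder addresses; substitute those of your own cluster.
        Map<String, Object> config = build("localhost:9092", "http://localhost:9090/api/v1");
        config.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

With the Kafka client library on the classpath, a map like this one can be passed to the KafkaProducer constructor.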

Steps to Integrate the Kafka Consumer

  1. Add the following text to the Kafka Consumer configuration:
    // Brokers used for the initial connection to the Kafka cluster.
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    // Location of the Schema Registry service.
    config.put(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), props.get(SCHEMA_REGISTRY_URL));
    // Deserialize keys as strings and values as Avro through Schema Registry.
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
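  2. Edit the above text so that the following properties have values that match your environment:
    • schema.registry.url: the URL of your Schema Registry instance
    • key.deserializer: the class used to deserialize record keys
    • value.deserializer: the class used to deserialize record values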
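
If you prefer to keep the consumer settings in a properties file rather than in code, the same properties can be loaded with the standard java.util.Properties class. The following is a minimal sketch, not a definitive implementation: the class name ConsumerConfigSketch, the broker address, and the registry URL are hypothetical placeholders, and the fully qualified deserializer class names are assumed to match the Kafka client and schema-registry-serdes versions you use.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper, not part of HDF or Schema Registry.
class ConsumerConfigSketch {

    // Parses consumer settings written in Java .properties syntax.
    static Properties parse(String text) {
        Properties config = new Properties();
        try {
            config.load(new StringReader(text));
        } catch (IOException e) {
            // Reading from an in-memory string is not expected to fail.
            throw new UncheckedIOException(e);
        }
        return config;
    }

    public static void main(String[] args) {
        // Placeholder values; in practice this text would come from a file.
        String text = String.join("\n",
                "bootstrap.servers=localhost:9092",
                "schema.registry.url=http://localhost:9090/api/v1",
                "key.deserializer=org.apache.kafka.common.serialization.StringDeserializer",
                // Assumed fully qualified name of the Avro deserializer in schema-registry-serdes.
                "value.deserializer=com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroDeserializer");
        Properties config = parse(text);
        config.stringPropertyNames()
              .forEach(name -> System.out.println(name + " = " + config.getProperty(name)));
    }
}
```

With the Kafka client library on the classpath, a Properties object like this one can be passed to the KafkaConsumer constructor.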