Reconfiguring the Kafka consumer

After you update a schema stored in the Schema Registry, you must reconfigure the consumer to use the new schema.

Because the latest schema version is backward compatible, it can be used to deserialize messages produced with older versions of the schema. Hence, if you first reconfigure the consumer to use the new version of the schema, it can continue handling messages serialized with the old schema, and it can also handle messages in the new format as soon as the producer starts sending them. By reconfiguring the consumers first, you avoid incompatibility between producers and consumers.
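
The following is a minimal sketch of why a consumer that reads with the new schema can still handle old messages. It is a toy model written with plain Python dictionaries, not the Avro or Schema Registry API. The machine_id field and the UNKNOWN default are assumptions made for illustration; only the os_type field name comes from this procedure.

```python
# Toy model of backward-compatible schema resolution; this is NOT the Avro API.
# Assumption: version 2 adds os_type with a default value of "UNKNOWN".
# Assumption: machine_id is a hypothetical field that exists in both versions.

READER_SCHEMA_V2 = {
    "fields": [
        {"name": "machine_id"},
        {"name": "os_type", "default": "UNKNOWN"},
    ]
}


def resolve(record, reader_schema):
    """Project a decoded record onto the reader schema, filling in defaults."""
    resolved = {}
    for field in reader_schema["fields"]:
        name = field["name"]
        if name in record:
            # The writer supplied this field, so keep its value.
            resolved[name] = record[name]
        elif "default" in field:
            # The writer used an older schema; fall back to the default.
            resolved[name] = field["default"]
        else:
            raise ValueError(f"no value or default for field {name!r}")
    return resolved


old_message = {"machine_id": "m-1"}                      # written with version 1
new_message = {"machine_id": "m-2", "os_type": "linux"}  # written with version 2

print(resolve(old_message, READER_SCHEMA_V2))
print(resolve(new_message, READER_SCHEMA_V2))
```

In this model, a field that is added without a default would make resolve fail for old messages, which is the kind of change a backward-compatibility check is meant to reject.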

Perform the following steps to get the consumer to use the new schema:

  1. Navigate to Management Console > Environments, and select the environment where your Kafka cluster is running.
  2. On the Data Hubs tab of your environment, select the Kafka cluster you created.
  3. Click Schema Registry on the Services pane to open the Schema Registry web UI.
  4. Click on the machine-data-avro schema to expand it, and then click the pencil icon to edit the schema.
  5. In the DESCRIPTION field, enter a description for this modification: Add os_type field.
  6. In the SCHEMA TEXT field, click CLEAR.
    This displays the BROWSE button.
  7. Click BROWSE and select the schema definition file for the new version of the schema, located at cdp-examples/kafka-client-avro/src/main/avro/MachineData.v2.avsc, and then click SAVE.
    Schema Registry validates the compatibility of the new schema and, if the validation succeeds, saves the schema and displays a success message: Schema validated successfully.
    You can view both versions of the schema in Schema Registry.

    Now that you have the new version of the schema stored in Schema Registry, you need to reconfigure the consumer to use it.

  8. If the consumer application is still running, stop it.
  9. Edit the file: uncomment the following line and ensure that it refers to the correct version number for the schema, as shown in the previous screenshot.
    schemaregistry.reader.schema.versions={"machine-data-avro": 2}

    This property is a JSON representation of a Map that associates a topic name with the version of the schema that the consumer should use.

  10. Start the consumer in a terminal window with the same command used earlier:
    java \
      -cp ./target/kafka-client-avro-1.0-SNAPSHOT.jar \
      com.cloudera.examples.MachineDataConsumer \
    The consumer displays messages that include the new field, as shown in the following example:
    ... INFO  c.c.examples.MachineDataConsumer - Received message: (null, {..., "os_type": "UNKNOWN", ...}) at partition ...
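
The value of the schemaregistry.reader.schema.versions property set in step 9 is a JSON map, so a malformed value is easy to introduce when editing by hand. The following is a small sketch, assuming you want to sanity-check the line before restarting the consumer. The parse_reader_versions helper is hypothetical and is not part of the example application or of Schema Registry.

```python
import json

# The property line exactly as it appears in step 9.
LINE = 'schemaregistry.reader.schema.versions={"machine-data-avro": 2}'


def parse_reader_versions(line):
    """Hypothetical helper: split a key=value line and decode its JSON map."""
    key, sep, value = line.partition("=")
    if not sep:
        raise ValueError("expected a key=value property line")
    versions = json.loads(value)
    if not isinstance(versions, dict):
        raise ValueError("expected a JSON object mapping names to versions")
    for name, version in versions.items():
        # bool is a subclass of int in Python, so exclude it explicitly.
        if isinstance(version, bool) or not isinstance(version, int):
            raise ValueError(f"version for {name!r} must be an integer")
    return key.strip(), versions


key, versions = parse_reader_versions(LINE)
print(key)
print(versions)
```

A check like this only confirms that the value is well-formed JSON with integer versions. It does not confirm that the version exists in Schema Registry.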