Consuming data from Kafka topics using stored schemas

You can consume the data that you produced in Avro format to the machine-data-avro topic. To do so, create a copy of the consumer properties template file, replace the placeholder values in that copy, and run the consumer application.

While the producer is running on the original terminal, open a new terminal and start consuming data from the machine-data-avro topic.

Before you begin, you must have produced data in Avro format to the machine-data-avro topic.
  1. In the second terminal, change to the same directory that you used before:
    cd cdp-examples/kafka-client-avro
  2. Create a copy of the consumer properties template file.
    cp src/main/resources/consumer.properties.template consumer.properties
  3. Edit the consumer.properties file and replace the following placeholders with the respective values:
    • [***MACHINE_USER_NAME***]

      The Machine User name (prefixed with srv_). This placeholder appears in two places in the template; replace both.

    • [***MACHINE_USER_PASSWORD***]

      The Machine User’s Workload Password. This placeholder also appears in two places in the template; replace both.

    • [***BROKER1***], [***BROKER2***], [***BROKER3***]

      The hostnames of three of the cluster's brokers. Also ensure that the broker port numbers in the file match the port numbers shown on the Brokers page in SMM.

    • [***TRUSTSTORE_PATH***]

      The path of the truststore.jks file created in the previous section.

    • [***SCHEMA_REGISTRY_ENDPOINT***]

      The Schema Registry endpoint discovered in a previous section.

    The consumer application (MachineDataConsumer class) takes one argument: the path to the consumer.properties file.

    Unlike the producer application, the consumer does not need to be given a schema. The producer has already registered the schema in Schema Registry, so the consumer can retrieve the correct schema from the registry.

    Each message sent to Kafka by the producer carries a few additional bytes that reference the schema version to use from Schema Registry. These bytes are either prefixed to the message payload or, optionally, stored in a message header.

  4. Run the consumer application with the following command:
    java \
      -cp ./target/kafka-client-avro-1.0-SNAPSHOT.jar \
      com.cloudera.examples.MachineDataConsumer \
      ./consumer.properties

    If the consumer is working correctly, you should see several messages similar to the following:

    ... INFO  c.c.examples.MachineDataConsumer - Consumed 1 records
    ... INFO  c.c.examples.MachineDataConsumer - Received message: (null, {"timestamp": ..., }) at partition [machine-data-avro-4], offset [407], with headers: [RecordHeader(key = value.schema.version.id, value = [3, 0, 0, 0, 1])]
  5. Go to Streams Messaging Manager (SMM) and verify that you can now see a consumer reading from the topic, in addition to the producer you saw before.
  6. Stop the producer and consumer.
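A common reason the consumer fails to connect is a placeholder that was missed in step 3. The following Java sketch is a hypothetical helper, not part of the kafka-client-avro example project: it loads consumer.properties with java.util.Properties and lists every property whose value still contains a [*** marker from the template. The class name PlaceholderCheck and the method name unreplacedKeys are assumptions made for illustration.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

/**
 * Hypothetical helper (not part of the example project): reports entries in
 * consumer.properties whose values still contain a [***...***] placeholder.
 */
public class PlaceholderCheck {

    /** Returns, sorted, the keys whose values still contain "[***". */
    public static List<String> unreplacedKeys(Properties props) {
        return props.stringPropertyNames().stream()
                .filter(key -> props.getProperty(key).contains("[***"))
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) throws IOException {
        // Same single argument the consumer takes: the path to consumer.properties.
        Path path = Path.of(args.length > 0 ? args[0] : "consumer.properties");
        Properties props = new Properties();
        try (BufferedReader reader = Files.newBufferedReader(path)) {
            props.load(reader); // comment lines are skipped by Properties.load
        }
        List<String> missing = unreplacedKeys(props);
        if (missing.isEmpty()) {
            System.out.println("No template placeholders left in " + path);
        } else {
            System.out.println("Placeholders still present in: " + missing);
        }
    }
}
```

With Java 11 or later you can run it from the cdp-examples/kafka-client-avro directory without compiling, for example: java PlaceholderCheck.java consumer.properties. Only property values are checked, so placeholders that appear in comment lines of the template are ignored.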
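The sample output in step 4 shows the schema reference stored in a value.schema.version.id record header with the bytes [3, 0, 0, 0, 1]. The Java sketch below decodes such a header under an assumption that this procedure does not confirm: that the first byte is a protocol identifier and the remaining four bytes are a big-endian int schema version ID. The class SchemaVersionHeader and its methods are hypothetical; the example consumer application retrieves the schema from Schema Registry on its own, so you do not need this code to consume the data.

```java
import java.nio.ByteBuffer;

/**
 * Hypothetical helper: decodes a value.schema.version.id header value.
 * ASSUMPTION (inferred from the sample output, not from documentation):
 * byte 0 is a protocol identifier, bytes 1-4 are a big-endian int
 * schema version ID.
 */
public class SchemaVersionHeader {

    private static void checkLength(byte[] header) {
        if (header.length != 5) {
            throw new IllegalArgumentException("expected 5 bytes, got " + header.length);
        }
    }

    /** Returns byte 0, assumed to identify the serialization protocol. */
    public static int protocolId(byte[] header) {
        checkLength(header);
        return header[0];
    }

    /** Reads bytes 1-4 as an int; ByteBuffer uses big-endian order by default. */
    public static int schemaVersionId(byte[] header) {
        checkLength(header);
        return ByteBuffer.wrap(header, 1, 4).getInt();
    }

    public static void main(String[] args) {
        // Header value copied from the sample consumer output in step 4.
        byte[] sample = {3, 0, 0, 0, 1};
        System.out.println("protocol=" + protocolId(sample)
                + ", schemaVersionId=" + schemaVersionId(sample));
    }
}
```

For the sample bytes, main prints protocol=3, schemaVersionId=1. If the reference is prefixed to the message payload instead of stored in a header, the layout may differ; check the Schema Registry documentation before relying on this interpretation.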