Consuming data from Kafka topics using stored schemas
You can consume data from the machine-data-avro
topic where you have
produced data in Avro format. You need to create a copy of the consumer properties template
file, replace the values of some properties in that file, and run the consumer
application.
While the producer is running on the original terminal, open a new terminal and start
consuming data from the machine-data-avro
topic.
-
On the second terminal, ensure that you are on the same directory you used
before.
cd cdp-examples/kafka-client-avro
-
Create a copy of the consumer properties template file.
cp src/main/resources/consumer.properties.template consumer.properties
-
Edit the
consumer.properties
file and replace the following placeholders with the respective values:-
[***MACHINE_USER_NAME***]
The Machine User name (prefixed with
srv_
). Note that you need to replace the machine username at two locations in the template. -
[***MACHINE_USER_PASSWORD***]
The Machine User’s Workload Password. Note that you need to replace the machine user password at two locations in the template.
-
[***BROKER1***], [***BROKER2***], [***BROKER3***]
The hostnames of three cluster brokers. Also, ensure that the broker port numbers match the numbers seen on the Brokers page in SMM.
-
[***TRUSTSTORE_PATH***]
The path of the
truststore.jks
file created in the previous section. -
[***SCHEMA_REGISTRY_ENDPOINT***]
The Schema Registry endpoint discovered in a previous section.
The consumer application (
MachineDataConsumer
class) takes one argument which is theconsumer.properties
file.Unlike the producer application, the consumer does not require a schema to be provided. Because the schema is already registered in Schema Registry by the producer, the consumer can retrieve the correct schema from the registry.
Each message send to Kafka by the producer has a few additional bytes that contain the reference to the correct schema version to be used from Schema Registry. These bytes are either prefixed to the message payload or, optionally, stored in the message header.
-
-
Run the consumer application with the following command:
java \ -cp ./target/kafka-client-avro-1.0-SNAPSHOT.jar \ com.cloudera.examples.MachineDataConsumer \ ./consumer.properties
If the consumer is working correctly, you should see several messages similar to the following:
... INFO c.c.examples.MachineDataConsumer - Consumed 1 records ... INFO c.c.examples.MachineDataConsumer - Received message: (null, {"timestamp": ..., }) at partition [machine-data-avro-4], offset [407], with headers: [RecordHeader(key = value.schema.version.id, value = [3, 0, 0, 0, 1])]
- Go to Streams Messaging Manager (SMM) and verify that now you can see a consumer reading from the topic, besides the producer you had seen before.
- Stop the producer and consumer.