Subscribing to a topic
Before a consumer can consume messages, it must subscribe to a topic. This is done with the subscribe method:
Java: kafkaConsumer.subscribe(Collections.singletonList(topic), rebalanceListener);
C#: consumer.Subscribe(new List<string>() { topic });
Here you specify the list of topics that you want to consume from. In Java, a rebalance listener is also passed. Rebalancing is an important part of a consumer's life cycle: whenever the state of the cluster or of the consumers changes, a rebalance is triggered. This ensures that every partition is assigned to a consumer.
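To make the idea concrete, the sketch below models a rebalance as a simple round-robin distribution of partitions over the consumers currently in the group. It is an illustration only: the class RebalanceSketch, the method assign, and the consumer names are hypothetical, and Kafka's actual assignment strategies are more involved than this.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of a rebalance. This is NOT Kafka's real assignor,
// only an illustration of "every partition ends up with a consumer".
class RebalanceSketch {

    // Distribute partitions 0..partitionCount-1 round-robin over the consumers.
    static Map<String, List<Integer>> assign(List<String> consumers, int partitionCount) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        if (consumers.isEmpty()) {
            return assignment; // nobody to assign partitions to
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Two consumers share four partitions.
        System.out.println(assign(List.of("consumer-a", "consumer-b"), 4));
        // consumer-b leaves the group: the rebalance hands everything to consumer-a.
        System.out.println(assign(List.of("consumer-a"), 4));
    }
}
```

In this model, a change in group membership simply means calling assign again with the new list of consumers, which is roughly the moment a rebalance listener would be notified in a real client.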
Polling
- In Java, the poll() method returns multiple records that the client can process. After processing the records, the client commits offsets synchronously, waiting until the commit completes before it continues to poll.
  while (true) {
      // poll(Duration) replaces the deprecated poll(long) overload
      ConsumerRecords<String, String> data = kafkaConsumer.poll(Duration.ofMillis(1000));
      // do something with 'data'
  }
- In C#, the Consume() method returns a single result, which is either a single Kafka message or an end-of-partition event if the IsPartitionEOF property is true.
  while (true) {
      var result = consumer.Consume();
      // Handle result
  }
Saving progress
The last important step is saving progress, that is, committing the offsets of the records that have been processed.
In Java, this can be done with the commitSync() or commitAsync() method. Auto commit is not recommended; manual commit is appropriate in the majority of use cases.
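As a minimal sketch of a manual-commit setup, the following builds consumer settings with auto commit disabled. The keys are standard Kafka consumer configuration names; the class name ConsumerConfigSketch, the helper manualCommitConfig, and the values localhost:9092 and example-group are placeholders assumed for illustration.

```java
import java.util.Properties;

class ConsumerConfigSketch {

    // Build consumer settings with auto commit turned off, so offsets are only
    // committed when the application calls commitSync() or commitAsync().
    static Properties manualCommitConfig(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder broker address and group id; replace with real values.
        Properties props = manualCommitConfig("localhost:9092", "example-group");
        System.out.println(props.getProperty("enable.auto.commit")); // prints "false"
    }
}
```

A Properties object like this would typically be passed to the KafkaConsumer constructor when the consumer is created.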
In C#, progress can be saved by calling the Close() method at the end of message processing, which automatically commits offsets. Alternatively, offsets can be committed manually by calling the Commit() method. The options for both languages are:
- commitSync() (Java)
  kafkaConsumer.subscribe(Collections.singletonList(topic), rebalanceListener);
  while (true) {
      // poll(Duration) replaces the deprecated poll(long) overload
      ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));
      for (ConsumerRecord<String, String> record : records) {
          System.out.printf("Received Message with topic =%s, partition =%s, offset = %d, key = %s, value = %s\n",
                  record.topic(), record.partition(), record.offset(), record.key(), record.value());
      }
      // commit and wait until the offset is committed
      kafkaConsumer.commitSync();
  }
- commitAsync() (Java)
  kafkaConsumer.subscribe(Collections.singletonList(topic), rebalanceListener);
  while (true) {
      // poll(Duration) replaces the deprecated poll(long) overload
      ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));
      for (ConsumerRecord<String, String> record : records) {
          System.out.printf("Received Message with topic =%s, partition =%s, offset = %d, key = %s, value = %s\n",
                  record.topic(), record.partition(), record.offset(), record.key(), record.value());
      }
      // Commit the offset and proceed with execution. The callback will be invoked
      // when the offset commit's result comes back from the broker.
      kafkaConsumer.commitAsync((offsets, exception) -> {
          if (exception != null) {
              // handle the error that happened during offset commit
          } else {
              // do something on successful offset commit if needed
          }
      });
  }
- Close() (C#)
  consumer.Close(); // commit offset and unsubscribe
- Commit() (C#)
  consumer.Commit(); // commit offset
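To see why saving progress matters, the sketch below uses an in-memory stand-in for a single partition and a single consumer. It is not the Kafka client API: the class CommitSketch and its methods poll, commit, and restart are hypothetical names chosen for illustration, under the assumption that a restarted consumer resumes from the last committed offset.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory model of one partition read by one consumer; not the Kafka client API.
class CommitSketch {
    private final List<String> log; // records of a single partition
    private int position = 0;       // offset of the next record poll() will return
    private int committed = 0;      // last saved progress

    CommitSketch(List<String> log) {
        this.log = log;
    }

    // Return up to maxRecords records starting at the current position.
    List<String> poll(int maxRecords) {
        int end = Math.min(position + maxRecords, log.size());
        List<String> batch = new ArrayList<>(log.subList(position, end));
        position = end;
        return batch;
    }

    // Save progress: every record before 'position' counts as processed.
    void commit() {
        committed = position;
    }

    // Simulate a crash and restart: reading resumes from the committed offset.
    void restart() {
        position = committed;
    }

    int committedOffset() {
        return committed;
    }

    public static void main(String[] args) {
        CommitSketch consumer = new CommitSketch(List.of("a", "b", "c", "d", "e"));
        System.out.println(consumer.poll(2)); // [a, b]
        consumer.commit();                    // progress saved at offset 2
        System.out.println(consumer.poll(2)); // [c, d], not committed
        consumer.restart();                   // crash before the commit
        System.out.println(consumer.poll(2)); // [c, d] is delivered again
    }
}
```

Records that were polled but not committed before the restart are delivered again, which is why the examples above commit only after the batch has been processed.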