Developing Apache Kafka Applications

Subscribing to a topic

Learn more about subscribing to a topic.

Before a consumer can consume messages, it must subscribe to a topic. This is done with the subscribe() method:

kafkaConsumer.subscribe(Collections.singletonList(topic), rebalanceListener);

Here you specify the list of topics that you want to consume from. In Java, a rebalance listener can also be specified; it is notified when partitions are assigned to or revoked from the consumer. Rebalancing is an important part of the consumer's life. Whenever the state of the cluster or of the consumers changes, a rebalance is triggered. This ensures that every partition is assigned to a consumer.
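The effect of a rebalance can be illustrated with a small, self-contained model. The sketch below is not Kafka's assignor implementation: the RebalanceSketch class, the assign() method, and the round-robin strategy are assumptions chosen for illustration. It only shows the property described above, namely that every partition still has an owner after a consumer leaves the group.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of a rebalance. RebalanceSketch and assign() are hypothetical
// names; Kafka's real partition assignors are more sophisticated than this.
class RebalanceSketch {

    // Spread partitions 0..partitionCount-1 across the live consumers round-robin.
    static Map<String, List<Integer>> assign(List<String> consumers, int partitionCount) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        if (consumers.isEmpty()) {
            return assignment; // nobody is alive to own a partition
        }
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Two consumers share four partitions.
        System.out.println(assign(Arrays.asList("consumer-a", "consumer-b"), 4));
        // consumer-b leaves the group: the rebalance hands its partitions to consumer-a.
        System.out.println(assign(Arrays.asList("consumer-a"), 4));
    }
}
```

In this model the first call gives each consumer two partitions, and the second call gives all four partitions to the remaining consumer. In a real application, the rebalance listener is where the consumer would react to such a change, for example by committing offsets before its partitions are revoked.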

After subscribing to a topic, the consumer has to poll to check for new records. The poll() method returns a batch of records that the client can process. If the client then commits offsets synchronously, it waits until the commit completes before it continues to poll.

while (true) {
  // poll() takes a timeout: the maximum time to block while waiting for records
  // (requires: import java.time.Duration;)
  ConsumerRecords<String, String> data = kafkaConsumer.poll(Duration.ofMillis(1000));
  // do something with 'data'
}

The last important point is to save the consumer's progress by committing offsets.

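In Java, this can be done with the commitSync() or commitAsync() method. commitSync() blocks until the commit completes, while commitAsync() returns immediately and reports the result through a callback. Auto commit is not recommended; manual commit is appropriate in the majority of use cases.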
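Why saving progress matters can be shown with a toy model of a single partition and a single consumer. This is a sketch under stated assumptions, not the Kafka client: OffsetCommitSketch and its poll(), commit(), and restart() methods are hypothetical names that only imitate the idea of a committed offset. It shows that records processed after the last commit are delivered again when the consumer restarts.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of one partition and one consumer. OffsetCommitSketch and its
// members are hypothetical names, not part of the Kafka client API.
class OffsetCommitSketch {
    private final List<String> log;   // records stored in the partition
    private int committedOffset = 0;  // progress saved on the "broker"
    private int position = 0;         // next offset this consumer will read

    OffsetCommitSketch(List<String> log) {
        this.log = log;
    }

    // Return up to maxRecords records starting at the current position.
    List<String> poll(int maxRecords) {
        int end = Math.min(position + maxRecords, log.size());
        List<String> batch = new ArrayList<>(log.subList(position, end));
        position = end;
        return batch;
    }

    // Save the current position as the committed offset.
    void commit() {
        committedOffset = position;
    }

    // Simulate a crash and restart: reading resumes from the last committed offset.
    void restart() {
        position = committedOffset;
    }

    public static void main(String[] args) {
        OffsetCommitSketch consumer =
                new OffsetCommitSketch(Arrays.asList("r0", "r1", "r2", "r3"));
        System.out.println(consumer.poll(2)); // [r0, r1]
        consumer.commit();                    // progress saved at offset 2
        System.out.println(consumer.poll(2)); // [r2, r3], processed but not committed
        consumer.restart();                   // crash before the commit
        System.out.println(consumer.poll(2)); // [r2, r3] again
    }
}
```

Committing after each processed batch, as in this model, means that a restart repeats at most the batch that was in flight when the consumer stopped.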

In C#, offsets are committed automatically when the Close() method is called at the end of message processing. Alternatively, they can be committed manually by calling the Commit() method.

The following Java examples show both commit methods:

commitSync()
kafkaConsumer.subscribe(Collections.singletonList(topic), rebalanceListener);

while (true) {
    // poll(long) is deprecated; pass the timeout as a java.time.Duration instead
    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("Received Message with topic = %s, partition = %d, offset = %d, key = %s, value = %s\n",
                record.topic(), record.partition(), record.offset(), record.key(), record.value());
    }
    // commit and wait until the offset is committed
    kafkaConsumer.commitSync();
}
commitAsync()
kafkaConsumer.subscribe(Collections.singletonList(topic), rebalanceListener);

while (true) {
    // poll(long) is deprecated; pass the timeout as a java.time.Duration instead
    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("Received Message with topic = %s, partition = %d, offset = %d, key = %s, value = %s\n",
                record.topic(), record.partition(), record.offset(), record.key(), record.value());
    }

    // Commit the offset and proceed with execution. The callback will be invoked when the offset
    // commit's result comes back from the broker.
    kafkaConsumer.commitAsync((offsets, exception) -> {
        if (exception != null) {
            // handle the error that happened during offset commit
        } else {
            // do something on successful offset commit if needed
        }
    });
}