Subscribing to a topic

Learn more about subscribing to a topic.

You subscribe to a topic with the subscribe() method:

kafkaConsumer.subscribe(Collections.singletonList(topic), rebalanceListener);
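For context, the kafkaConsumer used above can be constructed along these lines. This is a minimal sketch: the broker address, group id, and String deserializers are illustrative assumptions, not part of the original.

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
// Assumed local broker and group id; adjust for your cluster.
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "example-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
```

The group.id determines which consumer group this consumer joins, and therefore which consumers it shares partitions with during a rebalance.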

Here we specify the list of topics we want to consume from, along with a rebalance listener. Rebalancing is an important part of the consumer's life cycle: whenever the cluster or the consumer group's state changes, a rebalance is triggered. This ensures that every partition is assigned to a consumer in the group.
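The rebalanceListener passed to subscribe() can be an implementation of ConsumerRebalanceListener. A minimal sketch follows; committing offsets in onPartitionsRevoked is one common strategy to avoid reprocessing, not something prescribed by the text above.

```java
import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Called before a rebalance takes partitions away from this consumer;
        // committing here means another consumer picks up from our last position.
        kafkaConsumer.commitSync();
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Called after partitions are (re)assigned; per-partition state
        // can be initialized or restored here if needed.
    }
};
```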

After subscribing to a topic, the consumer polls for new records in a loop:

while (true) {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
    // do something with 'records'
}

Each poll returns a batch of records for the client to process. If the client commits offsets synchronously after processing, it waits for the commit to be acknowledged before polling again.

The last important point is saving progress. This is done with the commitSync() and commitAsync() methods.

commitSync()
kafkaConsumer.subscribe(Collections.singletonList(topic), rebalanceListener);

while (true) {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("Received message with topic = %s, partition = %s, offset = %d, key = %s, value = %s%n",
                record.topic(), record.partition(), record.offset(), record.key(), record.value());
    }
    // commit and wait until the offset is committed
    kafkaConsumer.commitSync();
}
commitAsync()
kafkaConsumer.subscribe(Collections.singletonList(topic), rebalanceListener);

while (true) {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("Received message with topic = %s, partition = %s, offset = %d, key = %s, value = %s%n",
                record.topic(), record.partition(), record.offset(), record.key(), record.value());
    }

    // Commit the offset and proceed with execution. The callback is invoked
    // when the commit result comes back from the broker.
    kafkaConsumer.commitAsync((offsets, exception) -> {
        if (exception != null) {
            // handle the error that occurred during the offset commit
        } else {
            // act on a successful offset commit if needed
        }
    });
}

Auto commit is not recommended; manual committing is appropriate in the majority of use cases.
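Auto commit is controlled through the consumer configuration; a sketch of the relevant properties (values shown are illustrative):

```properties
# Disable auto commit so that commitSync()/commitAsync() control progress.
enable.auto.commit=false
# Only relevant when auto commit is enabled: how often offsets are committed.
# auto.commit.interval.ms=5000
```

With enable.auto.commit left at its default of true, the consumer commits offsets in the background on this interval, which can acknowledge records before the application has actually processed them.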