Subscribing to a topic

Learn more about subscribing to a topic.

Subscribing to a topic

In order for the consumer to be able to consume messages, it first needs to subscribe to a topic. This can be done using a subscribe method:

kafkaConsumer.subscribe(Collections.singletonList(topic), rebalanceListener);
consumer.Subscribe(new List<string>() { topic });

Here you specify a list of topics that you want to consume from. In Java a 'rebalance listener' is also specified. Rebalancing is an important part of the consumer's life. Whenever the cluster or the consumers’ state changes, a rebalance will be issued. This will ensure that all the partitions are assigned to a consumer.

Polling

After subscribing to a topic, the consumer has to poll to see if there are new records.
The poll () method returns multiple records that can be processed by the client. After processing the records the client commits offsets synchronously, thus waiting until processing completes before continuing to poll.
while (true) {
  data = kafkaConsumer.poll();
  // do something with 'data'
}
The Consume() method returns a single result which can either be a single Kafka message, or an end of partition event if the IsPartitionEOF property is true.
while (true)
{
    var result = consumer.Consume();
    // Handle result
}

Saving progress

The last important point is to save the progress.

In Java this can be done with by the commitSync() and commitAsync() methods respectively. Auto commit is not recommended; manual commit is appropriate in the majority of use cases.

In C# this can be done by calling the Close() method at the end of message processing which automatically commits offsets. Alternatively, this can also be done manually by calling the Commit() method:

commitSync()
kafkaConsumer.subscribe(Collections.singletonList(topic), rebalanceListener);

        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("Received Message with topic =%s, partition =%s, offset = %d, key = %s, value = %s\n",
                        record.topic(), record.partition(), record.offset(), record.key(), record.value());
            }
            // commit and wait until the offset is committed
            kafkaConsumer.commitSync();
        }
commitAsync()
kafkaConsumer.subscribe(Collections.singletonList(topic), rebalanceListener);

        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("Received Message with topic =%s, partition =%s, offset = %d, key = %s, value = %s\n",
                        record.topic(), record.partition(), record.offset(), record.key(), record.value());
            }

            // Commit the offset and proceed with execution. The callback will be invoked when the offset
            // commit's result comes back from the broker.
            kafkaConsumer.commitAsync((offsets, exception) -> {
                if (exception != null) {
                    // handle the error that happened during offset commit
                } else {
                    // do something on successful offset commit if needed
                }
            });
        }
Close()
consumer.Close();  // commit offset and unsubscribe
Commit()
consumer.Commit(); // commit offset