Developing Apache Kafka Applications

Recommendations for using the producer and consumer APIs

A collection of recommendations regarding the use of the producer and consumer APIs.

After reviewing the basic examples of a producer and consumer, prototyping your own designs shouldn’t be too difficult. However, your code will likely undergo several iterations that improve on scalability, debuggability, robustness, and maintainability.

This topic presents recommendations in the form of code snippets that illustrate some of the important ways to use the producer and consumer APIs.

In addition to the recommendations presented here, review the Javadoc for producers and consumers, which contains additional details about Kafka client programming.

These Javadoc pages are quite dense with information. They assume you have sufficient background in reliable computing, networking, multithreading, and distributed systems to use the APIs correctly. While the following recommendations point out many caveats in using the client APIs, the Javadoc (and ultimately the source code) provides a more detailed explanation.

In these examples, the consumer constructor should be called once and the poll() method called within a loop. If this object is not reused, then a new connection to the broker is opened with each new KafkaConsumer object created.

Recommended

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

while (true) {
  ConsumerRecords<String, String> records = consumer.poll(100);
}

Not Recommended

while (true) {
  KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  ConsumerRecords<String, String> records = consumer.poll(100);
}

Similarly, it is recommended that you use one KafkaConsumer and/or KafkaProducer object per thread. Creating more objects opens multiple ports per broker connection. Overusing ephemeral ports can cause performance issues.

In addition, Cloudera recommends that you set a fixed client.id for producers and consumers when they connect to the brokers. If you do not, Kafka assigns a new client ID every time a new connection is established, which can severely increase resource utilization (memory) on the broker side.
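
As a minimal sketch of the client.id recommendation, the helper below builds client properties with an explicit, stable ID. The class name ClientProps, the method name, and the values passed to it are hypothetical. Only the configuration keys bootstrap.servers and client.id are Kafka's own.

```java
import java.util.Properties;

// Hypothetical helper; the class and method names are illustrative only.
final class ClientProps {
    private ClientProps() {}

    // Builds client properties that carry an explicit, stable client.id.
    static Properties withFixedClientId(String bootstrapServers, String clientId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Reuse the same ID every time this client instance reconnects or restarts
        props.setProperty("client.id", clientId);
        return props;
    }
}
```

The returned Properties object still needs the usual serializer or deserializer settings before it is passed to the KafkaProducer or KafkaConsumer constructor.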

As explained in the Apache Kafka documentation topic New Consumer Configs, any consumer connected to a partition will time out if poll() is not called within the period defined by max.poll.interval.ms.

In the example below, the call to myDataProcess.doStuff(records) can cause poll() to be called infrequently. This could be due to a combination of reasons:

  • Being a blocking method call.
  • Doing work on a remote machine.
  • Having highly variable processing time.
  • Saving to storage that has highly variable I/O throughput.

In such cases, consider having another thread or process doing the actual work and making the handoff as lightweight as possible.
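
One possible shape for such a handoff is a small bounded queue between the polling thread and a single worker thread. The sketch below uses only JDK classes so that it stays independent of the Kafka client version. BatchHandoff and its methods are hypothetical names, and the type parameter T stands in for whatever the polling thread hands over (for example, a ConsumerRecords batch).

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical handoff between a polling thread and one worker thread.
final class BatchHandoff<T> implements AutoCloseable {
    private final BlockingQueue<T> queue;
    private final Thread worker;
    private volatile boolean running = true;

    BatchHandoff(int capacity, Consumer<T> slowWork) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.worker = new Thread(() -> {
            try {
                // Keep draining until close() was called and the queue is empty
                while (running || !queue.isEmpty()) {
                    T batch = queue.poll(50, TimeUnit.MILLISECONDS);
                    if (batch != null) {
                        slowWork.accept(batch);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "batch-worker");
        this.worker.start();
    }

    // Called from the polling thread; waits at most timeoutMs for queue space.
    // Returns false if the worker is backed up (or the caller was interrupted).
    boolean offer(T batch, long timeoutMs) {
        try {
            return queue.offer(batch, timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Signals the worker to finish the queued batches, then waits for it to exit.
    @Override
    public void close() {
        running = false;
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

In a consumer loop, the polling thread would call offer() right after each poll() and return to polling immediately. A false return value indicates that the worker is falling behind. Offset commits and back-pressure (for example, through KafkaConsumer#pause()) are deliberately left out of this sketch.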

Example: poll() gets KafkaException due to session timeout

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

while (true) {
  ConsumerRecords<String, String> records = consumer.poll(100);
  // the call below should return quickly in all cases
  myDataProcess.doStuff(records);
}

From the poll() Javadoc page, you can see that the poll() method throws several exceptions. If the catch statements in the example are not complete, any uncaught exception reaches the finally statement, which calls KafkaConsumer#close(). This is not the desired behavior in many cases.

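try {
  while (true) {
    try {
      ConsumerRecords<String, String> records = consumer.poll(100);
    } catch (Exception e) {
      // handle or log every exception type that poll() can throw
      e.printStackTrace();
    }
  }
} finally {
  // reached only when an exception escapes the loop
  consumer.close();
}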

The interface org.apache.kafka.clients.producer.Callback (Javadoc) is used to define a class whose onCompletion() method is invoked when a KafkaProducer#send() call completes. It allows tracking, cleanup, or other administrative code to run. An example of unintended usage is to call KafkaProducer#send() within the Callback#onCompletion() method, essentially mimicking a retry. Because the onCompletion() method is always expected to return cleanly and the send() method makes no such guarantees, calling send() within the callback could end up hanging the code in case of network or broker issues.
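
One way to keep onCompletion() quick is to only note the failure inside the callback and leave any resend decision to the application's own thread. The sketch below is a JDK-only illustration of that idea. FailedSends, recordResult(), and drain() are hypothetical names, and the sketch assumes the callback has access to the record it was created for. As in Callback#onCompletion(), a null exception means the send succeeded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical holder for records whose send failed; R is the record type.
final class FailedSends<R> {
    private final ConcurrentLinkedQueue<R> failed = new ConcurrentLinkedQueue<>();

    // Intended to be called from Callback#onCompletion().
    // It never blocks and never calls send().
    void recordResult(R record, Exception exception) {
        if (exception != null) {
            failed.add(record);
        }
    }

    // Called from the application's own thread, which can then decide
    // whether and when to send the returned records again.
    List<R> drain() {
        List<R> out = new ArrayList<>();
        R next;
        while ((next = failed.poll()) != null) {
            out.add(next);
        }
        return out;
    }
}
```

The thread that owns the producer can call drain() between its regular sends, so a stalled broker delays the resend but does not block the callback.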

The documentation for the latest upstream release of Apache Kafka indicates if there have been any changes to how the APIs are used (setup, read, write). Reviewing the latest information could help avoid upgrade-related changes to your producer or consumer.

Some examples from past versions include:

Old Class or Package            New Class or Package
kafka.producer.ProducerConfig   java.util.Properties
kafka.javaapi.*                 kafka.api.*
kafka.producer.KeyedMessage     org.apache.kafka.clients.producer.ProducerRecord

Network dependency is one of the more subtle issues. Given the consumer dependencies on Ranger and ZooKeeper, frequent or prolonged DNS or network outages can also cause various session timeouts. Such timeouts force partition rebalancing on the brokers, which worsens general Kafka reliability.

Should these issues be common in your network, you may need to have a less straightforward design that can handle such reliability issues outside of the Kafka client.
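
What such a design looks like depends on the application. As one illustrative ingredient only, and not something this topic prescribes, code that re-creates a client after an outage could wait for a capped, exponentially growing delay between attempts. The Backoff class and its parameters below are hypothetical.

```java
// Hypothetical helper that computes a capped exponential backoff delay.
final class Backoff {
    private Backoff() {}

    // Returns baseMs doubled once per attempt, never exceeding maxMs.
    // attempt is zero-based: attempt 0 yields baseMs (or maxMs if smaller).
    static long delayMs(int attempt, long baseMs, long maxMs) {
        long delay = baseMs;
        for (int i = 0; i < attempt && delay < maxMs; i++) {
            // Jump straight to the cap when doubling would pass it
            delay = delay > maxMs / 2 ? maxMs : delay * 2;
        }
        return Math.min(delay, maxMs);
    }
}
```

With a base of 500 ms and a cap of 30 seconds, the delays grow as 500, 1000, 2000, 4000 ms and so on until they reach the cap.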