Recommendations for using the producer and consumer APIs

Review the following collection of code snippets and recommendations regarding the use of the producer and consumer APIs to learn how you can develop better Kafka clients.

After reviewing the basic examples of a producer and consumer, prototyping your own designs shouldn’t be too difficult. However, your code will likely go through several iterations that improve its scalability, debuggability, robustness, and maintainability.

This topic presents recommendations in the form of code snippets that illustrate some of the important ways to use the producer and consumer APIs.

In addition to the recommendations presented here, review the Javadoc for producers and consumers, or any other API documentation available for the client library you are using. These provide additional details about Kafka client programming.

API documentation is dense with information. It assumes that you have sufficient background in reliable computing, networking, multithreading, and distributed systems to use the APIs correctly. While the following recommendations point out many caveats in using the client APIs, the API documentation (and ultimately the source code) provides a more detailed explanation.

Reuse your Producer/Consumer object

In the following examples, the consumer constructor is called once and the poll method is called within a loop. If the consumer object is not reused, a new connection to the broker is opened for each new consumer object that is created.

Recommended (Java)
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

while (true) {
  ConsumerRecords<String, String> records = consumer.poll(100);
}

Not recommended (Java)
while (true) {
  KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  ConsumerRecords<String, String> records = consumer.poll(100);
}

Recommended (.NET)
var consumer = new ConsumerBuilder<string, string>(config).Build();

while (true)
{
    var result = consumer.Consume();
}

Not recommended (.NET)
while (true)
{
    var consumer = new ConsumerBuilder<string, string>(config).Build();
    var result = consumer.Consume();
}

Similarly, it is recommended that you use one consumer and/or producer object per thread. Creating more objects opens multiple ports per broker connection. Overusing ephemeral ports can cause performance issues.

In addition, Cloudera recommends that you set a fixed client ID for producers and consumers when they connect to the brokers. If this is not done, Kafka assigns a new client ID every time a new connection is established, which can severely increase resource utilization (memory) on the broker side.
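As a minimal sketch of the fixed client ID recommendation, the following Java snippet builds client settings in which client.id is derived from stable inputs, so the same instance presents the same ID after every restart or reconnect. The class name, the appName and instanceIndex parameters, and the naming scheme are assumptions made for this illustration. The property keys bootstrap.servers, group.id, and client.id are standard Kafka client settings. A real consumer would also need deserializer settings before the properties are passed to the KafkaConsumer constructor.

```java
import java.util.Properties;

// Minimal sketch: build client settings with a fixed, predictable client ID.
final class FixedClientIdConfig {

    // appName and instanceIndex are hypothetical inputs chosen for this sketch.
    static Properties consumerProps(String bootstrapServers, String appName, int instanceIndex) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", appName);
        // The same ID is produced on every restart and reconnect of this instance.
        props.put("client.id", appName + "-consumer-" + instanceIndex);
        return props;
    }

    public static void main(String[] args) {
        // The broker address is a placeholder.
        Properties props = consumerProps("broker1.example.com:9092", "orders", 0);
        System.out.println(props.getProperty("client.id"));
    }
}
```

Deriving the ID from an application name and an instance index is only one possible scheme. Any value that stays the same across reconnects serves the same purpose.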

Each KafkaConsumer object requires frequent polling

Any consumer that is assigned a partition times out if it does not poll again before the maximum poll interval (max.poll.interval.ms in the Java client) is reached.

In the example below, the call to myDataProcess.doStuff(records) can cause infrequent polling. This could be due to a combination of reasons:

  • Being a blocking method call.
  • Doing work on a remote machine.
  • Having highly variable processing time.
  • Saving to storage that has highly variable I/O throughput.

In such cases, consider having another thread or process doing the actual work and making the handoff as lightweight as possible.

Example: poll() gets KafkaException due to session timeout
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

while (true) {
  ConsumerRecords<String, String> records = consumer.poll(100);
  // the call below should return quickly in all cases
  myDataProcess.doStuff(records);
}

Example: Consume() gets KafkaException due to session timeout
var consumer = new ConsumerBuilder<string, string>(config).Build();

while (true)
{
  var result = consumer.Consume(100);
  // the call below should return quickly in all cases
  myDataProcess.DoStuff(result);
}
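The lightweight handoff to another thread suggested in this section could look roughly like the following Java sketch. It uses only the standard library. The class PollHandoffSketch, its method names, the queue size, and the simulated 20 ms of work are all assumptions made for illustration. Batches are shown as lists of strings instead of ConsumerRecords so that the sketch stays self-contained. The polling loop would call handOff() in place of myDataProcess.doStuff(records), and handOff() returns within a bounded time whether or not the worker has caught up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Minimal sketch: the polling thread only enqueues batches; a worker thread does the slow work.
final class PollHandoffSketch {
    // Dedicated stop marker, compared by identity so ordinary empty batches are not mistaken for it.
    private static final List<String> STOP = new ArrayList<>();

    // Bounded so a slow worker cannot make the queue grow without limit.
    private final BlockingQueue<List<String>> queue = new ArrayBlockingQueue<>(16);
    private final AtomicInteger processed = new AtomicInteger();
    private final Thread worker = new Thread(this::drain, "record-worker");

    void start() {
        worker.setDaemon(true);
        worker.start();
    }

    // Returns within timeoutMs. A false result means the worker is behind (or the
    // thread was interrupted) and the caller must decide what to do with the batch.
    boolean handOff(List<String> batch, long timeoutMs) {
        try {
            return queue.offer(batch, timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    private void drain() {
        try {
            while (true) {
                List<String> batch = queue.take();
                if (batch == STOP) {
                    return;
                }
                Thread.sleep(20); // stands in for slow or variable processing
                processed.addAndGet(batch.size());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Lets the worker finish everything queued so far, then reports the record count.
    int stopAndCount() {
        try {
            queue.put(STOP);
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed.get();
    }

    public static void main(String[] args) {
        PollHandoffSketch sketch = new PollHandoffSketch();
        sketch.start();
        sketch.handOff(List.of("r1", "r2"), 100);
        sketch.handOff(List.of("r3"), 100);
        System.out.println("processed=" + sketch.stopAndCount());
    }
}
```

Offset handling is deliberately left out of this sketch. If offsets are committed before the worker has finished a batch, a crash can lose those records, so a real design needs to tie commits to completed work. The worker thread should also not call the consumer itself, because KafkaConsumer is not safe for use from multiple threads.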

Catch all exceptions when polling

The poll() Javadoc page shows that the poll() method can throw several exceptions. If the catch statements are not complete, any uncaught exception ends up in the finally block, which calls KafkaConsumer#close(). In many cases this is not the desired behavior.

try {
  while (true) {
    try {
      ConsumerRecords<String, String> records = consumer.poll(100);
    } catch (Exception e) {
      // handle or log every exception so that the loop keeps running
      e.printStackTrace();
    }
  }
} finally {
  // reached only when the loop is exited
  consumer.close();
}

In the .NET client, some Consume() overloads can throw only one exception type, while others can throw several. Make sure that all exceptions are caught.
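try
{
  while (true)
  {
    try
    {
      var result = consumer.Consume();
    }
    catch (Exception e)
    {
      // handle or log every exception so that the loop keeps running
      Console.WriteLine($"Exception caught:\n {e.StackTrace}");
    }
  }
}
finally
{
  // reached only when the loop is exited
  consumer.Close();
}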

Callback#onCompletion() should always exit without errors

The interface org.apache.kafka.clients.producer.Callback (Javadoc) is used to define a class that can be used upon completion of a KafkaProducer#send() call. It allows for tracking, clean up, or other administrative code to be called. An example of unintended usage is to call KafkaProducer#send() within the Callback#onCompletion() method, essentially mimicking a retry. Because the onCompletion() method is always expected to return cleanly and the send() method makes no such guarantees, calling send() within the callback could end up hanging the code in case of network or broker issues.
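One way to keep the callback trivial is to have it only record the failure and leave any resend decision to application code that runs outside the callback. The following Java sketch illustrates that idea with a hypothetical SendFailureLog class. The class and its method names are assumptions made for this example and are not part of the Kafka API. The comment shows how a real Callback passed to KafkaProducer#send() might delegate to it. The producer's own retry settings (such as retries) may already cover many transient failures, so check them before adding application-level resends.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Minimal sketch: the callback body only records failures and never blocks or throws.
final class SendFailureLog {
    private final ConcurrentLinkedQueue<String> failedKeys = new ConcurrentLinkedQueue<>();

    // Intended to be called from Callback#onCompletion(), for example:
    //   producer.send(record, (metadata, exception) -> log.record(record.key(), exception));
    void record(String key, Exception exception) {
        try {
            if (exception != null) {
                failedKeys.add(key); // non-blocking; rejects null keys with an exception
            }
        } catch (RuntimeException e) {
            // Swallowed on purpose so that the callback always returns cleanly.
        }
    }

    // Called from an application thread, outside the callback, to decide on resends.
    // Returns null when no failure is pending.
    String nextFailedKey() {
        return failedKeys.poll();
    }

    public static void main(String[] args) {
        SendFailureLog log = new SendFailureLog();
        log.record("order-1", null); // successful send, nothing recorded
        log.record("order-2", new RuntimeException("simulated send failure"));
        System.out.println(log.nextFailedKey());
    }
}
```

Because record() does no I/O and never calls send(), it returns promptly even when the network or the brokers are unavailable. Note that this sketch silently drops failures for records without a key, which a real implementation would need to handle.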

Check your API usage against the latest API

The documentation for the latest upstream release of Apache Kafka indicates whether there have been any changes to how the APIs are used (setup, read, write). Reviewing the latest information can help you avoid upgrade-related surprises in your producer or consumer.

Some examples from past versions include:

Old Class or Package            New Class or Package
kafka.producer.ProducerConfig   java.util.Properties
kafka.javaapi.*                 kafka.api.*
kafka.producer.KeyedMessage     org.apache.kafka.clients.producer.ProducerRecord

Hidden dependency on network availability

Network dependency is one of the more subtle issues. Because the consumer depends on Sentry and ZooKeeper, frequent or prolonged DNS or network outages can also cause various session timeouts. Such timeouts force partition rebalancing on the brokers, which reduces overall Kafka reliability.

Should these issues be common in your network, you may need to have a less straightforward design that can handle such reliability issues outside of the Kafka client.