Recommendations for using the producer and consumer APIs
Review the following collection of code snippets and recommendations regarding the use of the producer and consumer APIs to learn how you can develop better Kafka clients.
After reviewing the basic examples of a producer and consumer, prototyping your own designs shouldn’t be too difficult. However, your code will likely undergo several iterations that improve on scalability, debuggability, robustness, and maintainability.
This topic presents recommendations in the form of code snippets that illustrate some of the important ways to use the producer and consumer APIs.
In addition to the recommendations presented here, you should also review the Javadoc for producers and consumers, as well as any other API documentation available for the client library you are using. This documentation provides additional details about Kafka client programming.
API documentation is dense with information and assumes that you have sufficient background in reliable computing, networking, multithreading, and distributed systems to use the APIs correctly. While the following recommendations point out many caveats in using the client APIs, the API documentation (and ultimately the source code) provides a more detailed explanation.
Reuse your Producer/Consumer object
As the following examples show, the consumer constructor should be called once and the poll method called within a loop. If the object is not reused, a new connection to the broker is opened with each new consumer object that is created.
Recommended (Java):

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
    }

Not recommended (Java):

    while (true) {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        ConsumerRecords<String, String> records = consumer.poll(100);
    }
Recommended (.NET):

    var consumer = new ConsumerBuilder<string, string>(config).Build();
    while (true) {
        var result = consumer.Consume();
    }

Not recommended (.NET):

    while (true) {
        var consumer = new ConsumerBuilder<string, string>(config).Build();
        var result = consumer.Consume();
    }
Similarly, use one consumer or producer object per thread. Creating more objects than necessary opens multiple ports per broker connection, and overusing ephemeral ports can cause performance issues.
In addition, Cloudera recommends setting and using a fixed client ID for producers and consumers when they connect to the brokers. Otherwise, Kafka assigns a new client ID every time a new connection is established, which can severely increase resource utilization (memory) on the broker side.
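For example, a minimal sketch of configuring a fixed client ID on a producer that is created once and reused (the bootstrap address and ID value are illustrative):

    Properties props = new Properties();
    props.put("bootstrap.servers", "broker1:9092");   // illustrative address
    props.put("client.id", "my-service-producer");    // fixed client ID
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    // Create the producer once and reuse it for all sends.
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);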
Each KafkaConsumer object requires frequent polling
Any consumer connected to a partition will time out if no polling is done before the maximum poll interval (the max.poll.interval.ms consumer property) is reached.
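For example, the interval can be adjusted through consumer configuration; the value shown here is the upstream default and is only illustrative:

    // max.poll.interval.ms controls how long the consumer may go
    // between poll() calls before it is considered failed.
    props.put("max.poll.interval.ms", "300000"); // 300000 ms (5 minutes) is the default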
In the example below, the call to myDataProcess.doStuff(records) can cause infrequent polling. This could be due to a combination of reasons:
- Being a blocking method call.
- Doing work on a remote machine.
- Having highly variable processing time.
- Saving to storage that has highly variable I/O throughput.
In such cases, consider having another thread or process doing the actual work and making the handoff as lightweight as possible; a sketch of this handoff follows the examples below.
Example: poll() gets KafkaException due to session timeout (Java)

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        // the call below should return quickly in all cases
        myDataProcess.doStuff(records);
    }
Example: Consume() gets KafkaException due to session timeout (.NET)

    var consumer = new ConsumerBuilder<string, string>(config).Build();
    while (true) {
        var result = consumer.Consume(100);
        // the call below should return quickly in all cases
        myDataProcess.DoStuff(result);
    }
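A minimal sketch of the lightweight handoff approach described above, assuming the consumer and myDataProcess objects from the earlier examples; the queue and worker thread are illustrative:

    // Requires java.util.concurrent.BlockingQueue and LinkedBlockingQueue.
    // The queue decouples polling from processing; names are illustrative.
    BlockingQueue<ConsumerRecords<String, String>> queue = new LinkedBlockingQueue<>();

    // A worker thread performs the potentially slow processing.
    Thread worker = new Thread(() -> {
        while (true) {
            try {
                myDataProcess.doStuff(queue.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    });
    worker.start();

    // The poll loop itself only enqueues records, so polling stays frequent.
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        queue.offer(records); // non-blocking handoff
    }

Note that with an unbounded queue the handoff never blocks the poll loop, but memory use grows if processing cannot keep up; a bounded queue with an explicit overflow policy is a common refinement.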
Catch all exceptions when polling
From the poll() Javadoc page, you can see that the poll() method throws several exceptions. If the catch statements are not complete, any uncaught exception ends up in the finally statement calling KafkaConsumer#close(), which in many cases is not the desired behavior.

    try {
        while (true) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(100);
            } catch (Exception e) {
                // catch everything so that one failed poll does not
                // break out of the loop and close the consumer
                e.printStackTrace();
            }
        }
    } finally {
        consumer.close();
    }
In the .NET client, some overloads throw only a single exception type, while others can throw multiple exception types. Make sure that all exceptions are caught.

    try {
        while (true) {
            try {
                var result = consumer.Consume();
            } catch (Exception e) {
                // catch everything so that one failed consume does not
                // break out of the loop and close the consumer
                Console.WriteLine($"Exception caught:\n {e.StackTrace}");
            }
        }
    } finally {
        consumer.Close();
    }
Callback#onCompletion() should always exit without errors
org.apache.kafka.clients.producer.Callback (Javadoc) is used to define a class that is called upon completion of a KafkaProducer#send() call. It allows for tracking, cleanup, or other administrative code to be called. An example of unintended usage is calling KafkaProducer#send() within the Callback#onCompletion() method, essentially mimicking a retry. Because the onCompletion() method is always expected to return cleanly, while the send() method makes no such guarantees, calling send() within the callback could end up hanging the code in case of network or broker issues.
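A minimal sketch of a callback that only records the outcome and returns cleanly; the producer and record objects are assumed from context:

    producer.send(record, new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            // Only record the outcome; never call send() or other
            // potentially blocking operations from inside the callback.
            if (exception != null) {
                exception.printStackTrace(); // or hand off to an error handler
            }
        }
    });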
Check your API usage against the latest API
The documentation for the latest upstream release of Apache Kafka indicates if there have been any changes to how the APIs are used (setup, read, write). Reviewing the latest information could help you avoid upgrade-related changes to your producer or consumer.
Some examples from past versions include:
Old Class or Package | New Class or Package
---|---
kafka.producer.ProducerConfig | java.util.Properties
kafka.javaapi.* | kafka.api.*
kafka.producer.KeyedMessage | org.apache.kafka.clients.producer.ProducerRecord
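For example, a sketch of how a message built with the old KeyedMessage class maps to the newer ProducerRecord; the topic, key, and value are illustrative:

    // Old Scala-based client:
    //   KeyedMessage<String, String> msg = new KeyedMessage<>("my-topic", "my-key", "my-value");
    // Newer Java client:
    ProducerRecord<String, String> record =
        new ProducerRecord<>("my-topic", "my-key", "my-value");
    producer.send(record);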
Hidden dependency on network availability
Network dependency is one of the more subtle issues. Given the consumer dependencies on Ranger and ZooKeeper, a combination of frequent or prolonged DNS or network outages can also cause various session timeouts to occur. Such timeouts force partition rebalancing on the brokers, which worsens general Kafka reliability.
Should these issues be common in your network, you may need a less straightforward design that can handle such reliability issues outside of the Kafka client.