Recommendations for using the producer and consumer APIs
A collection of recommendations regarding the use of the producer and consumer APIs.
After reviewing the basic examples of a producer and consumer, prototyping your own designs should not be difficult. However, your code will likely undergo several iterations that improve its scalability, debuggability, robustness, and maintainability.
This topic presents recommendations in the form of code snippets that illustrate some of the important ways to use the producer and consumer APIs.
In addition to the recommendations presented here, it is highly recommended that you also review the Javadoc for the producer and consumer APIs, which contains additional details about Kafka client programming.
These Javadoc pages are quite dense with information. They assume you have sufficient background in reliable computing, networking, multithreading, and distributed systems to use the APIs correctly. While the following recommendations point out many caveats in using the client APIs, the Javadoc (and ultimately the source code) provides a more detailed explanation.
Reuse your Producer/Consumer object
In these examples, the consumer constructor should be called once and the poll() method called within a loop. If this object is not reused, a new connection to the broker is opened with each new KafkaConsumer object created.
Recommended
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
}
Not Recommended
while (true) {
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    ConsumerRecords<String, String> records = consumer.poll(100);
}
Similarly, it is recommended that you use one KafkaConsumer and/or KafkaProducer object per thread. Creating more objects opens multiple ports per broker connection, and overusing ephemeral ports can cause performance issues.
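As a minimal sketch of the one-consumer-per-thread pattern, each thread constructs and uses its own KafkaConsumer object; the broker address, group ID, topic name, and thread count below are placeholders:
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

ExecutorService pool = Executors.newFixedThreadPool(3);
for (int i = 0; i < 3; i++) {
    pool.submit(() -> {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // placeholder
        props.put("group.id", "my-group");              // placeholder
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Each thread owns exactly one KafkaConsumer; the consumer is not thread-safe.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("mytopic")); // placeholder topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                // process records on this thread
            }
        }
    });
}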
In addition, Cloudera recommends setting and using a fixed client.id for producers and consumers when they are connecting to the brokers. If this is not done, Kafka assigns a new client ID every time a new connection is established, which can severely increase resource utilization (memory) on the broker side.
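For example, a fixed client.id is just one more entry in the connection properties; the ID value below is a placeholder:
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092"); // placeholder
props.put("client.id", "inventory-consumer-1"); // fixed, stable client ID (placeholder)
// ...deserializers and other settings as usual...
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);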
Each KafkaConsumer object requires calling poll() frequently
As explained in the Apache Kafka documentation topic New Consumer Configs, any consumer connected to a partition will time out if poll() is not called within the period defined by max.poll.interval.ms.
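For example, the interval can be set explicitly in the consumer properties; the value shown below is the upstream default of five minutes and is only illustrative:
// Maximum delay allowed between poll() calls before the consumer is considered
// failed and its partitions are rebalanced (300000 ms is the upstream default).
props.put("max.poll.interval.ms", "300000");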
In the example below, the call to myDataProcess.doStuff(records) can cause poll() to be called infrequently. This could be due to a combination of reasons:
- Being a blocking method call.
- Doing work on a remote machine.
- Having highly variable processing time.
- Saving to storage that has highly variable I/O throughput.
In such cases, consider having another thread or process do the actual work, keeping the handoff as lightweight as possible; a sketch of this pattern follows the example below.
Example: poll() gets KafkaException due to session timeout
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    // the call below should return quickly in all cases
    myDataProcess.doStuff(records);
}
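One way to keep poll() responsive is to hand each batch to a worker thread over a bounded queue. The sketch below assumes the myDataProcess object from the example above and an illustrative queue capacity; note that put() blocks when the queue is full, which applies backpressure but means a persistently slow worker can still delay poll():
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Bounded queue keeps the handoff lightweight (capacity is illustrative).
BlockingQueue<ConsumerRecords<String, String>> queue = new ArrayBlockingQueue<>(100);

// Worker thread does the slow processing off the polling thread.
Thread worker = new Thread(() -> {
    try {
        while (true) {
            myDataProcess.doStuff(queue.take()); // blocks until a batch arrives
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
});
worker.start();

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
try {
    while (true) {
        // The only work on the polling thread is the enqueue, so poll() stays frequent.
        queue.put(consumer.poll(100));
    }
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}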
Catch all exceptions from poll()
From the poll() Javadoc page, you can see that the poll() method throws several exceptions. If the catch statement in the example below is not complete, any uncaught exception will end up in the finally statement, calling KafkaConsumer#close(). This will not be the desired behavior in many cases.
try {
    while (true) {
        try {
            ConsumerRecords<String, String> records = consumer.poll(100);
        } catch (Exception e) {
            // catch everything poll() can throw so the loop keeps running
            e.printStackTrace();
        }
    }
} finally {
    // runs only once, when the loop is abandoned
    consumer.close();
}
Callback#onCompletion() should always exit without errors
The interface org.apache.kafka.clients.producer.Callback (Javadoc) is used to define a class that is invoked upon completion of a KafkaProducer#send() call. It allows tracking, cleanup, or other administrative code to be run. An example of unintended usage is calling KafkaProducer#send() within the Callback#onCompletion() method, essentially mimicking a retry. Because the onCompletion() method is always expected to return cleanly, and the send() method makes no such guarantees, calling send() within the callback could end up hanging the code in case of network or broker issues.
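A sketch of a callback that respects this constraint, assuming failed sends are retried elsewhere (the failedRecords queue is a placeholder for whatever retry mechanism you use):
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Placeholder retry path: failed records are queued and re-sent by a separate thread.
Queue<ProducerRecord<String, String>> failedRecords = new ConcurrentLinkedQueue<>();

ProducerRecord<String, String> record = new ProducerRecord<>("mytopic", "key", "value");
producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        // Only cheap, non-throwing bookkeeping belongs here; never call send().
        if (exception != null) {
            failedRecords.add(record); // hand the retry decision to another thread
        }
    }
});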
Check your API usage against the latest API
The documentation for the latest upstream release of Apache Kafka indicates whether there have been any changes to how the APIs are used (setup, read, write). Reviewing the latest information could help you avoid upgrade-related changes to your producer or consumer.
Some examples from past versions include:
Old Class or Package | New Class or Package
---|---
kafka.producer.ProducerConfig | java.util.Properties
kafka.javaapi.* | kafka.api.*
kafka.producer.KeyedMessage | org.apache.kafka.clients.producer.ProducerRecord
Hidden Dependency on Network Availability
Network dependency is one of the subtler issues. Given the consumer's dependencies on Ranger and ZooKeeper, a combination of frequent or prolonged DNS or network outages can cause various session timeouts to occur. Such timeouts force partition rebalancing on the brokers, which worsens overall Kafka reliability.
Should these issues be common in your network, you may need a less straightforward design that can handle such reliability issues outside of the Kafka client.