Developing Apache Kafka Applications

Kafka consumers

Learn more about Kafka consumers.

Kafka consumers are the subscribers responsible for reading records from one or more topics and from one or more partitions of a topic. Partitions can be assigned to a consumer manually, or automatically when the consumer subscribes to a topic; typically, this means writing a program using the KafkaConsumer API.

To instantiate a consumer:

KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerConfig);

The KafkaConsumer class has two generic type parameters: the type of the record keys and the type of the record values. Just as producers can send data (the values) with keys, the consumer reads records that carry both a key and a value. In this example both the keys and values are strings. If you use different types, you need to provide a deserializer for each of them. To write your own deserializer, implement the org.apache.kafka.common.serialization.Deserializer interface.
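
As a rough illustration of what a deserializer does, the sketch below turns a 4-byte big-endian payload back into an Integer. To stay free of external dependencies it does not declare `implements Deserializer<Integer>`; its `deserialize` method only mirrors the shape of the interface method, and in a project with the Kafka client library on the classpath you would add the implements clause. The class name is hypothetical, and Kafka already ships an IntegerDeserializer, so this is for illustration only.

```java
import java.nio.ByteBuffer;

// Hypothetical class: shows the decoding step a Deserializer<Integer> would perform.
class BigEndianIntDecoderSketch {

    // Same parameter shape as Deserializer.deserialize(String topic, byte[] data).
    // The topic name is not needed for this simple format.
    Integer deserialize(String topic, byte[] data) {
        if (data == null) {
            return null; // a null payload stays null
        }
        if (data.length != 4) {
            throw new IllegalArgumentException("Expected 4 bytes but got " + data.length);
        }
        // ByteBuffer reads in big-endian order by default.
        return ByteBuffer.wrap(data).getInt();
    }

    public static void main(String[] args) {
        byte[] payload = ByteBuffer.allocate(4).putInt(42).array();
        Integer value = new BigEndianIntDecoderSketch().deserialize("example-topic", payload);
        System.out.println(value);
    }
}
```

Rejecting payloads of the wrong length up front makes malformed records fail loudly instead of being silently misread.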

The most important configuration parameters that we need to specify are:

  • bootstrap.servers: A list of brokers to initially connect to. List 2 to 3 brokers; you don't need to list the full cluster.
  • group.id: Every consumer belongs to a group. Consumers in the same group share the partitions of a topic: each partition is assigned to exactly one consumer in the group, so the members of the group divide the work of reading the topic between them.
  • key.deserializer/value.deserializer: Specify how to convert the sequence of bytes received through the Kafka protocol into the Java representation of the key and the value.
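
The settings listed above could be collected into a consumerConfig object roughly as follows. This is a minimal sketch: the broker host names, the group name, and the class name are hypothetical placeholders, and the property keys are written as plain strings so that the example compiles without the Kafka client library.

```java
import java.util.Properties;

// Hypothetical helper class: builds the minimal consumer settings discussed above.
class ConsumerConfigSketch {

    static Properties buildConsumerConfig() {
        Properties consumerConfig = new Properties();
        // Two brokers are enough for the initial connection (hypothetical hosts).
        consumerConfig.put("bootstrap.servers",
                "broker1.example.com:9092,broker2.example.com:9092");
        // Consumers that use the same group.id share the partitions of a topic.
        consumerConfig.put("group.id", "example-group");
        // Keys and values are both strings, matching KafkaConsumer<String, String>.
        consumerConfig.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        consumerConfig.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return consumerConfig;
    }

    public static void main(String[] args) {
        Properties consumerConfig = buildConsumerConfig();
        // With the Kafka client library on the classpath, this object is what
        // would be passed to the constructor:
        // KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerConfig);
        for (String name : consumerConfig.stringPropertyNames()) {
            System.out.println(name + " = " + consumerConfig.getProperty(name));
        }
    }
}
```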

In addition to the configuration properties presented above, there are a number of other important configurations that any user of Kafka must know about. These are:

  • heartbeat.interval.ms: The interval of the heartbeats. For example, if the heartbeat interval is set to 3 seconds, the consumer sends a short heartbeat message to the broker every 3 seconds to indicate that it is alive.
  • session.timeout.ms: The consumer sends this timeout to the group coordinator, which uses it to detect and remove dead consumers: if no heartbeat arrives within the timeout, the consumer is removed from the group. If it’s set to 10 seconds, the consumer can miss sending 2 heartbeats, assuming the previous heartbeat setting. If we increase the timeout, the consumer has more room for delays, but the broker notices failed consumers later.
  • max.poll.interval.ms: The maximum time allowed between two polls. Consumers must keep polling and should never do long-running processing between polls. If a consumer takes longer than this interval between two polls, it is detached from the consumer group. We can tune this configuration according to our needs. Note that if the value is increased, a consumer that is stuck in processing is noticed later.
  • request.timeout.ms: Generally, every request has a timeout. This is the upper bound on how long the client waits for the server’s response. If this timeout elapses, the request might be retried if the number of retries is not exhausted.
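
To make the relationship between these timeouts more concrete, here is a small sketch that sets them as plain properties and works through the heartbeat arithmetic from the example above (3-second heartbeats, 10-second session timeout). The class and method names are hypothetical, and the values for max.poll.interval.ms and request.timeout.ms are illustrative assumptions rather than recommendations.

```java
import java.util.Properties;

// Hypothetical helper class: timeout-related consumer settings and a sanity check.
class ConsumerTimeoutSketch {

    static Properties buildTimeoutConfig() {
        Properties config = new Properties();
        config.put("heartbeat.interval.ms", "3000");   // heartbeat every 3 seconds
        config.put("session.timeout.ms", "10000");     // declared dead after 10 seconds of silence
        config.put("max.poll.interval.ms", "300000");  // illustrative: at most 5 minutes between polls
        config.put("request.timeout.ms", "30000");     // illustrative: wait up to 30 seconds per request
        return config;
    }

    // Number of whole heartbeat intervals that fit into one session timeout.
    static long heartbeatIntervalsPerSession(Properties config) {
        long heartbeat = Long.parseLong(config.getProperty("heartbeat.interval.ms"));
        long session = Long.parseLong(config.getProperty("session.timeout.ms"));
        if (heartbeat >= session) {
            throw new IllegalArgumentException(
                    "heartbeat.interval.ms must be lower than session.timeout.ms");
        }
        return session / heartbeat;
    }

    public static void main(String[] args) {
        Properties config = buildTimeoutConfig();
        // 10000 / 3000 = 3 whole intervals, so two heartbeats can be missed
        // as long as the third one arrives before the session times out.
        System.out.println(heartbeatIntervalsPerSession(config));
    }
}
```

Keeping the heartbeat interval well below the session timeout, commonly no more than a third of it, leaves room for a delayed or lost heartbeat before the coordinator removes the consumer.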