Kafka consumers
Learn more about Kafka consumers.
Kafka consumers are the subscribers responsible for reading records from one or more topics, or from one or more partitions of a topic. A consumer can subscribe to a topic manually or automatically (both styles are sketched after the instantiation example below); typically, this means writing a program using the consumer API available in your chosen client library.
To instantiate a consumer:
For Java:

KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerConfig);

For C#:

var consumer = new ConsumerBuilder<string, string>(config).Build();
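As mentioned above, subscription can happen automatically through group management or manually through direct partition assignment. A minimal Java sketch of both styles follows; the topic name "orders" is a placeholder, and the kafkaConsumer instance is the one created above:

import java.util.Collections;
import org.apache.kafka.common.TopicPartition;

// Automatic: join the consumer group and let the broker assign partitions.
kafkaConsumer.subscribe(Collections.singletonList("orders"));

// Manual alternative: bypass group management and read partition 0 directly.
// Note that subscribe() and assign() must not be mixed on the same consumer.
// kafkaConsumer.assign(Collections.singletonList(new TopicPartition("orders", 0)));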
The consumer class has two generic type parameters. Just as producers can send data (the values) with keys, the consumer can read data by keys. In this example both the keys and values are strings. If you use different types, you need to provide a deserializer that accommodates them. Deserializers must implement an interface:
- For Java, implement the org.apache.kafka.common.serialization.Deserializer interface.
- For C#, implement the Confluent.Kafka.IDeserializer interface.
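As an illustration, here is a minimal Java sketch of a custom deserializer. The Temperature value type and the UTF-8 wire format are assumptions made for this example; with Kafka 2.0 and later, configure() and close() have default implementations, so only deserialize() needs to be provided:

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.Deserializer;

// Hypothetical value type used only for this example.
class Temperature {
    final double celsius;
    Temperature(double celsius) {
        this.celsius = celsius;
    }
}

public class TemperatureDeserializer implements Deserializer<Temperature> {
    @Override
    public Temperature deserialize(String topic, byte[] data) {
        if (data == null) {
            return null; // Kafka passes null for records without a value (tombstones).
        }
        // Assumes the producer wrote the value as a UTF-8 string such as "21.5".
        return new Temperature(Double.parseDouble(new String(data, StandardCharsets.UTF_8)));
    }
}

A custom deserializer like this is then referenced through the key.deserializer or value.deserializer configuration parameter described below.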
The most important configuration parameters that we need to specify are:
For Java:
- bootstrap.servers: A list of brokers to initially connect to. List 2 to 3 brokers; you don't need to list the full cluster.
- group.id: Every consumer belongs to a group. That way they'll share the partitions of a topic.
- key.deserializer / value.deserializer: Specify how to convert the sequence of bytes received through the Kafka protocol into the Java representation.
For C#:
- BootstrapServers: A list of brokers to initially connect to. List 2 to 3 brokers; you don't need to list the full cluster.
- GroupId: Every consumer belongs to a group. That way they'll share the partitions of a topic.
- Key and value deserializers: In C# the key and value deserializers are not configuration parameters; instead, they are passed to the builder object. For example:

var consumer = new ConsumerBuilder<string, GenericRecord>(config)
    .SetKeyDeserializer(new AvroDeserializer<string>(schemaRegistry).AsSyncOverAsync())
    .SetValueDeserializer(new AvroDeserializer<GenericRecord>(schemaRegistry).AsSyncOverAsync())
    .Build();
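Putting the Java parameters together, here is a minimal sketch of a configured consumer with a simple poll loop; the broker addresses, group ID, and topic name are placeholders:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerExample {
    public static void main(String[] args) {
        Properties consumerConfig = new Properties();
        // Two or three brokers are enough for the initial connection.
        consumerConfig.put("bootstrap.servers", "broker1:9092,broker2:9092");
        // Consumers in the same group share the partitions of a topic.
        consumerConfig.put("group.id", "example-group");
        // Both keys and values are plain strings in this example.
        consumerConfig.put("key.deserializer", StringDeserializer.class.getName());
        consumerConfig.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerConfig)) {
            kafkaConsumer.subscribe(Collections.singletonList("example-topic"));
            while (true) {
                // poll() must be called regularly; see max.poll.interval.ms below.
                ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("key=%s value=%s%n", record.key(), record.value());
                }
            }
        }
    }
}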
In addition to the configuration properties presented above, there are a number of other important configurations that any user of Kafka must know about. These are:
For Java:
- heartbeat.interval.ms: The interval of the heartbeats. For example, if the heartbeat interval is set to 3 seconds, the consumer sends a short heartbeat message to the broker every 3 seconds to indicate that it is alive.
- session.timeout.ms: The consumer reports this timeout to the group coordinator. It is used to track heartbeats and remove dead consumers. If it is set to 10 seconds, the consumer can miss two heartbeats (assuming the previous heartbeat setting) and still be considered alive. Increasing the timeout gives the consumer more room for delays, but the broker notices lagging consumers later.
- max.poll.interval.ms: A very important detail: consumers must keep polling and should never do long-running processing between polls. If a consumer takes too much time between two polls, it is detached from the consumer group. You can tune this configuration according to your needs, but note that the larger the value, the later a consumer stuck in processing is noticed.
- request.timeout.ms: In general, every request has a timeout. This is an upper bound on how long the client waits for the server's response. If this timeout elapses, retries might happen as long as the number of retries is not exhausted.
For C#:
- HeartbeatIntervalMs: The interval of the heartbeats. For example, if the heartbeat interval is set to 3 seconds, the consumer sends a short heartbeat message to the broker every 3 seconds to indicate that it is alive.
- SessionTimeoutMs: The consumer reports this timeout to the group coordinator. It is used to track heartbeats and remove dead consumers. If it is set to 10 seconds, the consumer can miss two heartbeats (assuming the previous heartbeat setting) and still be considered alive. Increasing the timeout gives the consumer more room for delays, but the broker notices lagging consumers later.
- MaxPollIntervalMs: A very important detail: consumers must keep polling and should never do long-running processing between polls. If a consumer takes too much time between two polls, it is detached from the consumer group. You can tune this configuration according to your needs, but note that the larger the value, the later a consumer stuck in processing is noticed.
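As an illustration, the Java timing parameters above could be added to the consumer configuration from the earlier sketch. The values shown are the ones used in the descriptions plus common defaults, not recommendations:

// Continuing the consumerConfig sketch from above (values are examples only).
consumerConfig.put("heartbeat.interval.ms", "3000");  // heartbeat every 3 seconds
consumerConfig.put("session.timeout.ms", "10000");    // considered dead after 10 seconds without heartbeats
consumerConfig.put("max.poll.interval.ms", "300000"); // at most 5 minutes between two poll() calls
consumerConfig.put("request.timeout.ms", "30000");    // upper bound for waiting on a server response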