Protocol between consumer and broker
Learn how the protocol works, which messages go over the wire, and how they contribute to the overall behavior of the consumer.
When discussing the internals of consumers, there are a few basic terms to know:
- Heartbeat
- When the consumer is alive and is part of the consumer group, it sends heartbeats. These are short periodic messages that tell the brokers that the consumer is alive and everything is fine.
- Session
- A single missed heartbeat is usually not a problem, but how long must a consumer stay silent before it indicates one? The session is that time interval. If the consumer does not send any heartbeats for longer than the session, the broker can consider the consumer dead and remove it from the group.
- Coordinator
- The special broker that manages the group on the broker side is called the coordinator. The coordinator handles heartbeats and assigns the leader. Every group has a coordinator that organizes the startup of the consumer group and assists whenever a consumer leaves the group.
- Leader
- The leader consumer is elected by the coordinator. Its job is to assign partitions to every consumer in the group at startup or whenever a consumer leaves or joins the group. The leader holds the assignment strategy, so the strategy is decoupled from the broker. That means consumers can reconfigure the partition assignment strategy without restarting the brokers.
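The relationship between heartbeats and the session can be sketched with a toy model of the coordinator's bookkeeping. The class `ToyCoordinator` and its method names are hypothetical and only illustrate the idea; real brokers implement this differently. The 10-second session length is an arbitrary value chosen for the example.

```python
class ToyCoordinator:
    """Hypothetical model: tracks the last heartbeat time of each group member."""

    def __init__(self, session_timeout_s):
        self.session_timeout_s = session_timeout_s
        self.last_heartbeat = {}  # member id -> time of last heartbeat (seconds)

    def heartbeat(self, member_id, now):
        # Each heartbeat restarts the member's session.
        self.last_heartbeat[member_id] = now

    def expire_dead_members(self, now):
        # A member counts as dead once it has been silent for longer than the session.
        dead = sorted(m for m, t in self.last_heartbeat.items()
                      if now - t > self.session_timeout_s)
        for m in dead:
            del self.last_heartbeat[m]
        return dead


coordinator = ToyCoordinator(session_timeout_s=10)
coordinator.heartbeat("consumer-a", now=0)
coordinator.heartbeat("consumer-b", now=0)
coordinator.heartbeat("consumer-a", now=8)   # consumer-b stays silent

removed = coordinator.expire_dead_members(now=12)
print(removed)                               # ['consumer-b']
print(sorted(coordinator.last_heartbeat))    # ['consumer-a']
```

In this sketch a member is removed only after it has been silent for strictly longer than the session, which mirrors the "longer than the session" wording above.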
Startup Protocol
As mentioned before, consumers usually work in groups, so a major part of the startup process is spent setting up the consumer group.
At startup, the first step is to match protocol versions. It is possible that the broker
and the consumer are of different versions (the broker is older and the consumer is newer,
or vice versa). This matching is done by the API_VERSIONS request.
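One way to picture version matching: each side supports a range of versions for a given request type, and the two sides settle on the highest version both understand. The sketch below assumes that rule; the function name and the version numbers are made up for illustration.

```python
def highest_common_version(client_range, broker_range):
    """Return the highest version in both inclusive (min, max) ranges, or None."""
    low = max(client_range[0], broker_range[0])
    high = min(client_range[1], broker_range[1])
    return high if low <= high else None


# Newer consumer, older broker: settle on the broker's maximum.
print(highest_common_version((0, 9), (0, 6)))   # 6
# No overlap: the two sides cannot use this request type with each other.
print(highest_common_version((5, 9), (0, 3)))   # None
```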
The next step is to collect cluster information, such as the addresses of all the brokers
(prior to this point we used the bootstrap server as a reference), partition counts, and
partition leaders. This is done in the METADATA request.
After acquiring the metadata, the consumer has the information needed to join the group. By
this time on the broker side, a coordinator has been selected per consumer group. The
consumers must find their coordinator with the FIND_COORDINATOR request.
After finding the coordinator, the consumers are ready to join the group. Every consumer
in the group sends its own member-specific metadata to the coordinator in the JOIN_GROUP
request. The coordinator waits until all the consumers have sent their requests, then
assigns a leader for the group. In its response, the coordinator sends the collected
metadata to the leader, so the leader knows about its group.
The remaining step is to assign partitions to consumers and propagate this state. Similar
to the previous request, all consumers send a SYNC_GROUP request to the
coordinator; the leader provides the assignments in this request. After it receives the sync
request from each group member, the coordinator propagates this member state in the
response. By the end of this step, the consumers are ready and can start consuming.
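The join and sync steps can be sketched as a small in-memory simulation. Everything named here is hypothetical: `join_group`, `assign_round_robin`, and `sync_group` only mimic the message flow described above. Picking the first member to join as the leader and using a round-robin assignment are assumptions made for this sketch.

```python
def join_group(join_requests):
    """Coordinator side: collect all join requests, then pick a leader.

    join_requests is a list of (member_id, metadata) in arrival order.
    Assumption: the first member to join becomes the leader.
    """
    leader_id = join_requests[0][0]
    members = dict(join_requests)
    responses = {}
    for member_id in members:
        # Only the leader receives the collected metadata of the whole group.
        group_view = members if member_id == leader_id else {}
        responses[member_id] = {"leader": leader_id, "members": group_view}
    return responses


def assign_round_robin(member_ids, partitions):
    """Leader side: spread partitions over members (assumed strategy)."""
    ordered = sorted(member_ids)
    assignment = {m: [] for m in ordered}
    for i, partition in enumerate(partitions):
        assignment[ordered[i % len(ordered)]].append(partition)
    return assignment


def sync_group(sync_requests):
    """Coordinator side: take the leader's plan and hand each member its share.

    sync_requests maps member_id -> assignment dict (only the leader's is non-empty).
    """
    full = next(a for a in sync_requests.values() if a)
    return {member_id: full[member_id] for member_id in sync_requests}


joins = [("c1", {"topics": ["orders"]}), ("c2", {"topics": ["orders"]})]
join_responses = join_group(joins)
leader = join_responses["c1"]["leader"]

plan = assign_round_robin(join_responses[leader]["members"], partitions=[0, 1, 2])
sync_responses = sync_group({"c1": plan, "c2": {}})
print(leader)          # c1
print(sync_responses)  # {'c1': [0, 2], 'c2': [1]}
```

Because the assignment function runs on the leader consumer in this model, swapping `assign_round_robin` for another strategy would not touch the coordinator functions at all, which is the decoupling the leader definition describes.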
Consumption Protocol
When consuming, the first step is to query where the consumer should start. This is done in
the OFFSET_FETCH request. This is not mandatory: the consumer can also
provide the offset manually. After this, the consumer is free to pull data from the broker.
Data consumption happens in the FETCH requests. These are long-poll requests. They are
answered once the broker has enough data or a maximum wait time elapses, so a request can
remain outstanding for a longer period of time.
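The waiting behavior of a fetch can be sketched with a simulated clock. The function below is a toy model, not broker code: the parameter names `min_bytes` and `max_wait_ms` are chosen for this sketch (they echo the consumer settings `fetch.min.bytes` and `fetch.max.wait.ms`), and the arrival schedule is invented.

```python
def toy_fetch(arrivals, min_bytes, max_wait_ms):
    """Simulate one long-poll fetch.

    arrivals: list of (time_ms, num_bytes) describing when data reaches the broker.
    Returns (answer_time_ms, bytes_returned).
    """
    available = 0
    for time_ms, num_bytes in sorted(arrivals):
        if time_ms > max_wait_ms:
            break  # the wait limit is reached before this data arrives
        available += num_bytes
        if available >= min_bytes:
            return time_ms, available  # enough data: answer immediately
    # Not enough data in time: answer at the wait limit with whatever is there.
    return max_wait_ms, available


print(toy_fetch([(100, 400), (250, 700)], min_bytes=1000, max_wait_ms=500))  # (250, 1100)
print(toy_fetch([(100, 400), (900, 700)], min_bytes=1000, max_wait_ms=500))  # (500, 400)
```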
From time to time, the application has to save the offsets, either manually or
automatically, in an OFFSET_COMMIT request, and also send heartbeats in HEARTBEAT
requests. The former ensures that the position is saved, while the latter ensures that
the coordinator knows that the consumer is alive.
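Why saving the position matters can be shown with a toy offset store. `ToyOffsetStore` and `consume` are hypothetical names, and the "crash" is simulated by simply stopping early. The sketch follows the convention that a committed offset is the next offset to read.

```python
class ToyOffsetStore:
    """Hypothetical stand-in for the coordinator's committed-offset storage."""

    def __init__(self):
        self.committed = {}  # (group, partition) -> next offset to read

    def offset_commit(self, group, partition, next_offset):
        self.committed[(group, partition)] = next_offset

    def offset_fetch(self, group, partition):
        # None means the group has no saved position for this partition yet.
        return self.committed.get((group, partition))


def consume(store, group, partition, log, commit_every, stop_after):
    """Process records from the saved position, committing every `commit_every` records."""
    start = store.offset_fetch(group, partition) or 0
    processed = []
    for offset in range(start, min(len(log), start + stop_after)):
        processed.append(log[offset])
        if (offset + 1) % commit_every == 0:
            store.offset_commit(group, partition, offset + 1)
    return processed


log = ["r0", "r1", "r2", "r3", "r4", "r5"]
store = ToyOffsetStore()

# First run stops after three records; only offset 2 was committed by then.
first_run = consume(store, "g", 0, log, commit_every=2, stop_after=3)
# Second run resumes from the committed position.
second_run = consume(store, "g", 0, log, commit_every=2, stop_after=10)
print(first_run)   # ['r0', 'r1', 'r2']
print(second_run)  # ['r2', 'r3', 'r4', 'r5']
```

Record `r2` is processed twice because the first run stopped before committing past it. The consumer resumes from the last saved position, not from the last record it actually handled.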
Shutdown Protocol
The last step when the consumption is done is to shut down the consumer gracefully. This is
done in a single step, the LEAVE_GROUP request.
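As a recap, the requests named in the three phases can be collected into one table and used to check whether a trace of requests follows the startup, consumption, shutdown order. This is a simplified sketch: the helper names are hypothetical, and the model ignores cases such as a consumer rejoining the group in the middle of consumption.

```python
# Request names as used in the text, grouped by the phase that sends them.
PHASES = {
    "startup": ["API_VERSIONS", "METADATA", "FIND_COORDINATOR", "JOIN_GROUP", "SYNC_GROUP"],
    "consumption": ["OFFSET_FETCH", "FETCH", "OFFSET_COMMIT", "HEARTBEAT"],
    "shutdown": ["LEAVE_GROUP"],
}
PHASE_ORDER = ["startup", "consumption", "shutdown"]


def phase_of(request):
    """Look up which phase a request belongs to."""
    for phase, requests in PHASES.items():
        if request in requests:
            return phase
    raise ValueError(f"unknown request: {request}")


def follows_lifecycle(trace):
    """True if the trace never returns to an earlier phase."""
    ranks = [PHASE_ORDER.index(phase_of(r)) for r in trace]
    return ranks == sorted(ranks)


trace = ["API_VERSIONS", "METADATA", "FIND_COORDINATOR", "JOIN_GROUP", "SYNC_GROUP",
         "OFFSET_FETCH", "FETCH", "HEARTBEAT", "FETCH", "OFFSET_COMMIT", "LEAVE_GROUP"]
print(follows_lifecycle(trace))                     # True
print(follows_lifecycle(["LEAVE_GROUP", "FETCH"]))  # False
```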