Kafka KRaft [Technical Preview]
Apache Kafka Raft (KRaft) is a consensus protocol used for metadata management that was developed as a replacement for Apache ZooKeeper. Using KRaft for managing Kafka metadata instead of ZooKeeper offers various benefits including a simplified architecture and a reduced operational footprint. Learn more about what KRaft is, how it works, and how it compares to ZooKeeper deployments.
In traditional Kafka clusters, Kafka’s metadata (including broker metadata, configurations, and client quotas) is stored in ZooKeeper. The brokers access this data in ZooKeeper through the broker that is designated as the active controller. This is the traditional way of how Kafka operates.
In the controller quorum mode, ZooKeeper is replaced with Kafka’s own consensus implementation called Kafka Raft, or KRaft for short. KRaft is based on the Raft algorithm with some changes that are otherwise native in Kafka.
In this mode, a set of specialized brokers, called controllers, are deployed. These controllers form a cluster and are responsible for storing and maintaining Kafka metadata. The quorum of controllers ensures that the metadata is available at all times and that it is consistent. Additionally, one of the controllers is designated as the active controller (also known as the leader), which coordinates metadata changes with the brokers. The controllers store the metadata in a Kafka topic that can be found on each node.
- Fast and efficient metadata storage and
With ZooKeeper, cluster updates are committed to ZooKeeper through the active controller. In KRaft, updates are sent directly to the controller quorum, eliminating one hop in the chain. In addition, because metadata is stored in a Kafka topic, both brokers and controllers can easily replicate and store the data locally. Both of these aspects of KRaft can help in substantially improving Kafka’s performance.
- Simplified, deployment, setup, and management
KRaft eliminates the need for a ZooKeeper service. This means that you only need to manage and maintain a single service, which is Kafka.
The Raft algorithm
Learn the basics of the Raft consensus algorithm, which KRaft is based on.
In many distributed systems there is a need for consensus. For example, consensus is needed to decide what is a distributed configuration’s value, which servers are active, or which ones are passive. This type of information is critical for a distributed actor. Therefore, the information must be stored in a way that it is accessible reliably at all times.
There are many iterations to a safe consensus algorithm that ensure that all of the requirements above are met. A more modern version of such an algorithm is the Raft algorithm.
Raft uses timers and periodic heartbeats to keep track of alive nodes. As long as the majority of nodes are functioning, clients can publish and access the data. Raft replicates data on all of the Raft quorum nodes. Additionally a leader is elected randomly on first startup. When data is produced, it is pushed to peer nodes with the periodic heartbeats. Follower nodes acknowledge this data. A request is committed only when the majority of nodes confirm it.
Raft can ensure that partitioned controller quorums operate correctly. That is, if a partition does not have the majority, then data sent to those nodes is not committed. When partitions merge, the majority wins.
In case of a leader failure, remaining nodes hold a vote. A new leader is chosen from the remaining pool of nodes. After the new leader is elected, it becomes the origin of the data. Other nodes switch over and replicate from the new leader.
Voting is timeout based. If a node doesn’t get a heartbeat within the session timeout, it sends out vote requests. Other nodes do the same, but at the same time accept incoming votes. Eventually, the first node who sent out requests wins and is elected the new leader.
For more information on Raft, see https://raft.github.io/.
KRaft metadata management
Learn how Kafka metadata is stored and managed in KRaft.
In KRaft mode, metadata is stored in a dedicated internal Kafka topic called
__cluster_metadata. This topic has a single partition that contains all
information related to the current state of the Kafka service. The leader of the single partition
is the active controller. The active controller is responsible for writing metadata changes to
Both brokers and controllers fetch and replicate this topic to store it locally.
Controllers are replica followers, meaning that they are part of the in-sync replica set (ISR)
for the topic. Brokers are observers only. This means that while they replicate the topic, they
are not part of the ISR. Both brokers and controllers periodically commit metadata from
__cluster_metadata to their metadata cache.
When a broker starts up, it reads the most recent changes available in its metadata cache. Afterward, it requests the changes from the controller that happened since the last update.
Next, the active controller responds with a series of deltas or the full state of the cluster. The full state of the cluster is only sent if the broker has no cached metadata (it's an empty broker starting for the first time) or if it is severely lagging behind. The broker then commits the updates it receives to its cache and the topic replica.
Going forward, the broker periodically asks for updates from the active controller. These periodic fetch requests that the broker sends also act as heartbeats.
KRaft metadata replication and the HWM
Learn about the high watermark and how Kafka metadata is replicated in KRaft.
In KRaft mode, Kafka uses its own implementation of the Raft protocol. This implementation is based on Raft, but is heavily influenced by Kafka’s log replication protocol.
One of the cornerstones of Kafka replication is the high watermark (HWM). The HWM defines the highest offset that has been replicated across all ISR replicas. It is maintained by the leader, but followers cache it and update it with each offset fetch so a consistent state is maintained across the cluster.
On leader change, all partitions truncate their log to the HWM (messages above are considered unsuccessful and the client retries). In general, the HWM must be larger than the log start offset and smaller than the log end offset.
In KRaft, the concept of the HWM is slightly different. Instead of being the highest offset that is replicated across all ISR replicas, it is the highest offset that is replicated by the majority of replicas. This way Kafka’s replication closely mimics the Raft algorithm’s behavior.
KRaft leader changes
Learn how leader changes are handled in KRaft.
When the active controller (the leader for the controllers and the
__cluster_metadata topic) fails, a new leader must be elected. Otherwise, Kafka
can not continue to function. Leader changes in KRaft work similarly to the original
On leader change, as according to the protocol, each node gets to vote. In KRaft each node can cast a single vote per epoch. Nodes always vote for the node that has the “longest” log. Longest in this case means highest epoch, or, if the epoch is identical, larger offset. The new leader is decided by a simple majority. That is, the node with the most votes is elected leader.
For example, assume that there are three controllers. Each of them has a replica of
__cluster_metadata. Replica A was the leader, but due to a node failure it goes
offline and falls out of the in-sync replica set (ISR).
A new leader is required. Each candidate sends a vote request which contains the epoch and the offset. If the sender has a longer log than the recipient of the request, the recipient votes for the sender.
In this example the candidate (replica C) has an epoch count of 2, while the follower (replica B) only has an epoch count of 1. The candidate has the “longest” log. The follower, therefore, votes for the candidate and the candidate becomes the leader. If leadership is decided, the followers truncate back to the HWM and fetch the new leader’s data.
KRaft log reconciliation
Learn how logs are reconciled and how the high watermark (HWM) is incremented in KRaft.
When a leader change happens, the log must be reconciled. After a new leader is elected, followers send fetch requests to the leader. The fetch request contains the last fetched epoch and an offset. The epoch is initially 1, but increments with each leader change. Data is in fact stored with the epoch numbers. Therefore, the high watermark (HWM) is incremented only if the majority of nodes replicated the data with the highest epoch.
For example, assume that you have three controllers. Each of them has a replica of
__cluster_metadata. Replica A, which is the leader, is in epoch 1, offset 3.
Replica B, a follower, is caught up with the leader. The majority of the nodes replicated the
data with the highest epoch. As a result, the HWM is moved to offset 3.
Let’s look at a more complex example. Assume that you have the same setup, but more messages were committed and a number of leader changes happened. The cluster is in the following state.
Notice how both followers (B and C) already accepted the leader’s epoch, and that replica A and B have additional messages above the HWM. However, not all messages were replicated to the majority of nodes and the HWM is not incremented. In this scenario, the log must be reconciled before the HWM can be incremented. The reconciliation process is as follows.
First, followers send a fetch request to the leader. The fetch request contains the offset as well as the last fetched epoch. For replica B, the offset is 7, the last fetched epoch is 2.
Based on the fetch request, the leader knows that the logs have diverged and sends a response that includes the diverging epoch and the end offset of that epoch. In this case, the diverging epoch is 2, the end offset is 6.
The information included in the fetch responses enables the follower to truncate its log to the correct position.
After truncating the log, the follower can continue to replicate any new messages from the leader. Once all new messages are replicated to the majority of nodes the HWM is incremented.
The same process also happens for replica C. Ultimately, it catches up to the leader as well.
KRaft broker state machines
Brokers in KRaft mode have four states. Learn what these states are and when brokers enter each state.
The states are as follows:
- Offline: The broker hasn't started. The broker remains in this state until the local logs from the file system are loaded.
- Fenced: The broker has started, but can’t receive requests from clients. Replicas aren’t in sync with the cluster. The broker stays in this state until it catches up with the current metadata of the cluster.
- Running: The broker has started and is ready to receive requests from clients.
- Stopping: The broker is still running, but it is migrating the partition leaders to other brokers.