Kafka stretch clusters
Stretch clusters are highly resilient and cost-effective Kafka deployments. Learn the requirements, possible deployment architectures, and configuration needed to deploy Kafka stretch clusters.
A stretch cluster is a single logical Kafka cluster deployed across multiple Data Centers (DC) or other independent physical infrastructures such as cloud availability zones. By utilizing a single Kafka cluster over multiple DCs, you can achieve the following:
- Strong data durability guarantees as a result of synchronous replication between DCs.
- Automatic failover of clients when a DC goes down.
Stretch clusters have specific requirements for latency between DCs and require specific configurations for achieving strong guarantees on data durability. Additionally, in some use cases, complementary features can be used to minimize cross-DC traffic. It is important to note that the extra latency introduced by this setup can have a significant impact on the throughput of Kafka.
Kafka stretch cluster requirements
Learn about the requirements of a stretch cluster deployment.
Number of Data Centers and the ZooKeeper quorum
A stretch cluster requires at least three DCs to function properly. While Kafka can support a setup where brokers are hosted in two DCs, Kafka is dependent on ZooKeeper for metadata storage. Theoretically, it is possible to use more than three DCs in the stretch cluster architecture. However, in the majority of cases, the cost of the additional cross-DC data traffic will outweigh the benefits of having the data copied to more than three DCs.
Since ZooKeeper is a quorum-based system, a majority is required to keep the metadata storage and Kafka running. To support service continuity in case of a DC failure, sufficient ZooKeeper nodes must remain so that a majority vote can happen. For example, in a two DC setup, one of the two DCs must host more ZooKeeper nodes than the other. If the DC that hosts the higher number of ZooKeeper nodes goes down, ZooKeeper majority is lost. This results in the Kafka cluster also going down.
Latency between Data Centers
In order to achieve high throughput, Kafka is designed from the ground up with the assumption that latency between the cluster nodes is low. Because of this, a stretch cluster deployment requires low latency between the DCs. There are two major issues with high latency in a stretch cluster: Kafka replication throughput and metadata operation latency.
While the replication stays functionally correct even with high latency, the throughput of Kafka suffers greatly from increased latency.
ZooKeeper is also sensitive to latency. ZooKeeper changes are synchronized across the cluster, making metadata changes run slowly. These changes include controller election, topic metadata operations (create/delete/update config), and ISR changes. In an ideal deployment with low latency, these are infrequent operations and do not affect the throughput of data production and consumption. However, the higher the latency, the more frequent these operations become. With increased latency, replicas lag behind and fall out of the ISR more easily, which generates more ISR change operations. There is also a phenomenon called ISR thrashing, which occurs when a replica frequently rejoins and then falls out of the ISR. This is also tied to high latency scenarios.
Because of these issues, Cloudera recommends using infrastructure where the maximum latency between the DCs is 50 ms. In general, the latency should be minimized. Any increase in latency greatly affects the throughput of a Kafka stretch cluster. While it is possible for a Kafka stretch cluster to function correctly in some use cases with higher latency (for example, light duty clusters), Cloudera does not recommend using the stretch cluster architecture if you have high latency.
Kafka stretch cluster architectures
A Kafka stretch cluster can be operated in a 3 or 2.5 DC configuration.
A stretch cluster requires at least three DCs to function correctly. Based on the use case, it is possible to span the Kafka cluster over two DCs and use the third DC only to host a tie-breaker ZooKeeper node. This architecture is called the 2.5 DC setup. Typical use cases use either the 2.5 DC setup or a 3 DC setup. It is possible to use more DCs (given that the latency requirements are met), but in general, cross-DC traffic should be minimized.
2.5 DC stretch cluster
Pros:
- Cost efficient (both in terms of nodes used and cross-DC data traffic).
- Can ensure synchronized writes into two DCs (data durability, RPO=0).
- Can tolerate single DC failure, reads supported.
- Less sensitive to the tie-breaker DC failure.
Con:
- Writes for durable topics are not supported after a DC that hosts brokers goes down.
3 DC stretch cluster
Pros:
- Can ensure synchronized writes into at least two DCs (data durability, RPO=0).
- Depending on the configuration, can guarantee writes over three DCs, but write availability is reduced.
- Can tolerate a single DC going down. This is true for both reads and writes.
- Write availability depends on the configuration (whether two or three DC guarantee was configured).
Con:
- As a result of more nodes in all DCs and more cross-DC data traffic, this architecture is more expensive than a 2.5 DC setup.
Cluster configuration for Kafka stretch clusters
Kafka stretch clusters require the brokers, topics, and clients of the deployment to be configured. Configuration is needed to achieve the desired guarantees on data durability.
To achieve high data durability in a stretch cluster, you must configure the brokers, topics, and clients in the deployment. Specifically, you must do the following:
- Configure brokers and topics so that topic partition replicas are distributed evenly among the DCs.
- Configure producers to write messages with the highest durability guarantee (acks=all).
- Configure topics to have the required minimum in-sync replicas when accepting writes.
Even distribution of topic replicas among DCs
To ensure that replicas are distributed evenly among the DCs, stretch clusters use Kafka's rack awareness feature (KIP-36). Kafka brokers have a broker.rack configuration property. This property specifies the physical location of the broker in the form of a rack ID. The rack ID is an arbitrary, user-defined string that can represent any type of physical infrastructure. In stretch clusters, the property and the ID can be used to let the broker know which DC it is running in. For example, the ID can be set to DC1 for the brokers inside DC1, DC2 for DC2, and so on.
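To verify that every broker reports the DC it is running in, you can list the brokers together with their rack IDs. The following is a minimal sketch using the Java AdminClient; the bootstrap address is a hypothetical placeholder that you need to replace with a broker of your own cluster.
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.Node;

public class RackCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Hypothetical bootstrap address, replace with a broker from your cluster.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-dc1-1.example.com:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // describeCluster() returns all brokers; Node.rack() is the configured broker.rack value.
            for (Node node : admin.describeCluster().nodes().get()) {
                System.out.printf("broker %d (%s) -> rack: %s%n", node.id(), node.host(), node.rack());
            }
        }
    }
}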
If a topic is created with a replication factor of two or more in a Kafka cluster that has rack IDs specified, Kafka assigns the replicas in a way that not only ensures that replicas of the same partition are located on different brokers, but also tries to distribute them evenly among the DCs (using round-robin assignment). For example, assume that brokers 1, 2, and 3 are located in DC1 and brokers 4 and 5 are located in DC2:
Topic_A, RF=4 partition 0: [1, 4, 2, 5]
Topic_B, RF=5 partition 0: [1, 4, 2, 5, 3]
For Topic_A, there are two replicas in DC1 and two replicas in DC2. Topic_A is evenly distributed. This happens if [***RF***] % [***DC COUNT***] == 0. For Topic_B, there are three replicas in DC1 and two replicas in DC2. Since [***RF***] % [***DC COUNT***] != 0, the distribution is not perfectly even, but round-robin assignment ensures that the maximum difference between replica counts per rack is one.
To utilize this behavior to your advantage, durable topics should be configured to have a replication factor which is a multiple of the number of DCs. For example, if you have three DCs, set the replication factor to 3, 6, 9, and so on.
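For example, the following sketch creates a topic with a replication factor of six for a three DC deployment using the Java AdminClient. The bootstrap address, topic name, and partition count are hypothetical placeholders.
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateDurableTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-dc1-1.example.com:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Replication factor 6 is a multiple of the DC count (3), so rack awareness
            // places two replicas of every partition in each DC.
            NewTopic durableTopic = new NewTopic("durable_topic", 3, (short) 6);
            admin.createTopics(Collections.singletonList(durableTopic)).all().get();
        }
    }
}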
In the majority of cases, having a single replica inside a DC is sufficient. That is, setting the replication factor higher than the number of DCs is usually not necessary. Doing so increases cross-DC traffic and storage capacity requirements. However, having more replicas does have benefits, which are as follows:
- Increased availability. More broker failures can be tolerated without the topic being unavailable.
- Follower fetching is supported even in case of single broker failures.
- Increased data durability in case of disk failures.
Producer acknowledgments (acks)
Producers have a configuration property called acks. The acks property determines when the leader sends an acknowledgement to the producer after a write operation. The property can be set to three different values, each corresponding to a level of acknowledgement. Each level represents a tradeoff between throughput and write durability. The configuration values are as follows:
- 0: No acknowledgement. The producer treats the write as finished right after it is sent to the leader. This setting provides the highest throughput, but also carries the highest risk of losing writes. Writes might not even make it to the leader's log (and disk) before the leader goes down.
- 1: Leader acknowledgement. The leader sends an acknowledgement right after the message is appended to its log. This setting provides lower throughput compared to acks=0, but ensures that at least a single log has the message appended.
- all: ISR acknowledgement. The leader appends the message to its log, waits for the in-sync replicas (ISR) to replicate the message, and then sends the acknowledgement. This setting provides the lowest throughput, because the leader might need to wait for multiple followers to fetch and write the message to their logs before the acknowledgement can be sent. However, it provides the highest durability, as it ensures that multiple brokers have appended the message to their logs. With this setting, the decision on when a write is considered done is handed over from the producer to the partition leader.
For high durability, producers must be configured to use acks=all. To understand and fully utilize the ISR concept in the stretch cluster, you also need to correctly configure the topics.
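The following sketch shows a producer configured with acks=all. The bootstrap address and topic name are hypothetical placeholders; serializers and other settings depend on your application.
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class DurableProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-dc1-1.example.com:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // acks=all: the leader only acknowledges the write after the ISR has replicated it.
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("durable_topic", "key", "value"));
            producer.flush();
        }
    }
}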
ISR and minimum in-sync replicas of a topic
For durable writes, besides the replication factor, it is also important to correctly configure the minimum in-sync replica count (min.insync.replicas) of a topic.
The ISR is tracked by Kafka based on broker configuration. The leader always keeps track of which of its followers are in sync (caught up) and which are lagging behind. The replicas that are caught up are part of the ISR.
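If you want to inspect the current ISR of a topic, the following sketch lists the leader, replicas, and ISR of every partition using the Java AdminClient. The bootstrap address and topic name are hypothetical placeholders.
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartitionInfo;

public class IsrCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-dc1-1.example.com:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            TopicDescription description = admin.describeTopics(Collections.singletonList("durable_topic"))
                    .all().get().get("durable_topic");
            for (TopicPartitionInfo partition : description.partitions()) {
                // isr() lists the replicas that are currently caught up with the leader.
                System.out.printf("partition %d: leader=%s replicas=%s isr=%s%n",
                        partition.partition(), partition.leader(), partition.replicas(), partition.isr());
            }
        }
    }
}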
When a producer sends a request with acks=all, the leader will wait for all members of the ISR to replicate the message before sending an acknowledgement. If there are nine replicas (one leader and eight followers), and all are part of the ISR, then the acknowledgement is sent after all eight followers have replicated the message. By default, if only the leader is part of the ISR, then the acknowledgement is sent right after the leader appends the message to its log.
To change this behavior, topics have a min.insync.replicas property. This property represents a minimum size requirement for the ISR when a producer writes into a topic with acks=all. This means that if min.insync.replicas=2, then the leader and at least a single follower must be part of the ISR, and the ISR must replicate the message before the acknowledgement is sent.
In a stretch cluster setup, if you want to ensure that writes are replicated to multiple DCs before the acknowledgement is sent, the min.insync.replicas property must be correctly configured. The following example demonstrates how you can calculate the correct min.insync.replicas value for your deployment.
[***DC COUNT***] = 3
[***MINIMUM DC REPLICAS***] = 2
[***RF***] = 6
First, you need to calculate how many replicas reside in each DC ([***REPLICA PER DC***]). This can be done by dividing the replication factor by the number of DCs:
[***RF***] / [***DC COUNT***] = [***REPLICA PER DC***]
6 / 3 = 2
Afterwards, you can calculate min.insync.replicas using the following formula:
[***REPLICA PER DC***] * ([***MINIMUM DC REPLICAS***] - 1) + 1 = min.insync.replicas
2 * (2 - 1) + 1 = 3
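If you prefer to calculate the value programmatically, the following sketch simply encodes the two formulas above. It assumes that the replication factor is an exact multiple of the DC count.
public class MinIsrCalculator {
    // [***RF***] / [***DC COUNT***] = [***REPLICA PER DC***]
    // [***REPLICA PER DC***] * ([***MINIMUM DC REPLICAS***] - 1) + 1 = min.insync.replicas
    static int minInsyncReplicas(int rf, int dcCount, int minimumDcReplicas) {
        int replicasPerDc = rf / dcCount; // assumes rf is a multiple of dcCount
        return replicasPerDc * (minimumDcReplicas - 1) + 1;
    }

    public static void main(String[] args) {
        System.out.println(minInsyncReplicas(6, 3, 2)); // prints 3
        System.out.println(minInsyncReplicas(6, 3, 3)); // prints 5
    }
}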
This formula ensures that whichever replicas are in sync for the topic, there will always be at least [***MINIMUM DC REPLICAS***] DCs hosting in-sync replicas. However, whenever the ISR shrinks below min.insync.replicas, writes start to fail because the min.insync.replicas requirement is not met.
With min.insync.replicas=3, you can ensure that even in the worst case scenario (most of the replicas in the ISR are located in the same DC), at least one replica will be located in a different DC.
If [***MINIMUM DC REPLICAS***] is increased to three, min.insync.replicas changes to five:
2 * (3 - 1) + 1 = 5
With min.insync.replicas=5, you can ensure that even in the worst case scenario of ISR membership, all three DCs replicate a write before it is acknowledged. However, this also means that any DC going down reduces the ISR size to four, which causes the cluster to fail durable produce requests.
Partition leadership
Everything described so far about the ISR and durable writes relies on partition leadership changes being tied to the ISR. When the leader is not available, Kafka transfers the leadership to one of the ISR members. This ensures that all writes acknowledged by the cluster will be present on the next leader. Because of this, all durable topics must have unclean leader election disabled. Otherwise, accepted writes might get lost in an unclean leader election.
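Tying the previous sections together, the following sketch applies both topic-level settings to an existing topic using the Java AdminClient: min.insync.replicas=3 for the example deployment above (RF=6, three DCs, two DC guarantee) and unclean.leader.election.enable=false so that leadership can only move to ISR members. The bootstrap address and topic name are hypothetical placeholders.
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class DurableTopicConfig {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-dc1-1.example.com:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "durable_topic");
            admin.incrementalAlterConfigs(Collections.singletonMap(topic, Arrays.asList(
                    // With two replicas per DC, an ISR of at least three always spans two DCs.
                    new AlterConfigOp(new ConfigEntry("min.insync.replicas", "3"), AlterConfigOp.OpType.SET),
                    // Only ISR members can become leader, so acknowledged writes are not lost.
                    new AlterConfigOp(new ConfigEntry("unclean.leader.election.enable", "false"), AlterConfigOp.OpType.SET)
            ))).all().get();
        }
    }
}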
Conclusion: Durable writes (RPO=0)
With the three essential configurations in place (broker.rack, acks=all, and min.insync.replicas), you ensure the following:
- Replicas are evenly distributed among the DCs.
- Producers write with the highest durability guarantee.
- The level of durability configured with min.insync.replicas ensures that writes are synchronized to the required number of DCs.
- Topics only allow clean leader elections (based on the ISR).