Kafka stretch clusters

Stretch clusters are highly resilient and cost-effective Kafka deployments. Learn the requirements, possible deployment architectures, and configuration needed to deploy Kafka stretch clusters.

A stretch cluster is a single logical Kafka cluster deployed across multiple Data Centers (DC) or other independent physical infrastructures such as cloud availability zones. By utilizing a single Kafka cluster over multiple DCs, you can achieve the following:

  • Strong data durability guarantees as a result of synchronous replication between DCs.
  • Automatic failover of clients when a DC goes down.

Stretch clusters have specific requirements for latency between DCs and require specific configurations for achieving strong guarantees on data durability. Additionally, in some use cases, complimentary features can be used to minimize cross-DC traffic. It is important to note that the extra latency introduced by this setup can have a significant impact on the throughput of Kafka.

Kafka stretch cluster requirements

Learn about the requirements of a stretch cluster deployment.

Number of Data Centers and the ZooKeeper quorum

A stretch cluster requires at least three DCs to function properly. While Kafka can support a setup where brokers are hosted in two DCs, Kafka is dependent on ZooKeeper for metadata storage. Theoretically, it is possible to use more than three DCs in the stretch cluster architecture. However, in the majority of cases, the cost of the additional cross-DC data traffic will outweigh the benefits of having the data copied to more than three DCs.

Since ZooKeeper is a quorum-based system, a majority is required to keep the metadata storage and Kafka running. To support service continuity in case of a DC failure, sufficient ZooKeeper nodes must remain so that a majority vote can happen. For example, in a two DC setup, one of the two DCs must host more ZooKeeper nodes than the other. If the DC that hosts the higher number of ZooKeeper nodes goes down, ZooKeeper majority is lost. This results in the Kafka cluster also going down.

Latency between Data Centers

In order to achieve hight throughput, Kafka is designed from the ground up with the assumption that latency is low between the cluster nodes. Because of this, a stretch cluster deployment requires low latency between the DCs. There are two major issues with high latency in a stretch cluster: Kafka replication throughput and metadata operation latency.

While the replication stays functionally correct even with high latency, the throughput of Kafka suffers greatly from increased latency.

ZooKeeper is also sensitive to latency. ZooKeeper changes are synchronized in the cluster, making metadata changes run slow. These changes involve controller election, topic metadata related operations (create/delete/update config), and ISR changes. In an ideal deployment with low latency, these are infrequent operations and do not affect the throughput of data production and consumption. However, the higher the latency, the more frequent these operation become. With increased latency, replicas lag behind the ISR more easily, which generates more ISR change operations. There is also a phenomenon called ISR thrashing. This occurs when a replica frequently joins but then lags behind the ISR, which is also tied to high latency scenarios.

Because of these issues, Cloudera recommends using infrastructure where the maximum latency between the DCs is 50 ms. In general, the latency should be minimized. Any increase in latency greatly affects the throughput of a Kafka stretch cluster. While it is possible for a Kafka stretch cluster to function correctly in some use cases with higher latency (for example, light duty clusters), Cloudera does not recommend using the stretch cluster architecture if you have high latency.

Kafka stretch cluster architectures

A Kafka stretch cluster can be operated in a 3 or 2.5 DC configuration.

A stretch cluster requires at least three DCs to function correctly. Based on the use case, it is possible to span the Kafka cluster over two DCs, and only use the third cluster as a tie-breaker ZooKeeper node. This architecture is called the 2.5 DC setup. Typical use cases would either use the 2.5 DC setup, or a 3 DC setup. It is possible to use multiple DCs (given that the latency requirements are met), but in general, cross-DC traffic should be minimized.

2.5 DC stretch cluster

Figure 1. 2.5 DC Kafka stretch cluster

Pros:

  • Cost efficient (both in terms of nodes used and cross-DC data traffic).
  • Can ensure synchronized writes into two DCs (data durability, RPO=0).
  • Can tolerate single DC failure, reads supported.
  • Less sensitive to the tie-breaker DC failure.

Con:

  • Writes for durable topics are not supported after a DC that hosts brokers goes down.

3 DC stretch cluster

Figure 2. 3 DC stretch cluster

Pros:

  • Can ensure synchronized writes into at least two DCs (data durability, RPO=0).
    • Depending on the configuration, can guarantee writes over three DCs, but write availability is reduced.
  • Can tolerate a single DC going down. This is true for both reads and writes.
    • Write availability depends on the configuration (whether two or three DC guarantee was configured).

Con:

  • As a result of more nodes in all DCs and more cross-DC data traffic, this architecture is more expensive than a 2.5 DC setup.

Cluster configuration for Kafka stretch clusters

Kafka stretch clusters require the brokers, topics, and clients of the deployment to be configured. Configuration is needed to achieve the desired guarantees on data durability.

To achieve the high data durability in a stretch cluster, configuring the brokers, topics and clients in the deployment is required. Specifically you have to:

  • Configure brokers and topics so that topic partition replicas are distributed evenly among the DCs.
  • Configure producers to write messages with the highest durability guarantee (acks=all).
  • Configure topics to have the required minimum in-sync replicas when accepting writes.

Even distribution of topic replicas among DCs

To ensure that replicas are distributed evenly among the DCs, stretch clusters use Kafka's rack awareness feature (KIP-36). Kafka brokers have a broker.rack configuration. This property specifies the physical location of the broker in the form of a rack ID. The rack ID is an arbitrary, user defined string that can represent any type of physical infrastructure. In stretch clusters, the property and the ID can be used to let the broker know which DC it is running in. For example, the ID can be set to DC1 for the brokers inside DC1, DC2 for DC2, and so on.

If a topic is created with a replication factor of two or more in a Kafka instance that has its rack IDs specified, Kafka assigns the replicas in a way that not only ensures that the replicas of the same partition are located in different brokers, but will also try to distribute them evenly among the DCs (using round-robin).

For example, assume you have two DCs ("racks") with six brokers and two topics. The replication factor of the topics is not uniform. Topic_A has a replication factor of 4, Topic_B has a replication factor of 5. In this scenario, Kafka will create the following replica assignments:
Topic_A, RF=4 partition 0: [1, 4, 2, 5]
Topic_B, RF=5 partition 0: [1, 4, 2, 5, 3] 
For Topic_A there are two replicas in DC1 and two replicas in DC2. Topic_A is evenly distributed. This happens if [***RF***] %[***DC COUNT***] == 0.

For Topic_B there are three replicas in DC1 and two replicas in DC2. Since [***RF***] % [***DC COUNT***] != 0, distribution is not perfectly even, but round-robin ensures that the maximum difference between replica counts per rack is one.

Figure 3. Kafka stretch cluster replica assignment example

To utilize this behavior to your advantage, durable topics should be configured to have a replication factor which is a multiple of the number of DCs. For example, if you have three DCs, set the replication factor to 3, 6, 9, and so on.

In the majority of cases, having a single replica inside a DC is sufficient. That is, setting the replication factor higher than the number of DCs is usually not necessary. Doing so increases cross-DC traffic and storage capacity requirements. However, having more replicas does have benefits, which are as follows:

  1. Increased availability. More broker failures can be tolerated without the topic being unavailable.
  2. Follower fetching is supported even in case of single broker failures.
  3. Increased data durability in case of disk failures.

Producer acknowledgments (acks)

Producers have a configuration property called acks. The acks property affects when a leader should send an acknowledgement to the producer after a write operation. The property can be set to three different values, each corresponding to a level of acknowledgement. Each level represents a tradeoff between throughput and write durability. The configuration values are as follows:

0
No acknowledgement. The producer handles the write as finished right after it is sent to the leader. This setting provides the highest throughput, but also has the highest risk of losing writes. Writes might not even make it to the leader log (and disk) before the leader goes down.
1
Leader acknowledgement. The leader sends an acknowledgement right after the message was appended to its log. This setting provides lower throughput compared to acks=0, but ensures that at least a single log has the message appended.
all
ISR acknowledgement. The leader appends the message to its log, waits for the in-sync replicas (ISR) to replicate the message, and then sends the acknowledgment. This setting provides the lowest throughput. This is because the leader might need to wait for multiple followers to fetch and write the message to their log before the acknowledgment can be sent out. However, it has the highest durability as it ensures that multiple brokers have appended the message to their log. With this setting, the decision on when a write should be considered done is handed over from the producer to the partition leader.

For high durability, producers must be configured to use acks=all. To understand and fully utilize the ISR concept in the stretch cluster, you need to also correctly configure the topics.

ISR and minimum in-sync replicas of a topic

For durable writes, besides the replication factor, it is also important to correctly configure the minimum in-sync replica count (min.insync.replicas) of a topic.

The ISR is tracked by Kafka based on broker configuration. The leader always keeps track which of its followers are in sync (caught up), and which are lagging behind. The replicas that are caught up are part of the ISR.

When a producer sends a request with acks=all, the leader will wait for all members of the ISR to replicate the message before sending an acknowledgment. If there are nine replicas (one leader and eight followers), and all are part of the ISR, then the acknowledgment will be sent after all eight followers have replicated the message. By default, if only the leader is part of the ISR, then the acknowledgment is sent right after the leader appends the message to its log.

To change this behavior, topics have a min.insync.replicas property. This property represents a minimum size requirement for the ISR when a producer writes into a topic with acks=all. This means that if min.insync.replicas=2, then the leader and at least a single follower must be part of the ISR and that the ISR must replicate the message before the acknowledgement is sent.

In a stretch cluster setup, if you want to ensure that writes are replicated to multiple DCs before acknowledgment happens, the min.insync.replicas property must be correctly configured. The following example demonstrates how you can calculate the correct min.insync.replicas value for your deployment.

Assume that you have three DCs. You want to replicate writes into at least two DCs before a write is acknowledged. Replication factor is six. That is:
[***DC COUNT***]= 3
[***MINIMUM DC REPLICAS***]= 2
[***RF***]= 6
First, you need to calculate how many replicas reside in each DC, [***REPLICA PER DC***]. This can be done by dividing the replication factor by the number of DCs:
[***RF***]/[***DC COUNT***]=[***REPLICA PER DC***]
6 / 3 = 2
Afterwards, you can calculate min.insync.replicas using the following formula:
[***REPLICA PER DC***] * ([***MINIMUM DC REPLICAS***] - 1) + 1 = min.insync.replicas
2 * (2 - 1) + 1 = 3

This formula ensures that whichever replicas are in sync for the topic, there will always be at least a [***MINIMUM DC REPLICAS***] number of DCs hosting the active replicas. However, whenever you have fewer replicas in the ISR, writes will start to fail because the min.insync.replica requirement is not met.

With min.insync.replicas=3 you can ensure that even in the worst case scenario (most of the replicas in the ISR are located in the same DC), at least one replica will be located in a different DC.

To look at another example, assume you have the same setup, but change [***MINIMUM DC REPLICAS***] to three, min.insync.replicas would change to five.
2 * (3 - 1) + 1 = 5

With min.insync.replicas=5 you can ensure that even in a worst case scenario of ISR members, all three DCs are replicating the write before it is acknowledged. However, at the same time, this means that any DC going down reduces the ISR size to four, which will cause the cluster to fail durable produce requests.

Partition leadership

Everything described so far about the ISR and durable writes depends on the fact that partition leadership changes depend on the ISR. When the leader is not available, Kafka transfers the leadership to one of the ISR members. This ensures that all writes acknowledged by the cluster will be present on the next leader. Because of this, all durable topics must have unclean leader election disabled. Otherwise, accepted writes might get lost in an unclean leader election.

Conclusion: Durable writes (RPO=0)

With the three essential configurations done (broker.rack, acks=all, min.insync.replicas), you ensure that:

  • Replicas are evenly distributed among DCs.
  • Producers write with the highest durability.
  • The level of durability configured with min.insync.replicas ensures that writes are synchronized to the required number of DCs.
  • Topics only allow clean leader elections (based on the ISR).