Rack awareness

Racks provide information about the physical location of a broker or a client. A Kafka cluster can be made rack aware by configuring rack awareness for the Kafka brokers, consumers, and ZooKeeper servers. Enabling rack awareness hardens your cluster: it strengthens durability guarantees and significantly decreases the chance of data loss.

To enable rack awareness for a Kafka cluster running in Kubernetes with CSM Operator, complete the following tasks.
  1. Configure rack information for your Kubernetes nodes using labels.
  2. Configure rack awareness for both Kafka and ZooKeeper clusters.
  3. Configure follower fetching for both Kafka brokers and consumers.

Configuring rack information on Kubernetes nodes

Before you can enable rack awareness for Kafka or ZooKeeper, you must ensure that a label is configured in your Kubernetes cluster that holds rack information. You configure labels with kubectl label.

Kubernetes nodes can hold their respective rack information in labels. You can use any label to store your rack information; however, Cloudera recommends using the topology.kubernetes.io/zone label. This is a well-known Kubernetes label, and cloud providers typically set it for you automatically. If your environment (cloud) provider does not set this label automatically, you have to set it manually with kubectl label.

  1. Set your chosen label with kubectl label.
    kubectl label node [***NODE NAME***] topology.kubernetes.io/zone=[***ZONE/RACK***]
    Repeat this step for each of your nodes. For example, assuming you have six nodes, three different racks, and two nodes per rack, you would run commands similar to the following.
    kubectl label node kubernetes-m02 topology.kubernetes.io/zone=eu-zone-1
    kubectl label node kubernetes-m03 topology.kubernetes.io/zone=eu-zone-1
    kubectl label node kubernetes-m04 topology.kubernetes.io/zone=eu-zone-2
    kubectl label node kubernetes-m05 topology.kubernetes.io/zone=eu-zone-2
    kubectl label node kubernetes-m06 topology.kubernetes.io/zone=eu-zone-3
    kubectl label node kubernetes-m07 topology.kubernetes.io/zone=eu-zone-3
  2. Verify your configuration.
    kubectl get node -o=custom-columns=NODE:.metadata.name,ZONE:.metadata.labels."topology\.kubernetes\.io/zone" | sort -k2
    The output lists your nodes and their rack information (zone). Output will be similar to the following example.
    NODE             ZONE
    kubernetes-m01   <none>
    kubernetes-m02   eu-zone-1
    kubernetes-m03   eu-zone-1
    kubernetes-m04   eu-zone-2
    kubernetes-m05   eu-zone-2
    kubernetes-m06   eu-zone-3
    kubernetes-m07   eu-zone-3
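
As a quick sanity check on the verification output, the following sketch counts how many nodes carry each zone value, which makes unlabeled nodes (listed as <none>) and unevenly populated racks easy to spot. The function name zone_summary is hypothetical and is not part of kubectl or CSM Operator. It only assumes that the two-column NODE/ZONE table shown above arrives on standard input.

```shell
# zone_summary: hypothetical helper, not part of kubectl or CSM Operator.
# Reads the NODE/ZONE table on stdin and prints "<zone> <node count>" per zone.
zone_summary() {
  # Skip the header row and any blank lines, then count nodes per zone value.
  awk '$1 != "NODE" && NF >= 2 { count[$2]++ }
       END { for (zone in count) print zone, count[zone] }' | LC_ALL=C sort
}
```

To use it against a live cluster, pipe the output of the kubectl get node command from step 2 into zone_summary.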

Configuring rack awareness for ZooKeeper

ZooKeeper rack awareness is configured in the Kafka resource by specifying affinity rules.

ZooKeeper rack awareness can only be configured through the Kafka resource. As a result, you can only set configuration that applies to all ZooKeeper instances.

To configure rack awareness for ZooKeeper, Cloudera recommends setting the following two affinity rules for ZooKeeper in the Kafka resource.

#...
kind: Kafka
spec:
 zookeeper:
   template:
     pod:
       affinity:
         nodeAffinity:
           requiredDuringSchedulingIgnoredDuringExecution:
             nodeSelectorTerms:
               - matchExpressions:
                   - key: topology.kubernetes.io/zone
                     operator: Exists
         podAntiAffinity:
           preferredDuringSchedulingIgnoredDuringExecution:
             - podAffinityTerm:
                 labelSelector:
                   matchLabels:
                     strimzi.io/cluster: [***CLUSTER NAME***]
                     strimzi.io/name: [***CLUSTER NAME***]-zookeeper
                 topologyKey: topology.kubernetes.io/zone
               weight: 100

These rules are suitable for most cases, but it is still possible that a ZooKeeper pod is rescheduled to a node in a different rack after a node failure. This is because the rules do not force ZooKeeper pods to stay in a specific rack.

Currently, the only way to force ZooKeeper instances to stay in specific racks is to use storage overrides with your own storage classes and volume node affinities. If a pod has a persistent volume claim on a volume with node affinity set, the scheduler considers the restrictions on the volume in use when scheduling the pod. This way, you can configure a rack-aware cluster without the limitations mentioned above.

Configuring rack awareness for Kafka brokers

Rack awareness for Kafka is configured in your Kafka resource by specifying the Kubernetes node label that holds rack information. Optionally, you can configure nodeAffinity rules in the KafkaNodePool resource for stricter broker placement.

Kafka brokers are made rack-aware by configuring the broker.rack property. When broker racks are configured, Kafka intentionally places replicas of the same partition (whenever a topic is created, modified, and so on) into different racks to protect the data from rack failures.

In CSM Operator, you do not set broker.rack directly in your Kafka resource to configure rack awareness. Instead, you specify which node label to use as rack information by configuring the kafka.rack.topologyKey property in the Kafka resource.

If kafka.rack.topologyKey is set, CSM Operator automatically sets the broker.rack property of each broker to the value of that label on the node where the broker pod is scheduled. Additionally, the broker pods automatically get an affinity rule and an anti-affinity rule. These rules spread brokers between racks on a best-effort basis, but do not force the same broker to always run in the same rack.

Because the default rules only provide best-effort spreading, Cloudera recommends that you override them with stricter rules that explicitly configure which group of nodes is placed in which rack.

The following steps demonstrate how to configure kafka.rack.topologyKey and which rules to set in the KafkaNodePool resource if you want to ensure that a group of nodes is always placed in the same rack.

  1. Configure kafka.rack.topologyKey in your Kafka resource.
    #...
    kind: Kafka
    spec:
      kafka:
        rack:
          topologyKey: topology.kubernetes.io/zone
  2. Optional: Explicitly configure which group of nodes are placed in which rack.
    This can be done by adding a required nodeAffinity rule in your KafkaNodePool resources. This step is marked as optional but is recommended by Cloudera. The following examples demonstrate a configuration where there are two node pools. The nodes in each pool are assigned to separate racks (zones).
    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaNodePool
    metadata:
      name: first-pool
      labels:
        strimzi.io/cluster: my-cluster
    spec:
      template:
        pod:
          affinity:
            nodeAffinity:
              requiredDuringSchedulingIgnoredDuringExecution:
                nodeSelectorTerms:
                  - matchExpressions:
                      - key: topology.kubernetes.io/zone
                        operator: In
                        values:
                          - eu-zone-1
    ---
    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaNodePool
    metadata:
      name: second-pool
      labels:
        strimzi.io/cluster: my-cluster
    spec:
      template:
        pod:
          affinity:
            nodeAffinity:
              requiredDuringSchedulingIgnoredDuringExecution:
                nodeSelectorTerms:
                  - matchExpressions:
                      - key: topology.kubernetes.io/zone
                        operator: In
                        values:
                          - eu-zone-2
After the changes are applied, a rolling restart is initiated.
After the cluster is restarted, check the broker.rack values of each broker. You can get the broker.rack values of multiple brokers that are in the same pool with the following command.
for broker in [***CLUSTER NAME***]-[***POOL NAME***]-[***ID RANGE***]; do
  kubectl exec --namespace [***NAMESPACE***] -it \
  $broker --container kafka \
  -- /bin/bash -c "cat /tmp/strimzi.properties" \
  | grep -E "broker.id|broker.rack" && printf '\n\n'
done
  • [***CLUSTER NAME***] is the name of your cluster.
  • [***POOL NAME***] is the name of the node pool.
  • [***ID RANGE***] is a range of broker IDs enclosed in curly braces ({}). For example, {0..2}.
This command will output the broker IDs and the rack information set for each broker. For example:
broker.id=0
broker.rack=eu-zone-1


broker.id=1
broker.rack=eu-zone-1


broker.id=2
broker.rack=eu-zone-1

Configuring follower fetching

You enable follower fetching by configuring your Kafka resource and specifying rack information in your Kafka clients.

If rack awareness is enabled for Kafka brokers, consumers by default continue to consume messages from partition leaders. This behavior remains the same even if the consumer and the partition leader are located in different racks.

It is possible (especially in cloud environments) that a consumer application is in a different region than the partition leader, but there is a partition follower in the same region as the consumer application. In this case it is better to consume from the partition follower instead. This way you can avoid unnecessary traffic across data centers, reducing costs and application latency. This is called follower fetching.

Follower fetching is enabled by configuring the replica selector implementation in your Kafka resource to be rack-aware. Additionally, you need to configure the client.rack property of your clients.

  1. Update your Kafka resource.
    To enable follower fetching, set the replica.selector.class broker property to RackAwareReplicaSelector.
    #...
    kind: Kafka
    spec:
      kafka:
        rack:
          topologyKey: topology.kubernetes.io/zone
        config:
          replica.selector.class: org.apache.kafka.common.replica.RackAwareReplicaSelector
  2. Wait until the rolling restart finishes.
    Use the following command to monitor cluster state.
    kubectl get pods --namespace [***NAMESPACE***] --output wide --watch
  3. Configure your consumers.
    client.rack=[***RACK ID***]
    [***RACK ID***] is one of the rack IDs (zones) that you configured in the topology.kubernetes.io/zone label. The client reads from a follower replica if the broker hosting that replica has a broker.rack value that matches the client.rack value set on the client. If no such follower exists, the client fetches data from the leader.
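
The selection rule described in step 3 can be illustrated with a small model. This is a simplified sketch and not Kafka's implementation: the function name select_replica and the ID:RACK argument format are hypothetical, and the real selector may apply further criteria, such as how caught up a replica is.

```shell
# select_replica: simplified, hypothetical model of rack-aware replica selection.
# Arguments: client rack, leader broker ID, then one "ID:RACK" entry per replica.
# Prints the ID of the broker that the client would read from.
select_replica() {
  sr_client_rack=$1
  sr_leader=$2
  shift 2
  sr_chosen=$sr_leader                  # default: read from the leader
  for sr_replica in "$@"; do
    sr_id=${sr_replica%%:*}
    sr_rack=${sr_replica#*:}
    if [ "$sr_rack" = "$sr_client_rack" ]; then
      if [ "$sr_id" = "$sr_leader" ]; then
        # The leader itself is in the client's rack: keep reading from it.
        sr_chosen=$sr_leader
        break
      fi
      if [ "$sr_chosen" = "$sr_leader" ]; then
        # Remember the first follower found in the client's rack.
        sr_chosen=$sr_id
      fi
    fi
  done
  echo "$sr_chosen"
}
```

For example, with the leader on broker 0 in eu-zone-1 and followers on broker 1 (eu-zone-2) and broker 2 (eu-zone-3), calling select_replica eu-zone-2 0 0:eu-zone-1 1:eu-zone-2 2:eu-zone-3 prints 1, while a client rack that matches no replica falls back to the leader and prints 0.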

Default affinity rules for rack awareness

Kafka broker pods automatically get the following affinity and anti-affinity rules when rack awareness is enabled.

Affinity rule

This is a required rule. The scheduler only schedules a broker pod to a node if the node has the configured label set.

template:
  pod:
    affinity:
      nodeAffinity:
        requiredDuringSchedulingIgnoredDuringExecution:
          nodeSelectorTerms:
            - matchExpressions:
                - key: topology.kubernetes.io/zone
                  operator: Exists

Anti-affinity rule

This is a preferred rule. It spreads Kafka brokers evenly across racks in a best-effort manner.

template:
  pod:
    affinity:
      podAntiAffinity:
        preferredDuringSchedulingIgnoredDuringExecution:
          - podAffinityTerm:
              labelSelector:
                matchLabels:
                  strimzi.io/cluster:
                  strimzi.io/name:
              topologyKey: topology.kubernetes.io/zone
            weight: 100