Replication overview

Learn about Kafka data replication in Cloudera Streams Messaging - Kubernetes Operator. Get familiar with the concepts of replication flows, replication aliases, and replication policies. Additionally, learn about the replication architecture recommended by Cloudera.

Cloudera Streams Messaging - Kubernetes Operator does not support MirrorMaker 2 or the KafkaMirrorMaker2 resource shipped with Strimzi for replicating data between Kafka clusters. Instead, replication between Kafka clusters is achieved by manually deploying Kafka Connect clusters and instances of the MirrorSourceConnector, MirrorCheckpointConnector, and MirrorHeartbeatConnector, which are collectively called the replication connectors. A replication setup like this is referred to as Kafka Connect-based replication.
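
For example, a MirrorSourceConnector instance can be deployed in a Kafka Connect cluster as a KafkaConnector resource. The following is a minimal sketch; the connector name, Kafka Connect cluster name, cluster aliases, and bootstrap addresses are placeholders for illustration.

  apiVersion: kafka.strimzi.io/v1beta2
  kind: KafkaConnector
  metadata:
    # Hypothetical connector name
    name: replication-source
    labels:
      # Name of the Kafka Connect cluster that runs this connector
      strimzi.io/cluster: replication-connect
  spec:
    class: org.apache.kafka.connect.mirror.MirrorSourceConnector
    tasksMax: 2
    config:
      source.cluster.alias: us-west
      target.cluster.alias: us-east
      # Placeholder bootstrap addresses of the source and target clusters
      source.cluster.bootstrap.servers: us-west-kafka-bootstrap:9092
      target.cluster.bootstrap.servers: us-east-kafka-bootstrap:9092
      topics: '.*'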

Kafka Connect-based replication offers a complete replication solution that is scalable, robust, and fault tolerant. It supports the same key features as MirrorMaker 2. For example:

  • Replication of Kafka topic partitions, which keeps multiple copies of the same data in different Kafka clusters to avoid data loss in case of a data center failure.
  • Replication of Kafka consumer group offsets, which enables failing over between clusters without losing data.
  • Ability to monitor your replication at any time.

In addition, Kafka Connect-based replication has a number of advantages over using MirrorMaker 2, such as:

  • Single Message Transforms (SMTs) can be configured for data replication (see the sketch after this list).
  • Manipulating source offsets is possible using the Kafka Connect REST API.
  • Some replication architectures, like unidirectional replication, require fewer resources and fewer Kafka Connect groups when using overrides for heartbeating.
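
For example, an SMT is attached to a replication connector through its configuration. The following fragment is a sketch using the InsertField transformation shipped with Kafka Connect to add a static field to every replicated record value; the transform alias (tagOrigin) and field name (origin) are hypothetical.

  # Fragment of a MirrorSourceConnector configuration
  config:
    transforms: tagOrigin
    transforms.tagOrigin.type: org.apache.kafka.connect.transforms.InsertField$Value
    # Adds a static field named origin with the value us-west to each record
    transforms.tagOrigin.static.field: origin
    transforms.tagOrigin.static.value: us-west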

Replication flows

Replication involves sending records and consumer group checkpoints from a source cluster to a target cluster. A replication flow (also referred to as replication or flow) specifies a source and target cluster pair, the direction in which data flows, and the topics that are replicated.

For example, assume you have two clusters, A and B. You want to replicate data from A to B. To do so, you set up an A->B replication flow. If you want to replicate from B to A, you set up a B->A replication flow.

Each replication flow specifies what topics to replicate by way of topic filters (also referred to as allow and deny lists) using the topics connector property. Therefore, you have full control over what is and what is not replicated.
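
For example, the following connector configuration fragment replicates topics matching an allow list while excluding topics matching a deny list. The topic names and patterns are illustrative, and the fragment assumes a Kafka version where exclusions are configured with the topics.exclude property.

  # Fragment of a replication connector configuration
  config:
    # Allow list: comma-separated topic names or regular expressions
    topics: 'orders.*, payments'
    # Deny list: matching topics are excluded from replication
    topics.exclude: '.*\.internal'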

Replication aliases

In any replication flow, the two clusters taking part in the replication must have an alias. The alias is a short name that represents and identifies the cluster.

Aliases are arbitrary, user-defined names. Generally, the alias describes your cluster. For example, you can use aliases based on the geographic location of the cluster, like us-east or us-west. Alternatively, in a simple two-cluster setup with a single replication flow, you could use aliases like source and target. By default, aliases are used for prefixing replicated topics. Therefore, using descriptive aliases can help when monitoring replication.

Even though you are free to specify any alias you want, you must use the same alias for a given cluster across all replication flows that you deploy. For example, consider that you set up two replication flows: one between clusters A and B, and a second between clusters A and C. You must ensure that the alias of cluster A is the same in both replication flows. For example, A->B and A->C. Additionally, if you later decide to deploy another replication flow between clusters B and C, you must ensure that clusters B and C keep the same aliases in the newly deployed replication flow as well. For example, B->C.
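
For example, the connector configurations of the A->B and A->C replication flows could set the aliases as follows. The single-letter aliases match the example above.

  # Fragment of the connector configurations of the A->B flow
  source.cluster.alias: A
  target.cluster.alias: B

  # Fragment of the connector configurations of the A->C flow;
  # cluster A keeps the same alias
  source.cluster.alias: A
  target.cluster.alias: C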

Replication policies

In any replication flow, the selected source topics are replicated to replicated topics on the target cluster. The basic rules of how these topics are replicated are defined by the replication policy.

Cloudera Streams Messaging - Kubernetes Operator ships with the following two replication policies. The main difference between the two policies is how they name replicated topics.
Table 1. Replication policies available in Cloudera Streams Messaging - Kubernetes Operator

  Replication policy           Fully qualified name
  DefaultReplicationPolicy    org.apache.kafka.connect.mirror.DefaultReplicationPolicy
  IdentityReplicationPolicy   org.apache.kafka.connect.mirror.IdentityReplicationPolicy

DefaultReplicationPolicy

The DefaultReplicationPolicy is the default and Cloudera-recommended replication policy. This is because the DefaultReplicationPolicy can automatically detect replication loops. This policy prefixes the name of the replicated topic with the alias of the source cluster.

For example, the topic1 topic from the us-west source cluster is replicated as the us-west.topic1 topic on the target cluster.

Figure 1. Single-hop replication using the DefaultReplicationPolicy

If a replicated topic is replicated again (that is, there are multiple replication hops in your setup), the name of the replicated topic references all source clusters along the replication path, starting with the cluster closest to the final target. For example, the topic1 topic replicated from the us-west source cluster to the us-east cluster and then to the eu-west cluster is named us-east.us-west.topic1 on the eu-west cluster.

Figure 2. Two-hop replication using the DefaultReplicationPolicy
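
The DefaultReplicationPolicy is applied when no policy is configured, but you can also set it explicitly in the replication connectors. A minimal configuration fragment:

  # Fragment of a replication connector configuration
  config:
    replication.policy.class: org.apache.kafka.connect.mirror.DefaultReplicationPolicy
    # Optional separator placed between the alias prefix and the topic name;
    # the default is a dot, producing names like us-west.topic1
    replication.policy.separator: '.'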

IdentityReplicationPolicy

The IdentityReplicationPolicy does not change the names of replicated topics. When this policy is in use, topics retain the same name on both source and target clusters. This type of replication is also referred to as prefixless replication.

For example, the topic1 topic from the us-west source cluster is replicated as the topic1 topic on the target cluster.

Figure 3. Single-hop replication using the IdentityReplicationPolicy

Cloudera recommends that you use this replication policy in the following use cases. A configuration sketch follows the list.

  • Migrating Kafka data from one cluster to another.
  • Aggregating the same topic from multiple clusters to a single target cluster.
  • Use cases where MirrorMaker 1-compatible replication is required.
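
The following configuration fragment enables prefixless replication. It must be set identically in every connector instance of the replication flow.

  # Fragment of a replication connector configuration
  config:
    # Replicated topics keep their original names on the target cluster
    replication.policy.class: org.apache.kafka.connect.mirror.IdentityReplicationPolicy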

Typical replication architecture

Learn about the typical replication architecture used for replicating Kafka data with Cloudera Streams Messaging - Kubernetes Operator.

When using Kafka Connect-based replication, you set up Kafka Connect clusters and deploy instances of the replication connectors inside the clusters. The Kafka Connect clusters and the connector instances together make up a replication flow.

A typical architecture for a deployment with multiple replication flows is as follows.

Figure 4. Typical replication architecture with two replication flows

Replication flows that you set up in Cloudera Streams Messaging - Kubernetes Operator should follow these architectural principles.

One Kafka Connect cluster for each replication flow
Replication is carried out by the replication connectors, which must run in a Kafka Connect cluster. In Cloudera Streams Messaging - Kubernetes Operator, Cloudera recommends that you deploy a dedicated Kafka Connect cluster (Kafka Connect group) for each replication flow that you want to create.

Deploying a unique Kafka Connect cluster for each replication flow makes your replication flows easier to manage. It simplifies monitoring and troubleshooting, and reduces rebalance times.

Kafka Connect clusters depend on the target Kafka cluster
Any Kafka Connect cluster that you deploy requires a Kafka cluster as a dependency. Kafka Connect uses the Kafka cluster to store information about its state in internal topics.

For Kafka Connect clusters that you deploy for replication, the cluster must always depend on the target Kafka cluster of the replication flow. The dependency is configured in your KafkaConnect resource with the spec.bootstrapServers property.

This dependency makes configuring the connectors that make up the replication flow easier. Properties required to connect to the target cluster can be sourced from the property file of the Kafka Connect workers.
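
The following is a minimal sketch of a KafkaConnect resource for a us-west->us-east replication flow; the resource name and bootstrap address are placeholders. Note that spec.bootstrapServers points to the target (us-east) cluster.

  apiVersion: kafka.strimzi.io/v1beta2
  kind: KafkaConnect
  metadata:
    # One Kafka Connect cluster per replication flow, named after the flow
    name: us-west-to-us-east
    annotations:
      # Enables managing connectors with KafkaConnector resources
      strimzi.io/use-connector-resources: 'true'
  spec:
    replicas: 3
    # The Kafka Connect cluster depends on the target cluster of the flow
    bootstrapServers: us-east-kafka-bootstrap:9092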

Group IDs and internal topic names follow a consistent naming convention
In a production environment, it is highly likely that you will create multiple replication flows and therefore deploy multiple Kafka Connect clusters. Ensure that the group IDs and internal topic names are explicitly configured for each Kafka Connect cluster. These are configured with the group.id and *.storage.topic worker properties.

By default, both the group ID and the internal topic names have fixed default values. If you do not set them explicitly in your KafkaConnect resource, the IDs and names clash across your clusters.

Cloudera recommends that you use a consistent naming convention in environments with multiple Kafka Connect clusters. A consistent naming convention can help in avoiding confusion in your configurations down the line.
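
For example, deriving the group ID and the internal topic names from the replication flow keeps them unique across Kafka Connect clusters. The naming convention below is only an illustration.

  # Fragment of the KafkaConnect resource of the us-west->us-east flow
  spec:
    config:
      group.id: us-west-to-us-east
      offset.storage.topic: us-west-to-us-east-offsets
      config.storage.topic: us-west-to-us-east-configs
      status.storage.topic: us-west-to-us-east-status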

Replication policy is consistent across connectors and replication flows
The replication policy, configured with the replication.policy.class connector property, must be identical in all connector instances that make up a replication flow. This is because the replication policy influences the behavior of the connectors.

Additionally, if you are deploying multiple replication flows where a replication flow replicates replicated topics (you have multiple replication hops), you must ensure that all replication flows use the same replication policy.

For example, consider that you are replicating a topic from cluster A to cluster B, and then from cluster B to cluster C. This setup requires two replication flows, A->B and B->C. Both replication flows must use the same replication policy.
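
For example, both flows could use the default policy. The following fragment must then appear, unchanged, in every connector instance of both flows.

  # Identical in all connector instances of the A->B and B->C flows
  replication.policy.class: org.apache.kafka.connect.mirror.DefaultReplicationPolicy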

Topic filters are consistent across all connectors in a replication flow
The topic filters configured with the topics connector property must be an exact match in the MirrorSourceConnector and MirrorCheckpointConnector instances that are part of the same replication flow. This is required to ensure that both data and offsets are replicated properly. See the sketch at the end of this section.

Cluster aliases are consistent across all replication flows
The cluster aliases, configured with the source.cluster.alias and target.cluster.alias connector properties, must use the same alias for the same cluster in every connector instance. This must hold across all replication flows that you deploy.
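
The following sketch shows the MirrorSourceConnector and MirrorCheckpointConnector instances of a single us-west->us-east flow using matching topic filters and aliases; the topic pattern is illustrative.

  # MirrorSourceConnector configuration fragment
  source.cluster.alias: us-west
  target.cluster.alias: us-east
  topics: 'orders.*'

  # MirrorCheckpointConnector configuration fragment:
  # identical filter and aliases
  source.cluster.alias: us-west
  target.cluster.alias: us-east
  topics: 'orders.*'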