Advanced replication use cases and examples
A collection of advanced replication flow examples for different use cases that leverage the configuration options available with Kafka Connect-based replication.
- A replication flow that uses multiple MirrorSourceConnector instances
This type of replication flow can be used if you want to replicate different topics, located in the same source cluster, using different replication configurations.
- A replication flow that is deployed on multiple Kafka Connect clusters
This type of replication flow can be used to separate replication work across available resources. It enables you to have dedicated resources for specific replication workloads.
- A bidirectional and prefixless replication flow
This type of replication flow can be used if your business requires a bidirectional replication setup that also uses prefixless replication. That is, you require a deployment where topics are actively produced to, consumed from, and bidirectionally replicated at the same time, with topic names remaining unchanged during replication.
Deploying a replication flow that uses multiple MirrorSourceConnector instances
Learn how to deploy a replication flow that uses multiple MirrorSourceConnector instances to carry out replication of a single source cluster. Using multiple MirrorSourceConnectors enables you to replicate different topics using different replication configurations.
Using Kafka Connect-based replication, you can deploy replication flows that use multiple instances of the MirrorSourceConnector. Using multiple connector instances, in turn, enables you to fine-tune how data is replicated and apply different replication configurations to different topics that are in the same source Kafka cluster. For example, you can configure the following, as illustrated in the sketch after this list.
- Different compression type per connector
- Different replication factor per connector
- Different single message transforms per connector
- Different offset sync frequency per connector
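For instance, the following fragments sketch two connector instances that replicate the same source cluster with different settings. This is a minimal illustration: the connector names are hypothetical, and the replication.factor, offset.lag.max, and producer.override.compression.type properties are standard MirrorMaker 2 and Kafka Connect options that you should validate against your deployment. Note that producer.override.* settings take effect only if the Kafka Connect worker's connector.client.config.override.policy allows them.

```yaml
#...
kind: KafkaConnector
metadata:
  name: low-latency-mirror-source   # hypothetical name
spec:
  class: org.apache.kafka.connect.mirror.MirrorSourceConnector
  config:
    # Topics created on the target get a higher replication factor
    replication.factor: 3
    # Emit offset syncs more frequently (after at most 10 records of lag)
    offset.lag.max: 10
---
#...
kind: KafkaConnector
metadata:
  name: bulk-mirror-source   # hypothetical name
spec:
  class: org.apache.kafka.connect.mirror.MirrorSourceConnector
  config:
    replication.factor: 2
    offset.lag.max: 1000
    # Per-connector producer compression for high-volume topics
    producer.override.compression.type: lz4
```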
The MirrorSourceConnector instances are deployed in a single Kafka Connect cluster. Additionally, all connector instances connect to the same source and target clusters. All other configurations related to replication can be customized in each connector instance.
The following example demonstrates a configuration where a header is added to a subset of replicated topics. This is done by creating two MirrorSourceConnector instances and configuring one of them to use Single Message Transforms (SMT).
To deploy a replication flow with multiple MirrorSourceConnector instances, follow the steps in Deploying a replication flow with the following changes.
- Create multiple ConfigMaps that store configuration related to each replication.
In this example, a total of three ConfigMaps are created: one that stores configuration properties common to both MirrorSourceConnector instances, and two that store configuration properties specific to each MirrorSourceConnector instance.
```bash
kubectl create configmap [***COMMON REPLICATION CONFIGMAP***] \
  --from-literal=replication.policy.class="org.apache.kafka.connect.mirror.DefaultReplicationPolicy" \
  --namespace [***REPLICATION NS***]
```

```bash
kubectl create configmap [***FIRST REPLICATION CONFIGMAP***] \
  --from-literal=topics="test.*" \
  --namespace [***REPLICATION NS***]
```

```bash
kubectl create configmap [***SECOND REPLICATION CONFIGMAP***] \
  --from-literal=topics="prod.*" \
  --from-literal=transformHeaderKey="my_header_key" \
  --from-literal=transformHeaderValue="my_header_value" \
  --namespace [***REPLICATION NS***]
```
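Before deploying the connectors, you can verify that the ConfigMaps contain the expected properties. For example:

```bash
kubectl get configmap [***SECOND REPLICATION CONFIGMAP***] \
  --namespace [***REPLICATION NS***] \
  --output yaml
```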
- Deploy multiple MirrorSourceConnector instances.
Configure each connector instance as needed, referencing the appropriate ConfigMaps.
In this example, two connector instances are deployed. Both instances reference [***COMMON REPLICATION CONFIGMAP***] created in the previous step. Additionally, each connector instance also pulls configuration from its respective ConfigMap.
The second connector instance includes an SMT chain that applies a transformation on the replicated records. The SMT chain adds a header to each record replicated by the second connector instance.
```yaml
#...
kind: KafkaConnector
metadata:
  name: first-mirror-source-connector
spec:
  class: org.apache.kafka.connect.mirror.MirrorSourceConnector
  config:
    topics: ${cfmap:[***REPLICATION NS***]/[***FIRST REPLICATION CONFIGMAP***]:topics}
    # use common replication policy in both connectors
    replication.policy.class: ${cfmap:[***REPLICATION NS***]/[***COMMON REPLICATION CONFIGMAP***]:replication.policy.class}
```
```yaml
#...
kind: KafkaConnector
metadata:
  name: second-mirror-source-connector
spec:
  class: org.apache.kafka.connect.mirror.MirrorSourceConnector
  config:
    topics: ${cfmap:[***REPLICATION NS***]/[***SECOND REPLICATION CONFIGMAP***]:topics}
    transforms: insertHeader
    transforms.insertHeader.type: org.apache.kafka.connect.transforms.InsertHeader
    transforms.insertHeader.header: ${cfmap:[***REPLICATION NS***]/[***SECOND REPLICATION CONFIGMAP***]:transformHeaderKey}
    transforms.insertHeader.value.literal: ${cfmap:[***REPLICATION NS***]/[***SECOND REPLICATION CONFIGMAP***]:transformHeaderValue}
    # use common replication policy in both connectors
    replication.policy.class: ${cfmap:[***REPLICATION NS***]/[***COMMON REPLICATION CONFIGMAP***]:replication.policy.class}
```
- Configure the topics property of your MirrorCheckpointConnector instances so that they include all topics that are replicated.

```yaml
# ...
kind: KafkaConnector
metadata:
  name: mirror-checkpoint-connector
spec:
  class: org.apache.kafka.connect.mirror.MirrorCheckpointConnector
  tasksMax: 3
  config:
    topics: ${cfmap:[***REPLICATION NS***]/[***FIRST REPLICATION CONFIGMAP***]:topics},${cfmap:[***REPLICATION NS***]/[***SECOND REPLICATION CONFIGMAP***]:topics}
    groups: [***CONSUMER GROUP NAME***]
```
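To verify that the SMT chain is working, you can consume a replicated record on the target cluster and print its headers. The following command is a sketch: the bootstrap server placeholder and the prod.example topic name are illustrative, and with DefaultReplicationPolicy the replicated topic name carries the source cluster alias as a prefix. Records replicated by the second connector instance should show the my_header_key header.

```bash
kafka-console-consumer.sh \
  --bootstrap-server [***TARGET BROKER HOST:PORT***] \
  --topic [***SOURCE CLUSTER ALIAS***].prod.example \
  --from-beginning \
  --property print.headers=true
```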
Deploying a replication flow on multiple Kafka Connect clusters
Learn how to set up a replication flow that is deployed on two or more Kafka Connect clusters. A replication setup like this enables you to spread replication tasks across available clusters, allowing you to have dedicated resources for critical workloads.
Using Kafka Connect-based replication, you can set up a replication flow that is spread across multiple Kafka Connect clusters. This is done by deploying multiple Kafka Connect clusters and an instance of each replication connector on each Kafka Connect cluster. Afterward, you configure your replication connectors to replicate data between the same source and target Kafka cluster pair.
In a setup like this, replication between a source and a target cluster is carried out by multiple sets of replication connectors. In a standard deployment, replication would be carried out by a single set of replication connectors.
A replication setup that uses multiple Kafka Connect clusters makes it possible to designate Kafka Connect clusters to handle specific types of workloads. For example, you can dedicate a cluster to handle replication of business-critical topics, while a different cluster handles the replication of other topics.
To deploy a replication flow with multiple Kafka Connect clusters, follow the steps in Deploying a replication flow with the following changes.
- Deploy multiple Kafka Connect clusters with differing configurations in separate namespaces.
When deploying multiple Kafka Connect clusters, use a different namespace for each cluster. This way it is possible to separate the dedicated configurations and Secrets into the appropriate namespace. It also makes it possible to use namespaced limitations, such as ResourceQuota, or to apply different namespaced access control policies to each Kafka Connect cluster.
For example, you can set up two Kafka Connect clusters: one to handle small workloads, with fewer replicas and fewer resources allocated, and one for larger workloads, with more replicas and higher resource requests and limits.
```yaml
#...
kind: KafkaConnect
metadata:
  name: small-workload
  namespace: [***SMALL WORKLOAD NS***]
spec:
  replicas: 3
  config:
    # Custom configuration
  resources:
    requests:
      memory: "1Gi"
      cpu: "1"
    limits:
      memory: "2Gi"
      cpu: "2"
```
```yaml
#...
kind: KafkaConnect
metadata:
  name: high-workload
  namespace: [***HIGH WORKLOAD NS***]
spec:
  replicas: 6
  config:
    # Custom configuration for higher workloads
  resources:
    requests:
      memory: "6Gi"
      cpu: "2"
    limits:
      memory: "12Gi"
      cpu: "4"
```
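After applying both manifests, you can confirm that each cluster is running with its intended configuration by checking the KafkaConnect resources in their respective namespaces. For example:

```bash
kubectl get kafkaconnect small-workload --namespace [***SMALL WORKLOAD NS***]
kubectl get kafkaconnect high-workload --namespace [***HIGH WORKLOAD NS***]
```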
- Create ConfigMaps that contain replication-related properties in each of your namespaces.
Assuming that you are deploying two Kafka Connect clusters, one for small and one for high workloads, you would create two ConfigMaps that contain the topic filter (topics property) configuration for your MirrorSourceConnector instances. For example, the MirrorSourceConnector instance in the high workload Kafka Connect cluster can replicate business-critical topics with high expected volume. The connector running in the small workload cluster can handle the replication of less critical topics with lower expected volume.
```bash
kubectl create configmap [***SMALL WORKLOAD CONFIGMAP***] \
  --from-literal=topics="low.*" \
  --namespace [***SMALL WORKLOAD NS***]
```

```bash
kubectl create configmap [***HIGH WORKLOAD CONFIGMAP***] \
  --from-literal=topics="high.*" \
  --namespace [***HIGH WORKLOAD NS***]
```
- Deploy and configure your MirrorSourceConnector instances to replicate the appropriate topics.
```yaml
#...
kind: KafkaConnector
metadata:
  name: mirror-source-connector
  namespace: [***SMALL WORKLOAD NS***]
  labels:
    strimzi.io/cluster: small-workload
spec:
  class: org.apache.kafka.connect.mirror.MirrorSourceConnector
  tasksMax: 3
  config:
    topics: ${cfmap:[***SMALL WORKLOAD NS***]/[***SMALL WORKLOAD CONFIGMAP***]:topics}
```
```yaml
#...
kind: KafkaConnector
metadata:
  name: mirror-source-connector
  namespace: [***HIGH WORKLOAD NS***]
  labels:
    strimzi.io/cluster: high-workload
spec:
  class: org.apache.kafka.connect.mirror.MirrorSourceConnector
  tasksMax: 3
  config:
    topics: ${cfmap:[***HIGH WORKLOAD NS***]/[***HIGH WORKLOAD CONFIGMAP***]:topics}
```
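You can confirm that each connector instance was created in the correct namespace and is assigned to the intended Kafka Connect cluster by listing the KafkaConnector resources. For example:

```bash
kubectl get kafkaconnector --all-namespaces
```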
- Configure the topics property of your MirrorCheckpointConnector instances so that they include all topics that are replicated.

```yaml
# ...
kind: KafkaConnector
metadata:
  name: mirror-checkpoint-connector
  namespace: [***SMALL WORKLOAD NS***]
spec:
  class: org.apache.kafka.connect.mirror.MirrorCheckpointConnector
  tasksMax: 3
  config:
    topics: ${cfmap:[***SMALL WORKLOAD NS***]/[***SMALL WORKLOAD CONFIGMAP***]:topics}
    groups: [***CONSUMER GROUP NAME***]
```
```yaml
# ...
kind: KafkaConnector
metadata:
  name: mirror-checkpoint-connector
  namespace: [***HIGH WORKLOAD NS***]
spec:
  class: org.apache.kafka.connect.mirror.MirrorCheckpointConnector
  tasksMax: 3
  config:
    topics: ${cfmap:[***HIGH WORKLOAD NS***]/[***HIGH WORKLOAD CONFIGMAP***]:topics}
    groups: [***CONSUMER GROUP NAME***]
```
Deploying bidirectional and prefixless replication flows
Learn how to set up bidirectional and prefixless replication flows. A replication setup like this is achieved with the use of Single Message Transforms (SMT).
Using out-of-the-box configurations and behavior, deploying a bidirectional flow that uses the prefixless replication policy (IdentityReplicationPolicy) is not recommended. This is because the prefixless replication policy does not support replication loop detection. By default, a setup like this results in records being replicated infinitely between your source and target clusters.
However, such a setup can make failover and failback scenarios easy. This is because a bidirectional and prefixless setup requires minimal reconfiguration of Kafka clients when you fail over or fail back between Kafka clusters. You only need to reroute the clients to connect to a different cluster. Reconfiguring clients to consume from or produce to differently named (prefixed) topics is not necessary.
A bidirectional and prefixless replication setup can be achieved with Kafka Connect-based replication by using an SMT chain. By deploying an SMT chain on top of your replication flow, you can effectively filter replication loops while still having at-least-once guarantees.
To configure a bidirectional and prefixless replication flow, follow the steps in Deploying a replication flow with the following changes.
- Deploy two replication flows.
  - The replication setup must be bidirectional. One replication flow replicates data from cluster A to cluster B, the other replicates data from cluster B to cluster A.
  - The replication flows are configured to use prefixless replication. That is, the replication.policy.class property of the MirrorSourceConnector and MirrorHeartbeatConnector instances is set to org.apache.kafka.connect.mirror.IdentityReplicationPolicy.
  - Skip the creation of MirrorCheckpointConnector instances. MirrorCheckpointConnector instances enable consumer group offset synchronization, which is not supported in a bidirectional setup. Creating these connectors is not necessary.
- Configure your MirrorSourceConnector instances with an SMT chain that filters replication loops.
The following is an example MirrorSourceConnector instance that uses the Filter and InsertHeader transforms as well as the HasHeaderKey predicate. When used in combination, these plugins provide a way to filter replication loops.
```yaml
#...
kind: KafkaConnector
spec:
  class: org.apache.kafka.connect.mirror.MirrorSourceConnector
  tasksMax: 2
  config:
    transforms: FilterReplicatedFromTarget,InsertHeader
    transforms.FilterReplicatedFromTarget.type: org.apache.kafka.connect.transforms.Filter
    transforms.FilterReplicatedFromTarget.predicate: ReplicatedFromTarget
    transforms.InsertHeader.type: org.apache.kafka.connect.transforms.InsertHeader
    transforms.InsertHeader.header: replicated-from-${cfmap:[***REPLICATION NS***]/[***SOURCE CONFIGMAP***]:alias}
    transforms.InsertHeader.value.literal: true
    predicates: ReplicatedFromTarget
    predicates.ReplicatedFromTarget.type: org.apache.kafka.connect.transforms.predicates.HasHeaderKey
    predicates.ReplicatedFromTarget.name: replicated-from-${cfmap:[***REPLICATION NS***]/[***TARGET CONFIGMAP***]:alias}
    emit.offset-syncs.enabled: false
```
Notice the following about this example.
- The InsertHeader transformation adds a new header to each replicated record. The header marks the record so that it includes information on which cluster it came from.
- The ReplicatedFromTarget predicate returns true if a record already has the replication header associated with the target cluster. In other words, it returns true if the record came from the target cluster earlier.
- The FilterReplicatedFromTarget transformation excludes records from replication for which the ReplicatedFromTarget predicate returns true. This filters replication loops because a record is never replicated back to the cluster it was replicated from. This does not mean that some records are not consumed from the source. All records are consumed; the records that would cause a replication loop are dropped.
- emit.offset-syncs.enabled is set to false to disable the creation of the internal offset syncs topic. This is done because checkpointing is not supported in this setup and MirrorCheckpointConnector instances are not created, making this internal topic unnecessary.
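The example above covers one direction of the bidirectional setup. For the reverse direction, you deploy a second MirrorSourceConnector with the same SMT chain but with the roles of the two cluster aliases swapped. The following sketch assumes the same [***SOURCE CONFIGMAP***] and [***TARGET CONFIGMAP***] used in the previous example; in this flow, records are marked with the alias of the first flow's target cluster, and records already marked with the first flow's source alias are dropped.

```yaml
#...
kind: KafkaConnector
spec:
  class: org.apache.kafka.connect.mirror.MirrorSourceConnector
  tasksMax: 2
  config:
    transforms: FilterReplicatedFromTarget,InsertHeader
    transforms.FilterReplicatedFromTarget.type: org.apache.kafka.connect.transforms.Filter
    transforms.FilterReplicatedFromTarget.predicate: ReplicatedFromTarget
    transforms.InsertHeader.type: org.apache.kafka.connect.transforms.InsertHeader
    # This flow's source is the other flow's target, so mark records with that alias
    transforms.InsertHeader.header: replicated-from-${cfmap:[***REPLICATION NS***]/[***TARGET CONFIGMAP***]:alias}
    transforms.InsertHeader.value.literal: true
    predicates: ReplicatedFromTarget
    predicates.ReplicatedFromTarget.type: org.apache.kafka.connect.transforms.predicates.HasHeaderKey
    # Drop records that were already replicated from this flow's target
    predicates.ReplicatedFromTarget.name: replicated-from-${cfmap:[***REPLICATION NS***]/[***SOURCE CONFIGMAP***]:alias}
    emit.offset-syncs.enabled: false
```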