Advanced replication use cases and examples

A collection of advanced replication flow examples for different use cases leveraging the configuration options available with Kafka Connect-based replication.

When using Kafka Connect-based replication, you have full control over the configuration and deployment of the replication connectors (MirrorSourceConnector, MirrorCheckpointConnector, and MirrorHeartbeatConnector). As a result, you can fine-tune your replication setup and deploy the following types of replication flows.
  • A replication flow that uses multiple MirrorSourceConnector instances

    This type of replication flow can be used if you want to replicate different topics, located in the same source cluster, using different replication configurations.

  • A replication flow that is deployed on multiple Kafka Connect clusters

    This type of replication flow can be used to separate replication work across available resources. It enables you to have dedicated resources for specific replication workloads.

  • A bidirectional and prefixless replication flow

    This type of replication flow can be used if your business requires a bidirectional replication setup that also uses prefixless replication. That is, you require a deployment that has topics that are actively produced, consumed, and bidirectionally replicated at the same time with the topic names remaining unchanged during replication.

The following provides instructions and examples that demonstrate how you can set up each of these replication flows.

Deploying a replication flow that uses multiple MirrorSourceConnector instances

Learn how to deploy a replication flow that uses multiple MirrorSourceConnector instances to carry out replication of a single source cluster. Using multiple MirrorSourceConnectors enables you to replicate different topics using different replication configurations.

Using Kafka Connect-based replication, you can deploy replication flows that use multiple instances of the MirrorSourceConnector. Using multiple connector instances, in turn, enables you to fine-tune how data is replicated and apply different replication configurations to different topics that are in the same source Kafka cluster. For example, you can configure the following.

  • Different compression type per connector
  • Different replication factor per connector
  • Different single message transforms per connector
  • Different offset sync frequency

The MirrorSourceConnector instances are deployed in a single Kafka Connect cluster. Additionally, all connector instances connect to the same source and target clusters. All other configurations related to replication can be customized in each connector instance.
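
As a rough sketch of how such per-connector differences could look, the following fragment shows two MirrorSourceConnector instances that differ in compression type, replication factor, and offset.lag.max (which influences how often offset syncs are emitted). The connector names, topic filters, and all property values are illustrative assumptions, not recommendations. The sketch also assumes that the Kafka Connect cluster permits client configuration overrides (connector.client.config.override.policy), which the producer.override.* properties depend on.

```yaml
#...
kind: KafkaConnector
metadata:
  name: first-mirror-source-connector
spec:
  class: org.apache.kafka.connect.mirror.MirrorSourceConnector
  config:
    topics: "test.*"
    # Illustrative values only
    producer.override.compression.type: lz4
    replication.factor: 2
    offset.lag.max: 100
#...
kind: KafkaConnector
metadata:
  name: second-mirror-source-connector
spec:
  class: org.apache.kafka.connect.mirror.MirrorSourceConnector
  config:
    topics: "prod.*"
    # Illustrative values only
    producer.override.compression.type: zstd
    replication.factor: 3
    offset.lag.max: 10
```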

The following example demonstrates a configuration where a header is added to a subset of replicated topics. This is done by creating two MirrorSourceConnector instances and configuring one of them to use Single Message Transforms (SMT).

Steps

To deploy a replication flow with multiple MirrorSourceConnector instances, follow the steps in Deploying a replication flow with the following changes.

  1. Create multiple ConfigMaps that store configuration related to each replication.

    In this example, a total of three ConfigMaps are created. One stores configuration properties common to both MirrorSourceConnector instances, and the other two store configuration properties specific to each MirrorSourceConnector instance.

    kubectl create configmap [***COMMON REPLICATION CONFIGMAP***] \
      --from-literal=replication.policy.class="org.apache.kafka.connect.mirror.DefaultReplicationPolicy" \
      --namespace [***REPLICATION NS***]

    kubectl create configmap [***FIRST REPLICATION CONFIGMAP***] \
      --from-literal=topics="test.*" \
      --namespace [***REPLICATION NS***]

    kubectl create configmap [***SECOND REPLICATION CONFIGMAP***] \
      --from-literal=topics="prod.*" \
      --from-literal=transformHeaderKey="my_header_key" \
      --from-literal=transformHeaderValue="my_header_value" \
      --namespace [***REPLICATION NS***]
  2. Deploy multiple MirrorSourceConnector instances.

    Configure each connector instance as needed. Reference appropriate ConfigMaps.

    In this example, two connector instances are deployed. Both instances reference [***COMMON REPLICATION CONFIGMAP***] created in the previous step. Additionally, each connector instance pulls configuration from its own ConfigMap.

    The second connector instance includes an SMT chain that applies a transformation on the replicated records. The SMT chain adds a header to each record replicated by the second connector instance.

    #...
    kind: KafkaConnector
    metadata:
      name: first-mirror-source-connector
    spec:
      class: org.apache.kafka.connect.mirror.MirrorSourceConnector
      config:
        topics: ${cfmap:[***REPLICATION NS***]/[***FIRST REPLICATION CONFIGMAP***]:topics}
    
        # use common replication policy in both connectors
        replication.policy.class: ${cfmap:[***REPLICATION NS***]/[***COMMON REPLICATION CONFIGMAP***]:replication.policy.class}
    
    #...
    kind: KafkaConnector
    metadata:
      name: second-mirror-source-connector
    spec:
      class: org.apache.kafka.connect.mirror.MirrorSourceConnector
      config:
        topics: ${cfmap:[***REPLICATION NS***]/[***SECOND REPLICATION CONFIGMAP***]:topics}
    
        transforms: insertHeader
        transforms.insertHeader.type: org.apache.kafka.connect.transforms.InsertHeader
        transforms.insertHeader.header: ${cfmap:[***REPLICATION NS***]/[***SECOND REPLICATION CONFIGMAP***]:transformHeaderKey}
        transforms.insertHeader.value.literal: ${cfmap:[***REPLICATION NS***]/[***SECOND REPLICATION CONFIGMAP***]:transformHeaderValue}
    
        # use common replication policy in both connectors
        replication.policy.class: ${cfmap:[***REPLICATION NS***]/[***COMMON REPLICATION CONFIGMAP***]:replication.policy.class}
  3. Configure the topics property of your MirrorCheckpointConnector instances so that they include all topics that are replicated.
    # ...
    kind: KafkaConnector
    metadata:
      name: mirror-checkpoint-connector
    spec:
      class: org.apache.kafka.connect.mirror.MirrorCheckpointConnector
      tasksMax: 3
      config:
        topics: ${cfmap:[***REPLICATION NS***]/[***FIRST REPLICATION CONFIGMAP***]:topics},${cfmap:[***REPLICATION NS***]/[***SECOND REPLICATION CONFIGMAP***]:topics}
        groups: [***CONSUMER GROUP NAME***]
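
To illustrate how the combined topics property in step 3 resolves, the following sketch substitutes the ConfigMap values from step 1 by hand. It only shows the resulting value. In an actual deployment, keeping the ${cfmap:...} references is likely preferable, so that the MirrorCheckpointConnector stays in sync with the topic filters of the MirrorSourceConnector instances.

```yaml
# Resolved view of the MirrorCheckpointConnector config from step 3,
# assuming topics="test.*" and topics="prod.*" as created in step 1
config:
  topics: "test.*,prod.*"
  groups: [***CONSUMER GROUP NAME***]
```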

Deploying a replication flow on multiple Kafka Connect clusters

Learn how to set up a replication flow that is deployed on two or more Kafka Connect clusters. A replication setup like this enables you to spread replication tasks across available clusters, allowing you to have dedicated resources for critical workloads.

Using Kafka Connect-based replication, you can set up a replication flow that is spread across multiple Kafka Connect clusters. This is done by deploying multiple Kafka Connect clusters and an instance of each replication connector on each Kafka Connect cluster. Afterward, you configure your replication connectors to replicate data between the same source and target Kafka cluster pair.

In a setup like this, replication between a source and a target cluster is carried out by multiple sets of replication connectors. In a standard deployment, replication would be carried out by a single set of the replication connectors.

A replication setup that uses multiple Kafka Connect clusters makes it possible for you to designate Kafka Connect clusters to handle specific types of workloads. For example, you can dedicate a cluster to handle replication of business-critical topics, while a different cluster can handle the replication of other topics.

Steps

To deploy a replication flow with multiple Kafka Connect clusters, follow the steps in Deploying a replication flow with the following changes.

  1. Deploy multiple Kafka Connect clusters with differing configurations in separate namespaces.

    When deploying multiple Kafka Connect clusters, use a different namespace for each cluster. This way it is possible to separate the dedicated configurations and Secrets into the appropriate namespace. It also makes it possible to use namespaced limitations, such as ResourceQuota, or to apply different namespaced access control policies to each Kafka Connect cluster.

    For example, you can set up two Kafka Connect clusters. One handles small workloads and has fewer replicas and fewer resources allocated. The other handles larger workloads and has more replicas as well as higher resource requests and limits.

    #...
    kind: KafkaConnect
    metadata:
      name: small-workload
      namespace: [***SMALL WORKLOAD NS***]
    spec:
      replicas: 3
      config:
        # Custom configuration
      resources:
        requests:
          memory: "1Gi"
          cpu: "1"
        limits:
          memory: "2Gi"
          cpu: "2"
    #...
    kind: KafkaConnect
    metadata:
      name: high-workload
      namespace: [***HIGH WORKLOAD NS***]
    spec:
      replicas: 6
      config:
        # Custom configuration for higher workloads
      resources:
        requests:
          memory: "6Gi"
          cpu: "2"
        limits:
          memory: "12Gi"
          cpu: "4"
  2. Create ConfigMaps that contain replication-related properties in each of your namespaces.

    Assuming that you are deploying two Kafka Connect clusters, one for small and one for high workloads, you would create two ConfigMaps that contain the topic filter (topics property) configuration for your MirrorSourceConnector instances.

    For example, the MirrorSourceConnector instance in the high workload Kafka Connect cluster can replicate business-critical topics with high expected volume. The connector running in the small workload cluster can handle the replication of less critical topics with lower expected volume.

    kubectl create configmap [***SMALL WORKLOAD CONFIGMAP***] \
      --from-literal=topics="low.*" \
      --namespace [***SMALL WORKLOAD NS***]
    kubectl create configmap [***HIGH WORKLOAD CONFIGMAP***] \
      --from-literal=topics="high.*" \
      --namespace [***HIGH WORKLOAD NS***]
    
  3. Deploy and configure your MirrorSourceConnector instances to replicate appropriate topics.
    #...
    kind: KafkaConnector
    metadata:
      name: mirror-source-connector
      namespace: [***SMALL WORKLOAD NS***]
      labels:
        strimzi.io/cluster: small-workload
    spec:
      class: org.apache.kafka.connect.mirror.MirrorSourceConnector
      tasksMax: 3
      config:
        topics: ${cfmap:[***SMALL WORKLOAD NS***]/[***SMALL WORKLOAD CONFIGMAP***]:topics}
    #...
    kind: KafkaConnector
    metadata:
      name: mirror-source-connector
      namespace: [***HIGH WORKLOAD NS***]
      labels:
        strimzi.io/cluster: high-workload
    spec:
      class: org.apache.kafka.connect.mirror.MirrorSourceConnector
      tasksMax: 3
      config:
        topics: ${cfmap:[***HIGH WORKLOAD NS***]/[***HIGH WORKLOAD CONFIGMAP***]:topics}
  4. Configure the topics property of your MirrorCheckpointConnector instances so that they include all topics that are replicated.
    # ...
    kind: KafkaConnector
    metadata:
      name: mirror-checkpoint-connector
      namespace: [***SMALL WORKLOAD NS***]
    spec:
      class: org.apache.kafka.connect.mirror.MirrorCheckpointConnector
      tasksMax: 3
      config:
        topics: ${cfmap:[***SMALL WORKLOAD NS***]/[***SMALL WORKLOAD CONFIGMAP***]:topics}
        groups: [***CONSUMER GROUP NAME***]
    # ...
    kind: KafkaConnector
    metadata:
      name: mirror-checkpoint-connector
      namespace: [***HIGH WORKLOAD NS***]
    spec:
      class: org.apache.kafka.connect.mirror.MirrorCheckpointConnector
      tasksMax: 3
      config:
        topics: ${cfmap:[***HIGH WORKLOAD NS***]/[***HIGH WORKLOAD CONFIGMAP***]:topics}
        groups: [***CONSUMER GROUP NAME***]
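
Step 1 mentions namespaced limitations such as ResourceQuota. As a minimal sketch, a quota for the small workload namespace could look like the following. The resource name and the quota values are assumptions, chosen so that three replicas with the requests and limits shown in step 1 fit within the quota. Size them for your own environment and for any other pods that run in the namespace.

```yaml
apiVersion: v1
kind: ResourceQuota
metadata:
  name: small-workload-quota   # hypothetical name
  namespace: [***SMALL WORKLOAD NS***]
spec:
  hard:
    # 3 replicas x (1 CPU, 1Gi) requests = 3 CPU, 3Gi
    requests.cpu: "4"
    requests.memory: 4Gi
    # 3 replicas x (2 CPU, 2Gi) limits = 6 CPU, 6Gi
    limits.cpu: "8"
    limits.memory: 8Gi
```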

Deploying bidirectional and prefixless replication flows

Learn how to set up bidirectional and prefixless replication flows. A replication setup like this is achieved with the use of Single Message Transforms (SMT).

Using out-of-the-box configurations and behavior, deploying a bidirectional flow that uses the prefixless replication policy (IdentityReplicationPolicy) is not recommended. This is because the prefixless replication policy does not support replication loop detection. By default, a setup like this results in records being replicated infinitely between your source and target clusters.

However, using such a setup can make failover and failback scenarios easy. This is because a bidirectional and prefixless setup requires minimal reconfiguration of Kafka clients when you fail over or fail back between Kafka clusters. You only need to reroute the clients to connect to a different cluster. Reconfiguring clients to consume from or produce to differently named (prefixed) topics is not necessary.

A bidirectional and prefixless replication setup can be achieved when using Kafka Connect-based replication with the use of an SMT chain. By deploying an SMT chain on top of your replication flow, you can effectively filter replication loops while still having at-least-once guarantees.

Steps

To configure a bidirectional and prefixless replication flow, follow the steps in Deploying a replication flow with the following changes.

  1. Deploy two replication flows.
    • The replication setup must be bidirectional. One replication flow replicates data from cluster A to cluster B, and the other replicates data from cluster B to cluster A.

    • The replication flows are configured to use prefixless replication. That is, the replication.policy.class property of the MirrorSourceConnector and MirrorHeartbeatConnector instances is set to org.apache.kafka.connect.mirror.IdentityReplicationPolicy.

    • Skip the creation of MirrorCheckpointConnector instances. MirrorCheckpointConnector instances enable consumer group offset synchronization, which is not supported in a bidirectional setup. Creating these connectors is not necessary.

  2. Configure your MirrorSourceConnector instances with an SMT chain that filters replication loops.

    The following is an example MirrorSourceConnector instance that uses the Filter and InsertHeader transforms as well as the HasHeaderKey predicate. When used in combination, these plugins provide a way to filter replication loops.

    #...
    kind: KafkaConnector
    spec:
      class: org.apache.kafka.connect.mirror.MirrorSourceConnector
      tasksMax: 2
      config:
        transforms: FilterReplicatedFromTarget,InsertHeader
        transforms.FilterReplicatedFromTarget.type: org.apache.kafka.connect.transforms.Filter
        transforms.FilterReplicatedFromTarget.predicate: ReplicatedFromTarget
        transforms.InsertHeader.type: org.apache.kafka.connect.transforms.InsertHeader
        transforms.InsertHeader.header: replicated-from-${cfmap:[***REPLICATION NS***]/[***SOURCE CONFIGMAP***]:alias}
        transforms.InsertHeader.value.literal: true
        predicates: ReplicatedFromTarget
        predicates.ReplicatedFromTarget.type: org.apache.kafka.connect.transforms.predicates.HasHeaderKey
        predicates.ReplicatedFromTarget.name: replicated-from-${cfmap:[***REPLICATION NS***]/[***TARGET CONFIGMAP***]:alias}
        emit.offset-syncs.enabled: false

    Notice the following about this example.

    • The InsertHeader transformation adds a new header to each replicated record. The header marks the record with the cluster it was replicated from.

    • The ReplicatedFromTarget predicate returns true if a record already has the replication header that belongs to the configured target cluster. In other words, it returns true if the record was previously replicated from the target cluster.

    • The FilterReplicatedFromTarget transformation excludes records from replication for which the ReplicatedFromTarget predicate returns true. This filters replication loops because a record is never replicated back to the cluster it was replicated from. Note that all records are still consumed from the source cluster. The records that would cause a replication loop are dropped after they are consumed.

    • emit.offset-syncs.enabled is set to false to disable creation of the offset syncs internal topic. This is done because checkpointing is not supported in this setup. Because MirrorCheckpointConnector instances are not created, creating this internal topic is unnecessary.
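
To make the loop filtering more concrete, the following sketch shows how the header-related properties would resolve in each direction. It assumes hypothetical cluster aliases a and b stored under the alias key of the source and target ConfigMaps. Only the properties that differ between the two MirrorSourceConnector instances are shown.

```yaml
# Flow A -> B (source alias "a", target alias "b")
config:
  # Mark every record replicated from cluster A
  transforms.InsertHeader.header: replicated-from-a
  # Drop records that were previously replicated from cluster B
  predicates.ReplicatedFromTarget.name: replicated-from-b
#...
# Flow B -> A (source alias "b", target alias "a")
config:
  # Mark every record replicated from cluster B
  transforms.InsertHeader.header: replicated-from-b
  # Drop records that were previously replicated from cluster A
  predicates.ReplicatedFromTarget.name: replicated-from-a
```

With these values, a record produced in cluster A is replicated to cluster B with the replicated-from-a header. When the B to A connector consumes that record, the predicate matches the header and the Filter transformation drops the record, so it is not written back to cluster A.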