Deploying a replication flow

You deploy a replication flow between two clusters by deploying a Kafka Connect cluster and an instance of each replication connector (MirrorSourceConnector, MirrorCheckpointConnector, and MirrorHeartbeatConnector). Additionally, you create various ConfigMaps and Secrets that store configuration required for replication.

The following steps walk you through how you can create a replication flow between two secured Kafka clusters. Both Kafka and Kafka Connect are deployed in Kubernetes.

Figure 1. Replication flow between two secured Kafka clusters

The Kafka Connect cluster that you set up must be a new cluster and must be dedicated to the replication flow. Reusing an existing cluster that is running other connectors or using the same cluster for multiple replication flows is not recommended.

Replication of Kafka data as well as other replication-related tasks are carried out by the replication connectors. These are the MirrorSourceConnector, MirrorCheckpointConnector, and MirrorHeartbeatConnector.

Deploying an instance of the MirrorSourceConnector and MirrorChekpointConnector are mandatory. Deploying MirrorHeartbeatConnector is optional.

The connectors load their connection-related configuration from various Secrets, Configmaps, as well as property files.

This example uses the DefaultReplicationpolicy, but provides instructions on what connector properties you need to add if you want to use the IdentityReplicationPolicy (prefixless replication).

These steps assume that the two Kafka clusters have TLS encryption and PLAIN authentication enabled. Replication can be configured for any other type of security as well, but you will need to change the appropriate security configurations.

For example, assume that one of the clusters does not use PLAIN, but a different authentication method. In a case like this, you must collect and specify the configuration properties appropriate for that authentication method. Configuration related to security is stored in ConfigMaps and Secrets that you will be setting up.
  1. Collect the following for both source and target Kafka clusters.
    • Bootstrap servers
    • TLS truststore/crt
    • TLS truststore password
    • PLAIN credentials

    The configurations you collect here will be specified in the Secrets and ConfigMaps and the KafkaConnect resource that you create in the following steps.

  2. Create a namespace.
    kubectl create namespace [***REPLICATION NS***]
    You deploy all resources required for the replication flow in this namespace.
  3. Create a Secret containing credentials for the Docker registry where CSM Operator artifacts are hosted.
    kubectl create secret docker-registry [***SECRET NAME***] \
      --docker-server [***REGISTRY***] \
      --docker-username [***USERNAME***] \
      --docker-password [***PASSWORD***] \
      --namespace [***REPLICATION NS***]
    • [***SECRET NAME***] must be the same as the name of the Secret containing registry credentials that you created during Strimzi installation.
    • Replace [***REGISTRY***] with the server location of the Docker registry where CSM Operator artifacts are hosted. If your Kubernetes cluster has internet access, use container.repository.cloudera.com. Otherwise, enter the server location of your self-hosted registry.
    • Replace [***USERNAME***] and [***PASSWORD***] with credentials that provide access to the registry. If you are using container.repository.cloudera.com, use your Cloudera credentials. Otherwise, enter credentials providing access to your self-hosted registry.
  4. Create a ConfigMap and two Secrets for the target Kafka cluster.
    These resources store configuration that provides access to the target Kafka cluster.
    1. Create a ConfigMap that contains the non-sensitive configuration properties of the target Kafka cluster.
      kubectl create configmap [***TARGET CONFIGMAP***] \
        --from-literal=alias=[***TARGET CLUSTER ALIAS***] \
        --namespace [***REPLICATION NS***]
      This ConfigMap does not need to include connection related properties like the bootstrap server. These connection properties will be sourced from the Kafka Connect worker’s (cluster) property file. Sourcing them from the workers' property file is possible because Kafka Connect will depend on the target Kafka cluster. You can use this ConfigMap to store other reusable properties.
    2. Create a Secret containing the PLAIN password to use when connecting to the target Kafka cluster.
      kubectl create secret generic [***TARGET PASSWORD SECRET***] \
        --from-literal=pass=[***PASSWORD***] \
        --namespace [***REPLICATION NS***]
    3. Create a Secret that contains the TLS Certificate Authority (CA) certificate of the target Kafka cluster.
      kubectl create secret generic [***TARGET CERT SECRET***] \
        --from-file=ca.crt=[***PATH TO CA CERT***] \
        --namespace [***REPLICATION NS***]
  5. Create a ConfigMap and a Secret for the source Kafka cluster.
    These resources store configuration that provide access to the source Kafka cluster.
    1. Create a Secret that contains the truststore file, the truststore password, and JAAS configuration of the source Kafka cluster.
      kubectl create secret generic [***SOURCE SECRET***] \
        --from-literal=ssl.truststore.password=[***TRUSTSTORE PASSWORD***] \
        --from-file=truststore.jks=[***TRUSTSTORE FILE***] \
        --from-literal=sasl.jaas.config='org.apache.kafka.common.security.plain.PlainLoginModule required username="[***USERNAME***]" password="[***PASSWORD***]";' \
        --namespace [***REPLICATION NS***]
    2. Create a ConfigMap that contains non-sensitive configuration properties of the source Kafka cluster.
      This ConfigMap will contain the cluster alias, connection properties, and any other reusable properties.
      kubectl create configmap [***SOURCE CONFIGMAP***] \
        --from-literal=alias=[***SOURCE CLUSTER ALIAS***] \
        --from-literal=bootstrap.servers=[***SOURCE KAFKA BOOTSTRAP***].[***SOURCE KAFKA NAMESPACE***]:[***PORT***] \
        --from-literal=security.protocol=SASL_SSL \
        --from-literal=sasl.mechanism=PLAIN \
        --from-literal=ssl.truststore.location=/opt/kafka/external-configuration/[***VOLUME NAME***]/truststore.jks \
        --namespace [***REPLICATION NS***]
      

      You will attach the truststore as a volume in a later step. Note down the value you specify for [***VOLUME NAME***]. You will need to provide it in the KafkaConnect resource.

  6. Create a ConfigMap that stores configuration related to replication.
    This ConfigMap will store configuration that is shared by the connectors that you will deploy. This map is created to single source configuration that is common across the connectors.

    This example creates a ConfigMap that defines a single property, topics, which specifies what topics should be replicated. In this example, all test.* topics are added for replication.

    kubectl create configmap [***REPLICATION CONFIGMAP***] \
      --from-literal=topics="test.*" \
      --namespace [***REPLICATION NS***]
    

    This ConfigMap is referred to in the following steps as [***REPLICATION CONFIGMAP***].

  7. Deploy a Kafka Connect cluster.
    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnect
    metadata:
      name: [***CONNECT CLUSTER NAME***]
      namespace: [***REPLICATION NS***]
      annotations:
        strimzi.io/use-connector-resources: "true"
    spec:
      version: 3.7.0.1.1
      replicas: 3
      bootstrapServers: [***TARGET KAFKA BOOTSTRAP***].[***TARGET KAFKA NAMESPACE***]:[***PORT***]
      tls:
        trustedCertificates:
          - secretName: [***TARGET CERT SECRET***]
            certificate: ca.crt
      authentication:
        type: plain
        username: [***USERNAME***]
        passwordSecret:
          secretName: [***TARGET PASSWORD SECRET***]
          password: pass
      externalConfiguration:
        volumes:
          - name: [***VOLUME NAME***]
            secret:
              secretName: [***SOURCE SECRET***]
              items:
                - key: truststore.jks
                  path: truststore.jks
      config:
        group.id: [***CONNECT CLUSTER NAME***]-consumer-group
        offset.storage.topic: [***CONNECT CLUSTER NAME***]-offsets-topic
        config.storage.topic: [***CONNECT CLUSTER NAME***]-config-topic
        status.storage.topic: [***CONNECT CLUSTER NAME***]-status-topic
        config.storage.replication.factor: -1
        offset.storage.replication.factor: -1
        status.storage.replication.factor: -1
        config.providers: cfmap,secret,file
        config.providers.cfmap.class: io.strimzi.kafka.KubernetesConfigMapConfigProvider
        config.providers.secret.class: io.strimzi.kafka.KubernetesSecretConfigProvider
        config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider

    Notice the following about this resource configuration.

    • The names specified in metadata.name, group.id, and *storage.topic follow a consistent naming convention.

      Cloudera recommends adding cluster aliases to these names as well as using prefixes and postfixes. For example, your cluster name can be repl-uswest-useast. Where repl is a prefix, useast and uswest are the aliases. The group ID can be repl-uswest-useast-consumer-group, where repl-uswest-useast is the name of the cluster, -consumer-group is a postfix.

      The prefixes and postfixes like repl, -consumer-group, -offsets-topic, -config-topic, -status-topic are merely suggestions.

    • bootstrapServers is set to the target Kafka cluster’s bootstrap.

      That is, this Kafka Connect cluster will depend on the target Kafka cluster. This is a must have for correct replication architecture. The .[***TARGET KAFKA NAMESPACE***] postfix is only required because this example assumes that the Kafka cluster is running in Kubernetes.

    • trustedCertificates references a Secret you created in a previous step, which contains the CA certificate of the target cluster.
    • externalConfiguration mounts the truststore file from a Secret you created in a previous step.

      [***VOLUME NAME***] must be an exact match with the volume name you specified when creating the Secret.

    • *.storage.replication.factor properties are set to -1.

      This means that these internal topics are created with the default replication factor configured in the Kafka cluster that this Kafka Connect cluster depends on (the target Kafka cluster).

    • The config.providers.* properties enable various configuration providers.

      These are necessary as the connectors you set up in a later step load configuration from various external resources using these configuration providers.

  8. Create a Role and RoleBinding.
    The KubernetesConfigMapConfigProvider and KubernetesSecretConfigProvider configuration providers specified in the KafkaConnect resource in the previous step, require additional access rights to access the ConfigMaps and Secrets, respectively. Creating the below Role and RoleBinding is required to grant them these privileges.
    apiVersion: rbac.authorization.k8s.io/v1
    kind: Role
    metadata:
      name: connector-configuration-role
      namespace: [***REPLICATION NS***]
    rules:
      - apiGroups: [""]
        resources: ["secrets"]
        resourceNames: ["[***SOURCE SECRET***]"]
        verbs: ["get"]
      - apiGroups: [""]
        resources: ["configmaps"]
        resourceNames: ["[***SOURCE CONFIGMAP***]", "[***TARGET CONFIGMAP***]", "[****REPLICATION CONFIGMAP**]"]
        verbs: ["get"]
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: RoleBinding
    metadata:
      name: connector-configuration-role-binding
      namespace: [***REPLICATION NS***]
    subjects:
      - kind: ServiceAccount
        name: [***CONNECT CLUSTER NAME***]-connect
    roleRef:
      kind: Role
      name: connector-configuration-role
      apiGroup: rbac.authorization.k8s.io
    
    • The resource names you specify in rules.apiGroups.resourceNames are the names of the ConfigMap and Secret resources you created for the source and target Kafka clusters in a previous step.
    • The ServiceAccount name is fixed and follows a pattern.

      The name is the Kafka Connect cluster name postfixed with -connect. This name is fixed because the ServiceAccount is generated and named by the Strimzi Cluster Operator. That is, the -connect postfix, is not user defined, ensure that you do not change it.

  9. Enable data replication by deploying an instance of MirrorSourceConnector.
    MirrorSourceConnector requires access to both the source and target Kafka clusters. Therefore, it requires access to all configurations you set up in previous steps. Additionally, some extra configuration is required.

    Configuration required to connect to the target cluster is sourced from the Kafka Connect worker’s property file.

    Configuration required to connect to the source cluster is sourced from the ConfigMap, Secret, and truststore volume you set up for the source cluster in a previous step.

    Other configurations such as the target cluster alias is sourced from the ConfigMap you set up for the target cluster in a previous step.

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      name: mirror-source-connector
      namespace: [***REPLICATION NS***]
      labels:
        strimzi.io/cluster: [***CONNECT CLUSTER NAME***]
    spec:
      class: org.apache.kafka.connect.mirror.MirrorSourceConnector
      tasksMax: 3
      config:
        key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
        value.converter: org.apache.kafka.connect.converters.ByteArrayConverter
        refresh.topics.interval.seconds: 10
        topics: ${cfmap:[***REPLICATION NS***]/[***REPLICATION CONFIGMAP***]:topics}
    
        #replication.policy.class: ${cfmap:[***REPLICATION NS***]/[***REPLICATION CONFIGMAP***]:replication.policy.class}
    
        # Source cluster configurations - sourced from configmap, secret and volume
        source.cluster.alias: ${cfmap:[***REPLICATION NS***]/[***SOURCE CONFIGMAP***]:alias}
        source.cluster.bootstrap.servers: ${cfmap:[***REPLICATION NS***]/[***SOURCE CONFIGMAP***]:bootstrap.servers}
        source.cluster.security.protocol: ${cfmap:[***REPLICATION NS***]/[***SOURCE CONFIGMAP***]:security.protocol}
        source.cluster.sasl.mechanism: ${cfmap:[***REPLICATION NS***]/[***SOURCE CONFIGMAP***]:sasl.mechanism}
        source.cluster.sasl.jaas.config: ${secret:[***REPLICATION NS***]/[***SOURCE SECRET***]:sasl.jaas.config}
        source.cluster.ssl.truststore.location: ${cfmap:[***REPLICATION NS***]/[***SOURCE CONFIGMAP***]:ssl.truststore.location}
        source.cluster.ssl.truststore.password: ${secret:[***REPLICATION NS***]/[***SOURCE SECRET***]:ssl.truststore.password}
    
        # Target cluster configurations - mostly sourced from the Connect worker config
        target.cluster.alias: ${cfmap:[***REPLICATION NS***]/[***TARGET CONFIGMAP***]:alias}
        target.cluster.bootstrap.servers: ${file:/tmp/strimzi-connect.properties:bootstrap.servers}
        target.cluster.security.protocol: ${file:/tmp/strimzi-connect.properties:security.protocol}
        target.cluster.sasl.mechanism: ${file:/tmp/strimzi-connect.properties:sasl.mechanism}
        target.cluster.sasl.jaas.config: ${file:/tmp/strimzi-connect.properties:sasl.jaas.config}
        target.cluster.ssl.truststore.location: ${file:/tmp/strimzi-connect.properties:ssl.truststore.location}
        target.cluster.ssl.truststore.password: ${file:/tmp/strimzi-connect.properties:ssl.truststore.password}
    
    • Uncomment the replication.policy.class property if you added this property to [***REPLICATION CONFIGMAP***]. This property configures what replication policy is used for replication.
  10. Enable consumer group offset synchronization by deploying an instance of MirrorCheckpointConnector.
    MirrorCheckpointConnector requires access to both the source and target clusters. Additionally, it requires the same replication policy configuration, topic filters, and offset synchronization configurations as used by MirrorSourceConnector.
    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      name: mirror-checkpoint-connector
      namespace: [***REPLICATION NS***]
      labels:
        strimzi.io/cluster: [***CONNECT CLUSTER NAME***]
    spec:
      class: org.apache.kafka.connect.mirror.MirrorCheckpointConnector
      tasksMax: 3
      config:
        key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
        value.converter: org.apache.kafka.connect.converters.ByteArrayConverter
        refresh.groups.interval.seconds: 10
        sync.group.offsets.enabled: true
        topics: ${cfmap:[***REPLICATION NS***]/[***REPLICATION CONFIGMAP***]:topics}
        groups: test.*
    
        #replication.policy.class: ${cfmap:[***REPLICATION NS***]/[***REPLICATION CONFIGMAP***]:replication.policy.class}
    
        # Source cluster configurations - sourced from configmap, secret and volume
        source.cluster.alias: ${cfmap:[***REPLICATION NS***]/[***SOURCE CONFIGMAP***]:alias}
        source.cluster.bootstrap.servers: ${cfmap:[***REPLICATION NS***]/[***SOURCE CONFIGMAP***]:bootstrap.servers}
        source.cluster.security.protocol: ${cfmap:[***REPLICATION NS***]/[***SOURCE CONFIGMAP***]:security.protocol}
        source.cluster.sasl.mechanism: ${cfmap:[***REPLICATION NS***]/[***SOURCE CONFIGMAP***]:sasl.mechanism}
        source.cluster.sasl.jaas.config: ${secret:[***REPLICATION NS***]/[***SOURCE SECRET***]:sasl.jaas.config}
        source.cluster.ssl.truststore.location: ${cfmap:[***REPLICATION NS***]/[***SOURCE CONFIGMAP***]:ssl.truststore.location}
        source.cluster.ssl.truststore.password: ${secret:[***REPLICATION NS***]/[***SOURCE SECRET***]:ssl.truststore.password}
    
        # Target cluster configurations - mostly sourced from the Connect worker config
        target.cluster.alias: ${cfmap:[***REPLICATION NS***]/[***TARGET CONFIGMAP***]:alias}
        target.cluster.bootstrap.servers: ${file:/tmp/strimzi-connect.properties:bootstrap.servers}
        target.cluster.security.protocol: ${file:/tmp/strimzi-connect.properties:security.protocol}
        target.cluster.sasl.mechanism: ${file:/tmp/strimzi-connect.properties:sasl.mechanism}
        target.cluster.sasl.jaas.config: ${file:/tmp/strimzi-connect.properties:sasl.jaas.config}
        target.cluster.ssl.truststore.location: ${file:/tmp/strimzi-connect.properties:ssl.truststore.location}
        target.cluster.ssl.truststore.password: ${file:/tmp/strimzi-connect.properties:ssl.truststore.password}
    • Uncomment the replication.policy.class property if you added this property to [***REPLICATION CONFIGMAP***]. This property configures what replication policy is used for replication.
    • The sync.group.offsets.enabled property is true by default. As a result, setting this property explicitly to true is not necessary. The property is explicitly set to true in this example to highlight Cloudera requirements. Using this feature is a must in any replication flow that you set up.
  11. Enable heartbeating by deploying an instance of MirrorHeartbeatConnector.
    MirrorHeartbeatConnector is responsible for creating minimal replication traffic in the flow. Because of this, the Connector needs access to the source cluster, but configured as if it was the target cluster. This means that you need to provide the source cluster configurations with the producer.override. and target.cluster. prefixes.
    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      name: mirror-heartbeat-connector
      namespace: [***REPLICATION NS***]
      labels:
        strimzi.io/cluster: [***CONNECT CLUSTER NAME***]
    spec:
      class: org.apache.kafka.connect.mirror.MirrorHeartbeatConnector
      tasksMax: 1
      config:
        key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
        value.converter: org.apache.kafka.connect.converters.ByteArrayConverter
    
        #replication.policy.class: ${cfmap:[***REPLICATION NS***]/[***REPLICATION CONFIGMAP***]:replication.policy.class}
    
        # Cluster aliases
        source.cluster.alias: ${cfmap:[***REPLICATION NS***]/[***SOURCE CONFIGMAP***]:alias}
        target.cluster.alias: ${cfmap:[***REPLICATION NS***]/[***TARGET CONFIGMAP***]:alias}
    
        # Source cluster configurations configured as target - sourced from configmap, secret and volume
        target.cluster.bootstrap.servers: ${cfmap:[***REPLICATION NS***]/[***SOURCE CONFIGMAP***]:bootstrap.servers}
        target.cluster.security.protocol: ${cfmap:[***REPLICATION NS***]/[***SOURCE CONFIGMAP***]:security.protocol}
        target.cluster.sasl.mechanism: ${cfmap:[***REPLICATION NS***]/[***SOURCE CONFIGMAP***]:sasl.mechanism}
        target.cluster.sasl.jaas.config: ${secret:[***REPLICATION NS***]/[***SOURCE SECRET***]:sasl.jaas.config}
        target.cluster.ssl.truststore.location: ${cfmap:[***REPLICATION NS***]/[***SOURCE CONFIGMAP***]:ssl.truststore.location}
        target.cluster.ssl.truststore.password: ${secret:[***REPLICATION NS***]/[***SOURCE SECRET***]:ssl.truststore.password}
    
        # Source cluster configurations configured as producer override - sourced from configmap, secret and volume
        producer.override.bootstrap.servers: ${cfmap:[***REPLICATION NS***]/[***SOURCE CONFIGMAP***]:bootstrap.servers}
        producer.override.security.protocol: ${cfmap:[***REPLICATION NS***]/[***SOURCE CONFIGMAP***]:security.protocol}
        producer.override.sasl.mechanism: ${cfmap:[***REPLICATION NS***]/[***SOURCE CONFIGMAP***]:sasl.mechanism}
        producer.override.sasl.jaas.config: ${secret:[***REPLICATION NS***]/[***SOURCE SECRET***]:sasl.jaas.config}
        producer.override.ssl.truststore.location: ${cfmap:[***REPLICATION NS***]/[***SOURCE CONFIGMAP***]:ssl.truststore.location}
        producer.override.ssl.truststore.password: ${secret:[***REPLICATION NS***]/[***SOURCE SECRET***]:ssl.truststore.password}
    • Uncomment the replication.policy.class property if you added this property to [***REPLICATION CONFIGMAP***]. This property configures what replication policy is used for replication.