Deploying a replication flow
You deploy a replication flow between two clusters by deploying a Kafka Connect cluster and an instance of each replication connector (MirrorSourceConnector, MirrorCheckpointConnector, and MirrorHeartbeatConnector). Additionally, you create various ConfigMaps and Secrets that store configuration required for replication.
The following steps walk you through how you can create a replication flow between two secured Kafka clusters. Both Kafka and Kafka Connect are deployed in Kubernetes.
The Kafka Connect cluster that you set up must be a new cluster and must be dedicated to the replication flow. Reusing an existing cluster that is running other connectors or using the same cluster for multiple replication flows is not recommended.
Replication of Kafka data as well as other replication-related tasks are carried out by the replication connectors. These are the MirrorSourceConnector, MirrorCheckpointConnector, and MirrorHeartbeatConnector.
Deploying an instance of MirrorSourceConnector and MirrorCheckpointConnector is mandatory. Deploying MirrorHeartbeatConnector is optional.
The connectors load their connection-related configuration from Secrets, ConfigMaps, and property files.
This example uses the DefaultReplicationPolicy, but also notes which connector properties you need to add if you want to use the IdentityReplicationPolicy (prefixless replication).
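To make the difference between the two policies concrete, the following sketch computes the topic name that each policy would produce on the target cluster. It assumes the default "." separator of DefaultReplicationPolicy. The helper name remote_topic_name, the alias uswest, and the topic test.orders are hypothetical and used for illustration only.

```shell
#!/bin/sh
# Sketch only: mimics how the two replication policies name replicated topics.
# remote_topic_name is a hypothetical helper, not part of Kafka or Strimzi.
remote_topic_name() {
  policy="$1"        # "default" or "identity"
  source_alias="$2"  # alias of the source cluster
  topic="$3"         # topic name on the source cluster
  case "$policy" in
    # DefaultReplicationPolicy: prefix the topic with the source alias
    default)  printf '%s.%s\n' "$source_alias" "$topic" ;;
    # IdentityReplicationPolicy: prefixless, the name is unchanged
    identity) printf '%s\n' "$topic" ;;
    *) echo "unknown policy: $policy" >&2; return 1 ;;
  esac
}

remote_topic_name default uswest test.orders
remote_topic_name identity uswest test.orders
```

For prefixless replication, the replication.policy.class property would be set to org.apache.kafka.connect.mirror.IdentityReplicationPolicy.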
These steps assume that the two Kafka clusters have TLS encryption and PLAIN authentication enabled. Replication can be configured for any other type of security as well, but you will need to change the appropriate security configurations.
- Strimzi is installed. The Strimzi Cluster Operator is running. See Installation.
- You have identified the two Kafka clusters that you want to replicate data between.
  The clusters can be any type of Kafka cluster running on any platform. These steps assume that both Kafka instances are running in Kubernetes and were deployed with Cloudera Streams Messaging - Kubernetes Operator.
- Resource examples in these steps use various features and configurations available in Kafka Connect. Familiarity with the following is recommended.
- Collect the following for both source and target Kafka clusters.
  - Bootstrap servers
  - TLS truststore/crt
  - TLS truststore password
  - PLAIN credentials
  The configurations you collect here will be specified in the Secrets and ConfigMaps and the KafkaConnect resource that you create in the following steps.
- Create a namespace.
  You deploy all resources required for the replication flow in this namespace.
      kubectl create namespace [***REPLICATION NS***]
- Create a Secret containing credentials for the Docker registry where Cloudera Streams Messaging - Kubernetes Operator artifacts are hosted.
      kubectl create secret docker-registry [***SECRET NAME***] \
        --docker-server [***REGISTRY***] \
        --docker-username [***USERNAME***] \
        --docker-password [***PASSWORD***] \
        --namespace [***REPLICATION NS***]
  - [***SECRET NAME***] must be the same as the name of the Secret containing registry credentials that you created during Strimzi installation.
  - Replace [***REGISTRY***] with the server location of the Docker registry where Cloudera Streams Messaging - Kubernetes Operator artifacts are hosted. If your Kubernetes cluster has internet access, use container.repository.cloudera.com. Otherwise, enter the server location of your self-hosted registry.
  - Replace [***USERNAME***] and [***PASSWORD***] with credentials that provide access to the registry. If you are using container.repository.cloudera.com, use your Cloudera credentials. Otherwise, enter credentials providing access to your self-hosted registry.
- Create a ConfigMap and two Secrets for the target Kafka cluster.
  These resources store configuration that provides access to the target Kafka cluster.
  - Create a ConfigMap that contains the non-sensitive configuration properties of the target Kafka cluster.
        kubectl create configmap [***TARGET CONFIGMAP***] \
          --from-literal=alias=[***TARGET CLUSTER ALIAS***] \
          --namespace [***REPLICATION NS***]
    This ConfigMap does not need to include connection related properties like the bootstrap server. These connection properties will be sourced from the Kafka Connect worker’s (cluster) property file. Sourcing them from the workers' property file is possible because Kafka Connect will depend on the target Kafka cluster. You can use this ConfigMap to store other reusable properties.
  - Create a Secret containing the PLAIN password to use when connecting to the target Kafka cluster.
        kubectl create secret generic [***TARGET PASSWORD SECRET***] \
          --from-literal=pass=[***PASSWORD***] \
          --namespace [***REPLICATION NS***]
  - Create a Secret that contains the TLS Certificate Authority (CA) certificate of the target Kafka cluster.
        kubectl create secret generic [***TARGET CERT SECRET***] \
          --from-file=ca.crt=[***PATH TO CA CERT***] \
          --namespace [***REPLICATION NS***]
- Create a ConfigMap and a Secret for the source Kafka cluster.
  These resources store configuration that provides access to the source Kafka cluster.
  - Create a Secret that contains the truststore file, the truststore password, and JAAS configuration of the source Kafka cluster.
        kubectl create secret generic [***SOURCE SECRET***] \
          --from-literal=ssl.truststore.password=[***TRUSTSTORE PASSWORD***] \
          --from-file=truststore.jks=[***TRUSTSTORE FILE***] \
          --from-literal=sasl.jaas.config='org.apache.kafka.common.security.plain.PlainLoginModule required username="[***USERNAME***]" password="[***PASSWORD***]";' \
          --namespace [***REPLICATION NS***]
  - Create a ConfigMap that contains non-sensitive configuration properties of the source Kafka cluster.
    This ConfigMap will contain the cluster alias, connection properties, and any other reusable properties.
        kubectl create configmap [***SOURCE CONFIGMAP***] \
          --from-literal=alias=[***SOURCE CLUSTER ALIAS***] \
          --from-literal=bootstrap.servers=[***SOURCE KAFKA BOOTSTRAP***].[***SOURCE KAFKA NAMESPACE***]:[***PORT***] \
          --from-literal=security.protocol=SASL_SSL \
          --from-literal=sasl.mechanism=PLAIN \
          --from-literal=ssl.truststore.location=/mnt/[***VOLUME NAME***]/truststore.jks \
          --namespace [***REPLICATION NS***]
    You will attach the truststore as a volume in a later step. Note down the value you specify for [***VOLUME NAME***]. You will need to provide it in the KafkaConnect resource.
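The bootstrap.servers value stored in the source ConfigMap follows a service.namespace:port pattern. The sketch below assembles such an address, assuming the source cluster was deployed with Strimzi, which names the bootstrap Service after the Kafka cluster with a -kafka-bootstrap postfix. The helper name bootstrap_address, the cluster name my-source, the namespace kafka-src, and port 9093 are hypothetical placeholders.

```shell
#!/bin/sh
# Hypothetical helper: builds an in-cluster bootstrap address for a
# Strimzi-managed Kafka cluster (<cluster>-kafka-bootstrap.<namespace>:<port>).
bootstrap_address() {
  cluster="$1"    # name of the Kafka resource
  namespace="$2"  # namespace the Kafka cluster runs in
  port="$3"       # port of the listener you connect to
  printf '%s-kafka-bootstrap.%s:%s\n' "$cluster" "$namespace" "$port"
}

bootstrap_address my-source kafka-src 9093
```

If the source cluster runs outside Kubernetes, this pattern does not apply and you would use the externally reachable bootstrap address instead.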
- Create a ConfigMap that stores configuration related to replication.
  This ConfigMap will store configuration that is shared by the connectors that you will deploy. It is created to single-source the configuration that is common across the connectors.
  This example creates a ConfigMap that defines a single property, topics, which specifies what topics should be replicated. In this example, all test.* topics are added for replication.
      kubectl create configmap [***REPLICATION CONFIGMAP***] \
        --from-literal=topics="test.*" \
        --namespace [***REPLICATION NS***]
  This ConfigMap is referred to in the following steps as [***REPLICATION CONFIGMAP***].
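The topics value is interpreted as a regular expression rather than a shell-style wildcard, so test.* matches any topic whose name starts with test, including names such as testing. The sketch below approximates this with grep, assuming that the whole topic name must match the pattern. The helper name topic_matches and the sample topic names are hypothetical.

```shell
#!/bin/sh
# Hypothetical helper: succeeds when the whole topic name matches the pattern.
topic_matches() {
  pattern="$1"
  topic="$2"
  # Anchor the pattern so that partial matches do not count
  printf '%s\n' "$topic" | grep -Eq "^(${pattern})\$"
}

for t in test.orders testing test prod.orders mytest.orders; do
  if topic_matches 'test.*' "$t"; then
    echo "$t: replicated"
  else
    echo "$t: skipped"
  fi
done
```

To restrict replication to topics with a literal test. prefix, the dot would need to be escaped, for example test\..*.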
- Deploy a Kafka Connect cluster.
      apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaConnect
      metadata:
        name: [***CONNECT CLUSTER NAME***]
        namespace: [***REPLICATION NS***]
        annotations:
          strimzi.io/use-connector-resources: "true"
      spec:
        version: 3.8.0.1.2
        replicas: 3
        bootstrapServers: [***TARGET KAFKA BOOTSTRAP***].[***TARGET KAFKA NAMESPACE***]:[***PORT***]
        tls:
          trustedCertificates:
            - secretName: [***TARGET CERT SECRET***]
              certificate: ca.crt
        authentication:
          type: plain
          username: [***USERNAME***]
          passwordSecret:
            secretName: [***TARGET PASSWORD SECRET***]
            password: pass
        template:
          pod:
            volumes:
              - name: [***VOLUME NAME***]
                secret:
                  secretName: [***SOURCE SECRET***]
                  # Project only the truststore file into the volume
                  items:
                    - key: truststore.jks
                      path: truststore.jks
          connectContainer:
            volumeMounts:
              - name: [***VOLUME NAME***]
                mountPath: /mnt/[***VOLUME NAME***]
        config:
          group.id: [***CONNECT CLUSTER NAME***]-consumer-group
          offset.storage.topic: [***CONNECT CLUSTER NAME***]-offsets-topic
          config.storage.topic: [***CONNECT CLUSTER NAME***]-config-topic
          status.storage.topic: [***CONNECT CLUSTER NAME***]-status-topic
          config.storage.replication.factor: -1
          offset.storage.replication.factor: -1
          status.storage.replication.factor: -1
          config.providers: cfmap,secret,file
          config.providers.cfmap.class: io.strimzi.kafka.KubernetesConfigMapConfigProvider
          config.providers.secret.class: io.strimzi.kafka.KubernetesSecretConfigProvider
          config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
  Notice the following about this resource configuration.
  - The names specified in metadata.name, group.id, and *.storage.topic follow a consistent naming convention.
    Cloudera recommends adding cluster aliases to these names as well as using prefixes and postfixes. For example, your cluster name can be repl-uswest-useast, where repl is a prefix, and useast and uswest are the aliases. The group ID can be repl-uswest-useast-consumer-group, where repl-uswest-useast is the name of the cluster and -consumer-group is a postfix.
    The prefixes and postfixes like repl, -consumer-group, -offsets-topic, -config-topic, and -status-topic are merely suggestions.
  - bootstrapServers is set to the target Kafka cluster’s bootstrap.
    That is, this Kafka Connect cluster will depend on the target Kafka cluster. This is required for a correct replication architecture. The .[***TARGET KAFKA NAMESPACE***] postfix is only required because this example assumes that the Kafka cluster is running in Kubernetes.
  - trustedCertificates references a Secret you created in a previous step, which contains the CA certificate of the target cluster.
  - template.pod.volumes defines a volume that contains the truststore file from a Secret you created in a previous step.
  - template.connectContainer.volumeMounts mounts the volume that contains the truststore file.
  - *.storage.replication.factor properties are set to -1.
    This means that these internal topics are created with the default replication factor configured in the Kafka cluster that this Kafka Connect cluster depends on (the target Kafka cluster).
  - The config.providers.* properties enable various configuration providers.
    These are necessary as the connectors you set up in a later step load configuration from various external resources using these configuration providers.
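The naming convention described for the KafkaConnect resource can be sketched as a small helper that derives all names from the two cluster aliases. The helper name connect_names is hypothetical, and placing the source alias before the target alias is an assumption made for this illustration. The repl prefix and the postfixes are the suggested values from the notes above.

```shell
#!/bin/sh
# Hypothetical helper: derives the Kafka Connect cluster name, group ID, and
# internal topic names from the source and target cluster aliases.
connect_names() {
  source_alias="$1"
  target_alias="$2"
  cluster="repl-${source_alias}-${target_alias}"   # assumed order: source, then target
  echo "metadata.name=${cluster}"
  echo "group.id=${cluster}-consumer-group"
  echo "offset.storage.topic=${cluster}-offsets-topic"
  echo "config.storage.topic=${cluster}-config-topic"
  echo "status.storage.topic=${cluster}-status-topic"
}

connect_names uswest useast
```

Deriving every name from a single cluster name keeps the resources of one replication flow easy to identify when several flows exist side by side.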
- Create a Role and RoleBinding.
  The KubernetesConfigMapConfigProvider and KubernetesSecretConfigProvider configuration providers specified in the KafkaConnect resource in the previous step require additional access rights to access the ConfigMaps and Secrets, respectively. Creating the below Role and RoleBinding is required to grant them these privileges.
      apiVersion: rbac.authorization.k8s.io/v1
      kind: Role
      metadata:
        name: connector-configuration-role
        namespace: [***REPLICATION NS***]
      rules:
        - apiGroups: [""]
          resources: ["secrets"]
          resourceNames: ["[***SOURCE SECRET***]"]
          verbs: ["get"]
        - apiGroups: [""]
          resources: ["configmaps"]
          resourceNames: ["[***SOURCE CONFIGMAP***]", "[***TARGET CONFIGMAP***]", "[***REPLICATION CONFIGMAP***]"]
          verbs: ["get"]
      ---
      apiVersion: rbac.authorization.k8s.io/v1
      kind: RoleBinding
      metadata:
        name: connector-configuration-role-binding
        namespace: [***REPLICATION NS***]
      subjects:
        - kind: ServiceAccount
          name: [***CONNECT CLUSTER NAME***]-connect
      roleRef:
        kind: Role
        name: connector-configuration-role
        apiGroup: rbac.authorization.k8s.io
  - The resource names you specify in rules.resourceNames are the names of the ConfigMap and Secret resources you created in previous steps.
  - The ServiceAccount name is fixed and follows a pattern.
    The name is the Kafka Connect cluster name postfixed with -connect. This name is fixed because the ServiceAccount is generated and named by the Strimzi Cluster Operator. That is, the -connect postfix is not user-defined. Ensure that you do not change it.
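One way to check that the Role and RoleBinding took effect is to ask the API server whether the generated ServiceAccount can read a given resource. The sketch below builds the ServiceAccount subject from the naming pattern described above and wraps kubectl auth can-i in a function. The helper names, the namespace replication, and the cluster name repl-uswest-useast are hypothetical. The can_read function is only defined, not called, because it requires access to a running cluster.

```shell
#!/bin/sh
# Hypothetical helper: builds the fully qualified ServiceAccount subject.
# Only the "-connect" postfix is fixed by the Strimzi Cluster Operator.
connect_service_account() {
  namespace="$1"
  connect_cluster="$2"
  printf 'system:serviceaccount:%s:%s-connect\n' "$namespace" "$connect_cluster"
}

# Hypothetical helper: asks the API server whether the ServiceAccount may
# read a resource, for example "secret/my-source-secret".
can_read() {
  namespace="$1"
  connect_cluster="$2"
  resource="$3"
  kubectl auth can-i get "$resource" \
    --namespace "$namespace" \
    --as "$(connect_service_account "$namespace" "$connect_cluster")"
}

connect_service_account replication repl-uswest-useast
```

Calling can_read with the names of your Secret and ConfigMaps should report yes for each resource listed in the Role and no for any other resource.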
- Enable data replication by deploying an instance of MirrorSourceConnector.
  MirrorSourceConnector requires access to both the source and target Kafka clusters. Therefore, it requires access to all configurations you set up in previous steps. Additionally, some extra configuration is required.
  Configuration required to connect to the target cluster is sourced from the Kafka Connect worker’s property file.
  Configuration required to connect to the source cluster is sourced from the ConfigMap, Secret, and truststore volume you set up for the source cluster in a previous step.
  Other configurations, such as the target cluster alias, are sourced from the ConfigMap you set up for the target cluster in a previous step.
      apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaConnector
      metadata:
        name: mirror-source-connector
        namespace: [***REPLICATION NS***]
        labels:
          strimzi.io/cluster: [***CONNECT CLUSTER NAME***]
      spec:
        class: org.apache.kafka.connect.mirror.MirrorSourceConnector
        tasksMax: 3
        config:
          key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
          value.converter: org.apache.kafka.connect.converters.ByteArrayConverter
          refresh.topics.interval.seconds: 10
          topics: ${cfmap:[***REPLICATION NS***]/[***REPLICATION CONFIGMAP***]:topics}
          #replication.policy.class: ${cfmap:[***REPLICATION NS***]/[***REPLICATION CONFIGMAP***]:replication.policy.class}
          # Source cluster configurations - sourced from configmap, secret and volume
          source.cluster.alias: ${cfmap:[***REPLICATION NS***]/[***SOURCE CONFIGMAP***]:alias}
          source.cluster.bootstrap.servers: ${cfmap:[***REPLICATION NS***]/[***SOURCE CONFIGMAP***]:bootstrap.servers}
          source.cluster.security.protocol: ${cfmap:[***REPLICATION NS***]/[***SOURCE CONFIGMAP***]:security.protocol}
          source.cluster.sasl.mechanism: ${cfmap:[***REPLICATION NS***]/[***SOURCE CONFIGMAP***]:sasl.mechanism}
          source.cluster.sasl.jaas.config: ${secret:[***REPLICATION NS***]/[***SOURCE SECRET***]:sasl.jaas.config}
          source.cluster.ssl.truststore.location: ${cfmap:[***REPLICATION NS***]/[***SOURCE CONFIGMAP***]:ssl.truststore.location}
          source.cluster.ssl.truststore.password: ${secret:[***REPLICATION NS***]/[***SOURCE SECRET***]:ssl.truststore.password}
          # Target cluster configurations - mostly sourced from the Connect worker config
          target.cluster.alias: ${cfmap:[***REPLICATION NS***]/[***TARGET CONFIGMAP***]:alias}
          target.cluster.bootstrap.servers: ${file:/tmp/strimzi-connect.properties:bootstrap.servers}
          target.cluster.security.protocol: ${file:/tmp/strimzi-connect.properties:security.protocol}
          target.cluster.sasl.mechanism: ${file:/tmp/strimzi-connect.properties:sasl.mechanism}
          target.cluster.sasl.jaas.config: ${file:/tmp/strimzi-connect.properties:sasl.jaas.config}
          target.cluster.ssl.truststore.location: ${file:/tmp/strimzi-connect.properties:ssl.truststore.location}
          target.cluster.ssl.truststore.password: ${file:/tmp/strimzi-connect.properties:ssl.truststore.password}
  - Uncomment the replication.policy.class property if you added this property to [***REPLICATION CONFIGMAP***]. This property configures what replication policy is used for replication.
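The connector configuration refers to external values through placeholders of the form ${provider:location:key}. The sketch below prints such placeholders for the cfmap, secret, and file providers enabled in the KafkaConnect resource. The helper names k8s_ref and file_ref, the namespace replication, and the resource names source-config and source-secret are hypothetical.

```shell
#!/bin/sh
# Hypothetical helper: placeholder for the cfmap and secret providers,
# which locate a resource as <namespace>/<resource name>.
k8s_ref() {
  provider="$1"   # "cfmap" or "secret", as named in config.providers
  namespace="$2"
  resource="$3"
  key="$4"
  printf '${%s:%s/%s:%s}\n' "$provider" "$namespace" "$resource" "$key"
}

# Hypothetical helper: placeholder for the file provider, which locates
# a properties file by its path inside the Kafka Connect container.
file_ref() {
  path="$1"
  key="$2"
  printf '${file:%s:%s}\n' "$path" "$key"
}

k8s_ref cfmap replication source-config alias
k8s_ref secret replication source-secret sasl.jaas.config
file_ref /tmp/strimzi-connect.properties bootstrap.servers
```

The provider name in each placeholder must match one of the names listed in config.providers, otherwise the placeholder is not resolved.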
- Enable consumer group offset synchronization by deploying an instance of MirrorCheckpointConnector.
  MirrorCheckpointConnector requires access to both the source and target clusters. Additionally, it requires the same replication policy configuration, topic filters, and offset synchronization configurations as used by MirrorSourceConnector.
      apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaConnector
      metadata:
        name: mirror-checkpoint-connector
        namespace: [***REPLICATION NS***]
        labels:
          strimzi.io/cluster: [***CONNECT CLUSTER NAME***]
      spec:
        class: org.apache.kafka.connect.mirror.MirrorCheckpointConnector
        tasksMax: 3
        config:
          key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
          value.converter: org.apache.kafka.connect.converters.ByteArrayConverter
          refresh.groups.interval.seconds: 10
          sync.group.offsets.enabled: true
          topics: ${cfmap:[***REPLICATION NS***]/[***REPLICATION CONFIGMAP***]:topics}
          groups: test.*
          #replication.policy.class: ${cfmap:[***REPLICATION NS***]/[***REPLICATION CONFIGMAP***]:replication.policy.class}
          # Source cluster configurations - sourced from configmap, secret and volume
          source.cluster.alias: ${cfmap:[***REPLICATION NS***]/[***SOURCE CONFIGMAP***]:alias}
          source.cluster.bootstrap.servers: ${cfmap:[***REPLICATION NS***]/[***SOURCE CONFIGMAP***]:bootstrap.servers}
          source.cluster.security.protocol: ${cfmap:[***REPLICATION NS***]/[***SOURCE CONFIGMAP***]:security.protocol}
          source.cluster.sasl.mechanism: ${cfmap:[***REPLICATION NS***]/[***SOURCE CONFIGMAP***]:sasl.mechanism}
          source.cluster.sasl.jaas.config: ${secret:[***REPLICATION NS***]/[***SOURCE SECRET***]:sasl.jaas.config}
          source.cluster.ssl.truststore.location: ${cfmap:[***REPLICATION NS***]/[***SOURCE CONFIGMAP***]:ssl.truststore.location}
          source.cluster.ssl.truststore.password: ${secret:[***REPLICATION NS***]/[***SOURCE SECRET***]:ssl.truststore.password}
          # Target cluster configurations - mostly sourced from the Connect worker config
          target.cluster.alias: ${cfmap:[***REPLICATION NS***]/[***TARGET CONFIGMAP***]:alias}
          target.cluster.bootstrap.servers: ${file:/tmp/strimzi-connect.properties:bootstrap.servers}
          target.cluster.security.protocol: ${file:/tmp/strimzi-connect.properties:security.protocol}
          target.cluster.sasl.mechanism: ${file:/tmp/strimzi-connect.properties:sasl.mechanism}
          target.cluster.sasl.jaas.config: ${file:/tmp/strimzi-connect.properties:sasl.jaas.config}
          target.cluster.ssl.truststore.location: ${file:/tmp/strimzi-connect.properties:ssl.truststore.location}
          target.cluster.ssl.truststore.password: ${file:/tmp/strimzi-connect.properties:ssl.truststore.password}
  - Uncomment the replication.policy.class property if you added this property to [***REPLICATION CONFIGMAP***]. This property configures what replication policy is used for replication.
  - The sync.group.offsets.enabled property is true by default. As a result, setting this property explicitly to true is not necessary. The property is explicitly set to true in this example to highlight Cloudera requirements. Using this feature is a must in any replication flow that you set up.
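Offset synchronization relies on internal topics whose names are derived from the cluster aliases. The sketch below prints the names used by default in Apache Kafka MirrorMaker 2. This is an assumption about the defaults, and your distribution or configuration may use different names. The helper names and the aliases uswest and useast are hypothetical.

```shell
#!/bin/sh
# Hypothetical helpers that print default MirrorMaker 2 internal topic names.
# Assumption: Apache Kafka defaults; verify against your own deployment.

# Checkpoints topic, named after the source cluster alias
checkpoints_topic() {
  printf '%s.checkpoints.internal\n' "$1"
}

# Offset syncs topic, named after the target cluster alias
offset_syncs_topic() {
  printf 'mm2-offset-syncs.%s.internal\n' "$1"
}

checkpoints_topic uswest
offset_syncs_topic useast
```

Looking for these topics after the connectors start is one way to confirm that checkpointing and offset synchronization are active.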
- Enable heartbeating by deploying an instance of MirrorHeartbeatConnector.
  MirrorHeartbeatConnector is responsible for creating minimal replication traffic in the flow. Because of this, the connector needs access to the source cluster, but the source cluster must be configured as if it were the target cluster. This means that you need to provide the source cluster configurations with the producer.override. and target.cluster. prefixes.
      apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaConnector
      metadata:
        name: mirror-heartbeat-connector
        namespace: [***REPLICATION NS***]
        labels:
          strimzi.io/cluster: [***CONNECT CLUSTER NAME***]
      spec:
        class: org.apache.kafka.connect.mirror.MirrorHeartbeatConnector
        tasksMax: 1
        config:
          key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
          value.converter: org.apache.kafka.connect.converters.ByteArrayConverter
          #replication.policy.class: ${cfmap:[***REPLICATION NS***]/[***REPLICATION CONFIGMAP***]:replication.policy.class}
          # Cluster aliases
          source.cluster.alias: ${cfmap:[***REPLICATION NS***]/[***SOURCE CONFIGMAP***]:alias}
          target.cluster.alias: ${cfmap:[***REPLICATION NS***]/[***TARGET CONFIGMAP***]:alias}
          # Source cluster configurations configured as target - sourced from configmap, secret and volume
          target.cluster.bootstrap.servers: ${cfmap:[***REPLICATION NS***]/[***SOURCE CONFIGMAP***]:bootstrap.servers}
          target.cluster.security.protocol: ${cfmap:[***REPLICATION NS***]/[***SOURCE CONFIGMAP***]:security.protocol}
          target.cluster.sasl.mechanism: ${cfmap:[***REPLICATION NS***]/[***SOURCE CONFIGMAP***]:sasl.mechanism}
          target.cluster.sasl.jaas.config: ${secret:[***REPLICATION NS***]/[***SOURCE SECRET***]:sasl.jaas.config}
          target.cluster.ssl.truststore.location: ${cfmap:[***REPLICATION NS***]/[***SOURCE CONFIGMAP***]:ssl.truststore.location}
          target.cluster.ssl.truststore.password: ${secret:[***REPLICATION NS***]/[***SOURCE SECRET***]:ssl.truststore.password}
          # Source cluster configurations configured as producer override - sourced from configmap, secret and volume
          producer.override.bootstrap.servers: ${cfmap:[***REPLICATION NS***]/[***SOURCE CONFIGMAP***]:bootstrap.servers}
          producer.override.security.protocol: ${cfmap:[***REPLICATION NS***]/[***SOURCE CONFIGMAP***]:security.protocol}
          producer.override.sasl.mechanism: ${cfmap:[***REPLICATION NS***]/[***SOURCE CONFIGMAP***]:sasl.mechanism}
          producer.override.sasl.jaas.config: ${secret:[***REPLICATION NS***]/[***SOURCE SECRET***]:sasl.jaas.config}
          producer.override.ssl.truststore.location: ${cfmap:[***REPLICATION NS***]/[***SOURCE CONFIGMAP***]:ssl.truststore.location}
          producer.override.ssl.truststore.password: ${secret:[***REPLICATION NS***]/[***SOURCE SECRET***]:ssl.truststore.password}
  - Uncomment the replication.policy.class property if you added this property to [***REPLICATION CONFIGMAP***]. This property configures what replication policy is used for replication.
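The heartbeat connector repeats every source cluster connection property under two prefixes. The sketch below lists the resulting property names, which can serve as a checklist when adapting the resource to a different security setup. The helper name heartbeat_keys is hypothetical. The property list mirrors the TLS and PLAIN setup assumed in these steps.

```shell
#!/bin/sh
# Hypothetical helper: prints each source connection property under both
# the target.cluster. and producer.override. prefixes.
heartbeat_keys() {
  for key in bootstrap.servers security.protocol sasl.mechanism \
             sasl.jaas.config ssl.truststore.location ssl.truststore.password; do
    echo "target.cluster.${key}"
    echo "producer.override.${key}"
  done
}

heartbeat_keys
```

With a different security configuration, the list of keys changes, but each key would still need to appear once under each prefix.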