How to migrate Kafka with Streams Replication Manager using a custom replication
policy.
The default replication policy used by Streams Replication Manager (SRM) renames
remote topics on target clusters. It adds the name of the source cluster as a prefix to the
topic names. If this behaviour is not viable, you can configure SRM to use a custom
replication policy that retains the original names of the topics. Review the following
notes about the custom replication policy:
- While this policy is in effect, SRM is unable to differentiate between local topics
and replicated topics. This is because replicated topics do not have the source
cluster prefix.
- When this policy is used, the Streams Messaging Manager (SMM) UI incorrectly displays
local topics as replicated in the Cluster Replications tab. This is because replicated
topics do not have the source cluster prefix.
- This replication policy is only supported with a unidirectional data replication setup
where replication happens from a single source cluster to a single target cluster.
Configuring additional hops or bi-directional replication is not supported and can
lead to severe replication issues.
- Using this replication policy in any scenario other than data migration is not
supported.
Once migration is complete, you need to reconfigure SRM to use the default
replication policy or delete the service from the cluster.
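The naming difference behind the first two notes can be illustrated with a minimal, dependency-free sketch. It is not SRM code: the class and method names are hypothetical, the alias HDF and the topic orders are example values, and the "." separator is an assumption based on the default separator of Apache Kafka's DefaultReplicationPolicy.

```java
// Hypothetical illustration only; it does not use or replace any SRM class.
class TopicNamingDemo {

    // Default-style naming: the source cluster alias is added as a prefix.
    // The "." separator is an assumption (Kafka's DefaultReplicationPolicy default).
    static String prefixedRemoteName(String sourceAlias, String topic) {
        return sourceAlias + "." + topic;
    }

    // Migration-style naming: the topic keeps its original name.
    static String retainedRemoteName(String sourceAlias, String topic) {
        return topic;
    }

    // True when the replicated topic name is identical to the local topic name,
    // which is the case in which local and replicated topics cannot be told apart.
    static boolean collidesWithLocal(String localTopic, String remoteName) {
        return localTopic.equals(remoteName);
    }

    public static void main(String[] args) {
        String alias = "HDF";    // example source cluster alias
        String topic = "orders"; // example topic name

        String prefixed = prefixedRemoteName(alias, topic);
        String retained = retainedRemoteName(alias, topic);

        System.out.println("prefixed: " + prefixed
                + ", same as local name: " + collidesWithLocal(topic, prefixed));
        System.out.println("retained: " + retained
                + ", same as local name: " + collidesWithLocal(topic, retained));
    }
}
```

With prefixed names, a replicated topic is always distinct from a local topic of the same original name. With retained names the two are identical, which is why the notes above restrict this policy to one-way migration.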
The following steps describe how you can configure SRM to use the custom
replication policy and how data can be migrated once configuration is complete.
- Set up and configure SRM in the CDP Private Cloud Base cluster for unidirectional replication. You
can configure unidirectional replication by adding and enabling a single replication in the
Streams Replication Manager's Replication Configs property. For example:
HDF->CDP.enabled=true
For more information on setup and configuration, see Add and Configure SRM in the SRM
documentation for Runtime.
- Implement, compile, and package (JAR) the following custom replication policy that
overrides SRM’s default behavior.
package com.cloudera.dim.mirror;

import java.util.Map;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.connect.mirror.ReplicationPolicy;
import org.apache.kafka.connect.mirror.MirrorConnectorConfig;

public class MigratingReplicationPolicy implements ReplicationPolicy, Configurable {

    private String sourceClusterAlias;

    @Override
    public void configure(Map<String, ?> props) {
        // The source cluster alias cannot be determined just by looking at the prefix of the remote topic name.
        // We extract this info from the configuration.
        sourceClusterAlias = (String) props.get(MirrorConnectorConfig.SOURCE_CLUSTER_ALIAS);
    }

    @Override
    public String formatRemoteTopic(String sourceClusterAlias, String topic) {
        // We do not apply any prefix.
        return topic;
    }

    @Override
    public String topicSource(String topic) {
        // return from config
        return topic == null ? null : sourceClusterAlias;
    }

    @Override
    public String upstreamTopic(String topic) {
        return null;
    }
}
- Modify the classpath of the SRM driver to include the compiled artifact when the
SRM driver is started:
- Find the srm-driver script located at
/opt/cloudera/parcels/CDH/lib/streams_replication_manager/bin/srm-driver.
- Modify the -cp flag in the srm-driver script to include the additional .jar. For example:
exec $JAVA $SRM_HEAP_OPTS $SRM_JVM_PERF_OPTS $SRM_KERBEROS_OPTS $GC_LOG_OPTS $SRM_JMX_OPTS -DdefaultConfig=$SRM_CONFIG_DIR/srm.properties -DdefaultYaml=$SRM_CONFIG_DIR/srm-service.yaml -cp [PATH_TO_CUSTOM_POLICY_JAR]:$SRM_LIB_DIR/srm-driver-1.0.0.7.1.1.0-567.jar:$SRM_LIB_DIR/srm-common-1.0.0.7.1.1.0-567.jar:...
- Configure the SRM service to use the custom replication policy:
- In Cloudera Manager, select the Streams Replication Manager service.
- Go to Configuration.
- Find the Streams Replication Manager’s Replication Configs property and add the following:
replication.policy.class=com.cloudera.dim.mirror.MigratingReplicationPolicy
source.cluster.alias=[SOURCE_CLUSTER_ALIAS]
Replace [SOURCE_CLUSTER_ALIAS] with the alias of the source cluster from which topics are
being migrated.
Setting the replication.policy.class property configures SRM to use the custom replication
policy instead of the default one.
Setting source.cluster.alias is required in order for the SRM service to correctly identify
the source cluster from the prefixless topic names on the target cluster.
- Click Save Changes.
- Restart the service.
- Use the srm-control tool to whitelist every topic and every consumer group.
Whitelisting consumer groups is required for offset translation.
srm-control topics --source [SOURCE_CLUSTER] --target [TARGET_CLUSTER] --add ".*"
srm-control groups --source [SOURCE_CLUSTER] --target [TARGET_CLUSTER] --add ".*"
- Validate that data is being migrated.
Use the Cluster Replications page on the SMM UI to monitor and validate the status of the
migration.
- Stop producers.
- Stop consumers.
- Reconfigure all consumers to read from CDP Private Cloud Base Kafka and apply offset
translation using SRM.
- Start consumers.
- Reconfigure all producers to write to CDP Private Cloud Base Kafka.
The HDF instances of Kafka and SMM are no longer required.
- Start producers.
Kafka is migrated. Kafka clients produce to and consume from the CDP Private Cloud Base cluster.
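Because the procedure depends on three entries in the Replication Configs property, it can help to check them before restarting the service in the configuration step above. The following is a minimal sketch of such a check, not an SRM tool. The class ReplicationConfigCheck and its method are hypothetical, and it assumes the entries are available as plain key=value text and that the replication is named HDF->CDP, as in the earlier example.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical pre-flight check; it only inspects text and never contacts SRM.
class ReplicationConfigCheck {

    // Returns a list of human-readable problems; an empty list means the
    // three entries used in this procedure are present.
    static List<String> findProblems(String configText, String replication) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(configText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }

        List<String> problems = new ArrayList<>();
        // A single replication must be enabled, for example HDF->CDP.enabled=true.
        if (!"true".equals(props.getProperty(replication + ".enabled"))) {
            problems.add(replication + ".enabled is not set to true");
        }
        // The custom policy class must be named explicitly.
        if (props.getProperty("replication.policy.class", "").isEmpty()) {
            problems.add("replication.policy.class is missing");
        }
        // The alias must be set and must not be the unreplaced placeholder.
        String alias = props.getProperty("source.cluster.alias", "");
        if (alias.isEmpty() || alias.startsWith("[")) {
            problems.add("source.cluster.alias is missing or still a placeholder");
        }
        return problems;
    }

    public static void main(String[] args) {
        String config = String.join("\n",
                "HDF->CDP.enabled=true",
                "replication.policy.class=com.cloudera.dim.mirror.MigratingReplicationPolicy",
                "source.cluster.alias=[SOURCE_CLUSTER_ALIAS]");
        // Prints each detected problem on its own line.
        findProblems(config, "HDF->CDP").forEach(System.out::println);
    }
}
```

In the usage shown in main, the placeholder alias is reported as a problem because [SOURCE_CLUSTER_ALIAS] was never replaced with a real alias.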