How to migrate Kafka with Streams Replication Manager using a custom replication policy
The default replication policy used by Streams Replication Manager (SRM) renames
remote topics on target clusters by adding the name of the source cluster as a prefix to
the topic names. If this behavior is not acceptable, you can configure SRM to use a custom
replication policy that retains the original topic names. You must implement, compile,
and package (JAR) this custom replication policy yourself. The code is provided in this
document.
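To illustrate the difference, the following hypothetical standalone Java sketch contrasts the two naming behaviors. It assumes the default policy joins the source cluster alias and the topic name with a "." separator, and it borrows the HDF alias from the replication example in this document together with an example topic named orders. The class and method names are illustrative only; they are not part of SRM or Kafka.

```java
// Hypothetical standalone sketch, not SRM or Kafka code. Assumes the default
// policy uses "." as the separator between the source alias and the topic name.
public class TopicNamingDemo {

    // Approximates the default behavior: prefix the remote topic with the source alias.
    static String defaultRemoteTopic(String sourceClusterAlias, String topic) {
        return sourceClusterAlias + "." + topic;
    }

    // Mirrors the custom policy's formatRemoteTopic: the name is left unchanged.
    static String migratingRemoteTopic(String sourceClusterAlias, String topic) {
        return topic;
    }

    public static void main(String[] args) {
        System.out.println(defaultRemoteTopic("HDF", "orders"));   // HDF.orders
        System.out.println(migratingRemoteTopic("HDF", "orders")); // orders
    }
}
```

With the custom policy, consumers on the target cluster can keep subscribing to orders instead of switching to HDF.orders, which is the point of using it for a migration.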
Review the following notes about the custom replication policy:
While this policy is in effect, SRM cannot differentiate between local topics and
replicated topics, because replicated topics do not have the source cluster prefix.
When this policy is used, the Streams Messaging Manager (SMM) UI incorrectly displays
local topics as replicated in the Cluster Replications tab. This is also because
replicated topics do not have the source cluster prefix.
This replication policy is supported only in a unidirectional data replication setup,
where replication happens from a single source cluster to a single target cluster.
Configuring additional hops or bidirectional replication is not supported and can
lead to severe replication issues.
Using this replication policy in any scenario other than data migration is not
supported.
Once migration is complete, reconfigure SRM to use the default replication policy,
or delete the SRM service from the cluster.
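The first two notes follow from how the source cluster of a topic is detected. The hypothetical standalone Java sketch below approximates both approaches: prefix-based detection (assuming a "." separator, as in the default policy) and the configuration-based detection used by the custom policy. The topic names orders and local-audit are examples; only the HDF alias comes from this document.

```java
// Hypothetical standalone sketch, not SRM or Kafka code.
public class TopicSourceDemo {

    // Approximates default-style detection: the alias is read from the name prefix,
    // and a name without a prefix is treated as a local topic (null).
    static String prefixBasedSource(String topic) {
        int idx = topic.indexOf('.');
        return idx > 0 ? topic.substring(0, idx) : null;
    }

    // Mirrors the custom policy's topicSource: every non-null topic is attributed
    // to the configured source alias, whether it was replicated or created locally.
    static String configuredSource(String topic, String sourceClusterAlias) {
        return topic == null ? null : sourceClusterAlias;
    }

    public static void main(String[] args) {
        System.out.println(prefixBasedSource("HDF.orders"));        // HDF
        System.out.println(prefixBasedSource("local-audit"));       // null
        System.out.println(configuredSource("orders", "HDF"));      // HDF
        System.out.println(configuredSource("local-audit", "HDF")); // HDF
    }
}
```

Under the custom policy, the replicated topic orders and the local topic local-audit both report HDF as their source. Tools such as the SMM UI therefore cannot tell them apart.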
The following steps describe how to compile the custom replication policy and
configure SRM to use it. They also describe how to migrate data once configuration
is complete.
Set up and configure SRM in the CDP Private Cloud Base cluster for unidirectional
replication. You can configure unidirectional replication by adding and enabling a
single replication in the Streams Replication Manager's Replication Configs property.
For example:
HDF->CDP.enabled=true
For more information on setup and configuration, see Add and Configure SRM in the SRM
documentation for Runtime.
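Because the custom policy is supported only with a single source-to-target flow, it can help to sanity-check the Replication Configs entries before switching policies. The following Java helper is a hypothetical pre-flight check and is not part of SRM. It assumes each entry is a line of the form source->target.enabled=true with no spaces around "=", as in the example above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical pre-flight check, not part of SRM. Assumes entries look like
// "<source>-><target>.enabled=true" with no whitespace around '='.
public class UnidirectionalCheck {

    private static final String ENABLED_SUFFIX = ".enabled=true";

    // Collects the enabled replication flows, for example "HDF->CDP".
    static List<String> enabledFlows(List<String> configLines) {
        List<String> flows = new ArrayList<>();
        for (String raw : configLines) {
            String line = raw.trim();
            if (line.contains("->") && line.endsWith(ENABLED_SUFFIX)) {
                flows.add(line.substring(0, line.length() - ENABLED_SUFFIX.length()));
            }
        }
        return flows;
    }

    // Exactly one enabled flow means a single source and a single target.
    static boolean isUnidirectional(List<String> configLines) {
        return enabledFlows(configLines).size() == 1;
    }

    public static void main(String[] args) {
        List<String> single = List.of("HDF->CDP.enabled=true");
        List<String> bidirectional = List.of("HDF->CDP.enabled=true", "CDP->HDF.enabled=true");
        System.out.println(isUnidirectional(single));        // true
        System.out.println(isUnidirectional(bidirectional)); // false
    }
}
```

A check like this only inspects the enabled flags. It does not replace reviewing the full SRM configuration for additional hops.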
Implement, compile, and package (JAR) the following custom replication policy that
overrides SRM’s default behavior.
package com.cloudera.dim.mirror;

import java.util.Map;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.connect.mirror.MirrorConnectorConfig;
import org.apache.kafka.connect.mirror.ReplicationPolicy;

public class MigratingReplicationPolicy implements ReplicationPolicy, Configurable {

    private String sourceClusterAlias;

    @Override
    public void configure(Map<String, ?> props) {
        // The source cluster alias cannot be determined from the remote topic name,
        // because no prefix is applied. Read it from the configuration instead.
        sourceClusterAlias = (String) props.get(MirrorConnectorConfig.SOURCE_CLUSTER_ALIAS);
    }

    @Override
    public String formatRemoteTopic(String sourceClusterAlias, String topic) {
        // Do not apply any prefix: the remote topic keeps its original name.
        return topic;
    }

    @Override
    public String topicSource(String topic) {
        // Attribute every topic to the configured source cluster. This is why local
        // and replicated topics cannot be told apart while this policy is in effect.
        return topic == null ? null : sourceClusterAlias;
    }

    @Override
    public String upstreamTopic(String topic) {
        // Topic names are never rewritten, so there is no upstream name to recover.
        return null;
    }
}
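Returning null from upstreamTopic makes each topic its own original. As we understand it, Kafka's ReplicationPolicy derives the original topic name by repeatedly following upstreamTopic until no upstream name remains. The hypothetical standalone sketch below approximates that walk. NamingPolicy is an illustrative stand-in, not a Kafka type, and the prefix-stripping variant assumes a "." separator.

```java
// Hypothetical standalone sketch. NamingPolicy is not a Kafka type; its
// originalTopic default approximates the upstream walk described above.
public class OriginalTopicDemo {

    interface NamingPolicy {
        String upstreamTopic(String topic);

        // Follow upstream names until none is left; the last name is the original.
        default String originalTopic(String topic) {
            String upstream = upstreamTopic(topic);
            if (upstream == null || upstream.equals(topic)) {
                return topic;
            }
            return originalTopic(upstream);
        }
    }

    // Prefix-stripping behavior (assumed "." separator), like the default policy.
    static final NamingPolicy PREFIXED = topic -> {
        int idx = topic.indexOf('.');
        return idx > 0 ? topic.substring(idx + 1) : null;
    };

    // Migrating behavior: there is never an upstream name to recover.
    static final NamingPolicy MIGRATING = topic -> null;

    public static void main(String[] args) {
        System.out.println(PREFIXED.originalTopic("CDP.HDF.orders")); // orders
        System.out.println(MIGRATING.originalTopic("orders"));        // orders
    }
}
```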
Modify the classpath of the SRM driver to include the compiled artifact when the
SRM driver is started:
Find the srm-driver script located at
/opt/cloudera/parcels/CDH/lib/streams_replication_manager/bin/srm-driver.
Modify the -cp flag in the srm-driver
script to include the additional .jar. For example: