Migrate Kafka Using the MigratingReplicationPolicy
How to migrate Kafka with Streams Replication Manager (SRM) using the
MigratingReplicationPolicy, which is a custom replication policy that you
must implement, compile, and package yourself.
Setup and Configure SRM in the CDP Private Cloud Base cluster for
unidirectional replication.
You can configure unidirectional replication by adding and
enabling a single replication in the Streams Replication Manager's
Replication Configs property. For
example:
HDF->CDP.enabled=true
For more
information on setup and configuration, see Add and Configure SRM. Setup instructions might
differ in different versions. Ensure that you view the version of the documentation that
matches your Runtime version.
Implement, compile, and package (JAR) the following custom replication policy that
overrides SRM’s default behavior.
package com.cloudera.dim.mirror;
import java.util.Map;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.connect.mirror.ReplicationPolicy;
import org.apache.kafka.connect.mirror.MirrorConnectorConfig;
public class MigratingReplicationPolicy implements ReplicationPolicy, Configurable {
private String sourceClusterAlias;
@Override
public void configure(Map<String, ?> props) {
// The source cluster alias cannot be determined just by looking at the prefix of the remote topic name.
// We extract this info from the configuration.
sourceClusterAlias = (String) props.get(MirrorConnectorConfig.SOURCE_CLUSTER_ALIAS);
}
@Override
public String formatRemoteTopic(String sourceClusterAlias, String topic) {
// We do not apply any prefix.
return topic;
}
@Override
public String topicSource(String topic) {
// return from config
return sourceClusterAlias;
}
@Override
public String upstreamTopic(String topic) {
return null;
}
}
Modify the classpath of the SRM driver to include the compiled artifact when the SRM
Driver is started.
This step is done differently depending on the Cloudera Runtime version.
In Cloudera Manager, select the Streams Replication Manager service.
Go to Configuration.
Add the following key and value pair to both Streams Replication
Manager Service Environment Advanced Configuration Snippet (Safety
Valve) and SRM Driver Environment Advanced Configuration
Snippet (Safety
Valve).
Key:SRM_CLASSPATH
Value:[***PATH TO POLICY JAR***]
If you have other artifacts added to SRM_CLASSPATH,
ensure that each path is delimited with a colon (:).
Configure the SRM service to use the custom replication policy:
In Cloudera Manager, select the Streams Replication Manager
service.
Go to Configuration.
Find the Streams Replication Manager’s Replications Config
property and add the following: