Migrate Kafka Using the MigratingReplicationPolicy
How to migrate Kafka with Streams Replication Manager using the
MigratingReplicationPolicy, which is a custom replication policy that you
must implement, compile, and package yourself.
Set up and configure Streams Replication Manager in the Cloudera Private Cloud
Base cluster for unidirectional replication.
You can configure unidirectional replication by adding and enabling a single
replication in the Streams Replication Manager's Replication Configs property.
For example:
HDF->CDP.enabled=true
For more information on setup and configuration, see Add and Configure Streams
Replication Manager. Setup instructions might differ between versions. Ensure
that you view the version of the documentation that matches your Runtime version.
Implement, compile, and package as a JAR the following custom replication policy,
which overrides the default behavior of Streams Replication Manager.
    package com.cloudera.dim.mirror;

    import java.util.Map;

    import org.apache.kafka.common.Configurable;
    import org.apache.kafka.connect.mirror.ReplicationPolicy;
    import org.apache.kafka.connect.mirror.MirrorConnectorConfig;

    public class MigratingReplicationPolicy implements ReplicationPolicy, Configurable {

        private String sourceClusterAlias;

        @Override
        public void configure(Map<String, ?> props) {
            // The source cluster alias cannot be determined just by looking at the prefix of the remote topic name.
            // We extract this info from the configuration.
            sourceClusterAlias = (String) props.get(MirrorConnectorConfig.SOURCE_CLUSTER_ALIAS);
        }

        @Override
        public String formatRemoteTopic(String sourceClusterAlias, String topic) {
            // We do not apply any prefix.
            return topic;
        }

        @Override
        public String topicSource(String topic) {
            // return from config
            return sourceClusterAlias;
        }

        @Override
        public String upstreamTopic(String topic) {
            return null;
        }
    }
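To illustrate what the policy changes, the following minimal sketch contrasts the default topic naming (source cluster alias, a dot, then the topic name) with the prefix-free naming returned by formatRemoteTopic above. It is a standalone illustration that does not use the Kafka libraries. The class name TopicNamingDemo, its helper methods, and the topic names are hypothetical and are not part of Streams Replication Manager.

```java
import java.util.List;

// Standalone illustration only; not part of Streams Replication Manager or Kafka.
class TopicNamingDemo {

    // Default naming scheme: the remote topic is "<source alias>.<topic>".
    static String defaultRemoteTopic(String sourceClusterAlias, String topic) {
        return sourceClusterAlias + "." + topic;
    }

    // Same logic as MigratingReplicationPolicy.formatRemoteTopic: no prefix is applied.
    static String migratingRemoteTopic(String sourceClusterAlias, String topic) {
        return topic;
    }

    public static void main(String[] args) {
        String alias = "HDF"; // alias taken from the HDF->CDP example
        for (String topic : List.of("orders", "payments")) { // hypothetical topic names
            System.out.println(topic
                    + " -> default: " + defaultRemoteTopic(alias, topic)
                    + ", migrating: " + migratingRemoteTopic(alias, topic));
        }
    }
}
```

Because the topic name is unchanged on the target cluster, the name alone no longer identifies the source cluster, which is why the policy reads the source cluster alias from its configuration instead.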
Modify the classpath of the Streams Replication Manager Driver so that it
includes the compiled artifact when the Driver starts.
How you do this depends on the Cloudera Runtime version.
In Cloudera Manager, select the Streams Replication Manager service.
Go to Configuration.
Add the following key and value pair to both Streams Replication Manager
Service Environment Advanced Configuration Snippet (Safety Valve) and
SRM Driver Environment Advanced Configuration Snippet (Safety Valve).
Key: SRM_CLASSPATH
Value: [***PATH TO POLICY JAR***]
If you have other artifacts added to SRM_CLASSPATH,
ensure that each path is delimited with a colon (:).
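As a small sketch of the colon-delimited format, the snippet below joins several JAR paths into a single SRM_CLASSPATH value. The class name SrmClasspathDemo and both file paths are hypothetical placeholders; substitute the actual locations of your artifacts.

```java
import java.util.List;

// Standalone illustration only; the paths below are hypothetical placeholders.
class SrmClasspathDemo {

    // Joins the given JAR paths with a colon, the delimiter used in SRM_CLASSPATH.
    static String buildClasspath(List<String> jarPaths) {
        return String.join(":", jarPaths);
    }

    public static void main(String[] args) {
        String value = buildClasspath(List.of(
                "/opt/custom/migrating-replication-policy.jar", // hypothetical policy JAR
                "/opt/custom/other-extension.jar"));            // hypothetical existing artifact
        System.out.println("SRM_CLASSPATH=" + value);
    }
}
```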
Configure the Streams Replication Manager service to use the custom replication
policy:
In Cloudera Manager, select the Streams Replication Manager service.
Go to Configuration.
Find the Streams Replication Manager’s Replication Configs
property and add the following: