Migrate Kafka Using a Custom Replication Policy

How to migrate Kafka with Streams Replication Manager using a custom replication policy.

The default replication policy used by Streams Replication Manager (SRM) renames remote topics on target clusters. It adds the name of the source cluster as a prefix to the topic names. If this behaviour is not viable, you can configure SRM to use a custom replication policy that retains the original names of the topics. This custom replication policy has to be implemented, compiled, and packaged (JAR) by you. The code is provided in this the document.
Review the following notes about the custom replication policy:
  • While this policy is in effect, SRM is unable to differentiate between local topics and replicated topics.

    This is because replicated topics do not have the source cluster prefix.

  • When this policy is used, the Streams Messaging Manager (SMM) UI incorrectly displays local topics as replicated in the Cluster Replications tab.

    This is because replicated topics do not have the source cluster prefix.

  • This replication policy is only supported with a unidirectional data replication setup where replication happens from a single source cluster to a single target cluster.

    Configuring additional hops or bi-directional replication is not supported and can lead to severe replication issues.

  • Using this replication policy in any other scenario than data migration is not supported.

    Once migration is complete, you need to reconfigure SRM to use the default replication policy or delete the service from the cluster.

The following steps describe how you can compile the custom replication policy and configure SRM to use it. In addition, it also describes how data can be migrated once configuration is complete.

Setup and Configure SRM in the CDP Private Cloud Base cluster for unidirectional replication. You can configure unidirectional replication by adding and enabling a single replication in the Streams Replication Manager's Replication Configs property. For example:
For more information on setup and configuration, see Add and Configure SRM in the SRM documentation for Runtime.
  1. Implement, compile, and package (JAR) the following custom replication policy that overrides SRM’s default behavior.
    package com.cloudera.dim.mirror;
    import java.util.Map;
    import org.apache.kafka.common.Configurable;
    import org.apache.kafka.connect.mirror.ReplicationPolicy;
    import org.apache.kafka.connect.mirror.MirrorConnectorConfig;
    public class MigratingReplicationPolicy implements ReplicationPolicy, Configurable {
        private String sourceClusterAlias;
        public void configure(Map<String, ?> props) {
            // The source cluster alias cannot be determined just by looking at the prefix of the remote topic name.
            // We extract this info from the configuration.
            sourceClusterAlias = (String) props.get(MirrorConnectorConfig.SOURCE_CLUSTER_ALIAS);
        public String formatRemoteTopic(String sourceClusterAlias, String topic) {
            // We do not apply any prefix.
            return topic;
        public String topicSource(String topic) {
            // return from config
            return topic == null ? null : sourceClusterAlias;
        public String upstreamTopic(String topic) {
            return null;
  2. Modify the classpath of the SRM driver to include the compiled artifact when the SRM driver is started:
    1. Find the srm-driver script located at /opt/cloudera/parcels/CDH/lib/streams_replication_manager/bin/srm-driver.
    2. Modify the -cp flag in the srm-driver script to include the additional .jar. For example:
      exec $JAVA $SRM_HEAP_OPTS $SRM_JVM_PERF_OPTS $SRM_KERBEROS_OPTS $GC_LOG_OPTS $SRM_JMX_OPTS -DdefaultConfig=$SRM_CONFIG_DIR/srm.properties -DdefaultYaml=$SRM_CONFIG_DIR/srm-service.yaml -cp [PATH_TO_CUSTOM_POLICY_JAR]:$SRM_LIB_DIR/srm-driver-$SRM_LIB_DIR/srm-common-
  3. Configure the SRM service to use the custom replication policy:
    1. In Cloudera Manager, select the Streams Replication Manager service.
    2. Go to Configuration.
    3. FInd the Streams Replication Manager’s Replications Config property and add the following:

      Setting the replication.policy.class property configures SRM to use the custom replication policy instead of the default one.

    4. Click Save Changes.
    5. Restart the service.
  4. Use the srm-control tool to include every topic and every consumer group in the allowlist.
    Including consumer groups in the allowlist is required for offset translation.
    srm-control topics --source [SOURCE_CLUSTER] --target [TARGET_CLUSTER] --add ".*"
    srm-control groups --source [SOURCE_CLUSTER] --target [TARGET_CLUSTER] --add ".*"
  5. Validate that data is being migrated.
    Use the Cluster Replications page on the SMM UI to monitor and validate the status of the migration.
  6. Stop producers.
  7. Stop consumers.
  8. Reconfigure all consumers to read from CDP Private Cloud Base Kafka and apply offset translation using SRM.
  9. Start consumers.
  10. Reconfigure all producers to write to CDP Private Cloud Base Kafka.

    The HDF instances of Kafka and SMM are no longer required.

  11. Start producers.

Kafka is migrated. Kafka clients produce and consume from the CDP Private Cloud Base cluster. Migration is complete.