Migrating Streaming Workloads to Cloudera Private Cloud

Migrate Kafka Using the MigratingReplicationPolicy

Learn how to migrate Kafka with Streams Replication Manager using the MigratingReplicationPolicy, a custom replication policy that you must implement, compile, and package yourself.

  • Ensure that you have reviewed Migrate Kafka Using Streams Replication Manager and understand the limitations and use cases for this policy.
  • Set up and configure Streams Replication Manager in the Cloudera Private Cloud Base cluster for unidirectional replication.
    You can configure unidirectional replication by adding and enabling a single replication in the Streams Replication Manager's Replication Configs property. For example:
      [***SOURCE ALIAS***]->[***TARGET ALIAS***].enabled=true
    For more information on setup and configuration, see Add and Configure Streams Replication Manager. Setup instructions might differ in different versions. Ensure that you view the version of the documentation that matches your Runtime version.
  1. Implement, compile, and package (JAR) the following custom replication policy that overrides Streams Replication Manager default behavior.
    package com.cloudera.dim.mirror;

    import java.util.Map;
    import org.apache.kafka.common.Configurable;
    import org.apache.kafka.connect.mirror.MirrorConnectorConfig;
    import org.apache.kafka.connect.mirror.ReplicationPolicy;

    public class MigratingReplicationPolicy implements ReplicationPolicy, Configurable {
        private String sourceClusterAlias;

        @Override
        public void configure(Map<String, ?> props) {
            // The source cluster alias cannot be determined just by looking at the prefix of the remote topic name.
            // We extract this info from the configuration.
            sourceClusterAlias = (String) props.get(MirrorConnectorConfig.SOURCE_CLUSTER_ALIAS);
        }

        @Override
        public String formatRemoteTopic(String sourceClusterAlias, String topic) {
            // We do not apply any prefix.
            return topic;
        }

        @Override
        public String topicSource(String topic) {
            // Return the alias read from the configuration.
            return sourceClusterAlias;
        }

        @Override
        public String upstreamTopic(String topic) {
            // No upstream topic name is reported.
            return null;
        }
    }
  2. Modify the classpath of the Streams Replication Manager Driver so that it includes the compiled artifact when the Driver starts.
    This step is done differently depending on the Cloudera Runtime version.
    1. Find the srm-driver script located at /opt/cloudera/parcels/CDH/lib/streams_replication_manager/bin/srm-driver.
    2. Modify the -cp flag in the srm-driver script to include the additional JAR file. For example:
      exec $JAVA $SRM_HEAP_OPTS $SRM_JVM_PERF_OPTS $SRM_KERBEROS_OPTS $GC_LOG_OPTS $SRM_JMX_OPTS -DdefaultConfig=$SRM_CONFIG_DIR/srm.properties -DdefaultYaml=$SRM_CONFIG_DIR/srm-service.yaml -cp [***PATH TO POLICY JAR***]:$SRM_LIB_DIR/srm-driver-$SRM_LIB_DIR/srm-common-
  3. Configure the Streams Replication Manager service to use the custom replication policy:
    1. In Cloudera Manager, select the Streams Replication Manager service.
    2. Go to Configuration.
    3. Find the Streams Replication Manager’s Replication Configs property and add the following:

      replication.policy.class=com.cloudera.dim.mirror.MigratingReplicationPolicy

      Setting the replication.policy.class property configures Streams Replication Manager to use the custom replication policy instead of the default one.

    4. Click Save Changes.
    5. Restart the service.
  4. Use the srm-control tool to include every topic and every consumer group in the allowlist.
    Including consumer groups in the allowlist is required for checkpointing.
    srm-control topics --source [SOURCE_CLUSTER] --target [TARGET_CLUSTER] --add ".*"
    srm-control groups --source [SOURCE_CLUSTER] --target [TARGET_CLUSTER] --add ".*"
  5. Stop producers.
  6. Stop consumers.
  7. Reconfigure all consumers to read from Cloudera Private Cloud Base Kafka and apply offset translation using Streams Replication Manager.
  8. Start consumers.
  9. Reconfigure all producers to write to Cloudera Private Cloud Base Kafka.

    The HDF instances of Kafka and Streams Messaging Manager are no longer required.

  10. Start producers.
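
The effect of the policy from step 1 on topic names can be illustrated with a small, self-contained sketch. The classes below are hypothetical JDK-only stand-ins, not the Kafka ReplicationPolicy API, and the alias HDF and the topic names are assumed for illustration. The sketch contrasts prefix-based naming, where the source alias can be parsed back out of the remote topic name, with the unprefixed naming used for migration, where the alias has to be supplied from configuration.

```java
import java.util.List;

// Hypothetical, JDK-only stand-ins for illustration. These are NOT the Kafka classes.
public class PolicyNamingSketch {

    /** Minimal stand-in for the two naming methods discussed in step 1. */
    public interface Naming {
        String formatRemoteTopic(String sourceClusterAlias, String topic);
        String topicSource(String topic);
    }

    /** Prefix-based naming: remote topics are named "<alias>.<topic>". */
    public static final class PrefixingNaming implements Naming {
        @Override
        public String formatRemoteTopic(String sourceClusterAlias, String topic) {
            return sourceClusterAlias + "." + topic;
        }

        @Override
        public String topicSource(String topic) {
            // The alias is recovered from the prefix; null if there is no prefix.
            int dot = topic.indexOf('.');
            return dot < 0 ? null : topic.substring(0, dot);
        }
    }

    /** Migration-style naming: names are unchanged, so the alias comes from configuration. */
    public static final class MigratingNaming implements Naming {
        private final String configuredAlias;

        public MigratingNaming(String configuredAlias) {
            this.configuredAlias = configuredAlias;
        }

        @Override
        public String formatRemoteTopic(String sourceClusterAlias, String topic) {
            return topic;
        }

        @Override
        public String topicSource(String topic) {
            return configuredAlias;
        }
    }

    public static void main(String[] args) {
        Naming prefixing = new PrefixingNaming();
        Naming migrating = new MigratingNaming("HDF"); // assumed alias
        for (String topic : List.of("orders", "payments")) {
            System.out.println(topic
                    + " -> prefixing: " + prefixing.formatRemoteTopic("HDF", topic)
                    + ", migrating: " + migrating.formatRemoteTopic("HDF", topic));
        }
    }
}
```

Because the migrated topic keeps its original name, clients can be pointed at the new cluster without changing the topics they use, which is why the alias lookup in this sketch cannot rely on the topic name.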

Kafka is migrated. Kafka clients now produce to and consume from the Cloudera Private Cloud Base cluster.
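
The ".*" pattern passed to srm-control in step 4 is a regular expression. As a rough illustration, assuming Java regular-expression semantics with whole-name matching (an assumption about how the allowlist is evaluated, not something this page states), the following sketch shows why that pattern selects every name while a narrower pattern does not. The class, method, and topic names are hypothetical.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical helper that only demonstrates regex semantics; it is not part of srm-control.
public class AllowlistRegexSketch {

    /** Returns the names that the pattern matches in full. */
    public static List<String> select(String regex, List<String> names) {
        Pattern pattern = Pattern.compile(regex);
        return names.stream()
                .filter(name -> pattern.matcher(name).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> topics = List.of("orders", "payments.eu", "audit-log");
        // ".*" matches any name, so every topic is selected.
        System.out.println(select(".*", topics));
        // A narrower pattern selects only names starting with "payments.".
        System.out.println(select("payments\\..*", topics));
    }
}
```

Any additional filtering that Streams Replication Manager applies on top of the allowlist is outside the scope of this sketch.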