Configuring data replication offsets

Learn how to configure and modify the offset from which the MirrorSourceConnector replicates data.

By default, the MirrorSourceConnector replicates data from the start of the source topics and tracks its progress by committing source offsets to the Kafka Connect framework.

You can modify this behavior in the following ways:

  • Starting data replication from the latest offset for new partitions.
  • Manually setting exact offsets for specific source partitions.

Replicating from the latest offset for new partitions

To replicate data from the latest offset, configure the auto.offset.reset property for the source consumer in the MirrorSourceConnector.

#... 
kind: KafkaConnector
spec:
  class: org.apache.kafka.connect.mirror.MirrorSourceConnector
  config:
    source.consumer.auto.offset.reset: latest

With this configuration, all new partitions (partitions without a committed offset) are replicated from the latest offset. Records that already exist in those partitions when the connector starts reading them are not replicated. Cloudera recommends using this configuration only in special circumstances, because it violates the at-least-once guarantee of data replication.

The KafkaConnector example above uses the source.consumer. prefix. This means that auto.offset.reset is set only for the source consumer of the connector, which is the consumer that connects to the source cluster.
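To make the prefix and the reset policy concrete, the following Python sketch models both steps for a single partition. First, it selects the source.consumer. properties from a connector configuration and strips the prefix. Second, it decides where reading starts. This is a simplified model, not MirrorMaker 2 or Kafka client code. The function names and sample values are hypothetical, and the model ignores cases such as a committed offset that is out of range.

```python
from typing import Dict, Optional

SOURCE_CONSUMER_PREFIX = "source.consumer."


def consumer_overrides(connector_config: Dict[str, str]) -> Dict[str, str]:
    """Keep only source.consumer.* properties, with the prefix removed (hypothetical helper)."""
    return {
        key[len(SOURCE_CONSUMER_PREFIX):]: value
        for key, value in connector_config.items()
        if key.startswith(SOURCE_CONSUMER_PREFIX)
    }


def starting_offset(committed: Optional[int], log_start: int, log_end: int,
                    reset_policy: str) -> int:
    """Simplified model of where reading begins for one partition."""
    if committed is not None:
        # A committed offset always takes precedence over auto.offset.reset.
        return committed
    if reset_policy == "earliest":
        return log_start  # replicate every record still retained in the partition
    if reset_policy == "latest":
        return log_end  # skip every record that already exists
    raise ValueError(f"no committed offset and unsupported policy {reset_policy!r}")


if __name__ == "__main__":
    config = {
        "source.cluster.alias": "primary",  # hypothetical alias, not a consumer property
        "source.consumer.auto.offset.reset": "latest",
    }
    consumer_config = consumer_overrides(config)
    print(consumer_config)  # {'auto.offset.reset': 'latest'}
    # A new partition holding records 0..499 and no committed offset:
    print(starting_offset(None, 0, 500, consumer_config["auto.offset.reset"]))  # 500
```

The second call shows why the latest policy weakens at-least-once delivery. The 500 records that existed before the connector first read the partition are never replicated.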

Manually setting exact offsets for specific source partitions

In some situations, you might need to rewind the replication and reprocess records, or fast-forward it and skip some records. To do this, you can set the exact offset for each partition, which changes the state of the replication.
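As a rough guide to the effect of such a change, the following Python sketch compares the currently stored offset of a partition with a new one. The function name is hypothetical. Kafka offsets can have gaps, for example in compacted topics or around transaction markers. For this reason, the offset difference is an upper bound on the number of affected records, not an exact count.

```python
def describe_offset_change(current: int, new: int) -> str:
    """Summarize what moving a partition's stored offset from current to new does."""
    if new < current:
        # Rewind: records between the two offsets are read and replicated again,
        # so the target topic may receive duplicates.
        return f"rewind: up to {current - new} records replicated again"
    if new > current:
        # Fast-forward: records between the two offsets are never replicated.
        return f"fast-forward: up to {new - current} records skipped"
    return "no change"


if __name__ == "__main__":
    print(describe_offset_change(1000, 800))   # rewind: up to 200 records replicated again
    print(describe_offset_change(1000, 1500))  # fast-forward: up to 500 records skipped
```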

  1. List the current offsets of the MirrorSourceConnector.
    1. Configure your KafkaConnector resource to include the spec.listOffsets property.
      #...
      kind: KafkaConnector
      spec:
        class: org.apache.kafka.connect.mirror.MirrorSourceConnector
        listOffsets:
          toConfigMap:
            name: [***CONFIGMAP NAME***]
      
      If the ConfigMap you specify does not exist, the Strimzi Cluster Operator creates it when you list connector offsets using the strimzi.io/connector-offsets="list" annotation.
    2. List connector offsets by annotating your KafkaConnector resource with strimzi.io/connector-offsets="list".
      kubectl annotate kafkaconnector [***CONNECTOR NAME***] \
        --namespace [***NAMESPACE***] \
        strimzi.io/connector-offsets="list"
      Once the annotation is applied, the connector offsets are written to the ConfigMap specified in the spec.listOffsets property of the KafkaConnector resource. You edit this ConfigMap in the next step. The Strimzi Cluster Operator automatically removes the annotation once the offsets are written.
  2. Edit the ConfigMap containing offsets.
    The payload is connector-specific. For the MirrorSourceConnector, it has the following structure:
    {"offsets":[{"partition":{"cluster":"[***SOURCE CLUSTER ALIAS***]","partition":0,"topic":"[***SOURCE TOPIC NAME***]"},"offset":{"offset":[***OFFSET***]}}]}
    

    You can specify multiple partitions in the offsets array. To delete the stored offset of a specific partition, set offsets.offset (the offset field of that partition's entry) to null.

  3. Configure your KafkaConnector resource to include the spec.alterOffsets property, and stop the connector by setting spec.state to stopped. Connector offsets can only be altered while the connector is stopped.
    #...
    kind: KafkaConnector
    spec:
      class: org.apache.kafka.connect.mirror.MirrorSourceConnector
      state: stopped
      alterOffsets:
        fromConfigMap:
          name: [***CONFIGMAP NAME***]
  4. Alter connector offsets by annotating your KafkaConnector resource with strimzi.io/connector-offsets="alter".
    kubectl annotate kafkaconnector [***CONNECTOR NAME***] \
      --namespace [***NAMESPACE***] \
      strimzi.io/connector-offsets="alter"
    The Strimzi Cluster Operator automatically removes the annotation after the connector offsets are successfully updated.
  5. Resume the MirrorSourceConnector.
    To do this, set the spec.state property to running in the KafkaConnector resource of the connector.
    #...
    kind: KafkaConnector
    spec:
      class: org.apache.kafka.connect.mirror.MirrorSourceConnector
      state: running
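You can also script the ConfigMap edit in step 2. The following Python sketch works only on the JSON payload shown in that step. It sets a new offset for one partition. If the offset is None, it writes null to the offset field, which deletes the stored offset for that partition. Reading the payload from the ConfigMap and writing it back is not shown. The helper name, the cluster alias primary, and the topic orders are hypothetical.

```python
import json
from typing import Any, Dict, Optional


def set_partition_offset(payload: Dict[str, Any], cluster: str, topic: str,
                         partition: int, offset: Optional[int]) -> None:
    """Set the offset of one source partition in a MirrorSourceConnector offsets payload.

    Passing offset=None writes null, which deletes the stored offset for that partition.
    Partitions that are not yet listed are appended.
    """
    key = {"cluster": cluster, "partition": partition, "topic": topic}
    new_value = None if offset is None else {"offset": offset}
    entries = payload.setdefault("offsets", [])
    for entry in entries:
        if entry["partition"] == key:  # dict comparison ignores key order
            entry["offset"] = new_value
            return
    entries.append({"partition": key, "offset": new_value})


if __name__ == "__main__":
    # Hypothetical output of the "list" step for two partitions of topic "orders".
    listed = json.loads(
        '{"offsets":['
        '{"partition":{"cluster":"primary","partition":0,"topic":"orders"},"offset":{"offset":1000}},'
        '{"partition":{"cluster":"primary","partition":1,"topic":"orders"},"offset":{"offset":750}}]}'
    )
    set_partition_offset(listed, "primary", "orders", 0, 800)   # rewind partition 0
    set_partition_offset(listed, "primary", "orders", 1, None)  # delete partition 1's offset
    print(json.dumps(listed))
```

Matching on the whole partition object (cluster, partition, and topic) avoids accidentally editing the same partition number of a different topic or source cluster.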