Configuring data replication offsets

Learn how you can configure and modify what offset the MirrorSourceConnector replicates form.

By default, MirrorSourceConnector replicates data from the start of the source topics, and keeps track of the progress by committing source offsets into the Kafka Connect framework.

This behavior can be modified in the following ways.

  • Starting data replication from the latest offset for new partitions.
  • Manually setting exact offsets for specific source partitions.

Replicating from the latest offset for new partitions

To replicate data from the latest offset, you configure auto.offset.reset property for the source consumer in the MirrorSourceConnector.

#... 
kind: KafkaConnector
spec:
  class: org.apache.kafka.connect.mirror.MirrorSourceConnector
  config:
    source.consumer.auto.offset.reset: latest

With this configuration, all new partitions (without a committed offset) are replicated from the latest offset. Cloudera recommends applying this configuration under special circumstances only as it violates the at-least-once guarantee of data replication.

This example uses the source.consumer. prefix. That is, auto.offset.reset is specifically set for the source consumer in the connector, which is the consumer connecting to the source cluster.

Manually setting exact offsets for specific source partitions

In some situations, it might be necessary to rewind the replication and reprocess records, or fast forward and skip some records. To do this, you can manipulate the exact offsets per partition and change the state of the replication.

  1. Stop the MirrorSourceConnector.
    To do this, set the spec.state property to stopped in the KafkaConnector resource of the connector.
    #...
    kind: KafkaConnector
    spec:
      class:org.apache.kafka.connect.mirror.MirrorSourceConnector
      state: stopped
    
  2. Use connect_shell.sh to get administrative access to the Connect REST API.
    connect_shell.sh --namespace=[***CONNECT CLUSTER NAMESPACE***] \
      --cluster=[***CONNECT CLUSTER NAME***]
  3. Create a payload to manipulate the source offsets with the offset management endpoints of the Kafka Connect REST API.
    The payload is connector specific. For example, the structure for the MirrorSourceConnector is the following.
    {"offsets":[{"partition":{"cluster":"[***SOURCE CLUSTER ALIAS***]","partition":0,"topic":"[***SOURCE TOPIC NAME***]"},"offset":{"offset":[***OFFSET***]}}]}
    
    You can specify multiple partitions in the structure. Additionally, you can set offsets.offset to null to delete the offset for a specific partition. Alternatively, you can also delete all offsets with the DELETE /connectors/{connector}/offsets endpoint.
    curl -X DELETE $CONNECT_REST_URL/connectors/[***CONNECTOR NAME***]/offsets
  4. Submit the payload.
    curl --data 'PAYLOAD' -H "Content-Type: application/json" -X PATCH $CONNECT_REST_URL/connectors/[***CONNECTOR NAME***]/offsets
    
  5. Resume the MirrorSourceConnector.
    To do this, set the spec.state property to running in the KafkaConnector resource of the connector.
    #...
    kind: KafkaConnector
    spec:
      class:org.apache.kafka.connect.mirror.MirrorSourceConnector
      state: running