Enabling exactly-once semantics for replication flows

You enable exactly-once semantics (EOS) for replication flows by configuring EOS in the KafkaConnect resource. Additionally, Cloudera recommends that you set the source consumer isolation level in your MirrorSourceConnector to read_committed.

MirrorSourceConnector tracks its progress by periodically committing the offsets of the messages it has processed. If the connector fails, messages whose offsets were not yet committed are reprocessed after the connector starts running again, which can result in duplicate messages in the target topic.

With EOS, source connectors handle offset commits and message production in a single transaction. The transaction either succeeds, in which case the messages are produced to the target topic and the offsets are committed together, or the whole operation is rolled back. EOS is enabled in the KafkaConnect resource with the exactly.once.source.support property.

If transactional producers write messages to the source topic, Cloudera recommends that you filter out records that belong to aborted transactions from the replicated data. Otherwise, records from aborted transactions are replicated and appear as committed in the target, which results in invalid data. You configure this in your MirrorSourceConnector by setting the isolation.level property to read_committed.
  1. Enable EOS in your KafkaConnect resource.

    Configuration differs for newly deployed resources and existing resources.

    For newly deployed resources:

    Set exactly.once.source.support to enabled.
    #...
    kind: KafkaConnect
    spec:
      config:
        exactly.once.source.support: enabled

    For existing resources:

    1. Set exactly.once.source.support to preparing.
      #...
      kind: KafkaConnect
      spec:
        config:
          exactly.once.source.support: preparing
    2. Wait until the configuration changes are applied and the rolling restart of the worker pods finishes. The restart begins in the next reconciliation loop. You can watch the pods with the following command.
      kubectl get pods --namespace [***NAMESPACE***] --watch
    3. Set exactly.once.source.support to enabled.
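      As a sketch, the updated fragment would mirror the one used for preparing, with only the property value changed. This assumes the rest of your KafkaConnect resource stays as it is.
      #...
      kind: KafkaConnect
      spec:
        config:
          # Changed from preparing once the rolling restart has finished
          exactly.once.source.support: enabled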
  2. Set isolation.level to read_committed in your MirrorSourceConnector.
    #...
    kind: KafkaConnector
    spec:
      class: org.apache.kafka.connect.mirror.MirrorSourceConnector
      config:
        source.consumer.isolation.level: read_committed
    
    This example uses the source.consumer. prefix. That is, isolation.level is specifically set for the source consumer in the connector, which is the consumer connecting to the source cluster.