Enabling exactly-once semantics for replication flows

You can enable exactly-once semantics (EOS) for replication flows in Streams Replication Manager to ensure that data is replicated without duplicates.

Without EOS, MirrorSourceConnector tracks its progress by periodically committing the offsets of the messages it has processed. If the connector fails, messages processed after the last offset commit are reprocessed when the connector restarts, which can result in duplicate messages in the target topic.
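
A minimal Python toy model of this failure mode is sketched below, assuming a single connector task that commits its offset after every `commit_interval` records (commits are modeled by record count for simplicity). `replicate_at_least_once` is a hypothetical helper for illustration, not part of Streams Replication Manager or Kafka Connect.

```python
def replicate_at_least_once(source, commit_interval, crash_after=None):
    """Hypothetical toy model of periodic offset commits (not SRM code).

    Copies `source` records to a target list and commits the source offset
    after every `commit_interval` records. If `crash_after` is given, the
    first run stops after producing that many records, and a second run
    resumes from the last committed offset.
    """
    target = []
    committed = 0  # next source offset to read after a restart

    def run(start, limit=None):
        nonlocal committed
        produced = 0
        for offset in range(start, len(source)):
            if limit is not None and produced == limit:
                return  # simulated crash: progress since the last commit is lost
            target.append(source[offset])
            produced += 1
            if (offset + 1) % commit_interval == 0:
                committed = offset + 1  # periodic offset commit
        committed = len(source)  # final commit after a clean run

    run(0, crash_after)
    if crash_after is not None:
        run(committed)  # restart from the last committed offset
    return target


source = [f"m{i}" for i in range(10)]
# m4 and m5 were produced before the crash but not committed, so they appear twice.
print(replicate_at_least_once(source, commit_interval=4, crash_after=6))
```

The records produced between the last commit and the crash are produced again after the restart, which is the duplication that EOS is meant to prevent.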

With EOS, source connectors handle message production and offset commits in a single transaction. Either the messages are produced to the target topic and their offsets are committed together, or the whole operation is rolled back. EOS is enabled in Streams Replication Manager with the exactly.once.source.support property.
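
The same toy model can be adapted to the transactional approach. This Python sketch assumes that each batch of `batch_size` records and its offset commit form one transaction; `replicate_transactional` is a hypothetical helper and does not reflect how Kafka Connect defines transaction boundaries internally.

```python
def replicate_transactional(source, batch_size, crash_after=None):
    """Hypothetical toy model of EOS: each batch's records and offset commit are atomic."""
    target = []    # records visible (committed) in the target topic
    committed = 0  # source offset, stored in the same transaction as the records

    def run(start, limit=None):
        nonlocal committed
        produced = 0
        for batch_start in range(start, len(source), batch_size):
            pending = []  # records written inside the open transaction
            for offset in range(batch_start, min(batch_start + batch_size, len(source))):
                if limit is not None and produced == limit:
                    return  # simulated crash: the open transaction is aborted
                pending.append(source[offset])
                produced += 1
            # Commit: the records and the new offset become visible together.
            target.extend(pending)
            committed = batch_start + len(pending)

    run(0, crash_after)
    if crash_after is not None:
        run(committed)  # restart from the offset stored with the last committed batch
    return target


source = [f"m{i}" for i in range(10)]
# Each record appears exactly once despite the simulated crash.
print(replicate_transactional(source, batch_size=4, crash_after=6))
```

Because the crash discards the open batch together with its offset, the restart neither skips nor duplicates records.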

If transactional producers write messages to the source topic, Cloudera recommends that you filter out records from aborted transactions so that they are not replicated. Otherwise, records from aborted transactions are replicated and appear as committed records in the target cluster, which results in invalid data. To filter them out, set the consumer isolation.level property of MirrorSourceConnector to read_committed.
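
The effect of the isolation level can be illustrated with a small Python toy model, assuming a simplified log in which each transaction ends with a commit or abort marker. The entry format and the `consume` function are hypothetical, not the Kafka consumer API; real consumers using read_committed also stop at the last stable offset, which this sketch ignores.

```python
from typing import Optional

# Hypothetical log entries: ("data", txn_id, value) or ("commit"/"abort", txn_id, None).
# txn_id is None for records written by a non-transactional producer.
Entry = tuple[str, Optional[str], Optional[str]]


def consume(log: list[Entry], isolation_level: str) -> list[str]:
    """Toy model of consumer isolation levels (not the Kafka client)."""
    if isolation_level not in ("read_uncommitted", "read_committed"):
        raise ValueError(f"unknown isolation level: {isolation_level}")
    if isolation_level == "read_uncommitted":
        # Every data record is returned, including records from aborted transactions.
        return [value for kind, _, value in log if kind == "data"]
    # read_committed: keep non-transactional records and records of committed transactions.
    committed = {txn for kind, txn, _ in log if kind == "commit"}
    return [
        value
        for kind, txn, value in log
        if kind == "data" and (txn is None or txn in committed)
    ]


log = [
    ("data", "t1", "a"),
    ("data", "t2", "b"),
    ("commit", "t1", None),
    ("abort", "t2", None),
    ("data", None, "c"),
]
print(consume(log, "read_uncommitted"))  # ['a', 'b', 'c']
print(consume(log, "read_committed"))    # ['a', 'c']
```

With read_uncommitted, the aborted record `b` would be replicated, and because the abort marker is not part of the replicated data, consumers of the target topic would see it as a normal record.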
  1. Enable EOS in Cloudera Manager.
    1. In Cloudera Manager, select the Streams Replication Manager service and go to Configuration.
    2. Find the Streams Replication Manager's Replication Configs property.
    3. Add the exactly.once.source.support property.

      The value to set depends on whether you are configuring a new or an existing deployment.

      • For new deployments, set the value to enabled.
      • For existing deployments, roll out the change in two phases to avoid downtime:
        1. Set the value to preparing.
        2. Click Save Changes.
        3. Restart the Streams Replication Manager service.
        4. Wait until all Streams Replication Manager instances are running with the preparing value and replication has stabilized.
        5. Update the value to enabled.
  2. Set the consumer isolation level.
    1. In the same Streams Replication Manager's Replication Configs property, configure the isolation level.
    2. To set it globally for all connectors, add:
      connectors.consumer.isolation.level=read_committed
    3. Alternatively, to set it for a specific replication flow, add:
      [SOURCE]->[TARGET].consumer.isolation.level=read_committed

      Replace [SOURCE] and [TARGET] with the cluster aliases.

  3. Click Save Changes.
  4. Restart the Streams Replication Manager service.
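
The procedure can be summarized as the Replication Configs entries saved before each restart. The Python sketch below is a hypothetical helper for illustration only: it does not call Cloudera Manager, and the cluster aliases `primary` and `secondary` are placeholders for your own aliases.

```python
def rollout_plan(new_deployment, flow=None):
    """Return the Replication Configs entries to save before each SRM restart.

    Hypothetical helper. `flow` is an optional (source_alias, target_alias)
    pair that scopes the isolation level to one replication flow; without
    it, the isolation level applies to all connectors.
    """
    if flow is None:
        isolation = "connectors.consumer.isolation.level=read_committed"
    else:
        source, target = flow
        isolation = f"{source}->{target}.consumer.isolation.level=read_committed"
    final = ["exactly.once.source.support=enabled", isolation]
    if new_deployment:
        return [final]  # one save and one restart
    # Existing deployment: restart once with "preparing", then again with "enabled".
    return [["exactly.once.source.support=preparing"], final]


for phase, entries in enumerate(rollout_plan(False, flow=("primary", "secondary")), 1):
    print(f"Restart {phase}:", *entries, sep="\n  ")
```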