Exactly-once semantics

Exactly-once semantics (EOS) is a feature that enables Kafka and Kafka applications to guarantee that each message is delivered exactly once, without being duplicated or lost. EOS can be enabled for Kafka Connect and its source connectors.

The progress of a source connector is tracked by periodically committing the offsets of the processed messages. If the connector fails, messages whose offsets were not yet committed are reprocessed after the connector starts running again, which can result in duplicates.

With EOS, source connectors commit offsets and produce messages in a single transaction. The transaction either succeeds, in which case the messages are produced to the target topic and the offsets are committed together, or the whole operation is rolled back. EOS is enabled in the KafkaConnect resource. Additionally, you can fine-tune EOS-related properties in the configuration of connector instances.

Enabling exactly-once semantics

You enable EOS for source connectors by configuring exactly.once.source.support in the KafkaConnect resource.

Configuration differs for newly deployed resources and existing resources.

For newly deployed resources:

Set exactly.once.source.support to enabled.
#...
kind: KafkaConnect
spec:
  config:
    exactly.once.source.support: enabled

For existing resources:

  1. Set exactly.once.source.support to preparing.
    #...
    kind: KafkaConnect
    spec:
      config:
        exactly.once.source.support: preparing
  2. Wait until configuration changes are applied.
    This happens in the next reconciliation loop.
  3. Set exactly.once.source.support to enabled.

Disabling exactly-once semantics

You disable EOS for source connectors by configuring exactly.once.source.support in the KafkaConnect resource.

  1. Set exactly.once.source.support to preparing.
    #...
    kind: KafkaConnect
    spec:
      config:
        exactly.once.source.support: preparing
  2. Wait until configuration changes are applied.
    This happens in the next reconciliation loop.
  3. Set exactly.once.source.support to disabled.
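
As an illustration of step 3, the KafkaConnect resource might look like the following once the change is made. This is a minimal sketch that mirrors the fragments above. The rest of the resource is elided, and disabled is the property value named in the step.

```yaml
#...
kind: KafkaConnect
spec:
  config:
    # Final state: exactly-once support for source connectors is turned off
    exactly.once.source.support: disabled
```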

Source connector properties for exactly-once semantics

After enabling EOS for source connectors in the KafkaConnect resource, you can fine-tune EOS by configuring your connector instances (KafkaConnector resources).

Use the following source connector properties to configure EOS. Cloudera recommends that you use the default values.

exactly.once.support
  Default value: requested
  Description: Permitted values are requested and required. If set to required, forces a preflight check for the connector to ensure that it can provide exactly-once delivery with the given configuration. Some connectors may be capable of providing exactly-once delivery but not signal to Kafka Connect that they support this. In this case, review the documentation for the connector before connector deployment and set this property to requested. Additionally, if the value is set to required but the worker that performs preflight validation does not have exactly-once support enabled for source connectors, requests to create or validate the connector will fail.

transaction.boundary
  Default value: poll
  Description: Permitted values are poll, connector, and interval. If set to poll, a new producer transaction is started and committed for every batch of records that each task from this connector provides to Kafka Connect. If set to connector, relies on connector-defined transaction boundaries. Not all connectors are capable of defining their own transaction boundaries, and in that case, attempts to create them with this property set to connector will fail. If set to interval, commits transactions only after a user-defined time interval has passed.

offsets.storage.topic
  Default value: null
  Description: The name of a separate offsets topic to use for this connector. If left empty or not specified, the worker’s global offsets topic name is used. If specified, the offsets topic is created if it does not already exist on the Kafka cluster targeted by this connector (which may be different from the one used for the worker's global offsets topic if the bootstrap.servers property of the connector's producer has been overridden from the worker's).

transaction.boundary.interval.ms
  Default value: null
  Description: If transaction.boundary is set to interval, determines the interval for producer transaction commits by connector tasks. If unset, defaults to the value of the worker-level offset.flush.interval.ms property.
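
The following fragment sketches how these properties might be set on a connector instance. It assumes that connector properties go under spec.config of the KafkaConnector resource, in the same way as the KafkaConnect fragments above. The interval value and the offsets topic name are hypothetical and shown for illustration only. The default values remain the recommended choice.

```yaml
#...
kind: KafkaConnector
spec:
  config:
    # Fail connector creation or validation unless exactly-once delivery can be verified
    exactly.once.support: required
    # Commit producer transactions on a time interval instead of once per poll batch
    transaction.boundary: interval
    # Hypothetical value: commit transactions every 5 seconds
    transaction.boundary.interval.ms: 5000
    # Hypothetical name for a dedicated offsets topic for this connector
    offsets.storage.topic: my-connector-offsets
```

If transaction.boundary is left at its default of poll, the transaction.boundary.interval.ms property has no effect and can be omitted.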