Configuring EOS for source connectors

Learn how to enable and configure exactly-once semantics (EOS) for Kafka Connect source connectors.

EOS is a framework that enables Kafka and Kafka applications to guarantee that each message is delivered precisely once without it being duplicated or lost. EOS is supported by some Kafka Connect source connectors. To enable EOS, you must enable EOS for the Kafka Connect service roles (Kafka Connect workers) and configure your connectors to use EOS.

EOS is enabled for the Kafka Connect service roles in Cloudera Manager with the Exactly-once support for source connectors property. Connectors can be configured using the SMM UI (recommended), SMM API, or Kafka Connect API.

  • Enabling EOS for Kafka Connect service roles only enables the use of EOS on the level of the service. For individual connectors to make use of EOS, the connector must be specifically configured to use EOS.
  • Not all connectors shipped in CDP support EOS. The following list collects the source connectors shipped in CDP that support EOS.
    • JDBC Source
    • SFTP Source
    • NiFi Stateless Source
    • FileStreamSource
  • If you are using a third-party connector, review the documentation of the connector to learn whether EOS is supported.
  1. Configure Kafka Connect service roles:
    1. In Cloudera Manager, select the Kafka service.
    2. Go to Configuration.
    3. Set Exactly-once support for source connectors in the cluster to preparing.
    4. Restart all Kafka Connect service roles.
    5. Set the Exactly-once support for source connectors in the cluster to enabled.
    6. Restart all Kafka Connect service roles.
  2. Configure source connectors.

    The following table collects the common source connector properties that you can use to enable and configure EOS. Configure the following properties based on your requirements. Cloudera recommends that you use the SMM UI to configure and deploy connectors. For more information on connector configuration and deployment see Deploying and managing Kafka Connect connectors in SMM

    Table 1. EOS specific Kafka Connect source connector properties
    Name Default value Description
    exactly.once.support requested

    Permitted values are requested and required. If set to required, forces a preflight check for the connector to ensure that it can provide exactly-once delivery with the given configuration. Some connectors may be capable of providing exactly-once delivery but not signal to Kafka Connect that they support this. In this case, review the documentation for the connector before connector deployment and set this property to requested. Additionally, if the value is set to required but the worker that performs preflight validation does not have exactly-once support enabled for source connectors, requests to create or validate the connector will fail.

    transaction.boundary poll Permitted values are poll, connector, and interval. If set to poll, a new producer transaction is started and committed for every batch of records that each task from this connector provides to Kafka Connect. If set to connector, relies on connector-defined transaction boundaries; note that not all connectors are capable of defining their own transaction boundaries, and in that case, attempts to create them with this property set to connector will fail. If set to interval, commits transactions only after a user-defined time interval has passed.
    offsets.storage.topic null The name of a separate offsets topic to use for this connector. If left empty or not specified, the worker’s global offsets topic name is used. If specified, the offsets topic is created if it does not already exist on the Kafka cluster targeted by this connector (which may be different from the one used for the worker's global offsets topic if the bootstrap.servers property of the connector's producer has been overridden from the worker's).
    transaction.boundary.interval.ms null If transaction.boundary is set to interval, determines the interval for producer transaction commits by connector tasks. If unset, defaults to the value of the worker-level offset.flush.interval.ms property.