Kafka Connect tasks and Stateless NiFi

Learn which Stateless NiFi connectors you can or cannot run with multiple Kafka Connect tasks (tasks.max>=2).

By default, all connectors that you deploy start with a single Kafka Connect task. You can configure the number of tasks deployed for a connector using the tasks.max connector property, but not all types of connectors are meant to run with more than one task.
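As a rough illustration of where tasks.max lives, the following sketch builds a payload in the {"name": ..., "config": {...}} shape that the Kafka Connect REST API accepts when creating a connector. The connector name, the helper function, and the connector class shown here are assumptions made for the example; substitute the values that apply to your deployment.

```python
import json


def build_connector_payload(name, connector_class, tasks_max=1):
    """Build a connector creation payload with an explicit tasks.max value."""
    if tasks_max < 1:
        raise ValueError("tasks.max must be at least 1")
    return {
        "name": name,
        "config": {
            "connector.class": connector_class,
            # Kafka Connect configuration values are passed as strings.
            "tasks.max": str(tasks_max),
        },
    }


payload = build_connector_payload(
    "example-source",  # hypothetical connector name
    "org.apache.nifi.kafka.connect.StatelessNiFiSourceConnector",  # assumed class name
    tasks_max=1,
)
print(json.dumps(payload, indent=2))
```

A real connector needs further properties (for example, the dataflow definition and target topic), which are omitted here.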

Specifically with Stateless NiFi, there are a handful of source connectors that either cannot run with multiple tasks or function incorrectly when multiple tasks are configured. The following provides information on which connectors can or cannot be run with multiple tasks and what effects configuring multiple tasks has on both sink and source connectors.

Source connectors

A source connector's task is made up of multiple components. These are as follows:

  • SourceTask

    In Stateless NiFi connectors, the SourceTask is responsible for executing one instance of the connector's dataflow.

  • SMT Chain

    The Single Message Transforms (SMT) chain can modify and filter the data on a single message basis. However, SMT is currently not used with Stateless NiFi Source connectors.

  • Converter

    The converter is responsible for converting Kafka message keys and values with a configured conversion mechanism. However, Stateless NiFi connectors currently only use keys of string type and values of byte array type. As a result, the converter does not perform any meaningful conversion.

  • Kafka Producer

    The Kafka producer is responsible for delivering messages to the configured Kafka topics.

If you deploy a source connector with more than one task, then each task is going to have its own instance of the task components. Because the dataflow in the SourceTask is responsible for fetching the data, rather than the source data store providing the data to the SourceTask, it is not always possible to start a connector with more than one task.

For example, starting a JDBC Source connector, which is a Stateless NiFi-based connector, with two tasks would mean that two instances of the dataflow are running in parallel. Both would fetch data from the same table of the same database and deliver messages to the same Kafka topic. However, the two dataflow instances would have no way of knowing whether a certain record had already been fetched by the other instance. As a result, the same piece of data would appear in multiple Kafka messages.
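The duplication described above can be reproduced with a small simulation. This is only a sketch: the table, the DataflowInstance class, and the "last seen id" bookkeeping are hypothetical stand-ins for a dataflow that tracks its own progress, not the actual connector implementation.

```python
# Hypothetical source table: five rows with increasing ids.
TABLE = [{"id": i, "value": f"row-{i}"} for i in range(1, 6)]


class DataflowInstance:
    """One task's copy of the dataflow. It only knows its own progress."""

    def __init__(self):
        self.last_seen_id = 0  # state is private to this instance

    def fetch_new_rows(self, table):
        rows = [row for row in table if row["id"] > self.last_seen_id]
        if rows:
            self.last_seen_id = rows[-1]["id"]
        return rows


def run_tasks(task_count, table):
    """Run task_count independent instances and collect what reaches the topic."""
    topic = []
    for task in [DataflowInstance() for _ in range(task_count)]:
        topic.extend(task.fetch_new_rows(table))
    return topic


print(len(run_tasks(1, TABLE)), len(run_tasks(2, TABLE)))  # prints: 5 10
```

With two tasks, every row is delivered twice because neither instance can see the other's state.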

Because of this, there are a number of limitations that you must keep in mind when using the Stateless NiFi Source connectors. These limitations apply when you are running a custom dataflow with a Stateless NiFi Source connector, as well as when running any of the Stateless NiFi-based source connectors that Cloudera develops.

Listen-type connectors
A listen-type connector consists of a NiFi dataflow in which the first processor is a ListenX processor. Listen-type processors start a server with one or more background threads, listening on a defined network port. Stateless NiFi-based connectors of this category are:
  • HTTP Source
  • Syslog TCP Source
  • Syslog UDP Source

For example, the HTTP Source connector’s flow uses a ListenHTTP NiFi processor to start an HTTP server listening on a configured port. Deploying this connector with two tasks would mean that two ListenHTTP processors are running simultaneously, both listening on the same network port. Since there is no user-defined mapping between tasks and Kafka Connect workers, it is possible that these two instances get deployed to the same worker. This means that two HTTP servers would be listening on the same port on the same cluster node. This results in port collision. To avoid issues like this, you must ensure that listen-type connectors are always deployed with exactly one task.
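The port collision can be demonstrated outside of Kafka Connect with two plain TCP listeners on the same host. This is a minimal sketch using Python's standard socket module; the try_listen helper is hypothetical and only stands in for a listen-type processor starting its server.

```python
import socket


def try_listen(port):
    """Try to open a listening TCP socket on localhost.

    Returns the socket on success, or None if the port cannot be bound.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.bind(("127.0.0.1", port))
        sock.listen()
        return sock
    except OSError:
        sock.close()
        return None


first = try_listen(0)  # port 0 asks the operating system for any free port
port = first.getsockname()[1]
second = try_listen(port)  # a second "task" on the same host and port
print("second listener started:", second is not None)
first.close()
```

The second listener fails to start because the port is already in use, which is what happens when two tasks of a listen-type connector land on the same worker.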

Primary node only connectors
Connectors in this category use a dataflow containing a NiFi processor that would be configured to run on the primary node only if the flow were running on a regular NiFi cluster. For example, the QueryDatabaseTableRecord processor, which is used in the JDBC Source connector, is intended to run on the primary node only. Stateless NiFi-based connectors of this category are:
  • JDBC Source
  • SFTP Source

For these connectors, the configured value of the tasks.max property is ignored. The connector is always started with exactly one task.

All other connectors
Connectors in this category are neither listen-type nor primary node only. Stateless NiFi-based connectors of this category are:
  • JMS Source
  • MQTT Source

There is no limitation on the number of tasks for these connectors. With these connectors, it is the responsibility of the source data store system to make sure that one piece of data is only given to one of the connector’s tasks.
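The three source connector categories can be summarized as a small pre-deployment check. This is a hypothetical helper written for illustration; it is not part of Kafka Connect or Stateless NiFi, and it only knows the connector names listed in this topic. Custom dataflows must be categorized by hand before relying on it.

```python
LISTEN_TYPE = {"HTTP Source", "Syslog TCP Source", "Syslog UDP Source"}
PRIMARY_NODE_ONLY = {"JDBC Source", "SFTP Source"}


def effective_task_count(connector, tasks_max):
    """Return the number of tasks a source connector would run with.

    Raises ValueError for listen-type connectors configured with more
    than one task, because that setup risks a port collision.
    """
    if tasks_max < 1:
        raise ValueError("tasks.max must be at least 1")
    if connector in LISTEN_TYPE:
        if tasks_max != 1:
            raise ValueError(f"{connector} must be deployed with exactly one task")
        return 1
    if connector in PRIMARY_NODE_ONLY:
        return 1  # the configured tasks.max value is ignored
    # Assumption: any other name is treated as an "all other connectors" entry.
    return tasks_max


print(effective_task_count("JDBC Source", 4))  # prints: 1
print(effective_task_count("MQTT Source", 4))  # prints: 4
```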

Sink connectors

Figure 1. Sink Connector Architecture
Notice that the connector's task is made up of multiple components. These are as follows:
  • Kafka Consumer

    The consumer is responsible for fetching the messages from the Kafka topics and delivering them to the converter.

  • SMT Chain

    The Single Message Transforms (SMT) chain can modify and filter the data on a single message basis. However, SMT is currently not used with Stateless NiFi Sink connectors.

  • Converter

    The converter is responsible for converting Kafka message keys and values with a configured conversion mechanism. However, Stateless NiFi connectors currently only use keys of string type and values of byte array type. As a result, the converter does not perform any meaningful conversion.

  • SinkTask

    In Stateless NiFi connectors, the SinkTask is responsible for executing one instance of the connector's dataflow.

If you deploy a sink connector with more than one task, each task is going to have its own instance of the four task components.

With sink connectors, there is no limitation to using more than one task for a connector. When a connector is started with multiple tasks, each task has its own Kafka consumer, but these consumers belong to the same Kafka consumer group. Kafka makes sure that a message fetched by one consumer is not given to another consumer within the same consumer group.
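Within a consumer group, Kafka assigns each topic partition to exactly one consumer, which is why tasks do not receive each other's messages. The sketch below illustrates the idea with a simplified round-robin split; the assign_partitions function is hypothetical, and Kafka's actual partition assignors are more involved.

```python
def assign_partitions(partitions, task_count):
    """Split partitions across tasks so that each partition has one owner."""
    assignment = {task: [] for task in range(task_count)}
    for index, partition in enumerate(partitions):
        assignment[index % task_count].append(partition)
    return assignment


assignment = assign_partitions(list(range(6)), 3)
print(assignment)  # prints: {0: [0, 3], 1: [1, 4], 2: [2, 5]}
```

Note that in this model, tasks beyond the number of partitions receive no partitions and stay idle, so adding more tasks than partitions does not increase throughput.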

There is no user-defined mapping between tasks and Kafka Connect workers, meaning that once deployed, the tasks of a connector may or may not be executed on the same worker.