Kafka Connect tasks and Stateless NiFi
Learn which Stateless NiFi connectors you can or cannot run with multiple Kafka Connect tasks (tasks.max>=2).
By default, all connectors that you deploy start with a single Kafka Connect task. The number of tasks deployed for a connector can be configured using the tasks.max connector property, but not all types of connectors are meant to run using more than a single task.
Specifically with Stateless NiFi, there are a handful of source connectors that either cannot run with multiple tasks or function incorrectly when multiple tasks are configured. The following provides information on which connectors can or cannot be run with multiple tasks and what effects configuring multiple tasks has on both sink and source connectors.
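As an illustration of where the tasks.max property lives, the following minimal sketch builds the JSON body that the Kafka Connect REST API accepts when creating a connector (a name plus a config map). The connector name and the connector class are placeholders chosen for this example, not real Cloudera or NiFi class names, and the sketch only prints the payload rather than sending it anywhere.

```python
import json


def build_connector_payload(name, connector_class, tasks_max=1):
    """Build a JSON-serializable body for creating a Kafka Connect connector."""
    if tasks_max < 1:
        raise ValueError("tasks.max must be at least 1")
    return {
        "name": name,
        "config": {
            "connector.class": connector_class,
            # Connector configuration values are passed as strings.
            "tasks.max": str(tasks_max),
        },
    }


payload = build_connector_payload(
    name="example-source",  # hypothetical connector name
    connector_class="com.example.PlaceholderSourceConnector",  # placeholder class
    tasks_max=1,
)
print(json.dumps(payload, indent=2))
```

A real deployment would add the connector-specific properties to the config map; those are omitted here because they differ per connector.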
Source connectors
A source connector's task is made up of multiple components. These are as follows:
- SourceTask
In Stateless NiFi connectors, the SourceTask is responsible for executing one instance of the connector's dataflow.
- SMT Chain
The Single Message Transforms (SMT) chain can modify and filter the data on a single message basis.
- Converter
The converter is responsible for converting Kafka message keys and values with a configured conversion mechanism. However, Stateless NiFi connectors currently only use keys of string type and values of byte array type. As a result, the converter does not perform any meaningful conversion.
- Kafka Producer
The Kafka producer is responsible for delivering messages to the configured Kafka topics.
If you deploy a source connector with more than one task, then each task is going to have its own instance of the task components. Because the dataflow in the SourceTask is responsible for fetching the data, rather than the source data store providing the data to the SourceTask, it is not always possible to start a connector with more than one task.
For example, starting a JDBC Source connector, which is a Stateless NiFi-based connector, with two tasks would mean that two instances of the dataflow are running in parallel. Both are fetching data from the same table of the same database. These two tasks would be delivering the messages to the same Kafka topic. However, the two dataflow instances running in parallel would have no way of knowing whether a certain record had already been fetched by the other instance. This results in the same piece of data appearing in multiple Kafka messages.
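The duplication described above can be illustrated with a toy simulation. This is only a sketch: the table contents, the run_task function, and the list standing in for a Kafka topic are all made up for the example and do not reflect how the connector is implemented.

```python
from collections import Counter


def run_task(table):
    """One dataflow instance: fetch every row it can see in the table."""
    return list(table)


table = ["row-1", "row-2", "row-3"]  # made-up source rows

# Two tasks run the same dataflow independently against the same table
# and deliver their results to the same (simulated) topic.
topic = []
for task_id in range(2):
    topic.extend(run_task(table))

# Rows that appear more than once in the topic.
duplicates = {row: count for row, count in Counter(topic).items() if count > 1}
print(duplicates)
```

Because neither task knows what the other has fetched, every row ends up in the simulated topic once per task.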
Because of this, there are a number of limitations that you must keep in mind when using the Stateless NiFi Source connectors. These limitations apply when you are running a custom dataflow with a Stateless NiFi Source connector, as well as when running any of the Stateless NiFi-based source connectors that Cloudera develops.
- Listen-type connectors
- A listen-type connector consists of a NiFi dataflow in which the first processor is a
ListenX processor. Listen-type processors start a server with one or more background threads,
listening on a defined network port. Stateless NiFi-based connectors of this category are:
- HTTP Source
- Syslog TCP Source
- Syslog UDP Source
For example, the HTTP Source connector’s flow uses a ListenHTTP NiFi processor to start an HTTP server listening on a configured port. Deploying this connector with two tasks would mean that two ListenHTTP processors are running simultaneously, both listening on the same network port. Since there is no user-defined mapping between tasks and Kafka Connect workers, it is possible that these two instances get deployed to the same worker. This means that two HTTP servers would be listening on the same port on the same cluster node. This results in a port collision. To avoid issues like this, you must ensure that listen-type connectors are always deployed with exactly one task.
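The port collision can be reproduced outside of Kafka Connect with two plain TCP sockets. The sketch below is a generic illustration using the Python standard library, not the ListenHTTP processor itself; the try_listen helper is a name made up for this example.

```python
import socket


def try_listen(port, host="127.0.0.1"):
    """Try to start listening on host:port; return the socket, or None on failure."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.bind((host, port))
        sock.listen()
        return sock
    except OSError:
        # Typically "address already in use" when another listener holds the port.
        sock.close()
        return None


# First "task": let the operating system pick a free port (port 0).
first = try_listen(0)
port = first.getsockname()[1]

# Second "task" on the same host tries to listen on the same port.
second = try_listen(port)
collided = second is None
print(f"second listener failed to bind: {collided}")

first.close()
if second is not None:
    second.close()
```

The second listener cannot bind while the first one holds the port, which is the situation two tasks of a listen-type connector would be in if they landed on the same worker.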
- Primary node only connectors
- Connectors in this category use a dataflow containing a NiFi processor that, in a regular NiFi cluster, would be configured to run on the primary node only. For example, the QueryDatabaseTableRecord processor, which is used in the JDBC Source connector, is intended to run on the primary node only. Stateless NiFi-based connectors of this category are:
- JDBC Source
- SFTP Source
For these connectors, the configured value of the tasks.max property is ignored. The connector is always started with exactly one task.
- All other connectors
- Connectors in this category are neither listen-type nor primary node only. Stateless NiFi-based connectors of this category are:
- JMS Source
- MQTT Source
There is no limitation on the number of tasks for these connectors. In this case, it is the responsibility of the source data store system to make sure that one piece of data is only given to one of the connector’s tasks.
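The three source connector categories above can be summarized as a small pre-deployment check. This is a hypothetical helper written for illustration; it is not part of Kafka Connect or of any Cloudera tooling, and it only knows the connector names listed in this topic. A custom dataflow would need to be classified by hand before relying on a check like this.

```python
# Categories as listed in this topic.
LISTEN_TYPE = {"HTTP Source", "Syslog TCP Source", "Syslog UDP Source"}
PRIMARY_NODE_ONLY = {"JDBC Source", "SFTP Source"}


def effective_task_count(connector, requested_tasks):
    """Return the number of tasks the connector would run with.

    Raises ValueError for listen-type connectors requested with more than
    one task, because that configuration must be avoided by the user.
    """
    if requested_tasks < 1:
        raise ValueError("tasks.max must be at least 1")
    if connector in LISTEN_TYPE:
        if requested_tasks != 1:
            raise ValueError(f"{connector} must be deployed with exactly one task")
        return 1
    if connector in PRIMARY_NODE_ONLY:
        # The configured tasks.max value is ignored for these connectors.
        return 1
    # All other connectors (for example JMS Source, MQTT Source): no limitation.
    return requested_tasks


print(effective_task_count("JDBC Source", 4))
print(effective_task_count("MQTT Source", 3))
```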
Sink connectors
A sink connector's task is made up of the following components:
- Kafka Consumer
The consumer is responsible for fetching the messages from the Kafka topics and delivering them to the converter.
- SMT Chain
The Single Message Transforms (SMT) chain can modify and filter the data on a single message basis.
- Converter
The converter is responsible for converting Kafka message keys and values with a configured conversion mechanism. However, Stateless NiFi connectors currently only use keys of string type and values of byte array type. As a result, the converter does not perform any meaningful conversion.
- SinkTask
In Stateless NiFi connectors, the SinkTask is responsible for executing one instance of the connector's dataflow.
If you deploy a sink connector with more than one task, each task is going to have its own instance of the four task components.
With sink connectors, there is no limitation to using more than one task for a connector. When a connector is started with multiple tasks, each task has its own Kafka consumer, but these consumers belong to the same Kafka consumer group. Kafka makes sure that a message fetched by one consumer is not given to another consumer within the same consumer group.
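Kafka achieves this by dividing the topic partitions among the consumers of a group, so that each partition is read by exactly one group member. The sketch below imitates that idea with a simple round-robin split. It is a simplified illustration only: the function and the task names are made up for this example, and the assignment strategies Kafka actually uses differ in detail.

```python
def assign_partitions(partitions, consumers):
    """Distribute partitions over consumers so each partition has one owner."""
    assignment = {consumer: [] for consumer in consumers}
    for index, partition in enumerate(partitions):
        owner = consumers[index % len(consumers)]
        assignment[owner].append(partition)
    return assignment


# Six partitions shared by the consumers of three sink tasks (made-up names).
assignment = assign_partitions(list(range(6)), ["task-0", "task-1", "task-2"])
for consumer, owned in assignment.items():
    print(consumer, owned)
```

Since no partition has two owners within the group, no message is handed to more than one task of the same sink connector.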
There is no user-defined mapping between tasks and Kafka Connect workers, meaning that once deployed, the tasks of a connector may or may not be executed on the same worker.