Dataflow development best practices for Stateless NiFi

By leveraging Stateless NiFi, you can build your own dataflows and use those as Kafka Connect connectors. However, there are some principles to keep in mind when building such a dataflow in NiFi.

General recommendations and criteria

Learn about the general recommendations and criteria that you need to follow when building a dataflow that you will run as a Kafka Connect connector.

  • If your dataflow depends on external resources (like a JDBC driver), the resource must be made available on all Kafka Connect role instance nodes (Connect workers). Additionally, the resource must be in the same location on all nodes with proper file permissions. Cloudera also recommends parameterizing the file path so that you can specify it when deploying the connector.
  • The dataflow should be designed in a dedicated process group that has a parameter context assigned to it. This makes your connector parameterizable, enabling you to provide custom property values when deploying the connector using Streams Messaging Manager (SMM). See the configuration sketch after this list.
  • A source connector flow should always have a single source. This means that the flow should have only one processor with no incoming connections.
  • A sink connector flow should always have a single destination. If you want to transfer the data consumed from a Kafka topic to two different destinations at the same time, it is preferable to use two distinct sink connectors with different consumer groups. A single sink connector writing to two destinations can produce duplicates: if one destination accepts the data but the other fails, the Kafka messages are not acknowledged and are consumed again later, even though the data was already successfully delivered to the first destination.
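
The following sketch illustrates how parameterization might look in practice. The connector class name and the parameter. property prefix reflect the Apache NiFi Stateless connectors, but treat them, along with the parameter names and paths, as assumptions to be verified against the connector reference for your distribution.

  # Hypothetical connector configuration excerpt supplying values for
  # parameters defined in the dataflow's parameter context.
  connector.class=org.apache.nifi.kafka.connect.StatelessNiFiSinkConnector
  flow.snapshot=/opt/connectors/jdbc-sink-flow.json
  # "JDBC.Driver.Path" is a hypothetical parameter name defined in the flow's
  # parameter context; it must resolve to the same path on every Connect worker.
  parameter.JDBC.Driver.Path=/opt/drivers/postgresql-42.6.0.jar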

Dataflow execution and scheduling

Learn how Stateless NiFi Source and Sink connectors handle dataflow execution and scheduling.

Stateless NiFi does not consider the scheduling settings of processors. When a Stateless NiFi connector is deployed, Kafka Connect starts an instance of the Stateless NiFi engine that executes the dataflow. The following sections discuss in detail how the dataflow is executed by both source and sink connectors.

Source connectors

A Kafka Connect source connector obtains data from an external (source) data store and delivers the data to a Kafka service. The architecture of a source connector is as follows.

Figure 1. Source Connector Architecture
When a Stateless NiFi Source connector instance is deployed, a number of tasks are started for the connector instance. The number of tasks is determined by the tasks.max property, which can be set during connector deployment. Each task consists of the following:
  • SourceTask

    In Stateless NiFi connectors, the SourceTask is responsible for executing one instance of the connector's dataflow.

  • SMT Chain

    The Single Message Transforms (SMT) chain can modify and filter the data on a single message basis. However, SMT is currently not used with Stateless NiFi Source connectors.

  • Converter

    The converter is responsible for converting Kafka message keys and values with a configured conversion mechanism. However, Stateless NiFi connectors currently only use keys of string type and values of byte array type. As a result, the converter does not perform any meaningful conversion. Typical converter settings are sketched after this list.

  • Kafka Producer

    The Kafka producer is responsible for delivering messages to the configured Kafka topics.
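
Because the connectors work with string keys and byte array values, the converters are typically configured with Kafka Connect's built-in string and byte array converters. The following lines are a sketch; the connector templates in your distribution may already set these for you.

  # Typical converter settings for Stateless NiFi connectors (sketch)
  key.converter=org.apache.kafka.connect.storage.StringConverter
  value.converter=org.apache.kafka.connect.converters.ByteArrayConverter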

With the Stateless NiFi Source connector, the SourceTask is the component executing an instance of the dataflow specified in the connector's flow.snapshot property. Any source connector flow you build should not attempt to deliver directly to Kafka itself using a processor such as PublishKafka. Instead, the data should be routed to a NiFi output port in the dataflow. Any flowfile delivered to that output port is obtained by the connector and delivered to Kafka.

The dataflow is triggered continuously in one thread within the SourceTask. This means that in one iteration, the entire flow is executed (one processor after another), and the data it fetches from the source data store in that execution ends up on the flow’s output port in the form of flowfiles. These flowfiles are forwarded by the Kafka Connect framework to the task’s producer as messages (one flowfile equals one message). At this point the flow’s execution is finished, and the flow is triggered again.

The producer is a regular Kafka producer that is responsible for delivering the messages to the Kafka topics. The producer is running on a separate thread. This means that while the dataflow is running, the producer can work on delivering the messages it received at the end of the flow’s previous execution.

Each flowfile is delivered as a single Kafka message. Therefore, if a flowfile contains thousands of JSON records, totaling 5 MB for example, it is important to split those flowfiles into individual records using a SplitRecord processor before transferring the data to the output port. Otherwise, this results in a single Kafka message that is 5 MB while the default message size for Kafka is 1 MB.

Scheduling settings of the processors in the dataflow are ignored. The dataflow is executed in one thread from the first processor through the output port. Afterward, the flow is retriggered. One execution of the dataflow takes as long as necessary to fetch and process the data that accumulated in the source data store since the previous execution. However, the execution has a default timeout of one minute. This timeout can be configured on a connector level with the dataflow.timeout property.
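
Putting these pieces together, a source connector configuration might look similar to the following sketch. The flow.snapshot, dataflow.timeout, and tasks.max properties are described in this section; the connector class, the output.port and topics property names, and all values are assumptions that you should verify against the connector documentation for your release.

  # Hypothetical Stateless NiFi source connector configuration (sketch)
  name=my-source-connector
  connector.class=org.apache.nifi.kafka.connect.StatelessNiFiSourceConnector
  tasks.max=1
  # Dataflow to run; can be a path, a URL, or the exported flow definition itself
  flow.snapshot=/opt/connectors/source-flow.json
  # Output port of the dataflow that the connector reads flowfiles from, and the
  # Kafka topic the producer delivers the resulting messages to (names assumed)
  output.port=Output
  topics=my-target-topic
  # Maximum time allowed for one execution of the dataflow (default: one minute)
  dataflow.timeout=60 secs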

Transactional sources

Unlike traditional NiFi, Stateless NiFi keeps the contents of flowfiles in memory. As long as the source of the data is replayable and transactional, there is no concern over data loss. This is handled by treating the entire dataflow as a single transaction. Once data is obtained from a source component, it is transferred to the next processor in the flow. At this point, in traditional NiFi, the processor would acknowledge the data and NiFi would take ownership of it, having persisted it to disk.

With Stateless NiFi, however, the data is not yet acknowledged. Instead, data is transferred from one processor to the next with each processor performing and completing its task. This is repeated until all data is queued up at the output port. At this point, the NiFi connector provides the data to Kafka. Only after Kafka has acknowledged the records does the connector acknowledge this to NiFi. Only at that point is the session committed, allowing the source processor to acknowledge receipt of the data.

As a result, if NiFi is restarted while processing some piece of data, the source processor will not have acknowledged the data and is able to replay the data, resulting in no data loss.

Initial data load

Stateless NiFi connectors are intended to run continuously. This means that the dataflow is retriggered immediately after a single execution finishes. It also means that, in each execution, the dataflow is only expected to process the data that has accumulated in the source data store since the previous execution.

Depending on the connector, if there is already a significant amount of data in the source data store and the connector is not configured properly, you might run into problems with the dataflow's initial execution. In such a case, the dataflow tries to fetch all the data from the source data store during the initial execution. If there is too much data, one of the following happens:
  • No data appears in the target Kafka topics, and the log file includes an InterruptedException entry.

    This means that loading and processing the data took more time than the timeout configured in dataflow.timeout.

  • No data appears in the target Kafka topics, and the connector throws an OutOfMemoryError.

    This can happen because the Stateless NiFi engine, which runs the dataflow, stores all data in memory. Trying to read too much data can exhaust the available memory capacity.

If you need to handle a large initial data load, Cloudera recommends that you process and transfer the initial load using another tool, for example regular NiFi. Afterward, you can configure and start the Kafka connector to handle subsequent changes in the source system.

Sink connectors

A Kafka Connect sink connector obtains data from a Kafka service and delivers that data to an external (sink) data store. The architecture of a sink connector is as follows.

Figure 2. Sink Connector Architecture
When a Stateless NiFi Sink connector instance is deployed, a number of tasks are started for the connector instance. The number of tasks is determined by the tasks.max property, which can be set during connector deployment. Each task consists of the following:
  • Kafka Consumer

    The consumer is responsible for fetching the messages from the Kafka topics and delivering them to the converter.

  • SMT Chain

    The Single Message Transforms (SMT) chain can modify and filter the data on a single message basis. However, SMT is currently not used with Stateless NiFi Sink connectors.

  • Converter

    The converter is responsible for converting Kafka message keys and values with a configured conversion mechanism. However, Stateless NiFi connectors currently only use keys of string type and values of byte array type. As a result, the converter does not perform any meaningful conversion.

  • SinkTask

    In Stateless NiFi connectors, the SinkTask is responsible for executing one instance of the connector's dataflow.

The consumer is a regular Kafka consumer reading messages from the configured topics and delivering them to the SinkTask. Messages are delivered through the converter and the SMT chain, but these two components can be ignored in Stateless NiFi.

In a Stateless NiFi Sink connector, the SinkTask is the component that executes an instance of the dataflow configured in the connector’s flow.snapshot property. Any sink connector flow you build should not attempt to source data directly from Kafka using a processor such as ConsumeKafka. Instead, the data should be received from an input port within the dataflow. Each Kafka record is queued into the outbound connection of the input port so that the next processor in the flow is able to process it. The entire task (consumer + SinkTask) is executed in one thread. There are two important timing parameters for sink connectors, described below and illustrated in the configuration sketch after the list.
  • Offset Flush Interval (offset.flush.interval.ms)

    This is the interval, in milliseconds, at which Kafka Connect attempts to commit task offsets.

  • dataflow.timeout

    Specifies the maximum amount of time to wait for the dataflow to complete.
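
A sink connector configuration following this pattern might look like the sketch below. The offset.flush.interval.ms property is set on the Kafka Connect worker (role), not on the connector, so it is shown separately. The connector class and the input.port property name are assumptions; verify them against the connector documentation for your release.

  # Hypothetical Stateless NiFi sink connector configuration (sketch)
  name=my-sink-connector
  connector.class=org.apache.nifi.kafka.connect.StatelessNiFiSinkConnector
  tasks.max=1
  # Kafka topics the consumer reads from (standard Kafka Connect sink property)
  topics=my-source-topic
  # Dataflow to run and the input port that receives the consumed records
  # (input.port property name is an assumption)
  flow.snapshot=/opt/connectors/sink-flow.json
  input.port=Input
  # Maximum time allowed for one execution of the dataflow
  dataflow.timeout=90 secs

  # Kafka Connect worker (role) configuration, not part of the connector config:
  # offset.flush.interval.ms=60000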

The execution of a SinkTask is the following:
  1. The consumer consumes messages from the Kafka topics for the period of time determined by Offset Flush Interval (default is one minute), and puts the messages on the input of the SinkTask, that is, the input port of the connector’s dataflow.
  2. After Offset Flush Interval is reached, the consumer stops consuming messages and the flow is triggered. Once triggered, the flow processes all messages that accumulated on its input port. Processing time is not infinite. The flow will time out once the value set in dataflow.timeout is reached.
  3. After the flow finishes running, the process starts again from step 1.

Notice that the flow is not triggered for each Kafka message individually. Instead, it is triggered for a batch of messages that the consumer put on its input port in the time specified by Offset Flush Interval. One execution (trigger) of the flow is one transaction. This means that the flow either manages to process all messages within the batch successfully (delivers them to the target system and commits the transaction), or rolls back the session. If the session is rolled back, the consumer is going to put the same batch of messages on the input of the flow in the next execution of step 1. Therefore, in the next execution of step 2, the flow is going to try and process the same batch of messages.

Each Kafka message is delivered to the target system as a single flowfile. Depending on the destination, Cloudera recommends that you consider merging your Kafka records together into bigger chunks before delivering them. For example, assume you are delivering data to HDFS. Because HDFS is not built for accessing and handling many small files, it is not recommended to send each individual Kafka message to HDFS as a separate file.

Merging considerations for sink connectors

Learn about the limitations and considerations when using MergeContent and MergeRecord processors with Stateless NiFi.

NiFi supports many processors that can be used as sinks for Kafka data. If the destination service operates well with many small messages, then the data from Kafka can often be delivered directly to a sink processor. For example, a PublishJMS processor can easily handle many small messages. However, other services, like S3 or HDFS, perform much better if the data is first batched or merged together. For this reason, the MergeContent and MergeRecord processors are extremely popular in NiFi. They make it possible to merge many small flowfiles into one larger flowfile.

With MergeContent and MergeRecord in traditional NiFi, you can set a minimum and maximum size for the merged data along with a timeout. However, with Stateless NiFi and Kafka Connect, this might not work. This is because only a limited number of flowfiles are made available to the processor. You can still use these processors to merge the data together, but some limitations apply.

If MergeContent or MergeRecord is triggered but not enough flowfiles are present to create a batch, the processor does nothing. Therefore, Cloudera recommends that you set Minimum Number of Entries to 1 and Minimum Group Size to 0 B (these are default values). With these settings, the MergeContent and MergeRecord processors create a merged flowfile from the input flowfiles available in the given execution of the connector and do not wait for more data to arrive. Configuring Max Bin Age is not required in this case.

The Maximum Number of Entries and Maximum Group Size processor properties can also be used to limit the maximum size of the merged flowfile. For example, in a NiFi Stateless Sink connector, the consumer running inside the connector puts a group of messages on the input of the flow. Afterward, the flow is executed. This execution of the flow can only work with the messages that are on its input when it is triggered. If there are 115 messages on the input and the MergeContent processor’s Maximum Number of Entries is 100, the flow produces two merged flowfiles: one with 100 entries and another with 15.

To control how many messages are placed on the input of the flow for one execution, you can configure the Offset Flush Interval (offset.flush.interval.ms) Kafka Connect role property. The lower the interval, the less time the consumer has to put messages on the input of the flow.

NiFi Stateless Sink merging example

Let’s look at a merging example with a NiFi Stateless Sink connector. Assume that the dataflow contains the following elements in the specified order:
input port -> MergeContent -> PutHDFS
Additionally, assume the following configurations:
  • Offset Flush Interval is set to one minute (default). In this timeframe the internal consumer puts 1100 messages on the input port of the dataflow.
  • dataflow.timeout is 90 seconds.
  • The MergeContent processor has the following properties:
    • Minimum Number of Entries = 1
    • Minimum Group Size = 0 B
    • Maximum Number of Entries = 100

In this case, MergeContent creates 11 flowfiles, and PutHDFS has to upload these 11 files to HDFS within what is left of dataflow.timeout (90 seconds). Assume that PutHDFS manages to upload six files when dataflow.timeout is up. Since the dataflow did not finish execution within the dedicated timeframe, it gets interrupted, and the transaction is rolled back. The consumer starts putting messages on the input of the dataflow for a minute again.

However, since the previous transaction was rolled back, the messages that the dataflow tried to process were not committed. Therefore, the consumer puts the same 1100 messages on the input port of the dataflow. This behavior can result in an endless loop. Additionally, HDFS was not notified that the six files that were uploaded successfully should be removed. This results in these files being reuploaded in the next execution. You can resolve an issue like this by doing the following:

  1. Decrease Offset Flush Interval.

    For example, decreasing this interval to 30 seconds (from one minute) halves the number of messages that are put on the input port of the dataflow. Fewer messages on the input port means that the dataflow needs to process fewer messages in one execution: 550 instead of 1100.

  2. Set Maximum Number of Entries to a value that is larger than the number of messages that you expect the dataflow to receive.

    Increasing this value to, for example, 1,000,000,000 ensures that all messages are put in one HDFS file. This way, all data is either successfully uploaded to HDFS, or the upload is interrupted and rolled back without leftover partial files in the target system. The adjusted settings are sketched after this list.
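
Under the assumptions of this example, the two adjustments could be applied as follows. Note that the offset flush interval is a Kafka Connect worker (role) property, while Maximum Number of Entries is set on the MergeContent processor inside the dataflow itself, not in the connector configuration.

  # Step 1: Kafka Connect worker (role) configuration
  # Halve the time the consumer spends filling the input port (30 seconds)
  offset.flush.interval.ms=30000

  # Step 2: set in the NiFi dataflow (MergeContent processor), not in Kafka Connect:
  #   Maximum Number of Entries = 1000000000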

Considerations for listen-type source connectors

There are a number of limitations that apply to listen-type connectors that you should be aware of.

A listen-type source connector consists of a NiFi dataflow in which the first processor is a ListenX processor. These processors listen on specific interfaces and specific ports on the host where they are running. For example, a flow starting with a ListenHTTP processor listens on a specific port for HTTP requests made by external clients.

The Kafka Connect framework does not provide the concept of task affinity mapping, meaning that if a connector is deployed with multiple tasks, it is not possible to ensure that the tasks are assigned to different Kafka Connect workers (Kafka Connect role instances). If two tasks of a listen-type connector are started on the same worker, both tasks try to bind to the same port and one of them fails. For this reason, you must ensure that listen-type source connectors are deployed with a single task. That is, you must set tasks.max=1 when configuring the connector.
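
For example, a hypothetical connector wrapping a flow that starts with a ListenHTTP processor might be configured as follows. The connector class is the same assumed class used in the earlier sketches, and the listening port parameter name is an illustrative assumption.

  # Hypothetical listen-type source connector configuration (sketch)
  connector.class=org.apache.nifi.kafka.connect.StatelessNiFiSourceConnector
  flow.snapshot=/opt/connectors/listen-http-flow.json
  # Listen-type flows must run as a single task to avoid port conflicts
  tasks.max=1
  # "Listening.Port" is a hypothetical parameter defined in the flow's parameter context
  parameter.Listening.Port=7001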

A common deployment model is to have a load balancer in front of the Kafka Connect workers and a mapping rule linking the load balancer port to the listening port on each Kafka Connect worker. This way, the load balancer FQDN can be provided to the client without having to know which worker the task is deployed on.

If multiple tasks are required for performance, you can choose to run multiple standalone Kafka Connect workers and deploy the connector independently on each worker. However, in such a case, Cloudera recommends using Cloudera Flow Management and NiFi, as they provide a significantly more powerful environment for this type of use case.

Mapping of NiFi features

Some NiFi features translate seamlessly to the context of Kafka Connect. The following sections discuss these NiFi features and how they can be utilized in Kafka Connect.

State management

In NiFi, a processor is capable of storing state about the work that it has accomplished. This is particularly important for source components such as the ListS3 processor. This processor keeps state about the data that it has already seen so that it does not list the same files repeatedly. State in NiFi can be stored local to the node (local state), or across all nodes in a cluster (cluster-wide state).

When using the Stateless NiFi Source connector, the state that is stored by these processors is provided to Kafka Connect and is stored within Kafka itself as source offsets and source partition information. This allows tasks to restart and resume where they left off. If a processor stores local state in NiFi, the state is stored in Kafka using a source partition that corresponds directly to that task. As a result, each task is analogous to a node in a NiFi cluster. If a processor stores cluster-wide state in NiFi, the state is stored in Kafka using a source partition that corresponds to cluster-wide state.

Primary node

NiFi provides several processors that are expected to run only on a single node in the cluster. This type of execution is configured by setting Execution to Primary Node Only in the SCHEDULING tab on the NiFi UI when configuring a processor.

When using the Stateless NiFi Source connector, if any source processor in the dataflow is set to run on a primary node only, only a single task will ever run, even if the tasks.max connector property is set to a value higher than one. If you attempt to use multiple tasks for a dataflow that has Primary Node Only set, a warning is logged.

Because processors should only be scheduled with Primary Node Only if they are sources of data, this setting is ignored for all sink tasks and for any processor in a source task that has incoming connections.

Processor yielding

When a processor determines that it is not capable of performing any work (for example, because the system that the processor is pulling from has no more data available), it might choose to yield. This means that the processor stops running for some amount of time. You can configure how long the processor stops running by setting Processor > Configure > SETTINGS > Yield Duration in NiFi. When using a source connector, if a processor chooses to yield, the source connector pauses for the configured amount of time before triggering the dataflow to run again.