Dataflow development best practices for Stateless NiFi

By leveraging Stateless NiFi, you can build your own dataflows and use those as Kafka Connect connectors. However, there are some principles to keep in mind when building such a dataflow in NiFi.

General recommendations and criteria

  • If your dataflow depends on external resources (like a JDBC driver), the resource must be made available on all Kafka Connect workers. Additionally, the resource must be in the same location on all nodes with proper file permissions. Cloudera also recommends parameterizing the file path so that you can specify it when deploying the connector.
  • The dataflow should be designed in its dedicated Process Group and should have a Parameter Context assigned to its Process Group. This way you have the ability to parameterize your connector which enables you to provide custom property values when deploying the connector using SMM.
  • A source connector flow should always have a single source. This means that the flow should have only one Processor with no incoming connections.
  • A sink connector flow should always have a single destination. If you want to transfer the data consumed from the Kafka topic to two different destinations at the same time, it is preferred to have two distinct sink connectors with different consumer groups. If using a single sink connector, you must take into consideration the possibility of creating duplicates. If one destination is working as expected but not the other, the Kafka message is not acknowledged and consumed again at a later time while the data was still successfully sent to one destination.

Source connectors

A Stateless NiFi Source connector is responsible for obtaining data from one source and delivering that data to Kafka. Any source connector flow you build should not attempt to deliver directly to Kafka itself using a Processor such as PublishKafka. Instead, the data should be routed to an Output Port. Any FlowFile delivered to that Output Port is obtained by the connector and delivered to Kafka.

Each FlowFile is delivered as a single Kafka record. Therefore, if a FlowFile contains thousands of JSON records, totaling 5 MB for example, it is important to split those FlowFiles into individual records using a SplitRecord processor before transferring the data to the Output Port. Otherwise, this results in a single Kafka record that is 5 MB while the default record size for Kafka is 1 MB.

Sink connectors

A Stateless NiFi Sink connector is responsible for obtaining data from Kafka and delivering that data to some other service. Any sink connector flow you build should not attempt to source data directly from Kafka using a Processor such as ConsumeKafka. Instead, the data should be received from an Input Port. Each Kafka record is enqueued into the outbound connection of the Input Port so that the next processor in the flow is able to process it.

Each Kafka record FlowFile is delivered as a single FlowFile. Depending on the destination, Cloudera recommends that you consider merging your Kafka records together into bigger chunks before delivering them. For example, assume you are delivering data to HDFS. Because HDFS is not built for accessing and handling many small files, it is not recommended to send each individual Kafka message to HDFS as a separate file.

Considerations for listen-type source connectors

A listen-type source connector consists of a NiFi dataflow in which the first Processor is a ListenX processor. These Processors listen on specific interfaces and specific ports on the host where they are running. For example, a flow starting with a ListenHTTP Processor listens on a specific port for HTTP requests made by external clients. There are a number of limitations that apply to listen-type connectors that you should be aware of.

The Kafka Connect framework does not provide the concept of task affinity mapping, meaning that if a connector is deployed with multiple tasks, it is not possible to ensure that tasks will be on different Kafka Connect workers. Two tasks started on the same worker in a listen-type connector results in a failure. This happens because both connectors try to bind to the same port. For this reason, it is required that all listen-type source connectors are deployed with a single task.

A common deployment model is to have a load balancer in front of the Kafka Connect workers and a mapping rule linking to the port on each Kafka Connect worker. This way, the load balancer FQDN can be provided to the client without assuming which worker the task is going to be deployed on.

If multiple tasks are required for performance, you can opt to have multiple standalone Kafka Connect workers and deploy the connector independently on each worker. However, in such a case, Cloudera recommends using Cloudera Flow Management and NiFi as they provide a significantly more powerful environment for this type of use cases.

Merging considerations for sink connectors

NiFi supports many Processors that can be used as sinks for Apache Kafka data. The data from Kafka can often be delivered directly to a sink processor if the destination service operates well with many small messages. For example, a PublishJMS Processor can easily handle many small messages. However, other services, like S3 or HDFS, perform much better if the data is first batched or merged together. For this reason, the MergeContent and MergeRecord Processors are extremely popular in NiFi. They allow many small FlowFiles to be merged together into one larger FlowFile.

With traditional NiFi, you can set a minimum and maximum size for the merged data along with a timeout. However, with Stateless NiFi and Kafka Connect, this might not work. This is because only a limited number of FlowFiles are made available to the Processor. You can still use these Processors to merge the data together, but some limitations apply.

If MergeContent or MergeRecord is triggered but not enough FlowFiles are present to create a batch, the Processor does nothing. Therefore, Cloudera recommends that you set Minimum Number of Entries to 1 and Minimum Group Size to 0 B (these are default values). With these settings, the MergeContent and MergeRecord Processors create a merged FlowFile from the input FlowFiles available in the given execution of the connector and do not wait for more data to arrive. Max Bin Age does not need to be configured in this case.

The Maximum Number of Entries and Maximum Group Size processor properties can also be used to limit the maximum size of the merged FlowFile. In a case like this, the Processor might create multiple merged FlowFiles per connector execution.The last FlowFile will contain the leftover items. Because of this, the last file might be smaller than the maximum size.

Additionally, you can configure Kafka Connect properties such as offset.flush.timeout.ms to control the amount of data that is provided by the Kafka Connect framework for a single execution of the connector.

Dataflow execution and scheduling

Stateless NiFi does not consider the scheduling settings of processors. When you deploy a connector, Kafka Connect starts a Stateless NiFi engine. The Stateless NiFi engine then runs the dataflow.

For source connectors, the dataflow is triggered continuously. This means that every time the dataflow finishes running, it is triggered to run again.

For sink connectors, messages are continuously enqueued to the input port of the dataflow. The dataflow is then triggered based on an interval to run as many times as necessary to consume all the messages enqueued to the input port. The trigger interval is determined by the Kafka Connect role’s Offset Flush Interval (offset.flush.interval.ms) property. Once all messages are consumed from the input port, the dataflow stops running and Kafka Connect resumes enqueuing messages on the dataflow’s input port. The next set of enqueued messages are processed after the next trigger interval is reached.