Developing a dataflow for Stateless NiFi

Learn about the recommended process of building a dataflow that you can deploy with the Stateless NiFi Sink or Source connectors. This process involves building and designing a parameterized dataflow within a process group and then downloading the dataflow as a flow definition.

The general steps for building a dataflow are identical for source and sink connector flows. For ease of understanding, example dataflows for a simple MQTT source connector and a simple MQTT sink connector are provided without going into detail (which processors to use, which parameters and properties to set, which relationships to define, and so on).

  • Ensure that you have reviewed Dataflow development best practices for Stateless NiFi.
  • Ensure that you have access to a running instance of NiFi. The NiFi instance does not need to run on a cluster. A standalone instance running on any machine, such as your own computer, is sufficient.
  • Ensure that the version of your NiFi instance matches the version of the Stateless NiFi plugin used by Kafka Connect on your cluster.

    You can look up the Stateless NiFi plugin version either by using the Streams Messaging Manager (SMM) UI, or by logging in to a Kafka Connect host and checking the Connect plugin directory. Make sure to note down the plugin version: you will need it to manually edit the flow definition JSON and replace the NiFi version with the plugin version.

    Checking the version using the SMM UI:

    1. Access the SMM UI, and click Connect in the navigation sidebar.
    2. Click the New Connector option.
    3. Locate the StatelessNiFiSourceConnector or StatelessNiFiSinkConnector cards. The version is located on the card.

      The version is made up of multiple digits. The first three represent the NiFi version. For example, if the version on the card is 1.18.0.2.4.3.0-63, then you should use NiFi 1.18.0 to build your flow.

    Checking the version on a Kafka Connect host:

    1. Using SSH, log in to one of your Kafka Connect hosts.
    2. Navigate to the directory where your Kafka Connect plugins are located.

      The default directory is /var/lib/kafka. If you are using a custom plugin directory, go to Cloudera Manager > Kafka service > Configuration and search for the Plugin Path property. This property specifies the directory where Kafka Connect plugins are stored.

    3. List the contents of the directory and look for a nifi-kafka-connector-[***VERSION***] entry.

      The version is made up of multiple digits. The first three represent the NiFi version. For example, if the version in the name of the entry is 1.18.0.2.4.3.0-63, then you should use NiFi 1.18.0 to build your flow.
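
      For example, with the default plugin directory, you can find the entry as follows (the version shown is illustrative, taken from the example above):

        $ ls /var/lib/kafka | grep nifi-kafka-connector
        nifi-kafka-connector-1.18.0.2.4.3.0-63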

  1. Access the NiFi UI.
  2. Create a new process group.
  3. Right-click the process group and select Configure.
  4. Create a new parameter context and assign it to the process group:
    1. Select Process Group Parameter Context > Create new parameter context...
    2. Enter a name for the parameter context.
    3. Click Apply.
  5. Click Apply and then click OK.
  6. Close the configuration dialog.
  7. Double-click the process group you created.
  8. Design and parameterize your dataflow.
    For example, you can build a dataflow that you deploy as a source connector. The following is an example dataflow for a simple MQTT source connector. It consists of a ConsumeMQTT processor and an output port (representing a Kafka topic).
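
    Schematically, this source flow looks as follows:

      ConsumeMQTT --> Output Port (Kafka topic)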

    Alternatively, you can build a dataflow to deploy as a sink connector. The following is an example dataflow for a simple MQTT sink connector. It consists of an input port (representing a Kafka topic), a PublishMQTT processor, and two output ports.
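
    Schematically, this sink flow looks as follows (the port names are illustrative):

      Input Port (Kafka topic) --> PublishMQTT
                                     |-- success --> Output Port
                                     '-- failure --> Failure Output Port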

    In this example, a failure output port is required so that if the destination is not available, the session can be rolled back and the Kafka message can be declined. You specify the failure port in the configuration of the connector when deploying it through SMM, as sketched below.
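
    The following is a minimal sketch of the relevant part of a sink connector configuration (the port names are illustrative):

      connector.class=org.apache.nifi.kafka.connect.StatelessNiFiSinkConnector
      input.port=Input from Kafka
      failure.ports=Failure Output Port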

    When designing and building your dataflow, ensure that you update your parameter context and reference the parameters in your components.
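
    For example, in a ConsumeMQTT processor, you can reference parameters from the context with the #{...} syntax (the parameter names are illustrative):

      Broker URI   -> #{MQTT Broker URI}
      Topic Filter -> #{MQTT Topic}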

    Keep in mind that sensitive properties in processors or controller services must always be parameterized. That is, they must get their values from a parameter in the process group's parameter context. This is because the values of sensitive properties are not exported with the process group; therefore, they must be configurable when you deploy the connector.

    Additionally, sensitive processor or controller service properties can only reference sensitive parameter context parameters. That is, the parameter must be marked as sensitive during creation.
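
    You supply the values of sensitive parameters in the connector configuration at deployment time. As a sketch, assuming a sensitive parameter named MQTT Password, and using the parameter. property prefix that Stateless NiFi connectors accept for passing parameter values:

      parameter.MQTT Password=<password value>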

    For more information on building dataflows, parameters, and referencing parameters, see the Flow Management library.

  9. Exit the process group.
  10. Right-click the process group and select Download flow definition > Without external services.
    If you followed these steps, your process group is not referencing controller services that are outside of the process group. As a result, you can choose to download the flow without external services included.
  11. Open the JSON file you downloaded in a text editor, and replace all occurrences of the NiFi version with the Stateless NiFi plugin version used in your cluster.
    For example, if the NiFi version is 1.18.0 and your plugin version is 1.18.0.2.4.3.0-63, then every occurrence of 1.18.0 in the JSON must be replaced with 1.18.0.2.4.3.0-63.
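
    For example, the following command performs this replacement in place (the file name is illustrative):

      sed -i 's/1\.18\.0/1.18.0.2.4.3.0-63/g' mqtt-source-flow.json
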
The dataflow is downloaded as a flow definition (JSON file).
Deploy the dataflow as a Kafka Connect connector using the Stateless NiFi Source or Sink connectors. Continue with Deploying a dataflow using Stateless NiFi.