Deploying a dataflow using Stateless NiFi

A custom-developed NiFi dataflow can be deployed with the Stateless NiFi Source or Sink connectors using Streams Messaging Manager (SMM).

After building and downloading your dataflow, you can deploy it in Kafka Connect as a source or sink connector using SMM. The following steps walk you through deploying a Stateless NiFi Sink or Source connector and provide examples of how dataflows are configured in SMM.

These steps are for the SMM UI. However, the actions described here can also be completed using the SMM REST API. For more information, see Cloudera Streams Messaging Manager REST API Reference.

  • A CDP cluster containing SMM and Kafka is available.
  • You have access to the SMM UI.
  • The flow definition of the dataflow that you want to deploy is available to you. For more information on how to design, build, and export a dataflow using the NiFi UI, see Developing a dataflow for Stateless NiFi.
  1. Access the SMM UI.
  2. Click Connect on the navigation sidebar.
  3. Click + New Connector to add a new connector.
  4. Select a connector.
    Which connector you select depends on the type of dataflow you want to run. Select StatelessNiFiSourceConnector on the Source Templates tab if your dataflow collects (sources) data from other systems and publishes it to Kafka. Select StatelessNiFiSinkConnector on the Sink Templates tab if your dataflow delivers (sinks) Kafka data into other systems.
  5. Enter a name for the connector in the Enter Name field.
    Ensure that you add a unique and easily identifiable name. The name of the connector cannot be changed once the connector is deployed.
  6. Configure the connector.
    Connector configuration can be broken into three steps. First, you configure the properties available in the default configuration template. Second, you configure other connector properties. Finally, you specify the flow parameters specific to your dataflow. These are the parameters that you set up in the Parameter Context when you built the flow in NiFi. Even though the following breaks the process up into multiple substeps, all properties are configured using the Connector Form in the SMM UI.
    1. Configure the properties available in the default template.
      Configure the following properties from the default template. The other properties in the template have working default values, and Cloudera does not recommend changing them.
      flow.snapshot
      Specifies the dataflow to run. You have multiple options on how to specify the value of this property. You can copy and paste the contents of the flow definition, upload the flow definition, or reference a file path. For more information on flow.snapshot configuration, see Configuring flow.snapshot for Stateless NiFi connectors.
      input.port
      The name of the Input Port in the NiFi dataflow that Kafka records are sent to. If the dataflow contains exactly one Input Port, this property is optional and can be omitted. However, if the dataflow contains multiple Input Ports, this property must be specified. This property is specific to the Stateless NiFi Sink connector.
      output.port
      The name of the Output Port in the NiFi dataflow that is the source of records for the Kafka topic. If the dataflow contains exactly one port, this property can be omitted. However, if the dataflow contains multiple ports (for example, a Success and a Failure port), this property must be specified. If any FlowFile is sent to a port other than the specified port, it is considered a failure: the session is rolled back and no data is collected. This property is specific to the Stateless NiFi Source connector.
      topics
      The name of the topic to deliver data to or fetch data from.
      krb5.file
      Specifies the krb5.conf file to use if the dataflow interacts with any services that are secured using Kerberos. This property is optional and is only required if a connection is established with a Kerberized service. Use the default value if an appropriate krb5.conf is located in /etc.
    2. Add other connector properties.
      There are many other properties that you can set. These include the properties specific to the Stateless NiFi connectors as well as the Kafka Connect properties that come from the base framework. Which of these you require depends on your dataflow and use case.
      Cloudera recommends that, at a minimum, you configure the converters used for key and value conversion. Which converters you use depends on your dataflow; however, the following configuration is appropriate in the majority of cases:
      "key.converter": "org.apache.kafka.connect.storage.StringConverter"
      "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
      
      A comprehensive list of all other properties that the connectors accept can be found in Stateless NiFi Source connector reference, Stateless NiFi Sink Connector reference, and the Apache Kafka documentation.
    3. Configure dataflow-specific parameters.
      If you followed the recommendations provided in Dataflow development best practices for Stateless NiFi and Developing a dataflow for Stateless NiFi, your dataflow is parameterized. The parameters that you defined within the dataflow can be assigned values using the configuration pane in SMM. This is done by adding a configuration entry for each of the dataflow parameters that you want to configure. Dataflow parameters can be added to the configuration as follows:
      "parameter.[***PARAMETER NAME***]": "[***VALUE***]"
      For example, assume that you have the following configuration entry:
      "parameter.Directory": "/mydir"
      In a case like this, any Parameter Context in the dataflow that has a parameter named Directory gets the specified value (/mydir). If the dataflow has child Process Groups, and those child Process Groups have their own Parameter Contexts, the value is used for all Parameter Contexts that contain a parameter named Directory.
      You can also apply a parameter to a specific Parameter Context. This is done by prefixing the parameter name with the name of the Parameter Context followed by a colon.
      "parameter.[***PARAMETER CONTEXT NAME***]:[***PARAMETER NAME***]": "[***VALUE***]"
      For example, assume you have the following configuration entry:
      "parameter.My Context:Directory": "/mydir"

      In a case like this, only the Parameter Context called My Context gets the specified value for the Directory parameter.

      Once all properties are configured, your configuration should look similar to the following example:
      {
       "connector.class": "org.apache.nifi.kafka.connect.StatelessNiFiSourceConnector",
       "flow.snapshot": "[***FLOW DEFINITION JSON***]",
       "key.converter": "org.apache.kafka.connect.storage.StringConverter",
       "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
       "tasks.max": "1",
       "nexus.url": "https://repository.cloudera.com/artifactory/repo",
       "extensions.directory": "/tmp/nifi-stateless-extensions",
       "working.directory": "/tmp/nifi-stateless-working",
       "topics": "[***KAFKA TOPIC NAME***]",
       "parameter.MQTT Source Parameters:MQTT Broker URI": "tcp://[***HOST***]:[***PORT***]",
       "parameter.MQTT Source Parameters:MQTT Quality of Service": "0",
       "parameter.MQTT Source Parameters:MQTT Topics": "[***MQTT TOPIC NAME***]"
      }
      

      This example is for a custom-built MQTT source connector. The example uses additional Connect properties such as key.converter and value.converter. The example also specifies a number of flow parameters such as MQTT Broker URI and MQTT Topics.
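
The parameter naming rules described in this step can also be expressed programmatically. The following Python sketch is only an illustration: the helper names parameter_key and build_source_config are hypothetical and are not part of SMM or NiFi. It builds a configuration of the same shape as the example above, which you could then paste into the Connector Form.

```python
import json
from typing import Optional


def parameter_key(name: str, context: Optional[str] = None) -> str:
    """Build the configuration key for a dataflow parameter.

    Without a context, the key applies to every Parameter Context that
    contains a parameter with this name. With a context, the key applies
    to that Parameter Context only.
    """
    if context is None:
        return f"parameter.{name}"
    return f"parameter.{context}:{name}"


def build_source_config(flow_snapshot: str, topic: str, parameters: dict) -> dict:
    """Assemble a Stateless NiFi Source connector configuration (hypothetical helper).

    `parameters` maps (context, name) tuples to values. Use None as the
    context to target all Parameter Contexts.
    """
    config = {
        "connector.class": "org.apache.nifi.kafka.connect.StatelessNiFiSourceConnector",
        "flow.snapshot": flow_snapshot,
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "tasks.max": "1",
        "topics": topic,
    }
    for (context, name), value in parameters.items():
        # Values are written as strings, as in the example configuration.
        config[parameter_key(name, context)] = str(value)
    return config


if __name__ == "__main__":
    config = build_source_config(
        flow_snapshot="[***FLOW DEFINITION JSON***]",
        topic="[***KAFKA TOPIC NAME***]",
        parameters={
            ("MQTT Source Parameters", "MQTT Quality of Service"): 0,
            (None, "Directory"): "/mydir",
        },
    )
    print(json.dumps(config, indent=1))
```

Keeping the key construction in one function makes it harder to misplace the colon that separates the Parameter Context name from the parameter name.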

  7. Click Validate.
    The validator displays any JSON errors in your configuration. Fix any errors that are displayed. If your JSON is valid, the validator displays the "JSON is valid" message.
  8. Click Next.
  9. Review your connector configuration.
  10. Click Deploy.
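
If you prepare the configuration outside of SMM, a local pre-check can catch JSON mistakes before you reach the Validate step. The following Python sketch is an assumption-laden illustration and does not reproduce the SMM validator: the precheck function is hypothetical, the set of properties it expects is chosen for this example, and the Sink connector class name is assumed to live in the same package as the Source connector class shown earlier.

```python
import json

SOURCE_CLASS = "org.apache.nifi.kafka.connect.StatelessNiFiSourceConnector"
# Assumption: the Sink connector class shares the Source connector's package.
SINK_CLASS = "org.apache.nifi.kafka.connect.StatelessNiFiSinkConnector"


def precheck(config_text: str) -> list:
    """Return a list of human-readable problems; an empty list means none were found."""
    try:
        config = json.loads(config_text)
    except json.JSONDecodeError as err:
        return [f"Invalid JSON at line {err.lineno}, column {err.colno}: {err.msg}"]
    if not isinstance(config, dict):
        return ["The configuration must be a JSON object."]

    problems = []
    # Properties this sketch expects to be present (an illustrative choice).
    for key in ("connector.class", "flow.snapshot", "topics"):
        if not config.get(key):
            problems.append(f"Missing or empty property: {key}")

    # input.port is Sink-specific and output.port is Source-specific.
    connector_class = config.get("connector.class")
    if connector_class == SOURCE_CLASS and "input.port" in config:
        problems.append("input.port is specific to the Sink connector.")
    if connector_class == SINK_CLASS and "output.port" in config:
        problems.append("output.port is specific to the Source connector.")
    return problems


if __name__ == "__main__":
    sample = '{"connector.class": "%s", "flow.snapshot": "{}", "topics": "my-topic"}' % SOURCE_CLASS
    for problem in precheck(sample) or ["No problems found."]:
        print(problem)
```

A check like this only covers syntax and a few property mismatches. The connector can still fail after deployment if, for example, the flow definition itself is invalid.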