Deploying a dataflow using Stateless NiFi

A custom-developed NiFi dataflow can be deployed with the Stateless NiFi Source or Sink connectors using Streams Messaging Manager (SMM).

After building and downloading your dataflow, you can deploy it in Kafka Connect as a source or sink connector. This can be done using SMM. The following steps walk you through the process of deploying a Stateless NiFi Sink or Source connector and provide examples of how dataflows are configured using SMM.

These steps describe the SMM UI; however, the actions described here can also be completed using the SMM REST API. For more information, see the Cloudera Streams Messaging Manager REST API Reference.
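Deploying a connector programmatically ultimately comes down to submitting a connector name together with a configuration map. As a point of orientation only, the following sketch shows the payload shape accepted by the standard Kafka Connect REST API (POST /connectors), which underlies connector management; the connector name here is a hypothetical example, and the exact SMM endpoints and authentication details are covered in the reference above.

{
 "name": "my-stateless-nifi-source",
 "config": {
  "connector.class": "org.apache.nifi.kafka.connect.StatelessNiFiSourceConnector",
  "flow.snapshot": "[***FLOW DEFINITION JSON***]",
  "topics": "[***KAFKA TOPIC NAME***]"
 }
}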

  • A CDP cluster containing SMM and Kafka is available.
  • You have access to the SMM UI.
  • The flow definition of the dataflow that you want to deploy is available to you. For more information on how to design, build, and export a dataflow using the NiFi UI, see Developing a dataflow for Stateless NiFi.
  • The properties of the Kafka consumers and producers used internally by the connector (Connect-managed clients) can be configured on a per-connector basis, and the following steps include an example of how this is done. However, for consumer and producer property overrides to take effect, you must ensure that the Connector Kafka Client Configuration Override Policy property of the Kafka service is set to All in Cloudera Manager. By default, this property is set to None, which means that no overrides are allowed.
  1. Access the SMM UI.
  2. Click Connect on the navigation sidebar.
  3. Click + New Connector to add a new connector.
  4. Select a connector.
    Which connector you select depends on the type of dataflow you want to run. Select the StatelessNiFiSourceConnector on the Source Templates tab if your dataflow collects (sources) data from other systems and publishes it to Kafka. Select the StatelessNiFiSinkConnector on the Sink Templates tab if your dataflow delivers (sinks) Kafka data into other systems.
  5. Enter a name for the connector in the Enter Name field.
    Ensure that you add a unique and easily identifiable name. The name of the connector cannot be changed once the connector is deployed.
  6. Configure the connector.
    Connector configuration can be broken down into four substeps. First, you configure the properties available in the default configuration template. Second, you add other connector properties. Third, you specify the flow parameters specific to your dataflow. These are the parameters that you set up in the parameter context when you built the flow in NiFi. Finally, you can optionally override the properties of the Kafka clients used internally by the connector. Even though the following breaks the process up into multiple substeps, all properties are configured on the same page in the SMM UI.
    1. Configure the properties available in the default template.
      The following properties from the default template are the ones that you should configure. The other properties in the template have working default values, and Cloudera does not recommend changing them. An example configuration follows this list.
      flow.snapshot
      Specifies the dataflow to run. You have multiple options on how to specify the value of this property. You can copy and paste the contents of the flow definition, upload the flow definition, or reference a file path. For more information on flow.snapshot configuration, see Configuring flow.snapshot for Stateless NiFi connectors.
      input.port
      The name of the input port in the NiFi dataflow that Kafka records are sent to by the connector’s Kafka consumer. This property is specific to the Stateless NiFi Sink connector and refers to a port in NiFi terminology, not a network port. If the dataflow contains exactly one input port, this property is optional and can be omitted. However, if the dataflow contains multiple input ports, this property must be specified.
      output.port
      The name of the output port in the NiFi dataflow that delivers messages from the dataflow to the connector’s Kafka producer. This property is specific to the Stateless NiFi Source connector and refers to a port in NiFi terminology, not a network port. If the dataflow contains exactly one output port, this property can be omitted. However, if the dataflow contains multiple output ports (for example, a success and a failure port), this property must be specified. If a flowfile is sent to any port other than the specified one, it is considered a failure: the session is rolled back and no data is collected.
      topics
      For source connectors, this is the name of the topic to deliver data to. For sink connectors, this can be a comma-separated list of topics to fetch data from.
      krb5.file
      Specifies the krb5.conf file to use if the dataflow interacts with any services that are secured using Kerberos. This property is optional and is only required if a connection is established with a Kerberized service. Use the default value if an appropriate krb5.conf is located in /etc.
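      As an illustration, the template properties for a sink connector whose flow definition is read from a file on the Connect worker might look as follows. This is a sketch, not a shipped configuration: the file path and input port name are hypothetical, and krb5.file is shown with its typical default location.
      {
       "flow.snapshot": "/opt/connectors/flows/my-sink-flow.json",
       "input.port": "Input From Kafka",
       "topics": "[***KAFKA TOPIC NAME***]",
       "krb5.file": "/etc/krb5.conf"
      }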
    2. Add other connector properties.
      There are many other properties that you can set. These include the properties specific to the Stateless NiFi connectors as well as the Kafka Connect properties that come from the base framework. Which of these you require depends on your dataflow and use case.
      In Stateless NiFi connectors, the Kafka message's key is always a string and its value is always a byte array. As a result, the key and value converters of the connector should not be modified. Use the default values, which are as follows:
      "key.converter": "org.apache.kafka.connect.storage.StringConverter"
      "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
      
      A comprehensive list of all other properties that the connectors accept can be found in the Stateless NiFi Source connector reference, the Stateless NiFi Sink connector reference, and the Apache Kafka documentation.
    3. Configure dataflow-specific parameters.
      If you followed the recommendations provided in Dataflow development best practices for Stateless NiFi and Developing a dataflow for Stateless NiFi, your dataflow is parameterized. You can assign values to the parameters that you defined within the dataflow using the configuration pane in SMM by adding a configuration entry for each dataflow parameter that you want to set. The entries use the following syntax:
      "parameter.[***PARAMETER NAME***]": "[***VALUE***]"
      For example, assume that you have the following configuration entry:
      "parameter.Directory": "/mydir"
      In a case like this, any parameter context in the dataflow that has a parameter named Directory gets the specified value (/mydir). If the dataflow has child process groups, and those child process groups have their own parameter contexts, the value is used for all parameter contexts that contain a parameter named Directory.
      You can also apply a parameter to a specific parameter context. This is done by prefixing the parameter name with the name of the parameter context followed by a colon.
      "parameter.[***PARAMETER CONTEXT NAME***]:[***PARAMETER NAME***]": "[***VALUE***]"
      For example, assume you have the following configuration entry:
      "parameter.My Context:Directory": "/mydir"

      In a case like this, only the parameter context called My Context gets the specified value for the Directory parameter.

      Once all properties are configured, your configuration should look similar to the following example:
      {
       "connector.class": "org.apache.nifi.kafka.connect.StatelessNiFiSourceConnector",
       "flow.snapshot": "[***FLOW DEFINITION JSON***]",
       "key.converter": "org.apache.kafka.connect.storage.StringConverter",
       "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
       "tasks.max": "1",
       "nexus.url": "https://repository.cloudera.com/artifactory/repo",
       "extensions.directory": "/tmp/nifi-stateless-extensions",
       "working.directory": "/tmp/nifi-stateless-working",
       "topics": "[***KAFKA TOPIC NAME***]",
       "parameter.MQTT Source Parameters:MQTT Broker URI": "tcp://[***HOST***]:[***PORT***]",
       "parameter.MQTT Source Parameters:MQTT Quality of Service": "0",
       "parameter.MQTT Source Parameters:MQTT Topics": "[***MQTT TOPIC NAME***]"
      }
      

      This example is for a custom-built MQTT source connector. It uses additional Connect properties, such as key.converter and value.converter, and specifies a number of flow parameters, such as MQTT Broker URI and MQTT Topics.
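      For comparison, a sketch of a sink connector configuration is shown below. It differs from the source example mainly in the connector class, the use of input.port instead of output.port, and the topics that data is consumed from. The input port name and the File Sink Parameters parameter context are hypothetical examples, not part of any shipped flow:
      {
       "connector.class": "org.apache.nifi.kafka.connect.StatelessNiFiSinkConnector",
       "flow.snapshot": "[***FLOW DEFINITION JSON***]",
       "key.converter": "org.apache.kafka.connect.storage.StringConverter",
       "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
       "tasks.max": "1",
       "topics": "[***KAFKA TOPIC NAME***]",
       "input.port": "Input From Kafka",
       "parameter.File Sink Parameters:Directory": "/tmp/nifi-output"
      }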

    4. Optional: Configure Kafka producer and consumer properties.
      Source connectors use internal Kafka producers to write messages to topics; sink connectors use internal consumers to fetch messages from topics. The properties of these producers and consumers can be modified on a per-connector basis. This is done by adding the required properties with the producer.override or consumer.override prefix to the connector configuration.
      For example, if you want the consumer of your sink connector to use auto.offset.reset=earliest, add the following entry to the configuration of the connector:
      "consumer.override.auto.offset.reset": "earliest"
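      Multiple overrides can be combined in one configuration. The following fragment is a sketch for a sink connector; max.poll.records is a standard Kafka consumer property, and its value here is illustrative. Remember that none of these overrides take effect unless the override policy prerequisite described at the beginning of this section is set to All.
      "consumer.override.auto.offset.reset": "earliest",
      "consumer.override.max.poll.records": "100"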
  7. Click Validate.
    The validator displays any JSON errors in your configuration. Fix any errors that are displayed. If your JSON is valid, the JSON is valid message is displayed in the validator.
  8. Click Next.
  9. Review your connector configuration.
  10. Click Deploy.