A custom-developed NiFi dataflow can be deployed with the Stateless NiFi Source or Sink
connectors using Streams Messaging Manager (SMM).
After building and downloading your dataflow, you can deploy it in Kafka Connect as a
source or sink connector using SMM. The following steps walk you
through the process of deploying a Stateless NiFi Source or Sink connector and provide
examples of how dataflows are configured using SMM.
These steps are for the SMM UI; however, the actions described here can also be completed
using the SMM REST API. For more information, see Cloudera Streams Messaging Manager
REST API Reference.
A CDP cluster containing SMM and Kafka is available.
You have access to the SMM UI.
The flow definition of the dataflow that you want to deploy is available to you. For
more information on how to design, build, and export a dataflow using the NiFi UI, see
Developing a dataflow for Stateless NiFi.
The properties of Kafka consumers and producers used internally by the connector
(Connect-managed clients) can be configured on a per-connector basis. The following steps
give an example of how this can be done. However, in order for consumer and producer
property overrides to take effect, you must ensure that the Connector Kafka
Client Configuration Override Policy Kafka service property is set to
All in Cloudera Manager. By default, this property is set to
None, which means that no overrides are allowed.
Access the SMM UI.
Click Connect in the navigation sidebar.
Click + New Connector to add a new connector.
Select a connector.
Which connector you select depends on the type of
dataflow you want to run. Select StatelessNiFiSourceConnector found
on the Source Templates tab if your dataflow collects (sources) data
from systems and publishes it to Kafka. Select the
StatelessNiFiSinkConnector from the Sink Templates
tab if your dataflow delivers (sinks) Kafka data into other
systems.
Enter a name for the connector in the Enter Name
field.
Ensure that you add a unique and easily identifiable name. The name of
the connector cannot be changed once the connector is deployed.
Configure the connector.
Connector configuration can be broken into three steps. First, you configure the properties
available in the default configuration template. Second, you configure other connector
properties. Finally, you specify the flow parameters specific to your dataflow. These are
the parameters that you set up in the parameter context when you built the flow in NiFi.
Even though the following breaks the process up into multiple substeps, all properties are
configured on the same page in the SMM UI.
Configure the properties available in the default template.
The following properties from the default template are the ones that you should
configure. The other properties in the template have working default values, and
Cloudera does not recommend changing them. A combined example of these properties
follows the list.
flow.snapshot
Specifies the dataflow to run. You have multiple options on how to specify
the value of this property. You can copy and paste the contents of the flow
definition, upload the flow definition, or reference a file path. For more
information on flow.snapshot configuration, see
Configuring flow.snapshot for Stateless NiFi connectors.
input.port
The name of the input port in the NiFi dataflow that Kafka records are sent to
by the connector’s Kafka consumer. This property is specific to the Stateless
NiFi Sink connector and is a port in NiFi terminology, not a network port. If
the dataflow contains exactly one input port, this property is optional and can
be omitted. However, if the dataflow contains multiple input ports, this
property must be specified.
output.port
The name of the output port in the NiFi dataflow that is used to deliver the
messages from the dataflow to the connector’s Kafka producer. This property is
specific to the Stateless NiFi Source connector and is a port in NiFi
terminology, not a network port. If the dataflow contains exactly one output
port, this property is optional and can be omitted. However, if the dataflow
contains multiple output ports (for example, a success and a failure port), this
property must be specified. If any flowfile is sent to a port other than the
specified port, it is considered a failure. The session is rolled back and no
data is collected.
topics
For source connectors, this is the name of the topic to deliver data to. For
sink connectors, this can be a comma-separated list of topics to fetch data
from.
krb5.file
Specifies the krb5.conf file to use if the dataflow interacts with any
services that are secured using Kerberos. This property is optional and is only
required if a connection is established with a Kerberized service. Use the
default value if an appropriate krb5.conf is located in
/etc.
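For example, for a hypothetical sink connector, these template properties might be
filled in as follows. The flow definition path, port name, and topic list shown here
are illustrative values, not defaults:
"flow.snapshot": "/path/to/my-flow.json",
"input.port": "Kafka Records",
"topics": "orders,payments",
"krb5.file": "/etc/krb5.conf"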
Add other connector properties.
There are many other properties that you can set. These include the properties specific to the
Stateless NiFi connectors as well as the Kafka Connect properties that come from the
base framework. Which of these you require depends on your dataflow and use case.
In Stateless NiFi connectors, the Kafka message's key is always a string and its
value is always a byte array. As a result, the key and value converters of the
connector should not be modified. Use the default values.
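In the connector templates, these typically resolve to the standard Kafka Connect
string and byte array converter classes:
"key.converter": "org.apache.kafka.connect.storage.StringConverter"
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"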
A comprehensive list of all other properties that the connectors
accept can be found in Stateless NiFi Source connector reference,
Stateless NiFi Sink connector reference, and the Apache Kafka
documentation.
Configure dataflow-specific parameters.
If you followed the recommendations provided in Dataflow development best
practices for Stateless NiFi and Developing a dataflow for Stateless
NiFi, your dataflow is parameterized. The parameters that you defined
within the dataflow can be assigned values using the configuration pane in SMM. This
is done by adding a configuration entry for each of the dataflow parameters that you
want to configure. Each entry takes the form
"parameter.[Parameter Name]": "[Parameter Value]".
For example, assume that you have the following configuration
entry:
"parameter.Directory": "/mydir"
In a case like this, any
parameter context in the dataflow that has a parameter named
Directory gets the specified value (/mydir). If
the dataflow has child process groups, and those child process groups have their own
parameter contexts, the value is used for all parameter contexts that contain a
parameter named Directory.
You can also apply a parameter to a specific parameter context. This is done by
prefixing the parameter name with the name of the parameter context followed by a
colon.
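For example, assuming the dataflow contains a parameter context named My Context
(an illustrative name), the following entry applies the value only to the
Directory parameter of that context:
"parameter.My Context:Directory": "/mydir"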
The following example is for a custom-built MQTT source connector. The example uses
additional Connect properties such as key.converter and
value.converter. The example also specifies a number of flow
parameters such as MQTT Broker URI and MQTT
Topics.
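A configuration for such a connector might look like the following sketch. The flow
definition path, output port name, topic, broker URI, and MQTT topic filter are
illustrative values for this hypothetical flow:
{
  "connector.class": "org.apache.nifi.kafka.connect.StatelessNiFiSourceConnector",
  "flow.snapshot": "/path/to/mqtt-source-flow.json",
  "output.port": "Success",
  "topics": "mqtt-events",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "parameter.MQTT Broker URI": "tcp://mqtt-broker:1883",
  "parameter.MQTT Topics": "sensors/#"
}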
Optional: Configure Kafka producer and consumer properties.
Source connectors use internal Kafka producers to write messages to topics, while
sink connectors use internal consumers to fetch messages from topics. The properties of
these producers and consumers can be modified on a per-connector basis. This is done
by adding the required properties with the producer.override and
consumer.override prefixes to the connector configuration.
For example, if you want the consumer of your sink connector to use
auto.offset.reset=earliest, you need to add the following entry to the
configuration of the connector.
"consumer.override.auto.offset.reset": "earliest"
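Similarly, the internal producer of a source connector can be tuned with
producer.override entries. The property and value here are only an illustration:
"producer.override.linger.ms": "100"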
Click Validate.
The validator displays any JSON
errors in your configuration. Fix any errors that are displayed. If your JSON is valid,
the JSON is valid message is displayed in the
validator.