Kudu Sink connector
The Kudu Sink connector is a Stateless NiFi dataflow developed by Cloudera that runs in the Kafka Connect framework. Learn about the connector, its properties, and configuration.
The Kudu Sink connector fetches messages from Kafka and loads them into a table in Kudu. The
topic this connector receives messages from is determined by the value of the topics property
in the configuration. The messages the connector receives from Kafka can be in either Avro or
JSON format and must contain records that can be inserted into a Kudu table. If the connector’s
input is in Avro format, then it can either read the schema from the Avro file it receives
(provided that the schema is embedded) or it can fetch the schema from Schema Registry. If the
connector’s input is in JSON format, then it can either infer the schema, or fetch it from
Schema Registry. The strategy that is used to retrieve the schema is determined by the Schema
Access Strategy property.
Kudu is expected to be on a Kerberized cluster. Additionally, Schema Registry should be on
the same cluster as Kudu. In the connector’s configuration, the krb5.file property must point
to the krb5.conf file that provides access to the cluster that Kudu and Schema Registry are
on. The keytab and truststore files that are necessary to access Kudu and Schema Registry must
be present on the cluster node that the connector runs on.
Properties and configuration
Configuration is passed to the connector in a JSON file during creation. The properties of the connector can be categorized into three groups. These are as follows:
- Common connector properties
- These are the properties of the Kafka Connect framework that are accepted by all connectors. For a comprehensive list of these properties, see the Apache Kafka documentation.
- Stateless NiFi Sink properties
- These are the properties that are specific to the Stateless NiFi Sink connector. All Stateless NiFi Sink connectors share and accept these properties. For a comprehensive list of these properties, see the Stateless NiFi Sink property reference.
- Connector/dataflow-specific properties
- These properties are unique to this specific connector or, more precisely, to the dataflow
running within the connector. These properties use the following prefix:
parameter.[***CONNECTOR NAME***] Parameters:
For a comprehensive list of these properties, see the Kudu Sink properties reference.
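As an illustration of the prefix convention, the following minimal Python sketch builds the full configuration key for a dataflow-specific property. The helper name dataflow_property_key is hypothetical and is not part of any Cloudera or Kafka Connect API. The connector name Kudu Sink matches the keys used in the configuration example on this page.

```python
def dataflow_property_key(connector_name: str, property_name: str) -> str:
    """Build the full config key for a dataflow-specific property.

    Hypothetical helper for illustration only. It applies the
    'parameter.<CONNECTOR NAME> Parameters:' prefix and nothing else.
    """
    return f"parameter.{connector_name} Parameters:{property_name}"


key = dataflow_property_key("Kudu Sink", "Kudu Masters")
print(key)  # parameter.Kudu Sink Parameters:Kudu Masters
```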
Notes and limitations
- Required properties must be assigned a valid value even if they are not used in the particular configuration. If a required property is not used, either leave its default value, or completely remove the property from the configuration JSON.
- If a property that has a default value is completely removed from the configuration JSON, the system uses the default value.
- Properties not marked as required must be completely removed from the configuration JSON if not set.
- Schema Branch and Schema Version cannot be specified at the same time.
- Schema Registry URL is mandatory even if no registry is used. In a case like this, use the default value, or remove the property completely from the configuration JSON. Alternatively, if your input data is JSON, you can use the Infer Schema value to infer the schema from the input, or if your input data is Avro, you can use the Embedded Schema value to read the schema embedded in the input.
- Truststore parameters are mandatory even if connecting to a non-secure Schema Registry or no registry is used. In that case the default (empty) truststore can be used.
- Kerberos parameters for Schema Registry are mandatory even if connecting to a non-secure Schema Registry or no registry is used. In that case the default (empty) keytab can be used.
- Kudu is expected to be on a Kerberized cluster. As a result, Kerberos parameters for Kudu are mandatory.
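Some of these constraints can be checked before a configuration is submitted. The following Python sketch shows one hypothetical way to flag a configuration that sets both Schema Branch and Schema Version. The function name is invented for illustration, and the full key names are an assumption: they are built by applying the parameter.Kudu Sink Parameters: prefix from the configuration example to the property names listed above. The check is not part of the connector.

```python
# Prefix taken from the example configuration on this page.
PREFIX = "parameter.Kudu Sink Parameters:"


def check_schema_version_conflict(config: dict) -> list:
    """Return a list of problems found in a connector configuration dict.

    Hypothetical pre-submission check. It only covers the rule that
    Schema Branch and Schema Version cannot be specified together.
    """
    problems = []
    branch = config.get(PREFIX + "Schema Branch")
    version = config.get(PREFIX + "Schema Version")
    # A property counts as "specified" here if it is present and non-empty.
    if branch not in (None, "") and version not in (None, ""):
        problems.append("Schema Branch and Schema Version cannot both be set")
    return problems
```

An empty list means this particular rule is satisfied; it says nothing about the other notes in this section.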
Configuration example
{
  "connector.class": "org.apache.nifi.kafka.connect.StatelessNiFiSinkConnector",
  "meta.smm.predefined.flow.name": "Kudu Sink",
  "meta.smm.predefined.flow.version": "1.0.0",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "tasks.max": "1",
  "nexus.url": "https://repository.cloudera.com/artifactory/repo",
  "extensions.directory": "/tmp/nifi-stateless-extensions",
  "working.directory": "/tmp/nifi-stateless-working",
  "input.port": "Input from Kafka",
  "failure.ports": "Failure",
  "krb5.file": "[***PATH TO KRB5 FILE***]",
  "topics": "[***KAFKA TOPIC NAME***]",
  "parameter.Kudu Sink Parameters:Kafka Message Data Format": "JSON",
  "parameter.Kudu Sink Parameters:Kerberos Keytab for Kudu": "[***PATH TO KUDU KEYTAB***]",
  "parameter.Kudu Sink Parameters:Kerberos Principal for Kudu": "[***KUDU PRINCIPAL***]",
  "parameter.Kudu Sink Parameters:Kudu Masters": "[***KUDU MASTER 1 WITH PORT***],[***KUDU MASTER 2 WITH PORT***],[***KUDU MASTER 3 WITH PORT***]",
  "parameter.Kudu Sink Parameters:Schema Access Strategy": "Infer Schema",
  "parameter.Kudu Sink Parameters:Table Name With Schema": "[***KUDU SCHEMA***].[***KUDU TABLE***]"
}
krb5.file
- The path to a krb5.conf file that contains the information needed to access the Kudu service on a Kerberized cluster.
topics
- The name of the Kafka topic that the connector fetches messages from.
Kafka Message Data Format
- The format of the messages the connector receives from Kafka. In this example, this property is set to JSON. This means that the connector expects data in JSON format.
Kerberos Keytab for Kudu
- The path to the keytab file that enables access to the Kudu service on a Kerberized cluster.
Kerberos Principal for Kudu
- The principal used to access Kudu.
Kudu Masters
- A comma-separated list of server URLs that identify the Kudu service’s master nodes.
For example:
localhost:7051,localhost:7151,localhost:7251
Schema Access Strategy
- Specifies the strategy used for determining the schema of the Kafka record. In this example, the property is set to Infer Schema. This means that the record schema is inferred based on the data the connector receives.
Table Name With Schema
- The schema and name of the Kudu table in which the records get inserted.
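To illustrate the Kudu Masters format described above, the following Python sketch splits a comma-separated value into host and port pairs and rejects malformed entries. The function name parse_kudu_masters is hypothetical, and the sketch assumes every entry has the form host:port, as in the example value. It does not handle IPv6 addresses.

```python
def parse_kudu_masters(value: str) -> list:
    """Split a comma-separated 'host:port' list into (host, port) tuples.

    Hypothetical helper for illustration; raises ValueError on any
    entry that is not of the form host:port with a numeric port.
    """
    masters = []
    for entry in value.split(","):
        entry = entry.strip()
        # Split on the last colon so the port is always the final part.
        host, sep, port = entry.rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"Expected host:port, got {entry!r}")
        masters.append((host, int(port)))
    return masters


print(parse_kudu_masters("localhost:7051,localhost:7151,localhost:7251"))
# [('localhost', 7051), ('localhost', 7151), ('localhost', 7251)]
```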