Kudu Sink connector

The Kudu Sink connector is a NiFi Stateless dataflow developed by Cloudera that runs in the Kafka Connect framework. Learn about the connector, its properties, and configuration.

The Kudu Sink connector fetches messages from Kafka and loads them into a table in Kudu. The topic the connector receives messages from is determined by the value of the topics property in the configuration. The messages can be in either Avro or JSON format and must contain records that can be inserted into a Kudu table. If the input is in Avro format, the connector can either read the schema embedded in the Avro data it receives or fetch the schema from Schema Registry. If the input is in JSON format, the connector can either infer the schema from the data or fetch it from Schema Registry. The strategy used to retrieve the schema is determined by the Schema Access Strategy property.
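The schema options that do not involve Schema Registry can be summarized as a small lookup. The following Python sketch is illustrative only: the helper name registry_free_strategy is hypothetical, and it assumes that the Avro format is identified by the string Avro. The strategy names Infer Schema and Embedded Schema are the values mentioned in this documentation. The value that selects Schema Registry is not shown here, so the sketch leaves it out.

```python
# Strategies that take the schema from the message itself, without Schema Registry.
# Keys are lowercased format names; "avro" as a format value is an assumption.
REGISTRY_FREE_STRATEGY = {
    "json": "Infer Schema",     # schema is inferred from the JSON data
    "avro": "Embedded Schema",  # schema is read from the Avro data
}


def registry_free_strategy(data_format: str) -> str:
    """Return the Schema Access Strategy value that needs no Schema Registry."""
    key = data_format.strip().lower()
    if key not in REGISTRY_FREE_STRATEGY:
        raise ValueError(f"Unsupported message format: {data_format!r}")
    return REGISTRY_FREE_STRATEGY[key]
```

For example, registry_free_strategy("JSON") returns the Infer Schema value used in the configuration example later on this page.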

Kudu is expected to be on a Kerberized cluster. Additionally, Schema Registry should be on the same cluster as Kudu. In the connector’s configuration the krb5.file property must point to the krb5.conf file that provides access to the cluster that Kudu and Schema Registry are on. The keytab and truststore files that are necessary to access Kudu and Schema Registry must be present on the cluster node that the connector runs on.

Properties and configuration

Configuration is passed to the connector in a JSON file during creation. The properties of the connector can be categorized into three groups. These are as follows:

Common connector properties
These are the properties of the Kafka Connect framework that are accepted by all connectors. For a comprehensive list of these properties, see the Apache Kafka documentation.
Stateless NiFi Sink properties
These are the properties that are specific to the Stateless NiFi Sink connector. All Stateless NiFi Sink connectors share and accept these properties. For a comprehensive list of these properties, see the Stateless NiFi Sink property reference.
Connector/dataflow specific properties
These properties are unique to this specific connector or, more precisely, to the dataflow running within the connector. These properties use the following prefix:
parameter.[***CONNECTOR NAME***] Parameters:
For a comprehensive list of these properties, see the Kudu Sink properties reference.
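As an illustration of the prefix convention, the following Python sketch builds the full configuration keys for dataflow-specific properties. The helper names parameter_key and add_parameters are hypothetical and are not part of the connector. The sketch assumes that the connector name in the prefix is Kudu Sink, as in the configuration example on this page.

```python
def parameter_key(flow_name: str, parameter_name: str) -> str:
    """Build the configuration key for a dataflow-specific property."""
    return f"parameter.{flow_name} Parameters:{parameter_name}"


def add_parameters(config: dict, flow_name: str, parameters: dict) -> dict:
    """Return a copy of config with the prefixed dataflow properties added."""
    merged = dict(config)  # leave the caller's dictionary untouched
    for name, value in parameters.items():
        merged[parameter_key(flow_name, name)] = value
    return merged


# Common and Stateless NiFi Sink properties keep their plain names.
base = {"tasks.max": "1", "input.port": "Input from Kafka"}
config = add_parameters(base, "Kudu Sink", {"Kafka Message Data Format": "JSON"})
```

Keeping the prefix logic in one place avoids typos in the long key names when several dataflow properties are set.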

Notes and limitations

  • Required properties must be assigned a valid value even if they are not used in the particular configuration. If a required property is not used, either leave its default value, or completely remove the property from the configuration JSON.
  • If a property that has a default value is completely removed from the configuration JSON, the system uses the default value.
  • Properties not marked as required must be completely removed from the configuration JSON if not set.
  • Schema Registry URL is mandatory even if no registry is used. In that case, use the default value, or remove the property completely from the configuration JSON. When no registry is used, the schema comes from the input itself: if your input data is JSON, you can use the Infer Schema value of the Schema Access Strategy property to infer the schema from the input, and if your input data is Avro, you can use the Embedded Schema value to read the schema embedded in the input.
  • Truststore parameters are mandatory even when connecting to a non-secure Schema Registry or when no registry is used. In that case, the default (empty) truststore can be used.
  • Kerberos parameters for Schema Registry are mandatory even when connecting to a non-secure Schema Registry or when no registry is used. In that case, the default (empty) keytab can be used.
  • Kudu is expected to be on a Kerberized cluster. As a result, Kerberos parameters for Kudu are mandatory.
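The rule that unset properties must be removed from the configuration JSON, rather than left without a value, can be applied automatically before the file is written. The following Python sketch is one possible approach and is not part of the connector. The helper name drop_unset is hypothetical, None is used as the marker for "not set", and the Schema Registry URL key assumes that the parameter is named exactly as in the notes above.

```python
import json


def drop_unset(config: dict) -> dict:
    """Return a copy of config without the properties that have no value set.

    None marks "not set" in this sketch. Empty strings are kept, because an
    empty value can be a deliberate setting.
    """
    return {key: value for key, value in config.items() if value is not None}


draft = {
    "topics": "[***KAFKA TOPIC NAME***]",
    # Assumed key name; left unset, so it is removed and the default applies.
    "parameter.Kudu Sink Parameters:Schema Registry URL": None,
}

# The serialized JSON contains only the properties that have a value.
config_json = json.dumps(drop_unset(draft), indent=1)
```

Properties removed this way fall back to their default values, as described in the notes above.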

Configuration example

In this example, the connector receives data in JSON format from a Kafka topic and forwards it to a Kudu table. The record schema is inferred based on the data.
{
 "connector.class": "org.apache.nifi.kafka.connect.StatelessNiFiSinkConnector",
 "meta.smm.predefined.flow.name": "Kudu Sink",
 "meta.smm.predefined.flow.version": "1.0.0",
 "key.converter": "org.apache.kafka.connect.storage.StringConverter",
 "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
 "tasks.max": "1",
 "nexus.url": "https://repository.cloudera.com/artifactory/repo",
 "extensions.directory": "/tmp/nifi-stateless-extensions",
 "working.directory": "/tmp/nifi-stateless-working",
 "input.port": "Input from Kafka",
 "failure.ports": "Failure",
 "krb5.file": "[***PATH TO KRB5 FILE***]",
 "topics": "[***KAFKA TOPIC NAME***]",
 "parameter.Kudu Sink Parameters:Kafka Message Data Format": "JSON",
 "parameter.Kudu Sink Parameters:Kerberos Keytab for Kudu": "[***PATH TO KUDU KEYTAB***]",
 "parameter.Kudu Sink Parameters:Kerberos Principal for Kudu": "[***KUDU PRINCIPAL***]",
 "parameter.Kudu Sink Parameters:Kudu Masters": "[***KUDU MASTER 1 WITH PORT***],[***KUDU MASTER 2 WITH PORT***],[***KUDU MASTER 3 WITH PORT***]",
 "parameter.Kudu Sink Parameters:Schema Access Strategy": "Infer Schema",
 "parameter.Kudu Sink Parameters:Table Name With Schema": "[***KUDU SCHEMA***].[***KUDU TABLE***]"
}
The following list collects the properties from the configuration example that must be customized for this use case.
krb5.file
The path to a krb5.conf file that contains the information needed to access the Kudu service on a Kerberized cluster.
topics
The name of the Kafka topic that the connector fetches messages from.
Kafka Message Data Format
The format of the messages the connector receives from Kafka. In this example, this property is set to JSON. This means that the connector expects data in JSON format.
Kerberos Keytab for Kudu
The path to the keytab file that enables access to the Kudu service on a Kerberized cluster.
Kerberos Principal for Kudu
The principal used to access Kudu.
Kudu Masters
A comma-separated list of addresses (host and port) that identify the Kudu service’s master nodes. For example: localhost:7051,localhost:7151,localhost:7251
Schema Access Strategy
Specifies the strategy used for determining the schema of the Kafka record. In this example, the property is set to Infer Schema. This means that the record schema is inferred based on the data the connector receives.
Table Name With Schema
The schema and name of the Kudu table into which the records are inserted.
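Because the example configuration contains placeholders in the [***...***] style, it can be useful to check that all of them were replaced before the connector is created. The following Python sketch shows one way to do this. The helper name unresolved_placeholders is hypothetical and is not part of the connector or of Kafka Connect.

```python
import re

# Matches placeholders such as [***KAFKA TOPIC NAME***].
PLACEHOLDER = re.compile(r"\[\*\*\*.+?\*\*\*\]")


def unresolved_placeholders(config: dict) -> dict:
    """Map each property name to the placeholders still present in its value."""
    found = {}
    for key, value in config.items():
        matches = PLACEHOLDER.findall(str(value))
        if matches:
            found[key] = matches
    return found


sample = {
    "tasks.max": "1",
    "topics": "[***KAFKA TOPIC NAME***]",
    "parameter.Kudu Sink Parameters:Table Name With Schema":
        "[***KUDU SCHEMA***].[***KUDU TABLE***]",
}

# Report every property that still needs to be customized.
for name, placeholders in unresolved_placeholders(sample).items():
    print(f"{name}: {', '.join(placeholders)}")
```

An empty result means that every placeholder in the configuration was replaced with a concrete value.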