Kudu Sink connector
The Kudu Sink connector is a Stateless NiFi dataflow developed by Cloudera that runs in the Kafka Connect framework. Learn about the connector, its properties, and configuration.
The Kudu Sink connector fetches messages from Kafka and loads them into a table in Kudu. The topic this connector receives messages from is determined by the value of the topics property in the configuration. The messages the connector receives from Kafka can be in either Avro or JSON format and must contain records that can be inserted into a Kudu table. If the connector input is in Avro format, then it can either read the schema from the Avro file it receives (provided that the schema is embedded) or it can fetch the schema from Schema Registry. If the connector’s input is in JSON format, then it can either infer the schema, or fetch it from Schema Registry. The strategy that is used to retrieve the schema is determined by the Schema Access Strategy property.
Kudu is expected to be on a Kerberized cluster. It is not mandatory to have Kudu and Schema Registry on the same cluster. In the connector’s configuration, the krb5.file property must point to the krb5.conf file that provides access to the cluster that Kudu and Schema Registry are on. The keytab and truststore files that are necessary to access Kudu and Schema Registry must be present on the cluster node that the connector runs on.
Properties and configuration
Configuration is passed to the connector during creation. The properties of the connector can be categorized into three groups. These are as follows:
- Common connector properties
- These are the properties of the Kafka Connect framework that are accepted by all connectors. For a comprehensive list of these properties, see the Apache Kafka documentation.
- Stateless NiFi Sink properties
- These are the properties that are specific to the Stateless NiFi Sink connector. All Stateless NiFi Sink connectors share and accept these properties. For a comprehensive list of these properties, see the Stateless NiFi Sink properties reference.
- Connector/dataflow-specific properties
- These properties are unique to this specific connector or, to be more precise, unique to the dataflow running within the connector. These properties use the following prefix:
 parameter.[***CONNECTOR NAME***] Parameters:
 For a comprehensive list of these properties, see the Kudu Sink properties reference.
Notes and limitations
- Required properties must be assigned a valid value even if they are not used in the particular configuration. If a required property is not used, either leave its default value, or completely remove the property from the configuration.
- If a property that has a default value is completely removed from the configuration, the system uses the default value.
- Properties not marked as required must be completely removed from the configuration if not set.
- Schema Branch and Schema Version cannot be specified at the same time.
- Schema Registry URL is mandatory even if no registry is used. In a case like this, use the default value, or remove the property completely from the configuration.
- Truststore parameters are mandatory even if connecting to a non-secure Schema Registry or no registry is used. In that case the default (empty) truststore can be used.
- Kerberos parameters for Schema Registry are mandatory even if connecting to a non-secure Schema Registry or no registry is used. In that case the default (empty) keytab can be used.
- Kudu is expected to be on a Kerberized cluster. As a result, Kerberos parameters for Kudu are mandatory.
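Some of the rules above can be checked mechanically before a configuration is submitted. The following is a minimal sketch, not part of the connector: the function name check_notes and the OPTIONAL tuple are hypothetical, and the prefix is taken from the configuration example on this page. It only covers the rule that Schema Branch and Schema Version are mutually exclusive and the rule that properties not marked as required must be removed rather than left empty.

```python
# Prefix used by the dataflow-specific properties in the configuration example.
PREFIX = "parameter.Kudu Sink Parameters:"

# Properties listed as not required in the Kudu Sink properties reference.
OPTIONAL = ("Schema Branch", "Schema Name", "Schema Version", "Kudu Kerberos Service Name")


def check_notes(config):
    """Return a list of problems found; an empty list means no checked rule is violated."""
    problems = []
    # Schema Branch and Schema Version cannot be specified at the same time.
    if PREFIX + "Schema Branch" in config and PREFIX + "Schema Version" in config:
        problems.append("Schema Branch and Schema Version cannot both be specified")
    # Optional properties must be removed completely instead of being left empty.
    for name in OPTIONAL:
        key = PREFIX + name
        if key in config and config[key] in ("", None):
            problems.append(f"{name} is not set and must be removed from the configuration")
    return problems
```

A configuration that passes these checks can still be rejected by the connector for other reasons; the sketch only mirrors the two rules named above.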
Configuration example
{
 "connector.class": "org.apache.nifi.kafka.connect.StatelessNiFiSinkConnector",
 "meta.smm.predefined.flow.name": "Kudu Sink",
 "meta.smm.predefined.flow.version": "1.0.0",
 "key.converter": "org.apache.kafka.connect.storage.StringConverter",
 "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
 "tasks.max": "1",
 "nexus.url": "https://repository.cloudera.com/artifactory/repo",
 "extensions.directory": "/tmp/nifi-stateless-extensions",
 "working.directory": "/tmp/nifi-stateless-working",
 "input.port": "Input from Kafka",
 "failure.ports": "Failure",
 "krb5.file": "[***PATH TO KRB5 FILE***]",
 "topics": "[***KAFKA TOPIC NAME***]",
 "parameter.Kudu Sink Parameters:Kafka Message Data Format": "JSON",
 "parameter.Kudu Sink Parameters:Kerberos Keytab for Kudu": "[***PATH TO KUDU KEYTAB***]",
 "parameter.Kudu Sink Parameters:Kerberos Principal for Kudu": "[***KUDU PRINCIPAL***]",
 "parameter.Kudu Sink Parameters:Kudu Masters": "[***KUDU MASTER 1 WITH PORT***],[***KUDU MASTER 2 WITH PORT***],[***KUDU MASTER 3 WITH PORT***]",
 "parameter.Kudu Sink Parameters:Schema Access Strategy": "Infer Schema",
 "parameter.Kudu Sink Parameters:Table Name With Schema": "[***KUDU SCHEMA***].[***KUDU TABLE***]"
}
- krb5.file
- The path to a krb5.conf file that contains the information needed to access the Kudu service on a Kerberized cluster.
- topics
- The name of the Kafka topic that the connector fetches messages from.
- Kafka Message Data Format
- The format of the messages the connector receives from Kafka. In this example, this
              property is set to JSON. This means that the connector expects data in JSON format.
- Kerberos Keytab for Kudu
- The path to the keytab file that enables access to the Kudu service on a Kerberized cluster.
- Kerberos Principal for Kudu
- The principal used to access Kudu.
- Kudu Masters
- A comma-separated list of server URLs that identify the Kudu service’s master nodes.
              For example: localhost:7051,localhost:7151,localhost:7251
- Schema Access Strategy
- Specifies the strategy used for determining the schema of the Kafka record. In this
              example, the property is set to Infer Schema. This means that the record schema is inferred based on the data the connector receives.
- Table Name With Schema
- The schema and name of the Kudu table in which the records get inserted.
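If the configuration is generated by a script rather than edited by hand, the example above can be assembled from a handful of inputs. The sketch below is only an illustration: the function build_kudu_sink_config and all argument values in the usage lines are hypothetical, while the property names and fixed values are copied from the configuration example.

```python
import json

PREFIX = "parameter.Kudu Sink Parameters:"


def build_kudu_sink_config(topic, krb5_file, keytab, principal, masters, table):
    """Build the example configuration as a dict; masters is a list of host:port strings."""
    return {
        "connector.class": "org.apache.nifi.kafka.connect.StatelessNiFiSinkConnector",
        "meta.smm.predefined.flow.name": "Kudu Sink",
        "meta.smm.predefined.flow.version": "1.0.0",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "tasks.max": "1",
        "nexus.url": "https://repository.cloudera.com/artifactory/repo",
        "extensions.directory": "/tmp/nifi-stateless-extensions",
        "working.directory": "/tmp/nifi-stateless-working",
        "input.port": "Input from Kafka",
        "failure.ports": "Failure",
        "krb5.file": krb5_file,
        "topics": topic,
        PREFIX + "Kafka Message Data Format": "JSON",
        PREFIX + "Kerberos Keytab for Kudu": keytab,
        PREFIX + "Kerberos Principal for Kudu": principal,
        # Kudu Masters is a single comma-separated string.
        PREFIX + "Kudu Masters": ",".join(masters),
        PREFIX + "Schema Access Strategy": "Infer Schema",
        PREFIX + "Table Name With Schema": table,
    }


# Hypothetical values, for illustration only.
example = build_kudu_sink_config(
    topic="my-topic",
    krb5_file="/etc/krb5.conf",
    keytab="/path/to/kudu.keytab",
    principal="my-principal",
    masters=["localhost:7051", "localhost:7151", "localhost:7251"],
    table="default.mytable",
)
print(json.dumps(example, indent=1))
```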
Stateless NiFi Sink properties reference
Review the following reference for a comprehensive list of the connector properties that are specific to the Stateless NiFi Sink connector.
In addition to the properties listed here, Stateless NiFi connectors also accept the properties of the Kafka Connect framework. For a comprehensive list of these properties, see the Apache Kafka documentation.
attribute.prefix
- Description
- The prefix to add to the key of each header that matches the regular expression specified in headers.as.attributes.regex. For example, if the header key is MyHeader, its value is MyValue, headers.as.attributes.regex is set to My.*, and this property is set to kafka, the flowfile that is created for the Kafka message will have an attribute named kafka.MyHeader with a value of MyValue.
- Default Value
- Accepted Values
- Required
- false
dataflow.timeout
- Description
- Specifies the maximum amount of time to wait for the dataflow to complete. If the dataflow does not complete before this timeout, the thread is interrupted and the dataflow is considered a failure. The session is rolled back and the connector retriggers the flow. Defaults to 60 seconds if not specified.
- Default Value
- 60 seconds
- Accepted Values
- Required
- false
extensions.directory
- Description
- Specifies the directory that stores downloaded extensions. Extensions are the NAR (NiFi Archive) files containing the processors and controller services a flow might use. Since Stateless NiFi is only the NiFi engine, it does not contain any of the processors and controller services you might use in your flow. When deploying the connector with the custom flow, the system needs to download the specific extensions that your flow uses from Nexus (unless they are already present in this directory). These extensions are stored in this directory. Because the default directory might not be writable, and to aid in upgrade scenarios, Cloudera recommends that you always specify an extensions directory.
- Default Value
- /tmp/nifi-stateless-extensions
- Accepted Values
- Required
- true
failure.ports
- Description
- A comma-separated list of output ports that are considered as failure conditions. If any flowfile is routed to an output port specified in this property, the dataflow is considered a failure and the session is rolled back. After a set amount of time, the dataflow reattempts to process the Kafka record. Any data transferred to an output port that is not in the list of failure ports is discarded. Because of how Stateless NiFi Sink connectors behave, even if a single flowfile ends up in an output port that is marked as failure, the entire session is rolled back with all messages in the batch. Furthermore, if a flowfile ends up in a failure port in each subsequent iteration, the result is an endless loop. With some sink connectors (for example, MQTT Sink) this is the desired behavior. For more information regarding this behavior, see Dataflow execution and scheduling.
- Default Value
- Accepted Values
- Required
- false
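The all-or-nothing behavior described for failure.ports can be pictured with a small model. This is a simplified sketch and not the connector's implementation: batch_outcome is a hypothetical helper, and the port name Success in the usage note is an assumed example of a non-failure output port.

```python
def batch_outcome(routed_ports, failure_ports):
    """Decide the fate of a batch given the output port each flowfile ended up in.

    routed_ports: one output port name per flowfile in the batch.
    failure_ports: the failure.ports value, a comma-separated string.
    """
    failures = {port.strip() for port in failure_ports.split(",") if port.strip()}
    # A single flowfile in a failure port rolls back the whole batch.
    if any(port in failures for port in routed_ports):
        return "rollback"
    return "commit"
```

For example, batch_outcome(["Success", "Failure", "Success"], "Failure") yields "rollback" even though two of the three flowfiles were routed elsewhere.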
flow.snapshot
- Description
- Specifies the dataflow to run. When using SMM to deploy a connector, the value you set in this property must be a JSON object. URLs, file paths, or escaped JSON strings are not supported when using SMM. Alternatively, if using the Kafka Connect REST API to deploy a connector, this can be a file containing the dataflow, a URL that points to a dataflow, or a string containing the entire dataflow as an escaped JSON. However, Cloudera does not recommend using the Kafka Connect REST API to interact with this connector or Kafka Connect.
- Default Value
- Accepted Values
- Required
- true
headers.as.attributes.regex
- Description
- A Java regular expression that is evaluated against all Kafka record headers. Headers are added to the flowfile as an attribute if the header key matches the regular expression. The header key is used as the attribute name. The header value is used as the attribute value. Additionally, the name of the attribute can also contain an optional prefix which is defined by the attribute.prefix property.
- Default Value
- Accepted Values
- Required
- false
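The interaction between headers.as.attributes.regex and attribute.prefix can be sketched as follows. This is an approximation under stated assumptions: the connector evaluates a Java regular expression, while this sketch uses Python's re module; it assumes the whole header key must match; and it assumes a dot is placed between the prefix and the key, which is what the kafka.MyHeader example in the attribute.prefix description suggests. The function headers_to_attributes is hypothetical.

```python
import re


def headers_to_attributes(headers, regex=None, prefix=""):
    """Map Kafka record headers to flowfile attributes.

    headers: dict of header key to header value.
    regex: the headers.as.attributes.regex value, or None if not configured.
    prefix: the attribute.prefix value, or an empty string if not configured.
    """
    if regex is None:
        return {}
    pattern = re.compile(regex)
    attributes = {}
    for key, value in headers.items():
        # Assumption: the entire header key has to match the expression.
        if pattern.fullmatch(key):
            # Assumption: prefix and key are joined with a dot.
            name = f"{prefix}.{key}" if prefix else key
            attributes[name] = value
    return attributes
```

With headers {"MyHeader": "MyValue", "Other": "x"}, a regex of My.*, and a prefix of kafka, the sketch produces a single attribute named kafka.MyHeader with the value MyValue.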
input.port
- Description
- The name of the input port in the NiFi dataflow that Kafka records are sent to. If the dataflow contains exactly one input port, this property is optional and can be omitted. However, if the dataflow contains multiple input ports, this property must be specified.
- Default Value
- Accepted Values
- Required
- false
krb5.file
- Description
- Specifies the krb5.conf file to use if the dataflow interacts with any services that are secured using Kerberos. Defaults to /etc/krb5.conf if not specified.
- Default Value
- /etc/krb5.conf
- Accepted Values
- Required
- false
name
- Description
- The name of the connector. On the SMM UI, the connector names are specified using the Enter Name field. The name that you enter in the Enter Name field is automatically set as the value of the name property when the connector is deployed. Because of this, the name property is omitted from the configuration template provided in SMM. If you manually add the name property to the configuration in SMM, ensure that the value you set matches the connector name specified in the Enter Name field. Otherwise, the connector fails to deploy.
- Default Value
- Accepted Values
- Required
- true
nexus.url
- Description
- Specifies the Base URL of the Nexus instance to source extensions from. If configuring
            a Nexus instance that has multiple repositories, include the name of the repository in
            the URL. For example,
              https://nexus-private.myorganization.org/nexus/repository/my-repository/. If the property is not specified, the necessary extensions (the ones used by the flow) must be provided in the extensions directory before deploying the connector.
- Default Value
- Accepted Values
- Required
- true
parameter.[***FLOW PARAMETER NAME***]
- Description
- Specifies a parameter to use in the dataflow. For example, assume that you have the following entry in your connector configuration: "parameter.Directory": "/mydir". In a case like this, any parameter context in the dataflow that has a parameter named Directory gets the specified value (/mydir). If the dataflow has child process groups, and those child process groups have their own parameter contexts, the value is used for all parameter contexts that contain a parameter named Directory. Parameters can also be applied to specific parameter contexts only. This can be done by prefixing the parameter name (Directory) with the name of the parameter context followed by a colon. For example, parameter.My Context:Directory only applies the specified value for the Directory parameter in the Parameter Context named My Context.
- Default Value
- Accepted Values
- Required
- false
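The scoping rules for parameter entries can be modeled in a few lines. This is an illustrative sketch only: apply_parameters is a hypothetical function, parameter contexts are represented as plain dictionaries, and the precedence between a scoped and an unscoped entry for the same parameter is not covered because this page does not specify it.

```python
def apply_parameters(config, contexts):
    """Apply parameter.* entries from a connector configuration to parameter contexts.

    contexts: dict of context name to a dict of parameter name to value.
    Returns a new dict; the input is left unchanged.
    """
    resolved = {name: dict(params) for name, params in contexts.items()}
    for key, value in config.items():
        if not key.startswith("parameter."):
            continue
        target = key[len("parameter."):]
        context_name, colon, param = target.partition(":")
        if colon:
            # Scoped form: only the named parameter context is updated.
            if param in resolved.get(context_name, {}):
                resolved[context_name][param] = value
        else:
            # Unscoped form: every context that has this parameter is updated.
            for params in resolved.values():
                if target in params:
                    params[target] = value
    return resolved
```

Given two contexts that both define Directory, the entry "parameter.Directory": "/mydir" updates both, while "parameter.My Context:Directory": "/mydir" updates only the context named My Context.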
working.directory
- Description
- Specifies a directory on the Connect server that NiFi should use for unpacking extensions that it needs to perform the dataflow. The contents of extensions.directory are unpacked here. Defaults to /tmp/nifi-stateless-working if not specified.
- Default Value
- /tmp/nifi-stateless-working
- Accepted Values
- Required
- false
Kudu Sink properties reference
Review the following reference for a comprehensive list of the connector properties that are specific to the Kudu Sink connector.
The properties listed in this reference use the following prefix: parameter.[***CONNECTOR NAME***] Parameters:
In addition to the properties listed here, this connector also accepts certain properties of the Kafka Connect framework as well as the properties of the Stateless NiFi Sink connector. When creating a new connector using the UI, all valid properties are presented in the default configuration template. You can view the configuration template to get a full list of valid properties. In addition, for more information regarding the accepted properties not listed here, you can review the Apache Kafka documentation and the Stateless NiFi Sink properties reference.
Date Format
- Description
- Specifies the format to use when reading Date fields from JSON.
- Default Value
- yyyy-MM-dd
- Accepted Values
- Required
- true
Handle Schema Drift
- Description
- Specifies whether to handle Schema Drift. If set to true, when fields with names that are not in the target Kudu table are encountered, the Kudu table is altered to include new columns for those fields. If set to false, fields that only exist in the input data are skipped.
- Default Value
- false
- Accepted Values
- true, false
- Required
- true
Ignore Null
- Description
- Ignore NULL on Kudu Put Operation. If set to true, only non-null columns get updated.
- Default Value
- false
- Accepted Values
- true, false
- Required
- true
Kafka Message Data Format
- Description
- Specifies the format of the messages the connector receives from Kafka.
- Default Value
- AVRO
- Accepted Values
- AVRO, JSON
- Required
- true
Kerberos Keytab for Kudu
- Description
- The fully-qualified filename of the Kerberos keytab associated with the principal for accessing Kudu.
- Default Value
- The location of the default keytab which is empty and can only be used for unsecure connections.
- Accepted Values
- Required
- true
Kerberos Keytab for Schema Registry
- Description
- The fully-qualified filename of the Kerberos keytab associated with the principal for accessing Schema Registry.
- Default Value
- The location of the default keytab which is empty and can only be used for unsecure connections.
- Accepted Values
- Required
- true
Kerberos Principal for Kudu
- Description
- The Kerberos principal used for authenticating to Kudu.
- Default Value
- default
- Accepted Values
- Required
- true
Kerberos Principal for Schema Registry
- Description
- The Kerberos principal used for authenticating to Schema Registry.
- Default Value
- default
- Accepted Values
- Required
- true
Kudu Kerberos Service Name
- Description
- The SASL protocol name to use for authenticating with Kerberos. Must match the Kudu service principal name.
- Default Value
- kudu
- Accepted Values
- Required
- false
Kudu Masters
- Description
- The comma-separated addresses of the Kudu masters to connect to. For example: localhost:7051,localhost:7151,localhost:7251.
- Default Value
- Accepted Values
- Required
- true
Kudu Operation Type
- Description
- Specifies what Kudu operation gets executed.
- Default Value
- INSERT
- Accepted Values
- INSERT, INSERT_IGNORE, UPSERT, UPDATE, DELETE, UPDATE_IGNORE, DELETE_IGNORE
- Required
- true
Lowercase Field Names
- Description
- Specifies whether to convert column names to lowercase when finding indexes of Kudu table columns.
- Default Value
- false
- Accepted Values
- true, false
- Required
- true
Schema Access Strategy
- Description
- Specifies the strategy used for determining the schema of the Kafka record. The value you set here depends on the data format set in Kafka Message Data Format.
- If set to Schema Registry, the schema is read from Schema Registry. This setting can be used with both Avro and JSON formats.
- If set to Infer Schema, the schema is inferred based on the input data. This setting can only be used if Kafka Message Data Format is JSON.
- If set to Embedded Schema, the schema embedded in the input is used. This setting can only be used if Kafka Message Data Format is Avro.
- If set to HWX Content-Encoded Schema Reference, the schema is read from Schema Registry. This setting can only be used if Kafka Message Data Format is Avro. In this case the Avro messages are expected to have a reference to the schema in Schema Registry encoded within the message content.
- Default Value
- Schema Registry
- Accepted Values
- Schema Registry, Infer Schema, Embedded Schema, HWX Content-Encoded Schema Reference
- Required
- true
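The compatibility between Kafka Message Data Format and Schema Access Strategy described above can be written down as a lookup table. The sketch below is a hypothetical helper, not connector code; the strategy and format names are the accepted values listed in this reference.

```python
# Which data formats each Schema Access Strategy can be combined with.
ALLOWED_FORMATS = {
    "Schema Registry": {"AVRO", "JSON"},
    "Infer Schema": {"JSON"},
    "Embedded Schema": {"AVRO"},
    "HWX Content-Encoded Schema Reference": {"AVRO"},
}


def strategy_is_valid(data_format, strategy):
    """Return True if the strategy can be used with the given message data format."""
    return data_format.upper() in ALLOWED_FORMATS.get(strategy, set())
```

For instance, strategy_is_valid("JSON", "Infer Schema") is True, while strategy_is_valid("AVRO", "Infer Schema") is False.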
Schema Branch
- Description
- The name of the branch to use when looking up the schema in Schema Registry. Schema Branch and Schema Version cannot be specified at the same time. If one is specified, the other needs to be removed from the configuration. If Schema Registry is not used, this property must be completely removed from the configuration.
- Default Value
- Accepted Values
- Required
- false
Schema Name
- Description
- The schema name to look up in Schema Registry. If the Schema Access Strategy property is set to Schema Registry, this property must contain a valid schema name. If Schema Registry is not used, this property must be completely removed from the configuration.
- Default Value
- Accepted Values
- Required
- false
Schema Registry URL
- Description
- The URL of the Schema Registry server. If Schema Registry is not used, use the default value.
- Default Value
- http://localhost:7788/api/v1
- Accepted Values
- Required
- true
Schema Version
- Description
- The version of the schema to look up in Schema Registry. If Schema Registry is used and a schema version is not specified, the latest version of the schema is retrieved. Schema Branch and Schema Version cannot be specified at the same time. If one is specified, the other needs to be removed from the configuration. If Schema Registry is not used, this property must be completely removed from the configuration.
- Default Value
- Accepted Values
- Required
- false
Table Name With Schema
- Description
- The schema and name of the Kudu table to put data into. For example: default.mytable. When using Impala: impala::default.mytable.
- Default Value
- Accepted Values
- Required
- true
Time Format
- Description
- Specifies the format to use when reading Time fields from JSON.
- Default Value
- HH:mm:ss
- Accepted Values
- Required
- true
Timestamp Format
- Description
- Specifies the format to use when reading Timestamp fields from JSON. The value must
            match the Java Simple Date Format. (For example, MM/dd/yyyy HH:mm:ss for a two-digit
            month, followed by a two-digit day, followed by a four-digit year, all separated by
            forward slashes. This is followed by a two-digit hour in 24-hour format, followed by a
            two-digit minute, followed by a two-digit second, all separated by colons. The resulting
            timestamp looks like this: 01/01/2017 18:04:15.)
- Default Value
- yyyy-MM-dd HH:mm:ss.SSS
- Accepted Values
- Required
- true
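To see what the two timestamp patterns mentioned above accept, the following sketch parses sample values. It is only an illustration: the connector interprets Java Simple Date Format patterns, whereas this sketch uses Python's datetime.strptime with hand-written equivalents of those two patterns. The JAVA_TO_STRPTIME table and parse_timestamp function are hypothetical and do not translate arbitrary Java patterns.

```python
from datetime import datetime

# Hand-written strptime equivalents for the two Java patterns on this page.
JAVA_TO_STRPTIME = {
    "MM/dd/yyyy HH:mm:ss": "%m/%d/%Y %H:%M:%S",
    "yyyy-MM-dd HH:mm:ss.SSS": "%Y-%m-%d %H:%M:%S.%f",
}


def parse_timestamp(text, java_pattern):
    """Parse text using the strptime equivalent of a known Java pattern."""
    return datetime.strptime(text, JAVA_TO_STRPTIME[java_pattern])


# The sample from the description: January 1, 2017 at 18:04:15.
sample = parse_timestamp("01/01/2017 18:04:15", "MM/dd/yyyy HH:mm:ss")
# A value in the default format, with milliseconds.
default_sample = parse_timestamp("2017-01-01 18:04:15.123", "yyyy-MM-dd HH:mm:ss.SSS")
```

A value that does not match the configured pattern, such as 2017-01-01 18:04:15 against MM/dd/yyyy HH:mm:ss, raises a ValueError in this sketch.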
Truststore Filename for Schema Registry
- Description
- The fully-qualified filename of a truststore. This truststore is used to establish a secure connection with Schema Registry using TLS.
- Default Value
- The location of the default truststore which is empty and can only be used for unsecure connections.
- Accepted Values
- Required
- true
Truststore Password for Schema Registry
- Description
- The password used to access the contents of the truststore configured in the Truststore Filename for Schema Registry property.
- Default Value
- password
- Accepted Values
- Required
- true
Truststore Type for Schema Registry
- Description
- The type of the truststore configured in the Truststore Filename for Schema Registry property.
- Default Value
- Accepted Values
- BCFKS, PKCS12, JKS
- Required
- true
