JDBC Sink connector

The JDBC Sink connector is a Stateless NiFi dataflow developed by Cloudera that runs in the Kafka Connect framework. Learn about the connector, its properties, and configuration.

The JDBC Sink connector fetches messages from Kafka and loads them into a database table. The topic this connector receives messages from is determined by the value of the topics property in the configuration. The messages the connector receives from Kafka can be in either Avro or JSON format and must contain records that can be inserted into the database table. The schema of the records can be a predefined schema retrieved from Schema Registry (for both Avro and JSON data), embedded in the Avro data, or inferred from the JSON data. The strategy used to retrieve the schema is determined by the Schema Access Strategy property.

Properties and configuration

Configuration is passed to the connector in a JSON file during creation. The properties of the connector fall into three groups:

Common connector properties
These are the properties of the Kafka Connect framework that are accepted by all connectors. For a comprehensive list of these properties, see the Apache Kafka documentation.
Stateless NiFi Sink properties
These are the properties that are specific to the Stateless NiFi Sink connector. All Stateless NiFi Sink connectors share and accept these properties. For a comprehensive list of these properties, see the Stateless NiFi Sink property reference.
Connector/dataflow-specific properties
These properties are unique to this connector or, more precisely, to the dataflow running within the connector. These properties use the following prefix:
parameter.[***CONNECTOR NAME***] Parameters:
For a comprehensive list of these properties, see the JDBC Sink properties reference.
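As a rough illustration of how the three groups appear side by side in a single configuration JSON, the fragment below takes a few keys from the configuration example in this topic. The grouping is an interpretation based on the property names, not an exhaustive classification.

```json
{
 "tasks.max": "1",
 "topics": "[***KAFKA TOPIC NAME***]",
 "failure.ports": "Retry from PutDatabaseRecord",
 "parameter.JDBC Sink Parameters:Database Type": "PostgreSQL"
}
```

Here tasks.max and topics are Kafka Connect framework properties, failure.ports is assumed to belong to the Stateless NiFi Sink group, and the last key uses the parameter.[***CONNECTOR NAME***] Parameters: prefix with JDBC Sink as the connector name.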

Notes and limitations

  • Required properties must be assigned a valid value even if they are not used in the particular configuration. If a required property is not used, either leave its default value, or completely remove the property from the configuration JSON.
  • If a property that has a default value is completely removed from the configuration JSON, the system uses the default value.
  • Properties not marked as required must be completely removed from the configuration JSON if not set.
  • Schema Branch and Schema Version cannot be specified at the same time.
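The last two notes can be sketched as a configuration fragment. This is an illustration only: the full key name assumes that Schema Version uses the same parameter prefix as the other dataflow-specific properties, and the value 1 is a hypothetical version number. Because Schema Branch is not set in this sketch, its key is left out entirely rather than being included with an empty value.

```json
{
 "parameter.JDBC Sink Parameters:Schema Version": "1"
}
```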

Configuration example

In this example, the connector fetches messages in JSON format from Kafka and translates them into SQL INSERT statements to load the data into a PostgreSQL database. The schema of the records must match the schema of the database table. The record schema in this example is inferred from the input JSON.

{
 "connector.class": "org.apache.nifi.kafka.connect.StatelessNiFiSinkConnector",
 "meta.smm.predefined.flow.name": "JDBC Sink",
 "meta.smm.predefined.flow.version": "1.0.0",
 "key.converter": "org.apache.kafka.connect.storage.StringConverter",
 "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
 "tasks.max": "1",
 "nexus.url": "https://repository.cloudera.com/artifactory/repo",
 "extensions.directory": "/tmp/nifi-stateless-extensions",
 "working.directory": "/tmp/nifi-stateless-working",
 "failure.ports": "Retry from PutDatabaseRecord",
 "topics": "[***KAFKA TOPIC NAME***]",
 "parameter.JDBC Sink Parameters:Database Connection URL": "[***JDBC URL***]",
 "parameter.JDBC Sink Parameters:Database Driver Location": "[***PATH TO JDBC DRIVER***]",
 "parameter.JDBC Sink Parameters:Database Driver Class Name": "org.postgresql.Driver",
 "parameter.JDBC Sink Parameters:Database Type": "PostgreSQL",
 "parameter.JDBC Sink Parameters:Database User Name": "[***USERNAME***]",
 "parameter.JDBC Sink Parameters:Database User Password": "[***PASSWORD***]",
 "parameter.JDBC Sink Parameters:Database Table Name": "[***TABLE NAME***]",
 "parameter.JDBC Sink Parameters:Kafka Message Data Format": "JSON",
 "parameter.JDBC Sink Parameters:Schema Access Strategy": "Infer Schema"
}
The following list describes the properties from the configuration example that must be customized for this use case.
topics
The name of the Kafka topic the connector fetches messages from.
Database Connection URL
The JDBC URL of the PostgreSQL database. For example, jdbc:postgresql://myhost:5432/mydb.
Database Driver Location
The path to the PostgreSQL JDBC driver JAR file (or the directory containing the JAR).
Database Driver Class Name
The Java class name of the PostgreSQL Driver implementation.
Database Type
The type of the database. Because this is a PostgreSQL example, this property is set to PostgreSQL.
Database User Name
The username used for authenticating to the database.
Database User Password
The password used for authenticating to the database.
Database Table Name
The name of the database table to load data to.
Kafka Message Data Format
The format of the messages the connector receives from Kafka.
Schema Access Strategy
Specifies the strategy used for determining the schema of the Kafka record. In this example, the property is set to Infer Schema. This means that the schema is determined (inferred) from the JSON data.
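To make the schema matching requirement more concrete, the following is a sketch of a Kafka message value that a configuration like this one could load. The field names and values are hypothetical. They assume a target table with id, name, and email columns of compatible types.

```json
{
 "id": 42,
 "name": "Jane Doe",
 "email": "jane.doe@example.com"
}
```

With Infer Schema, the record schema would be derived from the fields of each message, so no schema would need to be registered in Schema Registry beforehand.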