JDBC Source connector

The JDBC Source connector is a Stateless NiFi dataflow developed by Cloudera that is running in the Kafka Connect framework. Learn about the connector, its properties, and configuration.

The JDBC Source connector reads records from a database table and transfers each record to Kafka in Avro or JSON format. The Kafka topic this connector transfers messages to is determined by the value of the topics parameter in the configuration. The schema of the records can be inherited from the schema of the database table, or it can be a predefined schema retrieved from Schema Registry.

The connector supports incremental loading based on a table column containing increasing values (typically an ID sequence or timestamp). When the connector is started for the first time with incremental loading turned on, it can load all the existing data from the source table, or it can start from the current maximum value of the incrementing column.

Properties and configuration

Configuration is passed to the connector in a JSON file during creation. The properties of the connector can be categorized into three groups. These are as follows:

Common connector properties
These are the properties of the Kafka Connect framework that are accepted by all connectors. For a comprehensive list of these properties, see the Apache Kafka documentation.
Stateless NiFi Source properties
These are the properties that are specific to the Stateless NiFi Source connector. All Stateless NiFi Source connectors share and accept these properties. For a comprehensive list of these properties, see the Stateless NiFi Source property reference.
Connector/dataflow-specific properties
These are the properties that are unique to this specific connector. Or to be more precise, unique to the dataflow running within the connector. These properties use the following prefix:
parameter.[***CONNECTOR NAME***] Parameters:
For a comprehensive list of these properties, see the JDBC Source properties reference.

Notes and limitations

  • Required properties must be assigned a valid value even if they are not used in the particular configuration. If a required property is not used, either leave its default value, or completely remove the property from the configuration JSON.
  • If a property that has a default value is completely removed from the configuration JSON, the system uses the default value.
  • Properties not marked as required must be completely removed from the configuration JSON if not set.
  • The tasks.max property must be set to 1. Setting the property to any other value will not have an effect. This is because this connector performs incremental loading and must run as a single task.
  • If Initial Load Strategy is set to Start at Beginning and “Maximum-value Columns is not set, the connector tries to load the entire content of the source database table to the target Kafka topic with every execution of the connector. Set Maximum-value Columns to avoid this.

Configuration example

In this example, the connector connects to a PostgreSQL database using username/password authentication, queries the records from a table, converts each record to JSON format, and sends them to a Kafka topic. The schema of the records is determined from the schema of the database table. The data loading is incremental, only new records are loaded based on a column containing incrementing values in the table.

{
 "connector.class": "org.apache.nifi.kafka.connect.StatelessNiFiSourceConnector",
 "meta.smm.predefined.flow.name": "JDBC Source",
 "meta.smm.predefined.flow.version": "1.0.0",
 "key.converter": "org.apache.kafka.connect.storage.StringConverter",
 "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
 "tasks.max": "1",
 "nexus.url": "https://repository.cloudera.com/artifactory/repo",
 "extensions.directory": "/tmp/nifi-stateless-extensions",
 "working.directory": "/tmp/nifi-stateless-working",
 "key.attribute": "kafka.message.key",
 "topics": "[***KAFKA TOPIC NAME***]",
 "parameter.JDBC Source Parameters:Database Connection URL": "[***JDBC URL***]",
 "parameter.JDBC Source Parameters:Database Driver Location": "[***PATH TO JDBC DRIVER***]",
 "parameter.JDBC Source Parameters:Database Driver Class Name": "org.postgresql.Driver",
 "parameter.JDBC Source Parameters:Database Type": "PostgreSQL",
 "parameter.JDBC Source Parameters:Database User Name": "[***USERNAME***]",
 "parameter.JDBC Source Parameters:Database User Password": "[***PASSWORD***]",
 "parameter.JDBC Source Parameters:Database Table Name": "[***TABLE NAME***]",
 "parameter.JDBC Source Parameters:Kafka Message Key Column": "[***COLUMN NAME***]",
 "parameter.JDBC Source Parameters:Maximum-value Columns": "[***COLUMN NAME***]",
 "parameter.JDBC Source Parameters:Kafka Message Data Format": "JSON",
 "parameter.JDBC Source Parameters:Schema Access Strategy": "Inherit Schema"
}
The following list collects the properties from the configuration example that must be customized for this use case.
topics
The name of the Kafka topic that the connector sends messages to.
Database Connection URL
The JDBC URL of the PostgreSQL database. For example, jdbc:postgresql://myhost:5432/mydb.
Database Driver Location
A comma-separated list of files or folders containing the JDBC client libraries.
Database Driver Class Name
The Java class name of the PostgreSQL Driver implementation.
Database Type
The type of the database. Because this is a PostgreSQL example, the property is set to PostgreSQL.
Database User Name
The username used for authenticating to the database.
Database User Password
The password of the user.
Database Table Name
The name of the database table to query data from.
Kafka Message Key Column
Specifies a database table column. The value of the column specified is used as the key of the Kafka message.
Maximum-value Columns
The name of the column used for incremental loading. The column value must be increasing, for example, coming from a sequence or system timestamp.
Kafka Message Data Format
The format of the messages the connector sends to Kafka.
Schema Access Strategy
Specifies the strategy used for determining the schema of the database record. In this example, this property is set to Inherit Schema, meaning that the schema is determined (inherited) from the database table.