Tutorial: developing and deploying a JDBC Source dataflow in Kafka Connect using Stateless NiFi

A step-by-step tutorial that walks you through creating a JDBC Source dataflow and deploying it as a Kafka Connect connector using the Stateless NiFi Source connector. The connector/dataflow presented in this tutorial reads records from an Oracle database table and forwards them to Kafka in JSON format.

  • Look up the Stateless NiFi plugin version either by using the Streams Messaging Manager (SMM) UI, or by logging into a Kafka Connect host and checking the Connect plugin directory.
    1. Access the SMM UI, and click Connect in the navigation sidebar.
    2. Click the New Connector option.
    3. Locate the StatelessNiFiSourceConnector or StatelessNiFiSinkConnector cards. The version is located on the card.

      The version is made up of multiple digit groups. The first three represent the NiFi version. For example, if the version on the card is 1.18.0.2.4.3.0-63, then you should use NiFi 1.18.0 to build your flow.
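
    Alternatively, if you look up the version on a Kafka Connect host, you can list the contents of the Connect plugin directory. The directory used below is only an assumption; the actual location depends on your deployment and is set by the plugin.path property of the Connect workers.
    ls /var/lib/kafka_connect/plugins | grep -i nifi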

  • Download and start NiFi. You can download NiFi from https://archive.apache.org/dist/nifi/. This example uses NiFi 1.18.0 (nifi-1.18.0-bin.zip).
  • The connector/dataflow developed in this tutorial requires the Oracle JDBC driver to function. Ensure that the driver JAR is deployed and available on every Kafka Connect host under the same path and with correct file permissions. Note down the location where you deploy the driver; you will need to set this location as a property value during connector deployment. For example:
    cp ./ojdbc8-[***VERSION***].jar /var/lib/kafka_connect_jdbc/oracle-connector-java.jar
    chmod 644 /var/lib/kafka_connect_jdbc/oracle-connector-java.jar
  1. In NiFi, create a process group and give it a name.
  2. Add a QueryDatabaseTableRecord processor and an output port to the process group.
    Note down the name of the output port; you will need to set the name as the value of a connector property during connector deployment.
  3. Add a DBCPConnectionPool and a JsonRecordSetWriter controller service to the process group.
    The controller services are required for the QueryDatabaseTableRecord processor to access the database and to convert records.
  4. Examine the properties of the QueryDatabaseTableRecord processor and the two controller services. Collect the properties that you want to parameterize.
    Parameterized properties can be set when deploying the connector. In this example, the following properties are parameterized:
    • Database Type
    • Table Name
    • Maximum-value Columns
    • Initial Load Strategy
    • Database Connection URL
    • Database Driver Class Name
    • Database Driver Location
    • Database User
    • Database Password
  5. Create a parameter context, give it a name, and add the above-mentioned parameters to the parameter context.

    You can assign values to the parameters in this step. The values you assign here serve as default values for the parameters when the flow is deployed as a connector. Assigning values is not mandatory as all parameters will be configurable during connector deployment. However, assigning sensible default values is a good practice and makes the dataflow easier to deploy in Kafka Connect.

    In this example, the connector will connect to an Oracle database. Therefore, values are specified for the Database Driver Class Name and Database Type parameters. Additionally, Initial Load Strategy is set to Start at Beginning.

    Ensure that you mark the Database Password parameter as a Sensitive Value during creation. This is because only sensitive parameters can be referenced from sensitive processor or controller service properties.
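
    As an illustration, the parameter context in this example could contain defaults similar to the following. The values are placeholders; the driver class name in particular depends on the driver you deploy.
    Database Driver Class Name = oracle.jdbc.OracleDriver
    Database Type = Oracle
    Initial Load Strategy = Start at Beginning
    Database Password = [***PASSWORD***] (marked as Sensitive Value)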

  6. Assign the parameter context to the process group.
  7. Reference the parameters from the processor and the controller services.

    At this point, one controller service is invalid and the other is disabled, but that is expected. Neither the processor nor the controller services need to be valid, started, or enabled at this point.
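
    Parameters are referenced using NiFi's #{Parameter Name} syntax. For example, the references in this dataflow could look similar to the following; exact property names can differ slightly between NiFi versions.
    QueryDatabaseTableRecord:
      Database Type = #{Database Type}
      Table Name = #{Table Name}
      Maximum-value Columns = #{Maximum-value Columns}
      Initial Load Strategy = #{Initial Load Strategy}
    DBCPConnectionPool:
      Database Connection URL = #{Database Connection URL}
      Database Driver Class Name = #{Database Driver Class Name}
      Database Driver Location(s) = #{Database Driver Location}
      Database User = #{Database User}
      Password = #{Database Password}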

  8. Open the configuration of the QueryDatabaseTableRecord processor, and go to the SCHEDULING tab.

    Notice that Execution is set to Primary node (which is the only option for this particular processor). For Stateless NiFi Kafka connectors, this setting means that the connector is going to be deployed with one task, regardless of the value of the tasks.max connector property.

  9. At this point, your dataflow is in the following state.

    Notice how the dataflow is not running, and there are warnings present. This is expected. The dataflow does not need to run in order for you to be able to download the flow definition. The properties you must configure to make the dataflow function will be set later on during connector deployment.

  10. Right-click the process group and select Download flow definition > Without external services.
    If you followed these steps, your process group is not referencing controller services that are outside of the process group. As a result, you can choose to download the flow without external services included.
  11. Open the downloaded JSON file, and replace every occurrence of the version number (1.18.0) with the version number of your Stateless NiFi plugin.
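    For example, assuming the flow definition was saved as jdbc_example_flow.json and your plugin version is 1.18.0.2.4.3.0-63 (both the file name and the version are placeholders for your own values), you can perform the replacement from the command line with GNU sed:
    sed -i 's/1\.18\.0/1.18.0.2.4.3.0-63/g' jdbc_example_flow.json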
  12. Access the SMM UI.
  13. Click Connect in the navigation sidebar.
  14. Click the New Connector option.
  15. Select StatelessNiFiSourceConnector from the available connectors.
    The Connector Configuration page appears.
  16. Configure the following connector properties.
    • name=My Custom JDBC Connector
    • output.port=Output to Kafka

      This property must be set to the name of the output port in the dataflow. The output port name was specified in Step 2.

    • topics=jdbc_example

      This is the name of the target topic that the connector writes data to.

  17. Configure the flow.snapshot property.
    The flow.snapshot property specifies the dataflow that the connector runs. This is the dataflow that you developed and exported as flow definition JSON in the previous steps. While you have multiple options when configuring this property, this tutorial uses the Import and Enhance option in SMM.
    1. Click the edit icon found next to flow.snapshot.
    2. Paste the contents of the flow definition JSON, or click Browse... to upload the file from your machine.
    3. Click Save and Enhance.
      Save and Enhance is a unique SMM feature that is only available for this property. When you use this option, SMM parses the dataflow specified in flow.snapshot, extracts all parameters that are available in the dataflow, and adds them to the connector configuration.
  18. Configure the dataflow parameters.
    These are the properties that have the parameter. prefix. In this specific example, values for Database Driver Class Name, Database Type, and Initial Load Strategy are loaded automatically because values were specified within the dataflow in Step 5. If needed, you can change the values.

    Set Database Driver Location to the path where you deployed the JDBC driver.
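
    For illustration, after configuration the parameter properties could look similar to the following. All values are placeholders that depend on your environment, and the exact property keys are generated by SMM from your dataflow.
    parameter.Database Connection URL=jdbc:oracle:thin:@//[***DB HOST***]:[***PORT***]/[***SERVICE NAME***]
    parameter.Database Driver Class Name=oracle.jdbc.OracleDriver
    parameter.Database Driver Location=/var/lib/kafka_connect_jdbc/oracle-connector-java.jar
    parameter.Database Type=Oracle
    parameter.Database User=[***USER***]
    parameter.Database Password=[***PASSWORD***]
    parameter.Table Name=[***TABLE NAME***]
    parameter.Maximum-value Columns=[***COLUMN NAME***]
    parameter.Initial Load Strategy=Start at Beginning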

  19. Review your configuration.
  20. Click Validate.
  21. Click Next.
  22. Click Deploy to deploy the connector.
    Connector deployment is not instantaneous. You might need to wait a few seconds for deployment to finish.
Your custom-developed dataflow is deployed to Kafka Connect and is running as a Kafka Connect connector.
Once deployment finishes, navigate to the Connector Profile page of the connector. On this page you can view various details about the connector. Most importantly, you can view the connector's status and which workers the connector and its task were assigned to.

In this particular case, the connector was assigned to the worker running on cluster host 1, whereas the task was assigned to the worker running on cluster host 3. This means that dataflow log entries will be present in the Kafka Connect log file on host 3.
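
If you want to inspect these log entries, you can search the Kafka Connect log on the host running the task for the connector name. The log file path below is only an assumption; the actual location depends on your deployment.
  grep "My Custom JDBC Connector" /var/log/kafka/kafka-connect.log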

In addition to monitoring the connector, you can also check the contents of the topic that the connector is writing to. You can do this in SMM by navigating to the Topics page, searching for the topic, and clicking the Data Explorer icon next to it. This opens the SMM Data Explorer in a modal window. You can use the Data Explorer to sample the data that is flowing through the topic. The database table’s records appear in the topic in JSON format.
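
If you prefer the command line to the Data Explorer, you can also sample the topic with a console consumer. The broker address is a placeholder, the command may be named kafka-console-consumer.sh depending on your distribution, and on a secured cluster you also need to supply client security properties (for example, with --consumer.config).
  kafka-console-consumer --bootstrap-server [***BROKER HOST***]:9092 --topic jdbc_example --from-beginning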