Getting started with Kafka Connect

Get started with Kafka Connect in CSP Community Edition.

Kafka Connect is a tool for streaming data between Apache Kafka and other systems in a reliable and scalable fashion. Kafka Connect makes it simple to quickly define connectors that move large collections of data into and out of Kafka. Source connectors can ingest entire databases or collect metrics from all your application servers into Kafka topics, making the data available for stream processing with low latency. Sink connectors can deliver data from Kafka topics into secondary storage and query systems or into batch systems for offline analysis.

CSP Community Edition is shipped with many different Cloudera developed and publicly available connectors that each cover a specific use case for streaming data. All of these can be deployed, managed, and monitored using Streams Messaging Manager (SMM)

The following tutorial walks you through a simple use case where data is moved from a Kafka topic into a PostgreSQL database using the JDBC Sink connector, which is one of the Cloudera developed connectors shipped with CSP Community Edition.

Before you begin

Find out the name or ID of the Kafka and PostgreSQL container. You will need to pass the name or ID of the containers as parameters in some of the commands you will be running. The container name or ID can be listed using docker ps. For example:
docker ps -a --format '{{.ID}}\t{{.Names}}' --filter "name=kafka.(\d)" --filter "name=postgres"

The Kafka container will either be called cspce-kafka-1 or cspce_kafka_1.

The PostgreSQL container will either be called cspce-postgresql-1 or cspce_postgresql_1.

Creating a database and table in PostgreSQL

In order to stream data using the JDBC Sink connector, a destination is required for that data. In this tutorial, the database and table is created using the PostgreSQL instance already deployed in CSP Community Edition.

  1. Open a new terminal window and run the following command:
    docker exec -it [***POSTGRESQL CONTAINER NAME OR ID***] /bin/bash
  2. Start the psql client.
    psql
  3. Create a database.
    CREATE DATABASE csp;
  4. Change the database owner to smm.
    ALTER DATABASE csp OWNER TO smm;
  5. Connect to the csp database.
    \c csp
  6. Create a table.
    CREATE TABLE demo (Name VARCHAR ( 255 ), Role VARCHAR, Age INT);
  7. Change the table owner to smm.
    ALTER TABLE demo OWNER to smm;

Creating a topic and producing messages

Before you can deploy the JDBC Sink connector, you need a Kafka topic with some messages that you can use. The connector connects to this topic and streams the data from the topic into the PostgreSQL database. The topic will be created using SMM. Message production is done using the Kafka console producer.

  1. Access the SMM UI by entering the following in a browser window:
    http://localhost:9991
  2. Click (Topics) in the navigation sidebar.
  3. Configure the topic as follows:
    • Topic Name: csp-connect
    • Partitions: 1
    • Availability: Low
    • Cleanup Policy: delete
  4. Click Save.
  5. Verify that the topic was created.
    This can be done by typing csp-connect in the search field. If the topic was successfully created, it will be listed under Topics.
  6. Open a terminal session and run the following command:
    docker exec -it [***KAFKA CONTAINER NAME OR ID***] /bin/bash
  7. Run the Kafka console producer.
    /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server localhost:9094 --topic csp-connect
  8. Produce the following messages:
    {"Name":"Jim","Role":"Developer","Age":"23"}
    {"Name":"Mary","Role":"Data Engineer","Age":"42"}
    {"Name":"Jane","Role":"Developer","Age":"37"}

    Notice how the schema of the messages matches the schema of the database table created in the previous section. This is required as otherwise the connector would not know where to put the data in the table. Ensure that each line in the example is produced as a single Kafka message.

  9. Using the SMM UI, verify that the messages were produced.
    1. Go Topics and find the csp-connect topic.
    2. Click (Profile) and go to Data Explorer.
      If message production was successful, you should see the messages produced on the UI.

Deploying the connector

After both the topic and database is set up, it's time to deploy the JDBC Sink connector using the SMM UI.

  1. Click (Connect) on the navigation sidebar of the SMM UI.
  2. Click +New Connector.
  3. Go to Sink Templates and select JDBC Sink connector.
    After a connector is selected, you are presented with the Connector Form. The Connector Form is used to configure your connector. Most connectors are shipped with a default configuration template to ease configuration. The properties and values included in the templates depend on the selected connector. If a template is available for a specific connector, it is loaded into the Connector Form when you select the connector.
  4. Click Import Connector Configuration....
  5. Copy and paste the following configuration in the Import Connector Config… dialog.
    {
     "connector.class": "org.apache.nifi.kafka.connect.StatelessNiFiSinkConnector",
     "extensions.directory": "/tmp/nifi-stateless-extensions",
     "failure.ports": "Retry from PutDatabaseRecord",
     "key.converter": "org.apache.kafka.connect.storage.StringConverter",
     "meta.smm.predefined.flow.name": "JDBC Sink",
     "meta.smm.predefined.flow.version": "1.0.0",
     "name": "sink-demo",
     "nexus.url": "https://repository.cloudera.com/artifactory/repo",
     "parameter.JDBC Sink Parameters:Database Connection URL": "jdbc:postgresql://postgresql:5432/csp",
     "parameter.JDBC Sink Parameters:Database Driver Class Name": "org.postgresql.Driver",
     "parameter.JDBC Sink Parameters:Database Driver Location": "/opt/connect/plugin/libs/debezium-connector-postgres/postgresql-connector-java.jar",
     "parameter.JDBC Sink Parameters:Database Table Name": "demo",
     "parameter.JDBC Sink Parameters:Database Type": "PostgreSQL",
     "parameter.JDBC Sink Parameters:Database User Name": "smm",
     "parameter.JDBC Sink Parameters:Database User Password": "cloudera",
     "parameter.JDBC Sink Parameters:Kafka Message Data Format": "JSON",
     "parameter.JDBC Sink Parameters:Schema Access Strategy": "Infer Schema",
     "tasks.max": "1",
     "topics": "csp-connect",
     "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
     "working.directory": "/tmp/nifi-stateless-working"
    }
    

    Notice that Schema Access Strategy is set to Infer Schema. This tells the connector to figure out the schema of the data based on the data in the topic. Based on the format of the data, the connector will know how to match the data in the topic with the table columns. Note however that the connector is capable of retrieving the schema by other means, for example from Schema Registry. For more information regarding Schema Access Strategy as well as the other properties included in this example, see JDBC Sink connector reference in the Related Information section at the bottom of this page.

  6. Click Import.
  7. Click Validate.
  8. Click Next.
  9. Click Deploy.
    A pop-up window appears notifying you that deployment was successful.
  10. Query the database.
    After the connector is deployed, switch over to the terminal session you used to set up the database and run the following command:
    SELECT * FROM public.demo;
    

Monitoring and managing the connector

Once the connector is deployed, you can monitor its activity using the SMM UI.

  1. In the SMM UI click (Connect) on the navigation sidebar.
    Clicking on opens the Connect Cluster page with the Connectors tab open. This page provides a high level overview and metrics of all your connectors as well as the connect cluster.
  2. Click (Connector Profile) next to the connector’s name.
    The Connector Profile page enables you to monitor details of the connector and its tasks as well as its configuration. The page consists of two tabs Connector Profile and Connector Settings. The following points give a quick introduction of the UI and its features. Feel free to experiment and explore.
    • The Connector Profile section provides you with details regarding the Classname, Assigned Worker, Status, Total Tasks, Running Tasks, Failed Tasks, and Paused Tasks.
    • Using the buttons in the top right-hand corner you can pause, resume, restart, or delete the connector.
    • In the Tasks section, you can view and monitor Status, Worker ID, Task ID, and various other details regarding connector tasks. Clicking next to a task displays detailed information and metrics about the selected task. In addition to viewing status and metrics, the Tasks section also allows you to restart a particular task. This can be done by selecting the task you want to restart and then clicking the Restart option found within the Tasks section.
    • The Connector Settings tab enables you to view and edit the configuration of the connector.
    • Clicking Edit in the bottom left-hand corner allows you to reconfigure the connector.
    • Using the buttons in the top right-hand corner you can pause, resume, restart, or delete the connector.
Now that you are familiar with the basics of connector deployment and monitoring, Cloudera recommends that you explore and learn about the various other connectors that are included with CSP Community Edition. For more information on each of the connectors, see Connectors in the related information section.