Writing data to Ozone in a Kerberos and TLS/SSL enabled cluster with Kafka Connect

You can use the HDFS Sink Connector developed by Cloudera to write Kafka topic data to Ozone in a secure cluster. Connector deployment and configuration are done using the SMM UI.

The following steps walk you through setting up and deploying the Cloudera-developed HDFS Sink Connector using the SMM UI.

In addition to the connector setup, these steps also describe how you can create a test topic and populate it with data using the Kafka command line tools. If you already have a topic that is ready for use, you can skip steps 1 through 3. These steps cover topic creation, message production, and message consumption, and are optional.

  • Ensure that a Kerberos and TLS/SSL enabled CDP PvC Base cluster with Kafka, SMM, and Ozone is set up and configured.
  • To create a test topic with the Kafka console tools, you must ensure that a .properties client configuration file is available for use.
    You can create one using the following example as a template:
    sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="[***PATH TO KEYTAB FILE***]" principal="[***KERBEROS PRINCIPAL***]";
    security.protocol=SASL_SSL
    sasl.mechanism=GSSAPI
    sasl.kerberos.service.name=kafka
    ssl.truststore.location=[***TRUSTSTORE LOCATION***]
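    For example, a filled-in configuration file might look like the following. The keytab path, principal, and truststore location are illustrative, and the truststore path shown assumes an Auto-TLS setup; substitute values that are valid for your cluster:
    sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="/etc/security/keytabs/user.keytab" principal="user@EXAMPLE.COM";
    security.protocol=SASL_SSL
    sasl.mechanism=GSSAPI
    sasl.kerberos.service.name=kafka
    ssl.truststore.location=/var/lib/cloudera-scm-agent/agent-cert/cm-auto-global_truststore.jks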
  1. Create a Kafka topic:
    1. SSH into one of the hosts in your cluster.
      ssh [***USER***]@[***MY-CLUSTER-HOST.COM***]
    2. Create a topic with the kafka-topics tool.
      kafka-topics --create --bootstrap-server [***MY-CLUSTER-HOST.COM:9093***] --replication-factor 1 --partitions 1 --topic [***TOPIC***] --command-config [***CLIENT CONFIG FILE***]
      If an out of memory exception is displayed, increase the JVM heap with the following command and try again:
      export KAFKA_OPTS="-Xmx1g -Xms1g"
    3. Verify that the topic is created.
      kafka-topics --list --bootstrap-server [***MY-CLUSTER-HOST.COM:9093***] --command-config [***CLIENT CONFIG FILE***]
  2. Produce messages to your topic with the kafka-console-producer.
    kafka-console-producer --broker-list [***MY-CLUSTER-HOST.COM:9093***] --topic [***TOPIC***] --producer.config [***CLIENT CONFIG FILE***] 
    Enter test messages once the tool is running.
    >my first message
    >my second message
  3. Consume messages:
    1. Open a new terminal session and log in to one of the hosts in your cluster.
    2. Consume messages with the kafka-console-consumer.
      kafka-console-consumer --from-beginning --bootstrap-server [***MY-CLUSTER-HOST.COM:9093***] --topic [***TOPIC***] --consumer.config [***CLIENT CONFIG FILE***] 
      The messages you produced with kafka-console-producer appear. You can switch to the terminal session that is running kafka-console-producer and produce additional messages. These new messages appear in real time in the session running kafka-console-consumer.
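      For example, with the test messages produced in the previous step, the consumer output looks like this:
      my first message
      my second message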
  4. Deploy and configure an HDFS Sink Connector:
    1. In Cloudera Manager, select the Streams Messaging Manager service.
    2. Click Streams Messaging Manager Web UI to log in to the UI.
      If prompted, enter a valid username and password, and click SIGN IN.
    3. Click the Connect option in the left-side menu.
    4. Click + New Connector to add a new connector.
    5. Go to the Sink Connectors tab and select the HDFS Sink Connector.
      On the UI, the HDFS Sink Connector is represented by its class name, com.cloudera.dim.kafka.connect.hdfs.HdfsSinkConnector.
    6. Enter a name for the connector.
    7. Configure the connector.
      Use the following example as a template:
      {
       "connector.class": "com.cloudera.dim.kafka.connect.hdfs.HdfsSinkConnector",
       "hdfs.uri": "ofs://ozone1/vol1/bucket1/",
       "tasks.max": "1",
       "topics": "testTopic",
       "hdfs.kerberos.authentication": "true",
       "hdfs.kerberos.user.principal": "${cm-agent:ENV:kafka_connect_service_principal}",
       "hdfs.kerberos.keytab.path": "${cm-agent:keytab}",
       "hdfs.kerberos.namenode.principal": "hdfs/_HOST@REALM",
       "hdfs.output": "/topics_output/",
       "hadoop.conf.path": "file:///etc/hadoop/conf",
       "output.writer": "com.cloudera.dim.kafka.connect.partition.writers.txt.TxtPartitionWriter",
       "value.converter": "org.apache.kafka.connect.storage.StringConverter",
       "output.storage": "com.cloudera.dim.kafka.connect.hdfs.HdfsPartitionStorage"
      }
      Ensure that you replace the values of hdfs.uri and hdfs.output with valid Ozone paths. The template shows what these paths should look like. Replace the other values based on your cluster configuration and requirements.
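      If the volume and bucket referenced by hdfs.uri do not exist yet, you can create them with the Ozone shell before deploying the connector. The following is a minimal sketch that assumes the vol1 and bucket1 names from the template and a valid Kerberos ticket for a user authorized to create them:
      ozone sh volume create /vol1
      ozone sh bucket create /vol1/bucket1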
    8. Click Validate.
      The validator displays any JSON errors in your configuration. Fix any errors that are displayed. If your configuration contains no errors, the JSON is valid message is displayed.
    9. Click Next.
    10. Review your connector configuration.
    11. Click Deploy.
  5. Verify that connector deployment is successful:
    1. In the SMM UI, click the Connect option in the left-side menu.
    2. Click on either the topic or the connector you created.
      If connector deployment is successful, a flow is displayed between the topic you specified and the connector you created.
  6. Verify that topic data is written to Ozone.
    You can do this by listing the files under the ofs:// location you specified in the connector configuration.
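    For example, assuming the paths from the connector configuration template, you can list the output files with the Ozone file system shell after obtaining a Kerberos ticket. The keytab, principal, and ofs:// path below are illustrative:
    kinit -kt [***PATH TO KEYTAB FILE***] [***KERBEROS PRINCIPAL***]
    ozone fs -ls ofs://ozone1/vol1/bucket1/topics_output/
    If the connector is writing data successfully, the listing contains files created from the topic data.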