Configuring Kafka for Kerberos Over Ambari

Chapter 5. Producing Events/Messages to Kafka on a Secured Cluster

Prerequisite: Make sure that you have enabled access to the topic (via Ranger or native ACLs) for the user associated with the producer process. We recommend that you use Ranger to manage permissions. For more information, see the Apache Ranger User Guide for Kafka.
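
If you manage permissions with native Kafka ACLs rather than Ranger, a producer grant can be added with the kafka-acls.sh tool. The following command is only a sketch; the ZooKeeper connection string, principal, and topic name are placeholders for your own values:

    ./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=<zookeeper-host:port> --add --allow-principal User:<producer-user> --producer --topic <topic-name>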

During the installation process, Ambari configures a series of Kafka client and producer settings and creates a JAAS configuration file for the Kafka client. You do not need to modify these settings; for more information about them, see Kafka Configuration Options.
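
For reference, the Ambari-generated kafka_client_jaas.conf typically contains a KafkaClient login section similar to the following (the exact contents on your cluster may differ):

KafkaClient {
   com.sun.security.auth.module.Krb5LoginModule required
   useTicketCache=true
   renewTicket=true
   serviceName="kafka";
};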

Note: Only the Kafka Java API is supported for Kerberos. Third-party clients are not supported.

To produce events/messages:

  1. Specify the path to the JAAS configuration file as one of your JVM parameters:

    -Djava.security.auth.login.config=/usr/hdp/current/kafka-broker/config/kafka_client_jaas.conf

    For more information about the kafka_client_jaas file, see "JAAS Configuration File for the Kafka Client" in Kafka Configuration Options.

  2. Run kinit with the principal's keytab (an example kinit command appears after this list).

  3. Launch kafka-console-producer.sh with the following configuration options. (Note: these settings are the same as in previous versions, except for the addition of --security-protocol SASL_PLAINTEXT.)

    ./bin/kafka-console-producer.sh --broker-list <hostname:port [,hostname:port, …]> --topic <topic-name> --security-protocol SASL_PLAINTEXT

    For example:

    ./bin/kafka-console-producer.sh --broker-list c6401.ambari.apache.org:6667,c6402.ambari.apache.org:6667 --topic test_topic --security-protocol SASL_PLAINTEXT
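
For step 2, a typical kinit invocation looks like the following; the keytab path and principal shown here are placeholders and will differ on your cluster:

kinit -kt /etc/security/keytabs/kafka_client.keytab <principal>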

Producer Code Example for a Kerberos-Enabled Cluster

The following code shows a sample producer for a Kerberos-enabled Kafka cluster. Note that the SECURITY_PROTOCOL_CONFIG property (security.protocol) is set to SASL_PLAINTEXT.

package com.hortonworks.example.kafka.producer;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.Random;

public class BasicProducerExample {

   public static void main(String[] args){

       Properties props = new Properties();
       props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka.example.com:6667");
       
       // use the SASL_PLAINTEXT security protocol for Kerberos authentication (no SSL encryption)
       props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
       
       props.put(ProducerConfig.ACKS_CONFIG, "all");
       props.put(ProducerConfig.RETRIES_CONFIG, 0);
       props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
       props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

       Producer<String, String> producer = new KafkaProducer<String, String>(props);
       TestCallback callback = new TestCallback();
       Random rnd = new Random();
       for (long i = 0; i < 100 ; i++) {
           ProducerRecord<String, String> data = new ProducerRecord<String, String>(
                   "test-topic", "key-" + i, "message-"+i );
           producer.send(data, callback);
       }

       producer.close();
   }


   private static class TestCallback implements Callback {
       @Override
       public void onCompletion(RecordMetadata recordMetadata, Exception e) {
           if (e != null) {
               System.out.println("Error while producing message to topic :" + recordMetadata);
               e.printStackTrace();
           } else {
               String message = String.format("sent message to topic:%s partition:%s  offset:%s", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
               System.out.println(message);
           }
       }
   }

}

To run the example, issue the following command:

$ java -Djava.security.auth.login.config=/usr/hdp/current/kafka-broker/config/kafka_client_jaas.conf com.hortonworks.example.kafka.producer.BasicProducerExample
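
This command assumes that the compiled example class and the Kafka client libraries are already on the classpath. An explicit invocation on an HDP node might look like the following; the classpath entries are illustrative and depend on where you built the example:

$ java -cp "/usr/hdp/current/kafka-broker/libs/*:." -Djava.security.auth.login.config=/usr/hdp/current/kafka-broker/config/kafka_client_jaas.conf com.hortonworks.example.kafka.producer.BasicProducerExample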

Troubleshooting

Issue: If you launch the console producer from the command line without specifying the --security-protocol option, you will see an error similar to the following:

[2015-07-21 04:14:06,611] ERROR fetching topic metadata for topics 
[Set(test_topic)] from broker 
[ArrayBuffer(BrokerEndPoint(0,c6401.ambari.apache.org,6667), 
BrokerEndPoint(1,c6402.ambari.apache.org,6667))] failed 
(kafka.utils.CoreUtils$)
kafka.common.KafkaException: fetching topic metadata for topics 
[Set(test_topic)] from broker 
[ArrayBuffer(BrokerEndPoint(0,c6401.ambari.apache.org,6667), 
BrokerEndPoint(1,c6402.ambari.apache.org,6667))] failed 
     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73)
Caused by: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
     at kafka.utils.CoreUtils$.read(CoreUtils.scala:193)
     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)

Solution: Add --security-protocol SASL_PLAINTEXT to the kafka-console-producer.sh runtime options.