Consume Events or Messages from Kafka on a Secured Cluster
How to consume events/messages from Kafka on a secured cluster.
Make sure that you have enabled access to the topic (via Ranger or native ACLs) for the user associated with the consumer process. We recommend that you use Ranger to manage permissions. For more information, see “Apache Ranger User Guide> Adding KAFKA Policies”.
During the installation process, Ambari configures a series of Kafka client and producer settings, and creates a JAAS configuration file for the Kafka client. It is not necessary to modify these values, but for more information see “Appendix: Kerberos Kafka Configuration Options”.
Note | |
---|---|
Only the Kafka Java API is supported for Kerberos. Third-party clients are not
supported. |
Issue: If you launch the consumer from the command-line interface without
specifying the security-protocol
option, you will see the following
error:
2015-07-21 04:14:06,611] ERROR fetching topic metadata for topics
[Set(test_topic)] from broker
[ArrayBuffer(BrokerEndPoint(0,c6401.ambari.apache.org,6667),
BrokerEndPoint(1,c6402.ambari.apache.org,6667))] failed
(kafka.utils.CoreUtils$)
kafka.common.KafkaException: fetching topic metadata for topics
[Set(test_topic)] from broker
[ArrayBuffer(BrokerEndPoint(0,c6401.ambari.apache.org,6667),
BrokerEndPoint(1,c6402.ambari.apache.org,6667))] failed
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73)
Caused by: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
at kafka.utils.CoreUtils$.read(CoreUtils.scala:193)
at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
Solution: Add --security-protocol SASL_PLAINTEXT
to the
kafka-console-consumer.sh
runtime options.
Consumer Code Example for a Kerberos-Enabled Cluster
The following example shows sample code for a producer in a Kerberos-enabled Kafka
cluster. Note that the SECURITY_PROTOCOL_CONFIG
property is set to
SASL_PLAINTEXT
.
package com.hortonworks.example.kafka.consumer;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
public class BasicConsumerExample {
public static void main(String[] args) {
Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka.example.com:6667");
// specify the protocol for SSL Encryption
consumerConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerConfig);
TestConsumerRebalanceListener rebalanceListener = new TestConsumerRebalanceListener();
consumer.subscribe(Collections.singletonList("test-topic"), rebalanceListener);
while (true) {
ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
for (ConsumerRecord<byte[], byte[]> record : records) {
System.out.printf("Received Message topic =%s, partition =%s, offset = %d, key = %s, value = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
consumer.commitSync();
}
}
private static class TestConsumerRebalanceListener implements ConsumerRebalanceListener {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println("Called onPartitionsRevoked with partitions:" + partitions);
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
System.out.println("Called onPartitionsAssigned with partitions:" + partitions);
}
}
}
To run the example, issue the following command:
# java -Djava.security.auth.login.config=/usr/hdp/current/kafka-broker/config/kafka_client_jaas.conf com.hortonworks.example.kafka.consumer.BasicConsumerExample