Configuring Flume Security with Kafka
In 5.15.0 and higher releases of CDH 5, and in CDH 6.1, you can use Cloudera Manager to configure Flume to communicate with Kafka sources, sinks, and channels over TLS.
kafka.consumer.security.protocol kafka.consumer.sasl.kerberos.service.name
You also have to configure global truststore settings in Cloudera Manager. To do it, edit the Flume TLS/SSL Client Trust Store File and Flume TLS/SSL Client Trust Store Password properties in Cloudera Manager.
For detailed instructions, see Setting Keystore and Truststore for Flume Components.
To configure Flume to connect to secure Kafka, perform the following steps:
- In Cloudera Manager, go to the Flume service.
- Open the Configuration tab.
-
Use the Search field to search for Kafka. The Kafka Service property is displayed. Select the Kafka service that you want the Flume service to connect to.
The following image shows an example of the Kafka Service property with the KAFKA-1 service selected:
flume.keytab
Cloudera Manager automatically creates the flume.keytab file. However, if you need to edit the file, you can find it in the following location:
/var/run/cloudera-scm-agent/process/<latest_id>-flume-AGENT/flume.keytab
The file must not be empty on any host that runs a kerberized Flume agent.
Principal management is handled by Cloudera Manager for Flume, just as with other services. For example, principals are listed on the
page in Cloudera Manager.jaas.conf
Cloudera Manager also creates a flafka_jaas.conf file on each host that runs a Flume agent. You do not need to create or edit the file manually. The following information is provided for troubleshooting.
The configuration information in the file is used to communicate with Kafka and also provide normal Flume Kerberos support. The flafka_jaas.conf file contains two entries for the Flume principal: Client and KafkaClient. Note that the principal property is host specific. Unix user flume must have read permission for this file.
/opt/cloudera/security/flafka_jaas.conf: Client { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="flume.keytab" principal="flume/cornhost-1.gce.acmecorn.com@GCE.ACMECORN.COM"; }; KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true serviceName="kafka" keyTab="flume.keytab" principal="flume/cornhost-1.gce.acmecorn.com@GCE.ACMECORN.COM"; };
Example
The code sample below is a complete working example Flume configuration with two tiers. Tier1 reads an input log and puts the new Events to the sectest topic using a Kafka Sink (the tailed file has to exist before agent starts). Tier2 listens to the sectest topic by a Kafka Source and logs every event.
######################################################## # Tier1 ######################################################## tier1.sources = source1 tier1.channels = channel1 tier1.sinks = sink1 tier1.channels.channel1.type = memory tier1.channels.channel1.capacity = 1000 tier1.channels.channel1.transactionCapacity = 100 tier1.sinks.sink1.channel = channel1 tier1.sources.source1.channels = channel1 tier1.sources.source1.type = exec tier1.sources.source1.command = tail -F /tmp/input/input.log tier1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink tier1.sinks.sink1.kafka.topic = sectest tier1.sinks.sink1.kafka.bootstrap.servers = acmecp-ssl-1.gce.cloudera.com:9093,acmecp-ssl-2.gce.cloudera.com:9093 ### # Generated security related setup ### # tier1.sinks.sink1.kafka.producer.security.protocol = SASL_SSL # tier1.sinks.sink1.kafka.producer.sasl.kerberos.service.name = kafka ### ######################################################## # Tier2 ######################################################## tier2.sources = source1 tier2.channels = channel1 tier2.sinks = sink1 tier2.channels.channel1.type = memory tier2.channels.channel1.capacity = 1000 tier2.channels.channel1.transactionCapacity = 100 tier2.sinks.sink1.channel = channel1 tier2.sources.source1.channels = channel1 tier2.sources.source1.type = org.apache.flume.source.kafka.KafkaSource tier2.sources.source1.kafka.bootstrap.servers = acmecp-ssl-1.gce.cloudera.com:9093,acmecp-ssl-2.gce.cloudera.com:9093 tier2.sources.source1.kafka.topics = sectest tier2.sources.source1.kafka.offsets.storage = kafka tier2.sources.source1.kafka.consumer.group.id = flume tier2.sinks.sink1.type = logger ### # Generated security related setup ### # tier2.sources.source1.kafka.consumer.security.protocol = SASL_SSL # tier2.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka ###