Configuring Kafka Security
This topic describes additional steps you can take to ensure the safety and integrity of your data stored in Kafka, with features available in Cloudera Distribution of Apache Kafka 2.0.0 and higher:
Deploying SSL for Kafka
Kafka allows clients to connect over SSL. By default, SSL is disabled, but can be turned on as needed.
Step 1. Generating Keys and Certificates
Generate the key and the certificate for each machine in the cluster using the Java keytool utility. See Creating Certificates.
In the following command, keystore is the keystore file that stores your certificate, and validity is the number of days the certificate is valid:
$ keytool -keystore {tmp.server.keystore.jks} -alias localhost -validity {validity} -genkey
Make sure that the common name (CN) matches the fully qualified domain name (FQDN) of your server. The client compares the CN with the DNS domain name to ensure that it is connecting to the correct server.
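For example, here is a sketch of generating a broker key pair with the CN set explicitly rather than interactively (the host name kafka01.example.com and the organization fields are illustrative assumptions; substitute your own values):
$ keytool -keystore server.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA \
    -dname "CN=kafka01.example.com, OU=Engineering, O=Example, L=Santa Clara, ST=CA, C=US"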
Step 2. Creating Your Own Certificate Authority
You have generated a public-private key pair for each machine, and a certificate to identify the machine. However, the certificate is unsigned, so an attacker can create a certificate and pretend to be any machine. Sign certificates for each machine in the cluster to prevent unauthorized access.
Generate the CA:
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
The generated CA is a public-private key pair and certificate used to sign other certificates. Add the generated CA to the client truststore so that clients can trust this CA:
keytool -keystore {client.truststore.jks} -alias CARoot -import -file {ca-cert}
Step 3. Signing the Certificate
Now you can sign all certificates generated in step 1 with the CA generated in step 2.
- Export the certificate from the keystore:
keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file
- Sign it with the CA:
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days {validity} -CAcreateserial -passin pass:{ca-password}
- Import both the certificate of the CA and the signed certificate into the keystore:
keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed
The definitions of the variables are as follows:
- keystore: the location of the keystore
- ca-cert: the certificate of the CA
- ca-key: the private key of the CA
- ca-password: the passphrase of the CA
- cert-file: the exported, unsigned certificate of the server
- cert-signed: the signed certificate of the server
All of the preceding steps can be combined into one script; for example:
#!/bin/bash
#Step 1
keytool -keystore server.keystore.jks -alias localhost -validity 365 -genkey
#Step 2
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
#Step 3
keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:test1234
keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed
Step 4. Configuring Kafka Brokers
Kafka brokers can listen for connections on multiple ports. If SSL is enabled for inter-broker communication, both PLAINTEXT and SSL listeners are required; for example:
listeners=PLAINTEXT://host.name:port,SSL://host.name:port
- Turn on SSL for the Kafka service by turning on the ssl_enabled configuration for the Kafka CSD.
- Set security.inter.broker.protocol to SSL if Kerberos is disabled; otherwise, set it to SASL_SSL.
Set the following SSL properties on each broker:
ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
ssl.truststore.password=test1234
Other configuration settings might also be needed, depending on your requirements:
- ssl.client.auth=none: The other options for client authentication are required and requested; with requested, clients without certificates can still connect. The use of requested is discouraged because it provides a false sense of security, and misconfigured clients can still connect.
- ssl.cipher.suites: A cipher suite is a named combination of authentication, encryption, MAC, and a key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. This list is empty by default.
- ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1: Provide a list of SSL protocols that your brokers accept from clients.
- ssl.keystore.type=JKS
- ssl.truststore.type=JKS
To enable SSL for inter-broker communication, set the following property:
security.inter.broker.protocol=SSL
Due to import regulations in some countries, the Oracle implementation limits the strength of cryptographic algorithms available by default. If you need stronger algorithms (for example, AES with 256-bit keys), you must obtain the JCE Unlimited Strength Jurisdiction Policy Files and install them in the JDK/JRE. For more information, see the JCA Providers Documentation.
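To check whether the unlimited strength policy files are installed, you can query the maximum allowed AES key length with the JDK's jrunscript tool; it prints 128 under the default limited policy and 2147483647 once the unlimited policy files are in place:
$ jrunscript -e 'print(javax.crypto.Cipher.getMaxAllowedKeyLength("AES"))'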
Once the brokers start with SSL configured, the broker log should show an endpoint for SSL communication:
with addresses: PLAINTEXT -> EndPoint(192.168.64.1,9092,PLAINTEXT),SSL -> EndPoint(192.168.64.1,9093,SSL)
To check whether the server keystore and truststore are set up properly, run the following command:
openssl s_client -debug -connect localhost:9093 -tls1
In the command output, you should see the server certificate; for example:
-----BEGIN CERTIFICATE-----
{variable sized random bytes}
-----END CERTIFICATE-----
subject=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=John Smith
issuer=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=kafka/emailAddress=test@test.com
If the certificate does not appear, or if there are any other error messages, your keystore is not set up properly.
Step 5. Configuring Kafka Clients
SSL is supported only for the new Kafka Producer and Consumer APIs. The configurations for SSL are the same for both the producer and consumer.
If client authentication is not required in the broker, the following shows a minimal configuration example:
security.protocol=SSL
ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
ssl.truststore.password=test1234
If client authentication is required, a keystore must be created as in step 1, and you must also configure the following properties:
ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
- ssl.provider (Optional). The name of the security provider used for SSL connections. The default is the JVM's default security provider.
- ssl.cipher.suites (Optional). A cipher suite is a named combination of authentication, encryption, MAC, and a key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol.
- ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1. This property should list at least one of the protocols configured on the broker side.
- ssl.truststore.type=JKS
- ssl.keystore.type=JKS
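To sanity-check the client SSL configuration end to end, you can point the console tools at the broker's SSL listener. A minimal sketch, assuming the properties above are saved in a file named client-ssl.properties and the SSL listener runs on port 9093 (the file name, host name, and port are assumptions):
$ kafka-console-producer --broker-list kafka01.example.com:9093 --topic test1 \
    --producer.config client-ssl.properties
$ kafka-console-consumer --new-consumer --bootstrap-server kafka01.example.com:9093 \
    --topic test1 --from-beginning --consumer.config client-ssl.properties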
Using Kafka Supported Protocols
Kafka can expose multiple communication endpoints, each supporting a different protocol, so you can use different protocols for client-broker and broker-broker communication. Supporting multiple protocols is useful in scenarios such as the following:
- Enabling SSL encryption for client-broker communication but keeping broker-broker communication as PLAINTEXT. Because SSL has performance overhead, you might want to keep inter-broker communication as PLAINTEXT if your Kafka brokers are behind a firewall and not susceptible to network snooping.
- Migrating from a non-secure Kafka configuration to a secure Kafka configuration without requiring downtime. Use a rolling restart and keep security.inter.broker.protocol set to a protocol that is supported by all brokers until all brokers are updated to support the new protocol.
For example, if you have a Kafka cluster that needs to be configured to enable Kerberos without downtime, follow these steps:
- Set security.inter.broker.protocol to PLAINTEXT.
- Update the Kafka service configuration to enable Kerberos.
- Perform a rolling restart.
- Set security.inter.broker.protocol to SASL_PLAINTEXT.
- Perform a second rolling restart.
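During such a migration, each broker can expose both the old and the new listener at the same time, so clients and brokers on either protocol can still connect. A sketch of broker settings mid-migration (the host name and ports are assumptions):
listeners=PLAINTEXT://kafka01.example.com:9092,SASL_PLAINTEXT://kafka01.example.com:9090
# Keep PLAINTEXT until all brokers support SASL_PLAINTEXT, then switch and restart again:
security.inter.broker.protocol=PLAINTEXT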
The following table shows the protocols Kafka supports and whether they provide SSL encryption and Kerberos authentication:

Protocol | SSL | Kerberos
---|---|---
PLAINTEXT | No | No
SSL | Yes | No
SASL_PLAINTEXT | No | Yes
SASL_SSL | Yes | Yes
In most cases, set security.inter.broker.protocol to the protocol you are using for broker-to-client communication. Set security.inter.broker.protocol to a protocol different than the broker-to-client protocol only when you are performing a rolling upgrade from a non-secure to a secure Kafka cluster.
Enabling Kerberos Authentication
Kafka 2.0 and higher supports Kerberos authentication. If you already have a Kerberos server, you can add Kafka to your current configuration. If you do not have a Kerberos server, install it before proceeding. See Enabling Kerberos Authentication Using the Wizard.
If you have already configured the mapping from Kerberos principals to short names using the hadoop.security.auth_to_local HDFS configuration property, configure the same rules for Kafka by adding the sasl.kerberos.principal.to.local.rules property to the Kafka Broker Advanced Configuration Snippet in Cloudera Manager. Specify the rules as a comma-separated list.
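For example, here is a sketch of a rule list that strips a hypothetical EXAMPLE.COM realm from one- and two-component principals and falls back to the default mapping (the realm is an assumption):
sasl.kerberos.principal.to.local.rules=RULE:[1:$1@$0](.*@EXAMPLE.COM)s/@.*//,RULE:[2:$1@$0](.*@EXAMPLE.COM)s/@.*//,DEFAULT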
To enable Kerberos authentication for Kafka:
- From Cloudera Manager, navigate to Kafka > Configuration. Set SSL Client Authentication to none. Set Inter Broker Protocol to SASL_PLAINTEXT.
- Click Save Changes.
- Restart the Kafka service.
- Make sure that listeners = SASL_PLAINTEXT is present in the Kafka broker log, /var/log/kafka/server.log.
- Create a jaas.conf file with the following contents to use with cached Kerberos credentials. (You can modify this to use keytab files instead of cached credentials; to generate keytabs, see Step 6: Get or Create a Kerberos Principal for Each User Account.)
If you use kinit first, use this configuration.
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=true;
};
If you use a keytab, use this configuration:
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/etc/security/keytabs/kafka_server.keytab"
  principal="kafka/kafka1.hostname.com@EXAMPLE.COM";
};
- Create the client.properties file containing the following properties.
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
- Test with the Kafka console producer and consumer. To obtain a Kerberos ticket-granting ticket (TGT):
$ kinit <user>
- Verify that your topic exists. (This does not use security features, but it is a best practice.)
$ kafka-topics --list --zookeeper <zkhost>:2181
- Verify that the jaas.conf file is used by setting the environment.
$ export KAFKA_OPTS="-Djava.security.auth.login.config=/home/user/jaas.conf"
- Run a Kafka console producer.
$ kafka-console-producer --broker-list <anybroker>:9092 --topic test1 --producer.config client.properties
- Run a Kafka console consumer.
$ kafka-console-consumer --new-consumer --topic test1 --from-beginning --bootstrap-server <anybroker>:9092 --consumer.config client.properties
Enabling Encryption at Rest
Data encryption is increasingly recognized as an optimal method for protecting data at rest. To encrypt Kafka data at rest, perform the following steps:
- Stop the Kafka service.
- Archive the Kafka data to an alternate location, using TAR or another archive tool.
- Unmount the affected drives.
- Install and configure Navigator Encrypt.
- Expand the TAR archive into the encrypted directories.
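A minimal sketch of the archive and restore steps, assuming the Kafka data directory is /var/local/kafka/data (an assumption; check your log.dirs setting for the actual location):
# After stopping the Kafka service, archive the data directory:
$ tar -cvf kafka-data.tar /var/local/kafka/data
# After Navigator Encrypt is configured on the remounted drives, restore the data:
$ tar -xvf kafka-data.tar -C /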
Using Kafka with Sentry Authorization
Starting with Kafka 2.1.x on CDH 5.9.x and higher, Apache Sentry includes a Kafka binding you can use to enable authorization in Kafka with Sentry. For more information, see Authorization With Apache Sentry.
Configuring Kafka to Use Sentry Authorization
The following steps describe how to configure Kafka to use Sentry authorization. These steps assume you have installed Kafka and Sentry on your cluster.
For more information, see Installing or Upgrading Kafka and Sentry Installation.
To configure Sentry authorization for Kafka:
- In Cloudera Manager, go to the Kafka service.
- Select the checkbox Enable Kerberos Authentication.
- Select a Sentry service in the Kafka service configuration.
- Add Super users. Super users can perform any action on any resource in the Kafka cluster. The kafka user is added as a super user by default. Super user requests are authorized without going through Sentry, which provides enhanced performance. (See the sketch after this list.)
- Select the checkbox Enable Sentry Privileges Caching to enhance performance.
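The Super users entries correspond to the Kafka super.users broker property; as a sketch, the equivalent raw setting with a hypothetical additional admin principal looks like this:
super.users=User:kafka;User:admin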
Authorizable Resources
Authorizable resources are resources or entities in a Kafka cluster that require special permissions for a user to be able to perform actions on them. Kafka has four authorizable resources.
- Cluster, which controls who can perform cluster-level operations such as creating or deleting a topic. This can only have one value, kafka-cluster, as one Kafka cluster cannot have more than one cluster resource.
- Topic, which controls who can perform topic-level operations such as producing to and consuming from topics. Its value must exactly match the topic name in the Kafka cluster. Wildcards ('*') cannot be used when specifying the topic.
- Consumergroup, which controls who can perform consumergroup-level operations such as joining or describing a consumergroup. Its value must exactly match the group.id of a consumergroup.
- Host, which controls from where specific operations can be performed. Think of this as a way to achieve IP filtering in Kafka. You can set the value of this resource to the wildcard (*), which represents all hosts.
Authorized Actions
You can perform multiple actions on each resource. The following operations are supported by Kafka, though not all actions are valid on all resources.
- ALL, a wildcard action that represents all possible actions on a resource.
- read
- write
- create
- delete
- alter
- describe
- clusteraction
Authorizing Privileges
Privileges define what actions are allowed on a resource. A privilege is represented as a string in Sentry. The following rules apply to a valid privilege.
- Can have at most one Host resource. If you do not specify a Host resource in your privilege string, Host=* is assumed.
- Must have exactly one non-Host resource.
- Must have exactly one action specified at the end of the privilege string.
For example, the following are valid privilege strings:
Host=*->Topic=myTopic->action=ALL
Topic=test->action=ALL
Granting Privileges to a Role
The following examples grant privileges to the role test, so that users in testGroup can create a topic named testTopic and produce to it.
However, before you can assign the test role, you must first create it. To create the test role:
$ kafka-sentry -cr -r test
To confirm that the role was created, list the roles:
$ kafka-sentry -lr
If Sentry privileges caching is enabled, as recommended, new privileges you assign take some time to appear in the system. The delay equals the time-to-live interval of the Sentry privileges cache, which is set using sentry.kafka.caching.ttl.ms. By default, this interval is 30 seconds. For test clusters, it is beneficial to have changes appear as quickly as possible; therefore, Cloudera recommends that you either use a lower time-to-live interval or disable caching with sentry.kafka.caching.enable.
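For a test cluster, a sketch of the relevant settings (the values shown are examples; how you apply them depends on your deployment, typically through an advanced configuration snippet):
sentry.kafka.caching.ttl.ms=1000
sentry.kafka.caching.enable=false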
- Allow users in testGroup to create a new topic from localhost.
$ kafka-sentry -gpr -r test -p "Host=127.0.0.1->Cluster=kafka-cluster->action=create"
- Allow users in testGroup to describe testTopic, which they create and use, from localhost.
$ kafka-sentry -gpr -r test -p "Host=127.0.0.1->Topic=testTopic->action=describe"
- Allow users in testGroup to write to testTopic from localhost, which allows users to produce to testTopic.
$ kafka-sentry -gpr -r test -p "Host=127.0.0.1->Topic=testTopic->action=write"
- Create testTopic.
$ kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 \
    --partitions 1 --topic testTopic
$ kafka-topics --list --zookeeper localhost:2181
testTopic
- Assign the test role to the group testGroup:
$ kafka-sentry -arg -r test -g testGroup
- To verify that the test role is part of the group testGroup:
$ kafka-sentry -lr -g testGroup
- Produce to testTopic. Note that you must pass a configuration file, producer.properties, with information on the JAAS configuration and other Kerberos authentication related information. See SASL Configuration for Kafka Clients.
$ kafka-console-producer --broker-list localhost:9092 --topic testTopic \
    --producer.config producer.properties
This is a message
This is another message
- Allow users in testGroup to describe the consumer group, testconsumergroup, that they will start and join.
$ kafka-sentry -gpr -r test -p "Host=127.0.0.1->Consumergroup=testconsumergroup->action=describe"
- Allow users in testGroup to read from testTopic from localhost, which allows users to consume from testTopic.
$ kafka-sentry -gpr -r test -p "Host=127.0.0.1->Topic=testTopic->action=read"
- Allow users in testGroup to read from the consumer group, testconsumergroup, that they will start and join.
$ kafka-sentry -gpr -r test -p "Host=127.0.0.1->Consumergroup=testconsumergroup->action=read"
- Consume from testTopic. Note that you must pass a configuration file, consumer.properties, with information on the JAAS configuration and other Kerberos authentication related information. The configuration file must also specify group.id as testconsumergroup.
$ kafka-console-consumer --zookeeper localhost:2181 --topic testTopic \
    --from-beginning --consumer.config consumer.properties
This is a message
This is another message
Troubleshooting
If Kafka requests are failing due to authorization, the following steps can provide insight into the error:
- Make sure that you have run kinit as a user who has privileges to perform the operation.
- Identify which broker is hosting the leader of the partition you are trying to produce to or consume from, as this leader authorizes your request against Sentry. One easy way to debug is to use a single Kafka broker. Change the Kafka broker's log level to DEBUG and restart the broker.
- Run the Kafka client or Kafka CLI with the required arguments and capture the Kafka log, which should be something like /var/log/kafka/kafka-broker-<HOST_ID>.log on the Kafka broker's host.
- There will be many Jetty log messages; filtering them out usually helps reduce noise. Look for log messages from org.apache.sentry.
- Look for the following information in the filtered logs:
- The groups that the Kafka client or CLI user is running as.
- Required privileges for the operation.
- Retrieved privileges from Sentry.
- The result of comparing the required and retrieved privileges.
This log information can provide insight into which privilege is not assigned to a user, causing a particular operation to fail.
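For example, a quick way to narrow the broker log down to Sentry messages (adjust the log file name for your host):
$ grep org.apache.sentry /var/log/kafka/kafka-broker-<HOST_ID>.log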