Java client security examples

Review the Java client security examples to learn what configuration properties you have to set when connecting to secured or unsecured clusters.

The following code snippets give you a few simple examples on what configuration properties you need to set for your Kafka clients when connecting them to either secured or unsecured Kafka clusters. Use the following examples as a starting point and make changes as necessary. Depending on your environment, you may need to set other optional properties. See the upstream Apache Kafka documentation for a comprehensive list of available properties.

Unsecured

Properties producerConfig = new Properties();
producerConfig.put("bootstrap.servers", "***BROKER HOST***:***PORT***");

SSL/TLS

Properties producerConfig = new Properties();
producerConfig.put("bootstrap.servers", "***BROKER HOST***:***PORT***");
producerConfig.put(“security.protocol”, “SSL”);
producerConfig.put(“ssl.truststore.location”, “***PATH TO TRUSTSTORE***”); // .jks format
producerConfig.put(“ssl.truststore.password”, “***TRUSTSTORE PASSWORD***”);
producerConfig.put(“ssl.keystore.location”, “***PATH TO KEYSTORE***”); // .jks format
producerConfig.put(“ssl.keystore.password”, “***KEYSTORE PASSWORD***”);

PLAIN (LDAP, PAM, and others)

Properties producerConfig = new Properties();
producerConfig.put("bootstrap.servers", "***BROKER HOST***:***PORT***");
producerConfig.put(“security.protocol”, “SASL_SSL”);
producerConfig.put(“sasl.mechanism”, “PLAIN”);
producerConfig.put(“ssl.truststore.location”, “***PATH TO TRUSTSTORE***”); // .jks format
producerConfig.put(“ssl.truststore.password”, “***TRUSTSTORE PASSWORD***”);
producerConfig.put(“sasl.jaas.config”, “org.apache.kafka.common.security.plain.PlainLoginModule required username="***USERNAME***" password="***PASSWORD***"); // credentials sent across the wire, also enable SSL/TLS encryption

Kerberos

Properties producerConfig = new Properties();
producerConfig.put("bootstrap.servers", "***BROKER HOST***:***PORT***");
producerConfig.put(“security.protocol”, “SASL_PLAINTEXT”);
producerConfig.put(“sasl.kerberos.service.name”, “***SERVICE NAME***”); // kafka
producerConfig.put(“sasl.jaas.config”, “com.sun.security.auth.module.Krb5LoginModule required useTicketCache=true;”); // use kerberos ticket cache

OAuth2/OAUTHBEARER

Properties producerConfig = new Properties();
producerConfig.put("bootstrap.servers", "***BROKER HOST***:***PORT***");
producerConfig.put("security.protocol", "SASL_PLAINTEXT");
producerConfig.put("sasl.mechanism", "OAUTHBEARER");
producerConfig.put("sasl.login.callback.handler.class", org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.class.getName()); 
producerConfig.put("sasl.jaas.config", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required " +
                    	"clientId=\"***CLIENT_ID***\" " +
                    	"clientSecret=\"***CLIENT_SECRET***\" " +
                    	"scope=\"***SCOPE***\";"
    	);
producerConfig.put("sasl.oauthbearer.token.endpoint.url", "http://***OAUTH SERVER***/***TOKEN ENDPOINT***");