Spark 2 Kafka Integration

Minimum Required Role: Cluster Administrator (also provided by Full Administrator)

Cloudera Spark 2.1 release 1 and later include a Kafka integration feature that uses the new Kafka consumer API. This new consumer API supports reading data from secure Kafka clusters. In this context, secure clusters are those that use Kerberos for authentication and, optionally, TLS/SSL for wire encryption.

Requirements

Reading data securely from Kafka, or using the new Spark-Kafka integration built on the new Kafka consumer API, requires the following software versions:

  • Cloudera Spark 2.1 release 1 or later.

  • CDK 2.1 or later. (Although the new Kafka consumer API is available starting in CDK 2.0, the Spark integration requires CDK 2.1.)

Running Spark Jobs that Integrate with Kafka

To run jobs that use the new Kafka integration, you can use one of the following two techniques.

Technique #1: Set the SPARK_KAFKA_VERSION environment variable

When running jobs that require the new Kafka integration, set SPARK_KAFKA_VERSION=0.10 in the shell before launching spark-submit. Use the appropriate environment variable syntax for your shell, such as:

# Set the environment variable for the rest of the shell session
export SPARK_KAFKA_VERSION=0.10
spark-submit arguments

# Or:

# Set the environment variable for a single command only
SPARK_KAFKA_VERSION=0.10 spark-submit arguments

Technique #2: Set the spark_kafka_version configuration through Cloudera Manager

In Cloudera Manager, set the spark_kafka_version configuration of the Spark 2 service to 0.10 and redeploy the client configuration. You then do not need to set any environment variables when launching spark-submit.

Technique #2 is preferable if you have upgraded your Kafka brokers to CDK 2.1 or later and do not intend your Spark jobs to communicate with brokers running a CDK release earlier than 2.1.

If you change the default Kafka version to 0.10 using technique #2, you can still connect to older Kafka brokers (for example, ones based on CDK 2.0) by setting SPARK_KAFKA_VERSION=0.9 when running your application.
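If some jobs still target CDK 2.0 brokers, the per-job choice of SPARK_KAFKA_VERSION can be scripted. The following is a minimal sketch, assuming you know the CDK release of the target brokers and that a sort supporting -V (version ordering, as in GNU coreutils) is available; the function name spark_kafka_version_for_cdk is hypothetical. It prints 0.10 for CDK 2.1 or later and 0.9 otherwise, following the rules above.

```shell
# Hypothetical helper: print the SPARK_KAFKA_VERSION value to use for brokers
# running the given CDK release (for example "2.0" or "2.1").
# Assumes "sort -V" (version ordering) is available.
spark_kafka_version_for_cdk() {
  local cdk="$1"
  # If "2.1" sorts first (or ties), the CDK release is 2.1 or later.
  if [ "$(printf '%s\n' 2.1 "$cdk" | sort -V | head -n 1)" = "2.1" ]; then
    echo "0.10"   # new Kafka consumer API integration
  else
    echo "0.9"    # older brokers, for example CDK 2.0
  fi
}
```

For example, SPARK_KAFKA_VERSION="$(spark_kafka_version_for_cdk 2.0)" spark-submit arguments sets the value for that single command only, leaving the Cloudera Manager default in place for other jobs.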

Building Applications

To build applications against the new Kafka integration, you can add the dependency by using the following Maven coordinates:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
  <version>2.1.0.cloudera1</version>
</dependency>
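The same dependency can also be written as a single groupId:artifactId:version string, which is the format spark-submit accepts for its --packages option. The sketch below is illustrative only: the function name kafka_integration_coordinates is hypothetical, the _2.11 suffix must match the Scala binary version of your Spark build, and resolving a cloudera-suffixed version assumes a reachable repository that hosts Cloudera artifacts.

```shell
# Hypothetical helper: print the groupId:artifactId:version string for the
# Kafka 0.10 integration. Defaults match the Maven dependency above.
kafka_integration_coordinates() {
  local scala_binary="${1:-2.11}"        # Scala binary version of the Spark build
  local version="${2:-2.1.0.cloudera1}"  # Cloudera Spark 2 artifact version
  echo "org.apache.spark:spark-streaming-kafka-0-10_${scala_binary}:${version}"
}

kafka_integration_coordinates
# Possible use (assumes a repository hosting Cloudera artifacts is reachable):
# spark-submit --packages "$(kafka_integration_coordinates)" arguments
```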

Reading from Authorized Kafka

To read from a Kafka cluster that uses Sentry for authorization, grant privileges to your consumer group as described in Configuring Kafka Security. You must also grant the same privileges to a second consumer group, spark-executor-your_consumer_group. This is because the driver uses the consumer group specified in your application, but the executors use a different consumer group whose name is always spark-executor- followed by your consumer group name.
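Because the executor group name is derived from your own group name, the list of groups to authorize can be generated. This minimal sketch assumes your application's group.id is passed as the first argument; the function name spark_consumer_groups and the group my_app_group are hypothetical, and the actual Sentry grant commands are left to the procedure in Configuring Kafka Security.

```shell
# Hypothetical helper: print both consumer groups that need Sentry privileges
# for a Spark application whose configured group.id is "$1".
spark_consumer_groups() {
  local group="$1"
  printf '%s\n' "$group"                  # group used by the driver
  printf '%s\n' "spark-executor-$group"   # group used by the executors
}

# Example: list the groups to authorize for the hypothetical group my_app_group.
spark_consumer_groups my_app_group | while IFS= read -r g; do
  echo "Grant the same Sentry privileges to consumer group: $g"
done
```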