Integrating CDS Powered by Apache Spark with Apache Kafka

Minimum Required Role: Cluster Administrator (also provided by Full Administrator)

Version 2.1 Release 1 and higher of CDS Powered by Apache Spark includes an Apache Kafka integration feature that uses the new Kafka consumer API. The new consumer API supports reading data from secure Kafka clusters, that is, clusters that authenticate clients with Kerberos and optionally use TLS/SSL for wire encryption.
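
For example, the consumer configuration for such a secure cluster might include properties like the following sketch. The broker address, truststore path, and password are placeholders; security.protocol, sasl.kerberos.service.name, and the ssl.truststore.* keys are standard Kafka consumer properties.

import org.apache.kafka.common.serialization.StringDeserializer

// Consumer parameters for a Kafka cluster secured with Kerberos and TLS/SSL.
// The broker host, truststore path, and password are placeholders.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1.example.com:9093",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "my_consumer_group",
  "security.protocol" -> "SASL_SSL",                 // Kerberos (SASL) over TLS/SSL
  "sasl.kerberos.service.name" -> "kafka",
  "ssl.truststore.location" -> "/path/to/kafka.client.truststore.jks",
  "ssl.truststore.password" -> "truststore-password"
)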

Requirements

Reading data securely from Kafka, or using the new Spark-Kafka integration based on the new Kafka consumer API, requires the following software versions:

  • CDS 2.1 Release 1 or higher.

  • CDK Powered by Apache Kafka 2.1 or higher. (Although the Kafka consumer API is available starting in CDK Powered by Apache Kafka 2.0, the Spark integration requires Kafka 2.1.)

Running Spark Jobs that Integrate with Kafka

To run jobs that use the new Kafka integration, you can use one of the following two techniques.

Technique #1: Set the SPARK_KAFKA_VERSION environment variable

When running jobs that require the new Kafka integration, set SPARK_KAFKA_VERSION=0.10 in the shell before launching spark-submit. Use the appropriate environment variable syntax for your shell, such as:

# Set the environment variable for the duration of your shell session:
export SPARK_KAFKA_VERSION=0.10
spark-submit <arguments>

# Or:

# Set the environment variable for the duration of a single command:
SPARK_KAFKA_VERSION=0.10 spark-submit <arguments>

Technique #2: Set the spark_kafka_version setting through Cloudera Manager

Set the spark_kafka_version configuration in Cloudera Manager's Spark 2 service to 0.10 and redeploy the client configuration. With this technique, you do not need to set any environment variables when launching spark-submit.

Technique #2 is preferable if you have upgraded your Kafka brokers to CDK Powered by Apache Kafka 2.1 or higher and do not intend your Spark jobs to communicate with brokers running versions of Kafka earlier than CDK Powered by Apache Kafka 2.1.

If you change the default Kafka version to 0.10 by using technique #2, you can still connect to older Kafka brokers (for example, brokers based on CDK Powered by Apache Kafka 2.0) by setting SPARK_KAFKA_VERSION=0.9 when running your application.
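
For example:

SPARK_KAFKA_VERSION=0.9 spark-submit <arguments>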

Building Applications

To build applications against the new Kafka integration, add the dependency using the following Maven coordinates:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
  <version>2.4.0.cloudera2</version>
</dependency>
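
For illustration, a minimal Spark Streaming application against this API might look like the following sketch. The broker address, topic, and consumer group are placeholders; KafkaUtils, LocationStrategies, and ConsumerStrategies come from the spark-streaming-kafka-0-10 module declared above.

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object KafkaStreamExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("KafkaStreamExample")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Placeholder broker, group, and topic; substitute your own values.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker1.example.com:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "my_consumer_group",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // Create a direct stream that reads from Kafka using the new consumer API.
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](Seq("my_topic"), kafkaParams)
    )

    // Print the value of each record in every batch.
    stream.map(_.value).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

Submit the application with spark-submit, using one of the techniques above to select the 0.10 integration.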

Reading from a Kafka Cluster Authorized by Sentry

To read from a Kafka cluster authorized by Sentry, give privileges to your consumer group as described in Configuring Kafka Security. You must also grant privileges to another consumer group spark-executor-<your_consumer_group> in the same way. This is because the driver uses the consumer group specified in your app, but the executors use a different consumer group, which is hardcoded to spark-executor-<your_consumer_group>.