Sample code to connect to a Kafka data connector
Sample application codes for connecting to a Kafka data connector in Cloudera Data Engineering.
- The following example shows the application code to publish data to a Kafka topic from a Cloudera Data Engineering
job:
import org.apache.spark.sql.SparkSession

import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

/**
 * Sample Cloudera Data Engineering job that streams text files from `/tmp/cde`
 * and publishes each line to the `spark-kafka` Kafka topic.
 */
object SparkStreaming {

  def main(args: Array[String]): Unit = {
    writeToKafka()
  }

  /**
   * Starts a Structured Streaming query that reads text files from `/tmp/cde` and
   * writes every line to Kafka over SASL_SSL, then blocks until the query terminates.
   */
  private def writeToKafka(): Unit = {
    // The timestamp suffix gives every run its own warehouse and checkpoint directory,
    // so a rerun starts a fresh query instead of resuming an earlier run's checkpoint.
    val timestamp = DateTimeFormatter.ofPattern("yyyyMMddHHmmss").format(LocalDateTime.now())

    val spark = SparkSession
      .builder
      .appName("Spark-Kafka-Streaming")
      .config("spark.sql.warehouse.dir", s"/tmp/cde/warehouse_$timestamp")
      .getOrCreate()

    // The text source produces one string column named `value`; the Kafka sink
    // writes that column as the record value.
    val df = spark.readStream.text("/tmp/cde")
    df.printSchema()

    val outputTopic = "spark-kafka"
    // NOTE(review): the checkpoint (and warehouse) directories live under the streamed
    // source directory /tmp/cde; confirm the file source does not pick up their files.
    val checkpoint = s"/tmp/cde/checkpoint_${outputTopic}_$timestamp"
    val brokers = "[***KAFKA-BOOTSTRAP-SERVERS***]" // replace with your bootstrap servers

    val query = df.writeStream
      .format("kafka")
      .option("checkpointLocation", checkpoint)
      .option("kafka.bootstrap.servers", brokers)
      .option("topic", outputTopic)
      // `kafka.`-prefixed options are passed through to the underlying Kafka producer.
      .option("kafka.security.protocol", "SASL_SSL")
      // Presumably the JDK's default cacerts password; adjust if your truststore differs.
      .option("kafka.ssl.truststore.location", "/usr/lib/jvm/jre/lib/security/cacerts")
      .option("kafka.ssl.truststore.password", "changeit")
      // NOTE(review): includeHeaders is documented as a read-side (source) option;
      // it appears to have no effect on this sink — confirm before relying on it.
      .option("includeHeaders", "true")
      .start()

    // Blocks the driver until the streaming query is stopped or fails.
    query.awaitTermination()
  }
}
The job reads files from the /tmp/cde directory in the file system and publishes to the
`spark-kafka` Kafka topic.
- The following example shows the application code to consume data from a Kafka topic in a Cloudera Data Engineering
job:
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.errors.WakeupException

import java.time.Duration
import java.util.{Arrays, Properties}

import scala.collection.JavaConverters._

/**
 * Sample Cloudera Data Engineering job that consumes the `spark-kafka` Kafka topic
 * and prints every record to stdout as `key:value`.
 */
object SparkConsuming {

  def main(args: Array[String]): Unit = {
    consumeFromKafka()
  }

  /**
   * Polls the `spark-kafka` topic until the JVM shuts down, printing each record.
   * A shutdown hook wakes the blocked consumer so that it is closed (and leaves its
   * consumer group) instead of being abandoned.
   */
  private def consumeFromKafka(): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "[***KAFKA-BOOTSTRAP-SERVERS***]")
    props.put("group.id", "consumer-group")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("security.protocol", "SASL_SSL")
    props.put("ssl.truststore.location", "/usr/lib/jvm/jre/lib/security/cacerts")
    props.put("ssl.truststore.password", "changeit")
    // Read from the start of the topic when the group has no committed offset yet.
    props.put("auto.offset.reset", "earliest")

    val consumer = new KafkaConsumer[String, String](props)

    // KafkaConsumer is not thread-safe; wakeup() is the one method that may be called
    // from another thread. It makes the blocked poll() below throw WakeupException.
    val mainThread = Thread.currentThread()
    Runtime.getRuntime.addShutdownHook(new Thread {
      override def run(): Unit = {
        consumer.wakeup()
        mainThread.join() // wait until the poll loop has closed the consumer
      }
    })

    try {
      consumer.subscribe(Arrays.asList("spark-kafka"))
      while (true) {
        val records = consumer.poll(Duration.ofMillis(10000))
        for (record <- records.asScala) {
          println(s"${record.key}:${record.value}")
        }
      }
    } catch {
      case _: WakeupException => // expected: triggered by the shutdown hook
    } finally {
      consumer.close()
    }
  }
}
The job consumes content from the
`spark-kafka` Kafka topic and prints each record to `stdout`.
