Sample code to connect to Kafka data connector

Sample application code for connecting to a Kafka data connector in Cloudera Data Engineering.

  • The following example shows the application code to publish data to a Kafka topic from a Cloudera Data Engineering job:
    import org.apache.spark.sql.SparkSession
    import java.time.LocalDateTime
    import java.time.format.DateTimeFormatter
    
    /**
      * Sample Spark Structured Streaming job that publishes data to a Kafka topic.
      *
      * Streams text files from the /tmp/cde directory and writes them to the
      * spark-kafka topic through Spark's Kafka sink.
      */
    object SparkStreaming {
      def main(args: Array[String]): Unit = {
        writeToKafka()
      }

      /**
        * Builds the file-to-Kafka streaming query, starts it, and blocks the
        * calling thread until the query terminates.
        */
      private def writeToKafka(): Unit = {
        // A per-run timestamp suffix keeps the warehouse and checkpoint
        // directories of separate runs apart (at one-second granularity).
        val timestamp = DateTimeFormatter.ofPattern("yyyyMMddHHmmss").format(LocalDateTime.now())

        val spark = SparkSession
          .builder
          .appName("Spark-Kafka-Streaming")
          .config("spark.sql.warehouse.dir", s"/tmp/cde/warehouse_$timestamp")
          .getOrCreate()

        val df = spark.readStream.text("/tmp/cde")
        df.printSchema()

        val outputTopic = "spark-kafka"
        val checkpoint = s"/tmp/cde/checkpoint_${outputTopic}_$timestamp"
        // Placeholder: replace with the bootstrap servers of the Kafka cluster.
        val brokers = "[***KAFKA-BOOTSTRAP-SERVERS***]"

        // NOTE(review): Spark documents "includeHeaders" as a Kafka source option;
        // confirm whether it has any effect on the sink before relying on it.
        val query = df.writeStream
          .format("kafka")
          .option("checkpointLocation", checkpoint)
          .option("kafka.bootstrap.servers", brokers)
          .option("topic", outputTopic)
          .option("kafka.security.protocol", "SASL_SSL")
          .option("kafka.ssl.truststore.location", "/usr/lib/jvm/jre/lib/security/cacerts")
          .option("kafka.ssl.truststore.password", "changeit")
          .option("includeHeaders", "true")
          .start()

        // Keep the driver alive for as long as the streaming query is running.
        query.awaitTermination()
      }
    }

    The job reads files from the /tmp/cde directory in the file system and publishes their contents to the spark-kafka Kafka topic.

  • The following example shows the application code to consume data from a Kafka topic in a Cloudera Data Engineering job:
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import java.util.{Properties, Arrays}
    import java.time.Duration
    import scala.collection.JavaConversions._
    
    
    /**
      * Sample job that consumes records from a Kafka topic.
      *
      * Subscribes to the spark-kafka topic and prints every record to stdout
      * as "key:value".
      */
    object SparkConsuming {
      def main(args: Array[String]): Unit = {
        consumeFromKafka()
      }

      /**
        * Creates a Kafka consumer, subscribes it to the spark-kafka topic, and
        * polls in an endless loop, printing each received record.
        *
        * The loop has no normal exit; the consumer is closed if polling or
        * printing throws.
        */
      private def consumeFromKafka(): Unit = {
        val props = new Properties()
        // Placeholder: replace with the bootstrap servers of the Kafka cluster.
        props.put("bootstrap.servers", "[***KAFKA-BOOTSTRAP-SERVERS***]")
        props.put("group.id", "consumer-group")
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        props.put("security.protocol", "SASL_SSL")
        props.put("ssl.truststore.location", "/usr/lib/jvm/jre/lib/security/cacerts")
        props.put("ssl.truststore.password", "changeit")
        props.put("auto.offset.reset", "earliest")

        val consumer = new KafkaConsumer[String, String](props)
        try {
          consumer.subscribe(Arrays.asList("spark-kafka"))
          while (true) {
            // Walk the Java iterator directly so the loop does not depend on
            // implicit Java-to-Scala collection conversions.
            val records = consumer.poll(Duration.ofMillis(10000)).iterator()
            while (records.hasNext) {
              val record = records.next()
              System.out.println(record.key() + ":" + record.value())
            }
          }
        } finally {
          // Release the consumer's resources when the loop is aborted by an exception.
          consumer.close()
        }
      }
    }

    The job consumes records from the spark-kafka Kafka topic and prints each record's key and value to stdout.