Developing Apache Spark Applications

Spark Streaming Example

This example uses Kafka to deliver a stream of words to a Python word count program.

  1. Create a Kafka topic wordcounttopic:
    kafka-topics --create --zookeeper zookeeper_server:2181 --topic wordcounttopic --partitions 1 --replication-factor 1
    
  2. Create a Kafka word count Python program adapted from the Spark Streaming example kafka_wordcount.py. This version divides the input stream into 10-second batches and counts the words in each batch:
    from __future__ import print_function
    
    import sys
    
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils
    
    if __name__ == "__main__":
        if len(sys.argv) != 3:
            print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
            sys.exit(-1)
    
        sc = SparkContext(appName="PythonStreamingKafkaWordCount")
        ssc = StreamingContext(sc, 10)
    
        zkQuorum, topic = sys.argv[1:]
        kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
        lines = kvs.map(lambda x: x[1])
        counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
        counts.pprint()
    
        ssc.start()
        ssc.awaitTermination()
  3. Submit the application using spark-submit with dynamic allocation disabled, specifying your ZooKeeper server and topic. The following command runs on YARN; to run locally instead, specify a master with at least two worker threads (for example, --master local[2]), one to receive data and one to process it:
    spark-submit --master yarn --deploy-mode client --conf "spark.dynamicAllocation.enabled=false" --jars SPARK_HOME/lib/spark-examples.jar kafka_wordcount.py zookeeper_server:2181 wordcounttopic
    

    In a Cloudera Data Platform deployment, SPARK_HOME defaults to /opt/cloudera/parcels/CDH/lib/spark. The shells are also available from /usr/bin.

  4. In another window, start a Kafka producer that publishes to wordcounttopic:
    kafka-console-producer --broker-list kafka_broker:9092 --topic wordcounttopic
    
  5. In the producer window, type the following:
    hello
    hello
    hello
    hello
    hello
    hello
    gb
    gb
    gb
    gb
    gb
    gb

    Depending on how fast you type, the Spark Streaming application window displays output similar to the following, with the words you typed split across 10-second batches:

    -------------------------------------------
    Time: 2016-01-06 14:18:00
    -------------------------------------------
    (u'hello', 6)
    (u'gb', 2)
    
    -------------------------------------------
    Time: 2016-01-06 14:18:10
    -------------------------------------------
    (u'gb', 4)
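
The way the counts split across the two batches above can be reproduced without a cluster. The following is a minimal plain-Python sketch of the same per-batch logic, not the Spark implementation: the function name count_words_per_batch and the arrival times are hypothetical, and batches are labeled by their start second rather than by the timestamp that Spark prints.

```python
from collections import Counter, defaultdict


def count_words_per_batch(events, batch_seconds=10):
    """Group (arrival_second, line) pairs into fixed batches and count words in each."""
    batches = defaultdict(Counter)
    for arrival_second, line in events:
        # Integer division assigns each line to the batch its arrival time falls in.
        batch_start = (arrival_second // batch_seconds) * batch_seconds
        # Same split as the flatMap step; the Counter stands in for map + reduceByKey.
        batches[batch_start].update(line.split(" "))
    return {start: dict(counts) for start, counts in sorted(batches.items())}


# Hypothetical arrival times (assumption): six "hello" lines and two "gb" lines
# arrive in the first 10 seconds, and four more "gb" lines arrive in the next 10.
events = [(t, "hello") for t in range(6)]
events += [(6, "gb"), (8, "gb"), (10, "gb"), (11, "gb"), (12, "gb"), (13, "gb")]

if __name__ == "__main__":
    for start, counts in count_words_per_batch(events).items():
        print("Batch starting at second", start, counts)
```

Counts are not carried over from one batch to the next, which is why gb appears in both batches with separate totals.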