Spark Streaming Example
This example uses Kafka to deliver a stream of words to a Python word count program.
- Create a Kafka topic named wordcounttopic:

kafka-topics --create --zookeeper zookeeper_server:2181 --topic wordcounttopic --partitions 1 --replication-factor 1
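If you want to confirm that the topic was created before moving on, you can describe it with the same ZooKeeper address (optional check, shown here as an example):

kafka-topics --describe --zookeeper zookeeper_server:2181 --topic wordcounttopic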
- Create a Kafka word count Python program adapted from the Spark
Streaming example kafka_wordcount.py. This version divides the input stream into
batches of 10 seconds and counts the words in each batch:
from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
        sys.exit(-1)

    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
    # Divide the input stream into batches of 10 seconds
    ssc = StreamingContext(sc, 10)

    zkQuorum, topic = sys.argv[1:]
    # Create a receiver-based stream reading from the given topic
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
    # Each Kafka record is a (key, value) pair; keep only the value (the line of text)
    lines = kvs.map(lambda x: x[1])
    # Split each line into words and count occurrences within the batch
    counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a + b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()
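Save the program as kafka_wordcount.py in your working directory; the spark-submit command in the next step refers to it by that name.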
- Submit the application using spark-submit, with dynamic allocation disabled, specifying your ZooKeeper server and topic. To run locally, you must specify at least two worker threads: one to receive data and one to process it:

spark-submit --master yarn --deploy-mode client --conf "spark.dynamicAllocation.enabled=false" --jars SPARK_HOME/lib/spark-examples.jar kafka_wordcount.py zookeeper_server:2181 wordcounttopic
In a Cloudera Data Platform deployment, SPARK_HOME defaults to /opt/cloudera/parcels/CDH/lib/spark. The shells are also available from /usr/bin.
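To run the same application locally instead of on YARN, a command along the following lines should work (a sketch using the same placeholder paths; local[2] supplies the two worker threads mentioned above):

spark-submit --master "local[2]" --conf "spark.dynamicAllocation.enabled=false" --jars SPARK_HOME/lib/spark-examples.jar kafka_wordcount.py zookeeper_server:2181 wordcounttopic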
- In another window, start a Kafka producer that publishes to wordcounttopic:

kafka-console-producer --broker-list kafka_broker:9092 --topic wordcounttopic
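The console producer reads lines from standard input, so as an alternative to typing the words in the next step you can pipe them in, for example:

echo "hello hello hello hello hello hello gb gb gb gb gb gb" | kafka-console-producer --broker-list kafka_broker:9092 --topic wordcounttopic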
- In the producer window, type the following:

hello hello hello hello hello hello gb gb gb gb gb gb
Depending on how fast you type, the Spark Streaming application window shows output similar to the following:
-------------------------------------------
Time: 2016-01-06 14:18:00
-------------------------------------------
(u'hello', 6)
(u'gb', 2)

-------------------------------------------
Time: 2016-01-06 14:18:10
-------------------------------------------
(u'gb', 4)