Spark Streaming Example
This example uses Kafka to deliver a stream of words to a Python word count program.
- Create a Kafka topic:
    kafka-topics --create --zookeeper zookeeper_server:2181 --topic wordcounttopic --partitions 1 --replication-factor 1
- Create a Kafka word count Python program adapted from the Spark Streaming example. This version divides the input stream into batches of 10 seconds and counts the words in each batch:
    from __future__ import print_function

    import sys

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    if __name__ == "__main__":
        if len(sys.argv) != 3:
            print("Usage: <zk> <topic>", file=sys.stderr)
            sys.exit(-1)

        sc = SparkContext(appName="PythonStreamingKafkaWordCount")
        # Batch interval of 10 seconds
        ssc = StreamingContext(sc, 10)

        zkQuorum, topic = sys.argv[1:]
        # Receiver-based stream of (key, message) pairs read from one topic partition
        kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
        # Keep only the message text
        lines = kvs.map(lambda x: x[1])
        counts = lines.flatMap(lambda line: line.split(" ")) \
            .map(lambda word: (word, 1)) \
            .reduceByKey(lambda a, b: a+b)
        # Print the counts for each batch
        counts.pprint()

        ssc.start()
        ssc.awaitTermination()
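The per-batch transformation chain in the program (flatMap, map, reduceByKey) can be sketched in plain Python to show what each step contributes. This is only an illustration that runs without Spark or Kafka; the function name `count_words_in_batch` and the sample lines are assumptions made for the example.

```python
from collections import defaultdict


def count_words_in_batch(lines):
    """Plain-Python stand-in for flatMap -> map -> reduceByKey on one batch."""
    # flatMap: split each line on single spaces and flatten into one word list
    words = [word for line in lines for word in line.split(" ")]
    # map: pair each word with an initial count of 1
    pairs = [(word, 1) for word in words]
    # reduceByKey: sum the counts that share the same word
    counts = defaultdict(int)
    for word, n in pairs:
        counts[word] += n
    return dict(counts)


if __name__ == "__main__":
    # Hypothetical contents of one 10-second batch
    batch = ["hello hello gb", "gb hello"]
    print(count_words_in_batch(batch))  # {'hello': 3, 'gb': 2}
```

Because the split is on a single space, consecutive spaces in a line produce empty strings that are counted as words, both in this sketch and in the Spark version.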
- Submit the application with dynamic allocation disabled, specifying your ZooKeeper server and topic. To run locally, you must specify at least two worker threads: one to receive and one to process data:
    spark-submit --master yarn --deploy-mode client --conf "spark.dynamicAllocation.enabled=false" --jars SPARK_HOME/lib/spark-examples.jar zookeeper_server:2181 wordcounttopic
In a deployment, SPARK_HOME defaults to a location that depends on the installation type. The shells are also available from /usr/bin.
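The requirement of at least two worker threads for local runs can be checked before submitting. The sketch below is a hypothetical helper, not part of Spark; it assumes the master string takes one of the forms `local`, `local[N]`, or `local[*]`, and treats any non-local master (such as `yarn`) as out of scope.

```python
import os
import re


def local_threads_ok(master, cpu_count=None):
    """Return True if `master` leaves a thread free to process data.

    Hypothetical helper, not part of Spark. Non-local masters (for example
    "yarn") are outside the scope of this check and are accepted.
    """
    if not master.startswith("local"):
        return True
    if master == "local":
        # Plain "local" runs with a single worker thread
        return False
    match = re.fullmatch(r"local\[(\d+|\*)\]", master)
    if match is None:
        # Other local forms are not handled by this sketch
        raise ValueError("unrecognized local master: %r" % master)
    n = match.group(1)
    if n == "*":
        # local[*] uses one thread per logical core
        threads = cpu_count if cpu_count is not None else (os.cpu_count() or 1)
    else:
        threads = int(n)
    # One thread receives, so at least one more is needed to process
    return threads >= 2


if __name__ == "__main__":
    for master in ("local", "local[1]", "local[2]", "yarn"):
        print(master, local_threads_ok(master))
```

With `local` or `local[1]`, the single thread is taken by the receiver and no batches are processed, which is why the check returns False for those values.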
- In another window, start a Kafka producer that publishes to the topic:
    kafka-console-producer --broker-list kafka_broker:9092 --topic wordcounttopic
In the producer window, type the following:
hello hello hello hello hello hello gb gb gb gb gb gb
Depending on how fast you type, in the Spark Streaming application window you will see output like:
    -------------------------------------------
    Time: 2016-01-06 14:18:00
    -------------------------------------------
    (u'hello', 6)
    (u'gb', 2)

    -------------------------------------------
    Time: 2016-01-06 14:18:10
    -------------------------------------------
    (u'gb', 4)
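The split of the twelve words across two batches follows from the 10-second batch interval: each word is counted in the batch that covers its arrival time. The following plain-Python sketch simulates this with made-up arrival times. It is a simplification, since actual batch boundaries depend on when the receiver gets the data, and the function name `batch_counts` and the timestamps are assumptions for illustration.

```python
from collections import Counter

BATCH_SECONDS = 10  # same interval as StreamingContext(sc, 10)


def batch_counts(events, batch_seconds=BATCH_SECONDS):
    """Group (arrival_second, word) pairs into fixed batches and count words.

    Returns a list of (batch_start_second, Counter) tuples in time order.
    """
    batches = {}
    for second, word in events:
        # Start of the batch interval that contains this arrival time
        start = (second // batch_seconds) * batch_seconds
        batches.setdefault(start, Counter())[word] += 1
    return sorted(batches.items())


if __name__ == "__main__":
    # Made-up arrival times: six "hello" and two "gb" fall in the first
    # 10-second batch, the remaining four "gb" in the next one.
    events = [(s, "hello") for s in range(1, 7)]
    events += [(s, "gb") for s in range(8, 14)]
    for start, counts in batch_counts(events):
        print(start, dict(counts))
    # 0 {'hello': 6, 'gb': 2}
    # 10 {'gb': 4}
```

Typing faster or slower shifts words between batches, so the counts per batch vary from run to run while the totals stay the same.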