Developing and Running a Spark WordCount Application
This tutorial describes how to write, compile, and run a simple Spark word count application in two of the languages supported by Spark: Scala and Python. The Scala code was originally developed for a Cloudera tutorial written by Sandy Ryza.
Continue reading:
Writing the Application
The example application is an enhanced version of WordCount, the canonical MapReduce example. In this version of WordCount, the goal is to learn the distribution of letters in the most popular words in a corpus. The application:
- Creates a SparkConf and SparkContext. A Spark application corresponds to an instance of the SparkContext class. When running a shell, the SparkContext is created for you.
- Gets a word frequency threshold.
- Reads an input set of text documents.
- Counts the number of times each word appears.
- Filters out all words that appear fewer times than the threshold.
- For the remaining words, counts the number of times each letter occurs.
In MapReduce, this requires two MapReduce applications, as well as persisting the intermediate data to HDFS between them. In Spark, this application requires about 90 percent fewer lines of code than one developed using the MapReduce API.
import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf object SparkWordCount { def main(args: Array[String]) { // create Spark context with Spark configuration val sc = new SparkContext(new SparkConf().setAppName("Spark Count")) // get threshold val threshold = args(1).toInt // read in text file and split each document into words val tokenized = sc.textFile(args(0)).flatMap(_.split(" ")) // count the occurrence of each word val wordCounts = tokenized.map((_, 1)).reduceByKey(_ + _) // filter out words with fewer than threshold occurrences val filtered = wordCounts.filter(_._2 >= threshold) // count characters val charCounts = filtered.flatMap(_._1.toCharArray).map((_, 1)).reduceByKey(_ + _) System.out.println(charCounts.collect().mkString(", ")) } }
import sys from pyspark import SparkContext, SparkConf if __name__ == "__main__": # create Spark context with Spark configuration conf = SparkConf().setAppName("Spark Count") sc = SparkContext(conf=conf) # get threshold threshold = int(sys.argv[2]) # read in text file and split each document into words tokenized = sc.textFile(sys.argv[1]).flatMap(lambda line: line.split(" ")) # count the occurrence of each word wordCounts = tokenized.map(lambda word: (word, 1)).reduceByKey(lambda v1,v2:v1 +v2) # filter out words with fewer than threshold occurrences filtered = wordCounts.filter(lambda pair:pair[1] >= threshold) # count characters charCounts = filtered.flatMap(lambda pair:pair[0]).map(lambda c: c).map(lambda c: (c, 1)).reduceByKey(lambda v1,v2:v1 +v2) list = charCounts.collect() print repr(list)[1:-1]
Compiling and Packaging Scala Applications
The tutorial uses Maven to compile and package the programs. Excerpts of the tutorial pom.xml are included below. For best practices using Maven to build Spark applications, see Building Spark Applications.
To compile Scala, include the Scala tools plug-in:
<plugin> <groupId>org.scala-tools</groupId> <artifactId>maven-scala-plugin</artifactId> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </executions> </plugin>
which requires the scala-tools plug-in repository:
<pluginRepositories> <pluginRepository> <id>scala-tools.org</id> <name>Scala-tools Maven2 Repository</name> <url>http://scala-tools.org/repo-releases</url> </pluginRepository> </pluginRepositories>
Also, include Scala and Spark as dependencies:
<dependencies> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>2.11.12</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.2.0-cdh6.0.0-beta1</version> <scope>provided</scope> </dependency> </dependencies>
To generate the application JAR, run:
$ mvn package
to create sparkwordcount-1.0-SNAPSHOT-jar-with-dependencies.jar in the target directory.
Running the Application
- The input to the application is a large text file in which each line contains all the words in a document, stripped of punctuation. Put an input file in a directory on HDFS. You can
use tutorial example input file:
$ wget --no-check-certificate .../inputfile.txt $ hdfs dfs -put inputfile.txt
- Run one of the applications using spark-submit:
- Scala - Run in a local process with threshold 2:
$ spark-submit --class com.cloudera.sparkwordcount.SparkWordCount \ --master local --deploy-mode client --executor-memory 1g \ --name wordcount --conf "spark.app.id=wordcount" \ sparkwordcount-1.0-SNAPSHOT-jar-with-dependencies.jar \ hdfs://namenode_host:8020/path/to/inputfile.txt 2
If you use the example input file, the output should look something like:
(e,6), (p,2), (a,4), (t,2), (i,1), (b,1), (u,1), (h,1), (o,2), (n,4), (f,1), (v,1), (r,2), (l,1), (c,1)
- Python - Run on YARN with threshold 2:
$ spark-submit --master yarn --deploy-mode client --executor-memory 1g \ --name wordcount --conf "spark.app.id=wordcount" wordcount.py \ hdfs://namenode_host:8020/path/to/inputfile.txt 2
In this case, the output should look something like:
[(u'a', 4), (u'c', 1), (u'e', 6), (u'i', 1), (u'o', 2), (u'u', 1), (u'b', 1), (u'f', 1), (u'h', 1), (u'l', 1), (u'n', 4), (u'p', 2), (u'r', 2), (u't', 2), (u'v', 1)]
- Scala - Run in a local process with threshold 2: