Developing and Running a Spark WordCount Application

This tutorial describes how to write, compile, and run a simple Spark word count application in three of the languages supported by Spark: Scala, Python, and Java. The Scala and Java code was originally developed for a Cloudera tutorial written by Sandy Ryza.

Writing the Application

The example application is an enhanced version of WordCount, the canonical MapReduce example. In this version of WordCount, the goal is to learn the distribution of letters in the most popular words in a corpus. The application:

  1. Creates a SparkConf and SparkContext. A Spark application corresponds to an instance of the SparkContext class. When running a shell, the SparkContext is created for you.
  2. Gets a word frequency threshold.
  3. Reads an input set of text documents.
  4. Counts the number of times each word appears.
  5. Filters out all words that appear fewer times than the threshold.
  6. For the remaining words, counts the number of times each letter occurs.

In MapReduce, this requires two MapReduce applications, as well as persisting the intermediate data to HDFS between them. In Spark, this application requires about 90 percent fewer lines of code than one developed using the MapReduce API.

Here are three versions of the program:
Scala WordCount
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SparkWordCount {
  def main(args: Array[String]) {
    // create Spark context with Spark configuration
    val sc = new SparkContext(new SparkConf().setAppName("Spark Count"))

    // get threshold
    val threshold = args(1).toInt

    // read in text file and split each document into words
    val tokenized = sc.textFile(args(0)).flatMap(_.split(" "))

    // count the occurrence of each word
    val wordCounts = tokenized.map((_, 1)).reduceByKey(_ + _)

    // filter out words with fewer than threshold occurrences
    val filtered = wordCounts.filter(_._2 >= threshold)

    // count characters
    val charCounts = filtered.flatMap(_._1.toCharArray).map((_, 1)).reduceByKey(_ + _)

    System.out.println(charCounts.collect().mkString(", "))
  }
}
Python WordCount
import sys

from pyspark import SparkContext, SparkConf

if __name__ == "__main__":

  # create Spark context with Spark configuration
  conf = SparkConf().setAppName("Spark Count")
  sc = SparkContext(conf=conf)

  # get threshold
  threshold = int(sys.argv[2])

  # read in text file and split each document into words
  tokenized = sc.textFile(sys.argv[1]).flatMap(lambda line: line.split(" "))

  # count the occurrence of each word
  wordCounts = tokenized.map(lambda word: (word, 1)).reduceByKey(lambda v1,v2:v1 +v2)

  # filter out words with fewer than threshold occurrences
  filtered = wordCounts.filter(lambda pair:pair[1] >= threshold)

  # count characters
  charCounts = filtered.flatMap(lambda pair:pair[0]).map(lambda c: c).map(lambda c: (c, 1)).reduceByKey(lambda v1,v2:v1 +v2)

  list = charCounts.collect()
  print repr(list)[1:-1]
Java 7 WordCount
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.SparkConf;
import scala.Tuple2;

public class JavaWordCount {
  public static void main(String[] args) {

    // create Spark context with Spark configuration
    JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Spark Count"));

    // get threshold
    final int threshold = Integer.parseInt(args[1]);

    // read in text file and split each document into words
    JavaRDD<String> tokenized = sc.textFile(args[0]).flatMap(
      new FlatMapFunction() {
        public Iterable call(String s) {
          return Arrays.asList(s.split(" "));
        }
      }
    );

    // count the occurrence of each word
    JavaPairRDD<String, Integer> counts = tokenized.mapToPair(
      new PairFunction() {
        public Tuple2 call(String s) {
          return new Tuple2(s, 1);
        }
      }
    ).reduceByKey(
      new Function2() {
        public Integer call(Integer i1, Integer i2) {
          return i1 + i2;
        }
      }
    );

    // filter out words with fewer than threshold occurrences
    JavaPairRDD<String, Integer> filtered = counts.filter(
      new Function, Boolean>() {
        public Boolean call(Tuple2 tup) {
          return tup._2 >= threshold;
        }
      }
    );

    // count characters
    JavaPairRDD<Character, Integer> charCounts = filtered.flatMap(
      new FlatMapFunction<Tuple2<String, Integer>, Character>() {
        @Override
        public Iterable<Character> call(Tuple2<String, Integer> s) {
          Collection<Character> chars = new ArrayList<Character>(s._1().length());
          for (char c : s._1().toCharArray()) {
            chars.add(c);
          }
          return chars;
        }
      }
    ).mapToPair(
      new PairFunction<Character, Character, Integer>() {
        @Override
        public Tuple2<Character, Integer> call(Character c) {
          return new Tuple2<Character, Integer>(c, 1);
        }
      }
    ).reduceByKey(
      new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer i1, Integer i2) {
          return i1 + i2;
        }
      }
    );

    System.out.println(charCounts.collect());
  }
}

Because Java 7 does not support anonymous functions, this Java program is considerably more verbose than Scala and Python, but still requires a fraction of the code needed in an equivalent MapReduce program. Java 8 supports anonymous functions and their use can further streamline the Java application.

Compiling and Packaging the Scala and Java Applications

The tutorial uses Maven to compile and package the Scala and Java programs. Excerpts of the tutorial pom.xml are included below. For best practices using Maven to build Spark applications, see Building Spark Applications.

To compile Scala, include the Scala tools plug-in:

<plugin>
  <groupId>org.scala-tools</groupId>
      <artifactId>maven-scala-plugin</artifactId>
      <executions>
        <execution>
          <goals>
            <goal>compile</goal>
            <goal>testCompile</goal>
          </goals>
        </execution>
      </executions>
</plugin>

which requires the scala-tools plug-in repository:

<pluginRepositories>
<pluginRepository>
    <id>scala-tools.org</id>
    <name>Scala-tools Maven2 Repository</name>
    <url>http://scala-tools.org/repo-releases</url>
  </pluginRepository>
</pluginRepositories>

Also, include Scala and Spark as dependencies:

<dependencies>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.10.2</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.0-cdh5.7.0</version>
    <scope>provided</scope>
  </dependency>
</dependencies>

To generate the application JAR, run:

$ mvn package

to create sparkwordcount-1.0-SNAPSHOT-jar-with-dependencies.jar in the target directory.

Running the Application

  1. The input to the application is a large text file in which each line contains all the words in a document, stripped of punctuation. Put an input file in a directory on HDFS. You can use tutorial example input file:
    $ wget --no-check-certificate .../inputfile.txt
    $ hdfs dfs -put inputfile.txt
  2. Run one of the applications using spark-submit:
    • Scala - Run in a local process with threshold 2:
      $ spark-submit --class com.cloudera.sparkwordcount.SparkWordCount \
      --master local --deploy-mode client --executor-memory 1g \
      --name wordcount --conf "spark.app.id=wordcount" \
      sparkwordcount-1.0-SNAPSHOT-jar-with-dependencies.jar hdfs://namenode_host:8020/path/to/inputfile.txt 2
      If you use the example input file, the output should look something like:
      (e,6), (p,2), (a,4), (t,2), (i,1), (b,1), (u,1), (h,1), (o,2), (n,4), (f,1), (v,1), (r,2), (l,1), (c,1)
    • Java - Run in a local process with threshold 2:
      $ spark-submit --class com.cloudera.sparkwordcount.JavaWordCount \
      --master local --deploy-mode client --executor-memory 1g \
      --name wordcount --conf "spark.app.id=wordcount" \
      sparkwordcount-1.0-SNAPSHOT-jar-with-dependencies.jar hdfs://namenode_host:8020/path/to/inputfile.txt 2
      If you use the example input file, the output should look something like:
      (e,6), (p,2), (a,4), (t,2), (i,1), (b,1), (u,1), (h,1), (o,2), (n,4), (f,1), (v,1), (r,2), (l,1), (c,1)
    • Python - Run on YARN with threshold 2:
      $ spark-submit --master yarn --deploy-mode client --executor-memory 1g \
      --name wordcount --conf "spark.app.id=wordcount" wordcount.py hdfs://namenode_host:8020/path/to/inputfile.txt 2
      In this case, the output should look something like:
      [(u'a', 4), (u'c', 1), (u'e', 6), (u'i', 1), (u'o', 2), (u'u', 1), (u'b', 1), (u'f', 1), (u'h', 1), (u'l', 1), (u'n', 4), (u'p', 2), (u'r', 2), (u't', 2), (u'v', 1)]