Developing Apache Spark Applications

Developing and running an Apache Spark WordCount application

This tutorial describes how to write, compile, and run a simple Spark word count application in two of the languages supported by Spark: Scala and Python. The Scala code was originally developed for a Cloudera tutorial written by Sandy Ryza.

This example application is an enhanced version of WordCount, the canonical MapReduce example. In this version of WordCount, the goal is to learn the distribution of letters in the most popular words in a corpus. The application:

  1. Creates a SparkConf and SparkContext. A Spark application corresponds to an instance of the SparkContext class. When running a shell, the SparkContext is created for you.
  2. Gets a word frequency threshold.
  3. Reads an input set of text documents.
  4. Counts the number of times each word appears.
  5. Filters out all words that appear fewer times than the threshold.
  6. For the remaining words, counts the number of times each letter occurs.

With the MapReduce API, this workflow requires two MapReduce applications, with the intermediate data persisted to HDFS between them. In Spark, the same application requires about 90 percent fewer lines of code than one developed using the MapReduce API.
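The counting steps listed above can be sketched in plain Python, without Spark, to make the logic concrete before looking at the distributed version. This is only an illustrative sketch: the function name `letter_distribution` and the three-line sample corpus are assumptions made for this example and are not part of the tutorial code.

```python
from collections import Counter


def letter_distribution(documents, threshold):
    """Count letters across words that occur at least `threshold` times.

    `documents` is an iterable of strings, one document per string.
    (Hypothetical helper, not part of the tutorial's code.)
    """
    # Steps 3-4: split each document into words and count every word.
    word_counts = Counter(
        word for line in documents for word in line.split(" ")
    )
    # Step 5: keep only the words that meet the threshold.
    popular = [word for word, count in word_counts.items() if count >= threshold]
    # Step 6: count the letters of each surviving word once per distinct
    # word, not once per occurrence of that word.
    return Counter(ch for word in popular for ch in word)


if __name__ == "__main__":
    docs = ["spark is fast", "spark is fun", "hadoop is here"]
    print(sorted(letter_distribution(docs, 2).items()))
```

With a threshold of 2, only `spark` and `is` survive the filter in this sample, so the letter counts come from those two words alone.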

  1. Create an empty directory named sparkwordcount in your home directory, and enter it:
    mkdir $HOME/sparkwordcount
    cd $HOME/sparkwordcount
  2. For the Scala version, create the ./com/cloudera/sparkwordcount subdirectories. For Python, skip this step.
    mkdir -p com/cloudera/sparkwordcount
  3. Create the WordCount program in either Scala or Python, using the specified file names and paths:
    • Scala (./com/cloudera/sparkwordcount/SparkWordCount.scala):
      import org.apache.spark.SparkContext
      import org.apache.spark.SparkContext._
      import org.apache.spark.SparkConf
      
      object SparkWordCount {
        def main(args: Array[String]): Unit = {
          // create Spark context with Spark configuration
          val sc = new SparkContext(new SparkConf().setAppName("SparkWordCount"))
      
          // get threshold
          val threshold = args(1).toInt
      
          // read in text file and split each document into words
          val tokenized = sc.textFile(args(0)).flatMap(_.split(" "))
      
          // count the occurrence of each word
          val wordCounts = tokenized.map((_, 1)).reduceByKey(_ + _)
      
          // filter out words with fewer than threshold occurrences
          val filtered = wordCounts.filter(_._2 >= threshold)
      
          // count characters
          val charCounts = filtered.flatMap(_._1.toCharArray).map((_, 1)).reduceByKey(_ + _)
      
          System.out.println(charCounts.collect().mkString(", "))
        }
      }
    • Python (./SparkWordCount.py):
      import sys
      
      from pyspark import SparkContext, SparkConf
      
      if __name__ == "__main__":
      
        # create Spark context with Spark configuration
        conf = SparkConf().setAppName("SparkWordCount")
        sc = SparkContext(conf=conf)
      
        # get threshold
        threshold = int(sys.argv[2])
      
        # read in text file and split each document into words
        tokenized = sc.textFile(sys.argv[1]).flatMap(lambda line: line.split(" "))
      
        # count the occurrence of each word
        wordCounts = tokenized.map(lambda word: (word, 1)).reduceByKey(lambda v1, v2: v1 + v2)
      
        # filter out words with fewer than threshold occurrences
        filtered = wordCounts.filter(lambda pair: pair[1] >= threshold)
      
        # count characters
        charCounts = filtered.flatMap(lambda pair: pair[0]).map(lambda c: (c, 1)).reduceByKey(lambda v1, v2: v1 + v2)
      
        result = charCounts.collect()
        print(repr(result)[1:-1])
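Both programs read the input path and the threshold directly from the command line (`args(0)` and `args(1)` in Scala, `sys.argv[1]` and `sys.argv[2]` in Python) without checking them. A minimal sketch of how the Python version might validate its arguments before creating the SparkContext is shown below. The helper name `parse_args` and the wording of the messages are assumptions made for this example.

```python
def parse_args(argv):
    """Return (input_path, threshold) from a sys.argv-style list.

    Hypothetical helper: exits with a usage message instead of failing
    later with an IndexError or ValueError.
    """
    if len(argv) != 3:
        raise SystemExit("usage: SparkWordCount.py <input_path> <threshold>")
    input_path = argv[1]
    try:
        threshold = int(argv[2])
    except ValueError:
        raise SystemExit("threshold must be an integer, got %r" % argv[2])
    return input_path, threshold


if __name__ == "__main__":
    # Fixed sample arguments; in the real program this would be sys.argv.
    print(parse_args(["SparkWordCount.py", "input.txt", "2"]))
```

In the application, the returned values would take the place of the direct `sys.argv` lookups.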
This tutorial uses Maven to compile and package the Scala program. Download the tutorial pom.xml file to the parent $HOME/sparkwordcount directory and modify the sections listed below. For best practices using Maven to build Spark applications, see Building Spark Applications.
  1. In the application pom.xml file, include the Scala tools repo and plugin, and add Scala and Spark as dependencies:
    <pluginRepositories>
      <pluginRepository>
        <id>scala-tools.org</id>
        <name>Scala-tools Maven2 Repository</name>
        <url>http://scala-tools.org/repo-releases</url>
      </pluginRepository>
    </pluginRepositories>

    <build>
      <sourceDirectory>${project.basedir}</sourceDirectory>
      <plugins>
        <plugin>
          <groupId>org.scala-tools</groupId>
          <artifactId>maven-scala-plugin</artifactId>
          <version>2.15.2</version>
          <executions>
            <execution>
              <goals>
                <goal>compile</goal>
                <goal>testCompile</goal>
              </goals>
            </execution>
          </executions>
        </plugin>
      </plugins>
    </build>
    
    <dependencies>
      <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.11.12</version>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.4.0.7.0.0.0</version>
        <scope>provided</scope>
      </dependency>
    </dependencies>
  2. Make sure that you are in the parent $HOME/sparkwordcount directory, and then generate the application JAR as follows:
    mvn package

    This creates sparkwordcount-0.0.1-SNAPSHOT.jar in the target directory.

The input to the application is a large text file in which each line contains all the words in a document, stripped of punctuation. Put an input file in a directory in an S3 bucket that is accessible by your Spark cluster. You can use the tutorial example input file.
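If you prepare your own input instead of using the example file, each document has to be flattened into one punctuation-free line. The following is a minimal sketch of one way to do that, assuming Python 3 and ASCII punctuation only. The function name `to_input_line` is hypothetical and is not part of the tutorial.

```python
import string

# Translation table that deletes every ASCII punctuation character.
_STRIP_PUNCTUATION = str.maketrans("", "", string.punctuation)


def to_input_line(document_text):
    """Flatten one document into a single punctuation-free line."""
    without_punctuation = document_text.translate(_STRIP_PUNCTUATION)
    # split() with no argument collapses runs of spaces, tabs, and newlines,
    # so the result has no empty tokens when it is later split on " ".
    return " ".join(without_punctuation.split())


if __name__ == "__main__":
    print(to_input_line("Hello, world!\nHello   again."))
```

Collapsing whitespace matters here because the WordCount programs split on a single space, so consecutive spaces in the input would otherwise be counted as empty words.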
  1. Run the applications using spark-submit:
    • Scala on YARN with threshold 2:
      spark-submit --class SparkWordCount \
        --master yarn --deploy-mode client --executor-memory 1g \
        --conf "spark.yarn.access.hadoopFileSystems=s3a://<bucket_name>" \
        target/sparkwordcount-0.0.1-SNAPSHOT.jar \
        s3a://<bucket_name>/<input_filename> 2
    • Python on YARN with threshold 2:
      spark-submit --master yarn --deploy-mode client --executor-memory 1g \
        --conf "spark.yarn.access.hadoopFileSystems=s3a://<bucket_name>" \
        SparkWordCount.py s3a://<bucket_name>/<input_filename> 2
    If you used the example input file, the output is similar to the following:

    Scala:

    19/07/24 09:18:57 INFO scheduler.DAGScheduler: Job 0 finished: collect at SparkWordCount.scala:25, took 10.188651 s
    (p,2), (t,2), (b,1), (h,1), (n,4), (f,1), (v,1), (r,2), (l,1), (e,6), (a,4), (i,1), (u,1), (o,2), (c,1)

    Python:

    19/07/24 09:23:55 INFO scheduler.DAGScheduler: Job 0 finished: collect at /home/user/sparkwordcount/SparkWordCount.py:26, took 11.762150 s
    (u'a', 4), (u'c', 1), (u'e', 6), (u'i', 1), (u'o', 2), (u'u', 1), (u'b', 1), (u'f', 1), (u'h', 1), (u'l', 1), (u'n', 4), (u'p', 2), (u'r', 2), (u't', 2), (u'v', 1)
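The two sample outputs list the same pairs in different orders. The order of the collected pairs should not be relied on, because it generally depends on how the keys are distributed across partitions. A minimal sketch of normalizing the order on the driver, using pairs copied from the sample output above, follows. The helper name `sort_counts` is an assumption made for this example.

```python
def sort_counts(pairs):
    """Order (letter, count) pairs by count descending, then by letter."""
    return sorted(pairs, key=lambda pair: (-pair[1], pair[0]))


# Pairs copied from the sample Scala and Python output shown above.
scala_output = [
    ("p", 2), ("t", 2), ("b", 1), ("h", 1), ("n", 4), ("f", 1), ("v", 1),
    ("r", 2), ("l", 1), ("e", 6), ("a", 4), ("i", 1), ("u", 1), ("o", 2),
    ("c", 1),
]
python_output = [
    ("a", 4), ("c", 1), ("e", 6), ("i", 1), ("o", 2), ("u", 1), ("b", 1),
    ("f", 1), ("h", 1), ("l", 1), ("n", 4), ("p", 2), ("r", 2), ("t", 2),
    ("v", 1),
]

if __name__ == "__main__":
    # Both runs report the same counts once the order is normalized.
    print(sort_counts(scala_output) == sort_counts(python_output))
    # The three most frequent letters in the sample output.
    print(sort_counts(python_output)[:3])
```

In the application itself, the same sort could be applied to the list returned by `collect()` before printing it.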