Example: WordCount v3.0

You can further improve the quality of your results by filtering out information that is unnecessary or that corrupts your desired output. You can create a list of stop words and punctuation, and then have the application skip them at run time.

Omitting Stop Words and Punctuation

Although not a perfect use case, this example demonstrates how to create a distributed cache file to make information available to all machines in a cluster. In practice, you would more likely just create a String[] variable in your Java class to store your stop words and punctuation.
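
For comparison, a minimal sketch of that simpler in-class approach might look like this (hypothetical code, not part of the tutorial source; it assumes java.util.Arrays is imported alongside the Set classes described below):

// Hypothetical alternative: hard-code the skip list as fields of the Mapper
// class instead of distributing a file through the cache.
private static final String[] SKIP_WORDS =
    { "a", "an", "and", "but", "is", "or", "the", "to", ".", "," };
private final Set<String> patternsToSkip =
    new HashSet<String>(Arrays.asList(SKIP_WORDS));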

Create a list of words and punctuation you want to omit from the output and save it in a text file.

Here are some suggested terms and punctuation marks for this example. Because the application compares each token against this list as a literal string, the punctuation marks do not need to be escaped. Create your list and save it in a file named stop_words.text.

a
an
and
but
is
or
the
to
.
,

Put stop_words.text into the Hadoop file system.

$ hadoop fs -put stop_words.text /user/cloudera/wordcount/

Now you can update the code to use that list of stop words to filter the input.

You can find source code for the three versions of WordCount at http://tiny.cloudera.com/hadoopTutorialSample.

Code Changes

Lines 3-9: Import classes for handling file I/O and set support.

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashSet;
import java.util.Set;

Line 20: Import FileSplit, which is used to process a portion of an input file rather than the entire file at once.

import org.apache.hadoop.mapreduce.lib.input.FileSplit;

Line 25: Import StringUtils, which, not surprisingly, provides a wide range of utilities for working with strings in Hadoop. See the StringUtils Javadoc for all of the handy methods this class provides. This example uses the stringifyException() method to convert any IOException to a string before passing it to the System.err.println() method.

import org.apache.hadoop.util.StringUtils;

Lines 40-48: In the main method, check the command-line arguments for the -skip flag. When it is found, set the wordcount.skip.patterns property to true and add the file named by the next argument to the distributed cache.

for (int i = 0; i < args.length; i += 1) {
  if ("-skip".equals(args[i])) {
    job.getConfiguration().setBoolean("wordcount.skip.patterns", true);
    i += 1;
    job.addCacheFile(new Path(args[i]).toUri());
    // this demonstrates logging
    LOG.info("Added file to the distributed cache: " + args[i]);
  }
}

Line 54: This time, you can add the optional combiner class to your job configuration. The combiner is run on each of the mapper machines in the cluster to process information locally before sending it to the reducer machine. The combiner can do anything you want in order to prepare the intermediate values prior to sending them to the reducer.

In this case, setting the Reduce class as the combiner counts up the keys locally. For example, instead of sending <elephant, 1> <elephant, 1> to the reducer, the mapper machine combines them as <elephant, 2> before forwarding the result to the reducer. Combining values before transmission can save a great deal of bandwidth and transfer time in most cases.

job.setCombinerClass(Reduce.class);
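
For reference, the Reduce class being reused as the combiner here has roughly this shape (a sketch based on the standard WordCount reducer; see the linked source for the exact class):

public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
  @Override
  public void reduce(Text word, Iterable<IntWritable> counts, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    // Summing is associative and commutative, so the same code is safe to run
    // both as the combiner on each mapper machine and as the final reducer.
    for (IntWritable count : counts) {
      sum += count.get();
    }
    context.write(word, new IntWritable(sum));
  }
}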

Line 66: Add a variable to store the name of the input file for the current split.

private String input;

Line 67: Create a Set of strings called patternsToSkip. This is the list of punctuation and superfluous words to be removed from the final results.

private Set<String> patternsToSkip = new HashSet<String>();

Lines 73-77: Record the name of the input source as a string: if the split comes from a file, store its path; otherwise, store the split's string representation.

if (context.getInputSplit() instanceof FileSplit) {
  this.input = ((FileSplit) context.getInputSplit()).getPath().toString();
} else {
  this.input = context.getInputSplit().toString();
}

Lines 80-83: If the wordcount.skip.patterns configuration property is true, get the list of patterns to skip from the distributed cache and pass the URI of the cached file to the parseSkipFile method.

if (config.getBoolean("wordcount.skip.patterns", false)) {
  URI[] localPaths = context.getCacheFiles();
  parseSkipFile(localPaths[0]);
}
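
Taken together, these fragments belong in the Mapper's setup method. The following is a rough sketch of how they fit, assembled from the snippets shown above and assuming org.apache.hadoop.conf.Configuration is already imported (see the linked source for the complete method):

@Override
protected void setup(Context context) throws IOException, InterruptedException {
  Configuration config = context.getConfiguration();
  // Record the name of the input split this mapper is processing.
  if (context.getInputSplit() instanceof FileSplit) {
    this.input = ((FileSplit) context.getInputSplit()).getPath().toString();
  } else {
    this.input = context.getInputSplit().toString();
  }
  // Load the skip patterns from the distributed cache only when requested.
  if (config.getBoolean("wordcount.skip.patterns", false)) {
    URI[] localPaths = context.getCacheFiles();
    parseSkipFile(localPaths[0]);
  }
}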

Lines 86-98: Open the distributed cache file, which the framework has copied to the local machine, and read it one line at a time until you run out of lines. Add each line to the set of strings to skip.

private void parseSkipFile(URI patternsURI) {
  LOG.info("Added file to the distributed cache: " + patternsURI);
  try {
    BufferedReader fis = new BufferedReader(new FileReader(new File(patternsURI.getPath()).getName()));
    String pattern;
    while ((pattern = fis.readLine()) != null) {
      patternsToSkip.add(pattern);
    }
  } catch (IOException ioe) {
    System.err.println("Caught exception while parsing the cached file '"
      + patternsURI + "' : " + StringUtils.stringifyException(ioe));
  }
}

Lines 108-110: Modify the if statement so that if the word variable is empty or matches one of the defined patterns to skip, the for loop continues without writing a value to the context.

if (word.isEmpty() || patternsToSkip.contains(word)) {
  continue;
}
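
To see this check in context, here is a simplified sketch of the map method's word loop. It assumes a WORD_BOUNDARY split pattern and a one counter constant like those in the standard WordCount example; the names are illustrative, so check the linked source for the exact method:

public void map(LongWritable offset, Text lineText, Context context)
    throws IOException, InterruptedException {
  // Lowercase the line so that capitalization does not split the counts.
  String line = lineText.toString().toLowerCase();
  for (String word : WORD_BOUNDARY.split(line)) {
    if (word.isEmpty() || patternsToSkip.contains(word)) {
      continue; // skip blanks, stop words, and punctuation
    }
    context.write(new Text(word), one);
  }
}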

Running WordCount Version Three

Follow these steps to run the updated version.

  1. Rebuild the application. You can enter these instructions at the command line, or you can use the Makefile command make jar if you are using a CDH package installation.

    To compile in a package installation:

    $ rm -rf build wordcount.jar
    $ mkdir -p build
    $ javac -cp /usr/lib/hadoop/*:/usr/lib/hadoop-mapreduce/* WordCount.java -d build -Xlint
    $ jar -cvf wordcount.jar -C build/ .

    To compile with a parcel installation:

    $ rm -rf build wordcount.jar
    $ mkdir -p build
    $ javac -cp /opt/cloudera/parcels/CDH/lib/hadoop/*:/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/* \
         WordCount.java -d build -Xlint
    $ jar -cvf wordcount.jar -C build/ .
  2. Remove the previous results.
    $ hadoop fs -rm -r -f /user/cloudera/wordcount/output
  3. Create a list of stop words, store it in a file named stop_words.text, and copy the file to HDFS if you have not already done so.
    $ echo -e "a\\nan\\nand\\nbut\\nis\\nor\\nthe\\nto\\n.\\n," > stop_words.text
    $ hadoop fs -put stop_words.text /user/cloudera/wordcount/
  4. Run the application with the -skip switch and the name of the stop_words.text file.
    $ hadoop jar wordcount.jar org.myorg.WordCount /user/cloudera/wordcount/input /user/cloudera/wordcount/output -skip /user/cloudera/wordcount/stop_words.text

The results are the most useful so far: all words are converted to lowercase, and punctuation and less significant words are omitted. You can continue to change the code to meet your particular use cases.

$ hadoop fs -cat /user/cloudera/wordcount/output/*
!  2
anything    1
are 1
at  1
bad 1
because 1
cling   1
core    1
data    1
does    1
elegant 2
element 1
elephant    3
extraneous  1
fellow  1
forgets 1
gentle  1
gets    1
group   1
hadoop  3
hdfs    1
he  3
helps   1
him 1
his 1
hive    1
impala  1
in  1
king    2
lets    1
mad 1
mellow  1
never   2
plays   1
sqoop   1
thing   1
thrive  1
useful  1
well    1
what    1
with    1
wonderful   1
yellow  2