Other Features

This topic describes other MapReduce framework features:

Queues

Administrators can set up queues on a Hadoop cluster. This enables multiple users to share the cluster resources, typically along organizational boundaries. Queues can use ACLs, for example, to control which users can submit jobs to them. Queues are used primarily by Hadoop schedulers.

Hadoop is configured with a single mandatory queue, called default. Queue names are defined in the mapreduce.job.queuename property of the Hadoop site configuration. Some job schedulers, such as the FairScheduler, support multiple queues.

A job specifies the queue to which it should be submitted through the mapreduce.job.queuename property, or through the Configuration.set(MRJobConfig.QUEUE_NAME, String) API. Setting the queue name is optional. If a job is submitted without an associated queue name, it is submitted to the queue determined by the cluster's placement policies (by default, the queue named default).
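
The following is a minimal sketch of submitting a job to a named queue through the Configuration API; the queue name and job name shown here are placeholders.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    public class QueueExample {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Equivalent to setting mapreduce.job.queuename in the configuration.
            conf.set(MRJobConfig.QUEUE_NAME, "marketing");  // hypothetical queue name
            Job job = Job.getInstance(conf, "queued-job");
            // ... set the mapper, reducer, input and output paths, then submit ...
        }
    }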

Counter

A Counter represents a global counter, defined either by the MapReduce framework or applications. Each Counter can be of any Enum type. Counters of a particular Enum are bunched into groups of type CounterGroup.

Applications can define arbitrary Counters (of type Enum) and update them through Counter.increment(long) in the map or reduce methods. These counters are then globally aggregated by the framework.
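
For example, a mapper might count malformed input records with an application-defined Enum. The following sketch uses hypothetical class, Enum, and key/value type choices.

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class RecordMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        // Application-defined counter group.
        enum RecordCounters { MALFORMED, PROCESSED }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            if (value.toString().isEmpty()) {
                // Aggregated globally by the framework across all map tasks.
                context.getCounter(RecordCounters.MALFORMED).increment(1);
                return;
            }
            context.getCounter(RecordCounters.PROCESSED).increment(1);
            context.write(value, new LongWritable(1));
        }
    }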

DistributedCache

DistributedCache distributes application-specific, large, read-only files efficiently. DistributedCache is provided by the MapReduce framework to cache files (text, archives, JARs, and so on) needed by applications.

Applications specify the files to be cached as hdfs:// URLs in the Job. The DistributedCache assumes that the files specified in hdfs:// URLs are already present on the file system.

The framework copies the required files to a host before any tasks for the job are executed on that host. Files are copied only once per job, and archives are unarchived on the hosts.

DistributedCache tracks the modification timestamps of the cached files. The cache files should not be modified by the application or externally while the job is executing.

DistributedCache can be used to distribute simple, read-only data or text files and more complex types such as archives and JAR files. Archives (zip, tar, tgz, and tar.gz files) are unarchived at the hosts.

Files and archives can be distributed by setting the property mapred.cache.{files|archives}. If more than one file or archive must be distributed, they can be added as comma-separated paths. The properties can also be set by APIs Job.addCacheFile(URI)/Job.addCacheArchive(URI) and Job.setCacheFiles(URI)/Job.setCacheArchives(URIs), where URI is of the form hdfs://host:port/absolute-path#link-name. In streaming, the files can be distributed through the command-line options -cacheFile/-cacheArchive.
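
As a sketch, the Job API calls below add one cache file and one cache archive; the namenode address and paths are hypothetical, and the #fragment names the symlink described in the next paragraph.

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class CacheExample {
        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance(new Configuration(), "cache-example");
            // Hypothetical HDFS paths; the #fragment names the symlink in the task directory.
            job.addCacheFile(new URI("hdfs://namenode:8020/apps/shared/lookup.txt#lookup"));
            job.addCacheArchive(new URI("hdfs://namenode:8020/apps/shared/refdata.tgz#refdata"));
            // ... set the mapper, reducer, input and output paths, then submit ...
        }
    }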

Optionally, you can direct the DistributedCache to symlink the cached files into the current working directory of the task through the DistributedCache.createSymlink(Configuration) API, or by setting the configuration property mapred.create.symlink as yes. The DistributedCache uses the fragment of the URI as the name of the symlink. For example, the URI hdfs://namenode:port/lib.so.1#lib.so has the symlink name lib.so in the task's current working directory for the file lib.so.1 in the distributed cache.

The DistributedCache can also be used as a rudimentary software distribution mechanism in the map and reduce tasks. It can be used to distribute both JARs and native libraries. The Job.addArchiveToClassPath(Path) or Job.addFileToClassPath(Path) API can be used to cache files and JARs and add them to the classpath of the child JVM. This can also be done by setting the configuration properties mapreduce.job.classpath.{files|archives}. Cached files that are symlinked into the working directory of the task can be used to distribute and load native libraries.
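
A minimal sketch of adding a JAR and an archive to the task classpath through the Job API; the paths shown are hypothetical.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;

    public class ClasspathCacheExample {
        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance(new Configuration(), "classpath-example");
            // Both entries are cached on the hosts and added to the child JVM classpath.
            job.addFileToClassPath(new Path("/apps/shared/extra-lib.jar"));  // hypothetical JAR
            job.addArchiveToClassPath(new Path("/apps/shared/deps.zip"));    // hypothetical archive
            // ... configure and submit the job as usual ...
        }
    }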

Distributed cache files can be private or public, which determines how they are shared on worker hosts.

  • Private distributed cache files are cached in a local directory. Access is restricted to the user whose jobs need the files. The files are shared by all tasks and jobs of the specific user only, and cannot be accessed by other users on the worker servers. A distributed cache file becomes private by virtue of the permissions on the file system where the files are uploaded, typically HDFS. If the file has no world-readable access, or if the directory path leading to the file has no world-executable access for lookup, the file is private.
  • Public distributed cache files are cached in a global directory, and file access is set up so that they are visible to all users. These files can be shared by tasks and jobs of all users on the worker servers. A distributed cache file becomes public by virtue of the permissions on the file system where the files are uploaded, typically HDFS. If the file has world-readable access and the directory path leading to the file has world-executable access for lookup, the file is public.

Profiling

Profiling returns runtime metrics to help you gauge and optimize performance. You can get a representative sample of profiler information from a small set of maps and reduces.

Profiling is disabled by default. Use the mapreduce.task.profile configuration property to specify whether the system should collect profiler information for some of the tasks in the job. You can set the value using the API call Configuration.set(MRJobConfig.TASK_PROFILE, boolean). Set the value to true to enable task profiling. Hadoop stores profiler information in the user log directory.

After you enable profiling, you can use the configuration property mapreduce.task.profile.{maps|reduces} to set the ranges of MapReduce tasks to profile. You can set the value using the API Configuration.set(MRJobConfig.NUM_{MAP|REDUCE}_PROFILES, String). By default, the specified range is 0-2.

You can also specify the profiler configuration arguments by setting the configuration property mapreduce.task.profile.params. The value can be specified using the API Configuration.set(MRJobConfig.TASK_PROFILE_PARAMS, String). If the string contains a %s, it is replaced with the name of the profiling output file when the task runs. These parameters are passed to the task child JVM on the command line. The default value for the profiling parameters is -agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s.
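
Putting these properties together, the following is a minimal sketch of enabling profiling for a few tasks; the task ranges and profiler arguments shown are only example values.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    public class ProfilingExample {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            conf.setBoolean(MRJobConfig.TASK_PROFILE, true);    // mapreduce.task.profile
            conf.set(MRJobConfig.NUM_MAP_PROFILES, "0-2");      // mapreduce.task.profile.maps
            conf.set(MRJobConfig.NUM_REDUCE_PROFILES, "0-1");   // mapreduce.task.profile.reduces
            // Example profiler arguments; %s is replaced with the profiling output file name.
            conf.set(MRJobConfig.TASK_PROFILE_PARAMS,
                    "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s");
            // ... pass conf to Job.getInstance(conf) when creating the job ...
        }
    }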

Debugging

The MapReduce framework provides a facility to run user-provided scripts for debugging. When a MapReduce task fails, you can run a debug script. The script is given access to the task's stdout and stderr outputs, syslog, and jobconf. The output from the debug script stdout and stderr is displayed on the console diagnostics and also as part of the job UI.

Use DistributedCache to distribute and symlink the script file.

You can submit the debug script quickly by setting values for the properties mapreduce.map.debug.script and mapreduce.reduce.debug.script for debugging map and reduce tasks, respectively. These properties can also be set by using the APIs Configuration.set(MRJobConfig.MAP_DEBUG_SCRIPT, String) and Configuration.set(MRJobConfig.REDUCE_DEBUG_SCRIPT, String). In streaming mode, you can submit a debug script with the command-line options -mapdebug and -reducedebug for debugging map and reduce tasks, respectively.
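
As a sketch, assuming hypothetical debug scripts that have already been uploaded to HDFS, the debug script properties can be set and the scripts distributed and symlinked through the distributed cache as follows.

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    public class DebugScriptExample {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Hypothetical script names, resolved relative to the task working directory.
            conf.set(MRJobConfig.MAP_DEBUG_SCRIPT, "./debug-map.sh");
            conf.set(MRJobConfig.REDUCE_DEBUG_SCRIPT, "./debug-reduce.sh");
            Job job = Job.getInstance(conf, "debug-example");
            // Distribute and symlink the scripts; the HDFS paths are hypothetical.
            job.addCacheFile(new URI("hdfs://namenode:8020/scripts/debug-map.sh#debug-map.sh"));
            job.addCacheFile(new URI("hdfs://namenode:8020/scripts/debug-reduce.sh#debug-reduce.sh"));
            // ... set the mapper, reducer, input and output paths, then submit ...
        }
    }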

The arguments to the script are the task stdout, stderr, syslog and jobconf files. The debug command, run on the host where the MapReduce task failed, is: $script $stdout $stderr $syslog $jobconf

Pipes programs have the C++ program name as a fifth argument for the command. For pipes programs, the command is $script $stdout $stderr $syslog $jobconf $program.

For pipes, a default script processes core dumps under gdb, prints the stack trace, and provides information about running threads.

Data Compression

Hadoop MapReduce provides facilities to specify compression for both intermediate map outputs and the job outputs, that is, the output of the reduces.

Hadoop MapReduce is bundled with a CompressionCodec implementation for the zlib compression algorithm. It also supports gzip, bzip2, snappy, and lz4 file formats.

Hadoop provides native implementations of these compression codecs for performance (zlib) and because of the lack of Java libraries. More details on codec usage and availability are available in the Apache Hadoop documentation.

Intermediate Outputs

Applications can control compression of intermediate map outputs with the Configuration.set(MRJobConfig.MAP_OUTPUT_COMPRESS, boolean) API and the CompressionCodec to be used with the Configuration.set(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC, Class) API.
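
A minimal sketch of enabling map output compression follows; the choice of SnappyCodec is only an example and assumes the native snappy library is available on the hosts.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.SnappyCodec;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    public class MapOutputCompressionExample {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
            // Example codec choice; any available CompressionCodec implementation works.
            conf.setClass(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC,
                    SnappyCodec.class, CompressionCodec.class);
            // ... pass conf to Job.getInstance(conf) when creating the job ...
        }
    }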

Job Outputs

Applications can control compression of job outputs using the FileOutputFormat.setCompressOutput(Job, boolean) API. The CompressionCodec to be used can be specified using the FileOutputFormat.setOutputCompressorClass(Job, Class) API.

If the job outputs are to be stored in the SequenceFileOutputFormat, the required SequenceFile.CompressionType (that is, RECORD or BLOCK; the default is RECORD) is specified with the SequenceFileOutputFormat.setOutputCompressionType(Job, SequenceFile.CompressionType) API.
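
A minimal sketch of compressing job outputs follows; GzipCodec and BLOCK compression are example choices, and the SequenceFileOutputFormat lines apply only when that output format is used.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

    public class JobOutputCompressionExample {
        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance(new Configuration(), "output-compression");
            FileOutputFormat.setCompressOutput(job, true);
            FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
            // Only relevant when the job uses SequenceFileOutputFormat.
            job.setOutputFormatClass(SequenceFileOutputFormat.class);
            SequenceFileOutputFormat.setOutputCompressionType(job,
                    SequenceFile.CompressionType.BLOCK);
            // ... set the mapper, reducer, input and output paths, then submit ...
        }
    }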

Skipping Bad Records

In Hadoop, you can skip bad input records when processing map inputs. Applications can control this feature through the SkipBadRecords class.

You can skip bad inputs when map tasks crash deterministically on certain input. This is often caused by bugs in the map function. In some cases, the bugs cannot be fixed; for example, in third-party libraries for which the source code is not available. In such cases, the task never succeeds, even after multiple attempts, and the job fails. By skipping bad records, only a small portion of data surrounding the bad records is lost. This might be acceptable for some applications (for example, applications that perform statistical analysis on very large data sets).

This feature is disabled by default. To enable SkipBadRecords, refer to SkipBadRecords.setMapperMaxSkipRecords(Configuration, long) and SkipBadRecords.setReducerMaxSkipGroups(Configuration, long).

With this feature enabled, the framework enters "skipping mode" after a certain number of map failures. For more details, see SkipBadRecords.setAttemptsToStartSkipping(Configuration, int). In "skipping mode," map tasks maintain the range of records being processed. To do this, the framework relies on the processed record counter. See SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS and SkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS. This counter enables the framework to know how many records have been processed successfully, and what record range caused a task to crash. On further attempts, the application skips this range of records.

The number of records skipped depends on how frequently the processed record counter is incremented by the application. Cloudera recommends that this counter be incremented after every record is processed. This might not be possible in applications that batch their processing. In such cases, the framework might skip additional records surrounding the bad record. You can control the number of skipped records through SkipBadRecords.setMapperMaxSkipRecords(Configuration, long) and SkipBadRecords.setReducerMaxSkipGroups(Configuration, long).

The framework narrows the range of skipped records using a binary search-like approach. The skipped range is divided into two halves, and only one half is processed. On subsequent failures, the framework determines which half contains bad records. A task is re-executed until the skipped range is narrowed to the acceptable size or all task attempts are exhausted. To increase the number of task attempts, use Job.setMaxMapAttempts(int) and Job.setMaxReduceAttempts(int).
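
Tying these settings together, the following is a minimal sketch of enabling record skipping for a job; the thresholds and attempt counts shown are only example values.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.SkipBadRecords;
    import org.apache.hadoop.mapreduce.Job;

    public class SkipBadRecordsExample {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Example values: start skipping after two failed attempts, and accept
            // losing at most one record (or group) around each bad record.
            SkipBadRecords.setAttemptsToStartSkipping(conf, 2);
            SkipBadRecords.setMapperMaxSkipRecords(conf, 1);
            SkipBadRecords.setReducerMaxSkipGroups(conf, 1);
            Job job = Job.getInstance(conf, "skip-bad-records");
            job.setMaxMapAttempts(8);       // extra attempts help narrow the skipped range
            job.setMaxReduceAttempts(8);
            // ... set the mapper, reducer, input and output paths, then submit ...
        }
    }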

Skipped records are written to HDFS in the sequence file format for later analysis. You can change the location with SkipBadRecords.setSkipOutputPath(JobConf, Path).