Task Execution and Environment
The MRAppMaster executes the Mapper and Reducer tasks as child processes in separate JVMs.
The child task inherits the environment of the parent MRAppMaster. You can pass additional options to the child JVM via the mapreduce.map.java.opts and mapreduce.reduce.java.opts configuration parameters in the Job. For example, you can set non-standard paths for the runtime linker to search for shared libraries via -Djava.library.path=<>. If the mapreduce.map.java.opts or mapreduce.reduce.java.opts property contains the symbol @taskid@, it is interpolated with the task id of the MapReduce task.
```xml
<property>
  <name>mapreduce.map.java.opts</name>
  <value>
    -Xmx512M -Djava.library.path=/home/mycompany/lib -verbose:gc -Xloggc:/tmp/@taskid@.gc
    -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false
  </value>
</property>

<property>
  <name>mapreduce.reduce.java.opts</name>
  <value>
    -Xmx1024M -Djava.library.path=/home/mycompany/lib -verbose:gc -Xloggc:/tmp/@taskid@.gc
    -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false
  </value>
</property>
```
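The same options can also be set programmatically on the job's Configuration before submission. A minimal sketch (the class name, job name, and option strings are illustrative):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class ChildOptsExample {
  public static Job configure() throws Exception {
    Configuration conf = new Configuration();
    // Equivalent to the XML above; @taskid@ is interpolated per task.
    conf.set("mapreduce.map.java.opts",
        "-Xmx512M -Djava.library.path=/home/mycompany/lib"
        + " -verbose:gc -Xloggc:/tmp/@taskid@.gc");
    conf.set("mapreduce.reduce.java.opts",
        "-Xmx1024M -Djava.library.path=/home/mycompany/lib"
        + " -verbose:gc -Xloggc:/tmp/@taskid@.gc");
    return Job.getInstance(conf, "child-opts-example");
  }
}
```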
Memory Management
The memory available to some parts of the framework is also configurable. In map and reduce tasks, performance can be influenced by adjusting parameters that affect the concurrency of operations and the frequency with which data is written to disk. Monitoring a job's filesystem counters, particularly the byte counts out of the map and into the reduce, helps you tune these parameters.
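For example, comparing a job's map output records against its spilled records shows whether records were spilled to disk more than once. A minimal sketch using the counters API (the class and method names are illustrative):

```java
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskCounter;

public class SpillStats {
  // Call after job completion, e.g. after job.waitForCompletion(true).
  public static void printSpillStats(Job job) throws Exception {
    Counters counters = job.getCounters();
    long mapOutputRecords =
        counters.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getValue();
    long spilledRecords =
        counters.findCounter(TaskCounter.SPILLED_RECORDS).getValue();
    // Spilled records well above map output records suggest records are
    // being spilled and re-merged more than once; consider a larger
    // mapreduce.task.io.sort.mb.
    System.out.printf("map output records: %d, spilled records: %d%n",
        mapOutputRecords, spilledRecords);
  }
}
```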
Map Parameters
A record emitted from a map is serialized into a buffer, and metadata is stored in accounting buffers. As described in the following options, when either the serialization buffer or the metadata exceeds a threshold, the contents of the buffers are sorted and written to disk in the background while the map continues to output records. If either buffer fills completely while the spill is in progress, the map thread blocks. When the map is finished, any remaining records are written to disk, and all on-disk segments are merged into a single file. Minimizing the number of spills to disk can decrease map time, but a larger buffer also decreases the memory available to the mapper.
Name | Type | Description
---|---|---
mapreduce.task.io.sort.mb | int | The cumulative size of the serialization and accounting buffers storing records emitted from the map, in megabytes.
mapreduce.map.sort.spill.percent | float | The soft limit in the serialization buffer. Once reached, a thread begins to spill the contents to disk in the background.
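Both parameters are ordinary job configuration values and can be set per job. A minimal sketch (the class name and values are illustrative, not recommendations):

```java
import org.apache.hadoop.conf.Configuration;

public class MapSortTuning {
  public static Configuration tuned() {
    Configuration conf = new Configuration();
    conf.setInt("mapreduce.task.io.sort.mb", 256);            // sort buffer size, in MB
    conf.setFloat("mapreduce.map.sort.spill.percent", 0.80f); // soft spill limit
    return conf;
  }
}
```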
Other Notes
- If either spill threshold is exceeded while a spill is in progress, collection continues until the spill is finished. For example, if mapreduce.map.sort.spill.percent is set to 0.33, and the remainder of the buffer fills while the spill runs, the next spill includes all the collected records, or 0.66 of the buffer, and does not generate additional spills. In other words, the thresholds define triggers, not blocking limits.
- A record larger than the serialization buffer first triggers a spill, then is spilled to a separate file. It is undefined whether this record first passes through the combiner.
Shuffle/Reduce Parameters
As described previously, each reduce fetches the output assigned to it by the partitioner into memory via HTTP and periodically merges these outputs to disk. If intermediate compression of map outputs is enabled, each output is decompressed into memory. The following options affect the frequency of these merges to disk prior to the reduce and the memory allocated to storing map outputs during the reduce.
Name | Type | Description
---|---|---
mapreduce.task.io.sort.factor | int | Specifies the number of segments on disk to be merged at the same time. It limits the number of open files and compression codecs during the merge. If the number of files exceeds this limit, the merge proceeds in several passes. Although this limit also applies to the map, most jobs should be configured so that reaching this limit is unlikely.
mapreduce.reduce.merge.inmem.threshold | int | The number of sorted map outputs fetched into memory before being merged to disk. Like the spill thresholds, this defines a trigger, not a unit of partition. In practice, this is usually set very high (1000) or disabled (0), because merging in-memory segments is often less expensive than merging from disk (see the notes following this table). This threshold influences only the frequency of in-memory merges during the shuffle.
mapreduce.reduce.shuffle.merge.percent | float | The memory threshold for fetched map outputs before an in-memory merge is started, expressed as a percentage of the memory allocated to storing map outputs in memory. Because map outputs that cannot fit in memory can be stalled, setting this high can decrease parallelism between the fetch and merge. Conversely, values as high as 1.0 have been effective for reduces whose input can fit entirely in memory. This parameter influences only the frequency of in-memory merges during the shuffle.
mapreduce.reduce.shuffle.input.buffer.percent | float | The percentage of memory, relative to the maximum heap size as typically specified in mapreduce.reduce.java.opts, that can be allocated to storing map outputs during the shuffle. Although some memory should be set aside for the framework, in general it is advantageous to set this high enough to store large and numerous map outputs.
mapreduce.reduce.input.buffer.percent | float | The percentage of memory, relative to the maximum heap size, in which map outputs can be retained during the reduce. When the reduce begins, map outputs are merged to disk until those that remain are under the resource limit this defines. By default, all map outputs are merged to disk before the reduce begins, to maximize the memory available to the reduce. For less memory-intensive reduces, this should be increased to avoid trips to disk.
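These parameters, too, are set on the job configuration. A minimal sketch (the class name and values are illustrative, not tuning advice):

```java
import org.apache.hadoop.conf.Configuration;

public class ShuffleTuning {
  public static Configuration tuned() {
    Configuration conf = new Configuration();
    conf.setInt("mapreduce.task.io.sort.factor", 50);          // wider merges, fewer passes
    conf.setInt("mapreduce.reduce.merge.inmem.threshold", 0);  // disable the count trigger
    conf.setFloat("mapreduce.reduce.shuffle.merge.percent", 0.66f);
    conf.setFloat("mapreduce.reduce.shuffle.input.buffer.percent", 0.70f);
    // Retain some map outputs in memory during the reduce itself.
    conf.setFloat("mapreduce.reduce.input.buffer.percent", 0.65f);
    return conf;
  }
}
```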
Other Notes
- If a map output is larger than 25 percent of the memory allocated to copying map outputs, it is written directly to disk without first staging through memory.
- When running with a combiner, the reasoning for high merge thresholds and large buffers might not apply. For merges started before all map outputs have been fetched, the combiner is run while spilling to disk. In some cases, you can get better reduce times by spending resources combining map outputs – making disk spills small and parallelizing spilling and fetching – instead of aggressively increasing buffer sizes.
- When merging in-memory map outputs to disk to begin the reduce, if an intermediate merge is necessary because there are segments to spill and at least mapreduce.task.io.sort.factor segments already on disk, the in-memory map outputs are part of the intermediate merge.
Configured Parameters
The following properties are localized in the job configuration for each task's execution:
Name | Type | Description
---|---|---
mapreduce.job.id | String | The job id
mapreduce.job.jar | String | job.jar location in job directory
mapreduce.job.local.dir | String | The job-specific shared scratch space
mapreduce.task.id | String | The task id
mapreduce.task.attempt.id | String | The task attempt id
mapreduce.task.is.map | boolean | True, if this is a map task
mapreduce.task.partition | int | The id of the task within the job
mapreduce.map.input.file | String | The filename that the map is reading from
mapreduce.map.input.start | long | The offset of the start of the map input split
mapreduce.map.input.length | long | The number of bytes in the map input split
mapreduce.task.output.dir | String | The task's temporary output directory
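Inside a task, these localized values can be read from the task's Configuration. A minimal sketch of a mapper logging a few of them during setup (the class name is illustrative):

```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class EnvAwareMapper
    extends Mapper<LongWritable, Text, Text, LongWritable> {
  @Override
  protected void setup(Context context)
      throws IOException, InterruptedException {
    Configuration conf = context.getConfiguration();
    String jobId = conf.get("mapreduce.job.id");
    boolean isMap = conf.getBoolean("mapreduce.task.is.map", true);
    String inputFile = conf.get("mapreduce.map.input.file");
    // Written to the task's stderr (see Task Logs below).
    System.err.println("job=" + jobId + " isMap=" + isMap
        + " input=" + inputFile);
  }
}
```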
During execution of a streaming job, the periods (.) in mapreduce parameters are changed to underscores (_). For example, mapreduce.job.id becomes mapreduce_job_id. To get the values from a streaming job's mapper or reducer, use the parameter names with underscores.
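As a sketch, a streaming mapper, here written in Java although any executable works, could read those environment variables directly (the class name is illustrative):

```java
public class StreamingEnvMapper {
  public static void main(String[] args) {
    // Localized parameters arrive as environment variables with underscores.
    String jobId = System.getenv("mapreduce_job_id");
    String taskId = System.getenv("mapreduce_task_id");
    // Diagnostics go to stderr so they do not pollute the map output on stdout.
    System.err.println("job " + jobId + ", task " + taskId);
  }
}
```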
Task Logs
The standard output streams (stdout), standard error streams (stderr), and the syslog of the task are read by the NodeManager and logged to ${HADOOP_LOG_DIR}/userlogs. By default, they are subsequently aggregated to HDFS.
Distributing Libraries
DistributedCache can be used to distribute both JARs and native libraries for use in the map and reduce tasks. The child JVM always has its current working directory added to the java.library.path and LD_LIBRARY_PATH. The cached libraries can be loaded via System.loadLibrary or System.load. For more information on how to load shared libraries through a distributed cache, see Native Shared Libraries.
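A minimal sketch of shipping a native library with the cache and loading it from a task, assuming the library has already been uploaded to the HDFS path shown (the path, class name, and library name are illustrative):

```java
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class NativeLibJob {
  public static Job configure(Configuration conf) throws Exception {
    Job job = Job.getInstance(conf, "native-lib-example");
    // The fragment after '#' names the symlink in the task's working
    // directory. Naming it libmylib.so lets System.loadLibrary("mylib")
    // resolve it on Linux, since the working directory is on
    // java.library.path.
    job.addCacheFile(new URI("hdfs:///libraries/mylib.so#libmylib.so"));
    return job;
  }
}
// Inside a task:
//   System.loadLibrary("mylib");  // loads ./libmylib.so from the working dir
```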