Job Output

OutputFormat describes the output specification for a MapReduce job. The MapReduce framework relies on the OutputFormat of the job to:

  1. Validate the output specification of the job; for example, check that the output directory does not already exist.
  2. Provide the RecordWriter implementation used to write the output files of the job. Output files are stored in a FileSystem. TextOutputFormat is the default OutputFormat.
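
The first responsibility can be illustrated with a minimal sketch in plain Java. It is not the Hadoop API: the class OutputSpecCheck, its method, and the choice of an unchecked exception are assumptions made for this example, and it checks the local file system through java.nio.file rather than a Hadoop FileSystem.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical stand-in for an output-specification check; not a Hadoop class.
class OutputSpecCheck {
    // Rejects an unset or already existing output directory, so that a job
    // cannot silently overwrite the results of an earlier run.
    static void checkOutputSpecs(Path outputDir) {
        if (outputDir == null) {
            throw new IllegalStateException("Output directory not set");
        }
        if (Files.exists(outputDir)) {
            throw new IllegalStateException(
                "Output directory " + outputDir + " already exists");
        }
    }

    public static void main(String[] args) {
        Path tmp = Paths.get(System.getProperty("java.io.tmpdir"));
        // Accepted: nothing exists at this path yet.
        checkOutputSpecs(tmp.resolve("job-output-" + System.nanoTime()));
        try {
            // Rejected: the temp directory itself already exists.
            checkOutputSpecs(tmp);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing before any task starts is the point of the check: the job is refused at submission time instead of after work has already been done.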

OutputCommitter

OutputCommitter describes the commit of task output for a MapReduce job. The MapReduce framework relies on the OutputCommitter of the job to:

  1. Set up the job during initialization; for example, create the temporary output directory for the job. Job setup is done by a separate task while the job is in the PREP state, after the tasks have been initialized. Once the setup task completes, the job is moved to the RUNNING state.
  2. Clean up the job after job completion; for example, remove the temporary output directory. Job cleanup is done by a separate task at the end of the job. The job is declared SUCCEEDED, FAILED, or KILLED after the cleanup task completes.
  3. Set up the task temporary output. Task setup is done as part of the same task, during task initialization.
  4. Check whether a task needs a commit. This prevents unnecessary commit procedures.
  5. Commit the task output. After the task is done, the task commits its output, if required.
  6. Discard the task commit. If the task fails or is killed, its output is cleaned up. If the task could not clean up (in an exception block), a separate task is launched with the same attempt ID to do the cleanup.

FileOutputCommitter is the default OutputCommitter. Job setup/cleanup tasks occupy map or reduce containers, whichever is available on the NodeManager. The JobCleanup task, TaskCleanup tasks, and JobSetup task have the highest priority, in that order.
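
The order of the six steps above can be sketched as a small plain-Java model. This is a simplification under stated assumptions, not the Hadoop OutputCommitter API: the class CommitLifecycleSketch, the Attempt type, and the step labels are hypothetical, and attempts are processed one after another here even though real tasks run in parallel.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified, hypothetical model of the commit lifecycle; not the Hadoop API.
class CommitLifecycleSketch {
    // One task attempt: its ID, whether it finished successfully, and
    // whether it produced output that would need a commit.
    static final class Attempt {
        final String id;
        final boolean succeeded;
        final boolean wroteOutput;

        Attempt(String id, boolean succeeded, boolean wroteOutput) {
            this.id = id;
            this.succeeded = succeeded;
            this.wroteOutput = wroteOutput;
        }
    }

    // Returns the sequence of committer steps taken for the given attempts.
    static List<String> runJob(List<Attempt> attempts) {
        List<String> steps = new ArrayList<>();
        steps.add("setupJob");                      // step 1: job setup
        for (Attempt a : attempts) {
            steps.add("setupTask " + a.id);         // step 3: task setup
            if (!a.succeeded) {
                steps.add("abortTask " + a.id);     // step 6: discard output
            } else if (a.wroteOutput) {             // step 4: commit needed?
                steps.add("commitTask " + a.id);    // step 5: commit output
            }
        }
        steps.add("cleanupJob");                    // step 2: job cleanup
        return steps;
    }

    public static void main(String[] args) {
        List<Attempt> attempts = Arrays.asList(
            new Attempt("m_000000_0", false, true),   // failed attempt
            new Attempt("m_000000_1", true, true),    // successful retry
            new Attempt("m_000001_0", true, false));  // nothing to commit
        runJob(attempts).forEach(System.out::println);
    }
}
```

The third attempt shows why step 4 exists: a task that produced nothing skips the commit entirely.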

Task Side-Effect Files

In some applications, component tasks need to create or write to side files, which differ from the actual job output files.

In such cases, problems can arise when two instances of the same Mapper or Reducer run simultaneously (for example, speculative tasks) and try to open or write to the same file on the file system. The application would therefore have to pick unique names per task attempt (using the attempt ID, say attempt_200709221812_0001_m_000000_0), not just per task.

To avoid these issues, when the OutputCommitter is FileOutputCommitter, the MapReduce framework maintains a special ${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid} subdirectory for each task attempt, on the FileSystem where the output of the task attempt is stored. The subdirectory is accessible via ${mapreduce.task.output.dir}. On successful completion of the task attempt, the files in ${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid} (and only those files) are promoted to ${mapreduce.output.fileoutputformat.outputdir}/. The framework discards the subdirectories of unsuccessful task attempts. This process is completely transparent to the application.

You can use this feature by creating any required side files during execution of a task in ${mapreduce.task.output.dir} via FileOutputFormat.getWorkOutputPath(). The framework promotes them similarly for successful task attempts. This eliminates the need to pick unique paths per task attempt.
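
The promotion behavior can be mimicked on a local file system with a short plain-Java sketch. It is an illustration only: the class SideFileSketch and all of its methods are hypothetical, it uses java.nio.file instead of a Hadoop FileSystem, and in a real task the work directory would come from FileOutputFormat.getWorkOutputPath() rather than from workDir(). Only the _temporary/_<attempt ID> layout is taken from the description above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical local-filesystem model of per-attempt work directories.
class SideFileSketch {
    final Path outputDir;

    SideFileSketch(Path outputDir) { this.outputDir = outputDir; }

    static SideFileSketch inTempDir() {
        try {
            return new SideFileSketch(Files.createTempDirectory("job-output"));
        } catch (IOException e) { throw new UncheckedIOException(e); }
    }

    // Each attempt gets its own directory: <outputDir>/_temporary/_<attemptId>.
    Path workDir(String attemptId) {
        return outputDir.resolve("_temporary").resolve("_" + attemptId);
    }

    // Side files are written into the attempt's work directory, so two
    // attempts can use the same file name without colliding.
    Path writeSideFile(String attemptId, String name, String content) {
        try {
            Path dir = Files.createDirectories(workDir(attemptId));
            return Files.write(dir.resolve(name),
                               content.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) { throw new UncheckedIOException(e); }
    }

    private List<Path> listFiles(Path dir) throws IOException {
        try (Stream<Path> files = Files.list(dir)) {
            return files.collect(Collectors.toList());
        }
    }

    // Successful attempt: promote its files to the output directory.
    void commit(String attemptId) {
        try {
            Path dir = workDir(attemptId);
            for (Path file : listFiles(dir)) {
                Files.move(file, outputDir.resolve(file.getFileName()));
            }
            Files.delete(dir);
        } catch (IOException e) { throw new UncheckedIOException(e); }
    }

    // Unsuccessful attempt: discard its work directory.
    void abort(String attemptId) {
        try {
            Path dir = workDir(attemptId);
            for (Path file : listFiles(dir)) {
                Files.delete(file);
            }
            Files.delete(dir);
        } catch (IOException e) { throw new UncheckedIOException(e); }
    }

    String readOutput(String name) {
        try {
            return new String(Files.readAllBytes(outputDir.resolve(name)),
                              StandardCharsets.UTF_8);
        } catch (IOException e) { throw new UncheckedIOException(e); }
    }

    public static void main(String[] args) {
        SideFileSketch job = inTempDir();
        // Two speculative attempts of the same task write the same file name.
        job.writeSideFile("attempt_200709221812_0001_m_000000_0", "side.txt", "from attempt 0");
        job.writeSideFile("attempt_200709221812_0001_m_000000_1", "side.txt", "from attempt 1");
        job.commit("attempt_200709221812_0001_m_000000_1");
        job.abort("attempt_200709221812_0001_m_000000_0");
        System.out.println(job.readOutput("side.txt"));  // prints "from attempt 1"
    }
}
```

Because each attempt writes only inside its own directory, the task code can use a fixed file name; uniqueness comes from the directory, not from the name.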

The entire discussion holds true for maps of jobs with reducer=NONE (that is, 0 reduces) because output of the map, in that case, goes directly to HDFS.

RecordWriter

RecordWriter writes the output key-value pairs to an output file. RecordWriter implementations write the job outputs to the FileSystem.
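
As a rough illustration of what such a writer does, the following plain-Java sketch writes one key-value pair per line. It is not Hadoop's RecordWriter class: KeyValueWriterSketch and its constructor are hypothetical, and it writes to any java.io.Writer rather than to a FileSystem. The tab separator is chosen to resemble the line-oriented text output of TextOutputFormat.

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.io.Writer;

// Hypothetical line-oriented key-value writer; not Hadoop's RecordWriter.
class KeyValueWriterSketch<K, V> implements AutoCloseable {
    private final PrintWriter out;
    private final String separator;

    KeyValueWriterSketch(Writer sink, String separator) {
        this.out = new PrintWriter(sink);
        this.separator = separator;
    }

    // Writes one record as: key, separator, value, newline.
    void write(K key, V value) {
        out.print(key);
        out.print(separator);
        out.print(value);
        out.print('\n');
    }

    // Flushes buffered records and closes the underlying sink.
    @Override
    public void close() {
        out.close();
    }

    public static void main(String[] args) {
        StringWriter sink = new StringWriter();
        try (KeyValueWriterSketch<String, Integer> writer =
                 new KeyValueWriterSketch<>(sink, "\t")) {
            writer.write("hello", 2);
            writer.write("world", 1);
        }
        System.out.print(sink);
    }
}
```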