7. DistCp Architecture

DistCp consists of the following components:

 7.1. DistCp Driver

The DistCp Driver components are responsible for:

  • Parsing the arguments passed to the DistCp command on the command line, via:

    • OptionsParser

    • DistCpOptionsSwitch

  • Assembling the command arguments into an appropriate DistCpOptions object, and initializing DistCp. These arguments include:

    • Source-paths

    • Target location

    • Copy options (e.g. whether to update-copy, overwrite, which file attributes to preserve, etc.)

  • Orchestrating the copy operation by:

    • Invoking the copy-listing generator to create the list of files to be copied.

    • Setting up and launching the Hadoop MapReduce job to carry out the copy.

    • Based on the options, either returning a handle to the Hadoop MapReduce job immediately, or waiting until completion.

The parser elements run only when DistCp is invoked from the command line (or when DistCp::run() is called). The DistCp class can also be used programmatically, by constructing a DistCpOptions object and initializing a DistCp object with it.

 7.2. Copy-listing Generator

The copy-listing generator classes are responsible for creating the list of files/directories to be copied from the source. They examine the contents of the source paths (files/directories, including wildcards), and record every path to be copied in a SequenceFile, which the DistCp Hadoop job then consumes. The main classes in this module include:

  1. CopyListing: The interface that any copy-listing generator must implement. It also provides the factory method by which the concrete CopyListing implementation is chosen.

  2. SimpleCopyListing: An implementation of CopyListing that accepts multiple source paths (files/directories), and recursively lists all of the individual files and directories under each for copy.

  3. GlobbedCopyListing: Another implementation of CopyListing that expands wildcards in the source paths.

  4. FileBasedCopyListing: An implementation of CopyListing that reads the source path list from a specified file.

Based on whether a source file list is specified in the DistCpOptions, the source listing is generated in one of the following ways:

  1. If there is no source file list, the GlobbedCopyListing is used. All wildcards are expanded, and all of the expansions are forwarded to the SimpleCopyListing, which in turn constructs the listing (via recursive descent of each path).

  2. If a source file list is specified, the FileBasedCopyListing is used. Source paths are read from the specified file, and then forwarded to the GlobbedCopyListing. The listing is then constructed as described above.
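
The delegation chain described above can be sketched in plain Java. This is only an illustrative model, not the Hadoop classes: ListingChainSketch, its method names, the in-memory set of paths standing in for a file system, and the single-segment handling of '*' are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.regex.Pattern;

// Hypothetical model of the listing chain; none of these names come from Hadoop.
final class ListingChainSketch {
    // Stand-in for a file system: every known path, files and directories alike.
    private final SortedSet<String> allPaths;

    ListingChainSketch(Collection<String> paths) {
        this.allPaths = new TreeSet<>(paths);
    }

    // "Simple" stage: each root plus everything beneath it (recursive descent).
    List<String> simple(List<String> roots) {
        Set<String> listing = new LinkedHashSet<>();
        for (String root : roots) {
            for (String path : allPaths) {
                if (path.equals(root) || path.startsWith(root + "/")) {
                    listing.add(path);
                }
            }
        }
        return new ArrayList<>(listing);
    }

    // "Globbed" stage: expand '*' within one path segment, then delegate to simple().
    List<String> globbed(List<String> patterns) {
        List<String> expanded = new ArrayList<>();
        for (String pattern : patterns) {
            // Quote the whole pattern, then let each '*' match any run of non-'/' characters.
            Pattern regex = Pattern.compile(
                ("\\Q" + pattern + "\\E").replace("*", "\\E[^/]*\\Q"));
            for (String path : allPaths) {
                if (regex.matcher(path).matches()) {
                    expanded.add(path);
                }
            }
        }
        return simple(expanded);
    }

    // "File-based" stage: the lines of the source list file are forwarded to globbed().
    List<String> fileBased(List<String> sourceListLines) {
        return globbed(sourceListLines);
    }

    // Entry point: choose the first stage based on whether a source file list was given.
    List<String> build(List<String> sourcePaths, List<String> sourceListLines) {
        return sourceListLines == null ? globbed(sourcePaths) : fileBased(sourceListLines);
    }
}
```

In this model, calling build with the pattern /data/2024-* and no source list expands the wildcard first and then lists each matching directory together with its contents.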

You can customize the method by which the copy-listing is constructed by providing a custom implementation of the CopyListing interface. DistCp also differs from the legacy DistCp in how paths are considered for copy.

The legacy implementation only lists those paths that must definitely be copied onto the target. For example, if a file already exists at the target (and -overwrite isn’t specified), the file is not even considered in the MapReduce copy job. Determining this during setup (i.e. before the MapReduce job) involves file size and checksum comparisons that are potentially time-consuming.

DistCp postpones such checks until the MapReduce job, thus reducing setup time. Performance is enhanced further since these checks are parallelized across multiple maps.

 7.3. InputFormats and MapReduce Components

The InputFormats and MapReduce components are responsible for the actual copying of files and directories from the source to the destination path. The listing file created during copy-listing generation is consumed at this point, when the copy is carried out. The classes of interest here include:

  • UniformSizeInputFormat: This implementation of org.apache.hadoop.mapreduce.InputFormat provides equivalence with Legacy DistCp in balancing load across maps. The aim of the UniformSizeInputFormat is to make each map copy roughly the same number of bytes. Therefore, the listing file is split into groups of paths, such that the sum of file sizes in each InputSplit is nearly equal to that of every other split. The splitting is not always perfect, but its simple implementation keeps the setup time low.

  • DynamicInputFormat and DynamicRecordReader: The DynamicInputFormat implements org.apache.hadoop.mapreduce.InputFormat, and is new to DistCp. The listing file is split into several “chunk files”, the exact number of chunk files being a multiple of the number of maps requested for the Hadoop job. Each map task is “assigned” one of the chunk files (by renaming the chunk to the task’s id) before the job is launched. Paths are read from each chunk using the DynamicRecordReader, and processed in the CopyMapper. After all of the paths in a chunk are processed, the current chunk is deleted and a new chunk is acquired. The process continues until no more chunks are available. This “dynamic” approach allows faster map tasks to consume more paths than slower ones, thus speeding up the DistCp job overall.

  • CopyMapper: This class implements the physical file copy. The input paths are checked against the input options (specified in the job configuration), to determine whether a file needs to be copied. A file will be copied only if at least one of the following is true:

    • A file with the same name does not exist at target.

    • A file with the same name exists at target, but has a different file size.

    • A file with the same name exists at target, but has a different checksum, and -skipcrccheck is not specified.

    • A file with the same name exists at target, but -overwrite is specified.

    • A file with the same name exists at target, but differs in block-size (and block-size needs to be preserved).

  • CopyCommitter: This class is responsible for the commit phase of the DistCp job, including:

    • Preservation of directory permissions (if specified in the options)

    • Cleanup of temporary files, work directories, etc.
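
The byte-balancing goal of the UniformSizeInputFormat described above can be illustrated with a small greedy pass over the listing. This is a sketch under stated assumptions, not the Hadoop implementation: UniformSplitSketch and split are hypothetical names, files are represented only by their sizes, and the listing order is assumed to be preserved.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of byte-balanced grouping; not the Hadoop class.
final class UniformSplitSketch {

    /**
     * Groups file sizes, in listing order, so that each group holds roughly
     * total / numMaps bytes. A group is closed as soon as adding the next
     * file would push it past that target.
     */
    static List<List<Long>> split(List<Long> fileSizes, int numMaps) {
        if (numMaps <= 0) {
            throw new IllegalArgumentException("numMaps must be positive");
        }
        long total = 0;
        for (long size : fileSizes) {
            total += size;
        }
        long target = (total + numMaps - 1) / numMaps; // ceiling of total / numMaps

        List<List<Long>> groups = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long currentBytes = 0;
        for (long size : fileSizes) {
            if (!current.isEmpty() && currentBytes + size > target) {
                groups.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(size);
            currentBytes += size;
        }
        if (!current.isEmpty()) {
            groups.add(current);
        }
        // The single pass is cheap but imperfect: group sizes can differ, and
        // in this sketch the number of groups may even exceed numMaps.
        return groups;
    }
}
```

With sizes 40, 10, 30, 20 and two maps, the target is 50 bytes and the pass yields two groups of 50 bytes each. With three files of 6 bytes and two maps, the same pass yields three groups, which shows why such a split is only approximately uniform.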
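
The CopyMapper conditions listed above amount to a single predicate over the source file, the target file (if any), and the copy options. The following is a minimal sketch, assuming hypothetical FileInfo and Options types and a string checksum; it mirrors the listed conditions and is not the CopyMapper code itself.

```java
import java.util.Objects;

// Hypothetical sketch of the copy decision; the types below are illustrative only.
final class CopyDecisionSketch {

    static final class FileInfo {
        final long length;
        final long blockSize;
        final String checksum;

        FileInfo(long length, long blockSize, String checksum) {
            this.length = length;
            this.blockSize = blockSize;
            this.checksum = checksum;
        }
    }

    static final class Options {
        final boolean overwrite;         // models -overwrite
        final boolean skipCrcCheck;      // models -skipcrccheck
        final boolean preserveBlockSize; // models block-size preservation

        Options(boolean overwrite, boolean skipCrcCheck, boolean preserveBlockSize) {
            this.overwrite = overwrite;
            this.skipCrcCheck = skipCrcCheck;
            this.preserveBlockSize = preserveBlockSize;
        }
    }

    /** Returns true if at least one of the listed conditions holds. target may be null. */
    static boolean shouldCopy(FileInfo source, FileInfo target, Options opts) {
        if (target == null) {
            return true; // no file with the same name at the target
        }
        if (opts.overwrite) {
            return true; // overwrite requested
        }
        if (source.length != target.length) {
            return true; // different file size
        }
        if (opts.preserveBlockSize && source.blockSize != target.blockSize) {
            return true; // block size must be preserved and differs
        }
        // Checksums are compared only when the CRC check is not skipped.
        return !opts.skipCrcCheck && !Objects.equals(source.checksum, target.checksum);
    }
}
```

The cheap comparisons are placed first so that the checksum comparison, the most expensive check in a real file system, is reached only when nothing else has already decided the outcome. That ordering is a design choice of this sketch.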